##// END OF EJS Templates
wireproto: start to associate frame generation with a stream...
Gregory Szorc -
r37303:3ed34454 default
parent child Browse files
Show More
@@ -1,720 +1,732
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import struct
15 15
16 16 from .i18n import _
17 17 from .thirdparty import (
18 18 attr,
19 19 )
20 20 from . import (
21 21 error,
22 22 util,
23 23 )
24 24 from .utils import (
25 25 stringutil,
26 26 )
27 27
28 28 FRAME_HEADER_SIZE = 6
29 29 DEFAULT_MAX_FRAME_SIZE = 32768
30 30
31 31 FRAME_TYPE_COMMAND_NAME = 0x01
32 32 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
33 33 FRAME_TYPE_COMMAND_DATA = 0x03
34 34 FRAME_TYPE_BYTES_RESPONSE = 0x04
35 35 FRAME_TYPE_ERROR_RESPONSE = 0x05
36 36 FRAME_TYPE_TEXT_OUTPUT = 0x06
37 37
38 38 FRAME_TYPES = {
39 39 b'command-name': FRAME_TYPE_COMMAND_NAME,
40 40 b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
41 41 b'command-data': FRAME_TYPE_COMMAND_DATA,
42 42 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
43 43 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
44 44 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
45 45 }
46 46
47 47 FLAG_COMMAND_NAME_EOS = 0x01
48 48 FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
49 49 FLAG_COMMAND_NAME_HAVE_DATA = 0x04
50 50
51 51 FLAGS_COMMAND = {
52 52 b'eos': FLAG_COMMAND_NAME_EOS,
53 53 b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS,
54 54 b'have-data': FLAG_COMMAND_NAME_HAVE_DATA,
55 55 }
56 56
57 57 FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01
58 58 FLAG_COMMAND_ARGUMENT_EOA = 0x02
59 59
60 60 FLAGS_COMMAND_ARGUMENT = {
61 61 b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION,
62 62 b'eoa': FLAG_COMMAND_ARGUMENT_EOA,
63 63 }
64 64
65 65 FLAG_COMMAND_DATA_CONTINUATION = 0x01
66 66 FLAG_COMMAND_DATA_EOS = 0x02
67 67
68 68 FLAGS_COMMAND_DATA = {
69 69 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
70 70 b'eos': FLAG_COMMAND_DATA_EOS,
71 71 }
72 72
73 73 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
74 74 FLAG_BYTES_RESPONSE_EOS = 0x02
75 75
76 76 FLAGS_BYTES_RESPONSE = {
77 77 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
78 78 b'eos': FLAG_BYTES_RESPONSE_EOS,
79 79 }
80 80
81 81 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
82 82 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
83 83
84 84 FLAGS_ERROR_RESPONSE = {
85 85 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
86 86 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
87 87 }
88 88
89 89 # Maps frame types to their available flags.
90 90 FRAME_TYPE_FLAGS = {
91 91 FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
92 92 FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
93 93 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
94 94 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
95 95 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
96 96 FRAME_TYPE_TEXT_OUTPUT: {},
97 97 }
98 98
99 99 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
100 100
101 101 @attr.s(slots=True)
102 102 class frameheader(object):
103 103 """Represents the data in a frame header."""
104 104
105 105 length = attr.ib()
106 106 requestid = attr.ib()
107 107 typeid = attr.ib()
108 108 flags = attr.ib()
109 109
110 110 @attr.s(slots=True)
111 111 class frame(object):
112 112 """Represents a parsed frame."""
113 113
114 114 requestid = attr.ib()
115 115 typeid = attr.ib()
116 116 flags = attr.ib()
117 117 payload = attr.ib()
118 118
119 119 def makeframe(requestid, typeid, flags, payload):
120 120 """Assemble a frame into a byte array."""
121 121 # TODO assert size of payload.
122 122 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
123 123
124 124 # 24 bits length
125 125 # 16 bits request id
126 126 # 4 bits type
127 127 # 4 bits flags
128 128
129 129 l = struct.pack(r'<I', len(payload))
130 130 frame[0:3] = l[0:3]
131 131 struct.pack_into(r'<H', frame, 3, requestid)
132 132 frame[5] = (typeid << 4) | flags
133 133 frame[6:] = payload
134 134
135 135 return frame
136 136
137 137 def makeframefromhumanstring(s):
138 138 """Create a frame from a human readable string
139 139
140 140 Strings have the form:
141 141
142 142 <request-id> <type> <flags> <payload>
143 143
144 144 This can be used by user-facing applications and tests for creating
145 145 frames easily without having to type out a bunch of constants.
146 146
147 147 Request ID is an integer.
148 148
149 149 Frame type and flags can be specified by integer or named constant.
150 150
151 151 Flags can be delimited by `|` to bitwise OR them together.
152 152 """
153 153 requestid, frametype, frameflags, payload = s.split(b' ', 3)
154 154
155 155 requestid = int(requestid)
156 156
157 157 if frametype in FRAME_TYPES:
158 158 frametype = FRAME_TYPES[frametype]
159 159 else:
160 160 frametype = int(frametype)
161 161
162 162 finalflags = 0
163 163 validflags = FRAME_TYPE_FLAGS[frametype]
164 164 for flag in frameflags.split(b'|'):
165 165 if flag in validflags:
166 166 finalflags |= validflags[flag]
167 167 else:
168 168 finalflags |= int(flag)
169 169
170 170 payload = stringutil.unescapestr(payload)
171 171
172 172 return makeframe(requestid=requestid, typeid=frametype,
173 173 flags=finalflags, payload=payload)
174 174
175 175 def parseheader(data):
176 176 """Parse a unified framing protocol frame header from a buffer.
177 177
178 178 The header is expected to be in the buffer at offset 0 and the
179 179 buffer is expected to be large enough to hold a full header.
180 180 """
181 181 # 24 bits payload length (little endian)
182 182 # 4 bits frame type
183 183 # 4 bits frame flags
184 184 # ... payload
185 185 framelength = data[0] + 256 * data[1] + 16384 * data[2]
186 186 requestid = struct.unpack_from(r'<H', data, 3)[0]
187 187 typeflags = data[5]
188 188
189 189 frametype = (typeflags & 0xf0) >> 4
190 190 frameflags = typeflags & 0x0f
191 191
192 192 return frameheader(framelength, requestid, frametype, frameflags)
193 193
194 194 def readframe(fh):
195 195 """Read a unified framing protocol frame from a file object.
196 196
197 197 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
198 198 None if no frame is available. May raise if a malformed frame is
199 199 seen.
200 200 """
201 201 header = bytearray(FRAME_HEADER_SIZE)
202 202
203 203 readcount = fh.readinto(header)
204 204
205 205 if readcount == 0:
206 206 return None
207 207
208 208 if readcount != FRAME_HEADER_SIZE:
209 209 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
210 210 (readcount, header))
211 211
212 212 h = parseheader(header)
213 213
214 214 payload = fh.read(h.length)
215 215 if len(payload) != h.length:
216 216 raise error.Abort(_('frame length error: expected %d; got %d') %
217 217 (h.length, len(payload)))
218 218
219 219 return frame(h.requestid, h.typeid, h.flags, payload)
220 220
221 def createcommandframes(requestid, cmd, args, datafh=None):
221 def createcommandframes(stream, requestid, cmd, args, datafh=None):
222 222 """Create frames necessary to transmit a request to run a command.
223 223
224 224 This is a generator of bytearrays. Each item represents a frame
225 225 ready to be sent over the wire to a peer.
226 226 """
227 227 flags = 0
228 228 if args:
229 229 flags |= FLAG_COMMAND_NAME_HAVE_ARGS
230 230 if datafh:
231 231 flags |= FLAG_COMMAND_NAME_HAVE_DATA
232 232
233 233 if not flags:
234 234 flags |= FLAG_COMMAND_NAME_EOS
235 235
236 yield makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
237 flags=flags, payload=cmd)
236 yield stream.makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
237 flags=flags, payload=cmd)
238 238
239 239 for i, k in enumerate(sorted(args)):
240 240 v = args[k]
241 241 last = i == len(args) - 1
242 242
243 243 # TODO handle splitting of argument values across frames.
244 244 payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v))
245 245 offset = 0
246 246 ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v))
247 247 offset += ARGUMENT_FRAME_HEADER.size
248 248 payload[offset:offset + len(k)] = k
249 249 offset += len(k)
250 250 payload[offset:offset + len(v)] = v
251 251
252 252 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
253 yield makeframe(requestid=requestid,
254 typeid=FRAME_TYPE_COMMAND_ARGUMENT,
255 flags=flags,
256 payload=payload)
253 yield stream.makeframe(requestid=requestid,
254 typeid=FRAME_TYPE_COMMAND_ARGUMENT,
255 flags=flags,
256 payload=payload)
257 257
258 258 if datafh:
259 259 while True:
260 260 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
261 261
262 262 done = False
263 263 if len(data) == DEFAULT_MAX_FRAME_SIZE:
264 264 flags = FLAG_COMMAND_DATA_CONTINUATION
265 265 else:
266 266 flags = FLAG_COMMAND_DATA_EOS
267 267 assert datafh.read(1) == b''
268 268 done = True
269 269
270 yield makeframe(requestid=requestid,
271 typeid=FRAME_TYPE_COMMAND_DATA,
272 flags=flags,
273 payload=data)
270 yield stream.makeframe(requestid=requestid,
271 typeid=FRAME_TYPE_COMMAND_DATA,
272 flags=flags,
273 payload=data)
274 274
275 275 if done:
276 276 break
277 277
278 def createbytesresponseframesfrombytes(requestid, data,
278 def createbytesresponseframesfrombytes(stream, requestid, data,
279 279 maxframesize=DEFAULT_MAX_FRAME_SIZE):
280 280 """Create a raw frame to send a bytes response from static bytes input.
281 281
282 282 Returns a generator of bytearrays.
283 283 """
284 284
285 285 # Simple case of a single frame.
286 286 if len(data) <= maxframesize:
287 yield makeframe(requestid=requestid,
288 typeid=FRAME_TYPE_BYTES_RESPONSE,
289 flags=FLAG_BYTES_RESPONSE_EOS,
290 payload=data)
287 yield stream.makeframe(requestid=requestid,
288 typeid=FRAME_TYPE_BYTES_RESPONSE,
289 flags=FLAG_BYTES_RESPONSE_EOS,
290 payload=data)
291 291 return
292 292
293 293 offset = 0
294 294 while True:
295 295 chunk = data[offset:offset + maxframesize]
296 296 offset += len(chunk)
297 297 done = offset == len(data)
298 298
299 299 if done:
300 300 flags = FLAG_BYTES_RESPONSE_EOS
301 301 else:
302 302 flags = FLAG_BYTES_RESPONSE_CONTINUATION
303 303
304 yield makeframe(requestid=requestid,
305 typeid=FRAME_TYPE_BYTES_RESPONSE,
306 flags=flags,
307 payload=chunk)
304 yield stream.makeframe(requestid=requestid,
305 typeid=FRAME_TYPE_BYTES_RESPONSE,
306 flags=flags,
307 payload=chunk)
308 308
309 309 if done:
310 310 break
311 311
312 def createerrorframe(requestid, msg, protocol=False, application=False):
312 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
313 313 # TODO properly handle frame size limits.
314 314 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
315 315
316 316 flags = 0
317 317 if protocol:
318 318 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
319 319 if application:
320 320 flags |= FLAG_ERROR_RESPONSE_APPLICATION
321 321
322 yield makeframe(requestid=requestid,
323 typeid=FRAME_TYPE_ERROR_RESPONSE,
324 flags=flags,
325 payload=msg)
322 yield stream.makeframe(requestid=requestid,
323 typeid=FRAME_TYPE_ERROR_RESPONSE,
324 flags=flags,
325 payload=msg)
326 326
327 def createtextoutputframe(requestid, atoms):
327 def createtextoutputframe(stream, requestid, atoms):
328 328 """Create a text output frame to render text to people.
329 329
330 330 ``atoms`` is a 3-tuple of (formatting string, args, labels).
331 331
332 332 The formatting string contains ``%s`` tokens to be replaced by the
333 333 corresponding indexed entry in ``args``. ``labels`` is an iterable of
334 334 formatters to be applied at rendering time. In terms of the ``ui``
335 335 class, each atom corresponds to a ``ui.write()``.
336 336 """
337 337 bytesleft = DEFAULT_MAX_FRAME_SIZE
338 338 atomchunks = []
339 339
340 340 for (formatting, args, labels) in atoms:
341 341 if len(args) > 255:
342 342 raise ValueError('cannot use more than 255 formatting arguments')
343 343 if len(labels) > 255:
344 344 raise ValueError('cannot use more than 255 labels')
345 345
346 346 # TODO look for localstr, other types here?
347 347
348 348 if not isinstance(formatting, bytes):
349 349 raise ValueError('must use bytes formatting strings')
350 350 for arg in args:
351 351 if not isinstance(arg, bytes):
352 352 raise ValueError('must use bytes for arguments')
353 353 for label in labels:
354 354 if not isinstance(label, bytes):
355 355 raise ValueError('must use bytes for labels')
356 356
357 357 # Formatting string must be UTF-8.
358 358 formatting = formatting.decode(r'utf-8', r'replace').encode(r'utf-8')
359 359
360 360 # Arguments must be UTF-8.
361 361 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
362 362
363 363 # Labels must be ASCII.
364 364 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
365 365 for l in labels]
366 366
367 367 if len(formatting) > 65535:
368 368 raise ValueError('formatting string cannot be longer than 64k')
369 369
370 370 if any(len(a) > 65535 for a in args):
371 371 raise ValueError('argument string cannot be longer than 64k')
372 372
373 373 if any(len(l) > 255 for l in labels):
374 374 raise ValueError('label string cannot be longer than 255 bytes')
375 375
376 376 chunks = [
377 377 struct.pack(r'<H', len(formatting)),
378 378 struct.pack(r'<BB', len(labels), len(args)),
379 379 struct.pack(r'<' + r'B' * len(labels), *map(len, labels)),
380 380 struct.pack(r'<' + r'H' * len(args), *map(len, args)),
381 381 ]
382 382 chunks.append(formatting)
383 383 chunks.extend(labels)
384 384 chunks.extend(args)
385 385
386 386 atom = b''.join(chunks)
387 387 atomchunks.append(atom)
388 388 bytesleft -= len(atom)
389 389
390 390 if bytesleft < 0:
391 391 raise ValueError('cannot encode data in a single frame')
392 392
393 yield makeframe(requestid=requestid,
394 typeid=FRAME_TYPE_TEXT_OUTPUT,
395 flags=0,
396 payload=b''.join(atomchunks))
393 yield stream.makeframe(requestid=requestid,
394 typeid=FRAME_TYPE_TEXT_OUTPUT,
395 flags=0,
396 payload=b''.join(atomchunks))
397
398 class stream(object):
399 """Represents a logical unidirectional series of frames."""
400
401 def makeframe(self, requestid, typeid, flags, payload):
402 """Create a frame to be sent out over this stream.
403
404 Only returns the frame instance. Does not actually send it.
405 """
406 return makeframe(requestid, typeid, flags, payload)
397 407
398 408 class serverreactor(object):
399 409 """Holds state of a server handling frame-based protocol requests.
400 410
401 411 This class is the "brain" of the unified frame-based protocol server
402 412 component. While the protocol is stateless from the perspective of
403 413 requests/commands, something needs to track which frames have been
404 414 received, what frames to expect, etc. This class is that thing.
405 415
406 416 Instances are modeled as a state machine of sorts. Instances are also
407 417 reactionary to external events. The point of this class is to encapsulate
408 418 the state of the connection and the exchange of frames, not to perform
409 419 work. Instead, callers tell this class when something occurs, like a
410 420 frame arriving. If that activity is worthy of a follow-up action (say
411 421 *run a command*), the return value of that handler will say so.
412 422
413 423 I/O and CPU intensive operations are purposefully delegated outside of
414 424 this class.
415 425
416 426 Consumers are expected to tell instances when events occur. They do so by
417 427 calling the various ``on*`` methods. These methods return a 2-tuple
418 428 describing any follow-up action(s) to take. The first element is the
419 429 name of an action to perform. The second is a data structure (usually
420 430 a dict) specific to that action that contains more information. e.g.
421 431 if the server wants to send frames back to the client, the data structure
422 432 will contain a reference to those frames.
423 433
424 434 Valid actions that consumers can be instructed to take are:
425 435
426 436 sendframes
427 437 Indicates that frames should be sent to the client. The ``framegen``
428 438 key contains a generator of frames that should be sent. The server
429 439 assumes that all frames are sent to the client.
430 440
431 441 error
432 442 Indicates that an error occurred. Consumer should probably abort.
433 443
434 444 runcommand
435 445 Indicates that the consumer should run a wire protocol command. Details
436 446 of the command to run are given in the data structure.
437 447
438 448 wantframe
439 449 Indicates that nothing of interest happened and the server is waiting on
440 450 more frames from the client before anything interesting can be done.
441 451
442 452 noop
443 453 Indicates no additional action is required.
444 454
445 455 Known Issues
446 456 ------------
447 457
448 458 There are no limits to the number of partially received commands or their
449 459 size. A malicious client could stream command request data and exhaust the
450 460 server's memory.
451 461
452 462 Partially received commands are not acted upon when end of input is
453 463 reached. Should the server error if it receives a partial request?
454 464 Should the client send a message to abort a partially transmitted request
455 465 to facilitate graceful shutdown?
456 466
457 467 Active requests that haven't been responded to aren't tracked. This means
458 468 that if we receive a command and instruct its dispatch, another command
459 469 with its request ID can come in over the wire and there will be a race
460 470 between who responds to what.
461 471 """
462 472
463 473 def __init__(self, deferoutput=False):
464 474 """Construct a new server reactor.
465 475
466 476 ``deferoutput`` can be used to indicate that no output frames should be
467 477 instructed to be sent until input has been exhausted. In this mode,
468 478 events that would normally generate output frames (such as a command
469 479 response being ready) will instead defer instructing the consumer to
470 480 send those frames. This is useful for half-duplex transports where the
471 481 sender cannot receive until all data has been transmitted.
472 482 """
473 483 self._deferoutput = deferoutput
474 484 self._state = 'idle'
475 485 self._bufferedframegens = []
476 486 # request id -> dict of commands that are actively being received.
477 487 self._receivingcommands = {}
478 488 # Request IDs that have been received and are actively being processed.
479 489 # Once all output for a request has been sent, it is removed from this
480 490 # set.
481 491 self._activecommands = set()
482 492
483 493 def onframerecv(self, frame):
484 494 """Process a frame that has been received off the wire.
485 495
486 496 Returns a dict with an ``action`` key that details what action,
487 497 if any, the consumer should take next.
488 498 """
489 499 handlers = {
490 500 'idle': self._onframeidle,
491 501 'command-receiving': self._onframecommandreceiving,
492 502 'errored': self._onframeerrored,
493 503 }
494 504
495 505 meth = handlers.get(self._state)
496 506 if not meth:
497 507 raise error.ProgrammingError('unhandled state: %s' % self._state)
498 508
499 509 return meth(frame)
500 510
501 def onbytesresponseready(self, requestid, data):
511 def onbytesresponseready(self, stream, requestid, data):
502 512 """Signal that a bytes response is ready to be sent to the client.
503 513
504 514 The raw bytes response is passed as an argument.
505 515 """
506 516 def sendframes():
507 for frame in createbytesresponseframesfrombytes(requestid, data):
517 for frame in createbytesresponseframesfrombytes(stream, requestid,
518 data):
508 519 yield frame
509 520
510 521 self._activecommands.remove(requestid)
511 522
512 523 result = sendframes()
513 524
514 525 if self._deferoutput:
515 526 self._bufferedframegens.append(result)
516 527 return 'noop', {}
517 528 else:
518 529 return 'sendframes', {
519 530 'framegen': result,
520 531 }
521 532
522 533 def oninputeof(self):
523 534 """Signals that end of input has been received.
524 535
525 536 No more frames will be received. All pending activity should be
526 537 completed.
527 538 """
528 539 # TODO should we do anything about in-flight commands?
529 540
530 541 if not self._deferoutput or not self._bufferedframegens:
531 542 return 'noop', {}
532 543
533 544 # If we buffered all our responses, emit those.
534 545 def makegen():
535 546 for gen in self._bufferedframegens:
536 547 for frame in gen:
537 548 yield frame
538 549
539 550 return 'sendframes', {
540 551 'framegen': makegen(),
541 552 }
542 553
543 def onapplicationerror(self, requestid, msg):
554 def onapplicationerror(self, stream, requestid, msg):
544 555 return 'sendframes', {
545 'framegen': createerrorframe(requestid, msg, application=True),
556 'framegen': createerrorframe(stream, requestid, msg,
557 application=True),
546 558 }
547 559
548 560 def _makeerrorresult(self, msg):
549 561 return 'error', {
550 562 'message': msg,
551 563 }
552 564
553 565 def _makeruncommandresult(self, requestid):
554 566 entry = self._receivingcommands[requestid]
555 567 del self._receivingcommands[requestid]
556 568
557 569 if self._receivingcommands:
558 570 self._state = 'command-receiving'
559 571 else:
560 572 self._state = 'idle'
561 573
562 574 assert requestid not in self._activecommands
563 575 self._activecommands.add(requestid)
564 576
565 577 return 'runcommand', {
566 578 'requestid': requestid,
567 579 'command': entry['command'],
568 580 'args': entry['args'],
569 581 'data': entry['data'].getvalue() if entry['data'] else None,
570 582 }
571 583
572 584 def _makewantframeresult(self):
573 585 return 'wantframe', {
574 586 'state': self._state,
575 587 }
576 588
577 589 def _onframeidle(self, frame):
578 590 # The only frame type that should be received in this state is a
579 591 # command request.
580 592 if frame.typeid != FRAME_TYPE_COMMAND_NAME:
581 593 self._state = 'errored'
582 594 return self._makeerrorresult(
583 595 _('expected command frame; got %d') % frame.typeid)
584 596
585 597 if frame.requestid in self._receivingcommands:
586 598 self._state = 'errored'
587 599 return self._makeerrorresult(
588 600 _('request with ID %d already received') % frame.requestid)
589 601
590 602 if frame.requestid in self._activecommands:
591 603 self._state = 'errored'
592 604 return self._makeerrorresult((
593 605 _('request with ID %d is already active') % frame.requestid))
594 606
595 607 expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
596 608 expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
597 609
598 610 self._receivingcommands[frame.requestid] = {
599 611 'command': frame.payload,
600 612 'args': {},
601 613 'data': None,
602 614 'expectingargs': expectingargs,
603 615 'expectingdata': expectingdata,
604 616 }
605 617
606 618 if frame.flags & FLAG_COMMAND_NAME_EOS:
607 619 return self._makeruncommandresult(frame.requestid)
608 620
609 621 if expectingargs or expectingdata:
610 622 self._state = 'command-receiving'
611 623 return self._makewantframeresult()
612 624 else:
613 625 self._state = 'errored'
614 626 return self._makeerrorresult(_('missing frame flags on '
615 627 'command frame'))
616 628
617 629 def _onframecommandreceiving(self, frame):
618 630 # It could be a new command request. Process it as such.
619 631 if frame.typeid == FRAME_TYPE_COMMAND_NAME:
620 632 return self._onframeidle(frame)
621 633
622 634 # All other frames should be related to a command that is currently
623 635 # receiving but is not active.
624 636 if frame.requestid in self._activecommands:
625 637 self._state = 'errored'
626 638 return self._makeerrorresult(
627 639 _('received frame for request that is still active: %d') %
628 640 frame.requestid)
629 641
630 642 if frame.requestid not in self._receivingcommands:
631 643 self._state = 'errored'
632 644 return self._makeerrorresult(
633 645 _('received frame for request that is not receiving: %d') %
634 646 frame.requestid)
635 647
636 648 entry = self._receivingcommands[frame.requestid]
637 649
638 650 if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
639 651 if not entry['expectingargs']:
640 652 self._state = 'errored'
641 653 return self._makeerrorresult(_(
642 654 'received command argument frame for request that is not '
643 655 'expecting arguments: %d') % frame.requestid)
644 656
645 657 return self._handlecommandargsframe(frame, entry)
646 658
647 659 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
648 660 if not entry['expectingdata']:
649 661 self._state = 'errored'
650 662 return self._makeerrorresult(_(
651 663 'received command data frame for request that is not '
652 664 'expecting data: %d') % frame.requestid)
653 665
654 666 if entry['data'] is None:
655 667 entry['data'] = util.bytesio()
656 668
657 669 return self._handlecommanddataframe(frame, entry)
658 670
659 671 def _handlecommandargsframe(self, frame, entry):
660 672 # The frame and state of command should have already been validated.
661 673 assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
662 674
663 675 offset = 0
664 676 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
665 677 offset += ARGUMENT_FRAME_HEADER.size
666 678
667 679 # The argument name MUST fit inside the frame.
668 680 argname = bytes(frame.payload[offset:offset + namesize])
669 681 offset += namesize
670 682
671 683 if len(argname) != namesize:
672 684 self._state = 'errored'
673 685 return self._makeerrorresult(_('malformed argument frame: '
674 686 'partial argument name'))
675 687
676 688 argvalue = bytes(frame.payload[offset:])
677 689
678 690 # Argument value spans multiple frames. Record our active state
679 691 # and wait for the next frame.
680 692 if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
681 693 raise error.ProgrammingError('not yet implemented')
682 694
683 695 # Common case: the argument value is completely contained in this
684 696 # frame.
685 697
686 698 if len(argvalue) != valuesize:
687 699 self._state = 'errored'
688 700 return self._makeerrorresult(_('malformed argument frame: '
689 701 'partial argument value'))
690 702
691 703 entry['args'][argname] = argvalue
692 704
693 705 if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
694 706 if entry['expectingdata']:
695 707 # TODO signal request to run a command once we don't
696 708 # buffer data frames.
697 709 return self._makewantframeresult()
698 710 else:
699 711 return self._makeruncommandresult(frame.requestid)
700 712 else:
701 713 return self._makewantframeresult()
702 714
703 715 def _handlecommanddataframe(self, frame, entry):
704 716 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
705 717
706 718 # TODO support streaming data instead of buffering it.
707 719 entry['data'].write(frame.payload)
708 720
709 721 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
710 722 return self._makewantframeresult()
711 723 elif frame.flags & FLAG_COMMAND_DATA_EOS:
712 724 entry['data'].seek(0)
713 725 return self._makeruncommandresult(frame.requestid)
714 726 else:
715 727 self._state = 'errored'
716 728 return self._makeerrorresult(_('command data frame without '
717 729 'flags'))
718 730
719 731 def _onframeerrored(self, frame):
720 732 return self._makeerrorresult(_('server already errored'))
@@ -1,1047 +1,1049
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10 import struct
11 11 import sys
12 12 import threading
13 13
14 14 from .i18n import _
15 15 from . import (
16 16 encoding,
17 17 error,
18 18 hook,
19 19 pycompat,
20 20 util,
21 21 wireproto,
22 22 wireprotoframing,
23 23 wireprototypes,
24 24 )
25 25 from .utils import (
26 26 procutil,
27 27 )
28 28
29 29 stringio = util.stringio
30 30
31 31 urlerr = util.urlerr
32 32 urlreq = util.urlreq
33 33
34 34 HTTP_OK = 200
35 35
36 36 HGTYPE = 'application/mercurial-0.1'
37 37 HGTYPE2 = 'application/mercurial-0.2'
38 38 HGERRTYPE = 'application/hg-error'
39 39 FRAMINGTYPE = b'application/mercurial-exp-framing-0002'
40 40
41 41 HTTPV2 = wireprototypes.HTTPV2
42 42 SSHV1 = wireprototypes.SSHV1
43 43 SSHV2 = wireprototypes.SSHV2
44 44
45 45 def decodevaluefromheaders(req, headerprefix):
46 46 """Decode a long value from multiple HTTP request headers.
47 47
48 48 Returns the value as a bytes, not a str.
49 49 """
50 50 chunks = []
51 51 i = 1
52 52 while True:
53 53 v = req.headers.get(b'%s-%d' % (headerprefix, i))
54 54 if v is None:
55 55 break
56 56 chunks.append(pycompat.bytesurl(v))
57 57 i += 1
58 58
59 59 return ''.join(chunks)
60 60
61 61 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
62 62 def __init__(self, req, ui, checkperm):
63 63 self._req = req
64 64 self._ui = ui
65 65 self._checkperm = checkperm
66 66
67 67 @property
68 68 def name(self):
69 69 return 'http-v1'
70 70
71 71 def getargs(self, args):
72 72 knownargs = self._args()
73 73 data = {}
74 74 keys = args.split()
75 75 for k in keys:
76 76 if k == '*':
77 77 star = {}
78 78 for key in knownargs.keys():
79 79 if key != 'cmd' and key not in keys:
80 80 star[key] = knownargs[key][0]
81 81 data['*'] = star
82 82 else:
83 83 data[k] = knownargs[k][0]
84 84 return [data[k] for k in keys]
85 85
86 86 def _args(self):
87 87 args = self._req.qsparams.asdictoflists()
88 88 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
89 89 if postlen:
90 90 args.update(urlreq.parseqs(
91 91 self._req.bodyfh.read(postlen), keep_blank_values=True))
92 92 return args
93 93
94 94 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
95 95 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
96 96 return args
97 97
98 98 def forwardpayload(self, fp):
99 99 # Existing clients *always* send Content-Length.
100 100 length = int(self._req.headers[b'Content-Length'])
101 101
102 102 # If httppostargs is used, we need to read Content-Length
103 103 # minus the amount that was consumed by args.
104 104 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
105 105 for s in util.filechunkiter(self._req.bodyfh, limit=length):
106 106 fp.write(s)
107 107
108 108 @contextlib.contextmanager
109 109 def mayberedirectstdio(self):
110 110 oldout = self._ui.fout
111 111 olderr = self._ui.ferr
112 112
113 113 out = util.stringio()
114 114
115 115 try:
116 116 self._ui.fout = out
117 117 self._ui.ferr = out
118 118 yield out
119 119 finally:
120 120 self._ui.fout = oldout
121 121 self._ui.ferr = olderr
122 122
123 123 def client(self):
124 124 return 'remote:%s:%s:%s' % (
125 125 self._req.urlscheme,
126 126 urlreq.quote(self._req.remotehost or ''),
127 127 urlreq.quote(self._req.remoteuser or ''))
128 128
129 129 def addcapabilities(self, repo, caps):
130 130 caps.append(b'batch')
131 131
132 132 caps.append('httpheader=%d' %
133 133 repo.ui.configint('server', 'maxhttpheaderlen'))
134 134 if repo.ui.configbool('experimental', 'httppostargs'):
135 135 caps.append('httppostargs')
136 136
137 137 # FUTURE advertise 0.2rx once support is implemented
138 138 # FUTURE advertise minrx and mintx after consulting config option
139 139 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
140 140
141 141 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
142 142 if compengines:
143 143 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
144 144 for e in compengines)
145 145 caps.append('compression=%s' % comptypes)
146 146
147 147 return caps
148 148
149 149 def checkperm(self, perm):
150 150 return self._checkperm(perm)
151 151
152 152 # This method exists mostly so that extensions like remotefilelog can
153 153 # disable a kludgey legacy method only over http. As of early 2018,
154 154 # there are no other known users, so with any luck we can discard this
155 155 # hook if remotefilelog becomes a first-party extension.
156 156 def iscmd(cmd):
157 157 return cmd in wireproto.commands
158 158
159 159 def handlewsgirequest(rctx, req, res, checkperm):
160 160 """Possibly process a wire protocol request.
161 161
162 162 If the current request is a wire protocol request, the request is
163 163 processed by this function.
164 164
165 165 ``req`` is a ``parsedrequest`` instance.
166 166 ``res`` is a ``wsgiresponse`` instance.
167 167
168 168 Returns a bool indicating if the request was serviced. If set, the caller
169 169 should stop processing the request, as a response has already been issued.
170 170 """
171 171 # Avoid cycle involving hg module.
172 172 from .hgweb import common as hgwebcommon
173 173
174 174 repo = rctx.repo
175 175
176 176 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
177 177 # string parameter. If it isn't present, this isn't a wire protocol
178 178 # request.
179 179 if 'cmd' not in req.qsparams:
180 180 return False
181 181
182 182 cmd = req.qsparams['cmd']
183 183
184 184 # The "cmd" request parameter is used by both the wire protocol and hgweb.
185 185 # While not all wire protocol commands are available for all transports,
186 186 # if we see a "cmd" value that resembles a known wire protocol command, we
187 187 # route it to a protocol handler. This is better than routing possible
188 188 # wire protocol requests to hgweb because it prevents hgweb from using
189 189 # known wire protocol commands and it is less confusing for machine
190 190 # clients.
191 191 if not iscmd(cmd):
192 192 return False
193 193
194 194 # The "cmd" query string argument is only valid on the root path of the
195 195 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
196 196 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
197 197 # in this case. We send an HTTP 404 for backwards compatibility reasons.
198 198 if req.dispatchpath:
199 199 res.status = hgwebcommon.statusmessage(404)
200 200 res.headers['Content-Type'] = HGTYPE
201 201 # TODO This is not a good response to issue for this request. This
202 202 # is mostly for BC for now.
203 203 res.setbodybytes('0\n%s\n' % b'Not Found')
204 204 return True
205 205
206 206 proto = httpv1protocolhandler(req, repo.ui,
207 207 lambda perm: checkperm(rctx, req, perm))
208 208
209 209 # The permissions checker should be the only thing that can raise an
210 210 # ErrorResponse. It is kind of a layer violation to catch an hgweb
211 211 # exception here. So consider refactoring into a exception type that
212 212 # is associated with the wire protocol.
213 213 try:
214 214 _callhttp(repo, req, res, proto, cmd)
215 215 except hgwebcommon.ErrorResponse as e:
216 216 for k, v in e.headers:
217 217 res.headers[k] = v
218 218 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
219 219 # TODO This response body assumes the failed command was
220 220 # "unbundle." That assumption is not always valid.
221 221 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
222 222
223 223 return True
224 224
225 225 def handlewsgiapirequest(rctx, req, res, checkperm):
226 226 """Handle requests to /api/*."""
227 227 assert req.dispatchparts[0] == b'api'
228 228
229 229 repo = rctx.repo
230 230
231 231 # This whole URL space is experimental for now. But we want to
232 232 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
233 233 if not repo.ui.configbool('experimental', 'web.apiserver'):
234 234 res.status = b'404 Not Found'
235 235 res.headers[b'Content-Type'] = b'text/plain'
236 236 res.setbodybytes(_('Experimental API server endpoint not enabled'))
237 237 return
238 238
239 239 # The URL space is /api/<protocol>/*. The structure of URLs under varies
240 240 # by <protocol>.
241 241
242 242 # Registered APIs are made available via config options of the name of
243 243 # the protocol.
244 244 availableapis = set()
245 245 for k, v in API_HANDLERS.items():
246 246 section, option = v['config']
247 247 if repo.ui.configbool(section, option):
248 248 availableapis.add(k)
249 249
250 250 # Requests to /api/ list available APIs.
251 251 if req.dispatchparts == [b'api']:
252 252 res.status = b'200 OK'
253 253 res.headers[b'Content-Type'] = b'text/plain'
254 254 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
255 255 'one of the following:\n')]
256 256 if availableapis:
257 257 lines.extend(sorted(availableapis))
258 258 else:
259 259 lines.append(_('(no available APIs)\n'))
260 260 res.setbodybytes(b'\n'.join(lines))
261 261 return
262 262
263 263 proto = req.dispatchparts[1]
264 264
265 265 if proto not in API_HANDLERS:
266 266 res.status = b'404 Not Found'
267 267 res.headers[b'Content-Type'] = b'text/plain'
268 268 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
269 269 proto, b', '.join(sorted(availableapis))))
270 270 return
271 271
272 272 if proto not in availableapis:
273 273 res.status = b'404 Not Found'
274 274 res.headers[b'Content-Type'] = b'text/plain'
275 275 res.setbodybytes(_('API %s not enabled\n') % proto)
276 276 return
277 277
278 278 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
279 279 req.dispatchparts[2:])
280 280
281 281 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
282 282 from .hgweb import common as hgwebcommon
283 283
284 284 # URL space looks like: <permissions>/<command>, where <permission> can
285 285 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
286 286
287 287 # Root URL does nothing meaningful... yet.
288 288 if not urlparts:
289 289 res.status = b'200 OK'
290 290 res.headers[b'Content-Type'] = b'text/plain'
291 291 res.setbodybytes(_('HTTP version 2 API handler'))
292 292 return
293 293
294 294 if len(urlparts) == 1:
295 295 res.status = b'404 Not Found'
296 296 res.headers[b'Content-Type'] = b'text/plain'
297 297 res.setbodybytes(_('do not know how to process %s\n') %
298 298 req.dispatchpath)
299 299 return
300 300
301 301 permission, command = urlparts[0:2]
302 302
303 303 if permission not in (b'ro', b'rw'):
304 304 res.status = b'404 Not Found'
305 305 res.headers[b'Content-Type'] = b'text/plain'
306 306 res.setbodybytes(_('unknown permission: %s') % permission)
307 307 return
308 308
309 309 if req.method != 'POST':
310 310 res.status = b'405 Method Not Allowed'
311 311 res.headers[b'Allow'] = b'POST'
312 312 res.setbodybytes(_('commands require POST requests'))
313 313 return
314 314
315 315 # At some point we'll want to use our own API instead of recycling the
316 316 # behavior of version 1 of the wire protocol...
317 317 # TODO return reasonable responses - not responses that overload the
318 318 # HTTP status line message for error reporting.
319 319 try:
320 320 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
321 321 except hgwebcommon.ErrorResponse as e:
322 322 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
323 323 for k, v in e.headers:
324 324 res.headers[k] = v
325 325 res.setbodybytes('permission denied')
326 326 return
327 327
328 328 # We have a special endpoint to reflect the request back at the client.
329 329 if command == b'debugreflect':
330 330 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
331 331 return
332 332
333 333 # Extra commands that we handle that aren't really wire protocol
334 334 # commands. Think extra hard before making this hackery available to
335 335 # extension.
336 336 extracommands = {'multirequest'}
337 337
338 338 if command not in wireproto.commands and command not in extracommands:
339 339 res.status = b'404 Not Found'
340 340 res.headers[b'Content-Type'] = b'text/plain'
341 341 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
342 342 return
343 343
344 344 repo = rctx.repo
345 345 ui = repo.ui
346 346
347 347 proto = httpv2protocolhandler(req, ui)
348 348
349 349 if (not wireproto.commands.commandavailable(command, proto)
350 350 and command not in extracommands):
351 351 res.status = b'404 Not Found'
352 352 res.headers[b'Content-Type'] = b'text/plain'
353 353 res.setbodybytes(_('invalid wire protocol command: %s') % command)
354 354 return
355 355
356 356 # TODO consider cases where proxies may add additional Accept headers.
357 357 if req.headers.get(b'Accept') != FRAMINGTYPE:
358 358 res.status = b'406 Not Acceptable'
359 359 res.headers[b'Content-Type'] = b'text/plain'
360 360 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
361 361 % FRAMINGTYPE)
362 362 return
363 363
364 364 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
365 365 res.status = b'415 Unsupported Media Type'
366 366 # TODO we should send a response with appropriate media type,
367 367 # since client does Accept it.
368 368 res.headers[b'Content-Type'] = b'text/plain'
369 369 res.setbodybytes(_('client MUST send Content-Type header with '
370 370 'value: %s\n') % FRAMINGTYPE)
371 371 return
372 372
373 373 _processhttpv2request(ui, repo, req, res, permission, command, proto)
374 374
375 375 def _processhttpv2reflectrequest(ui, repo, req, res):
376 376 """Reads unified frame protocol request and dumps out state to client.
377 377
378 378 This special endpoint can be used to help debug the wire protocol.
379 379
380 380 Instead of routing the request through the normal dispatch mechanism,
381 381 we instead read all frames, decode them, and feed them into our state
382 382 tracker. We then dump the log of all that activity back out to the
383 383 client.
384 384 """
385 385 import json
386 386
387 387 # Reflection APIs have a history of being abused, accidentally disclosing
388 388 # sensitive data, etc. So we have a config knob.
389 389 if not ui.configbool('experimental', 'web.api.debugreflect'):
390 390 res.status = b'404 Not Found'
391 391 res.headers[b'Content-Type'] = b'text/plain'
392 392 res.setbodybytes(_('debugreflect service not available'))
393 393 return
394 394
395 395 # We assume we have a unified framing protocol request body.
396 396
397 397 reactor = wireprotoframing.serverreactor()
398 398 states = []
399 399
400 400 while True:
401 401 frame = wireprotoframing.readframe(req.bodyfh)
402 402
403 403 if not frame:
404 404 states.append(b'received: <no frame>')
405 405 break
406 406
407 407 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
408 408 frame.requestid,
409 409 frame.payload))
410 410
411 411 action, meta = reactor.onframerecv(frame)
412 412 states.append(json.dumps((action, meta), sort_keys=True,
413 413 separators=(', ', ': ')))
414 414
415 415 action, meta = reactor.oninputeof()
416 416 meta['action'] = action
417 417 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
418 418
419 419 res.status = b'200 OK'
420 420 res.headers[b'Content-Type'] = b'text/plain'
421 421 res.setbodybytes(b'\n'.join(states))
422 422
423 423 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
424 424 """Post-validation handler for HTTPv2 requests.
425 425
426 426 Called when the HTTP request contains unified frame-based protocol
427 427 frames for evaluation.
428 428 """
429 429 # TODO Some HTTP clients are full duplex and can receive data before
430 430 # the entire request is transmitted. Figure out a way to indicate support
431 431 # for that so we can opt into full duplex mode.
432 432 reactor = wireprotoframing.serverreactor(deferoutput=True)
433 433 seencommand = False
434 434
435 435 while True:
436 436 frame = wireprotoframing.readframe(req.bodyfh)
437 437 if not frame:
438 438 break
439 439
440 440 action, meta = reactor.onframerecv(frame)
441 441
442 442 if action == 'wantframe':
443 443 # Need more data before we can do anything.
444 444 continue
445 445 elif action == 'runcommand':
446 446 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
447 447 reqcommand, reactor, meta,
448 448 issubsequent=seencommand)
449 449
450 450 if sentoutput:
451 451 return
452 452
453 453 seencommand = True
454 454
455 455 elif action == 'error':
456 456 # TODO define proper error mechanism.
457 457 res.status = b'200 OK'
458 458 res.headers[b'Content-Type'] = b'text/plain'
459 459 res.setbodybytes(meta['message'] + b'\n')
460 460 return
461 461 else:
462 462 raise error.ProgrammingError(
463 463 'unhandled action from frame processor: %s' % action)
464 464
465 465 action, meta = reactor.oninputeof()
466 466 if action == 'sendframes':
467 467 # We assume we haven't started sending the response yet. If we're
468 468 # wrong, the response type will raise an exception.
469 469 res.status = b'200 OK'
470 470 res.headers[b'Content-Type'] = FRAMINGTYPE
471 471 res.setbodygen(meta['framegen'])
472 472 elif action == 'noop':
473 473 pass
474 474 else:
475 475 raise error.ProgrammingError('unhandled action from frame processor: %s'
476 476 % action)
477 477
478 478 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
479 479 command, issubsequent):
480 480 """Dispatch a wire protocol command made from HTTPv2 requests.
481 481
482 482 The authenticated permission (``authedperm``) along with the original
483 483 command from the URL (``reqcommand``) are passed in.
484 484 """
485 485 # We already validated that the session has permissions to perform the
486 486 # actions in ``authedperm``. In the unified frame protocol, the canonical
487 487 # command to run is expressed in a frame. However, the URL also requested
488 488 # to run a specific command. We need to be careful that the command we
489 489 # run doesn't have permissions requirements greater than what was granted
490 490 # by ``authedperm``.
491 491 #
492 492 # Our rule for this is we only allow one command per HTTP request and
493 493 # that command must match the command in the URL. However, we make
494 494 # an exception for the ``multirequest`` URL. This URL is allowed to
495 495 # execute multiple commands. We double check permissions of each command
496 496 # as it is invoked to ensure there is no privilege escalation.
497 497 # TODO consider allowing multiple commands to regular command URLs
498 498 # iff each command is the same.
499 499
500 500 proto = httpv2protocolhandler(req, ui, args=command['args'])
501 501
502 502 if reqcommand == b'multirequest':
503 503 if not wireproto.commands.commandavailable(command['command'], proto):
504 504 # TODO proper error mechanism
505 505 res.status = b'200 OK'
506 506 res.headers[b'Content-Type'] = b'text/plain'
507 507 res.setbodybytes(_('wire protocol command not available: %s') %
508 508 command['command'])
509 509 return True
510 510
511 511 # TODO don't use assert here, since it may be elided by -O.
512 512 assert authedperm in (b'ro', b'rw')
513 513 wirecommand = wireproto.commands[command['command']]
514 514 assert wirecommand.permission in ('push', 'pull')
515 515
516 516 if authedperm == b'ro' and wirecommand.permission != 'pull':
517 517 # TODO proper error mechanism
518 518 res.status = b'403 Forbidden'
519 519 res.headers[b'Content-Type'] = b'text/plain'
520 520 res.setbodybytes(_('insufficient permissions to execute '
521 521 'command: %s') % command['command'])
522 522 return True
523 523
524 524 # TODO should we also call checkperm() here? Maybe not if we're going
525 525 # to overhaul that API. The granted scope from the URL check should
526 526 # be good enough.
527 527
528 528 else:
529 529 # Don't allow multiple commands outside of ``multirequest`` URL.
530 530 if issubsequent:
531 531 # TODO proper error mechanism
532 532 res.status = b'200 OK'
533 533 res.headers[b'Content-Type'] = b'text/plain'
534 534 res.setbodybytes(_('multiple commands cannot be issued to this '
535 535 'URL'))
536 536 return True
537 537
538 538 if reqcommand != command['command']:
539 539 # TODO define proper error mechanism
540 540 res.status = b'200 OK'
541 541 res.headers[b'Content-Type'] = b'text/plain'
542 542 res.setbodybytes(_('command in frame must match command in URL'))
543 543 return True
544 544
545 545 rsp = wireproto.dispatch(repo, proto, command['command'])
546 546
547 547 res.status = b'200 OK'
548 548 res.headers[b'Content-Type'] = FRAMINGTYPE
549 stream = wireprotoframing.stream()
549 550
550 551 if isinstance(rsp, wireprototypes.bytesresponse):
551 action, meta = reactor.onbytesresponseready(command['requestid'],
552 action, meta = reactor.onbytesresponseready(stream,
553 command['requestid'],
552 554 rsp.data)
553 555 else:
554 556 action, meta = reactor.onapplicationerror(
555 557 _('unhandled response type from wire proto command'))
556 558
557 559 if action == 'sendframes':
558 560 res.setbodygen(meta['framegen'])
559 561 return True
560 562 elif action == 'noop':
561 563 return False
562 564 else:
563 565 raise error.ProgrammingError('unhandled event from reactor: %s' %
564 566 action)
565 567
566 568 # Maps API name to metadata so custom API can be registered.
567 569 API_HANDLERS = {
568 570 HTTPV2: {
569 571 'config': ('experimental', 'web.api.http-v2'),
570 572 'handler': _handlehttpv2request,
571 573 },
572 574 }
573 575
574 576 class httpv2protocolhandler(wireprototypes.baseprotocolhandler):
575 577 def __init__(self, req, ui, args=None):
576 578 self._req = req
577 579 self._ui = ui
578 580 self._args = args
579 581
580 582 @property
581 583 def name(self):
582 584 return HTTPV2
583 585
584 586 def getargs(self, args):
585 587 data = {}
586 588 for k in args.split():
587 589 if k == '*':
588 590 raise NotImplementedError('do not support * args')
589 591 else:
590 592 data[k] = self._args[k]
591 593
592 594 return [data[k] for k in args.split()]
593 595
594 596 def forwardpayload(self, fp):
595 597 raise NotImplementedError
596 598
597 599 @contextlib.contextmanager
598 600 def mayberedirectstdio(self):
599 601 raise NotImplementedError
600 602
601 603 def client(self):
602 604 raise NotImplementedError
603 605
604 606 def addcapabilities(self, repo, caps):
605 607 return caps
606 608
607 609 def checkperm(self, perm):
608 610 raise NotImplementedError
609 611
610 612 def _httpresponsetype(ui, req, prefer_uncompressed):
611 613 """Determine the appropriate response type and compression settings.
612 614
613 615 Returns a tuple of (mediatype, compengine, engineopts).
614 616 """
615 617 # Determine the response media type and compression engine based
616 618 # on the request parameters.
617 619 protocaps = decodevaluefromheaders(req, 'X-HgProto').split(' ')
618 620
619 621 if '0.2' in protocaps:
620 622 # All clients are expected to support uncompressed data.
621 623 if prefer_uncompressed:
622 624 return HGTYPE2, util._noopengine(), {}
623 625
624 626 # Default as defined by wire protocol spec.
625 627 compformats = ['zlib', 'none']
626 628 for cap in protocaps:
627 629 if cap.startswith('comp='):
628 630 compformats = cap[5:].split(',')
629 631 break
630 632
631 633 # Now find an agreed upon compression format.
632 634 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
633 635 if engine.wireprotosupport().name in compformats:
634 636 opts = {}
635 637 level = ui.configint('server', '%slevel' % engine.name())
636 638 if level is not None:
637 639 opts['level'] = level
638 640
639 641 return HGTYPE2, engine, opts
640 642
641 643 # No mutually supported compression format. Fall back to the
642 644 # legacy protocol.
643 645
644 646 # Don't allow untrusted settings because disabling compression or
645 647 # setting a very high compression level could lead to flooding
646 648 # the server's network or CPU.
647 649 opts = {'level': ui.configint('server', 'zliblevel')}
648 650 return HGTYPE, util.compengines['zlib'], opts
649 651
650 652 def _callhttp(repo, req, res, proto, cmd):
651 653 # Avoid cycle involving hg module.
652 654 from .hgweb import common as hgwebcommon
653 655
654 656 def genversion2(gen, engine, engineopts):
655 657 # application/mercurial-0.2 always sends a payload header
656 658 # identifying the compression engine.
657 659 name = engine.wireprotosupport().name
658 660 assert 0 < len(name) < 256
659 661 yield struct.pack('B', len(name))
660 662 yield name
661 663
662 664 for chunk in gen:
663 665 yield chunk
664 666
665 667 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
666 668 if code == HTTP_OK:
667 669 res.status = '200 Script output follows'
668 670 else:
669 671 res.status = hgwebcommon.statusmessage(code)
670 672
671 673 res.headers['Content-Type'] = contenttype
672 674
673 675 if bodybytes is not None:
674 676 res.setbodybytes(bodybytes)
675 677 if bodygen is not None:
676 678 res.setbodygen(bodygen)
677 679
678 680 if not wireproto.commands.commandavailable(cmd, proto):
679 681 setresponse(HTTP_OK, HGERRTYPE,
680 682 _('requested wire protocol command is not available over '
681 683 'HTTP'))
682 684 return
683 685
684 686 proto.checkperm(wireproto.commands[cmd].permission)
685 687
686 688 rsp = wireproto.dispatch(repo, proto, cmd)
687 689
688 690 if isinstance(rsp, bytes):
689 691 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
690 692 elif isinstance(rsp, wireprototypes.bytesresponse):
691 693 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
692 694 elif isinstance(rsp, wireprototypes.streamreslegacy):
693 695 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
694 696 elif isinstance(rsp, wireprototypes.streamres):
695 697 gen = rsp.gen
696 698
697 699 # This code for compression should not be streamres specific. It
698 700 # is here because we only compress streamres at the moment.
699 701 mediatype, engine, engineopts = _httpresponsetype(
700 702 repo.ui, req, rsp.prefer_uncompressed)
701 703 gen = engine.compressstream(gen, engineopts)
702 704
703 705 if mediatype == HGTYPE2:
704 706 gen = genversion2(gen, engine, engineopts)
705 707
706 708 setresponse(HTTP_OK, mediatype, bodygen=gen)
707 709 elif isinstance(rsp, wireprototypes.pushres):
708 710 rsp = '%d\n%s' % (rsp.res, rsp.output)
709 711 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
710 712 elif isinstance(rsp, wireprototypes.pusherr):
711 713 rsp = '0\n%s\n' % rsp.res
712 714 res.drain = True
713 715 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
714 716 elif isinstance(rsp, wireprototypes.ooberror):
715 717 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
716 718 else:
717 719 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
718 720
719 721 def _sshv1respondbytes(fout, value):
720 722 """Send a bytes response for protocol version 1."""
721 723 fout.write('%d\n' % len(value))
722 724 fout.write(value)
723 725 fout.flush()
724 726
725 727 def _sshv1respondstream(fout, source):
726 728 write = fout.write
727 729 for chunk in source.gen:
728 730 write(chunk)
729 731 fout.flush()
730 732
731 733 def _sshv1respondooberror(fout, ferr, rsp):
732 734 ferr.write(b'%s\n-\n' % rsp)
733 735 ferr.flush()
734 736 fout.write(b'\n')
735 737 fout.flush()
736 738
737 739 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
738 740 """Handler for requests services via version 1 of SSH protocol."""
739 741 def __init__(self, ui, fin, fout):
740 742 self._ui = ui
741 743 self._fin = fin
742 744 self._fout = fout
743 745
744 746 @property
745 747 def name(self):
746 748 return wireprototypes.SSHV1
747 749
748 750 def getargs(self, args):
749 751 data = {}
750 752 keys = args.split()
751 753 for n in xrange(len(keys)):
752 754 argline = self._fin.readline()[:-1]
753 755 arg, l = argline.split()
754 756 if arg not in keys:
755 757 raise error.Abort(_("unexpected parameter %r") % arg)
756 758 if arg == '*':
757 759 star = {}
758 760 for k in xrange(int(l)):
759 761 argline = self._fin.readline()[:-1]
760 762 arg, l = argline.split()
761 763 val = self._fin.read(int(l))
762 764 star[arg] = val
763 765 data['*'] = star
764 766 else:
765 767 val = self._fin.read(int(l))
766 768 data[arg] = val
767 769 return [data[k] for k in keys]
768 770
769 771 def forwardpayload(self, fpout):
770 772 # We initially send an empty response. This tells the client it is
771 773 # OK to start sending data. If a client sees any other response, it
772 774 # interprets it as an error.
773 775 _sshv1respondbytes(self._fout, b'')
774 776
775 777 # The file is in the form:
776 778 #
777 779 # <chunk size>\n<chunk>
778 780 # ...
779 781 # 0\n
780 782 count = int(self._fin.readline())
781 783 while count:
782 784 fpout.write(self._fin.read(count))
783 785 count = int(self._fin.readline())
784 786
785 787 @contextlib.contextmanager
786 788 def mayberedirectstdio(self):
787 789 yield None
788 790
789 791 def client(self):
790 792 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
791 793 return 'remote:ssh:' + client
792 794
793 795 def addcapabilities(self, repo, caps):
794 796 caps.append(b'batch')
795 797 return caps
796 798
797 799 def checkperm(self, perm):
798 800 pass
799 801
800 802 class sshv2protocolhandler(sshv1protocolhandler):
801 803 """Protocol handler for version 2 of the SSH protocol."""
802 804
803 805 @property
804 806 def name(self):
805 807 return wireprototypes.SSHV2
806 808
807 809 def _runsshserver(ui, repo, fin, fout, ev):
808 810 # This function operates like a state machine of sorts. The following
809 811 # states are defined:
810 812 #
811 813 # protov1-serving
812 814 # Server is in protocol version 1 serving mode. Commands arrive on
813 815 # new lines. These commands are processed in this state, one command
814 816 # after the other.
815 817 #
816 818 # protov2-serving
817 819 # Server is in protocol version 2 serving mode.
818 820 #
819 821 # upgrade-initial
820 822 # The server is going to process an upgrade request.
821 823 #
822 824 # upgrade-v2-filter-legacy-handshake
823 825 # The protocol is being upgraded to version 2. The server is expecting
824 826 # the legacy handshake from version 1.
825 827 #
826 828 # upgrade-v2-finish
827 829 # The upgrade to version 2 of the protocol is imminent.
828 830 #
829 831 # shutdown
830 832 # The server is shutting down, possibly in reaction to a client event.
831 833 #
832 834 # And here are their transitions:
833 835 #
834 836 # protov1-serving -> shutdown
835 837 # When server receives an empty request or encounters another
836 838 # error.
837 839 #
838 840 # protov1-serving -> upgrade-initial
839 841 # An upgrade request line was seen.
840 842 #
841 843 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
842 844 # Upgrade to version 2 in progress. Server is expecting to
843 845 # process a legacy handshake.
844 846 #
845 847 # upgrade-v2-filter-legacy-handshake -> shutdown
846 848 # Client did not fulfill upgrade handshake requirements.
847 849 #
848 850 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
849 851 # Client fulfilled version 2 upgrade requirements. Finishing that
850 852 # upgrade.
851 853 #
852 854 # upgrade-v2-finish -> protov2-serving
853 855 # Protocol upgrade to version 2 complete. Server can now speak protocol
854 856 # version 2.
855 857 #
856 858 # protov2-serving -> protov1-serving
857 859 # Ths happens by default since protocol version 2 is the same as
858 860 # version 1 except for the handshake.
859 861
860 862 state = 'protov1-serving'
861 863 proto = sshv1protocolhandler(ui, fin, fout)
862 864 protoswitched = False
863 865
864 866 while not ev.is_set():
865 867 if state == 'protov1-serving':
866 868 # Commands are issued on new lines.
867 869 request = fin.readline()[:-1]
868 870
869 871 # Empty lines signal to terminate the connection.
870 872 if not request:
871 873 state = 'shutdown'
872 874 continue
873 875
874 876 # It looks like a protocol upgrade request. Transition state to
875 877 # handle it.
876 878 if request.startswith(b'upgrade '):
877 879 if protoswitched:
878 880 _sshv1respondooberror(fout, ui.ferr,
879 881 b'cannot upgrade protocols multiple '
880 882 b'times')
881 883 state = 'shutdown'
882 884 continue
883 885
884 886 state = 'upgrade-initial'
885 887 continue
886 888
887 889 available = wireproto.commands.commandavailable(request, proto)
888 890
889 891 # This command isn't available. Send an empty response and go
890 892 # back to waiting for a new command.
891 893 if not available:
892 894 _sshv1respondbytes(fout, b'')
893 895 continue
894 896
895 897 rsp = wireproto.dispatch(repo, proto, request)
896 898
897 899 if isinstance(rsp, bytes):
898 900 _sshv1respondbytes(fout, rsp)
899 901 elif isinstance(rsp, wireprototypes.bytesresponse):
900 902 _sshv1respondbytes(fout, rsp.data)
901 903 elif isinstance(rsp, wireprototypes.streamres):
902 904 _sshv1respondstream(fout, rsp)
903 905 elif isinstance(rsp, wireprototypes.streamreslegacy):
904 906 _sshv1respondstream(fout, rsp)
905 907 elif isinstance(rsp, wireprototypes.pushres):
906 908 _sshv1respondbytes(fout, b'')
907 909 _sshv1respondbytes(fout, b'%d' % rsp.res)
908 910 elif isinstance(rsp, wireprototypes.pusherr):
909 911 _sshv1respondbytes(fout, rsp.res)
910 912 elif isinstance(rsp, wireprototypes.ooberror):
911 913 _sshv1respondooberror(fout, ui.ferr, rsp.message)
912 914 else:
913 915 raise error.ProgrammingError('unhandled response type from '
914 916 'wire protocol command: %s' % rsp)
915 917
916 918 # For now, protocol version 2 serving just goes back to version 1.
917 919 elif state == 'protov2-serving':
918 920 state = 'protov1-serving'
919 921 continue
920 922
921 923 elif state == 'upgrade-initial':
922 924 # We should never transition into this state if we've switched
923 925 # protocols.
924 926 assert not protoswitched
925 927 assert proto.name == wireprototypes.SSHV1
926 928
927 929 # Expected: upgrade <token> <capabilities>
928 930 # If we get something else, the request is malformed. It could be
929 931 # from a future client that has altered the upgrade line content.
930 932 # We treat this as an unknown command.
931 933 try:
932 934 token, caps = request.split(b' ')[1:]
933 935 except ValueError:
934 936 _sshv1respondbytes(fout, b'')
935 937 state = 'protov1-serving'
936 938 continue
937 939
938 940 # Send empty response if we don't support upgrading protocols.
939 941 if not ui.configbool('experimental', 'sshserver.support-v2'):
940 942 _sshv1respondbytes(fout, b'')
941 943 state = 'protov1-serving'
942 944 continue
943 945
944 946 try:
945 947 caps = urlreq.parseqs(caps)
946 948 except ValueError:
947 949 _sshv1respondbytes(fout, b'')
948 950 state = 'protov1-serving'
949 951 continue
950 952
951 953 # We don't see an upgrade request to protocol version 2. Ignore
952 954 # the upgrade request.
953 955 wantedprotos = caps.get(b'proto', [b''])[0]
954 956 if SSHV2 not in wantedprotos:
955 957 _sshv1respondbytes(fout, b'')
956 958 state = 'protov1-serving'
957 959 continue
958 960
959 961 # It looks like we can honor this upgrade request to protocol 2.
960 962 # Filter the rest of the handshake protocol request lines.
961 963 state = 'upgrade-v2-filter-legacy-handshake'
962 964 continue
963 965
964 966 elif state == 'upgrade-v2-filter-legacy-handshake':
965 967 # Client should have sent legacy handshake after an ``upgrade``
966 968 # request. Expected lines:
967 969 #
968 970 # hello
969 971 # between
970 972 # pairs 81
971 973 # 0000...-0000...
972 974
973 975 ok = True
974 976 for line in (b'hello', b'between', b'pairs 81'):
975 977 request = fin.readline()[:-1]
976 978
977 979 if request != line:
978 980 _sshv1respondooberror(fout, ui.ferr,
979 981 b'malformed handshake protocol: '
980 982 b'missing %s' % line)
981 983 ok = False
982 984 state = 'shutdown'
983 985 break
984 986
985 987 if not ok:
986 988 continue
987 989
988 990 request = fin.read(81)
989 991 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
990 992 _sshv1respondooberror(fout, ui.ferr,
991 993 b'malformed handshake protocol: '
992 994 b'missing between argument value')
993 995 state = 'shutdown'
994 996 continue
995 997
996 998 state = 'upgrade-v2-finish'
997 999 continue
998 1000
999 1001 elif state == 'upgrade-v2-finish':
1000 1002 # Send the upgrade response.
1001 1003 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
1002 1004 servercaps = wireproto.capabilities(repo, proto)
1003 1005 rsp = b'capabilities: %s' % servercaps.data
1004 1006 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1005 1007 fout.flush()
1006 1008
1007 1009 proto = sshv2protocolhandler(ui, fin, fout)
1008 1010 protoswitched = True
1009 1011
1010 1012 state = 'protov2-serving'
1011 1013 continue
1012 1014
1013 1015 elif state == 'shutdown':
1014 1016 break
1015 1017
1016 1018 else:
1017 1019 raise error.ProgrammingError('unhandled ssh server state: %s' %
1018 1020 state)
1019 1021
1020 1022 class sshserver(object):
1021 1023 def __init__(self, ui, repo, logfh=None):
1022 1024 self._ui = ui
1023 1025 self._repo = repo
1024 1026 self._fin = ui.fin
1025 1027 self._fout = ui.fout
1026 1028
1027 1029 # Log write I/O to stdout and stderr if configured.
1028 1030 if logfh:
1029 1031 self._fout = util.makeloggingfileobject(
1030 1032 logfh, self._fout, 'o', logdata=True)
1031 1033 ui.ferr = util.makeloggingfileobject(
1032 1034 logfh, ui.ferr, 'e', logdata=True)
1033 1035
1034 1036 hook.redirect(True)
1035 1037 ui.fout = repo.ui.fout = ui.ferr
1036 1038
1037 1039 # Prevent insertion/deletion of CRs
1038 1040 procutil.setbinary(self._fin)
1039 1041 procutil.setbinary(self._fout)
1040 1042
1041 1043 def serve_forever(self):
1042 1044 self.serveuntil(threading.Event())
1043 1045 sys.exit(0)
1044 1046
1045 1047 def serveuntil(self, ev):
1046 1048 """Serve until a threading.Event is set."""
1047 1049 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,639 +1,678
1 1 from __future__ import absolute_import, print_function
2 2
3 3 import unittest
4 4
5 5 from mercurial import (
6 6 util,
7 7 wireprotoframing as framing,
8 8 )
9 9
10 10 ffs = framing.makeframefromhumanstring
11 11
12 12 def makereactor(deferoutput=False):
13 13 return framing.serverreactor(deferoutput=deferoutput)
14 14
15 15 def sendframes(reactor, gen):
16 16 """Send a generator of frame bytearray to a reactor.
17 17
18 18 Emits a generator of results from ``onframerecv()`` calls.
19 19 """
20 20 for frame in gen:
21 21 header = framing.parseheader(frame)
22 22 payload = frame[framing.FRAME_HEADER_SIZE:]
23 23 assert len(payload) == header.length
24 24
25 25 yield reactor.onframerecv(framing.frame(header.requestid,
26 26 header.typeid,
27 27 header.flags,
28 28 payload))
29 29
30 def sendcommandframes(reactor, rid, cmd, args, datafh=None):
30 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
31 31 """Generate frames to run a command and send them to a reactor."""
32 32 return sendframes(reactor,
33 framing.createcommandframes(rid, cmd, args, datafh))
33 framing.createcommandframes(stream, rid, cmd, args,
34 datafh))
34 35
35 36 class FrameTests(unittest.TestCase):
36 37 def testdataexactframesize(self):
37 38 data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
38 39
39 frames = list(framing.createcommandframes(1, b'command', {}, data))
40 stream = framing.stream()
41 frames = list(framing.createcommandframes(stream, 1, b'command',
42 {}, data))
40 43 self.assertEqual(frames, [
41 44 ffs(b'1 command-name have-data command'),
42 45 ffs(b'1 command-data continuation %s' % data.getvalue()),
43 46 ffs(b'1 command-data eos ')
44 47 ])
45 48
46 49 def testdatamultipleframes(self):
47 50 data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
48 frames = list(framing.createcommandframes(1, b'command', {}, data))
51
52 stream = framing.stream()
53 frames = list(framing.createcommandframes(stream, 1, b'command', {},
54 data))
49 55 self.assertEqual(frames, [
50 56 ffs(b'1 command-name have-data command'),
51 57 ffs(b'1 command-data continuation %s' % (
52 58 b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
53 59 ffs(b'1 command-data eos x'),
54 60 ])
55 61
56 62 def testargsanddata(self):
57 63 data = util.bytesio(b'x' * 100)
58 64
59 frames = list(framing.createcommandframes(1, b'command', {
65 stream = framing.stream()
66 frames = list(framing.createcommandframes(stream, 1, b'command', {
60 67 b'key1': b'key1value',
61 68 b'key2': b'key2value',
62 69 b'key3': b'key3value',
63 70 }, data))
64 71
65 72 self.assertEqual(frames, [
66 73 ffs(b'1 command-name have-args|have-data command'),
67 74 ffs(br'1 command-argument 0 \x04\x00\x09\x00key1key1value'),
68 75 ffs(br'1 command-argument 0 \x04\x00\x09\x00key2key2value'),
69 76 ffs(br'1 command-argument eoa \x04\x00\x09\x00key3key3value'),
70 77 ffs(b'1 command-data eos %s' % data.getvalue()),
71 78 ])
72 79
73 80 def testtextoutputexcessiveargs(self):
74 81 """At most 255 formatting arguments are allowed."""
75 82 with self.assertRaisesRegexp(ValueError,
76 83 'cannot use more than 255 formatting'):
77 84 args = [b'x' for i in range(256)]
78 list(framing.createtextoutputframe(1, [(b'bleh', args, [])]))
85 list(framing.createtextoutputframe(None, 1,
86 [(b'bleh', args, [])]))
79 87
80 88 def testtextoutputexcessivelabels(self):
81 89 """At most 255 labels are allowed."""
82 90 with self.assertRaisesRegexp(ValueError,
83 91 'cannot use more than 255 labels'):
84 92 labels = [b'l' for i in range(256)]
85 list(framing.createtextoutputframe(1, [(b'bleh', [], labels)]))
93 list(framing.createtextoutputframe(None, 1,
94 [(b'bleh', [], labels)]))
86 95
87 96 def testtextoutputformattingstringtype(self):
88 97 """Formatting string must be bytes."""
89 98 with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
90 list(framing.createtextoutputframe(1, [
99 list(framing.createtextoutputframe(None, 1, [
91 100 (b'foo'.decode('ascii'), [], [])]))
92 101
93 102 def testtextoutputargumentbytes(self):
94 103 with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
95 list(framing.createtextoutputframe(1, [
104 list(framing.createtextoutputframe(None, 1, [
96 105 (b'foo', [b'foo'.decode('ascii')], [])]))
97 106
98 107 def testtextoutputlabelbytes(self):
99 108 with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
100 list(framing.createtextoutputframe(1, [
109 list(framing.createtextoutputframe(None, 1, [
101 110 (b'foo', [], [b'foo'.decode('ascii')])]))
102 111
103 112 def testtextoutputtoolongformatstring(self):
104 113 with self.assertRaisesRegexp(ValueError,
105 114 'formatting string cannot be longer than'):
106 list(framing.createtextoutputframe(1, [
115 list(framing.createtextoutputframe(None, 1, [
107 116 (b'x' * 65536, [], [])]))
108 117
109 118 def testtextoutputtoolongargumentstring(self):
110 119 with self.assertRaisesRegexp(ValueError,
111 120 'argument string cannot be longer than'):
112 list(framing.createtextoutputframe(1, [
121 list(framing.createtextoutputframe(None, 1, [
113 122 (b'bleh', [b'x' * 65536], [])]))
114 123
115 124 def testtextoutputtoolonglabelstring(self):
116 125 with self.assertRaisesRegexp(ValueError,
117 126 'label string cannot be longer than'):
118 list(framing.createtextoutputframe(1, [
127 list(framing.createtextoutputframe(None, 1, [
119 128 (b'bleh', [], [b'x' * 65536])]))
120 129
121 130 def testtextoutput1simpleatom(self):
122 val = list(framing.createtextoutputframe(1, [
131 stream = framing.stream()
132 val = list(framing.createtextoutputframe(stream, 1, [
123 133 (b'foo', [], [])]))
124 134
125 135 self.assertEqual(val, [
126 136 ffs(br'1 text-output 0 \x03\x00\x00\x00foo'),
127 137 ])
128 138
129 139 def testtextoutput2simpleatoms(self):
130 val = list(framing.createtextoutputframe(1, [
140 stream = framing.stream()
141 val = list(framing.createtextoutputframe(stream, 1, [
131 142 (b'foo', [], []),
132 143 (b'bar', [], []),
133 144 ]))
134 145
135 146 self.assertEqual(val, [
136 147 ffs(br'1 text-output 0 \x03\x00\x00\x00foo\x03\x00\x00\x00bar'),
137 148 ])
138 149
139 150 def testtextoutput1arg(self):
140 val = list(framing.createtextoutputframe(1, [
151 stream = framing.stream()
152 val = list(framing.createtextoutputframe(stream, 1, [
141 153 (b'foo %s', [b'val1'], []),
142 154 ]))
143 155
144 156 self.assertEqual(val, [
145 157 ffs(br'1 text-output 0 \x06\x00\x00\x01\x04\x00foo %sval1'),
146 158 ])
147 159
148 160 def testtextoutput2arg(self):
149 val = list(framing.createtextoutputframe(1, [
161 stream = framing.stream()
162 val = list(framing.createtextoutputframe(stream, 1, [
150 163 (b'foo %s %s', [b'val', b'value'], []),
151 164 ]))
152 165
153 166 self.assertEqual(val, [
154 167 ffs(br'1 text-output 0 \x09\x00\x00\x02\x03\x00\x05\x00'
155 168 br'foo %s %svalvalue'),
156 169 ])
157 170
158 171 def testtextoutput1label(self):
159 val = list(framing.createtextoutputframe(1, [
172 stream = framing.stream()
173 val = list(framing.createtextoutputframe(stream, 1, [
160 174 (b'foo', [], [b'label']),
161 175 ]))
162 176
163 177 self.assertEqual(val, [
164 178 ffs(br'1 text-output 0 \x03\x00\x01\x00\x05foolabel'),
165 179 ])
166 180
167 181 def testargandlabel(self):
168 val = list(framing.createtextoutputframe(1, [
182 stream = framing.stream()
183 val = list(framing.createtextoutputframe(stream, 1, [
169 184 (b'foo %s', [b'arg'], [b'label']),
170 185 ]))
171 186
172 187 self.assertEqual(val, [
173 188 ffs(br'1 text-output 0 \x06\x00\x01\x01\x05\x03\x00foo %slabelarg'),
174 189 ])
175 190
176 191 class ServerReactorTests(unittest.TestCase):
177 192 def _sendsingleframe(self, reactor, f):
178 193 results = list(sendframes(reactor, [f]))
179 194 self.assertEqual(len(results), 1)
180 195
181 196 return results[0]
182 197
183 198 def assertaction(self, res, expected):
184 199 self.assertIsInstance(res, tuple)
185 200 self.assertEqual(len(res), 2)
186 201 self.assertIsInstance(res[1], dict)
187 202 self.assertEqual(res[0], expected)
188 203
189 204 def assertframesequal(self, frames, framestrings):
190 205 expected = [ffs(s) for s in framestrings]
191 206 self.assertEqual(list(frames), expected)
192 207
193 208 def test1framecommand(self):
194 209 """Receiving a command in a single frame yields request to run it."""
195 210 reactor = makereactor()
196 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
211 stream = framing.stream()
212 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
197 213 self.assertEqual(len(results), 1)
198 214 self.assertaction(results[0], 'runcommand')
199 215 self.assertEqual(results[0][1], {
200 216 'requestid': 1,
201 217 'command': b'mycommand',
202 218 'args': {},
203 219 'data': None,
204 220 })
205 221
206 222 result = reactor.oninputeof()
207 223 self.assertaction(result, 'noop')
208 224
209 225 def test1argument(self):
210 226 reactor = makereactor()
211 results = list(sendcommandframes(reactor, 41, b'mycommand',
227 stream = framing.stream()
228 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
212 229 {b'foo': b'bar'}))
213 230 self.assertEqual(len(results), 2)
214 231 self.assertaction(results[0], 'wantframe')
215 232 self.assertaction(results[1], 'runcommand')
216 233 self.assertEqual(results[1][1], {
217 234 'requestid': 41,
218 235 'command': b'mycommand',
219 236 'args': {b'foo': b'bar'},
220 237 'data': None,
221 238 })
222 239
223 240 def testmultiarguments(self):
224 241 reactor = makereactor()
225 results = list(sendcommandframes(reactor, 1, b'mycommand',
242 stream = framing.stream()
243 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
226 244 {b'foo': b'bar', b'biz': b'baz'}))
227 245 self.assertEqual(len(results), 3)
228 246 self.assertaction(results[0], 'wantframe')
229 247 self.assertaction(results[1], 'wantframe')
230 248 self.assertaction(results[2], 'runcommand')
231 249 self.assertEqual(results[2][1], {
232 250 'requestid': 1,
233 251 'command': b'mycommand',
234 252 'args': {b'foo': b'bar', b'biz': b'baz'},
235 253 'data': None,
236 254 })
237 255
238 256 def testsimplecommanddata(self):
239 257 reactor = makereactor()
240 results = list(sendcommandframes(reactor, 1, b'mycommand', {},
258 stream = framing.stream()
259 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
241 260 util.bytesio(b'data!')))
242 261 self.assertEqual(len(results), 2)
243 262 self.assertaction(results[0], 'wantframe')
244 263 self.assertaction(results[1], 'runcommand')
245 264 self.assertEqual(results[1][1], {
246 265 'requestid': 1,
247 266 'command': b'mycommand',
248 267 'args': {},
249 268 'data': b'data!',
250 269 })
251 270
252 271 def testmultipledataframes(self):
253 272 frames = [
254 273 ffs(b'1 command-name have-data mycommand'),
255 274 ffs(b'1 command-data continuation data1'),
256 275 ffs(b'1 command-data continuation data2'),
257 276 ffs(b'1 command-data eos data3'),
258 277 ]
259 278
260 279 reactor = makereactor()
261 280 results = list(sendframes(reactor, frames))
262 281 self.assertEqual(len(results), 4)
263 282 for i in range(3):
264 283 self.assertaction(results[i], 'wantframe')
265 284 self.assertaction(results[3], 'runcommand')
266 285 self.assertEqual(results[3][1], {
267 286 'requestid': 1,
268 287 'command': b'mycommand',
269 288 'args': {},
270 289 'data': b'data1data2data3',
271 290 })
272 291
273 292 def testargumentanddata(self):
274 293 frames = [
275 294 ffs(b'1 command-name have-args|have-data command'),
276 295 ffs(br'1 command-argument 0 \x03\x00\x03\x00keyval'),
277 296 ffs(br'1 command-argument eoa \x03\x00\x03\x00foobar'),
278 297 ffs(b'1 command-data continuation value1'),
279 298 ffs(b'1 command-data eos value2'),
280 299 ]
281 300
282 301 reactor = makereactor()
283 302 results = list(sendframes(reactor, frames))
284 303
285 304 self.assertaction(results[-1], 'runcommand')
286 305 self.assertEqual(results[-1][1], {
287 306 'requestid': 1,
288 307 'command': b'command',
289 308 'args': {
290 309 b'key': b'val',
291 310 b'foo': b'bar',
292 311 },
293 312 'data': b'value1value2',
294 313 })
295 314
296 315 def testunexpectedcommandargument(self):
297 316 """Command argument frame when not running a command is an error."""
298 317 result = self._sendsingleframe(makereactor(),
299 318 ffs(b'1 command-argument 0 ignored'))
300 319 self.assertaction(result, 'error')
301 320 self.assertEqual(result[1], {
302 321 'message': b'expected command frame; got 2',
303 322 })
304 323
305 324 def testunexpectedcommandargumentreceiving(self):
306 325 """Same as above but the command is receiving."""
307 326 results = list(sendframes(makereactor(), [
308 327 ffs(b'1 command-name have-data command'),
309 328 ffs(b'1 command-argument eoa ignored'),
310 329 ]))
311 330
312 331 self.assertaction(results[1], 'error')
313 332 self.assertEqual(results[1][1], {
314 333 'message': b'received command argument frame for request that is '
315 334 b'not expecting arguments: 1',
316 335 })
317 336
318 337 def testunexpectedcommanddata(self):
319 338 """Command argument frame when not running a command is an error."""
320 339 result = self._sendsingleframe(makereactor(),
321 340 ffs(b'1 command-data 0 ignored'))
322 341 self.assertaction(result, 'error')
323 342 self.assertEqual(result[1], {
324 343 'message': b'expected command frame; got 3',
325 344 })
326 345
327 346 def testunexpectedcommanddatareceiving(self):
328 347 """Same as above except the command is receiving."""
329 348 results = list(sendframes(makereactor(), [
330 349 ffs(b'1 command-name have-args command'),
331 350 ffs(b'1 command-data eos ignored'),
332 351 ]))
333 352
334 353 self.assertaction(results[1], 'error')
335 354 self.assertEqual(results[1][1], {
336 355 'message': b'received command data frame for request that is not '
337 356 b'expecting data: 1',
338 357 })
339 358
340 359 def testmissingcommandframeflags(self):
341 360 """Command name frame must have flags set."""
342 361 result = self._sendsingleframe(makereactor(),
343 362 ffs(b'1 command-name 0 command'))
344 363 self.assertaction(result, 'error')
345 364 self.assertEqual(result[1], {
346 365 'message': b'missing frame flags on command frame',
347 366 })
348 367
349 368 def testconflictingrequestidallowed(self):
350 369 """Multiple fully serviced commands with same request ID is allowed."""
351 370 reactor = makereactor()
352 371 results = []
372 outstream = framing.stream()
353 373 results.append(self._sendsingleframe(
354 374 reactor, ffs(b'1 command-name eos command')))
355 result = reactor.onbytesresponseready(1, b'response1')
375 result = reactor.onbytesresponseready(outstream, 1, b'response1')
356 376 self.assertaction(result, 'sendframes')
357 377 list(result[1]['framegen'])
358 378 results.append(self._sendsingleframe(
359 379 reactor, ffs(b'1 command-name eos command')))
360 result = reactor.onbytesresponseready(1, b'response2')
380 result = reactor.onbytesresponseready(outstream, 1, b'response2')
361 381 self.assertaction(result, 'sendframes')
362 382 list(result[1]['framegen'])
363 383 results.append(self._sendsingleframe(
364 384 reactor, ffs(b'1 command-name eos command')))
365 result = reactor.onbytesresponseready(1, b'response3')
385 result = reactor.onbytesresponseready(outstream, 1, b'response3')
366 386 self.assertaction(result, 'sendframes')
367 387 list(result[1]['framegen'])
368 388
369 389 for i in range(3):
370 390 self.assertaction(results[i], 'runcommand')
371 391 self.assertEqual(results[i][1], {
372 392 'requestid': 1,
373 393 'command': b'command',
374 394 'args': {},
375 395 'data': None,
376 396 })
377 397
378 398 def testconflictingrequestid(self):
379 399 """Request ID for new command matching in-flight command is illegal."""
380 400 results = list(sendframes(makereactor(), [
381 401 ffs(b'1 command-name have-args command'),
382 402 ffs(b'1 command-name eos command'),
383 403 ]))
384 404
385 405 self.assertaction(results[0], 'wantframe')
386 406 self.assertaction(results[1], 'error')
387 407 self.assertEqual(results[1][1], {
388 408 'message': b'request with ID 1 already received',
389 409 })
390 410
391 411 def testinterleavedcommands(self):
392 412 results = list(sendframes(makereactor(), [
393 413 ffs(b'1 command-name have-args command1'),
394 414 ffs(b'3 command-name have-args command3'),
395 415 ffs(br'1 command-argument 0 \x03\x00\x03\x00foobar'),
396 416 ffs(br'3 command-argument 0 \x03\x00\x03\x00bizbaz'),
397 417 ffs(br'3 command-argument eoa \x03\x00\x03\x00keyval'),
398 418 ffs(br'1 command-argument eoa \x04\x00\x03\x00key1val'),
399 419 ]))
400 420
401 421 self.assertEqual([t[0] for t in results], [
402 422 'wantframe',
403 423 'wantframe',
404 424 'wantframe',
405 425 'wantframe',
406 426 'runcommand',
407 427 'runcommand',
408 428 ])
409 429
410 430 self.assertEqual(results[4][1], {
411 431 'requestid': 3,
412 432 'command': 'command3',
413 433 'args': {b'biz': b'baz', b'key': b'val'},
414 434 'data': None,
415 435 })
416 436 self.assertEqual(results[5][1], {
417 437 'requestid': 1,
418 438 'command': 'command1',
419 439 'args': {b'foo': b'bar', b'key1': b'val'},
420 440 'data': None,
421 441 })
422 442
423 443 def testmissingargumentframe(self):
424 444 # This test attempts to test behavior when reactor has an incomplete
425 445 # command request waiting on argument data. But it doesn't handle that
426 446 # scenario yet. So this test does nothing of value.
427 447 frames = [
428 448 ffs(b'1 command-name have-args command'),
429 449 ]
430 450
431 451 results = list(sendframes(makereactor(), frames))
432 452 self.assertaction(results[0], 'wantframe')
433 453
434 454 def testincompleteargumentname(self):
435 455 """Argument frame with incomplete name."""
436 456 frames = [
437 457 ffs(b'1 command-name have-args command1'),
438 458 ffs(br'1 command-argument eoa \x04\x00\xde\xadfoo'),
439 459 ]
440 460
441 461 results = list(sendframes(makereactor(), frames))
442 462 self.assertEqual(len(results), 2)
443 463 self.assertaction(results[0], 'wantframe')
444 464 self.assertaction(results[1], 'error')
445 465 self.assertEqual(results[1][1], {
446 466 'message': b'malformed argument frame: partial argument name',
447 467 })
448 468
449 469 def testincompleteargumentvalue(self):
450 470 """Argument frame with incomplete value."""
451 471 frames = [
452 472 ffs(b'1 command-name have-args command'),
453 473 ffs(br'1 command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
454 474 ]
455 475
456 476 results = list(sendframes(makereactor(), frames))
457 477 self.assertEqual(len(results), 2)
458 478 self.assertaction(results[0], 'wantframe')
459 479 self.assertaction(results[1], 'error')
460 480 self.assertEqual(results[1][1], {
461 481 'message': b'malformed argument frame: partial argument value',
462 482 })
463 483
464 484 def testmissingcommanddataframe(self):
465 485 # The reactor doesn't currently handle partially received commands.
466 486 # So this test is failing to do anything with request 1.
467 487 frames = [
468 488 ffs(b'1 command-name have-data command1'),
469 489 ffs(b'3 command-name eos command2'),
470 490 ]
471 491 results = list(sendframes(makereactor(), frames))
472 492 self.assertEqual(len(results), 2)
473 493 self.assertaction(results[0], 'wantframe')
474 494 self.assertaction(results[1], 'runcommand')
475 495
476 496 def testmissingcommanddataframeflags(self):
477 497 frames = [
478 498 ffs(b'1 command-name have-data command1'),
479 499 ffs(b'1 command-data 0 data'),
480 500 ]
481 501 results = list(sendframes(makereactor(), frames))
482 502 self.assertEqual(len(results), 2)
483 503 self.assertaction(results[0], 'wantframe')
484 504 self.assertaction(results[1], 'error')
485 505 self.assertEqual(results[1][1], {
486 506 'message': b'command data frame without flags',
487 507 })
488 508
489 509 def testframefornonreceivingrequest(self):
490 510 """Receiving a frame for a command that is not receiving is illegal."""
491 511 results = list(sendframes(makereactor(), [
492 512 ffs(b'1 command-name eos command1'),
493 513 ffs(b'3 command-name have-data command3'),
494 514 ffs(b'5 command-argument eoa ignored'),
495 515 ]))
496 516 self.assertaction(results[2], 'error')
497 517 self.assertEqual(results[2][1], {
498 518 'message': b'received frame for request that is not receiving: 5',
499 519 })
500 520
501 521 def testsimpleresponse(self):
502 522 """Bytes response to command sends result frames."""
503 523 reactor = makereactor()
504 list(sendcommandframes(reactor, 1, b'mycommand', {}))
524 instream = framing.stream()
525 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
505 526
506 result = reactor.onbytesresponseready(1, b'response')
527 outstream = framing.stream()
528 result = reactor.onbytesresponseready(outstream, 1, b'response')
507 529 self.assertaction(result, 'sendframes')
508 530 self.assertframesequal(result[1]['framegen'], [
509 531 b'1 bytes-response eos response',
510 532 ])
511 533
512 534 def testmultiframeresponse(self):
513 535 """Bytes response spanning multiple frames is handled."""
514 536 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
515 537 second = b'y' * 100
516 538
517 539 reactor = makereactor()
518 list(sendcommandframes(reactor, 1, b'mycommand', {}))
540 instream = framing.stream()
541 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
519 542
520 result = reactor.onbytesresponseready(1, first + second)
543 outstream = framing.stream()
544 result = reactor.onbytesresponseready(outstream, 1, first + second)
521 545 self.assertaction(result, 'sendframes')
522 546 self.assertframesequal(result[1]['framegen'], [
523 547 b'1 bytes-response continuation %s' % first,
524 548 b'1 bytes-response eos %s' % second,
525 549 ])
526 550
527 551 def testapplicationerror(self):
528 552 reactor = makereactor()
529 list(sendcommandframes(reactor, 1, b'mycommand', {}))
553 instream = framing.stream()
554 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
530 555
531 result = reactor.onapplicationerror(1, b'some message')
556 outstream = framing.stream()
557 result = reactor.onapplicationerror(outstream, 1, b'some message')
532 558 self.assertaction(result, 'sendframes')
533 559 self.assertframesequal(result[1]['framegen'], [
534 560 b'1 error-response application some message',
535 561 ])
536 562
537 563 def test1commanddeferresponse(self):
538 564 """Responses when in deferred output mode are delayed until EOF."""
539 565 reactor = makereactor(deferoutput=True)
540 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
566 instream = framing.stream()
567 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
568 {}))
541 569 self.assertEqual(len(results), 1)
542 570 self.assertaction(results[0], 'runcommand')
543 571
544 result = reactor.onbytesresponseready(1, b'response')
572 outstream = framing.stream()
573 result = reactor.onbytesresponseready(outstream, 1, b'response')
545 574 self.assertaction(result, 'noop')
546 575 result = reactor.oninputeof()
547 576 self.assertaction(result, 'sendframes')
548 577 self.assertframesequal(result[1]['framegen'], [
549 578 b'1 bytes-response eos response',
550 579 ])
551 580
552 581 def testmultiplecommanddeferresponse(self):
553 582 reactor = makereactor(deferoutput=True)
554 list(sendcommandframes(reactor, 1, b'command1', {}))
555 list(sendcommandframes(reactor, 3, b'command2', {}))
583 instream = framing.stream()
584 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
585 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
556 586
557 result = reactor.onbytesresponseready(1, b'response1')
587 outstream = framing.stream()
588 result = reactor.onbytesresponseready(outstream, 1, b'response1')
558 589 self.assertaction(result, 'noop')
559 result = reactor.onbytesresponseready(3, b'response2')
590 result = reactor.onbytesresponseready(outstream, 3, b'response2')
560 591 self.assertaction(result, 'noop')
561 592 result = reactor.oninputeof()
562 593 self.assertaction(result, 'sendframes')
563 594 self.assertframesequal(result[1]['framegen'], [
564 595 b'1 bytes-response eos response1',
565 596 b'3 bytes-response eos response2'
566 597 ])
567 598
568 599 def testrequestidtracking(self):
569 600 reactor = makereactor(deferoutput=True)
570 list(sendcommandframes(reactor, 1, b'command1', {}))
571 list(sendcommandframes(reactor, 3, b'command2', {}))
572 list(sendcommandframes(reactor, 5, b'command3', {}))
601 instream = framing.stream()
602 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
603 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
604 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
573 605
574 606 # Register results for commands out of order.
575 reactor.onbytesresponseready(3, b'response3')
576 reactor.onbytesresponseready(1, b'response1')
577 reactor.onbytesresponseready(5, b'response5')
607 outstream = framing.stream()
608 reactor.onbytesresponseready(outstream, 3, b'response3')
609 reactor.onbytesresponseready(outstream, 1, b'response1')
610 reactor.onbytesresponseready(outstream, 5, b'response5')
578 611
579 612 result = reactor.oninputeof()
580 613 self.assertaction(result, 'sendframes')
581 614 self.assertframesequal(result[1]['framegen'], [
582 615 b'3 bytes-response eos response3',
583 616 b'1 bytes-response eos response1',
584 617 b'5 bytes-response eos response5',
585 618 ])
586 619
587 620 def testduplicaterequestonactivecommand(self):
588 621 """Receiving a request ID that matches a request that isn't finished."""
589 622 reactor = makereactor()
590 list(sendcommandframes(reactor, 1, b'command1', {}))
591 results = list(sendcommandframes(reactor, 1, b'command1', {}))
623 stream = framing.stream()
624 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
625 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
592 626
593 627 self.assertaction(results[0], 'error')
594 628 self.assertEqual(results[0][1], {
595 629 'message': b'request with ID 1 is already active',
596 630 })
597 631
598 632 def testduplicaterequestonactivecommandnosend(self):
599 633 """Same as above but we've registered a response but haven't sent it."""
600 634 reactor = makereactor()
601 list(sendcommandframes(reactor, 1, b'command1', {}))
602 reactor.onbytesresponseready(1, b'response')
635 instream = framing.stream()
636 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
637 outstream = framing.stream()
638 reactor.onbytesresponseready(outstream, 1, b'response')
603 639
604 640 # We've registered the response but haven't sent it. From the
605 641 # perspective of the reactor, the command is still active.
606 642
607 results = list(sendcommandframes(reactor, 1, b'command1', {}))
643 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
608 644 self.assertaction(results[0], 'error')
609 645 self.assertEqual(results[0][1], {
610 646 'message': b'request with ID 1 is already active',
611 647 })
612 648
613 649 def testduplicaterequestargumentframe(self):
614 650 """Variant on above except we sent an argument frame instead of name."""
615 651 reactor = makereactor()
616 list(sendcommandframes(reactor, 1, b'command', {}))
652 stream = framing.stream()
653 list(sendcommandframes(reactor, stream, 1, b'command', {}))
617 654 results = list(sendframes(reactor, [
618 655 ffs(b'3 command-name have-args command'),
619 656 ffs(b'1 command-argument 0 ignored'),
620 657 ]))
621 658 self.assertaction(results[0], 'wantframe')
622 659 self.assertaction(results[1], 'error')
623 660 self.assertEqual(results[1][1], {
624 661 'message': 'received frame for request that is still active: 1',
625 662 })
626 663
627 664 def testduplicaterequestaftersend(self):
628 665 """We can use a duplicate request ID after we've sent the response."""
629 666 reactor = makereactor()
630 list(sendcommandframes(reactor, 1, b'command1', {}))
631 res = reactor.onbytesresponseready(1, b'response')
667 instream = framing.stream()
668 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
669 outstream = framing.stream()
670 res = reactor.onbytesresponseready(outstream, 1, b'response')
632 671 list(res[1]['framegen'])
633 672
634 results = list(sendcommandframes(reactor, 1, b'command1', {}))
673 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
635 674 self.assertaction(results[0], 'runcommand')
636 675
637 676 if __name__ == '__main__':
638 677 import silenttestrunner
639 678 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now