##// END OF EJS Templates
wireproto: explicitly track which requests are active...
Gregory Szorc -
r37081:39304dd6 default
parent child Browse files
Show More
@@ -472,6 +472,10 b' class serverreactor(object):'
472 self._bufferedframegens = []
472 self._bufferedframegens = []
473 # request id -> dict of commands that are actively being received.
473 # request id -> dict of commands that are actively being received.
474 self._receivingcommands = {}
474 self._receivingcommands = {}
475 # Request IDs that have been received and are actively being processed.
476 # Once all output for a request has been sent, it is removed from this
477 # set.
478 self._activecommands = set()
475
479
476 def onframerecv(self, frame):
480 def onframerecv(self, frame):
477 """Process a frame that has been received off the wire.
481 """Process a frame that has been received off the wire.
@@ -496,14 +500,20 b' class serverreactor(object):'
496
500
497 The raw bytes response is passed as an argument.
501 The raw bytes response is passed as an argument.
498 """
502 """
499 framegen = createbytesresponseframesfrombytes(requestid, data)
503 def sendframes():
504 for frame in createbytesresponseframesfrombytes(requestid, data):
505 yield frame
506
507 self._activecommands.remove(requestid)
508
509 result = sendframes()
500
510
501 if self._deferoutput:
511 if self._deferoutput:
502 self._bufferedframegens.append(framegen)
512 self._bufferedframegens.append(result)
503 return 'noop', {}
513 return 'noop', {}
504 else:
514 else:
505 return 'sendframes', {
515 return 'sendframes', {
506 'framegen': framegen,
516 'framegen': result,
507 }
517 }
508
518
509 def oninputeof(self):
519 def oninputeof(self):
@@ -546,6 +556,9 b' class serverreactor(object):'
546 else:
556 else:
547 self._state = 'idle'
557 self._state = 'idle'
548
558
559 assert requestid not in self._activecommands
560 self._activecommands.add(requestid)
561
549 return 'runcommand', {
562 return 'runcommand', {
550 'requestid': requestid,
563 'requestid': requestid,
551 'command': entry['command'],
564 'command': entry['command'],
@@ -571,6 +584,11 b' class serverreactor(object):'
571 return self._makeerrorresult(
584 return self._makeerrorresult(
572 _('request with ID %d already received') % frame.requestid)
585 _('request with ID %d already received') % frame.requestid)
573
586
587 if frame.requestid in self._activecommands:
588 self._state = 'errored'
589 return self._makeerrorresult((
590 _('request with ID %d is already active') % frame.requestid))
591
574 expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
592 expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
575 expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
593 expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
576
594
@@ -599,7 +617,13 b' class serverreactor(object):'
599 return self._onframeidle(frame)
617 return self._onframeidle(frame)
600
618
601 # All other frames should be related to a command that is currently
619 # All other frames should be related to a command that is currently
602 # receiving.
620 # receiving but is not active.
621 if frame.requestid in self._activecommands:
622 self._state = 'errored'
623 return self._makeerrorresult(
624 _('received frame for request that is still active: %d') %
625 frame.requestid)
626
603 if frame.requestid not in self._receivingcommands:
627 if frame.requestid not in self._receivingcommands:
604 self._state = 'errored'
628 self._state = 'errored'
605 return self._makeerrorresult(
629 return self._makeerrorresult(
@@ -478,11 +478,11 b' class ServerReactorTests(unittest.TestCa'
478 results = list(sendframes(makereactor(), [
478 results = list(sendframes(makereactor(), [
479 ffs(b'1 command-name eos command1'),
479 ffs(b'1 command-name eos command1'),
480 ffs(b'3 command-name have-data command3'),
480 ffs(b'3 command-name have-data command3'),
481 ffs(b'1 command-argument eoa ignored'),
481 ffs(b'5 command-argument eoa ignored'),
482 ]))
482 ]))
483 self.assertaction(results[2], 'error')
483 self.assertaction(results[2], 'error')
484 self.assertEqual(results[2][1], {
484 self.assertEqual(results[2][1], {
485 'message': b'received frame for request that is not receiving: 1',
485 'message': b'received frame for request that is not receiving: 5',
486 })
486 })
487
487
488 def testsimpleresponse(self):
488 def testsimpleresponse(self):
@@ -571,6 +571,56 b' class ServerReactorTests(unittest.TestCa'
571 b'5 bytes-response eos response5',
571 b'5 bytes-response eos response5',
572 ])
572 ])
573
573
574 def testduplicaterequestonactivecommand(self):
575 """Receiving a request ID that matches a request that isn't finished."""
576 reactor = makereactor()
577 list(sendcommandframes(reactor, 1, b'command1', {}))
578 results = list(sendcommandframes(reactor, 1, b'command1', {}))
579
580 self.assertaction(results[0], 'error')
581 self.assertEqual(results[0][1], {
582 'message': b'request with ID 1 is already active',
583 })
584
585 def testduplicaterequestonactivecommandnosend(self):
586 """Same as above but we've registered a response but haven't sent it."""
587 reactor = makereactor()
588 list(sendcommandframes(reactor, 1, b'command1', {}))
589 reactor.onbytesresponseready(1, b'response')
590
591 # We've registered the response but haven't sent it. From the
592 # perspective of the reactor, the command is still active.
593
594 results = list(sendcommandframes(reactor, 1, b'command1', {}))
595 self.assertaction(results[0], 'error')
596 self.assertEqual(results[0][1], {
597 'message': b'request with ID 1 is already active',
598 })
599
600 def testduplicaterequestargumentframe(self):
601 """Variant on above except we sent an argument frame instead of name."""
602 reactor = makereactor()
603 list(sendcommandframes(reactor, 1, b'command', {}))
604 results = list(sendframes(reactor, [
605 ffs(b'3 command-name have-args command'),
606 ffs(b'1 command-argument 0 ignored'),
607 ]))
608 self.assertaction(results[0], 'wantframe')
609 self.assertaction(results[1], 'error')
610 self.assertEqual(results[1][1], {
611 'message': 'received frame for request that is still active: 1',
612 })
613
614 def testduplicaterequestaftersend(self):
615 """We can use a duplicate request ID after we've sent the response."""
616 reactor = makereactor()
617 list(sendcommandframes(reactor, 1, b'command1', {}))
618 res = reactor.onbytesresponseready(1, b'response')
619 list(res[1]['framegen'])
620
621 results = list(sendcommandframes(reactor, 1, b'command1', {}))
622 self.assertaction(results[0], 'runcommand')
623
574 if __name__ == '__main__':
624 if __name__ == '__main__':
575 import silenttestrunner
625 import silenttestrunner
576 silenttestrunner.main(__name__)
626 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now