Show More
@@ -14,6 +14,9 from __future__ import absolute_import | |||
|
14 | 14 | import struct |
|
15 | 15 | |
|
16 | 16 | from .i18n import _ |
|
17 | from .thirdparty import ( | |
|
18 | attr, | |
|
19 | ) | |
|
17 | 20 | from . import ( |
|
18 | 21 | error, |
|
19 | 22 | util, |
@@ -92,6 +95,24 FRAME_TYPE_FLAGS = { | |||
|
92 | 95 | |
|
93 | 96 | ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH') |
|
94 | 97 | |
|
98 | @attr.s(slots=True) | |
|
99 | class frameheader(object): | |
|
100 | """Represents the data in a frame header.""" | |
|
101 | ||
|
102 | length = attr.ib() | |
|
103 | requestid = attr.ib() | |
|
104 | typeid = attr.ib() | |
|
105 | flags = attr.ib() | |
|
106 | ||
|
107 | @attr.s(slots=True) | |
|
108 | class frame(object): | |
|
109 | """Represents a parsed frame.""" | |
|
110 | ||
|
111 | requestid = attr.ib() | |
|
112 | typeid = attr.ib() | |
|
113 | flags = attr.ib() | |
|
114 | payload = attr.ib() | |
|
115 | ||
|
95 | 116 | def makeframe(requestid, frametype, frameflags, payload): |
|
96 | 117 | """Assemble a frame into a byte array.""" |
|
97 | 118 | # TODO assert size of payload. |
@@ -164,7 +185,7 def parseheader(data): | |||
|
164 | 185 | frametype = (typeflags & 0xf0) >> 4 |
|
165 | 186 | frameflags = typeflags & 0x0f |
|
166 | 187 | |
|
167 |
return requestid, frametype, frameflags |
|
|
188 | return frameheader(framelength, requestid, frametype, frameflags) | |
|
168 | 189 | |
|
169 | 190 | def readframe(fh): |
|
170 | 191 | """Read a unified framing protocol frame from a file object. |
@@ -184,14 +205,14 def readframe(fh): | |||
|
184 | 205 | raise error.Abort(_('received incomplete frame: got %d bytes: %s') % |
|
185 | 206 | (readcount, header)) |
|
186 | 207 | |
|
187 | requestid, frametype, frameflags, framelength = parseheader(header) | |
|
208 | h = parseheader(header) | |
|
188 | 209 | |
|
189 |
payload = fh.read( |
|
|
190 |
if len(payload) != |
|
|
210 | payload = fh.read(h.length) | |
|
211 | if len(payload) != h.length: | |
|
191 | 212 | raise error.Abort(_('frame length error: expected %d; got %d') % |
|
192 |
( |
|
|
213 | (h.length, len(payload))) | |
|
193 | 214 | |
|
194 |
return requestid, |
|
|
215 | return frame(h.requestid, h.typeid, h.flags, payload) | |
|
195 | 216 | |
|
196 | 217 | def createcommandframes(requestid, cmd, args, datafh=None): |
|
197 | 218 | """Create frames necessary to transmit a request to run a command. |
@@ -433,7 +454,7 class serverreactor(object): | |||
|
433 | 454 | # request id -> dict of commands that are actively being received. |
|
434 | 455 | self._receivingcommands = {} |
|
435 | 456 | |
|
436 |
def onframerecv(self, |
|
|
457 | def onframerecv(self, frame): | |
|
437 | 458 | """Process a frame that has been received off the wire. |
|
438 | 459 | |
|
439 | 460 | Returns a dict with an ``action`` key that details what action, |
@@ -449,7 +470,7 class serverreactor(object): | |||
|
449 | 470 | if not meth: |
|
450 | 471 | raise error.ProgrammingError('unhandled state: %s' % self._state) |
|
451 | 472 | |
|
452 | return meth(requestid, frametype, frameflags, payload) | |
|
473 | return meth(frame) | |
|
453 | 474 | |
|
454 | 475 | def onbytesresponseready(self, requestid, data): |
|
455 | 476 | """Signal that a bytes response is ready to be sent to the client. |
@@ -518,32 +539,32 class serverreactor(object): | |||
|
518 | 539 | 'state': self._state, |
|
519 | 540 | } |
|
520 | 541 | |
|
521 |
def _onframeidle(self, |
|
|
542 | def _onframeidle(self, frame): | |
|
522 | 543 | # The only frame type that should be received in this state is a |
|
523 | 544 | # command request. |
|
524 | if frametype != FRAME_TYPE_COMMAND_NAME: | |
|
545 | if frame.typeid != FRAME_TYPE_COMMAND_NAME: | |
|
525 | 546 | self._state = 'errored' |
|
526 | 547 | return self._makeerrorresult( |
|
527 | _('expected command frame; got %d') % frametype) | |
|
548 | _('expected command frame; got %d') % frame.typeid) | |
|
528 | 549 | |
|
529 | if requestid in self._receivingcommands: | |
|
550 | if frame.requestid in self._receivingcommands: | |
|
530 | 551 | self._state = 'errored' |
|
531 | 552 | return self._makeerrorresult( |
|
532 | _('request with ID %d already received') % requestid) | |
|
553 | _('request with ID %d already received') % frame.requestid) | |
|
533 | 554 | |
|
534 | expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS) | |
|
535 | expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA) | |
|
555 | expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS) | |
|
556 | expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA) | |
|
536 | 557 | |
|
537 | self._receivingcommands[requestid] = { | |
|
538 | 'command': payload, | |
|
558 | self._receivingcommands[frame.requestid] = { | |
|
559 | 'command': frame.payload, | |
|
539 | 560 | 'args': {}, |
|
540 | 561 | 'data': None, |
|
541 | 562 | 'expectingargs': expectingargs, |
|
542 | 563 | 'expectingdata': expectingdata, |
|
543 | 564 | } |
|
544 | 565 | |
|
545 | if frameflags & FLAG_COMMAND_NAME_EOS: | |
|
546 | return self._makeruncommandresult(requestid) | |
|
566 | if frame.flags & FLAG_COMMAND_NAME_EOS: | |
|
567 | return self._makeruncommandresult(frame.requestid) | |
|
547 | 568 | |
|
548 | 569 | if expectingargs or expectingdata: |
|
549 | 570 | self._state = 'command-receiving' |
@@ -553,56 +574,52 class serverreactor(object): | |||
|
553 | 574 | return self._makeerrorresult(_('missing frame flags on ' |
|
554 | 575 | 'command frame')) |
|
555 | 576 | |
|
556 |
def _onframecommandreceiving(self, |
|
|
557 | payload): | |
|
577 | def _onframecommandreceiving(self, frame): | |
|
558 | 578 | # It could be a new command request. Process it as such. |
|
559 | if frametype == FRAME_TYPE_COMMAND_NAME: | |
|
560 |
return self._onframeidle( |
|
|
579 | if frame.typeid == FRAME_TYPE_COMMAND_NAME: | |
|
580 | return self._onframeidle(frame) | |
|
561 | 581 | |
|
562 | 582 | # All other frames should be related to a command that is currently |
|
563 | 583 | # receiving. |
|
564 | if requestid not in self._receivingcommands: | |
|
584 | if frame.requestid not in self._receivingcommands: | |
|
565 | 585 | self._state = 'errored' |
|
566 | 586 | return self._makeerrorresult( |
|
567 | 587 | _('received frame for request that is not receiving: %d') % |
|
568 | requestid) | |
|
588 | frame.requestid) | |
|
569 | 589 | |
|
570 | entry = self._receivingcommands[requestid] | |
|
590 | entry = self._receivingcommands[frame.requestid] | |
|
571 | 591 | |
|
572 | if frametype == FRAME_TYPE_COMMAND_ARGUMENT: | |
|
592 | if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT: | |
|
573 | 593 | if not entry['expectingargs']: |
|
574 | 594 | self._state = 'errored' |
|
575 | 595 | return self._makeerrorresult(_( |
|
576 | 596 | 'received command argument frame for request that is not ' |
|
577 | 'expecting arguments: %d') % requestid) | |
|
597 | 'expecting arguments: %d') % frame.requestid) | |
|
578 | 598 | |
|
579 |
return self._handlecommandargsframe( |
|
|
580 | frameflags, payload) | |
|
599 | return self._handlecommandargsframe(frame, entry) | |
|
581 | 600 | |
|
582 | elif frametype == FRAME_TYPE_COMMAND_DATA: | |
|
601 | elif frame.typeid == FRAME_TYPE_COMMAND_DATA: | |
|
583 | 602 | if not entry['expectingdata']: |
|
584 | 603 | self._state = 'errored' |
|
585 | 604 | return self._makeerrorresult(_( |
|
586 | 605 | 'received command data frame for request that is not ' |
|
587 | 'expecting data: %d') % requestid) | |
|
606 | 'expecting data: %d') % frame.requestid) | |
|
588 | 607 | |
|
589 | 608 | if entry['data'] is None: |
|
590 | 609 | entry['data'] = util.bytesio() |
|
591 | 610 | |
|
592 |
return self._handlecommanddataframe( |
|
|
593 | frameflags, payload) | |
|
611 | return self._handlecommanddataframe(frame, entry) | |
|
594 | 612 | |
|
595 |
def _handlecommandargsframe(self, |
|
|
596 | payload): | |
|
613 | def _handlecommandargsframe(self, frame, entry): | |
|
597 | 614 | # The frame and state of command should have already been validated. |
|
598 | assert frametype == FRAME_TYPE_COMMAND_ARGUMENT | |
|
615 | assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT | |
|
599 | 616 | |
|
600 | 617 | offset = 0 |
|
601 | namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload) | |
|
618 | namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload) | |
|
602 | 619 | offset += ARGUMENT_FRAME_HEADER.size |
|
603 | 620 | |
|
604 | 621 | # The argument name MUST fit inside the frame. |
|
605 | argname = bytes(payload[offset:offset + namesize]) | |
|
622 | argname = bytes(frame.payload[offset:offset + namesize]) | |
|
606 | 623 | offset += namesize |
|
607 | 624 | |
|
608 | 625 | if len(argname) != namesize: |
@@ -610,11 +627,11 class serverreactor(object): | |||
|
610 | 627 | return self._makeerrorresult(_('malformed argument frame: ' |
|
611 | 628 | 'partial argument name')) |
|
612 | 629 | |
|
613 | argvalue = bytes(payload[offset:]) | |
|
630 | argvalue = bytes(frame.payload[offset:]) | |
|
614 | 631 | |
|
615 | 632 | # Argument value spans multiple frames. Record our active state |
|
616 | 633 | # and wait for the next frame. |
|
617 | if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION: | |
|
634 | if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION: | |
|
618 | 635 | raise error.ProgrammingError('not yet implemented') |
|
619 | 636 | |
|
620 | 637 | # Common case: the argument value is completely contained in this |
@@ -627,32 +644,31 class serverreactor(object): | |||
|
627 | 644 | |
|
628 | 645 | entry['args'][argname] = argvalue |
|
629 | 646 | |
|
630 | if frameflags & FLAG_COMMAND_ARGUMENT_EOA: | |
|
647 | if frame.flags & FLAG_COMMAND_ARGUMENT_EOA: | |
|
631 | 648 | if entry['expectingdata']: |
|
632 | 649 | # TODO signal request to run a command once we don't |
|
633 | 650 | # buffer data frames. |
|
634 | 651 | return self._makewantframeresult() |
|
635 | 652 | else: |
|
636 | return self._makeruncommandresult(requestid) | |
|
653 | return self._makeruncommandresult(frame.requestid) | |
|
637 | 654 | else: |
|
638 | 655 | return self._makewantframeresult() |
|
639 | 656 | |
|
640 |
def _handlecommanddataframe(self, |
|
|
641 | payload): | |
|
642 | assert frametype == FRAME_TYPE_COMMAND_DATA | |
|
657 | def _handlecommanddataframe(self, frame, entry): | |
|
658 | assert frame.typeid == FRAME_TYPE_COMMAND_DATA | |
|
643 | 659 | |
|
644 | 660 | # TODO support streaming data instead of buffering it. |
|
645 | entry['data'].write(payload) | |
|
661 | entry['data'].write(frame.payload) | |
|
646 | 662 | |
|
647 | if frameflags & FLAG_COMMAND_DATA_CONTINUATION: | |
|
663 | if frame.flags & FLAG_COMMAND_DATA_CONTINUATION: | |
|
648 | 664 | return self._makewantframeresult() |
|
649 | elif frameflags & FLAG_COMMAND_DATA_EOS: | |
|
665 | elif frame.flags & FLAG_COMMAND_DATA_EOS: | |
|
650 | 666 | entry['data'].seek(0) |
|
651 | return self._makeruncommandresult(requestid) | |
|
667 | return self._makeruncommandresult(frame.requestid) | |
|
652 | 668 | else: |
|
653 | 669 | self._state = 'errored' |
|
654 | 670 | return self._makeerrorresult(_('command data frame without ' |
|
655 | 671 | 'flags')) |
|
656 | 672 | |
|
657 |
def _onframeerrored(self, |
|
|
673 | def _onframeerrored(self, frame): | |
|
658 | 674 | return self._makeerrorresult(_('server already errored')) |
@@ -400,12 +400,11 def _processhttpv2reflectrequest(ui, rep | |||
|
400 | 400 | states.append(b'received: <no frame>') |
|
401 | 401 | break |
|
402 | 402 | |
|
403 | requestid, frametype, frameflags, payload = frame | |
|
404 | states.append(b'received: %d %d %d %s' % (frametype, frameflags, | |
|
405 |
|
|
|
403 | states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags, | |
|
404 | frame.requestid, | |
|
405 | frame.payload)) | |
|
406 | 406 | |
|
407 |
action, meta = reactor.onframerecv( |
|
|
408 | payload) | |
|
407 | action, meta = reactor.onframerecv(frame) | |
|
409 | 408 | states.append(json.dumps((action, meta), sort_keys=True, |
|
410 | 409 | separators=(', ', ': '))) |
|
411 | 410 | |
@@ -434,7 +433,7 def _processhttpv2request(ui, repo, req, | |||
|
434 | 433 | if not frame: |
|
435 | 434 | break |
|
436 | 435 | |
|
437 |
action, meta = reactor.onframerecv( |
|
|
436 | action, meta = reactor.onframerecv(frame) | |
|
438 | 437 | |
|
439 | 438 | if action == 'wantframe': |
|
440 | 439 | # Need more data before we can do anything. |
@@ -18,11 +18,14 def sendframes(reactor, gen): | |||
|
18 | 18 | Emits a generator of results from ``onframerecv()`` calls. |
|
19 | 19 | """ |
|
20 | 20 | for frame in gen: |
|
21 |
|
|
|
21 | header = framing.parseheader(frame) | |
|
22 | 22 | payload = frame[framing.FRAME_HEADER_SIZE:] |
|
23 |
assert len(payload) == |
|
|
23 | assert len(payload) == header.length | |
|
24 | 24 | |
|
25 |
yield reactor.onframerecv(rid, |
|
|
25 | yield reactor.onframerecv(framing.frame(header.requestid, | |
|
26 | header.typeid, | |
|
27 | header.flags, | |
|
28 | payload)) | |
|
26 | 29 | |
|
27 | 30 | def sendcommandframes(reactor, rid, cmd, args, datafh=None): |
|
28 | 31 | """Generate frames to run a command and send them to a reactor.""" |
General Comments 0
You need to be logged in to leave comments.
Login now