Show More
@@ -14,6 +14,9 from __future__ import absolute_import | |||||
14 | import struct |
|
14 | import struct | |
15 |
|
15 | |||
16 | from .i18n import _ |
|
16 | from .i18n import _ | |
|
17 | from .thirdparty import ( | |||
|
18 | attr, | |||
|
19 | ) | |||
17 | from . import ( |
|
20 | from . import ( | |
18 | error, |
|
21 | error, | |
19 | util, |
|
22 | util, | |
@@ -92,6 +95,24 FRAME_TYPE_FLAGS = { | |||||
92 |
|
95 | |||
93 | ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH') |
|
96 | ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH') | |
94 |
|
97 | |||
|
98 | @attr.s(slots=True) | |||
|
99 | class frameheader(object): | |||
|
100 | """Represents the data in a frame header.""" | |||
|
101 | ||||
|
102 | length = attr.ib() | |||
|
103 | requestid = attr.ib() | |||
|
104 | typeid = attr.ib() | |||
|
105 | flags = attr.ib() | |||
|
106 | ||||
|
107 | @attr.s(slots=True) | |||
|
108 | class frame(object): | |||
|
109 | """Represents a parsed frame.""" | |||
|
110 | ||||
|
111 | requestid = attr.ib() | |||
|
112 | typeid = attr.ib() | |||
|
113 | flags = attr.ib() | |||
|
114 | payload = attr.ib() | |||
|
115 | ||||
95 | def makeframe(requestid, frametype, frameflags, payload): |
|
116 | def makeframe(requestid, frametype, frameflags, payload): | |
96 | """Assemble a frame into a byte array.""" |
|
117 | """Assemble a frame into a byte array.""" | |
97 | # TODO assert size of payload. |
|
118 | # TODO assert size of payload. | |
@@ -164,7 +185,7 def parseheader(data): | |||||
164 | frametype = (typeflags & 0xf0) >> 4 |
|
185 | frametype = (typeflags & 0xf0) >> 4 | |
165 | frameflags = typeflags & 0x0f |
|
186 | frameflags = typeflags & 0x0f | |
166 |
|
187 | |||
167 |
return requestid, frametype, frameflags |
|
188 | return frameheader(framelength, requestid, frametype, frameflags) | |
168 |
|
189 | |||
169 | def readframe(fh): |
|
190 | def readframe(fh): | |
170 | """Read a unified framing protocol frame from a file object. |
|
191 | """Read a unified framing protocol frame from a file object. | |
@@ -184,14 +205,14 def readframe(fh): | |||||
184 | raise error.Abort(_('received incomplete frame: got %d bytes: %s') % |
|
205 | raise error.Abort(_('received incomplete frame: got %d bytes: %s') % | |
185 | (readcount, header)) |
|
206 | (readcount, header)) | |
186 |
|
207 | |||
187 | requestid, frametype, frameflags, framelength = parseheader(header) |
|
208 | h = parseheader(header) | |
188 |
|
209 | |||
189 |
payload = fh.read( |
|
210 | payload = fh.read(h.length) | |
190 |
if len(payload) != |
|
211 | if len(payload) != h.length: | |
191 | raise error.Abort(_('frame length error: expected %d; got %d') % |
|
212 | raise error.Abort(_('frame length error: expected %d; got %d') % | |
192 |
( |
|
213 | (h.length, len(payload))) | |
193 |
|
214 | |||
194 |
return requestid, |
|
215 | return frame(h.requestid, h.typeid, h.flags, payload) | |
195 |
|
216 | |||
196 | def createcommandframes(requestid, cmd, args, datafh=None): |
|
217 | def createcommandframes(requestid, cmd, args, datafh=None): | |
197 | """Create frames necessary to transmit a request to run a command. |
|
218 | """Create frames necessary to transmit a request to run a command. | |
@@ -433,7 +454,7 class serverreactor(object): | |||||
433 | # request id -> dict of commands that are actively being received. |
|
454 | # request id -> dict of commands that are actively being received. | |
434 | self._receivingcommands = {} |
|
455 | self._receivingcommands = {} | |
435 |
|
456 | |||
436 |
def onframerecv(self, |
|
457 | def onframerecv(self, frame): | |
437 | """Process a frame that has been received off the wire. |
|
458 | """Process a frame that has been received off the wire. | |
438 |
|
459 | |||
439 | Returns a dict with an ``action`` key that details what action, |
|
460 | Returns a dict with an ``action`` key that details what action, | |
@@ -449,7 +470,7 class serverreactor(object): | |||||
449 | if not meth: |
|
470 | if not meth: | |
450 | raise error.ProgrammingError('unhandled state: %s' % self._state) |
|
471 | raise error.ProgrammingError('unhandled state: %s' % self._state) | |
451 |
|
472 | |||
452 | return meth(requestid, frametype, frameflags, payload) |
|
473 | return meth(frame) | |
453 |
|
474 | |||
454 | def onbytesresponseready(self, requestid, data): |
|
475 | def onbytesresponseready(self, requestid, data): | |
455 | """Signal that a bytes response is ready to be sent to the client. |
|
476 | """Signal that a bytes response is ready to be sent to the client. | |
@@ -518,32 +539,32 class serverreactor(object): | |||||
518 | 'state': self._state, |
|
539 | 'state': self._state, | |
519 | } |
|
540 | } | |
520 |
|
541 | |||
521 |
def _onframeidle(self, |
|
542 | def _onframeidle(self, frame): | |
522 | # The only frame type that should be received in this state is a |
|
543 | # The only frame type that should be received in this state is a | |
523 | # command request. |
|
544 | # command request. | |
524 | if frametype != FRAME_TYPE_COMMAND_NAME: |
|
545 | if frame.typeid != FRAME_TYPE_COMMAND_NAME: | |
525 | self._state = 'errored' |
|
546 | self._state = 'errored' | |
526 | return self._makeerrorresult( |
|
547 | return self._makeerrorresult( | |
527 | _('expected command frame; got %d') % frametype) |
|
548 | _('expected command frame; got %d') % frame.typeid) | |
528 |
|
549 | |||
529 | if requestid in self._receivingcommands: |
|
550 | if frame.requestid in self._receivingcommands: | |
530 | self._state = 'errored' |
|
551 | self._state = 'errored' | |
531 | return self._makeerrorresult( |
|
552 | return self._makeerrorresult( | |
532 | _('request with ID %d already received') % requestid) |
|
553 | _('request with ID %d already received') % frame.requestid) | |
533 |
|
554 | |||
534 | expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS) |
|
555 | expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS) | |
535 | expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA) |
|
556 | expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA) | |
536 |
|
557 | |||
537 | self._receivingcommands[requestid] = { |
|
558 | self._receivingcommands[frame.requestid] = { | |
538 | 'command': payload, |
|
559 | 'command': frame.payload, | |
539 | 'args': {}, |
|
560 | 'args': {}, | |
540 | 'data': None, |
|
561 | 'data': None, | |
541 | 'expectingargs': expectingargs, |
|
562 | 'expectingargs': expectingargs, | |
542 | 'expectingdata': expectingdata, |
|
563 | 'expectingdata': expectingdata, | |
543 | } |
|
564 | } | |
544 |
|
565 | |||
545 | if frameflags & FLAG_COMMAND_NAME_EOS: |
|
566 | if frame.flags & FLAG_COMMAND_NAME_EOS: | |
546 | return self._makeruncommandresult(requestid) |
|
567 | return self._makeruncommandresult(frame.requestid) | |
547 |
|
568 | |||
548 | if expectingargs or expectingdata: |
|
569 | if expectingargs or expectingdata: | |
549 | self._state = 'command-receiving' |
|
570 | self._state = 'command-receiving' | |
@@ -553,56 +574,52 class serverreactor(object): | |||||
553 | return self._makeerrorresult(_('missing frame flags on ' |
|
574 | return self._makeerrorresult(_('missing frame flags on ' | |
554 | 'command frame')) |
|
575 | 'command frame')) | |
555 |
|
576 | |||
556 |
def _onframecommandreceiving(self, |
|
577 | def _onframecommandreceiving(self, frame): | |
557 | payload): |
|
|||
558 | # It could be a new command request. Process it as such. |
|
578 | # It could be a new command request. Process it as such. | |
559 | if frametype == FRAME_TYPE_COMMAND_NAME: |
|
579 | if frame.typeid == FRAME_TYPE_COMMAND_NAME: | |
560 |
return self._onframeidle( |
|
580 | return self._onframeidle(frame) | |
561 |
|
581 | |||
562 | # All other frames should be related to a command that is currently |
|
582 | # All other frames should be related to a command that is currently | |
563 | # receiving. |
|
583 | # receiving. | |
564 | if requestid not in self._receivingcommands: |
|
584 | if frame.requestid not in self._receivingcommands: | |
565 | self._state = 'errored' |
|
585 | self._state = 'errored' | |
566 | return self._makeerrorresult( |
|
586 | return self._makeerrorresult( | |
567 | _('received frame for request that is not receiving: %d') % |
|
587 | _('received frame for request that is not receiving: %d') % | |
568 | requestid) |
|
588 | frame.requestid) | |
569 |
|
589 | |||
570 | entry = self._receivingcommands[requestid] |
|
590 | entry = self._receivingcommands[frame.requestid] | |
571 |
|
591 | |||
572 | if frametype == FRAME_TYPE_COMMAND_ARGUMENT: |
|
592 | if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT: | |
573 | if not entry['expectingargs']: |
|
593 | if not entry['expectingargs']: | |
574 | self._state = 'errored' |
|
594 | self._state = 'errored' | |
575 | return self._makeerrorresult(_( |
|
595 | return self._makeerrorresult(_( | |
576 | 'received command argument frame for request that is not ' |
|
596 | 'received command argument frame for request that is not ' | |
577 | 'expecting arguments: %d') % requestid) |
|
597 | 'expecting arguments: %d') % frame.requestid) | |
578 |
|
598 | |||
579 |
return self._handlecommandargsframe( |
|
599 | return self._handlecommandargsframe(frame, entry) | |
580 | frameflags, payload) |
|
|||
581 |
|
600 | |||
582 | elif frametype == FRAME_TYPE_COMMAND_DATA: |
|
601 | elif frame.typeid == FRAME_TYPE_COMMAND_DATA: | |
583 | if not entry['expectingdata']: |
|
602 | if not entry['expectingdata']: | |
584 | self._state = 'errored' |
|
603 | self._state = 'errored' | |
585 | return self._makeerrorresult(_( |
|
604 | return self._makeerrorresult(_( | |
586 | 'received command data frame for request that is not ' |
|
605 | 'received command data frame for request that is not ' | |
587 | 'expecting data: %d') % requestid) |
|
606 | 'expecting data: %d') % frame.requestid) | |
588 |
|
607 | |||
589 | if entry['data'] is None: |
|
608 | if entry['data'] is None: | |
590 | entry['data'] = util.bytesio() |
|
609 | entry['data'] = util.bytesio() | |
591 |
|
610 | |||
592 |
return self._handlecommanddataframe( |
|
611 | return self._handlecommanddataframe(frame, entry) | |
593 | frameflags, payload) |
|
|||
594 |
|
612 | |||
595 |
def _handlecommandargsframe(self, |
|
613 | def _handlecommandargsframe(self, frame, entry): | |
596 | payload): |
|
|||
597 | # The frame and state of command should have already been validated. |
|
614 | # The frame and state of command should have already been validated. | |
598 | assert frametype == FRAME_TYPE_COMMAND_ARGUMENT |
|
615 | assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT | |
599 |
|
616 | |||
600 | offset = 0 |
|
617 | offset = 0 | |
601 | namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload) |
|
618 | namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload) | |
602 | offset += ARGUMENT_FRAME_HEADER.size |
|
619 | offset += ARGUMENT_FRAME_HEADER.size | |
603 |
|
620 | |||
604 | # The argument name MUST fit inside the frame. |
|
621 | # The argument name MUST fit inside the frame. | |
605 | argname = bytes(payload[offset:offset + namesize]) |
|
622 | argname = bytes(frame.payload[offset:offset + namesize]) | |
606 | offset += namesize |
|
623 | offset += namesize | |
607 |
|
624 | |||
608 | if len(argname) != namesize: |
|
625 | if len(argname) != namesize: | |
@@ -610,11 +627,11 class serverreactor(object): | |||||
610 | return self._makeerrorresult(_('malformed argument frame: ' |
|
627 | return self._makeerrorresult(_('malformed argument frame: ' | |
611 | 'partial argument name')) |
|
628 | 'partial argument name')) | |
612 |
|
629 | |||
613 | argvalue = bytes(payload[offset:]) |
|
630 | argvalue = bytes(frame.payload[offset:]) | |
614 |
|
631 | |||
615 | # Argument value spans multiple frames. Record our active state |
|
632 | # Argument value spans multiple frames. Record our active state | |
616 | # and wait for the next frame. |
|
633 | # and wait for the next frame. | |
617 | if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION: |
|
634 | if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION: | |
618 | raise error.ProgrammingError('not yet implemented') |
|
635 | raise error.ProgrammingError('not yet implemented') | |
619 |
|
636 | |||
620 | # Common case: the argument value is completely contained in this |
|
637 | # Common case: the argument value is completely contained in this | |
@@ -627,32 +644,31 class serverreactor(object): | |||||
627 |
|
644 | |||
628 | entry['args'][argname] = argvalue |
|
645 | entry['args'][argname] = argvalue | |
629 |
|
646 | |||
630 | if frameflags & FLAG_COMMAND_ARGUMENT_EOA: |
|
647 | if frame.flags & FLAG_COMMAND_ARGUMENT_EOA: | |
631 | if entry['expectingdata']: |
|
648 | if entry['expectingdata']: | |
632 | # TODO signal request to run a command once we don't |
|
649 | # TODO signal request to run a command once we don't | |
633 | # buffer data frames. |
|
650 | # buffer data frames. | |
634 | return self._makewantframeresult() |
|
651 | return self._makewantframeresult() | |
635 | else: |
|
652 | else: | |
636 | return self._makeruncommandresult(requestid) |
|
653 | return self._makeruncommandresult(frame.requestid) | |
637 | else: |
|
654 | else: | |
638 | return self._makewantframeresult() |
|
655 | return self._makewantframeresult() | |
639 |
|
656 | |||
640 |
def _handlecommanddataframe(self, |
|
657 | def _handlecommanddataframe(self, frame, entry): | |
641 | payload): |
|
658 | assert frame.typeid == FRAME_TYPE_COMMAND_DATA | |
642 | assert frametype == FRAME_TYPE_COMMAND_DATA |
|
|||
643 |
|
659 | |||
644 | # TODO support streaming data instead of buffering it. |
|
660 | # TODO support streaming data instead of buffering it. | |
645 | entry['data'].write(payload) |
|
661 | entry['data'].write(frame.payload) | |
646 |
|
662 | |||
647 | if frameflags & FLAG_COMMAND_DATA_CONTINUATION: |
|
663 | if frame.flags & FLAG_COMMAND_DATA_CONTINUATION: | |
648 | return self._makewantframeresult() |
|
664 | return self._makewantframeresult() | |
649 | elif frameflags & FLAG_COMMAND_DATA_EOS: |
|
665 | elif frame.flags & FLAG_COMMAND_DATA_EOS: | |
650 | entry['data'].seek(0) |
|
666 | entry['data'].seek(0) | |
651 | return self._makeruncommandresult(requestid) |
|
667 | return self._makeruncommandresult(frame.requestid) | |
652 | else: |
|
668 | else: | |
653 | self._state = 'errored' |
|
669 | self._state = 'errored' | |
654 | return self._makeerrorresult(_('command data frame without ' |
|
670 | return self._makeerrorresult(_('command data frame without ' | |
655 | 'flags')) |
|
671 | 'flags')) | |
656 |
|
672 | |||
657 |
def _onframeerrored(self, |
|
673 | def _onframeerrored(self, frame): | |
658 | return self._makeerrorresult(_('server already errored')) |
|
674 | return self._makeerrorresult(_('server already errored')) |
@@ -400,12 +400,11 def _processhttpv2reflectrequest(ui, rep | |||||
400 | states.append(b'received: <no frame>') |
|
400 | states.append(b'received: <no frame>') | |
401 | break |
|
401 | break | |
402 |
|
402 | |||
403 | requestid, frametype, frameflags, payload = frame |
|
403 | states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags, | |
404 | states.append(b'received: %d %d %d %s' % (frametype, frameflags, |
|
404 | frame.requestid, | |
405 |
|
|
405 | frame.payload)) | |
406 |
|
406 | |||
407 |
action, meta = reactor.onframerecv( |
|
407 | action, meta = reactor.onframerecv(frame) | |
408 | payload) |
|
|||
409 | states.append(json.dumps((action, meta), sort_keys=True, |
|
408 | states.append(json.dumps((action, meta), sort_keys=True, | |
410 | separators=(', ', ': '))) |
|
409 | separators=(', ', ': '))) | |
411 |
|
410 | |||
@@ -434,7 +433,7 def _processhttpv2request(ui, repo, req, | |||||
434 | if not frame: |
|
433 | if not frame: | |
435 | break |
|
434 | break | |
436 |
|
435 | |||
437 |
action, meta = reactor.onframerecv( |
|
436 | action, meta = reactor.onframerecv(frame) | |
438 |
|
437 | |||
439 | if action == 'wantframe': |
|
438 | if action == 'wantframe': | |
440 | # Need more data before we can do anything. |
|
439 | # Need more data before we can do anything. |
@@ -18,11 +18,14 def sendframes(reactor, gen): | |||||
18 | Emits a generator of results from ``onframerecv()`` calls. |
|
18 | Emits a generator of results from ``onframerecv()`` calls. | |
19 | """ |
|
19 | """ | |
20 | for frame in gen: |
|
20 | for frame in gen: | |
21 |
|
|
21 | header = framing.parseheader(frame) | |
22 | payload = frame[framing.FRAME_HEADER_SIZE:] |
|
22 | payload = frame[framing.FRAME_HEADER_SIZE:] | |
23 |
assert len(payload) == |
|
23 | assert len(payload) == header.length | |
24 |
|
24 | |||
25 |
yield reactor.onframerecv(rid, |
|
25 | yield reactor.onframerecv(framing.frame(header.requestid, | |
|
26 | header.typeid, | |||
|
27 | header.flags, | |||
|
28 | payload)) | |||
26 |
|
29 | |||
27 | def sendcommandframes(reactor, rid, cmd, args, datafh=None): |
|
30 | def sendcommandframes(reactor, rid, cmd, args, datafh=None): | |
28 | """Generate frames to run a command and send them to a reactor.""" |
|
31 | """Generate frames to run a command and send them to a reactor.""" |
General Comments 0
You need to be logged in to leave comments.
Login now