##// END OF EJS Templates
wireproto: define attr-based classes for representing frames...
Gregory Szorc -
r37079:884a0c16 default
parent child Browse files
Show More
@@ -14,6 +14,9 b' from __future__ import absolute_import'
14 import struct
14 import struct
15
15
16 from .i18n import _
16 from .i18n import _
17 from .thirdparty import (
18 attr,
19 )
17 from . import (
20 from . import (
18 error,
21 error,
19 util,
22 util,
@@ -92,6 +95,24 b' FRAME_TYPE_FLAGS = {'
92
95
93 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
96 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
94
97
98 @attr.s(slots=True)
99 class frameheader(object):
100 """Represents the data in a frame header."""
101
102 length = attr.ib()
103 requestid = attr.ib()
104 typeid = attr.ib()
105 flags = attr.ib()
106
107 @attr.s(slots=True)
108 class frame(object):
109 """Represents a parsed frame."""
110
111 requestid = attr.ib()
112 typeid = attr.ib()
113 flags = attr.ib()
114 payload = attr.ib()
115
95 def makeframe(requestid, frametype, frameflags, payload):
116 def makeframe(requestid, frametype, frameflags, payload):
96 """Assemble a frame into a byte array."""
117 """Assemble a frame into a byte array."""
97 # TODO assert size of payload.
118 # TODO assert size of payload.
@@ -164,7 +185,7 b' def parseheader(data):'
164 frametype = (typeflags & 0xf0) >> 4
185 frametype = (typeflags & 0xf0) >> 4
165 frameflags = typeflags & 0x0f
186 frameflags = typeflags & 0x0f
166
187
167 return requestid, frametype, frameflags, framelength
188 return frameheader(framelength, requestid, frametype, frameflags)
168
189
169 def readframe(fh):
190 def readframe(fh):
170 """Read a unified framing protocol frame from a file object.
191 """Read a unified framing protocol frame from a file object.
@@ -184,14 +205,14 b' def readframe(fh):'
184 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
205 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
185 (readcount, header))
206 (readcount, header))
186
207
187 requestid, frametype, frameflags, framelength = parseheader(header)
208 h = parseheader(header)
188
209
189 payload = fh.read(framelength)
210 payload = fh.read(h.length)
190 if len(payload) != framelength:
211 if len(payload) != h.length:
191 raise error.Abort(_('frame length error: expected %d; got %d') %
212 raise error.Abort(_('frame length error: expected %d; got %d') %
192 (framelength, len(payload)))
213 (h.length, len(payload)))
193
214
194 return requestid, frametype, frameflags, payload
215 return frame(h.requestid, h.typeid, h.flags, payload)
195
216
196 def createcommandframes(requestid, cmd, args, datafh=None):
217 def createcommandframes(requestid, cmd, args, datafh=None):
197 """Create frames necessary to transmit a request to run a command.
218 """Create frames necessary to transmit a request to run a command.
@@ -433,7 +454,7 b' class serverreactor(object):'
433 # request id -> dict of commands that are actively being received.
454 # request id -> dict of commands that are actively being received.
434 self._receivingcommands = {}
455 self._receivingcommands = {}
435
456
436 def onframerecv(self, requestid, frametype, frameflags, payload):
457 def onframerecv(self, frame):
437 """Process a frame that has been received off the wire.
458 """Process a frame that has been received off the wire.
438
459
439 Returns a dict with an ``action`` key that details what action,
460 Returns a dict with an ``action`` key that details what action,
@@ -449,7 +470,7 b' class serverreactor(object):'
449 if not meth:
470 if not meth:
450 raise error.ProgrammingError('unhandled state: %s' % self._state)
471 raise error.ProgrammingError('unhandled state: %s' % self._state)
451
472
452 return meth(requestid, frametype, frameflags, payload)
473 return meth(frame)
453
474
454 def onbytesresponseready(self, requestid, data):
475 def onbytesresponseready(self, requestid, data):
455 """Signal that a bytes response is ready to be sent to the client.
476 """Signal that a bytes response is ready to be sent to the client.
@@ -518,32 +539,32 b' class serverreactor(object):'
518 'state': self._state,
539 'state': self._state,
519 }
540 }
520
541
521 def _onframeidle(self, requestid, frametype, frameflags, payload):
542 def _onframeidle(self, frame):
522 # The only frame type that should be received in this state is a
543 # The only frame type that should be received in this state is a
523 # command request.
544 # command request.
524 if frametype != FRAME_TYPE_COMMAND_NAME:
545 if frame.typeid != FRAME_TYPE_COMMAND_NAME:
525 self._state = 'errored'
546 self._state = 'errored'
526 return self._makeerrorresult(
547 return self._makeerrorresult(
527 _('expected command frame; got %d') % frametype)
548 _('expected command frame; got %d') % frame.typeid)
528
549
529 if requestid in self._receivingcommands:
550 if frame.requestid in self._receivingcommands:
530 self._state = 'errored'
551 self._state = 'errored'
531 return self._makeerrorresult(
552 return self._makeerrorresult(
532 _('request with ID %d already received') % requestid)
553 _('request with ID %d already received') % frame.requestid)
533
554
534 expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
555 expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
535 expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
556 expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
536
557
537 self._receivingcommands[requestid] = {
558 self._receivingcommands[frame.requestid] = {
538 'command': payload,
559 'command': frame.payload,
539 'args': {},
560 'args': {},
540 'data': None,
561 'data': None,
541 'expectingargs': expectingargs,
562 'expectingargs': expectingargs,
542 'expectingdata': expectingdata,
563 'expectingdata': expectingdata,
543 }
564 }
544
565
545 if frameflags & FLAG_COMMAND_NAME_EOS:
566 if frame.flags & FLAG_COMMAND_NAME_EOS:
546 return self._makeruncommandresult(requestid)
567 return self._makeruncommandresult(frame.requestid)
547
568
548 if expectingargs or expectingdata:
569 if expectingargs or expectingdata:
549 self._state = 'command-receiving'
570 self._state = 'command-receiving'
@@ -553,56 +574,52 b' class serverreactor(object):'
553 return self._makeerrorresult(_('missing frame flags on '
574 return self._makeerrorresult(_('missing frame flags on '
554 'command frame'))
575 'command frame'))
555
576
556 def _onframecommandreceiving(self, requestid, frametype, frameflags,
577 def _onframecommandreceiving(self, frame):
557 payload):
558 # It could be a new command request. Process it as such.
578 # It could be a new command request. Process it as such.
559 if frametype == FRAME_TYPE_COMMAND_NAME:
579 if frame.typeid == FRAME_TYPE_COMMAND_NAME:
560 return self._onframeidle(requestid, frametype, frameflags, payload)
580 return self._onframeidle(frame)
561
581
562 # All other frames should be related to a command that is currently
582 # All other frames should be related to a command that is currently
563 # receiving.
583 # receiving.
564 if requestid not in self._receivingcommands:
584 if frame.requestid not in self._receivingcommands:
565 self._state = 'errored'
585 self._state = 'errored'
566 return self._makeerrorresult(
586 return self._makeerrorresult(
567 _('received frame for request that is not receiving: %d') %
587 _('received frame for request that is not receiving: %d') %
568 requestid)
588 frame.requestid)
569
589
570 entry = self._receivingcommands[requestid]
590 entry = self._receivingcommands[frame.requestid]
571
591
572 if frametype == FRAME_TYPE_COMMAND_ARGUMENT:
592 if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
573 if not entry['expectingargs']:
593 if not entry['expectingargs']:
574 self._state = 'errored'
594 self._state = 'errored'
575 return self._makeerrorresult(_(
595 return self._makeerrorresult(_(
576 'received command argument frame for request that is not '
596 'received command argument frame for request that is not '
577 'expecting arguments: %d') % requestid)
597 'expecting arguments: %d') % frame.requestid)
578
598
579 return self._handlecommandargsframe(requestid, entry, frametype,
599 return self._handlecommandargsframe(frame, entry)
580 frameflags, payload)
581
600
582 elif frametype == FRAME_TYPE_COMMAND_DATA:
601 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
583 if not entry['expectingdata']:
602 if not entry['expectingdata']:
584 self._state = 'errored'
603 self._state = 'errored'
585 return self._makeerrorresult(_(
604 return self._makeerrorresult(_(
586 'received command data frame for request that is not '
605 'received command data frame for request that is not '
587 'expecting data: %d') % requestid)
606 'expecting data: %d') % frame.requestid)
588
607
589 if entry['data'] is None:
608 if entry['data'] is None:
590 entry['data'] = util.bytesio()
609 entry['data'] = util.bytesio()
591
610
592 return self._handlecommanddataframe(requestid, entry, frametype,
611 return self._handlecommanddataframe(frame, entry)
593 frameflags, payload)
594
612
595 def _handlecommandargsframe(self, requestid, entry, frametype, frameflags,
613 def _handlecommandargsframe(self, frame, entry):
596 payload):
597 # The frame and state of command should have already been validated.
614 # The frame and state of command should have already been validated.
598 assert frametype == FRAME_TYPE_COMMAND_ARGUMENT
615 assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
599
616
600 offset = 0
617 offset = 0
601 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
618 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
602 offset += ARGUMENT_FRAME_HEADER.size
619 offset += ARGUMENT_FRAME_HEADER.size
603
620
604 # The argument name MUST fit inside the frame.
621 # The argument name MUST fit inside the frame.
605 argname = bytes(payload[offset:offset + namesize])
622 argname = bytes(frame.payload[offset:offset + namesize])
606 offset += namesize
623 offset += namesize
607
624
608 if len(argname) != namesize:
625 if len(argname) != namesize:
@@ -610,11 +627,11 b' class serverreactor(object):'
610 return self._makeerrorresult(_('malformed argument frame: '
627 return self._makeerrorresult(_('malformed argument frame: '
611 'partial argument name'))
628 'partial argument name'))
612
629
613 argvalue = bytes(payload[offset:])
630 argvalue = bytes(frame.payload[offset:])
614
631
615 # Argument value spans multiple frames. Record our active state
632 # Argument value spans multiple frames. Record our active state
616 # and wait for the next frame.
633 # and wait for the next frame.
617 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
634 if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
618 raise error.ProgrammingError('not yet implemented')
635 raise error.ProgrammingError('not yet implemented')
619
636
620 # Common case: the argument value is completely contained in this
637 # Common case: the argument value is completely contained in this
@@ -627,32 +644,31 b' class serverreactor(object):'
627
644
628 entry['args'][argname] = argvalue
645 entry['args'][argname] = argvalue
629
646
630 if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
647 if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
631 if entry['expectingdata']:
648 if entry['expectingdata']:
632 # TODO signal request to run a command once we don't
649 # TODO signal request to run a command once we don't
633 # buffer data frames.
650 # buffer data frames.
634 return self._makewantframeresult()
651 return self._makewantframeresult()
635 else:
652 else:
636 return self._makeruncommandresult(requestid)
653 return self._makeruncommandresult(frame.requestid)
637 else:
654 else:
638 return self._makewantframeresult()
655 return self._makewantframeresult()
639
656
640 def _handlecommanddataframe(self, requestid, entry, frametype, frameflags,
657 def _handlecommanddataframe(self, frame, entry):
641 payload):
658 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
642 assert frametype == FRAME_TYPE_COMMAND_DATA
643
659
644 # TODO support streaming data instead of buffering it.
660 # TODO support streaming data instead of buffering it.
645 entry['data'].write(payload)
661 entry['data'].write(frame.payload)
646
662
647 if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
663 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
648 return self._makewantframeresult()
664 return self._makewantframeresult()
649 elif frameflags & FLAG_COMMAND_DATA_EOS:
665 elif frame.flags & FLAG_COMMAND_DATA_EOS:
650 entry['data'].seek(0)
666 entry['data'].seek(0)
651 return self._makeruncommandresult(requestid)
667 return self._makeruncommandresult(frame.requestid)
652 else:
668 else:
653 self._state = 'errored'
669 self._state = 'errored'
654 return self._makeerrorresult(_('command data frame without '
670 return self._makeerrorresult(_('command data frame without '
655 'flags'))
671 'flags'))
656
672
657 def _onframeerrored(self, requestid, frametype, frameflags, payload):
673 def _onframeerrored(self, frame):
658 return self._makeerrorresult(_('server already errored'))
674 return self._makeerrorresult(_('server already errored'))
@@ -400,12 +400,11 b' def _processhttpv2reflectrequest(ui, rep'
400 states.append(b'received: <no frame>')
400 states.append(b'received: <no frame>')
401 break
401 break
402
402
403 requestid, frametype, frameflags, payload = frame
403 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
404 states.append(b'received: %d %d %d %s' % (frametype, frameflags,
404 frame.requestid,
405 requestid, payload))
405 frame.payload))
406
406
407 action, meta = reactor.onframerecv(requestid, frametype, frameflags,
407 action, meta = reactor.onframerecv(frame)
408 payload)
409 states.append(json.dumps((action, meta), sort_keys=True,
408 states.append(json.dumps((action, meta), sort_keys=True,
410 separators=(', ', ': ')))
409 separators=(', ', ': ')))
411
410
@@ -434,7 +433,7 b' def _processhttpv2request(ui, repo, req,'
434 if not frame:
433 if not frame:
435 break
434 break
436
435
437 action, meta = reactor.onframerecv(*frame)
436 action, meta = reactor.onframerecv(frame)
438
437
439 if action == 'wantframe':
438 if action == 'wantframe':
440 # Need more data before we can do anything.
439 # Need more data before we can do anything.
@@ -18,11 +18,14 b' def sendframes(reactor, gen):'
18 Emits a generator of results from ``onframerecv()`` calls.
18 Emits a generator of results from ``onframerecv()`` calls.
19 """
19 """
20 for frame in gen:
20 for frame in gen:
21 rid, frametype, frameflags, framelength = framing.parseheader(frame)
21 header = framing.parseheader(frame)
22 payload = frame[framing.FRAME_HEADER_SIZE:]
22 payload = frame[framing.FRAME_HEADER_SIZE:]
23 assert len(payload) == framelength
23 assert len(payload) == header.length
24
24
25 yield reactor.onframerecv(rid, frametype, frameflags, payload)
25 yield reactor.onframerecv(framing.frame(header.requestid,
26 header.typeid,
27 header.flags,
28 payload))
26
29
27 def sendcommandframes(reactor, rid, cmd, args, datafh=None):
30 def sendcommandframes(reactor, rid, cmd, args, datafh=None):
28 """Generate frames to run a command and send them to a reactor."""
31 """Generate frames to run a command and send them to a reactor."""
General Comments 0
You need to be logged in to leave comments. Login now