##// END OF EJS Templates
wireproto: define attr-based classes for representing frames...
Gregory Szorc -
r37079:884a0c16 default
parent child Browse files
Show More
@@ -14,6 +14,9 from __future__ import absolute_import
14 14 import struct
15 15
16 16 from .i18n import _
17 from .thirdparty import (
18 attr,
19 )
17 20 from . import (
18 21 error,
19 22 util,
@@ -92,6 +95,24 FRAME_TYPE_FLAGS = {
92 95
93 96 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
94 97
98 @attr.s(slots=True)
99 class frameheader(object):
100 """Represents the data in a frame header."""
101
102 length = attr.ib()
103 requestid = attr.ib()
104 typeid = attr.ib()
105 flags = attr.ib()
106
107 @attr.s(slots=True)
108 class frame(object):
109 """Represents a parsed frame."""
110
111 requestid = attr.ib()
112 typeid = attr.ib()
113 flags = attr.ib()
114 payload = attr.ib()
115
95 116 def makeframe(requestid, frametype, frameflags, payload):
96 117 """Assemble a frame into a byte array."""
97 118 # TODO assert size of payload.
@@ -164,7 +185,7 def parseheader(data):
164 185 frametype = (typeflags & 0xf0) >> 4
165 186 frameflags = typeflags & 0x0f
166 187
167 return requestid, frametype, frameflags, framelength
188 return frameheader(framelength, requestid, frametype, frameflags)
168 189
169 190 def readframe(fh):
170 191 """Read a unified framing protocol frame from a file object.
@@ -184,14 +205,14 def readframe(fh):
184 205 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
185 206 (readcount, header))
186 207
187 requestid, frametype, frameflags, framelength = parseheader(header)
208 h = parseheader(header)
188 209
189 payload = fh.read(framelength)
190 if len(payload) != framelength:
210 payload = fh.read(h.length)
211 if len(payload) != h.length:
191 212 raise error.Abort(_('frame length error: expected %d; got %d') %
192 (framelength, len(payload)))
213 (h.length, len(payload)))
193 214
194 return requestid, frametype, frameflags, payload
215 return frame(h.requestid, h.typeid, h.flags, payload)
195 216
196 217 def createcommandframes(requestid, cmd, args, datafh=None):
197 218 """Create frames necessary to transmit a request to run a command.
@@ -433,7 +454,7 class serverreactor(object):
433 454 # request id -> dict of commands that are actively being received.
434 455 self._receivingcommands = {}
435 456
436 def onframerecv(self, requestid, frametype, frameflags, payload):
457 def onframerecv(self, frame):
437 458 """Process a frame that has been received off the wire.
438 459
439 460 Returns a dict with an ``action`` key that details what action,
@@ -449,7 +470,7 class serverreactor(object):
449 470 if not meth:
450 471 raise error.ProgrammingError('unhandled state: %s' % self._state)
451 472
452 return meth(requestid, frametype, frameflags, payload)
473 return meth(frame)
453 474
454 475 def onbytesresponseready(self, requestid, data):
455 476 """Signal that a bytes response is ready to be sent to the client.
@@ -518,32 +539,32 class serverreactor(object):
518 539 'state': self._state,
519 540 }
520 541
521 def _onframeidle(self, requestid, frametype, frameflags, payload):
542 def _onframeidle(self, frame):
522 543 # The only frame type that should be received in this state is a
523 544 # command request.
524 if frametype != FRAME_TYPE_COMMAND_NAME:
545 if frame.typeid != FRAME_TYPE_COMMAND_NAME:
525 546 self._state = 'errored'
526 547 return self._makeerrorresult(
527 _('expected command frame; got %d') % frametype)
548 _('expected command frame; got %d') % frame.typeid)
528 549
529 if requestid in self._receivingcommands:
550 if frame.requestid in self._receivingcommands:
530 551 self._state = 'errored'
531 552 return self._makeerrorresult(
532 _('request with ID %d already received') % requestid)
553 _('request with ID %d already received') % frame.requestid)
533 554
534 expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
535 expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
555 expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
556 expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
536 557
537 self._receivingcommands[requestid] = {
538 'command': payload,
558 self._receivingcommands[frame.requestid] = {
559 'command': frame.payload,
539 560 'args': {},
540 561 'data': None,
541 562 'expectingargs': expectingargs,
542 563 'expectingdata': expectingdata,
543 564 }
544 565
545 if frameflags & FLAG_COMMAND_NAME_EOS:
546 return self._makeruncommandresult(requestid)
566 if frame.flags & FLAG_COMMAND_NAME_EOS:
567 return self._makeruncommandresult(frame.requestid)
547 568
548 569 if expectingargs or expectingdata:
549 570 self._state = 'command-receiving'
@@ -553,56 +574,52 class serverreactor(object):
553 574 return self._makeerrorresult(_('missing frame flags on '
554 575 'command frame'))
555 576
556 def _onframecommandreceiving(self, requestid, frametype, frameflags,
557 payload):
577 def _onframecommandreceiving(self, frame):
558 578 # It could be a new command request. Process it as such.
559 if frametype == FRAME_TYPE_COMMAND_NAME:
560 return self._onframeidle(requestid, frametype, frameflags, payload)
579 if frame.typeid == FRAME_TYPE_COMMAND_NAME:
580 return self._onframeidle(frame)
561 581
562 582 # All other frames should be related to a command that is currently
563 583 # receiving.
564 if requestid not in self._receivingcommands:
584 if frame.requestid not in self._receivingcommands:
565 585 self._state = 'errored'
566 586 return self._makeerrorresult(
567 587 _('received frame for request that is not receiving: %d') %
568 requestid)
588 frame.requestid)
569 589
570 entry = self._receivingcommands[requestid]
590 entry = self._receivingcommands[frame.requestid]
571 591
572 if frametype == FRAME_TYPE_COMMAND_ARGUMENT:
592 if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
573 593 if not entry['expectingargs']:
574 594 self._state = 'errored'
575 595 return self._makeerrorresult(_(
576 596 'received command argument frame for request that is not '
577 'expecting arguments: %d') % requestid)
597 'expecting arguments: %d') % frame.requestid)
578 598
579 return self._handlecommandargsframe(requestid, entry, frametype,
580 frameflags, payload)
599 return self._handlecommandargsframe(frame, entry)
581 600
582 elif frametype == FRAME_TYPE_COMMAND_DATA:
601 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
583 602 if not entry['expectingdata']:
584 603 self._state = 'errored'
585 604 return self._makeerrorresult(_(
586 605 'received command data frame for request that is not '
587 'expecting data: %d') % requestid)
606 'expecting data: %d') % frame.requestid)
588 607
589 608 if entry['data'] is None:
590 609 entry['data'] = util.bytesio()
591 610
592 return self._handlecommanddataframe(requestid, entry, frametype,
593 frameflags, payload)
611 return self._handlecommanddataframe(frame, entry)
594 612
595 def _handlecommandargsframe(self, requestid, entry, frametype, frameflags,
596 payload):
613 def _handlecommandargsframe(self, frame, entry):
597 614 # The frame and state of command should have already been validated.
598 assert frametype == FRAME_TYPE_COMMAND_ARGUMENT
615 assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
599 616
600 617 offset = 0
601 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
618 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
602 619 offset += ARGUMENT_FRAME_HEADER.size
603 620
604 621 # The argument name MUST fit inside the frame.
605 argname = bytes(payload[offset:offset + namesize])
622 argname = bytes(frame.payload[offset:offset + namesize])
606 623 offset += namesize
607 624
608 625 if len(argname) != namesize:
@@ -610,11 +627,11 class serverreactor(object):
610 627 return self._makeerrorresult(_('malformed argument frame: '
611 628 'partial argument name'))
612 629
613 argvalue = bytes(payload[offset:])
630 argvalue = bytes(frame.payload[offset:])
614 631
615 632 # Argument value spans multiple frames. Record our active state
616 633 # and wait for the next frame.
617 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
634 if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
618 635 raise error.ProgrammingError('not yet implemented')
619 636
620 637 # Common case: the argument value is completely contained in this
@@ -627,32 +644,31 class serverreactor(object):
627 644
628 645 entry['args'][argname] = argvalue
629 646
630 if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
647 if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
631 648 if entry['expectingdata']:
632 649 # TODO signal request to run a command once we don't
633 650 # buffer data frames.
634 651 return self._makewantframeresult()
635 652 else:
636 return self._makeruncommandresult(requestid)
653 return self._makeruncommandresult(frame.requestid)
637 654 else:
638 655 return self._makewantframeresult()
639 656
640 def _handlecommanddataframe(self, requestid, entry, frametype, frameflags,
641 payload):
642 assert frametype == FRAME_TYPE_COMMAND_DATA
657 def _handlecommanddataframe(self, frame, entry):
658 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
643 659
644 660 # TODO support streaming data instead of buffering it.
645 entry['data'].write(payload)
661 entry['data'].write(frame.payload)
646 662
647 if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
663 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
648 664 return self._makewantframeresult()
649 elif frameflags & FLAG_COMMAND_DATA_EOS:
665 elif frame.flags & FLAG_COMMAND_DATA_EOS:
650 666 entry['data'].seek(0)
651 return self._makeruncommandresult(requestid)
667 return self._makeruncommandresult(frame.requestid)
652 668 else:
653 669 self._state = 'errored'
654 670 return self._makeerrorresult(_('command data frame without '
655 671 'flags'))
656 672
657 def _onframeerrored(self, requestid, frametype, frameflags, payload):
673 def _onframeerrored(self, frame):
658 674 return self._makeerrorresult(_('server already errored'))
@@ -400,12 +400,11 def _processhttpv2reflectrequest(ui, rep
400 400 states.append(b'received: <no frame>')
401 401 break
402 402
403 requestid, frametype, frameflags, payload = frame
404 states.append(b'received: %d %d %d %s' % (frametype, frameflags,
405 requestid, payload))
403 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
404 frame.requestid,
405 frame.payload))
406 406
407 action, meta = reactor.onframerecv(requestid, frametype, frameflags,
408 payload)
407 action, meta = reactor.onframerecv(frame)
409 408 states.append(json.dumps((action, meta), sort_keys=True,
410 409 separators=(', ', ': ')))
411 410
@@ -434,7 +433,7 def _processhttpv2request(ui, repo, req,
434 433 if not frame:
435 434 break
436 435
437 action, meta = reactor.onframerecv(*frame)
436 action, meta = reactor.onframerecv(frame)
438 437
439 438 if action == 'wantframe':
440 439 # Need more data before we can do anything.
@@ -18,11 +18,14 def sendframes(reactor, gen):
18 18 Emits a generator of results from ``onframerecv()`` calls.
19 19 """
20 20 for frame in gen:
21 rid, frametype, frameflags, framelength = framing.parseheader(frame)
21 header = framing.parseheader(frame)
22 22 payload = frame[framing.FRAME_HEADER_SIZE:]
23 assert len(payload) == framelength
23 assert len(payload) == header.length
24 24
25 yield reactor.onframerecv(rid, frametype, frameflags, payload)
25 yield reactor.onframerecv(framing.frame(header.requestid,
26 header.typeid,
27 header.flags,
28 payload))
26 29
27 30 def sendcommandframes(reactor, rid, cmd, args, datafh=None):
28 31 """Generate frames to run a command and send them to a reactor."""
General Comments 0
You need to be logged in to leave comments. Login now