##// END OF EJS Templates
wireproto: introduce a reactor for client-side state...
Gregory Szorc -
r37561:01361be9 default
parent child Browse files
Show More
@@ -0,0 +1,66 b''
1 from __future__ import absolute_import
2
3 import unittest
4
5 from mercurial import (
6 error,
7 wireprotoframing as framing,
8 )
9
10 class SingleSendTests(unittest.TestCase):
11 """A reactor that can only send once rejects subsequent sends."""
12 def testbasic(self):
13 reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
14
15 request, action, meta = reactor.callcommand(b'foo', {})
16 self.assertEqual(request.state, 'pending')
17 self.assertEqual(action, 'noop')
18
19 action, meta = reactor.flushcommands()
20 self.assertEqual(action, 'sendframes')
21
22 for frame in meta['framegen']:
23 self.assertEqual(request.state, 'sending')
24
25 self.assertEqual(request.state, 'sent')
26
27 with self.assertRaisesRegexp(error.ProgrammingError,
28 'cannot issue new commands'):
29 reactor.callcommand(b'foo', {})
30
31 with self.assertRaisesRegexp(error.ProgrammingError,
32 'cannot issue new commands'):
33 reactor.callcommand(b'foo', {})
34
35 class NoBufferTests(unittest.TestCase):
36 """A reactor without send buffering sends requests immediately."""
37 def testbasic(self):
38 reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
39
40 request, action, meta = reactor.callcommand(b'command1', {})
41 self.assertEqual(request.requestid, 1)
42 self.assertEqual(action, 'sendframes')
43
44 self.assertEqual(request.state, 'pending')
45
46 for frame in meta['framegen']:
47 self.assertEqual(request.state, 'sending')
48
49 self.assertEqual(request.state, 'sent')
50
51 action, meta = reactor.flushcommands()
52 self.assertEqual(action, 'noop')
53
54 # And we can send another command.
55 request, action, meta = reactor.callcommand(b'command2', {})
56 self.assertEqual(request.requestid, 3)
57 self.assertEqual(action, 'sendframes')
58
59 for frame in meta['framegen']:
60 self.assertEqual(request.state, 'sending')
61
62 self.assertEqual(request.state, 'sent')
63
64 if __name__ == '__main__':
65 import silenttestrunner
66 silenttestrunner.main(__name__)
@@ -515,11 +515,16 b' class httpv2peer(object):'
515
515
516 # TODO this should be part of a generic peer for the frame-based
516 # TODO this should be part of a generic peer for the frame-based
517 # protocol.
517 # protocol.
518 stream = wireprotoframing.stream(1)
518 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
519 frames = wireprotoframing.createcommandframes(stream, 1,
519 buffersends=True)
520 name, args)
521
520
522 body = b''.join(map(bytes, frames))
521 request, action, meta = reactor.callcommand(name, args)
522 assert action == 'noop'
523
524 action, meta = reactor.flushcommands()
525 assert action == 'sendframes'
526
527 body = b''.join(map(bytes, meta['framegen']))
523 req = self._requestbuilder(pycompat.strurl(url), body, headers)
528 req = self._requestbuilder(pycompat.strurl(url), body, headers)
524 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
529 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
525
530
@@ -11,6 +11,7 b''
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import struct
15 import struct
15
16
16 from .i18n import _
17 from .i18n import _
@@ -876,3 +877,133 b' class serverreactor(object):'
876
877
877 def _onframeerrored(self, frame):
878 def _onframeerrored(self, frame):
878 return self._makeerrorresult(_('server already errored'))
879 return self._makeerrorresult(_('server already errored'))
880
881 class commandrequest(object):
882 """Represents a request to run a command."""
883
884 def __init__(self, requestid, name, args, datafh=None):
885 self.requestid = requestid
886 self.name = name
887 self.args = args
888 self.datafh = datafh
889 self.state = 'pending'
890
891 class clientreactor(object):
892 """Holds state of a client issuing frame-based protocol requests.
893
894 This is like ``serverreactor`` but for client-side state.
895
896 Each instance is bound to the lifetime of a connection. For persistent
897 connection transports using e.g. TCP sockets and speaking the raw
898 framing protocol, there will be a single instance for the lifetime of
899 the TCP socket. For transports where there are multiple discrete
900 interactions (say tunneled within in HTTP request), there will be a
901 separate instance for each distinct interaction.
902 """
903 def __init__(self, hasmultiplesend=False, buffersends=True):
904 """Create a new instance.
905
906 ``hasmultiplesend`` indicates whether multiple sends are supported
907 by the transport. When True, it is possible to send commands immediately
908 instead of buffering until the caller signals an intent to finish a
909 send operation.
910
911 ``buffercommands`` indicates whether sends should be buffered until the
912 last request has been issued.
913 """
914 self._hasmultiplesend = hasmultiplesend
915 self._buffersends = buffersends
916
917 self._canissuecommands = True
918 self._cansend = True
919
920 self._nextrequestid = 1
921 # We only support a single outgoing stream for now.
922 self._outgoingstream = stream(1)
923 self._pendingrequests = collections.deque()
924 self._activerequests = {}
925
926 def callcommand(self, name, args, datafh=None):
927 """Request that a command be executed.
928
929 Receives the command name, a dict of arguments to pass to the command,
930 and an optional file object containing the raw data for the command.
931
932 Returns a 3-tuple of (request, action, action data).
933 """
934 if not self._canissuecommands:
935 raise error.ProgrammingError('cannot issue new commands')
936
937 requestid = self._nextrequestid
938 self._nextrequestid += 2
939
940 request = commandrequest(requestid, name, args, datafh=datafh)
941
942 if self._buffersends:
943 self._pendingrequests.append(request)
944 return request, 'noop', {}
945 else:
946 if not self._cansend:
947 raise error.ProgrammingError('sends cannot be performed on '
948 'this instance')
949
950 if not self._hasmultiplesend:
951 self._cansend = False
952 self._canissuecommands = False
953
954 return request, 'sendframes', {
955 'framegen': self._makecommandframes(request),
956 }
957
958 def flushcommands(self):
959 """Request that all queued commands be sent.
960
961 If any commands are buffered, this will instruct the caller to send
962 them over the wire. If no commands are buffered it instructs the client
963 to no-op.
964
965 If instances aren't configured for multiple sends, no new command
966 requests are allowed after this is called.
967 """
968 if not self._pendingrequests:
969 return 'noop', {}
970
971 if not self._cansend:
972 raise error.ProgrammingError('sends cannot be performed on this '
973 'instance')
974
975 # If the instance only allows sending once, mark that we have fired
976 # our one shot.
977 if not self._hasmultiplesend:
978 self._canissuecommands = False
979 self._cansend = False
980
981 def makeframes():
982 while self._pendingrequests:
983 request = self._pendingrequests.popleft()
984 for frame in self._makecommandframes(request):
985 yield frame
986
987 return 'sendframes', {
988 'framegen': makeframes(),
989 }
990
991 def _makecommandframes(self, request):
992 """Emit frames to issue a command request.
993
994 As a side-effect, update request accounting to reflect its changed
995 state.
996 """
997 self._activerequests[request.requestid] = request
998 request.state = 'sending'
999
1000 res = createcommandframes(self._outgoingstream,
1001 request.requestid,
1002 request.name,
1003 request.args,
1004 request.datafh)
1005
1006 for frame in res:
1007 yield frame
1008
1009 request.state = 'sent'
General Comments 0
You need to be logged in to leave comments. Login now