Show More
@@ -0,0 +1,66 | |||
|
1 | from __future__ import absolute_import | |
|
2 | ||
|
3 | import unittest | |
|
4 | ||
|
5 | from mercurial import ( | |
|
6 | error, | |
|
7 | wireprotoframing as framing, | |
|
8 | ) | |
|
9 | ||
|
10 | class SingleSendTests(unittest.TestCase): | |
|
11 | """A reactor that can only send once rejects subsequent sends.""" | |
|
12 | def testbasic(self): | |
|
13 | reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True) | |
|
14 | ||
|
15 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
16 | self.assertEqual(request.state, 'pending') | |
|
17 | self.assertEqual(action, 'noop') | |
|
18 | ||
|
19 | action, meta = reactor.flushcommands() | |
|
20 | self.assertEqual(action, 'sendframes') | |
|
21 | ||
|
22 | for frame in meta['framegen']: | |
|
23 | self.assertEqual(request.state, 'sending') | |
|
24 | ||
|
25 | self.assertEqual(request.state, 'sent') | |
|
26 | ||
|
27 | with self.assertRaisesRegexp(error.ProgrammingError, | |
|
28 | 'cannot issue new commands'): | |
|
29 | reactor.callcommand(b'foo', {}) | |
|
30 | ||
|
31 | with self.assertRaisesRegexp(error.ProgrammingError, | |
|
32 | 'cannot issue new commands'): | |
|
33 | reactor.callcommand(b'foo', {}) | |
|
34 | ||
|
35 | class NoBufferTests(unittest.TestCase): | |
|
36 | """A reactor without send buffering sends requests immediately.""" | |
|
37 | def testbasic(self): | |
|
38 | reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False) | |
|
39 | ||
|
40 | request, action, meta = reactor.callcommand(b'command1', {}) | |
|
41 | self.assertEqual(request.requestid, 1) | |
|
42 | self.assertEqual(action, 'sendframes') | |
|
43 | ||
|
44 | self.assertEqual(request.state, 'pending') | |
|
45 | ||
|
46 | for frame in meta['framegen']: | |
|
47 | self.assertEqual(request.state, 'sending') | |
|
48 | ||
|
49 | self.assertEqual(request.state, 'sent') | |
|
50 | ||
|
51 | action, meta = reactor.flushcommands() | |
|
52 | self.assertEqual(action, 'noop') | |
|
53 | ||
|
54 | # And we can send another command. | |
|
55 | request, action, meta = reactor.callcommand(b'command2', {}) | |
|
56 | self.assertEqual(request.requestid, 3) | |
|
57 | self.assertEqual(action, 'sendframes') | |
|
58 | ||
|
59 | for frame in meta['framegen']: | |
|
60 | self.assertEqual(request.state, 'sending') | |
|
61 | ||
|
62 | self.assertEqual(request.state, 'sent') | |
|
63 | ||
|
64 | if __name__ == '__main__': | |
|
65 | import silenttestrunner | |
|
66 | silenttestrunner.main(__name__) |
@@ -515,11 +515,16 class httpv2peer(object): | |||
|
515 | 515 | |
|
516 | 516 | # TODO this should be part of a generic peer for the frame-based |
|
517 | 517 | # protocol. |
|
518 |
|
|
|
519 | frames = wireprotoframing.createcommandframes(stream, 1, | |
|
520 | name, args) | |
|
518 | reactor = wireprotoframing.clientreactor(hasmultiplesend=False, | |
|
519 | buffersends=True) | |
|
521 | 520 | |
|
522 | body = b''.join(map(bytes, frames)) | |
|
521 | request, action, meta = reactor.callcommand(name, args) | |
|
522 | assert action == 'noop' | |
|
523 | ||
|
524 | action, meta = reactor.flushcommands() | |
|
525 | assert action == 'sendframes' | |
|
526 | ||
|
527 | body = b''.join(map(bytes, meta['framegen'])) | |
|
523 | 528 | req = self._requestbuilder(pycompat.strurl(url), body, headers) |
|
524 | 529 | req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) |
|
525 | 530 |
@@ -11,6 +11,7 | |||
|
11 | 11 | |
|
12 | 12 | from __future__ import absolute_import |
|
13 | 13 | |
|
14 | import collections | |
|
14 | 15 | import struct |
|
15 | 16 | |
|
16 | 17 | from .i18n import _ |
@@ -876,3 +877,133 class serverreactor(object): | |||
|
876 | 877 | |
|
877 | 878 | def _onframeerrored(self, frame): |
|
878 | 879 | return self._makeerrorresult(_('server already errored')) |
|
880 | ||
|
881 | class commandrequest(object): | |
|
882 | """Represents a request to run a command.""" | |
|
883 | ||
|
884 | def __init__(self, requestid, name, args, datafh=None): | |
|
885 | self.requestid = requestid | |
|
886 | self.name = name | |
|
887 | self.args = args | |
|
888 | self.datafh = datafh | |
|
889 | self.state = 'pending' | |
|
890 | ||
|
891 | class clientreactor(object): | |
|
892 | """Holds state of a client issuing frame-based protocol requests. | |
|
893 | ||
|
894 | This is like ``serverreactor`` but for client-side state. | |
|
895 | ||
|
896 | Each instance is bound to the lifetime of a connection. For persistent | |
|
897 | connection transports using e.g. TCP sockets and speaking the raw | |
|
898 | framing protocol, there will be a single instance for the lifetime of | |
|
899 | the TCP socket. For transports where there are multiple discrete | |
|
900 | interactions (say tunneled within in HTTP request), there will be a | |
|
901 | separate instance for each distinct interaction. | |
|
902 | """ | |
|
903 | def __init__(self, hasmultiplesend=False, buffersends=True): | |
|
904 | """Create a new instance. | |
|
905 | ||
|
906 | ``hasmultiplesend`` indicates whether multiple sends are supported | |
|
907 | by the transport. When True, it is possible to send commands immediately | |
|
908 | instead of buffering until the caller signals an intent to finish a | |
|
909 | send operation. | |
|
910 | ||
|
911 | ``buffercommands`` indicates whether sends should be buffered until the | |
|
912 | last request has been issued. | |
|
913 | """ | |
|
914 | self._hasmultiplesend = hasmultiplesend | |
|
915 | self._buffersends = buffersends | |
|
916 | ||
|
917 | self._canissuecommands = True | |
|
918 | self._cansend = True | |
|
919 | ||
|
920 | self._nextrequestid = 1 | |
|
921 | # We only support a single outgoing stream for now. | |
|
922 | self._outgoingstream = stream(1) | |
|
923 | self._pendingrequests = collections.deque() | |
|
924 | self._activerequests = {} | |
|
925 | ||
|
926 | def callcommand(self, name, args, datafh=None): | |
|
927 | """Request that a command be executed. | |
|
928 | ||
|
929 | Receives the command name, a dict of arguments to pass to the command, | |
|
930 | and an optional file object containing the raw data for the command. | |
|
931 | ||
|
932 | Returns a 3-tuple of (request, action, action data). | |
|
933 | """ | |
|
934 | if not self._canissuecommands: | |
|
935 | raise error.ProgrammingError('cannot issue new commands') | |
|
936 | ||
|
937 | requestid = self._nextrequestid | |
|
938 | self._nextrequestid += 2 | |
|
939 | ||
|
940 | request = commandrequest(requestid, name, args, datafh=datafh) | |
|
941 | ||
|
942 | if self._buffersends: | |
|
943 | self._pendingrequests.append(request) | |
|
944 | return request, 'noop', {} | |
|
945 | else: | |
|
946 | if not self._cansend: | |
|
947 | raise error.ProgrammingError('sends cannot be performed on ' | |
|
948 | 'this instance') | |
|
949 | ||
|
950 | if not self._hasmultiplesend: | |
|
951 | self._cansend = False | |
|
952 | self._canissuecommands = False | |
|
953 | ||
|
954 | return request, 'sendframes', { | |
|
955 | 'framegen': self._makecommandframes(request), | |
|
956 | } | |
|
957 | ||
|
958 | def flushcommands(self): | |
|
959 | """Request that all queued commands be sent. | |
|
960 | ||
|
961 | If any commands are buffered, this will instruct the caller to send | |
|
962 | them over the wire. If no commands are buffered it instructs the client | |
|
963 | to no-op. | |
|
964 | ||
|
965 | If instances aren't configured for multiple sends, no new command | |
|
966 | requests are allowed after this is called. | |
|
967 | """ | |
|
968 | if not self._pendingrequests: | |
|
969 | return 'noop', {} | |
|
970 | ||
|
971 | if not self._cansend: | |
|
972 | raise error.ProgrammingError('sends cannot be performed on this ' | |
|
973 | 'instance') | |
|
974 | ||
|
975 | # If the instance only allows sending once, mark that we have fired | |
|
976 | # our one shot. | |
|
977 | if not self._hasmultiplesend: | |
|
978 | self._canissuecommands = False | |
|
979 | self._cansend = False | |
|
980 | ||
|
981 | def makeframes(): | |
|
982 | while self._pendingrequests: | |
|
983 | request = self._pendingrequests.popleft() | |
|
984 | for frame in self._makecommandframes(request): | |
|
985 | yield frame | |
|
986 | ||
|
987 | return 'sendframes', { | |
|
988 | 'framegen': makeframes(), | |
|
989 | } | |
|
990 | ||
|
991 | def _makecommandframes(self, request): | |
|
992 | """Emit frames to issue a command request. | |
|
993 | ||
|
994 | As a side-effect, update request accounting to reflect its changed | |
|
995 | state. | |
|
996 | """ | |
|
997 | self._activerequests[request.requestid] = request | |
|
998 | request.state = 'sending' | |
|
999 | ||
|
1000 | res = createcommandframes(self._outgoingstream, | |
|
1001 | request.requestid, | |
|
1002 | request.name, | |
|
1003 | request.args, | |
|
1004 | request.datafh) | |
|
1005 | ||
|
1006 | for frame in res: | |
|
1007 | yield frame | |
|
1008 | ||
|
1009 | request.state = 'sent' |
General Comments 0
You need to be logged in to leave comments.
Login now