Show More
@@ -0,0 +1,66 b'' | |||||
|
1 | from __future__ import absolute_import | |||
|
2 | ||||
|
3 | import unittest | |||
|
4 | ||||
|
5 | from mercurial import ( | |||
|
6 | error, | |||
|
7 | wireprotoframing as framing, | |||
|
8 | ) | |||
|
9 | ||||
|
10 | class SingleSendTests(unittest.TestCase): | |||
|
11 | """A reactor that can only send once rejects subsequent sends.""" | |||
|
12 | def testbasic(self): | |||
|
13 | reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True) | |||
|
14 | ||||
|
15 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
16 | self.assertEqual(request.state, 'pending') | |||
|
17 | self.assertEqual(action, 'noop') | |||
|
18 | ||||
|
19 | action, meta = reactor.flushcommands() | |||
|
20 | self.assertEqual(action, 'sendframes') | |||
|
21 | ||||
|
22 | for frame in meta['framegen']: | |||
|
23 | self.assertEqual(request.state, 'sending') | |||
|
24 | ||||
|
25 | self.assertEqual(request.state, 'sent') | |||
|
26 | ||||
|
27 | with self.assertRaisesRegexp(error.ProgrammingError, | |||
|
28 | 'cannot issue new commands'): | |||
|
29 | reactor.callcommand(b'foo', {}) | |||
|
30 | ||||
|
31 | with self.assertRaisesRegexp(error.ProgrammingError, | |||
|
32 | 'cannot issue new commands'): | |||
|
33 | reactor.callcommand(b'foo', {}) | |||
|
34 | ||||
|
35 | class NoBufferTests(unittest.TestCase): | |||
|
36 | """A reactor without send buffering sends requests immediately.""" | |||
|
37 | def testbasic(self): | |||
|
38 | reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False) | |||
|
39 | ||||
|
40 | request, action, meta = reactor.callcommand(b'command1', {}) | |||
|
41 | self.assertEqual(request.requestid, 1) | |||
|
42 | self.assertEqual(action, 'sendframes') | |||
|
43 | ||||
|
44 | self.assertEqual(request.state, 'pending') | |||
|
45 | ||||
|
46 | for frame in meta['framegen']: | |||
|
47 | self.assertEqual(request.state, 'sending') | |||
|
48 | ||||
|
49 | self.assertEqual(request.state, 'sent') | |||
|
50 | ||||
|
51 | action, meta = reactor.flushcommands() | |||
|
52 | self.assertEqual(action, 'noop') | |||
|
53 | ||||
|
54 | # And we can send another command. | |||
|
55 | request, action, meta = reactor.callcommand(b'command2', {}) | |||
|
56 | self.assertEqual(request.requestid, 3) | |||
|
57 | self.assertEqual(action, 'sendframes') | |||
|
58 | ||||
|
59 | for frame in meta['framegen']: | |||
|
60 | self.assertEqual(request.state, 'sending') | |||
|
61 | ||||
|
62 | self.assertEqual(request.state, 'sent') | |||
|
63 | ||||
|
64 | if __name__ == '__main__': | |||
|
65 | import silenttestrunner | |||
|
66 | silenttestrunner.main(__name__) |
@@ -515,11 +515,16 b' class httpv2peer(object):' | |||||
515 |
|
515 | |||
516 | # TODO this should be part of a generic peer for the frame-based |
|
516 | # TODO this should be part of a generic peer for the frame-based | |
517 | # protocol. |
|
517 | # protocol. | |
518 |
|
|
518 | reactor = wireprotoframing.clientreactor(hasmultiplesend=False, | |
519 | frames = wireprotoframing.createcommandframes(stream, 1, |
|
519 | buffersends=True) | |
520 | name, args) |
|
|||
521 |
|
520 | |||
522 | body = b''.join(map(bytes, frames)) |
|
521 | request, action, meta = reactor.callcommand(name, args) | |
|
522 | assert action == 'noop' | |||
|
523 | ||||
|
524 | action, meta = reactor.flushcommands() | |||
|
525 | assert action == 'sendframes' | |||
|
526 | ||||
|
527 | body = b''.join(map(bytes, meta['framegen'])) | |||
523 | req = self._requestbuilder(pycompat.strurl(url), body, headers) |
|
528 | req = self._requestbuilder(pycompat.strurl(url), body, headers) | |
524 | req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) |
|
529 | req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) | |
525 |
|
530 |
@@ -11,6 +11,7 b'' | |||||
11 |
|
11 | |||
12 | from __future__ import absolute_import |
|
12 | from __future__ import absolute_import | |
13 |
|
13 | |||
|
14 | import collections | |||
14 | import struct |
|
15 | import struct | |
15 |
|
16 | |||
16 | from .i18n import _ |
|
17 | from .i18n import _ | |
@@ -876,3 +877,133 b' class serverreactor(object):' | |||||
876 |
|
877 | |||
877 | def _onframeerrored(self, frame): |
|
878 | def _onframeerrored(self, frame): | |
878 | return self._makeerrorresult(_('server already errored')) |
|
879 | return self._makeerrorresult(_('server already errored')) | |
|
880 | ||||
|
881 | class commandrequest(object): | |||
|
882 | """Represents a request to run a command.""" | |||
|
883 | ||||
|
884 | def __init__(self, requestid, name, args, datafh=None): | |||
|
885 | self.requestid = requestid | |||
|
886 | self.name = name | |||
|
887 | self.args = args | |||
|
888 | self.datafh = datafh | |||
|
889 | self.state = 'pending' | |||
|
890 | ||||
|
891 | class clientreactor(object): | |||
|
892 | """Holds state of a client issuing frame-based protocol requests. | |||
|
893 | ||||
|
894 | This is like ``serverreactor`` but for client-side state. | |||
|
895 | ||||
|
896 | Each instance is bound to the lifetime of a connection. For persistent | |||
|
897 | connection transports using e.g. TCP sockets and speaking the raw | |||
|
898 | framing protocol, there will be a single instance for the lifetime of | |||
|
899 | the TCP socket. For transports where there are multiple discrete | |||
|
900 | interactions (say tunneled within in HTTP request), there will be a | |||
|
901 | separate instance for each distinct interaction. | |||
|
902 | """ | |||
|
903 | def __init__(self, hasmultiplesend=False, buffersends=True): | |||
|
904 | """Create a new instance. | |||
|
905 | ||||
|
906 | ``hasmultiplesend`` indicates whether multiple sends are supported | |||
|
907 | by the transport. When True, it is possible to send commands immediately | |||
|
908 | instead of buffering until the caller signals an intent to finish a | |||
|
909 | send operation. | |||
|
910 | ||||
|
911 | ``buffercommands`` indicates whether sends should be buffered until the | |||
|
912 | last request has been issued. | |||
|
913 | """ | |||
|
914 | self._hasmultiplesend = hasmultiplesend | |||
|
915 | self._buffersends = buffersends | |||
|
916 | ||||
|
917 | self._canissuecommands = True | |||
|
918 | self._cansend = True | |||
|
919 | ||||
|
920 | self._nextrequestid = 1 | |||
|
921 | # We only support a single outgoing stream for now. | |||
|
922 | self._outgoingstream = stream(1) | |||
|
923 | self._pendingrequests = collections.deque() | |||
|
924 | self._activerequests = {} | |||
|
925 | ||||
|
926 | def callcommand(self, name, args, datafh=None): | |||
|
927 | """Request that a command be executed. | |||
|
928 | ||||
|
929 | Receives the command name, a dict of arguments to pass to the command, | |||
|
930 | and an optional file object containing the raw data for the command. | |||
|
931 | ||||
|
932 | Returns a 3-tuple of (request, action, action data). | |||
|
933 | """ | |||
|
934 | if not self._canissuecommands: | |||
|
935 | raise error.ProgrammingError('cannot issue new commands') | |||
|
936 | ||||
|
937 | requestid = self._nextrequestid | |||
|
938 | self._nextrequestid += 2 | |||
|
939 | ||||
|
940 | request = commandrequest(requestid, name, args, datafh=datafh) | |||
|
941 | ||||
|
942 | if self._buffersends: | |||
|
943 | self._pendingrequests.append(request) | |||
|
944 | return request, 'noop', {} | |||
|
945 | else: | |||
|
946 | if not self._cansend: | |||
|
947 | raise error.ProgrammingError('sends cannot be performed on ' | |||
|
948 | 'this instance') | |||
|
949 | ||||
|
950 | if not self._hasmultiplesend: | |||
|
951 | self._cansend = False | |||
|
952 | self._canissuecommands = False | |||
|
953 | ||||
|
954 | return request, 'sendframes', { | |||
|
955 | 'framegen': self._makecommandframes(request), | |||
|
956 | } | |||
|
957 | ||||
|
958 | def flushcommands(self): | |||
|
959 | """Request that all queued commands be sent. | |||
|
960 | ||||
|
961 | If any commands are buffered, this will instruct the caller to send | |||
|
962 | them over the wire. If no commands are buffered it instructs the client | |||
|
963 | to no-op. | |||
|
964 | ||||
|
965 | If instances aren't configured for multiple sends, no new command | |||
|
966 | requests are allowed after this is called. | |||
|
967 | """ | |||
|
968 | if not self._pendingrequests: | |||
|
969 | return 'noop', {} | |||
|
970 | ||||
|
971 | if not self._cansend: | |||
|
972 | raise error.ProgrammingError('sends cannot be performed on this ' | |||
|
973 | 'instance') | |||
|
974 | ||||
|
975 | # If the instance only allows sending once, mark that we have fired | |||
|
976 | # our one shot. | |||
|
977 | if not self._hasmultiplesend: | |||
|
978 | self._canissuecommands = False | |||
|
979 | self._cansend = False | |||
|
980 | ||||
|
981 | def makeframes(): | |||
|
982 | while self._pendingrequests: | |||
|
983 | request = self._pendingrequests.popleft() | |||
|
984 | for frame in self._makecommandframes(request): | |||
|
985 | yield frame | |||
|
986 | ||||
|
987 | return 'sendframes', { | |||
|
988 | 'framegen': makeframes(), | |||
|
989 | } | |||
|
990 | ||||
|
991 | def _makecommandframes(self, request): | |||
|
992 | """Emit frames to issue a command request. | |||
|
993 | ||||
|
994 | As a side-effect, update request accounting to reflect its changed | |||
|
995 | state. | |||
|
996 | """ | |||
|
997 | self._activerequests[request.requestid] = request | |||
|
998 | request.state = 'sending' | |||
|
999 | ||||
|
1000 | res = createcommandframes(self._outgoingstream, | |||
|
1001 | request.requestid, | |||
|
1002 | request.name, | |||
|
1003 | request.args, | |||
|
1004 | request.datafh) | |||
|
1005 | ||||
|
1006 | for frame in res: | |||
|
1007 | yield frame | |||
|
1008 | ||||
|
1009 | request.state = 'sent' |
General Comments 0
You need to be logged in to leave comments.
Login now