# HG changeset patch # User Gregory Szorc # Date 2018-04-09 22:32:01 # Node ID 01361be9e2dc8dc0ebd6315fd5daf8dd80ae5921 # Parent 1ec5ce21cb46bdf625ea7209dcf8f574a8814552 wireproto: introduce a reactor for client-side state We have a nice state machine of sorts for reacting to server-side events. Now it is time to implement the client equivalent. We introduce a "clientreactor." It allows callers to request that commands be issued. It has multiple modes of operation to reflect what the underlying transport supports. e.g. for SSH, we can perform wire sends immediately but for HTTP we need to buffer sends until all command requests are received. In addition, SSH allows sending multiple requests as long as the connection is open. But HTTP/1.1 only allows sending request data once. For SSH, we'll have one reactor per connection. For HTTP, we'll have one reactor per HTTP request. But because code that calls wire protocol commands should not be aware of how the underlying transport works, this will all be abstracted away by the peer interface. Our crude HTTP peer has been updated to use the reactor instead of formulating frames directly. No behavior should have changed here and tests seem to confirm that. Basic unit tests for the reactor behavior have been added. Differential Revision: https://phab.mercurial-scm.org/D3223 diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py --- a/mercurial/httppeer.py +++ b/mercurial/httppeer.py @@ -515,11 +515,16 @@ class httpv2peer(object): # TODO this should be part of a generic peer for the frame-based # protocol. - stream = wireprotoframing.stream(1) - frames = wireprotoframing.createcommandframes(stream, 1, - name, args) + reactor = wireprotoframing.clientreactor(hasmultiplesend=False, + buffersends=True) - body = b''.join(map(bytes, frames)) + request, action, meta = reactor.callcommand(name, args) + assert action == 'noop' + + action, meta = reactor.flushcommands() + assert action == 'sendframes' + + body = b''.join(map(bytes, meta['framegen'])) req = self._requestbuilder(pycompat.strurl(url), body, headers) req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -11,6 +11,7 @@ from __future__ import absolute_import +import collections import struct from .i18n import _ @@ -876,3 +877,133 @@ class serverreactor(object): def _onframeerrored(self, frame): return self._makeerrorresult(_('server already errored')) + +class commandrequest(object): + """Represents a request to run a command.""" + + def __init__(self, requestid, name, args, datafh=None): + self.requestid = requestid + self.name = name + self.args = args + self.datafh = datafh + self.state = 'pending' + +class clientreactor(object): + """Holds state of a client issuing frame-based protocol requests. + + This is like ``serverreactor`` but for client-side state. + + Each instance is bound to the lifetime of a connection. For persistent + connection transports using e.g. TCP sockets and speaking the raw + framing protocol, there will be a single instance for the lifetime of + the TCP socket. For transports where there are multiple discrete + interactions (say tunneled within in HTTP request), there will be a + separate instance for each distinct interaction. + """ + def __init__(self, hasmultiplesend=False, buffersends=True): + """Create a new instance. + + ``hasmultiplesend`` indicates whether multiple sends are supported + by the transport. When True, it is possible to send commands immediately + instead of buffering until the caller signals an intent to finish a + send operation. + + ``buffercommands`` indicates whether sends should be buffered until the + last request has been issued. + """ + self._hasmultiplesend = hasmultiplesend + self._buffersends = buffersends + + self._canissuecommands = True + self._cansend = True + + self._nextrequestid = 1 + # We only support a single outgoing stream for now. + self._outgoingstream = stream(1) + self._pendingrequests = collections.deque() + self._activerequests = {} + + def callcommand(self, name, args, datafh=None): + """Request that a command be executed. + + Receives the command name, a dict of arguments to pass to the command, + and an optional file object containing the raw data for the command. + + Returns a 3-tuple of (request, action, action data). + """ + if not self._canissuecommands: + raise error.ProgrammingError('cannot issue new commands') + + requestid = self._nextrequestid + self._nextrequestid += 2 + + request = commandrequest(requestid, name, args, datafh=datafh) + + if self._buffersends: + self._pendingrequests.append(request) + return request, 'noop', {} + else: + if not self._cansend: + raise error.ProgrammingError('sends cannot be performed on ' + 'this instance') + + if not self._hasmultiplesend: + self._cansend = False + self._canissuecommands = False + + return request, 'sendframes', { + 'framegen': self._makecommandframes(request), + } + + def flushcommands(self): + """Request that all queued commands be sent. + + If any commands are buffered, this will instruct the caller to send + them over the wire. If no commands are buffered it instructs the client + to no-op. + + If instances aren't configured for multiple sends, no new command + requests are allowed after this is called. + """ + if not self._pendingrequests: + return 'noop', {} + + if not self._cansend: + raise error.ProgrammingError('sends cannot be performed on this ' + 'instance') + + # If the instance only allows sending once, mark that we have fired + # our one shot. + if not self._hasmultiplesend: + self._canissuecommands = False + self._cansend = False + + def makeframes(): + while self._pendingrequests: + request = self._pendingrequests.popleft() + for frame in self._makecommandframes(request): + yield frame + + return 'sendframes', { + 'framegen': makeframes(), + } + + def _makecommandframes(self, request): + """Emit frames to issue a command request. + + As a side-effect, update request accounting to reflect its changed + state. + """ + self._activerequests[request.requestid] = request + request.state = 'sending' + + res = createcommandframes(self._outgoingstream, + request.requestid, + request.name, + request.args, + request.datafh) + + for frame in res: + yield frame + + request.state = 'sent' diff --git a/tests/test-wireproto-clientreactor.py b/tests/test-wireproto-clientreactor.py new file mode 100644 --- /dev/null +++ b/tests/test-wireproto-clientreactor.py @@ -0,0 +1,66 @@ +from __future__ import absolute_import + +import unittest + +from mercurial import ( + error, + wireprotoframing as framing, +) + +class SingleSendTests(unittest.TestCase): + """A reactor that can only send once rejects subsequent sends.""" + def testbasic(self): + reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True) + + request, action, meta = reactor.callcommand(b'foo', {}) + self.assertEqual(request.state, 'pending') + self.assertEqual(action, 'noop') + + action, meta = reactor.flushcommands() + self.assertEqual(action, 'sendframes') + + for frame in meta['framegen']: + self.assertEqual(request.state, 'sending') + + self.assertEqual(request.state, 'sent') + + with self.assertRaisesRegexp(error.ProgrammingError, + 'cannot issue new commands'): + reactor.callcommand(b'foo', {}) + + with self.assertRaisesRegexp(error.ProgrammingError, + 'cannot issue new commands'): + reactor.callcommand(b'foo', {}) + +class NoBufferTests(unittest.TestCase): + """A reactor without send buffering sends requests immediately.""" + def testbasic(self): + reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False) + + request, action, meta = reactor.callcommand(b'command1', {}) + self.assertEqual(request.requestid, 1) + self.assertEqual(action, 'sendframes') + + self.assertEqual(request.state, 'pending') + + for frame in meta['framegen']: + self.assertEqual(request.state, 'sending') + + self.assertEqual(request.state, 'sent') + + action, meta = reactor.flushcommands() + self.assertEqual(action, 'noop') + + # And we can send another command. + request, action, meta = reactor.callcommand(b'command2', {}) + self.assertEqual(request.requestid, 3) + self.assertEqual(action, 'sendframes') + + for frame in meta['framegen']: + self.assertEqual(request.state, 'sending') + + self.assertEqual(request.state, 'sent') + +if __name__ == '__main__': + import silenttestrunner + silenttestrunner.main(__name__)