class doublepipe(object):
    """Operate a side-channel pipe in addition to a main one.

    The side-channel pipe contains server output to be forwarded to the
    user. The double pipe behaves as the "main" pipe, but ensures the
    content of the "side" pipe is properly processed while we wait on a
    blocking call on the "main" pipe.

    If large amounts of data are read from "main", the forwarding will cease
    after the first bytes start to appear. This simplifies the implementation
    without affecting the actual output of sshpeer too much, as we rarely
    issue a large read for data not yet emitted by the server.

    The main pipe is expected to be a 'bufferedinputpipe' from the util
    module, which handles all the OS-specific bits. This class lives in this
    module because it focuses on behavior specific to the ssh protocol.
    """

    def __init__(self, ui, main, side):
        """Wrap `main`, forwarding the content of `side` through `ui`.

        `main` must expose `hasbuffer`, `closed`, `fileno()`, `close()` and
        the read methods proxied by this class; `side` must expose
        `fileno()` and be readable by `_forwardoutput`.
        """
        self._ui = ui
        self._main = main
        self._side = side

    def _wait(self):
        """Wait until some data is available on main or side.

        Return a pair of booleans (ismainready, issideready).

        (This will only wait for data if the setup is supported by
        `util.poll`.)
        """
        if self._main.hasbuffer:
            # main has data, assume side is worth poking at.
            return (True, True)
        # Query each descriptor once and reuse it for both the poll call and
        # the readiness checks below.
        mainfd = self._main.fileno()
        sidefd = self._side.fileno()
        fds = [mainfd, sidefd]
        try:
            act = util.poll(fds)
        except NotImplementedError:
            # polling is not supported for this setup yet: assume every
            # pipe has data.
            act = fds
        return (mainfd in act, sidefd in act)

    def read(self, size):
        """Read `size` bytes from "main" while forwarding "side" output."""
        return self._call('read', size)

    def readline(self):
        """Read one line from "main" while forwarding "side" output."""
        return self._call('readline')

    def _call(self, methname, size=None):
        """Call `methname` on "main", forwarding output of "side" while
        blocking.

        When `size` is None the method is called without argument,
        otherwise `size` is passed through. An empty string is returned
        without touching "main" when `size` is 0 or "main" is closed; the
        "side" output is still forwarded in that case.
        """
        if size == 0 or self._main.closed:
            _forwardoutput(self._ui, self._side)
            return ''
        while True:
            mainready, sideready = self._wait()
            # Forward side output first so it is shown before we hand the
            # main data back to the caller.
            if sideready:
                _forwardoutput(self._ui, self._side)
            if mainready:
                meth = getattr(self._main, methname)
                if size is None:
                    return meth()
                return meth(size)

    def close(self):
        """Close the "main" pipe and return the result of its close().

        The "side" pipe is left untouched.
        """
        return self._main.close()