diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py --- a/mercurial/commandserver.py +++ b/mercurial/commandserver.py @@ -506,12 +506,19 @@ class unixforkingservice(object): raise error.Abort(_('no socket path specified with --address')) self._servicehandler = handler or unixservicehandler(ui) self._sock = None + self._mainipc = None + self._workeripc = None self._oldsigchldhandler = None self._workerpids = set() # updated by signal handler; do not iterate self._socketunlinked = None def init(self): self._sock = socket.socket(socket.AF_UNIX) + # IPC channel from many workers to one main process; this is actually + # a uni-directional pipe, but is backed by a DGRAM socket so each + # message can be easily separated. + o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM) + self._mainipc, self._workeripc = o self._servicehandler.bindsocket(self._sock, self.address) if util.safehasattr(procutil, 'unblocksignal'): procutil.unblocksignal(signal.SIGCHLD) @@ -527,6 +534,8 @@ class unixforkingservice(object): def _cleanup(self): signal.signal(signal.SIGCHLD, self._oldsigchldhandler) self._sock.close() + self._mainipc.close() + self._workeripc.close() self._unlinksocket() # don't kill child processes as they have active clients, just wait self._reapworkers(0) @@ -543,6 +552,8 @@ class unixforkingservice(object): selector = selectors.DefaultSelector() selector.register(self._sock, selectors.EVENT_READ, self._acceptnewconnection) + selector.register(self._mainipc, selectors.EVENT_READ, + self._handlemainipc) while True: if not exiting and h.shouldexit(): # clients can no longer connect() to the domain socket, so @@ -592,8 +603,10 @@ class unixforkingservice(object): try: selector.close() sock.close() + self._mainipc.close() self._runworker(conn) conn.close() + self._workeripc.close() os._exit(0) except: # never return, hence no re-raises try: @@ -601,6 +614,17 @@ class unixforkingservice(object): finally: os._exit(255) + def _handlemainipc(self, sock, selector): + """Process messages sent from a worker""" + try: + path = sock.recv(32768) # large enough to receive path + except socket.error as inst: + if inst.args[0] == errno.EINTR: + return + raise + + self.ui.log(b'cmdserver', b'repository: %s\n', path) + def _sigchldhandler(self, signal, frame): self._reapworkers(os.WNOHANG) @@ -628,6 +652,22 @@ class unixforkingservice(object): h = self._servicehandler try: _serverequest(self.ui, self.repo, conn, h.createcmdserver, - prereposetups=None) # TODO: pass in hook functions + prereposetups=[self._reposetup]) finally: gc.collect() # trigger __del__ since worker process uses os._exit + + def _reposetup(self, ui, repo): + if not repo.local(): + return + + class unixcmdserverrepo(repo.__class__): + def close(self): + super(unixcmdserverrepo, self).close() + try: + self._cmdserveripc.send(self.root) + except socket.error: + self.ui.log(b'cmdserver', + b'failed to send repo root to master\n') + + repo.__class__ = unixcmdserverrepo + repo._cmdserveripc = self._workeripc diff --git a/tests/test-chg.t b/tests/test-chg.t --- a/tests/test-chg.t +++ b/tests/test-chg.t @@ -230,7 +230,6 @@ print only the last 10 lines, since we a preserved: $ cat log/server.log.1 log/server.log | tail -10 | filterlog - YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...) YYYY/MM/DD HH:MM:SS (PID)> setprocname: ... YYYY/MM/DD HH:MM:SS (PID)> received fds: ... YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload' @@ -238,5 +237,6 @@ preserved: YYYY/MM/DD HH:MM:SS (PID)> setenv: ... YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ... YYYY/MM/DD HH:MM:SS (PID)> validate: [] + YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...) YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting.