diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py --- a/mercurial/commandserver.py +++ b/mercurial/commandserver.py @@ -11,6 +11,9 @@ import errno import gc import os import random +import select +import signal +import socket import struct import sys import traceback @@ -385,6 +388,41 @@ def _serverequest(ui, repo, conn, create # trigger __del__ since ForkingMixIn uses os._exit gc.collect() +class unixservicehandler(object): + """Set of pluggable operations for unix-mode services + + Almost all methods except for createcmdserver() are called in the main + process. You can't pass mutable resource back from createcmdserver(). + """ + + pollinterval = None + + def __init__(self, ui): + self.ui = ui + + def bindsocket(self, sock, address): + util.bindunixsocket(sock, address) + + def unlinksocket(self, address): + os.unlink(address) + + def printbanner(self, address): + self.ui.status(_('listening at %s\n') % address) + self.ui.flush() # avoid buffering of status message + + def shouldexit(self): + """True if server should shut down; checked per pollinterval""" + return False + + def newconnection(self): + """Called when main process notices new connection""" + pass + + def createcmdserver(self, repo, conn, fin, fout): + """Create new command server instance; called in the process that + serves for the current connection""" + return server(self.ui, repo, fin, fout) + class _requesthandler(socketserver.BaseRequestHandler): def handle(self): _serverequest(self.server.ui, self.server.repo, self.request, @@ -424,9 +462,96 @@ class unixservice(object): finally: self._cleanup() +class unixforkingservice(unixservice): + def __init__(self, ui, repo, opts, handler=None): + super(unixforkingservice, self).__init__(ui, repo, opts) + self._servicehandler = handler or unixservicehandler(ui) + self._sock = None + self._oldsigchldhandler = None + self._workerpids = set() # updated by signal handler; do not iterate + + def init(self): + self._sock = socket.socket(socket.AF_UNIX) + self._servicehandler.bindsocket(self._sock, self.address) + self._sock.listen(5) + o = signal.signal(signal.SIGCHLD, self._sigchldhandler) + self._oldsigchldhandler = o + self._servicehandler.printbanner(self.address) + + def _cleanup(self): + signal.signal(signal.SIGCHLD, self._oldsigchldhandler) + self._sock.close() + self._servicehandler.unlinksocket(self.address) + # don't kill child processes as they have active clients, just wait + self._reapworkers(0) + + def run(self): + try: + self._mainloop() + finally: + self._cleanup() + + def _mainloop(self): + h = self._servicehandler + while not h.shouldexit(): + try: + ready = select.select([self._sock], [], [], h.pollinterval)[0] + if not ready: + continue + conn, _addr = self._sock.accept() + except (select.error, socket.error) as inst: + if inst.args[0] == errno.EINTR: + continue + raise + + pid = os.fork() + if pid: + try: + self.ui.debug('forked worker process (pid=%d)\n' % pid) + self._workerpids.add(pid) + h.newconnection() + finally: + conn.close() # release handle in parent process + else: + try: + self._serveworker(conn) + conn.close() + os._exit(0) + except: # never return, hence no re-raises + try: + self.ui.traceback(force=True) + finally: + os._exit(255) + + def _sigchldhandler(self, signal, frame): + self._reapworkers(os.WNOHANG) + + def _reapworkers(self, options): + while self._workerpids: + try: + pid, _status = os.waitpid(-1, options) + except OSError as inst: + if inst.errno == errno.EINTR: + continue + if inst.errno != errno.ECHILD: + raise + # no child processes at all (reaped by other waitpid()?) + self._workerpids.clear() + return + if pid == 0: + # no waitable child processes + return + self.ui.debug('worker process exited (pid=%d)\n' % pid) + self._workerpids.discard(pid) + + def _serveworker(self, conn): + signal.signal(signal.SIGCHLD, self._oldsigchldhandler) + h = self._servicehandler + _serverequest(self.ui, self.repo, conn, h.createcmdserver) + _servicemap = { 'pipe': pipeservice, - 'unix': unixservice, + 'unix': unixforkingservice, } def createservice(ui, repo, opts):