##// END OF EJS Templates
commandserver: add new forking server implemented without using SocketServer...
Yuya Nishihara -
r29544:024e8f82 default
parent child Browse files
Show More
@@ -11,6 +11,9 b' import errno'
11 11 import gc
12 12 import os
13 13 import random
14 import select
15 import signal
16 import socket
14 17 import struct
15 18 import sys
16 19 import traceback
@@ -385,6 +388,41 b' def _serverequest(ui, repo, conn, create'
385 388 # trigger __del__ since ForkingMixIn uses os._exit
386 389 gc.collect()
387 390
391 class unixservicehandler(object):
392 """Set of pluggable operations for unix-mode services
393
394 Almost all methods except for createcmdserver() are called in the main
395 process. You can't pass mutable resource back from createcmdserver().
396 """
397
398 pollinterval = None
399
400 def __init__(self, ui):
401 self.ui = ui
402
403 def bindsocket(self, sock, address):
404 util.bindunixsocket(sock, address)
405
406 def unlinksocket(self, address):
407 os.unlink(address)
408
409 def printbanner(self, address):
410 self.ui.status(_('listening at %s\n') % address)
411 self.ui.flush() # avoid buffering of status message
412
413 def shouldexit(self):
414 """True if server should shut down; checked per pollinterval"""
415 return False
416
417 def newconnection(self):
418 """Called when main process notices new connection"""
419 pass
420
421 def createcmdserver(self, repo, conn, fin, fout):
422 """Create new command server instance; called in the process that
423 serves for the current connection"""
424 return server(self.ui, repo, fin, fout)
425
388 426 class _requesthandler(socketserver.BaseRequestHandler):
389 427 def handle(self):
390 428 _serverequest(self.server.ui, self.server.repo, self.request,
@@ -424,9 +462,96 b' class unixservice(object):'
424 462 finally:
425 463 self._cleanup()
426 464
465 class unixforkingservice(unixservice):
466 def __init__(self, ui, repo, opts, handler=None):
467 super(unixforkingservice, self).__init__(ui, repo, opts)
468 self._servicehandler = handler or unixservicehandler(ui)
469 self._sock = None
470 self._oldsigchldhandler = None
471 self._workerpids = set() # updated by signal handler; do not iterate
472
473 def init(self):
474 self._sock = socket.socket(socket.AF_UNIX)
475 self._servicehandler.bindsocket(self._sock, self.address)
476 self._sock.listen(5)
477 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
478 self._oldsigchldhandler = o
479 self._servicehandler.printbanner(self.address)
480
481 def _cleanup(self):
482 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
483 self._sock.close()
484 self._servicehandler.unlinksocket(self.address)
485 # don't kill child processes as they have active clients, just wait
486 self._reapworkers(0)
487
488 def run(self):
489 try:
490 self._mainloop()
491 finally:
492 self._cleanup()
493
494 def _mainloop(self):
495 h = self._servicehandler
496 while not h.shouldexit():
497 try:
498 ready = select.select([self._sock], [], [], h.pollinterval)[0]
499 if not ready:
500 continue
501 conn, _addr = self._sock.accept()
502 except (select.error, socket.error) as inst:
503 if inst.args[0] == errno.EINTR:
504 continue
505 raise
506
507 pid = os.fork()
508 if pid:
509 try:
510 self.ui.debug('forked worker process (pid=%d)\n' % pid)
511 self._workerpids.add(pid)
512 h.newconnection()
513 finally:
514 conn.close() # release handle in parent process
515 else:
516 try:
517 self._serveworker(conn)
518 conn.close()
519 os._exit(0)
520 except: # never return, hence no re-raises
521 try:
522 self.ui.traceback(force=True)
523 finally:
524 os._exit(255)
525
526 def _sigchldhandler(self, signal, frame):
527 self._reapworkers(os.WNOHANG)
528
529 def _reapworkers(self, options):
530 while self._workerpids:
531 try:
532 pid, _status = os.waitpid(-1, options)
533 except OSError as inst:
534 if inst.errno == errno.EINTR:
535 continue
536 if inst.errno != errno.ECHILD:
537 raise
538 # no child processes at all (reaped by other waitpid()?)
539 self._workerpids.clear()
540 return
541 if pid == 0:
542 # no waitable child processes
543 return
544 self.ui.debug('worker process exited (pid=%d)\n' % pid)
545 self._workerpids.discard(pid)
546
547 def _serveworker(self, conn):
548 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
549 h = self._servicehandler
550 _serverequest(self.ui, self.repo, conn, h.createcmdserver)
551
427 552 _servicemap = {
428 553 'pipe': pipeservice,
429 'unix': unixservice,
554 'unix': unixforkingservice,
430 555 }
431 556
432 557 def createservice(ui, repo, opts):
General Comments 0
You need to be logged in to leave comments. Login now