Show More
commandserver.py
537 lines
| 16.4 KiB
| text/x-python
|
PythonLexer
/ mercurial / commandserver.py
Idan Kamara
|
r14647 | # commandserver.py - communicate with Mercurial's API over a pipe | ||
# | ||||
# Copyright Matt Mackall <mpm@selenic.com> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
Yuya Nishihara
|
r27351 | from __future__ import absolute_import | ||
import errno | ||||
Yuya Nishihara
|
r29513 | import gc | ||
Yuya Nishihara
|
r27351 | import os | ||
Yuya Nishihara
|
r29513 | import random | ||
Yuya Nishihara
|
r29544 | import signal | ||
import socket | ||||
Idan Kamara
|
r14647 | import struct | ||
Yuya Nishihara
|
r27351 | import traceback | ||
Augie Fackler
|
r36958 | try: | ||
import selectors | ||||
selectors.BaseSelector | ||||
except ImportError: | ||||
from .thirdparty import selectors2 as selectors | ||||
Yuya Nishihara
|
r27351 | from .i18n import _ | ||
from . import ( | ||||
encoding, | ||||
error, | ||||
Pulkit Goyal
|
r30519 | pycompat, | ||
Yuya Nishihara
|
r27351 | util, | ||
) | ||||
Yuya Nishihara
|
r37137 | from .utils import ( | ||
procutil, | ||||
) | ||||
Idan Kamara
|
r14647 | |||
logfile = None | ||||
def log(*args): | ||||
if not logfile: | ||||
return | ||||
for a in args: | ||||
logfile.write(str(a)) | ||||
logfile.flush() | ||||
class channeledoutput(object): | ||||
""" | ||||
Yuya Nishihara
|
r22561 | Write data to out in the following format: | ||
Idan Kamara
|
r14647 | |||
data length (unsigned int), | ||||
data | ||||
""" | ||||
Yuya Nishihara
|
r22563 | def __init__(self, out, channel): | ||
Idan Kamara
|
r14647 | self.out = out | ||
self.channel = channel | ||||
Yuya Nishihara
|
r27415 | @property | ||
def name(self): | ||||
return '<%c-channel>' % self.channel | ||||
Idan Kamara
|
r14647 | def write(self, data): | ||
if not data: | ||||
return | ||||
Yuya Nishihara
|
r30263 | # single write() to guarantee the same atomicity as the underlying file | ||
self.out.write(struct.pack('>cI', self.channel, len(data)) + data) | ||||
Idan Kamara
|
r14647 | self.out.flush() | ||
def __getattr__(self, attr): | ||||
Yuya Nishihara
|
r27915 | if attr in ('isatty', 'fileno', 'tell', 'seek'): | ||
Augie Fackler
|
r18174 | raise AttributeError(attr) | ||
Yuya Nishihara
|
r22563 | return getattr(self.out, attr) | ||
Idan Kamara
|
r14647 | |||
class channeledinput(object): | ||||
""" | ||||
Read data from in_. | ||||
Requests for input are written to out in the following format: | ||||
channel identifier - 'I' for plain input, 'L' line based (1 byte) | ||||
how many bytes to send at most (unsigned int), | ||||
The client replies with: | ||||
data length (unsigned int), 0 meaning EOF | ||||
data | ||||
""" | ||||
maxchunksize = 4 * 1024 | ||||
def __init__(self, in_, out, channel): | ||||
self.in_ = in_ | ||||
self.out = out | ||||
self.channel = channel | ||||
Yuya Nishihara
|
r27415 | @property | ||
def name(self): | ||||
return '<%c-channel>' % self.channel | ||||
Idan Kamara
|
r14647 | def read(self, size=-1): | ||
if size < 0: | ||||
# if we need to consume all the clients input, ask for 4k chunks | ||||
# so the pipe doesn't fill up risking a deadlock | ||||
size = self.maxchunksize | ||||
s = self._read(size, self.channel) | ||||
buf = s | ||||
while s: | ||||
Idan Kamara
|
r14728 | s = self._read(size, self.channel) | ||
Idan Kamara
|
r14647 | buf += s | ||
return buf | ||||
else: | ||||
return self._read(size, self.channel) | ||||
def _read(self, size, channel): | ||||
if not size: | ||||
return '' | ||||
assert size > 0 | ||||
# tell the client we need at most size bytes | ||||
self.out.write(struct.pack('>cI', channel, size)) | ||||
self.out.flush() | ||||
length = self.in_.read(4) | ||||
length = struct.unpack('>I', length)[0] | ||||
if not length: | ||||
return '' | ||||
else: | ||||
return self.in_.read(length) | ||||
def readline(self, size=-1): | ||||
if size < 0: | ||||
size = self.maxchunksize | ||||
s = self._read(size, 'L') | ||||
buf = s | ||||
# keep asking for more until there's either no more or | ||||
# we got a full line | ||||
while s and s[-1] != '\n': | ||||
Idan Kamara
|
r14728 | s = self._read(size, 'L') | ||
Idan Kamara
|
r14647 | buf += s | ||
return buf | ||||
else: | ||||
return self._read(size, 'L') | ||||
def __iter__(self): | ||||
return self | ||||
def next(self): | ||||
l = self.readline() | ||||
if not l: | ||||
raise StopIteration | ||||
return l | ||||
def __getattr__(self, attr): | ||||
Yuya Nishihara
|
r27915 | if attr in ('isatty', 'fileno', 'tell', 'seek'): | ||
Augie Fackler
|
r18174 | raise AttributeError(attr) | ||
Idan Kamara
|
r14647 | return getattr(self.in_, attr) | ||
class server(object): | ||||
""" | ||||
Yuya Nishihara
|
r22990 | Listens for commands on fin, runs them and writes the output on a channel | ||
based stream to fout. | ||||
Idan Kamara
|
r14647 | """ | ||
Yuya Nishihara
|
r22990 | def __init__(self, ui, repo, fin, fout): | ||
Pulkit Goyal
|
r30519 | self.cwd = pycompat.getcwd() | ||
Idan Kamara
|
r14647 | |||
Matt Mackall
|
r25832 | # developer config: cmdserver.log | ||
Jun Wu
|
r33499 | logpath = ui.config("cmdserver", "log") | ||
Idan Kamara
|
r14647 | if logpath: | ||
global logfile | ||||
if logpath == '-': | ||||
Mads Kiilerich
|
r17425 | # write log on a special 'd' (debug) channel | ||
Yuya Nishihara
|
r22990 | logfile = channeledoutput(fout, 'd') | ||
Idan Kamara
|
r14647 | else: | ||
logfile = open(logpath, 'a') | ||||
Yuya Nishihara
|
r20650 | if repo: | ||
# the ui here is really the repo ui so take its baseui so we don't | ||||
# end up with its local configuration | ||||
self.ui = repo.baseui | ||||
self.repo = repo | ||||
self.repoui = repo.ui | ||||
else: | ||||
self.ui = ui | ||||
self.repo = self.repoui = None | ||||
Idan Kamara
|
r14647 | |||
Yuya Nishihara
|
r22990 | self.cerr = channeledoutput(fout, 'e') | ||
self.cout = channeledoutput(fout, 'o') | ||||
self.cin = channeledinput(fin, fout, 'I') | ||||
self.cresult = channeledoutput(fout, 'r') | ||||
Idan Kamara
|
r14647 | |||
Yuya Nishihara
|
r22990 | self.client = fin | ||
Idan Kamara
|
r14647 | |||
Yuya Nishihara
|
r29512 | def cleanup(self): | ||
"""release and restore resources taken during server session""" | ||||
Idan Kamara
|
r14647 | def _read(self, size): | ||
Idan Kamara
|
r14706 | if not size: | ||
return '' | ||||
Idan Kamara
|
r14647 | data = self.client.read(size) | ||
# is the other end closed? | ||||
if not data: | ||||
Brodie Rao
|
r16687 | raise EOFError | ||
Idan Kamara
|
r14647 | |||
return data | ||||
Jun Wu
|
r28156 | def _readstr(self): | ||
"""read a string from the channel | ||||
format: | ||||
data length (uint32), data | ||||
""" | ||||
length = struct.unpack('>I', self._read(4))[0] | ||||
if not length: | ||||
return '' | ||||
return self._read(length) | ||||
def _readlist(self): | ||||
"""read a list of NULL separated strings from the channel""" | ||||
s = self._readstr() | ||||
if s: | ||||
return s.split('\0') | ||||
else: | ||||
return [] | ||||
Idan Kamara
|
r14647 | def runcommand(self): | ||
""" reads a list of \0 terminated arguments, executes | ||||
and writes the return code to the result channel """ | ||||
Yuya Nishihara
|
r27352 | from . import dispatch # avoid cycle | ||
Idan Kamara
|
r14647 | |||
Jun Wu
|
r28157 | args = self._readlist() | ||
Idan Kamara
|
r14647 | |||
Idan Kamara
|
r14750 | # copy the uis so changes (e.g. --config or --verbose) don't | ||
# persist between requests | ||||
Idan Kamara
|
r14751 | copiedui = self.ui.copy() | ||
Yuya Nishihara
|
r21195 | uis = [copiedui] | ||
Yuya Nishihara
|
r20650 | if self.repo: | ||
self.repo.baseui = copiedui | ||||
# clone ui without using ui.copy because this is protected | ||||
repoui = self.repoui.__class__(self.repoui) | ||||
repoui.copy = copiedui.copy # redo copy protection | ||||
Yuya Nishihara
|
r21195 | uis.append(repoui) | ||
Yuya Nishihara
|
r20650 | self.repo.ui = self.repo.dirstate._ui = repoui | ||
self.repo.invalidateall() | ||||
Idan Kamara
|
r14751 | |||
Yuya Nishihara
|
r21195 | for ui in uis: | ||
Yuya Nishihara
|
r29366 | ui.resetstate() | ||
Yuya Nishihara
|
r27565 | # any kind of interaction must use server channels, but chg may | ||
# replace channels by fully functional tty files. so nontty is | ||||
# enforced only if cin is a channel. | ||||
if not util.safehasattr(self.cin, 'fileno'): | ||||
ui.setconfig('ui', 'nontty', 'true', 'commandserver') | ||||
Yuya Nishihara
|
r21195 | |||
Idan Kamara
|
r14864 | req = dispatch.request(args[:], copiedui, self.repo, self.cin, | ||
Idan Kamara
|
r14647 | self.cout, self.cerr) | ||
Gregory Szorc
|
r35670 | try: | ||
Yuya Nishihara
|
r38015 | ret = dispatch.dispatch(req) & 255 | ||
Gregory Szorc
|
r35670 | self.cresult.write(struct.pack('>i', int(ret))) | ||
finally: | ||||
# restore old cwd | ||||
if '--cwd' in args: | ||||
os.chdir(self.cwd) | ||||
Idan Kamara
|
r14647 | |||
def getencoding(self): | ||||
""" writes the current encoding to the result channel """ | ||||
self.cresult.write(encoding.encoding) | ||||
def serveone(self): | ||||
cmd = self.client.readline()[:-1] | ||||
if cmd: | ||||
handler = self.capabilities.get(cmd) | ||||
if handler: | ||||
handler(self) | ||||
else: | ||||
# clients are expected to check what commands are supported by | ||||
# looking at the servers capabilities | ||||
Pierre-Yves David
|
r26587 | raise error.Abort(_('unknown command %s') % cmd) | ||
Idan Kamara
|
r14647 | |||
return cmd != '' | ||||
Alex Gaynor
|
r34487 | capabilities = {'runcommand': runcommand, | ||
'getencoding': getencoding} | ||||
Idan Kamara
|
r14647 | |||
def serve(self): | ||||
Mads Kiilerich
|
r18359 | hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities)) | ||
Idan Kamara
|
r14719 | hellomsg += '\n' | ||
hellomsg += 'encoding: ' + encoding.encoding | ||||
Yuya Nishihara
|
r23036 | hellomsg += '\n' | ||
Yuya Nishihara
|
r37138 | hellomsg += 'pid: %d' % procutil.getpid() | ||
Jun Wu
|
r29580 | if util.safehasattr(os, 'getpgid'): | ||
hellomsg += '\n' | ||||
hellomsg += 'pgid: %d' % os.getpgid(0) | ||||
Idan Kamara
|
r14719 | |||
# write the hello msg in -one- chunk | ||||
self.cout.write(hellomsg) | ||||
Idan Kamara
|
r14647 | |||
try: | ||||
while self.serveone(): | ||||
pass | ||||
except EOFError: | ||||
# we'll get here if the client disconnected while we were reading | ||||
# its request | ||||
return 1 | ||||
return 0 | ||||
Yuya Nishihara
|
r22988 | |||
class pipeservice(object): | ||||
def __init__(self, ui, repo, opts): | ||||
Yuya Nishihara
|
r23323 | self.ui = ui | ||
self.repo = repo | ||||
Yuya Nishihara
|
r22988 | |||
def init(self): | ||||
pass | ||||
def run(self): | ||||
Yuya Nishihara
|
r23323 | ui = self.ui | ||
Yuya Nishihara
|
r23324 | # redirect stdio to null device so that broken extensions or in-process | ||
# hooks will never cause corruption of channel protocol. | ||||
Yuya Nishihara
|
r37142 | with procutil.protectedstdio(ui.fin, ui.fout) as (fin, fout): | ||
try: | ||||
sv = server(ui, self.repo, fin, fout) | ||||
return sv.serve() | ||||
finally: | ||||
sv.cleanup() | ||||
Yuya Nishihara
|
r22989 | |||
Yuya Nishihara
|
r29586 | def _initworkerprocess(): | ||
Jun Wu
|
r29609 | # use a different process group from the master process, in order to: | ||
# 1. make the current process group no longer "orphaned" (because the | ||||
# parent of this process is in a different process group while | ||||
# remains in a same session) | ||||
# according to POSIX 2.2.2.52, orphaned process group will ignore | ||||
# terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will | ||||
# cause trouble for things like ncurses. | ||||
# 2. the client can use kill(-pgid, sig) to simulate terminal-generated | ||||
# SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child | ||||
# processes like ssh will be killed properly, without affecting | ||||
# unrelated processes. | ||||
Yuya Nishihara
|
r29585 | os.setpgid(0, 0) | ||
# change random state otherwise forked request handlers would have a | ||||
# same state inherited from parent. | ||||
random.seed() | ||||
Yuya Nishihara
|
r29542 | |||
Yuya Nishihara
|
r29586 | def _serverequest(ui, repo, conn, createcmdserver): | ||
Yuya Nishihara
|
r29585 | fin = conn.makefile('rb') | ||
fout = conn.makefile('wb') | ||||
sv = None | ||||
try: | ||||
sv = createcmdserver(repo, conn, fin, fout) | ||||
Yuya Nishihara
|
r22994 | try: | ||
Yuya Nishihara
|
r29585 | sv.serve() | ||
# handle exceptions that may be raised by command server. most of | ||||
# known exceptions are caught by dispatch. | ||||
except error.Abort as inst: | ||||
Rodrigo Damazio Bovendorp
|
r38791 | ui.error(_('abort: %s\n') % inst) | ||
Yuya Nishihara
|
r29585 | except IOError as inst: | ||
if inst.errno != errno.EPIPE: | ||||
raise | ||||
except KeyboardInterrupt: | ||||
pass | ||||
Yuya Nishihara
|
r29513 | finally: | ||
Yuya Nishihara
|
r29585 | sv.cleanup() | ||
except: # re-raises | ||||
# also write traceback to error channel. otherwise client cannot | ||||
# see it because it is written to server's stderr by default. | ||||
if sv: | ||||
cerr = sv.cerr | ||||
else: | ||||
cerr = channeledoutput(fout, 'e') | ||||
traceback.print_exc(file=cerr) | ||||
raise | ||||
finally: | ||||
fin.close() | ||||
try: | ||||
fout.close() # implicit flush() may cause another EPIPE | ||||
except IOError as inst: | ||||
if inst.errno != errno.EPIPE: | ||||
raise | ||||
Yuya Nishihara
|
r22994 | |||
Yuya Nishihara
|
r29544 | class unixservicehandler(object): | ||
"""Set of pluggable operations for unix-mode services | ||||
Almost all methods except for createcmdserver() are called in the main | ||||
process. You can't pass mutable resource back from createcmdserver(). | ||||
""" | ||||
pollinterval = None | ||||
def __init__(self, ui): | ||||
self.ui = ui | ||||
def bindsocket(self, sock, address): | ||||
util.bindunixsocket(sock, address) | ||||
Jun Wu
|
r32232 | sock.listen(socket.SOMAXCONN) | ||
Jun Wu
|
r32233 | self.ui.status(_('listening at %s\n') % address) | ||
self.ui.flush() # avoid buffering of status message | ||||
Yuya Nishihara
|
r29544 | |||
def unlinksocket(self, address): | ||||
os.unlink(address) | ||||
def shouldexit(self): | ||||
"""True if server should shut down; checked per pollinterval""" | ||||
return False | ||||
def newconnection(self): | ||||
"""Called when main process notices new connection""" | ||||
def createcmdserver(self, repo, conn, fin, fout): | ||||
"""Create new command server instance; called in the process that | ||||
serves for the current connection""" | ||||
return server(self.ui, repo, fin, fout) | ||||
Yuya Nishihara
|
r29548 | class unixforkingservice(object): | ||
Yuya Nishihara
|
r22994 | """ | ||
Listens on unix domain socket and forks server per connection | ||||
""" | ||||
Yuya Nishihara
|
r29548 | |||
def __init__(self, ui, repo, opts, handler=None): | ||||
Yuya Nishihara
|
r22994 | self.ui = ui | ||
self.repo = repo | ||||
self.address = opts['address'] | ||||
Yuya Nishihara
|
r29548 | if not util.safehasattr(socket, 'AF_UNIX'): | ||
Pierre-Yves David
|
r26587 | raise error.Abort(_('unsupported platform')) | ||
Yuya Nishihara
|
r22994 | if not self.address: | ||
Pierre-Yves David
|
r26587 | raise error.Abort(_('no socket path specified with --address')) | ||
Yuya Nishihara
|
r29544 | self._servicehandler = handler or unixservicehandler(ui) | ||
self._sock = None | ||||
self._oldsigchldhandler = None | ||||
self._workerpids = set() # updated by signal handler; do not iterate | ||||
Jun Wu
|
r30887 | self._socketunlinked = None | ||
Yuya Nishihara
|
r29544 | |||
def init(self): | ||||
self._sock = socket.socket(socket.AF_UNIX) | ||||
self._servicehandler.bindsocket(self._sock, self.address) | ||||
Yuya Nishihara
|
r37138 | if util.safehasattr(procutil, 'unblocksignal'): | ||
procutil.unblocksignal(signal.SIGCHLD) | ||||
Yuya Nishihara
|
r29544 | o = signal.signal(signal.SIGCHLD, self._sigchldhandler) | ||
self._oldsigchldhandler = o | ||||
Jun Wu
|
r30887 | self._socketunlinked = False | ||
def _unlinksocket(self): | ||||
if not self._socketunlinked: | ||||
self._servicehandler.unlinksocket(self.address) | ||||
self._socketunlinked = True | ||||
Yuya Nishihara
|
r29544 | |||
def _cleanup(self): | ||||
signal.signal(signal.SIGCHLD, self._oldsigchldhandler) | ||||
self._sock.close() | ||||
Jun Wu
|
r30887 | self._unlinksocket() | ||
Yuya Nishihara
|
r29544 | # don't kill child processes as they have active clients, just wait | ||
self._reapworkers(0) | ||||
def run(self): | ||||
try: | ||||
self._mainloop() | ||||
finally: | ||||
self._cleanup() | ||||
def _mainloop(self): | ||||
Jun Wu
|
r30891 | exiting = False | ||
Yuya Nishihara
|
r29544 | h = self._servicehandler | ||
Augie Fackler
|
r36958 | selector = selectors.DefaultSelector() | ||
selector.register(self._sock, selectors.EVENT_READ) | ||||
Jun Wu
|
r30891 | while True: | ||
if not exiting and h.shouldexit(): | ||||
# clients can no longer connect() to the domain socket, so | ||||
# we stop queuing new requests. | ||||
# for requests that are queued (connect()-ed, but haven't been | ||||
# accept()-ed), handle them before exit. otherwise, clients | ||||
# waiting for recv() will receive ECONNRESET. | ||||
self._unlinksocket() | ||||
exiting = True | ||||
Jun Wu
|
r33543 | ready = selector.select(timeout=h.pollinterval) | ||
if not ready: | ||||
# only exit if we completed all queued requests | ||||
if exiting: | ||||
break | ||||
continue | ||||
Yuya Nishihara
|
r29544 | try: | ||
conn, _addr = self._sock.accept() | ||||
Jun Wu
|
r33543 | except socket.error as inst: | ||
Yuya Nishihara
|
r29544 | if inst.args[0] == errno.EINTR: | ||
continue | ||||
raise | ||||
pid = os.fork() | ||||
if pid: | ||||
try: | ||||
self.ui.debug('forked worker process (pid=%d)\n' % pid) | ||||
self._workerpids.add(pid) | ||||
h.newconnection() | ||||
finally: | ||||
conn.close() # release handle in parent process | ||||
else: | ||||
try: | ||||
Jun Wu
|
r38311 | selector.close() | ||
self._sock.close() | ||||
Yuya Nishihara
|
r29587 | self._runworker(conn) | ||
Yuya Nishihara
|
r29544 | conn.close() | ||
os._exit(0) | ||||
except: # never return, hence no re-raises | ||||
try: | ||||
self.ui.traceback(force=True) | ||||
finally: | ||||
os._exit(255) | ||||
Jun Wu
|
r33506 | selector.close() | ||
Yuya Nishihara
|
r29544 | |||
def _sigchldhandler(self, signal, frame): | ||||
self._reapworkers(os.WNOHANG) | ||||
def _reapworkers(self, options): | ||||
while self._workerpids: | ||||
try: | ||||
pid, _status = os.waitpid(-1, options) | ||||
except OSError as inst: | ||||
if inst.errno == errno.EINTR: | ||||
continue | ||||
if inst.errno != errno.ECHILD: | ||||
raise | ||||
# no child processes at all (reaped by other waitpid()?) | ||||
self._workerpids.clear() | ||||
return | ||||
if pid == 0: | ||||
# no waitable child processes | ||||
return | ||||
self.ui.debug('worker process exited (pid=%d)\n' % pid) | ||||
self._workerpids.discard(pid) | ||||
Yuya Nishihara
|
r29587 | def _runworker(self, conn): | ||
Yuya Nishihara
|
r29544 | signal.signal(signal.SIGCHLD, self._oldsigchldhandler) | ||
Yuya Nishihara
|
r29586 | _initworkerprocess() | ||
Yuya Nishihara
|
r29544 | h = self._servicehandler | ||
Yuya Nishihara
|
r29586 | try: | ||
_serverequest(self.ui, self.repo, conn, h.createcmdserver) | ||||
finally: | ||||
gc.collect() # trigger __del__ since worker process uses os._exit | ||||