commandserver.py
354 lines
| 10.2 KiB
| text/x-python
|
PythonLexer
/ mercurial / commandserver.py
Idan Kamara
|
# commandserver.py - communicate with Mercurial's API over a pipe
#
#  Copyright Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from i18n import _ | ||||
import struct | ||||
Yuya Nishihara
|
r23324 | import sys, os, errno, traceback, SocketServer | ||
Idan Kamara
|
r14647 | import dispatch, encoding, util | ||
# active log sink; stays None (logging disabled) until something rebinds it
logfile = None

def log(*args):
    """Write str() of every argument to the log sink, then flush it.

    Does nothing while the module-level ``logfile`` is unset or falsy.
    Each argument is handed to ``write`` in a separate call.
    """
    sink = logfile
    if sink:
        for item in args:
            sink.write(str(item))
        sink.flush()
class channeledoutput(object):
    """
    File-like wrapper that frames everything written through it.

    Every non-empty write is sent to out as:

    channel identifier (1 byte),
    data length (unsigned int, big-endian),
    data
    """
    def __init__(self, out, channel):
        self.out = out
        self.channel = channel

    def write(self, data):
        # an empty write produces no frame at all
        if data:
            header = struct.pack('>cI', self.channel, len(data))
            stream = self.out
            stream.write(header)
            stream.write(data)
            stream.flush()

    def __getattr__(self, attr):
        # everything except isatty/fileno is forwarded to the wrapped stream;
        # those two are reported as missing on this wrapper
        if attr not in ('isatty', 'fileno'):
            return getattr(self.out, attr)
        raise AttributeError(attr)
Idan Kamara
|
r14647 | |||
class channeledinput(object):
    """
    File-like reader that pulls its data from the client on demand.

    Every request for input is written to out as:
    channel identifier - 'I' for plain input, 'L' line based (1 byte)
    how many bytes to send at most (unsigned int),

    The reply is then read from in_ as:
    data length (unsigned int), 0 meaning EOF
    data
    """

    maxchunksize = 4 * 1024

    def __init__(self, in_, out, channel):
        self.in_ = in_
        self.out = out
        self.channel = channel

    def read(self, size=-1):
        """Read up to size bytes, or everything until EOF if size < 0."""
        if size >= 0:
            return self._read(size, self.channel)

        # if we need to consume all the clients input, ask for 4k chunks
        # so the pipe doesn't fill up risking a deadlock
        size = self.maxchunksize
        chunk = self._read(size, self.channel)
        pieces = [chunk]
        while chunk:
            chunk = self._read(size, self.channel)
            pieces.append(chunk)
        return ''.join(pieces)

    def _read(self, size, channel):
        """Issue one request on channel and return the client's reply."""
        if not size:
            return ''
        assert size > 0

        # tell the client we need at most size bytes
        request = struct.pack('>cI', channel, size)
        self.out.write(request)
        self.out.flush()

        (length,) = struct.unpack('>I', self.in_.read(4))
        if length:
            return self.in_.read(length)
        return ''

    def readline(self, size=-1):
        """Read one line through the 'L' channel.

        With a non-negative size a single request of that size is made;
        otherwise requests are repeated until a chunk ends in a newline or
        the client signals EOF.
        """
        if size >= 0:
            return self._read(size, 'L')

        size = self.maxchunksize
        chunk = self._read(size, 'L')
        pieces = [chunk]
        # keep asking for more until there's either no more or
        # we got a full line
        while chunk and chunk[-1] != '\n':
            chunk = self._read(size, 'L')
            pieces.append(chunk)
        return ''.join(pieces)

    def __iter__(self):
        return self

    def next(self):
        line = self.readline()
        if line:
            return line
        raise StopIteration

    def __getattr__(self, attr):
        # everything except isatty/fileno is forwarded to the wrapped stream;
        # those two are reported as missing on this wrapper
        if attr not in ('isatty', 'fileno'):
            return getattr(self.in_, attr)
        raise AttributeError(attr)
class server(object):
    """
    Listens for commands on fin, runs them and writes the output on a channel
    based stream to fout.
    """
    def __init__(self, ui, repo, fin, fout):
        """Create the channels over fin/fout and remember the startup cwd.

        If the "cmdserver.log" config is set, the module-level logfile is
        rebound: '-' logs to the 'd' channel of fout, any other value is
        opened as a file path in append mode.
        """
        self.cwd = os.getcwd()

        # developer config: cmdserver.log
        logpath = ui.config("cmdserver", "log", None)
        if logpath:
            global logfile
            if logpath == '-':
                # write log on a special 'd' (debug) channel
                logfile = channeledoutput(fout, 'd')
            else:
                logfile = open(logpath, 'a')

        if repo:
            # the ui here is really the repo ui so take its baseui so we don't
            # end up with its local configuration
            self.ui = repo.baseui
            self.repo = repo
            self.repoui = repo.ui
        else:
            self.ui = ui
            self.repo = self.repoui = None

        self.cerr = channeledoutput(fout, 'e')
        self.cout = channeledoutput(fout, 'o')
        self.cin = channeledinput(fin, fout, 'I')
        self.cresult = channeledoutput(fout, 'r')

        self.client = fin

    def _read(self, size):
        """Read size bytes of a request from the client.

        Returns '' without touching the client when size is zero; raises
        EOFError when the client returns no data.
        """
        if not size:
            return ''

        data = self.client.read(size)

        # is the other end closed?
        if not data:
            raise EOFError

        return data

    def runcommand(self):
        """ reads a list of NUL ('\\0') separated arguments, executes
        and writes the return code to the result channel """

        length = struct.unpack('>I', self._read(4))[0]
        if not length:
            args = []
        else:
            args = self._read(length).split('\0')

        # copy the uis so changes (e.g. --config or --verbose) don't
        # persist between requests
        copiedui = self.ui.copy()
        uis = [copiedui]
        if self.repo:
            self.repo.baseui = copiedui
            # clone ui without using ui.copy because this is protected
            repoui = self.repoui.__class__(self.repoui)
            repoui.copy = copiedui.copy # redo copy protection
            uis.append(repoui)
            self.repo.ui = self.repo.dirstate._ui = repoui
            self.repo.invalidateall()

        for ui in uis:
            # any kind of interaction must use server channels
            ui.setconfig('ui', 'nontty', 'true', 'commandserver')

        req = dispatch.request(args[:], copiedui, self.repo, self.cin,
                               self.cout, self.cerr)

        try:
            ret = (dispatch.dispatch(req) or 0) & 255 # might return None
        finally:
            # restore old cwd, also when dispatch raised, so a failed
            # request cannot leave later requests running in the wrong
            # directory
            if '--cwd' in args:
                os.chdir(self.cwd)

        self.cresult.write(struct.pack('>i', int(ret)))

    def getencoding(self):
        """ writes the current encoding to the result channel """
        self.cresult.write(encoding.encoding)

    def serveone(self):
        """Read one command line from the client and run its handler.

        Returns False when the command read is empty (which ends serve()),
        True otherwise.  Raises util.Abort for a command that is not listed
        in capabilities.
        """
        # drop the last character of the line (the newline terminator)
        cmd = self.client.readline()[:-1]
        if cmd:
            handler = self.capabilities.get(cmd)
            if handler:
                handler(self)
            else:
                # clients are expected to check what commands are supported by
                # looking at the servers capabilities
                raise util.Abort(_('unknown command %s') % cmd)

        return cmd != ''

    # command name -> handler; the handlers are stored as plain functions and
    # are called with the server instance as their only argument
    capabilities = {'runcommand'  : runcommand,
                    'getencoding' : getencoding}

    def serve(self):
        """Send the hello message, then serve commands until told to stop.

        Returns 1 if EOFError was raised while serving, 0 otherwise.
        """
        hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
        hellomsg += '\n'
        hellomsg += 'encoding: ' + encoding.encoding
        hellomsg += '\n'
        hellomsg += 'pid: %d' % os.getpid()

        # write the hello msg in -one- chunk
        self.cout.write(hellomsg)

        try:
            while self.serveone():
                pass
        except EOFError:
            # we'll get here if the client disconnected while we were reading
            # its request
            return 1

        return 0
Yuya Nishihara
|
r22988 | |||
Yuya Nishihara
|
def _protectio(ui):
    """ duplicates streams and redirect original to null if ui uses stdio

    For each of ui.fin / ui.fout that is the very sys.stdin / sys.stdout
    object, the descriptor is duplicated, the original descriptor is pointed
    at os.devnull, and a new file object wrapping the duplicate takes the
    stream's place in the result.  Other streams are returned unchanged.

    Returns a (fin, fout) tuple.
    """
    ui.flush()
    newfiles = []
    nullfd = os.open(os.devnull, os.O_RDWR)
    try:
        for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
                              (ui.fout, sys.stdout, 'wb')]:
            if f is sysf:
                newfd = os.dup(f.fileno())
                os.dup2(nullfd, f.fileno())
                f = os.fdopen(newfd, mode)
            newfiles.append(f)
    finally:
        # always release the temporary null descriptor, even if duplicating
        # or redirecting one of the streams fails part-way
        os.close(nullfd)
    return tuple(newfiles)
def _restoreio(ui, fin, fout):
    """ restores streams from duplicated ones

    For each given stream that is not the ui's own stream object, its
    descriptor is copied back over the ui stream's descriptor and the given
    stream is closed.
    """
    ui.flush()
    for saved, current in ((fin, ui.fin), (fout, ui.fout)):
        if saved is current:
            # same object: nothing to copy back or close
            continue
        os.dup2(saved.fileno(), current.fileno())
        saved.close()
Yuya Nishihara
|
class pipeservice(object):
    """Runs a command server over the ui's own input and output streams."""
    def __init__(self, ui, repo, opts):
        # opts is accepted for a uniform constructor signature but not used
        self.repo = repo
        self.ui = ui

    def init(self):
        """Nothing to prepare for this service."""

    def run(self):
        """Serve until the client is done and return the server's result."""
        ui = self.ui
        # redirect stdio to null device so that broken extensions or in-process
        # hooks will never cause corruption of channel protocol.
        cin, cout = _protectio(ui)
        try:
            return server(ui, self.repo, cin, cout).serve()
        finally:
            _restoreio(ui, cin, cout)
Yuya Nishihara
|
r22989 | |||
Yuya Nishihara
|
class _requesthandler(SocketServer.StreamRequestHandler):
    """Runs one command server session over an accepted connection."""
    def handle(self):
        owner = self.server
        ui = owner.ui
        sv = server(ui, owner.repo, self.rfile, self.wfile)
        try:
            try:
                sv.serve()
            # handle exceptions that may be raised by command server. most of
            # known exceptions are caught by dispatch.
            except util.Abort as err:
                ui.warn(_('abort: %s\n') % err)
            except IOError as err:
                # only a broken pipe is swallowed; other I/O errors propagate
                if err.errno != errno.EPIPE:
                    raise
            except KeyboardInterrupt:
                pass
        except: # re-raises
            # also write traceback to error channel. otherwise client cannot
            # see it because it is written to server's stderr by default.
            traceback.print_exc(file=sv.cerr)
            raise
class unixservice(object):
    """
    Listens on unix domain socket and forks server per connection
    """
    def __init__(self, ui, repo, opts):
        self.ui = ui
        self.repo = repo
        address = opts['address']
        self.address = address
        if not util.safehasattr(SocketServer, 'UnixStreamServer'):
            raise util.Abort(_('unsupported platform'))
        if not address:
            raise util.Abort(_('no socket path specified with --address'))

    def init(self):
        """Create the socket server for self.address and announce it."""
        class cls(SocketServer.ForkingMixIn, SocketServer.UnixStreamServer):
            # class attributes: the request handler reads them from its
            # server object
            ui = self.ui
            repo = self.repo
        address = self.address
        self.server = cls(address, _requesthandler)
        notice = _('listening at %s\n') % address
        self.ui.status(notice)
        self.ui.flush() # avoid buffering of status message

    def run(self):
        """Serve connections; the socket path is removed on the way out."""
        try:
            self.server.serve_forever()
        finally:
            os.unlink(self.address)
Yuya Nishihara
|
# service mode name -> class implementing that mode
_servicemap = dict(
    pipe=pipeservice,
    unix=unixservice,
)
def createservice(ui, repo, opts):
    """Instantiate the service class selected by opts['cmdserver'].

    Returns the result of calling that class with (ui, repo, opts).
    Raises util.Abort when the mode is not a key of _servicemap.
    """
    mode = opts['cmdserver']
    try:
        servicecls = _servicemap[mode]
    except KeyError:
        raise util.Abort(_('unknown mode %s') % mode)
    # construct outside the try block so that a KeyError raised by the
    # service constructor itself is not misreported as an unknown mode
    return servicecls(ui, repo, opts)