commandserver.py
771 lines
| 24.6 KiB
| text/x-python
|
PythonLexer
/ mercurial / commandserver.py
Idan Kamara
|
r14647 | # commandserver.py - communicate with Mercurial's API over a pipe | ||
# | ||||
# Copyright Matt Mackall <mpm@selenic.com> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
Yuya Nishihara
|
r27351 | from __future__ import absolute_import | ||
import errno | ||||
Yuya Nishihara
|
r29513 | import gc | ||
Yuya Nishihara
|
r27351 | import os | ||
Yuya Nishihara
|
r29513 | import random | ||
Yuya Nishihara
|
r29544 | import signal | ||
import socket | ||||
Idan Kamara
|
r14647 | import struct | ||
Yuya Nishihara
|
r27351 | import traceback | ||
Augie Fackler
|
r36958 | try: | ||
import selectors | ||||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
r36958 | selectors.BaseSelector | ||
except ImportError: | ||||
from .thirdparty import selectors2 as selectors | ||||
Yuya Nishihara
|
r27351 | from .i18n import _ | ||
Gregory Szorc
|
r43359 | from .pycompat import getattr | ||
Yuya Nishihara
|
r27351 | from . import ( | ||
encoding, | ||||
error, | ||||
Yuya Nishihara
|
r40859 | loggingutil, | ||
Yuya Nishihara
|
r40625 | pycompat, | ||
Yuya Nishihara
|
r41035 | repocache, | ||
Yuya Nishihara
|
r27351 | util, | ||
Yuya Nishihara
|
r40859 | vfs as vfsmod, | ||
Yuya Nishihara
|
r27351 | ) | ||
Yuya Nishihara
|
r37137 | from .utils import ( | ||
Yuya Nishihara
|
r40625 | cborutil, | ||
Yuya Nishihara
|
r37137 | procutil, | ||
) | ||||
Idan Kamara
|
r14647 | |||
Augie Fackler
|
r43346 | |||
Idan Kamara
|
class channeledoutput(object):
    """
    File-like writer that frames every chunk for a single output channel.

    Each non-empty write() is sent to out as one record:

    channel identifier (1 byte),
    data length (unsigned int, big-endian),
    data
    """

    def __init__(self, out, channel):
        self.out = out
        self.channel = channel

    @property
    def name(self):
        """Pseudo file name of this channel, e.g. b'<o-channel>'."""
        return b'<%c-channel>' % self.channel

    def write(self, data):
        """Frame data and send it to the underlying stream, then flush.

        Empty data is dropped without writing anything.
        """
        if data:
            header = struct.pack(b'>cI', self.channel, len(data))
            # header and payload are handed over in one write() call to
            # guarantee the same atomicity as the underlying file
            self.out.write(header + data)
            self.out.flush()

    def __getattr__(self, attr):
        # anything not defined here is served by the wrapped stream, except
        # for the attributes below, which are reported as missing
        if attr not in ('isatty', 'fileno', 'tell', 'seek'):
            return getattr(self.out, attr)
        raise AttributeError(attr)
Idan Kamara
|
r14647 | |||
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
class channeledmessage(object):
    """
    Writer for structured messages, sent over a channel as:

    data length (unsigned int),
    encoded message and metadata, as a flat key-value dict.

    Every message is expected to carry a 'type' attribute; receivers should
    ignore messages whose type they do not know.
    """

    # teach ui that write() can take **opts
    structured = True

    def __init__(self, out, channel, encodename, encodefn):
        self._encodefn = encodefn
        self.encoding = encodename
        self._cout = channeledoutput(out, channel)

    def write(self, data, **opts):
        """Encode opts (plus data, unless it is None) and send the result."""
        fields = pycompat.byteskwargs(opts)
        if data is not None:
            fields[b'data'] = data
        payload = self._encodefn(fields)
        self._cout.write(payload)

    def __getattr__(self, attr):
        # everything else behaves like the underlying output channel
        return getattr(self._cout, attr)
Augie Fackler
|
r43346 | |||
Idan Kamara
|
class channeledinput(object):
    """
    File-like reader that asks the client for its data.

    Every request for input is written to out as:
    channel identifier - 'I' for plain input, 'L' line based (1 byte)
    how many bytes to send at most (unsigned int),

    The client then answers on in_ with:
    data length (unsigned int), 0 meaning EOF
    data
    """

    maxchunksize = 4 * 1024

    def __init__(self, in_, out, channel):
        self.in_ = in_
        self.out = out
        self.channel = channel

    @property
    def name(self):
        """Pseudo file name of this channel, e.g. b'<I-channel>'."""
        return b'<%c-channel>' % self.channel

    def read(self, size=-1):
        """Read at most size bytes; a negative size reads until EOF."""
        if size >= 0:
            return self._read(size, self.channel)

        # if we need to consume all the clients input, ask for 4k chunks
        # so the pipe doesn't fill up risking a deadlock
        chunks = []
        while True:
            piece = self._read(self.maxchunksize, self.channel)
            if not piece:
                break
            chunks.append(piece)
        return b''.join(chunks)

    def _read(self, size, channel):
        """Issue one request on channel and return the client's reply."""
        if not size:
            return b''
        assert size > 0

        # tell the client we need at most size bytes
        request = struct.pack(b'>cI', channel, size)
        self.out.write(request)
        self.out.flush()

        (length,) = struct.unpack(b'>I', self.in_.read(4))
        if length:
            return self.in_.read(length)
        return b''

    def readline(self, size=-1):
        """Read one line using the line based 'L' channel.

        With a negative size, chunks are requested until one of them ends
        with a newline or the client reports EOF.
        """
        if size >= 0:
            return self._read(size, b'L')

        chunks = []
        while True:
            piece = self._read(self.maxchunksize, b'L')
            chunks.append(piece)
            # stop when there's either no more or we got a full line
            if not piece or piece.endswith(b'\n'):
                break
        return b''.join(chunks)

    def __iter__(self):
        return self

    def next(self):
        line = self.readline()
        if line:
            return line
        raise StopIteration

    __next__ = next

    def __getattr__(self, attr):
        # anything not defined here is served by the wrapped stream, except
        # for the attributes below, which are reported as missing
        if attr not in ('isatty', 'fileno', 'tell', 'seek'):
            return getattr(self.in_, attr)
        raise AttributeError(attr)
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
r40625 | _messageencoders = { | ||
b'cbor': lambda v: b''.join(cborutil.streamencode(v)), | ||||
} | ||||
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
def _selectmessageencoder(ui):
    """Pick the first supported encoding of cmdserver.message-encodings

    Returns an (encoding name, encoder function) pair. Aborts if none of
    the configured names is known to the server.
    """
    requested = ui.configlist(b'cmdserver', b'message-encodings')
    for name in requested:
        encodefn = _messageencoders.get(name)
        if encodefn:
            return name, encodefn
    msg = b'no supported message encodings: %s' % b' '.join(requested)
    raise error.Abort(msg)
Yuya Nishihara
|
r40625 | |||
Idan Kamara
|
class server(object):
    """
    Command server: reads requests from fin, runs them and reports output,
    errors and result codes on a channel based stream written to fout.
    """

    def __init__(self, ui, repo, fin, fout, prereposetups=None):
        self.cwd = encoding.getcwd()

        if not repo:
            self.ui = ui
            self.repo = self.repoui = None
        else:
            # the ui here is really the repo ui so take its baseui so we don't
            # end up with its local configuration
            self.ui = repo.baseui
            self.repo = repo
            self.repoui = repo.ui
        self._prereposetups = prereposetups

        # one channel per kind of traffic, all multiplexed over fout
        self.cdebug = channeledoutput(fout, b'd')
        self.cerr = channeledoutput(fout, b'e')
        self.cout = channeledoutput(fout, b'o')
        self.cin = channeledinput(fin, fout, b'I')
        self.cresult = channeledoutput(fout, b'r')

        if self.ui.config(b'cmdserver', b'log') == b'-':
            # switch log stream of server's ui to the 'd' (debug) channel
            # (don't touch repo.ui as its lifetime is longer than the server)
            self.ui = self.ui.copy()
            setuplogging(self.ui, repo=None, fp=self.cdebug)

        # structured messages get their own 'm' channel only on request
        self.cmsg = None
        if ui.config(b'ui', b'message-output') == b'channel':
            encname, encfn = _selectmessageencoder(ui)
            self.cmsg = channeledmessage(fout, b'm', encname, encfn)

        self.client = fin

        # If shutdown-on-interrupt is off, the default SIGINT handler is
        # removed so that client-server communication wouldn't be interrupted.
        # For example, 'runcommand' handler will issue three short read()s.
        # If one of the first two read()s were interrupted, the communication
        # channel would be left at dirty state and the subsequent request
        # wouldn't be parsed. So catching KeyboardInterrupt isn't enough.
        self._shutdown_on_interrupt = ui.configbool(
            b'cmdserver', b'shutdown-on-interrupt'
        )
        self._old_inthandler = None
        if not self._shutdown_on_interrupt:
            self._old_inthandler = signal.signal(signal.SIGINT, signal.SIG_IGN)

    def cleanup(self):
        """Undo process-wide changes made for the server session

        Puts back the SIGINT handler that __init__ replaced, if any.
        """
        if self._shutdown_on_interrupt:
            return
        signal.signal(signal.SIGINT, self._old_inthandler)

    def _read(self, size):
        """Read up to size bytes from the client; EOFError if it is gone."""
        if not size:
            return b''

        data = self.client.read(size)
        if data:
            return data
        # empty read: the other end is closed
        raise EOFError

    def _readstr(self):
        """read a string from the channel

        format:
        data length (uint32), data
        """
        (length,) = struct.unpack(b'>I', self._read(4))
        return self._read(length) if length else b''

    def _readlist(self):
        """read a list of NULL separated strings from the channel"""
        packed = self._readstr()
        return packed.split(b'\0') if packed else []

    def _dispatchcommand(self, req):
        """Run req through dispatch and return its result code

        When SIGINT is ignored by the server, the original handler is
        active only while the command runs; an interrupt is then reported
        to the client and turned into a result code of -1.
        """
        from . import dispatch  # avoid cycle

        if self._shutdown_on_interrupt:
            # no need to restore SIGINT handler as it is unmodified.
            return dispatch.dispatch(req)

        try:
            signal.signal(signal.SIGINT, self._old_inthandler)
            return dispatch.dispatch(req)
        except error.SignalInterrupt:
            # propagate SIGBREAK, SIGHUP, or SIGTERM.
            raise
        except KeyboardInterrupt:
            # SIGINT may be received out of the try-except block of dispatch(),
            # so catch it as last ditch. Another KeyboardInterrupt may be
            # raised while handling exceptions here, but there's no way to
            # avoid that except for doing everything in C.
            pass
        finally:
            signal.signal(signal.SIGINT, signal.SIG_IGN)

        # Only reached on KeyboardInterrupt: print error message and exit
        # *after* SIGINT handler removed.
        req.ui.error(_(b'interrupted!\n'))
        return -1

    def runcommand(self):
        """Read a NUL-separated argument list, execute it as a command and
        write the return code to the result channel"""
        from . import dispatch  # avoid cycle

        args = self._readlist()

        # copy the uis so changes (e.g. --config or --verbose) don't
        # persist between requests
        copiedui = self.ui.copy()
        uis = [copiedui]
        if self.repo:
            self.repo.baseui = copiedui
            # clone ui without using ui.copy because this is protected
            repoui = self.repoui.__class__(self.repoui)
            repoui.copy = copiedui.copy  # redo copy protection
            uis.append(repoui)
            self.repo.ui = self.repo.dirstate._ui = repoui
            self.repo.invalidateall()

        for u in uis:
            u.resetstate()
            # any kind of interaction must use server channels, but chg may
            # replace channels by fully functional tty files. so nontty is
            # enforced only if cin is a channel.
            if not util.safehasattr(self.cin, b'fileno'):
                u.setconfig(b'ui', b'nontty', b'true', b'commandserver')

        req = dispatch.request(
            args[:],
            copiedui,
            self.repo,
            self.cin,
            self.cout,
            self.cerr,
            self.cmsg,
            prereposetups=self._prereposetups,
        )

        try:
            ret = self._dispatchcommand(req) & 255
            # If shutdown-on-interrupt is off, it's important to write the
            # result code *after* SIGINT handler removed. If the result code
            # were lost, the client wouldn't be able to continue processing.
            self.cresult.write(struct.pack(b'>i', int(ret)))
        finally:
            # restore old cwd
            if b'--cwd' in args:
                os.chdir(self.cwd)

    def getencoding(self):
        """Write the current encoding to the result channel"""
        self.cresult.write(encoding.encoding)

    def serveone(self):
        """Handle a single request line; False once the client sent EOF"""
        cmd = self.client.readline()[:-1]
        if cmd:
            handler = self.capabilities.get(cmd)
            if not handler:
                # clients are expected to check what commands are supported by
                # looking at the servers capabilities
                raise error.Abort(_(b'unknown command %s') % cmd)
            handler(self)

        return cmd != b''

    # request name -> handler; the names are advertised in the hello message
    capabilities = {b'runcommand': runcommand, b'getencoding': getencoding}

    def serve(self):
        """Greet the client, then process requests until it goes away

        Returns 0 after a clean end of input, or 1 if the client
        disconnected in the middle of a request.
        """
        hello = [
            b'capabilities: ' + b' '.join(sorted(self.capabilities)),
            b'encoding: ' + encoding.encoding,
        ]
        if self.cmsg:
            hello.append(b'message-encoding: %s' % self.cmsg.encoding)
        hello.append(b'pid: %d' % procutil.getpid())
        if util.safehasattr(os, b'getpgid'):
            hello.append(b'pgid: %d' % os.getpgid(0))

        # write the hello msg in -one- chunk
        self.cout.write(b'\n'.join(hello))

        try:
            while self.serveone():
                pass
        except EOFError:
            # we'll get here if the client disconnected while we were reading
            # its request
            return 1

        return 0
Yuya Nishihara
|
r22988 | |||
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
def setuplogging(ui, repo=None, fp=None):
    """Install the 'cmdserver' logger as configured by cmdserver.log

    Nothing is done if cmdserver.log is unset. If it is '-', log messages
    go to the given fp, or to ui.ferr when no fp is given; the fp should be
    the 'd' channel while a client is connected. Any other value is taken
    as the path of a log file.

    The logger is registered on ui and, if a repo is given, on its baseui
    and ui as well.
    """
    # developer config: cmdserver.log
    logpath = ui.config(b'cmdserver', b'log')
    if not logpath:
        return
    # developer config: cmdserver.track-log
    tracked = set(ui.configlist(b'cmdserver', b'track-log'))

    if logpath == b'-':
        logger = loggingutil.fileobjectlogger(fp or ui.ferr, tracked)
    else:
        logpath = os.path.abspath(util.expandpath(logpath))
        # developer config: cmdserver.max-log-files
        maxfiles = ui.configint(b'cmdserver', b'max-log-files')
        # developer config: cmdserver.max-log-size
        maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
        logdir, logname = os.path.split(logpath)
        logger = loggingutil.filelogger(
            vfsmod.vfs(logdir),
            logname,
            tracked,
            maxfiles=maxfiles,
            maxsize=maxsize,
        )

    # a set, so that a ui shared between the arguments is set up only once
    targetuis = {ui}
    if repo:
        targetuis.update((repo.baseui, repo.ui))
    for target in targetuis:
        target.setlogger(b'cmdserver', logger)
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
class pipeservice(object):
    """Service that runs one command server over the process's stdio"""

    def __init__(self, ui, repo, opts):
        # opts is accepted for interface compatibility but not used here
        self.repo = repo
        self.ui = ui

    def init(self):
        """Nothing to prepare for the pipe mode"""

    def run(self):
        """Serve until the client is done and return serve()'s result"""
        # stdio is redirected to the null device while serving so that broken
        # extensions or in-process hooks will never cause corruption of
        # channel protocol.
        with self.ui.protectedfinout() as (fin, fout):
            sv = server(self.ui, self.repo, fin, fout)
            try:
                return sv.serve()
            finally:
                sv.cleanup()
Yuya Nishihara
|
r22989 | |||
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
def _initworkerprocess():
    """Prepare a freshly forked worker process

    The worker is moved into a process group of its own, and the random
    number generator is reseeded.
    """
    # a process group separate from the master process is used, in order to:
    # 1. make the current process group no longer "orphaned" (because the
    #    parent of this process is in a different process group while
    #    remains in a same session)
    #    according to POSIX 2.2.2.52, orphaned process group will ignore
    #    terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
    #    cause trouble for things like ncurses.
    # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
    #    SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
    #    processes like ssh will be killed properly, without affecting
    #    unrelated processes.
    os.setpgid(0, 0)

    # without reseeding, forked request handlers would all share the random
    # state inherited from the parent.
    random.seed()
Yuya Nishihara
|
r29542 | |||
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
r40911 | def _serverequest(ui, repo, conn, createcmdserver, prereposetups): | ||
Augie Fackler
|
r43906 | fin = conn.makefile('rb') | ||
fout = conn.makefile('wb') | ||||
Yuya Nishihara
|
r29585 | sv = None | ||
try: | ||||
Yuya Nishihara
|
r40911 | sv = createcmdserver(repo, conn, fin, fout, prereposetups) | ||
Yuya Nishihara
|
r22994 | try: | ||
Yuya Nishihara
|
r29585 | sv.serve() | ||
# handle exceptions that may be raised by command server. most of | ||||
# known exceptions are caught by dispatch. | ||||
except error.Abort as inst: | ||||
Martin von Zweigbergk
|
r46274 | ui.error(_(b'abort: %s\n') % inst.message) | ||
Yuya Nishihara
|
r29585 | except IOError as inst: | ||
if inst.errno != errno.EPIPE: | ||||
raise | ||||
except KeyboardInterrupt: | ||||
pass | ||||
Yuya Nishihara
|
r29513 | finally: | ||
Yuya Nishihara
|
r29585 | sv.cleanup() | ||
Augie Fackler
|
r43346 | except: # re-raises | ||
Yuya Nishihara
|
r29585 | # also write traceback to error channel. otherwise client cannot | ||
# see it because it is written to server's stderr by default. | ||||
if sv: | ||||
cerr = sv.cerr | ||||
else: | ||||
Augie Fackler
|
r43347 | cerr = channeledoutput(fout, b'e') | ||
Yuya Nishihara
|
r40397 | cerr.write(encoding.strtolocal(traceback.format_exc())) | ||
Yuya Nishihara
|
r29585 | raise | ||
finally: | ||||
fin.close() | ||||
try: | ||||
fout.close() # implicit flush() may cause another EPIPE | ||||
except IOError as inst: | ||||
if inst.errno != errno.EPIPE: | ||||
raise | ||||
Yuya Nishihara
|
r22994 | |||
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
class unixservicehandler(object):
    """Default set of pluggable operations for unix-mode services

    Almost all methods except for createcmdserver() are called in the main
    process. You can't pass mutable resource back from createcmdserver().
    """

    # timeout handed to the main loop's select(); also the interval at which
    # shouldexit() is polled
    pollinterval = None

    def __init__(self, ui):
        self.ui = ui

    def bindsocket(self, sock, address):
        """Bind sock to address, start listening and announce the address"""
        util.bindunixsocket(sock, address)
        sock.listen(socket.SOMAXCONN)
        announcement = _(b'listening at %s\n') % address
        self.ui.status(announcement)
        self.ui.flush()  # avoid buffering of status message

    def unlinksocket(self, address):
        """Remove the socket file at address"""
        os.unlink(address)

    def shouldexit(self):
        """True if server should shut down; checked per pollinterval"""
        return False

    def newconnection(self):
        """Called when main process notices new connection"""

    def createcmdserver(self, repo, conn, fin, fout, prereposetups):
        """Create new command server instance; called in the process that
        serves for the current connection"""
        return server(self.ui, repo, fin, fout, prereposetups)
Yuya Nishihara
|
r29544 | |||
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
class unixforkingservice(object):
    """
    Listens on unix domain socket and forks server per connection

    The main process only accepts connections, reaps finished workers and
    preloads repositories reported by workers; every client is served by a
    forked worker process.
    """

    def __init__(self, ui, repo, opts, handler=None):
        """Validate the configuration; no socket is created before init()

        opts must provide b'address', the path of the socket to listen on.
        handler supplies the pluggable operations and defaults to a
        unixservicehandler.
        """
        self.ui = ui
        self.repo = repo
        self.address = opts[b'address']
        if not util.safehasattr(socket, b'AF_UNIX'):
            raise error.Abort(_(b'unsupported platform'))
        if not self.address:
            raise error.Abort(_(b'no socket path specified with --address'))
        self._servicehandler = handler or unixservicehandler(ui)
        self._sock = None
        self._mainipc = None
        self._workeripc = None
        self._oldsigchldhandler = None
        self._workerpids = set()  # updated by signal handler; do not iterate
        self._socketunlinked = None
        # experimental config: cmdserver.max-repo-cache
        maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
        if maxlen < 0:
            raise error.Abort(_(b'negative max-repo-cache size not allowed'))
        self._repoloader = repocache.repoloader(ui, maxlen)
        # attempt to avoid crash in CoreFoundation when using chg after fix in
        # a89381e04c58
        if pycompat.isdarwin:
            procutil.gui()

    def init(self):
        """Create the sockets, install the SIGCHLD handler and start the
        repository loader"""
        self._sock = socket.socket(socket.AF_UNIX)
        # IPC channel from many workers to one main process; this is actually
        # a uni-directional pipe, but is backed by a DGRAM socket so each
        # message can be easily separated.
        o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
        self._mainipc, self._workeripc = o
        self._servicehandler.bindsocket(self._sock, self.address)
        if util.safehasattr(procutil, b'unblocksignal'):
            procutil.unblocksignal(signal.SIGCHLD)
        o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
        self._oldsigchldhandler = o
        self._socketunlinked = False
        self._repoloader.start()

    def _unlinksocket(self):
        """Remove the socket file through the handler, at most once"""
        if not self._socketunlinked:
            self._servicehandler.unlinksocket(self.address)
            self._socketunlinked = True

    def _cleanup(self):
        """Release everything acquired by init() and wait for the workers"""
        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
        self._sock.close()
        self._mainipc.close()
        self._workeripc.close()
        self._unlinksocket()
        self._repoloader.stop()
        # don't kill child processes as they have active clients, just wait
        self._reapworkers(0)

    def run(self):
        """Run the main loop; resources are cleaned up however it ends"""
        try:
            self._mainloop()
        finally:
            self._cleanup()

    def _mainloop(self):
        """Dispatch socket events until the handler asks to shut down

        The selector is closed on every way out of the loop, including
        exceptions raised by the handler, by select() or by an event
        callback.
        """
        exiting = False
        h = self._servicehandler
        selector = selectors.DefaultSelector()
        try:
            # the data attached to each registration is the callback to run
            # when the socket becomes readable
            selector.register(
                self._sock, selectors.EVENT_READ, self._acceptnewconnection
            )
            selector.register(
                self._mainipc, selectors.EVENT_READ, self._handlemainipc
            )
            while True:
                if not exiting and h.shouldexit():
                    # clients can no longer connect() to the domain socket, so
                    # we stop queuing new requests.
                    # for requests that are queued (connect()-ed, but haven't
                    # been accept()-ed), handle them before exit. otherwise,
                    # clients waiting for recv() will receive ECONNRESET.
                    self._unlinksocket()
                    exiting = True
                try:
                    events = selector.select(timeout=h.pollinterval)
                except OSError as inst:
                    # selectors2 raises ETIMEDOUT if timeout exceeded while
                    # handling signal interrupt. That's probably wrong, but
                    # we can easily get around it.
                    if inst.errno != errno.ETIMEDOUT:
                        raise
                    events = []
                if not events:
                    # only exit if we completed all queued requests
                    if exiting:
                        break
                    continue
                for key, _mask in events:
                    key.data(key.fileobj, selector)
        finally:
            # a forked worker closes its copy of the selector itself and
            # leaves through os._exit(), so this runs in the main process
            # only
            selector.close()

    def _acceptnewconnection(self, sock, selector):
        """Accept a connection and fork a worker process to serve it

        The worker never returns from this method; it leaves through
        os._exit().
        """
        h = self._servicehandler
        try:
            conn, _addr = sock.accept()
        except socket.error as inst:
            if inst.args[0] == errno.EINTR:
                return
            raise

        # Future improvement: On Python 3.7, maybe gc.freeze() can be used
        # to prevent COW memory from being touched by GC.
        # https://instagram-engineering.com/
        # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
        pid = os.fork()
        if pid:
            # main process: remember the worker and go back to the loop
            try:
                self.ui.log(
                    b'cmdserver', b'forked worker process (pid=%d)\n', pid
                )
                self._workerpids.add(pid)
                h.newconnection()
            finally:
                conn.close()  # release handle in parent process
        else:
            # worker process: drop the handles that belong to the main
            # process, serve the connection and exit
            try:
                selector.close()
                sock.close()
                self._mainipc.close()
                self._runworker(conn)
                conn.close()
                self._workeripc.close()
                os._exit(0)
            except:  # never return, hence no re-raises
                try:
                    self.ui.traceback(force=True)
                finally:
                    os._exit(255)

    def _handlemainipc(self, sock, selector):
        """Process messages sent from a worker"""
        try:
            path = sock.recv(32768)  # large enough to receive path
        except socket.error as inst:
            if inst.args[0] == errno.EINTR:
                return
            raise
        self._repoloader.load(path)

    def _sigchldhandler(self, signal, frame):
        """SIGCHLD handler: reap exited workers without blocking"""
        self._reapworkers(os.WNOHANG)

    def _reapworkers(self, options):
        """Wait for worker processes using the given waitpid() options

        Returns once no known worker is left, or as soon as waitpid()
        reports that there is nothing (more) to reap.
        """
        while self._workerpids:
            try:
                pid, _status = os.waitpid(-1, options)
            except OSError as inst:
                if inst.errno == errno.EINTR:
                    continue
                if inst.errno != errno.ECHILD:
                    raise
                # no child processes at all (reaped by other waitpid()?)
                self._workerpids.clear()
                return
            if pid == 0:
                # no waitable child processes
                return
            self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
            self._workerpids.discard(pid)

    def _runworker(self, conn):
        """Serve conn; called in the forked worker process"""
        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
        _initworkerprocess()
        h = self._servicehandler
        try:
            _serverequest(
                self.ui,
                self.repo,
                conn,
                h.createcmdserver,
                prereposetups=[self._reposetup],
            )
        finally:
            gc.collect()  # trigger __del__ since worker process uses os._exit

    def _reposetup(self, ui, repo):
        """Hook a local repo up to the IPC channel and the repo cache

        The repo will send its root to the main process when it is closed,
        and is primed from the cached repo if the loader has one.
        """
        if not repo.local():
            return

        class unixcmdserverrepo(repo.__class__):
            def close(self):
                super(unixcmdserverrepo, self).close()
                try:
                    self._cmdserveripc.send(self.root)
                except socket.error:
                    self.ui.log(
                        b'cmdserver', b'failed to send repo root to master\n'
                    )

        repo.__class__ = unixcmdserverrepo
        repo._cmdserveripc = self._workeripc

        cachedrepo = self._repoloader.get(repo.root)
        if cachedrepo is None:
            return
        repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
        repocache.copycache(cachedrepo, repo)