##// END OF EJS Templates
pycompat: make pycompat demandimport friendly...
pycompat: make pycompat demandimport friendly pycompat.py includes hack to import modules whose names are changed in Python 3. We use try-except to load module according to the version of python. But this method forces us to import the modules to raise an ImportError and hence making it demandimport unfriendly. This patch changes the try-except blocks to a single if-else block. To avoid test-check-pyflakes.t complain about unused imports, pycompat.py is excluded from the test.

File last commit:

r29580:ee818645 default
r29584:06587edd default
Show More
commandserver.py
534 lines | 15.8 KiB | text/x-python | PythonLexer
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 # commandserver.py - communicate with Mercurial's API over a pipe
#
# Copyright Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
Yuya Nishihara
commandserver: use absolute_import
r27351 from __future__ import absolute_import
import errno
Yuya Nishihara
commandserver: backport handling of forking server from chgserver...
r29513 import gc
Yuya Nishihara
commandserver: use absolute_import
r27351 import os
Yuya Nishihara
commandserver: backport handling of forking server from chgserver...
r29513 import random
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 import select
import signal
import socket
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 import struct
Yuya Nishihara
commandserver: use absolute_import
r27351 import sys
import traceback
from .i18n import _
from . import (
encoding,
error,
util,
)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
logfile = None
def log(*args):
if not logfile:
return
for a in args:
logfile.write(str(a))
logfile.flush()
class channeledoutput(object):
"""
Yuya Nishihara
cmdserver: correct doc of channeledoutput...
r22561 Write data to out in the following format:
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
data length (unsigned int),
data
"""
Yuya Nishihara
cmdserver: drop useless in_ attribute from channeledoutput...
r22563 def __init__(self, out, channel):
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 self.out = out
self.channel = channel
Yuya Nishihara
commandserver: implement name() to clarify channel is not a plain file...
r27415 @property
def name(self):
return '<%c-channel>' % self.channel
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 def write(self, data):
if not data:
return
self.out.write(struct.pack('>cI', self.channel, len(data)))
self.out.write(data)
self.out.flush()
def __getattr__(self, attr):
Yuya Nishihara
commandserver: drop tell() and seek() from channels (issue5049)...
r27915 if attr in ('isatty', 'fileno', 'tell', 'seek'):
Augie Fackler
commandserver: clean up use of two-argument raise...
r18174 raise AttributeError(attr)
Yuya Nishihara
cmdserver: drop useless in_ attribute from channeledoutput...
r22563 return getattr(self.out, attr)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
class channeledinput(object):
"""
Read data from in_.
Requests for input are written to out in the following format:
channel identifier - 'I' for plain input, 'L' line based (1 byte)
how many bytes to send at most (unsigned int),
The client replies with:
data length (unsigned int), 0 meaning EOF
data
"""
maxchunksize = 4 * 1024
def __init__(self, in_, out, channel):
self.in_ = in_
self.out = out
self.channel = channel
Yuya Nishihara
commandserver: implement name() to clarify channel is not a plain file...
r27415 @property
def name(self):
return '<%c-channel>' % self.channel
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 def read(self, size=-1):
if size < 0:
# if we need to consume all the clients input, ask for 4k chunks
# so the pipe doesn't fill up risking a deadlock
size = self.maxchunksize
s = self._read(size, self.channel)
buf = s
while s:
Idan Kamara
cmdserver: fix read-loop string concatenation
r14728 s = self._read(size, self.channel)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 buf += s
return buf
else:
return self._read(size, self.channel)
def _read(self, size, channel):
if not size:
return ''
assert size > 0
# tell the client we need at most size bytes
self.out.write(struct.pack('>cI', channel, size))
self.out.flush()
length = self.in_.read(4)
length = struct.unpack('>I', length)[0]
if not length:
return ''
else:
return self.in_.read(length)
def readline(self, size=-1):
if size < 0:
size = self.maxchunksize
s = self._read(size, 'L')
buf = s
# keep asking for more until there's either no more or
# we got a full line
while s and s[-1] != '\n':
Idan Kamara
cmdserver: fix read-loop string concatenation
r14728 s = self._read(size, 'L')
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 buf += s
return buf
else:
return self._read(size, 'L')
def __iter__(self):
return self
def next(self):
l = self.readline()
if not l:
raise StopIteration
return l
def __getattr__(self, attr):
Yuya Nishihara
commandserver: drop tell() and seek() from channels (issue5049)...
r27915 if attr in ('isatty', 'fileno', 'tell', 'seek'):
Augie Fackler
commandserver: clean up use of two-argument raise...
r18174 raise AttributeError(attr)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 return getattr(self.in_, attr)
class server(object):
"""
Yuya Nishihara
cmdserver: make server streams switchable...
r22990 Listens for commands on fin, runs them and writes the output on a channel
based stream to fout.
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 """
Yuya Nishihara
cmdserver: make server streams switchable...
r22990 def __init__(self, ui, repo, fin, fout):
Idan Kamara
cmdserver: restore old working dir after dispatch when we have --cwd
r14864 self.cwd = os.getcwd()
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Matt Mackall
commandserver: mark developer-only logging option
r25832 # developer config: cmdserver.log
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 logpath = ui.config("cmdserver", "log", None)
if logpath:
global logfile
if logpath == '-':
Mads Kiilerich
fix wording and not-completely-trivial spelling errors and bad docstrings
r17425 # write log on a special 'd' (debug) channel
Yuya Nishihara
cmdserver: make server streams switchable...
r22990 logfile = channeledoutput(fout, 'd')
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 else:
logfile = open(logpath, 'a')
Yuya Nishihara
cmdserver: allow to start server without repository...
r20650 if repo:
# the ui here is really the repo ui so take its baseui so we don't
# end up with its local configuration
self.ui = repo.baseui
self.repo = repo
self.repoui = repo.ui
else:
self.ui = ui
self.repo = self.repoui = None
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Yuya Nishihara
cmdserver: make server streams switchable...
r22990 self.cerr = channeledoutput(fout, 'e')
self.cout = channeledoutput(fout, 'o')
self.cin = channeledinput(fin, fout, 'I')
self.cresult = channeledoutput(fout, 'r')
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Yuya Nishihara
cmdserver: make server streams switchable...
r22990 self.client = fin
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Yuya Nishihara
commandserver: promote .cleanup() hook from chgserver...
r29512 def cleanup(self):
"""release and restore resources taken during server session"""
pass
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 def _read(self, size):
Idan Kamara
cmdserver: don't raise EOFError when trying to read 0 bytes from the client
r14706 if not size:
return ''
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 data = self.client.read(size)
# is the other end closed?
if not data:
Brodie Rao
cleanup: "raise SomeException()" -> "raise SomeException"
r16687 raise EOFError
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
return data
Jun Wu
commandserver: add _readstr and _readlist...
r28156 def _readstr(self):
"""read a string from the channel
format:
data length (uint32), data
"""
length = struct.unpack('>I', self._read(4))[0]
if not length:
return ''
return self._read(length)
def _readlist(self):
"""read a list of NULL separated strings from the channel"""
s = self._readstr()
if s:
return s.split('\0')
else:
return []
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 def runcommand(self):
""" reads a list of \0 terminated arguments, executes
and writes the return code to the result channel """
Yuya Nishihara
commandserver: cut import cycle by itself...
r27352 from . import dispatch # avoid cycle
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Jun Wu
commandserver: use _readlist...
r28157 args = self._readlist()
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Idan Kamara
cmdserver: copy repo.ui before running commands
r14750 # copy the uis so changes (e.g. --config or --verbose) don't
# persist between requests
Idan Kamara
cmdserver: assign repo.baseui before running commands...
r14751 copiedui = self.ui.copy()
Yuya Nishihara
cmdserver: forcibly use L channel to read password input (issue3161)...
r21195 uis = [copiedui]
Yuya Nishihara
cmdserver: allow to start server without repository...
r20650 if self.repo:
self.repo.baseui = copiedui
# clone ui without using ui.copy because this is protected
repoui = self.repoui.__class__(self.repoui)
repoui.copy = copiedui.copy # redo copy protection
Yuya Nishihara
cmdserver: forcibly use L channel to read password input (issue3161)...
r21195 uis.append(repoui)
Yuya Nishihara
cmdserver: allow to start server without repository...
r20650 self.repo.ui = self.repo.dirstate._ui = repoui
self.repo.invalidateall()
Idan Kamara
cmdserver: assign repo.baseui before running commands...
r14751
Yuya Nishihara
cmdserver: forcibly use L channel to read password input (issue3161)...
r21195 for ui in uis:
Yuya Nishihara
ui: provide official way to reset internal state per command...
r29366 ui.resetstate()
Yuya Nishihara
commandserver: do not set nontty flag if channel is replaced by a real file...
r27565 # any kind of interaction must use server channels, but chg may
# replace channels by fully functional tty files. so nontty is
# enforced only if cin is a channel.
if not util.safehasattr(self.cin, 'fileno'):
ui.setconfig('ui', 'nontty', 'true', 'commandserver')
Yuya Nishihara
cmdserver: forcibly use L channel to read password input (issue3161)...
r21195
Idan Kamara
cmdserver: restore old working dir after dispatch when we have --cwd
r14864 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 self.cout, self.cerr)
Yuya Nishihara
cmdserver: mask return code of runcommand in the same way as dispatch.run...
r20631 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Idan Kamara
cmdserver: restore old working dir after dispatch when we have --cwd
r14864 # restore old cwd
if '--cwd' in args:
os.chdir(self.cwd)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 self.cresult.write(struct.pack('>i', int(ret)))
def getencoding(self):
""" writes the current encoding to the result channel """
self.cresult.write(encoding.encoding)
def serveone(self):
cmd = self.client.readline()[:-1]
if cmd:
handler = self.capabilities.get(cmd)
if handler:
handler(self)
else:
# clients are expected to check what commands are supported by
# looking at the servers capabilities
Pierre-Yves David
error: get Abort from 'error' instead of 'util'...
r26587 raise error.Abort(_('unknown command %s') % cmd)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
return cmd != ''
capabilities = {'runcommand' : runcommand,
'getencoding' : getencoding}
def serve(self):
Mads Kiilerich
commandserver: report capabilities sorted
r18359 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
Idan Kamara
cmdserver: write the hello message as one chunk on the 'o' channel...
r14719 hellomsg += '\n'
hellomsg += 'encoding: ' + encoding.encoding
Yuya Nishihara
cmdserver: include pid of server handling requests in hello message...
r23036 hellomsg += '\n'
timeless
util: enable getpid to be replaced...
r28027 hellomsg += 'pid: %d' % util.getpid()
Jun Wu
commandserver: send pgid in hello message...
r29580 if util.safehasattr(os, 'getpgid'):
hellomsg += '\n'
hellomsg += 'pgid: %d' % os.getpgid(0)
Idan Kamara
cmdserver: write the hello message as one chunk on the 'o' channel...
r14719
# write the hello msg in -one- chunk
self.cout.write(hellomsg)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
try:
while self.serveone():
pass
except EOFError:
# we'll get here if the client disconnected while we were reading
# its request
return 1
return 0
Yuya Nishihara
cmdserver: wrap 'pipe' mode server by service object...
r22988
Yuya Nishihara
cmdserver: protect pipe server streams against corruption caused by direct io...
r23324 def _protectio(ui):
""" duplicates streams and redirect original to null if ui uses stdio """
ui.flush()
newfiles = []
nullfd = os.open(os.devnull, os.O_RDWR)
for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
(ui.fout, sys.stdout, 'wb')]:
if f is sysf:
newfd = os.dup(f.fileno())
os.dup2(nullfd, f.fileno())
f = os.fdopen(newfd, mode)
newfiles.append(f)
os.close(nullfd)
return tuple(newfiles)
def _restoreio(ui, fin, fout):
""" restores streams from duplicated ones """
ui.flush()
for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
if f is not uif:
os.dup2(f.fileno(), uif.fileno())
f.close()
Yuya Nishihara
cmdserver: wrap 'pipe' mode server by service object...
r22988 class pipeservice(object):
def __init__(self, ui, repo, opts):
Yuya Nishihara
cmdserver: postpone creation of pipe server until run()...
r23323 self.ui = ui
self.repo = repo
Yuya Nishihara
cmdserver: wrap 'pipe' mode server by service object...
r22988
def init(self):
pass
def run(self):
Yuya Nishihara
cmdserver: postpone creation of pipe server until run()...
r23323 ui = self.ui
Yuya Nishihara
cmdserver: protect pipe server streams against corruption caused by direct io...
r23324 # redirect stdio to null device so that broken extensions or in-process
# hooks will never cause corruption of channel protocol.
fin, fout = _protectio(ui)
try:
sv = server(ui, self.repo, fin, fout)
return sv.serve()
finally:
Yuya Nishihara
commandserver: promote .cleanup() hook from chgserver...
r29512 sv.cleanup()
Yuya Nishihara
cmdserver: protect pipe server streams against corruption caused by direct io...
r23324 _restoreio(ui, fin, fout)
Yuya Nishihara
cmdserver: switch service objects by mode...
r22989
Yuya Nishihara
commandserver: extract function that serves for the current connection...
r29543 def _serverequest(ui, repo, conn, createcmdserver):
if True: # TODO: unindent
Yuya Nishihara
commandserver: backport handling of forking server from chgserver...
r29513 # use a different process group from the master process, making this
# process pass kernel "is_current_pgrp_orphaned" check so signals like
# SIGTSTP, SIGTTIN, SIGTTOU are not ignored.
os.setpgid(0, 0)
# change random state otherwise forked request handlers would have a
# same state inherited from parent.
random.seed()
Yuya Nishihara
commandserver: manually create file objects from socket...
r29542
fin = conn.makefile('rb')
fout = conn.makefile('wb')
Yuya Nishihara
cmdserver: write early exception to 'e' channel in 'unix' mode...
r28511 sv = None
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 try:
Yuya Nishihara
commandserver: extract function that serves for the current connection...
r29543 sv = createcmdserver(repo, conn, fin, fout)
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 try:
sv.serve()
# handle exceptions that may be raised by command server. most of
# known exceptions are caught by dispatch.
Pierre-Yves David
error: get Abort from 'error' instead of 'util'...
r26587 except error.Abort as inst:
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 ui.warn(_('abort: %s\n') % inst)
Gregory Szorc
global: mass rewrite to use modern exception syntax...
r25660 except IOError as inst:
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 if inst.errno != errno.EPIPE:
raise
except KeyboardInterrupt:
pass
Yuya Nishihara
commandserver: promote .cleanup() hook from chgserver...
r29512 finally:
sv.cleanup()
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 except: # re-raises
# also write traceback to error channel. otherwise client cannot
# see it because it is written to server's stderr by default.
Yuya Nishihara
cmdserver: write early exception to 'e' channel in 'unix' mode...
r28511 if sv:
cerr = sv.cerr
else:
Yuya Nishihara
commandserver: manually create file objects from socket...
r29542 cerr = channeledoutput(fout, 'e')
Yuya Nishihara
cmdserver: write early exception to 'e' channel in 'unix' mode...
r28511 traceback.print_exc(file=cerr)
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 raise
Yuya Nishihara
commandserver: backport handling of forking server from chgserver...
r29513 finally:
Yuya Nishihara
commandserver: manually create file objects from socket...
r29542 fin.close()
try:
fout.close() # implicit flush() may cause another EPIPE
except IOError as inst:
if inst.errno != errno.EPIPE:
raise
Yuya Nishihara
commandserver: backport handling of forking server from chgserver...
r29513 # trigger __del__ since ForkingMixIn uses os._exit
gc.collect()
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 class unixservicehandler(object):
"""Set of pluggable operations for unix-mode services
Almost all methods except for createcmdserver() are called in the main
process. You can't pass mutable resource back from createcmdserver().
"""
pollinterval = None
def __init__(self, ui):
self.ui = ui
def bindsocket(self, sock, address):
util.bindunixsocket(sock, address)
def unlinksocket(self, address):
os.unlink(address)
def printbanner(self, address):
self.ui.status(_('listening at %s\n') % address)
self.ui.flush() # avoid buffering of status message
def shouldexit(self):
"""True if server should shut down; checked per pollinterval"""
return False
def newconnection(self):
"""Called when main process notices new connection"""
pass
def createcmdserver(self, repo, conn, fin, fout):
"""Create new command server instance; called in the process that
serves for the current connection"""
return server(self.ui, repo, fin, fout)
Yuya Nishihara
commandserver: drop old unixservice implementation...
r29548 class unixforkingservice(object):
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 """
Listens on unix domain socket and forks server per connection
"""
Yuya Nishihara
commandserver: drop old unixservice implementation...
r29548
def __init__(self, ui, repo, opts, handler=None):
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 self.ui = ui
self.repo = repo
self.address = opts['address']
Yuya Nishihara
commandserver: drop old unixservice implementation...
r29548 if not util.safehasattr(socket, 'AF_UNIX'):
Pierre-Yves David
error: get Abort from 'error' instead of 'util'...
r26587 raise error.Abort(_('unsupported platform'))
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 if not self.address:
Pierre-Yves David
error: get Abort from 'error' instead of 'util'...
r26587 raise error.Abort(_('no socket path specified with --address'))
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 self._servicehandler = handler or unixservicehandler(ui)
self._sock = None
self._oldsigchldhandler = None
self._workerpids = set() # updated by signal handler; do not iterate
def init(self):
self._sock = socket.socket(socket.AF_UNIX)
self._servicehandler.bindsocket(self._sock, self.address)
self._sock.listen(5)
o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
self._oldsigchldhandler = o
self._servicehandler.printbanner(self.address)
def _cleanup(self):
signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
self._sock.close()
self._servicehandler.unlinksocket(self.address)
# don't kill child processes as they have active clients, just wait
self._reapworkers(0)
def run(self):
try:
self._mainloop()
finally:
self._cleanup()
def _mainloop(self):
h = self._servicehandler
while not h.shouldexit():
try:
ready = select.select([self._sock], [], [], h.pollinterval)[0]
if not ready:
continue
conn, _addr = self._sock.accept()
except (select.error, socket.error) as inst:
if inst.args[0] == errno.EINTR:
continue
raise
pid = os.fork()
if pid:
try:
self.ui.debug('forked worker process (pid=%d)\n' % pid)
self._workerpids.add(pid)
h.newconnection()
finally:
conn.close() # release handle in parent process
else:
try:
self._serveworker(conn)
conn.close()
os._exit(0)
except: # never return, hence no re-raises
try:
self.ui.traceback(force=True)
finally:
os._exit(255)
def _sigchldhandler(self, signal, frame):
self._reapworkers(os.WNOHANG)
def _reapworkers(self, options):
while self._workerpids:
try:
pid, _status = os.waitpid(-1, options)
except OSError as inst:
if inst.errno == errno.EINTR:
continue
if inst.errno != errno.ECHILD:
raise
# no child processes at all (reaped by other waitpid()?)
self._workerpids.clear()
return
if pid == 0:
# no waitable child processes
return
self.ui.debug('worker process exited (pid=%d)\n' % pid)
self._workerpids.discard(pid)
def _serveworker(self, conn):
signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
h = self._servicehandler
_serverequest(self.ui, self.repo, conn, h.createcmdserver)
Yuya Nishihara
cmdserver: switch service objects by mode...
r22989 _servicemap = {
'pipe': pipeservice,
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 'unix': unixforkingservice,
Yuya Nishihara
cmdserver: switch service objects by mode...
r22989 }
def createservice(ui, repo, opts):
mode = opts['cmdserver']
try:
return _servicemap[mode](ui, repo, opts)
except KeyError:
Pierre-Yves David
error: get Abort from 'error' instead of 'util'...
r26587 raise error.Abort(_('unknown mode %s') % mode)