##// END OF EJS Templates
import-checker: allow 'from typing import ...'...
import-checker: allow 'from typing import ...' Suppresses the following error in test-check-module-imports.t: mercurial/encoding.py:24: relative import of stdlib module

File last commit:

r43812:2fe6121c default
r43995:0ad5d6c4 default
Show More
commandserver.py
727 lines | 22.6 KiB | text/x-python | PythonLexer
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 # commandserver.py - communicate with Mercurial's API over a pipe
#
# Copyright Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
Yuya Nishihara
commandserver: use absolute_import
r27351 from __future__ import absolute_import
import errno
Yuya Nishihara
commandserver: backport handling of forking server from chgserver...
r29513 import gc
Yuya Nishihara
commandserver: use absolute_import
r27351 import os
Yuya Nishihara
commandserver: backport handling of forking server from chgserver...
r29513 import random
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 import signal
import socket
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 import struct
Yuya Nishihara
commandserver: use absolute_import
r27351 import traceback
Augie Fackler
commandserver: prefer first-party selectors module from Python 3 to backport...
r36958 try:
import selectors
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
commandserver: prefer first-party selectors module from Python 3 to backport...
r36958 selectors.BaseSelector
except ImportError:
from .thirdparty import selectors2 as selectors
Yuya Nishihara
commandserver: use absolute_import
r27351 from .i18n import _
Gregory Szorc
py3: manually import getattr where it is needed...
r43359 from .pycompat import getattr
Yuya Nishihara
commandserver: use absolute_import
r27351 from . import (
encoding,
error,
Yuya Nishihara
commandserver: install logger to record server events through canonical API...
r40859 loggingutil,
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625 pycompat,
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 repocache,
Yuya Nishihara
commandserver: use absolute_import
r27351 util,
Yuya Nishihara
commandserver: install logger to record server events through canonical API...
r40859 vfs as vfsmod,
Yuya Nishihara
commandserver: use absolute_import
r27351 )
Yuya Nishihara
procutil: bulk-replace util.std* to point to new module
r37137 from .utils import (
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625 cborutil,
Yuya Nishihara
procutil: bulk-replace util.std* to point to new module
r37137 procutil,
)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Augie Fackler
formatting: blacken the codebase...
r43346
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
class channeledoutput(object):
    """
    File-like writer that frames each payload for a single channel.

    Every non-empty write is sent to ``out`` as:

    channel identifier (1 byte),
    data length (unsigned int, big endian),
    data
    """

    def __init__(self, out, channel):
        self.channel = channel
        self.out = out

    @property
    def name(self):
        """Pseudo file name identifying the channel (not a real path)."""
        return b'<%c-channel>' % self.channel

    def write(self, data):
        """Emit ``data`` as one frame and flush; empty data is a no-op."""
        if data:
            header = struct.pack(b'>cI', self.channel, len(data))
            # header and payload go out through a single write() call to
            # guarantee the same atomicity as the underlying file
            self.out.write(header + data)
            self.out.flush()

    def __getattr__(self, attr):
        # a channel is not a plain file: hide the attributes that only make
        # sense for one, and forward everything else to the wrapped stream
        if attr not in ('isatty', 'fileno', 'tell', 'seek'):
            return getattr(self.out, attr)
        raise AttributeError(attr)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
class channeledmessage(object):
    """
    Write encoded message and metadata to out in the following format:

    data length (unsigned int),
    encoded message and metadata, as a flat key-value dict.

    Each message should have 'type' attribute. Messages of unknown type
    should be ignored.
    """

    # teach ui that write() can take **opts
    structured = True

    def __init__(self, out, channel, encodename, encodefn):
        self.encoding = encodename
        self._encodefn = encodefn
        self._cout = channeledoutput(out, channel)

    def write(self, data, **opts):
        """Encode ``opts`` (plus ``data`` under b'data', unless it is None)
        and send the result as one frame on the wrapped channel"""
        fields = pycompat.byteskwargs(opts)
        if data is not None:
            fields[b'data'] = data
        payload = self._encodefn(fields)
        self._cout.write(payload)

    def __getattr__(self, attr):
        # anything not defined here is answered by the underlying channel
        return getattr(self._cout, attr)
Augie Fackler
formatting: blacken the codebase...
r43346
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
class channeledinput(object):
    """
    Read data from in_.

    Requests for input are written to out in the following format:
    channel identifier - 'I' for plain input, 'L' line based (1 byte)
    how many bytes to send at most (unsigned int),

    The client replies with:
    data length (unsigned int), 0 meaning EOF
    data
    """

    maxchunksize = 4 * 1024

    def __init__(self, in_, out, channel):
        self.in_ = in_
        self.out = out
        self.channel = channel

    @property
    def name(self):
        """Pseudo file name identifying the channel (not a real path)."""
        return b'<%c-channel>' % self.channel

    def read(self, size=-1):
        """Read from the client on this object's channel.

        With a negative ``size`` everything up to EOF is returned; otherwise
        a single request for at most ``size`` bytes is made.
        """
        if size >= 0:
            return self._read(size, self.channel)

        # if we need to consume all the clients input, ask for 4k chunks
        # so the pipe doesn't fill up risking a deadlock
        chunks = []
        while True:
            s = self._read(self.maxchunksize, self.channel)
            if not s:
                break
            chunks.append(s)
        return b''.join(chunks)

    def _read(self, size, channel):
        """Issue one request for at most ``size`` bytes on ``channel`` and
        return the client's reply (b'' when the client reports EOF)"""
        if not size:
            return b''
        assert size > 0

        # tell the client we need at most size bytes
        self.out.write(struct.pack(b'>cI', channel, size))
        self.out.flush()

        length = self.in_.read(4)
        length = struct.unpack(b'>I', length)[0]
        if not length:
            return b''
        else:
            return self.in_.read(length)

    def readline(self, size=-1):
        """Read one line from the client on the 'L' channel.

        With a negative ``size``, chunks are requested until one ends with a
        newline or the client reports EOF; otherwise a single request for at
        most ``size`` bytes is made.
        """
        if size >= 0:
            return self._read(size, b'L')

        # keep asking for more until there's either no more or
        # we got a full line
        chunks = []
        while True:
            s = self._read(self.maxchunksize, b'L')
            if not s:
                break
            chunks.append(s)
            # endswith() rather than s[-1]: indexing bytes yields an int on
            # Python 3, which never compares equal to b'\n'
            if s.endswith(b'\n'):
                break
        return b''.join(chunks)

    def __iter__(self):
        return self

    def next(self):
        l = self.readline()
        if not l:
            raise StopIteration
        return l

    __next__ = next

    def __getattr__(self, attr):
        # a channel is not a plain file: hide the attributes that only make
        # sense for one, and forward everything else to the wrapped stream
        if attr in ('isatty', 'fileno', 'tell', 'seek'):
            raise AttributeError(attr)
        return getattr(self.in_, attr)
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625 _messageencoders = {
b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
}
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
def _selectmessageencoder(ui):
    """Return (name, encodefn) for the first configured message encoding
    that has an entry in _messageencoders; abort if none of them does"""
    # experimental config: cmdserver.message-encodings
    encnames = ui.configlist(b'cmdserver', b'message-encodings')
    for name in encnames:
        encoder = _messageencoders.get(name)
        if not encoder:
            continue
        return name, encoder
    raise error.Abort(
        b'no supported message encodings: %s' % b' '.join(encnames)
    )
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
class server(object):
    """
    Listens for commands on fin, runs them and writes the output on a channel
    based stream to fout.
    """

    def __init__(self, ui, repo, fin, fout, prereposetups=None):
        # remembered so that runcommand() can undo a --cwd
        self.cwd = encoding.getcwd()

        if repo:
            # the ui here is really the repo ui so take its baseui so we don't
            # end up with its local configuration
            self.ui = repo.baseui
            self.repo = repo
            self.repoui = repo.ui
        else:
            self.ui = ui
            self.repo = self.repoui = None
        self._prereposetups = prereposetups

        # one framed stream per protocol channel, all sharing fout
        self.cdebug = channeledoutput(fout, b'd')
        self.cerr = channeledoutput(fout, b'e')
        self.cout = channeledoutput(fout, b'o')
        self.cin = channeledinput(fin, fout, b'I')
        self.cresult = channeledoutput(fout, b'r')

        if self.ui.config(b'cmdserver', b'log') == b'-':
            # switch log stream of server's ui to the 'd' (debug) channel
            # (don't touch repo.ui as its lifetime is longer than the server)
            self.ui = self.ui.copy()
            setuplogging(self.ui, repo=None, fp=self.cdebug)

        # TODO: add this to help/config.txt when stabilized
        # ``channel``
        #   Use separate channel for structured output. (Command-server only)
        self.cmsg = None
        if ui.config(b'ui', b'message-output') == b'channel':
            encname, encfn = _selectmessageencoder(ui)
            self.cmsg = channeledmessage(fout, b'm', encname, encfn)

        self.client = fin

    def cleanup(self):
        """release and restore resources taken during server session"""

    def _read(self, size):
        """read exactly what one client.read(size) call returns; raise
        EOFError if a non-zero read comes back empty"""
        if not size:
            return b''

        chunk = self.client.read(size)
        if chunk:
            return chunk
        # an empty read means the other end is closed
        raise EOFError

    def _readstr(self):
        """read a string from the channel

        format:
        data length (uint32), data
        """
        (length,) = struct.unpack(b'>I', self._read(4))
        return self._read(length) if length else b''

    def _readlist(self):
        """read a list of NULL separated strings from the channel"""
        packed = self._readstr()
        return packed.split(b'\0') if packed else []

    def runcommand(self):
        """read a list of NUL-separated arguments, execute them as a command
        and write the return code to the result channel"""
        from . import dispatch  # avoid cycle

        args = self._readlist()

        # copy the uis so changes (e.g. --config or --verbose) don't
        # persist between requests
        copiedui = self.ui.copy()
        uis = [copiedui]
        if self.repo:
            self.repo.baseui = copiedui
            # clone ui without using ui.copy because this is protected
            repoui = self.repoui.__class__(self.repoui)
            repoui.copy = copiedui.copy  # redo copy protection
            uis.append(repoui)
            self.repo.ui = self.repo.dirstate._ui = repoui
            self.repo.invalidateall()

        for ui in uis:
            ui.resetstate()
            # any kind of interaction must use server channels, but chg may
            # replace channels by fully functional tty files. so nontty is
            # enforced only if cin is a channel.
            if not util.safehasattr(self.cin, b'fileno'):
                ui.setconfig(b'ui', b'nontty', b'true', b'commandserver')

        req = dispatch.request(
            args[:],
            copiedui,
            self.repo,
            self.cin,
            self.cout,
            self.cerr,
            self.cmsg,
            prereposetups=self._prereposetups,
        )

        try:
            ret = dispatch.dispatch(req) & 255
            self.cresult.write(struct.pack(b'>i', int(ret)))
        finally:
            # restore old cwd
            if b'--cwd' in args:
                os.chdir(self.cwd)

    def getencoding(self):
        """write the current encoding to the result channel"""
        self.cresult.write(encoding.encoding)

    def serveone(self):
        """read one command name from the client and run its handler

        Returns False when the name read is empty, True otherwise.
        """
        cmd = self.client.readline()[:-1]
        if cmd:
            handler = self.capabilities.get(cmd)
            if not handler:
                # clients are expected to check what commands are supported by
                # looking at the servers capabilities
                raise error.Abort(_(b'unknown command %s') % cmd)
            handler(self)

        return cmd != b''

    capabilities = {b'runcommand': runcommand, b'getencoding': getencoding}

    def serve(self):
        """send the hello message, then serve commands until the client stops

        Returns 1 if EOFError was raised while serving, 0 otherwise.
        """
        lines = [
            b'capabilities: ' + b' '.join(sorted(self.capabilities)),
            b'encoding: ' + encoding.encoding,
        ]
        if self.cmsg:
            lines.append(b'message-encoding: %s' % self.cmsg.encoding)
        lines.append(b'pid: %d' % procutil.getpid())
        if util.safehasattr(os, b'getpgid'):
            lines.append(b'pgid: %d' % os.getpgid(0))

        # write the hello msg in -one- chunk
        self.cout.write(b'\n'.join(lines))

        try:
            while self.serveone():
                pass
        except EOFError:
            # we'll get here if the client disconnected while we were reading
            # its request
            return 1

        return 0
Yuya Nishihara
cmdserver: wrap 'pipe' mode server by service object...
r22988
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
commandserver: install logger to record server events through canonical API...
def setuplogging(ui, repo=None, fp=None):
    """Set up server logging facility

    If cmdserver.log is '-', log messages will be sent to the given fp.
    It should be the 'd' channel while a client is connected, and otherwise
    is the stderr of the server process.

    Nothing is installed when cmdserver.log is unset or empty. The logger is
    registered on ui and, if a repo is given, on repo.baseui and repo.ui.
    """
    # developer config: cmdserver.log
    logpath = ui.config(b'cmdserver', b'log')
    if not logpath:
        return
    # developer config: cmdserver.track-log
    tracked = set(ui.configlist(b'cmdserver', b'track-log'))

    if logpath == b'-':
        stream = fp if fp else ui.ferr
        logger = loggingutil.fileobjectlogger(stream, tracked)
    else:
        logpath = os.path.abspath(util.expandpath(logpath))
        # developer config: cmdserver.max-log-files
        maxfiles = ui.configint(b'cmdserver', b'max-log-files')
        # developer config: cmdserver.max-log-size
        maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
        logdir, logname = os.path.split(logpath)
        vfs = vfsmod.vfs(logdir)
        logger = loggingutil.filelogger(
            vfs, logname, tracked, maxfiles=maxfiles, maxsize=maxsize,
        )

    # a set, so a ui object reachable by more than one route is set up once
    targetuis = {ui}
    if repo:
        targetuis.update((repo.baseui, repo.ui))
    for target in targetuis:
        target.setlogger(b'cmdserver', logger)
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
cmdserver: wrap 'pipe' mode server by service object...
class pipeservice(object):
    """Service object running one command server over ui's protected streams"""

    def __init__(self, ui, repo, opts):
        # opts is accepted but not used by this service
        self.repo = repo
        self.ui = ui

    def init(self):
        """Nothing to prepare; the server is only created in run()"""

    def run(self):
        """Serve until the client is done and return serve()'s result"""
        ui = self.ui
        # redirect stdio to null device so that broken extensions or in-process
        # hooks will never cause corruption of channel protocol.
        with ui.protectedfinout() as (fin, fout):
            cmdserver = server(ui, self.repo, fin, fout)
            try:
                return cmdserver.serve()
            finally:
                cmdserver.cleanup()
Yuya Nishihara
cmdserver: switch service objects by mode...
r22989
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
commandserver: separate initialization and cleanup of forked process...
def _initworkerprocess():
    """Detach the calling process into its own process group and reseed
    the random module"""
    # use a different process group from the master process, in order to:
    # 1. make the current process group no longer "orphaned" (because the
    #    parent of this process is in a different process group while
    #    remains in a same session)
    #    according to POSIX 2.2.2.52, orphaned process group will ignore
    #    terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
    #    cause trouble for things like ncurses.
    # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
    #    SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
    #    processes like ssh will be killed properly, without affecting
    #    unrelated processes.
    os.setpgid(0, 0)

    # change random state otherwise forked request handlers would have a
    # same state inherited from parent.
    random.seed()
Yuya Nishihara
commandserver: manually create file objects from socket...
r29542
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
commandserver: pass around option to hook repo instance creation...
r40911 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
Augie Fackler
cleanup: remove pointless r-prefixes on single-quoted strings...
r43906 fin = conn.makefile('rb')
fout = conn.makefile('wb')
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 sv = None
try:
Yuya Nishihara
commandserver: pass around option to hook repo instance creation...
r40911 sv = createcmdserver(repo, conn, fin, fout, prereposetups)
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 try:
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 sv.serve()
# handle exceptions that may be raised by command server. most of
# known exceptions are caught by dispatch.
except error.Abort as inst:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 ui.error(_(b'abort: %s\n') % inst)
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 except IOError as inst:
if inst.errno != errno.EPIPE:
raise
except KeyboardInterrupt:
pass
Yuya Nishihara
commandserver: backport handling of forking server from chgserver...
r29513 finally:
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 sv.cleanup()
Augie Fackler
formatting: blacken the codebase...
r43346 except: # re-raises
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 # also write traceback to error channel. otherwise client cannot
# see it because it is written to server's stderr by default.
if sv:
cerr = sv.cerr
else:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 cerr = channeledoutput(fout, b'e')
Yuya Nishihara
py3: don't use traceback.print_exc() in commandserver.py...
r40397 cerr.write(encoding.strtolocal(traceback.format_exc()))
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 raise
finally:
fin.close()
try:
fout.close() # implicit flush() may cause another EPIPE
except IOError as inst:
if inst.errno != errno.EPIPE:
raise
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
class unixservicehandler(object):
    """Set of pluggable operations for unix-mode services

    Almost all methods except for createcmdserver() are called in the main
    process. You can't pass mutable resource back from createcmdserver().
    """

    pollinterval = None

    def __init__(self, ui):
        self.ui = ui

    def bindsocket(self, sock, address):
        """Bind sock to address, start listening and announce the address"""
        util.bindunixsocket(sock, address)
        sock.listen(socket.SOMAXCONN)
        ui = self.ui
        ui.status(_(b'listening at %s\n') % address)
        ui.flush()  # avoid buffering of status message

    def unlinksocket(self, address):
        """Remove the socket file at address"""
        os.unlink(address)

    def shouldexit(self):
        """True if server should shut down; checked per pollinterval"""
        return False

    def newconnection(self):
        """Called when main process notices new connection"""

    def createcmdserver(self, repo, conn, fin, fout, prereposetups):
        """Create new command server instance; called in the process that
        serves for the current connection"""
        return server(self.ui, repo, fin, fout, prereposetups)
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
commandserver: drop old unixservice implementation...
r29548 class unixforkingservice(object):
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 """
Listens on unix domain socket and forks server per connection
"""
Yuya Nishihara
commandserver: drop old unixservice implementation...
r29548
def __init__(self, ui, repo, opts, handler=None):
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 self.ui = ui
self.repo = repo
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 self.address = opts[b'address']
if not util.safehasattr(socket, b'AF_UNIX'):
raise error.Abort(_(b'unsupported platform'))
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 if not self.address:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 raise error.Abort(_(b'no socket path specified with --address'))
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 self._servicehandler = handler or unixservicehandler(ui)
self._sock = None
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 self._mainipc = None
self._workeripc = None
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 self._oldsigchldhandler = None
self._workerpids = set() # updated by signal handler; do not iterate
Jun Wu
commandserver: prevent unlink socket twice...
r30887 self._socketunlinked = None
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 # experimental config: cmdserver.max-repo-cache
maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
if maxlen < 0:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 raise error.Abort(_(b'negative max-repo-cache size not allowed'))
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 self._repoloader = repocache.repoloader(ui, maxlen)
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544
def init(self):
    """Open the sockets, install the SIGCHLD handler, start the loader

    The listening socket is bound by the service handler's
    ``bindsocket()``.  The previous SIGCHLD handler is remembered in
    ``_oldsigchldhandler``.
    """
    self._sock = socket.socket(socket.AF_UNIX)
    # IPC channel from many workers to one main process; this is actually
    # a uni-directional pipe, but is backed by a DGRAM socket so each
    # message can be easily separated.
    self._mainipc, self._workeripc = socket.socketpair(
        socket.AF_UNIX, socket.SOCK_DGRAM
    )
    self._servicehandler.bindsocket(self._sock, self.address)

    # unblocksignal() is only called when procutil provides it
    if util.safehasattr(procutil, b'unblocksignal'):
        procutil.unblocksignal(signal.SIGCHLD)
    self._oldsigchldhandler = signal.signal(
        signal.SIGCHLD, self._sigchldhandler
    )

    self._socketunlinked = False
    self._repoloader.start()
Jun Wu
commandserver: prevent unlink socket twice...
r30887
def _unlinksocket(self):
if not self._socketunlinked:
self._servicehandler.unlinksocket(self.address)
self._socketunlinked = True
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544
def _cleanup(self):
signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
self._sock.close()
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 self._mainipc.close()
self._workeripc.close()
Jun Wu
commandserver: prevent unlink socket twice...
r30887 self._unlinksocket()
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 self._repoloader.stop()
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 # don't kill child processes as they have active clients, just wait
self._reapworkers(0)
def run(self):
    """Run the main loop, then clean up whether or not it raised"""
    try:
        self._mainloop()
    finally:
        # runs on normal exit and on any exception from the loop
        self._cleanup()
def _mainloop(self):
Jun Wu
commandserver: handle backlog before exiting...
r30891 exiting = False
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 h = self._servicehandler
Augie Fackler
commandserver: prefer first-party selectors module from Python 3 to backport...
r36958 selector = selectors.DefaultSelector()
Augie Fackler
formatting: blacken the codebase...
r43346 selector.register(
self._sock, selectors.EVENT_READ, self._acceptnewconnection
)
selector.register(
self._mainipc, selectors.EVENT_READ, self._handlemainipc
)
Jun Wu
commandserver: handle backlog before exiting...
r30891 while True:
if not exiting and h.shouldexit():
# clients can no longer connect() to the domain socket, so
# we stop queuing new requests.
# for requests that are queued (connect()-ed, but haven't been
# accept()-ed), handle them before exit. otherwise, clients
# waiting for recv() will receive ECONNRESET.
self._unlinksocket()
exiting = True
Yuya Nishihara
commandserver: get around ETIMEDOUT raised by selectors2...
r40833 try:
Yuya Nishihara
commandserver: loop over selector events...
r40914 events = selector.select(timeout=h.pollinterval)
Yuya Nishihara
commandserver: get around ETIMEDOUT raised by selectors2...
r40833 except OSError as inst:
# selectors2 raises ETIMEDOUT if timeout exceeded while
# handling signal interrupt. That's probably wrong, but
# we can easily get around it.
if inst.errno != errno.ETIMEDOUT:
raise
Yuya Nishihara
commandserver: loop over selector events...
r40914 events = []
if not events:
Jun Wu
commandserver: do not handle EINTR for selector.select...
r33543 # only exit if we completed all queued requests
if exiting:
break
continue
Yuya Nishihara
commandserver: loop over selector events...
r40914 for key, _mask in events:
key.data(key.fileobj, selector)
Yuya Nishihara
commandserver: extract handler of new socket connection...
r40912 selector.close()
def _acceptnewconnection(self, sock, selector):
h = self._servicehandler
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 try:
conn, _addr = sock.accept()
except socket.error as inst:
if inst.args[0] == errno.EINTR:
return
raise
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
# to prevent COW memory from being touched by GC.
# https://instagram-engineering.com/
# copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 pid = os.fork()
if pid:
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 try:
Augie Fackler
formatting: blacken the codebase...
r43346 self.ui.log(
b'cmdserver', b'forked worker process (pid=%d)\n', pid
)
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 self._workerpids.add(pid)
h.newconnection()
finally:
conn.close() # release handle in parent process
else:
try:
selector.close()
sock.close()
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 self._mainipc.close()
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 self._runworker(conn)
conn.close()
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 self._workeripc.close()
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 os._exit(0)
except: # never return, hence no re-raises
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 try:
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 self.ui.traceback(force=True)
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 finally:
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 os._exit(255)
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 def _handlemainipc(self, sock, selector):
"""Process messages sent from a worker"""
try:
path = sock.recv(32768) # large enough to receive path
except socket.error as inst:
if inst.args[0] == errno.EINTR:
return
raise
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 self._repoloader.load(path)
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 def _sigchldhandler(self, signal, frame):
self._reapworkers(os.WNOHANG)
def _reapworkers(self, options):
while self._workerpids:
try:
pid, _status = os.waitpid(-1, options)
except OSError as inst:
if inst.errno == errno.EINTR:
continue
if inst.errno != errno.ECHILD:
raise
# no child processes at all (reaped by other waitpid()?)
self._workerpids.clear()
return
if pid == 0:
# no waitable child processes
return
Yuya Nishihara
commandserver: turn server debug messages into logs...
r40863 self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 self._workerpids.discard(pid)
Yuya Nishihara
commandserver: rename _serveworker() to _runworker()...
def _runworker(self, conn):
    """Serve the client connected on ``conn``; runs in the forked worker

    Puts back the SIGCHLD handler saved in ``_oldsigchldhandler``,
    initializes the worker process, then serves the request with
    ``_reposetup`` registered as a pre-reposetup hook.
    """
    signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
    _initworkerprocess()
    handler = self._servicehandler
    try:
        setups = [self._reposetup]
        _serverequest(
            self.ui,
            self.repo,
            conn,
            handler.createcmdserver,
            prereposetups=setups,
        )
    finally:
        # trigger __del__ since worker process uses os._exit
        gc.collect()
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034
def _reposetup(self, ui, repo):
if not repo.local():
return
class unixcmdserverrepo(repo.__class__):
def close(self):
super(unixcmdserverrepo, self).close()
try:
self._cmdserveripc.send(self.root)
except socket.error:
Augie Fackler
formatting: blacken the codebase...
r43346 self.ui.log(
b'cmdserver', b'failed to send repo root to master\n'
)
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034
repo.__class__ = unixcmdserverrepo
repo._cmdserveripc = self._workeripc
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035
cachedrepo = self._repoloader.get(repo.root)
if cachedrepo is None:
return
repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
repocache.copycache(cachedrepo, repo)