discovery: slowly increase sampling size

Some pathological discovery runs can require many roundtrips. When this happens, things can get very slow.

To make the algorithm more resilient against such pathological cases, we slowly increase the sample size with each roundtrip (+5%). This will have a negligible impact on "normal" discovery with few roundtrips, but a large positive impact on cases with many roundtrips. Asking more questions per roundtrip helps to reduce the undecided set faster. Instead of reducing the undecided set at a linear speed (in the worst case), we reduce it at a guaranteed (small) exponential rate. The data below show this slow ramp-up in sample size:

    round trip    |   1 |     5 |    10 |    20 |     50 |     100 |       130 |
    sample size   | 200 |   254 |   321 |   517 |  2 199 |  25 123 |   108 549 |
    covered nodes | 200 | 1 357 | 2 821 | 7 031 | 42 658 | 524 530 | 2 276 755 |

To be a bit more concrete, let's take a very pathological case as an example. We are doing discovery from a copy of mozilla-try to a more recent version of mozilla-unified. The mozilla-unified heads are unknown to the mozilla-try repo and there are over 1 million "missing" changesets. (The discovery is "local" to avoid network interference.)

Without this change, the discovery:
- lasts 1858 seconds (31 minutes),
- does 1700 roundtrips,
- asks about 340 000 nodes.

With this change, the discovery:
- lasts 218 seconds (3 minutes, 38 seconds, a -88% improvement),
- does 94 roundtrips (-94%),
- asks about 344 211 nodes (+1%).

Of course, this is an extreme case (and 3 minutes is still slow). However, it gives a good example of how this sample size increase acts as a safety net catching any bad situation.

We could imagine a steeper increase than 5%. For example, 10% would give the following numbers:

    round trip    |   1 |     5 |    10 |     20 |      50 |        75 |        100 |
    sample size   | 200 |   321 |   514 |  1 326 |  23 060 |   249 812 |  2 706 594 |
    covered nodes | 200 | 1 541 | 3 690 | 12 671 | 251 871 | 2 746 254 | 29 770 966 |

In parallel, it is useful to understand these pathological cases and improve them. However, the current change provides a general-purpose safety net to smooth the impact of pathological cases.

To avoid issues with older HTTP servers, the increase in sample size only occurs if the protocol has no limit on command argument size.
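The ramp-up described in the commit message can be illustrated with a small sketch. This is not the discovery code itself: `sample_sizes` and its parameters are hypothetical names, and the truncating integer growth step is an assumption about how the sample size is rounded.

```python
def sample_sizes(initial=200, growth_percent=5, rounds=6):
    """Yield (sample_size, covered_nodes) pairs for successive samples.

    Hypothetical helper: each sample is growth_percent larger than the
    previous one, truncated to an integer (an assumption about rounding).
    """
    size = initial
    covered = 0
    for _ in range(rounds):
        covered += size  # nodes asked about so far, all samples combined
        yield size, covered
        # integer arithmetic keeps the truncation explicit
        size = size * (100 + growth_percent) // 100


schedule_5 = list(sample_sizes(growth_percent=5))
schedule_10 = list(sample_sizes(growth_percent=10))
```

With these assumptions, the sixth sample holds 254 nodes (1 357 cumulative) at +5% and 321 nodes (1 541 cumulative) at +10%. Both pairs also appear in the commit message's tables, although how the tables index round trips is not stated there.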

File last commit:

r41320:b0e3f2d7 default
r42546:dbd0fcca default
commandserver.py
690 lines | 22.4 KiB | text/x-python | PythonLexer
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 # commandserver.py - communicate with Mercurial's API over a pipe
#
# Copyright Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
Yuya Nishihara
commandserver: use absolute_import
r27351 from __future__ import absolute_import
import errno
Yuya Nishihara
commandserver: backport handling of forking server from chgserver...
r29513 import gc
Yuya Nishihara
commandserver: use absolute_import
r27351 import os
Yuya Nishihara
commandserver: backport handling of forking server from chgserver...
r29513 import random
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 import signal
import socket
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 import struct
Yuya Nishihara
commandserver: use absolute_import
r27351 import traceback
Augie Fackler
commandserver: prefer first-party selectors module from Python 3 to backport...
r36958 try:
import selectors
selectors.BaseSelector
except ImportError:
from .thirdparty import selectors2 as selectors
Yuya Nishihara
commandserver: use absolute_import
r27351 from .i18n import _
from . import (
encoding,
error,
Yuya Nishihara
commandserver: install logger to record server events through canonical API...
r40859 loggingutil,
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625 pycompat,
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 repocache,
Yuya Nishihara
commandserver: use absolute_import
r27351 util,
Yuya Nishihara
commandserver: install logger to record server events through canonical API...
r40859 vfs as vfsmod,
Yuya Nishihara
commandserver: use absolute_import
r27351 )
Yuya Nishihara
procutil: bulk-replace util.std* to point to new module
r37137 from .utils import (
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625 cborutil,
Yuya Nishihara
procutil: bulk-replace util.std* to point to new module
r37137 procutil,
)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
class channeledoutput(object):
"""
Yuya Nishihara
cmdserver: correct doc of channeledoutput...
r22561 Write data to out in the following format:
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
data length (unsigned int),
data
"""
Yuya Nishihara
cmdserver: drop useless in_ attribute from channeledoutput...
r22563 def __init__(self, out, channel):
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 self.out = out
self.channel = channel
Yuya Nishihara
commandserver: implement name() to clarify channel is not a plain file...
r27415 @property
def name(self):
return '<%c-channel>' % self.channel
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 def write(self, data):
if not data:
return
Yuya Nishihara
cmdserver: write channel header and payload by a single write() call...
r30263 # single write() to guarantee the same atomicity as the underlying file
self.out.write(struct.pack('>cI', self.channel, len(data)) + data)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 self.out.flush()
def __getattr__(self, attr):
Yuya Nishihara
py3: system-stringify list of attributes to be forwarded from commandserver.py...
r40393 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
Augie Fackler
commandserver: clean up use of two-argument raise...
r18174 raise AttributeError(attr)
Yuya Nishihara
cmdserver: drop useless in_ attribute from channeledoutput...
r22563 return getattr(self.out, attr)
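The framing written by `channeledoutput.write()` is a one-byte channel identifier, a big-endian unsigned 32-bit length, then the payload. A client could decode it roughly as in the Python 3 sketch below. `read_frame` is a hypothetical helper, not part of Mercurial, and it only covers output channels (on input channels such as 'I' and 'L' the length field is a requested size with no payload following).

```python
import io
import struct


def read_frame(stream):
    """Read one (channel, payload) frame; return None on a clean EOF."""
    header = stream.read(5)  # 1 byte channel + 4 byte big-endian length
    if not header:
        return None
    if len(header) < 5:
        raise EOFError('truncated frame header')
    channel, length = struct.unpack('>cI', header)
    payload = stream.read(length)
    if len(payload) < length:
        raise EOFError('truncated frame payload')
    return channel, payload


# Build a frame the same way channeledoutput.write() does, then decode it.
raw = struct.pack('>cI', b'o', 5) + b'hello'
frame = read_frame(io.BytesIO(raw))
```

Here `frame` is the pair `(b'o', b'hello')`, and an empty stream yields `None`.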
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625 class channeledmessage(object):
"""
Write encoded message and metadata to out in the following format:
data length (unsigned int),
encoded message and metadata, as a flat key-value dict.
Yuya Nishihara
ui: extract helpers to write message with type or label...
r40626
Each message should have 'type' attribute. Messages of unknown type
should be ignored.
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625 """
# teach ui that write() can take **opts
structured = True
def __init__(self, out, channel, encodename, encodefn):
self._cout = channeledoutput(out, channel)
self.encoding = encodename
self._encodefn = encodefn
def write(self, data, **opts):
opts = pycompat.byteskwargs(opts)
Yuya Nishihara
commandserver: send raw progress information to message channel...
r40630 if data is not None:
opts[b'data'] = data
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625 self._cout.write(self._encodefn(opts))
def __getattr__(self, attr):
return getattr(self._cout, attr)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 class channeledinput(object):
"""
Read data from in_.
Requests for input are written to out in the following format:
channel identifier - 'I' for plain input, 'L' line based (1 byte)
how many bytes to send at most (unsigned int),
The client replies with:
data length (unsigned int), 0 meaning EOF
data
"""
maxchunksize = 4 * 1024
def __init__(self, in_, out, channel):
self.in_ = in_
self.out = out
self.channel = channel
Yuya Nishihara
commandserver: implement name() to clarify channel is not a plain file...
r27415 @property
def name(self):
return '<%c-channel>' % self.channel
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 def read(self, size=-1):
if size < 0:
# if we need to consume all the clients input, ask for 4k chunks
# so the pipe doesn't fill up risking a deadlock
size = self.maxchunksize
s = self._read(size, self.channel)
buf = s
while s:
Idan Kamara
cmdserver: fix read-loop string concatenation
r14728 s = self._read(size, self.channel)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 buf += s
return buf
else:
return self._read(size, self.channel)
def _read(self, size, channel):
if not size:
return ''
assert size > 0
# tell the client we need at most size bytes
self.out.write(struct.pack('>cI', channel, size))
self.out.flush()
length = self.in_.read(4)
length = struct.unpack('>I', length)[0]
if not length:
return ''
else:
return self.in_.read(length)
def readline(self, size=-1):
if size < 0:
size = self.maxchunksize
s = self._read(size, 'L')
buf = s
# keep asking for more until there's either no more or
# we got a full line
while s and s[-1] != '\n':
Idan Kamara
cmdserver: fix read-loop string concatenation
r14728 s = self._read(size, 'L')
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 buf += s
return buf
else:
return self._read(size, 'L')
def __iter__(self):
return self
def next(self):
l = self.readline()
if not l:
raise StopIteration
return l
Yuya Nishihara
py3: alias next to __next__ in commandserver.py
r40394 __next__ = next
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 def __getattr__(self, attr):
Yuya Nishihara
py3: system-stringify list of attributes to be forwarded from commandserver.py...
r40393 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
Augie Fackler
commandserver: clean up use of two-argument raise...
r18174 raise AttributeError(attr)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 return getattr(self.in_, attr)
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625 _messageencoders = {
b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
}
def _selectmessageencoder(ui):
# experimental config: cmdserver.message-encodings
encnames = ui.configlist(b'cmdserver', b'message-encodings')
for n in encnames:
f = _messageencoders.get(n)
if f:
return n, f
raise error.Abort(b'no supported message encodings: %s'
% b' '.join(encnames))
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 class server(object):
"""
Yuya Nishihara
cmdserver: make server streams switchable...
r22990 Listens for commands on fin, runs them and writes the output on a channel
based stream to fout.
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 """
Yuya Nishihara
commandserver: pass around option to hook repo instance creation...
r40911 def __init__(self, ui, repo, fin, fout, prereposetups=None):
Matt Harbison
py3: rename pycompat.getcwd() to encoding.getcwd() (API)...
r39843 self.cwd = encoding.getcwd()
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Yuya Nishihara
cmdserver: allow to start server without repository...
r20650 if repo:
# the ui here is really the repo ui so take its baseui so we don't
# end up with its local configuration
self.ui = repo.baseui
self.repo = repo
self.repoui = repo.ui
else:
self.ui = ui
self.repo = self.repoui = None
Yuya Nishihara
commandserver: pass around option to hook repo instance creation...
r40911 self._prereposetups = prereposetups
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Yuya Nishihara
commandserver: switch logging facility to ui.log() interface...
r40860 self.cdebug = channeledoutput(fout, 'd')
Yuya Nishihara
cmdserver: make server streams switchable...
r22990 self.cerr = channeledoutput(fout, 'e')
self.cout = channeledoutput(fout, 'o')
self.cin = channeledinput(fin, fout, 'I')
self.cresult = channeledoutput(fout, 'r')
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Yuya Nishihara
commandserver: install logger to record server events through canonical API...
r40859 if self.ui.config(b'cmdserver', b'log') == b'-':
# switch log stream of server's ui to the 'd' (debug) channel
# (don't touch repo.ui as its lifetime is longer than the server)
self.ui = self.ui.copy()
setuplogging(self.ui, repo=None, fp=self.cdebug)
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625 # TODO: add this to help/config.txt when stabilized
# ``channel``
# Use separate channel for structured output. (Command-server only)
self.cmsg = None
if ui.config(b'ui', b'message-output') == b'channel':
encname, encfn = _selectmessageencoder(ui)
self.cmsg = channeledmessage(fout, b'm', encname, encfn)
Yuya Nishihara
cmdserver: make server streams switchable...
r22990 self.client = fin
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Yuya Nishihara
commandserver: promote .cleanup() hook from chgserver...
r29512 def cleanup(self):
"""release and restore resources taken during server session"""
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 def _read(self, size):
Idan Kamara
cmdserver: don't raise EOFError when trying to read 0 bytes from the client
r14706 if not size:
return ''
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 data = self.client.read(size)
# is the other end closed?
if not data:
Brodie Rao
cleanup: "raise SomeException()" -> "raise SomeException"
r16687 raise EOFError
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
return data
Jun Wu
commandserver: add _readstr and _readlist...
r28156 def _readstr(self):
"""read a string from the channel
format:
data length (uint32), data
"""
length = struct.unpack('>I', self._read(4))[0]
if not length:
return ''
return self._read(length)
def _readlist(self):
"""read a list of NULL separated strings from the channel"""
s = self._readstr()
if s:
return s.split('\0')
else:
return []
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647 def runcommand(self):
""" reads a list of \0 terminated arguments, executes
and writes the return code to the result channel """
Yuya Nishihara
commandserver: cut import cycle by itself...
r27352 from . import dispatch # avoid cycle
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Jun Wu
commandserver: use _readlist...
r28157 args = self._readlist()
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Idan Kamara
cmdserver: copy repo.ui before running commands
r14750 # copy the uis so changes (e.g. --config or --verbose) don't
# persist between requests
Idan Kamara
cmdserver: assign repo.baseui before running commands...
r14751 copiedui = self.ui.copy()
Yuya Nishihara
cmdserver: forcibly use L channel to read password input (issue3161)...
r21195 uis = [copiedui]
Yuya Nishihara
cmdserver: allow to start server without repository...
r20650 if self.repo:
self.repo.baseui = copiedui
# clone ui without using ui.copy because this is protected
repoui = self.repoui.__class__(self.repoui)
repoui.copy = copiedui.copy # redo copy protection
Yuya Nishihara
cmdserver: forcibly use L channel to read password input (issue3161)...
r21195 uis.append(repoui)
Yuya Nishihara
cmdserver: allow to start server without repository...
r20650 self.repo.ui = self.repo.dirstate._ui = repoui
self.repo.invalidateall()
Idan Kamara
cmdserver: assign repo.baseui before running commands...
r14751
Yuya Nishihara
cmdserver: forcibly use L channel to read password input (issue3161)...
r21195 for ui in uis:
Yuya Nishihara
ui: provide official way to reset internal state per command...
r29366 ui.resetstate()
Yuya Nishihara
commandserver: do not set nontty flag if channel is replaced by a real file...
r27565 # any kind of interaction must use server channels, but chg may
# replace channels by fully functional tty files. so nontty is
# enforced only if cin is a channel.
if not util.safehasattr(self.cin, 'fileno'):
ui.setconfig('ui', 'nontty', 'true', 'commandserver')
Yuya Nishihara
cmdserver: forcibly use L channel to read password input (issue3161)...
r21195
Idan Kamara
cmdserver: restore old working dir after dispatch when we have --cwd
r14864 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
Yuya Nishihara
commandserver: pass around option to hook repo instance creation...
r40911 self.cout, self.cerr, self.cmsg,
prereposetups=self._prereposetups)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
Gregory Szorc
commandserver: restore cwd in case of exception...
r35670 try:
Yuya Nishihara
dispatch: unify handling of None returned by a command function...
r38015 ret = dispatch.dispatch(req) & 255
Gregory Szorc
commandserver: restore cwd in case of exception...
r35670 self.cresult.write(struct.pack('>i', int(ret)))
finally:
# restore old cwd
if '--cwd' in args:
os.chdir(self.cwd)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
def getencoding(self):
""" writes the current encoding to the result channel """
self.cresult.write(encoding.encoding)
def serveone(self):
cmd = self.client.readline()[:-1]
if cmd:
handler = self.capabilities.get(cmd)
if handler:
handler(self)
else:
# clients are expected to check what commands are supported by
# looking at the servers capabilities
Pierre-Yves David
error: get Abort from 'error' instead of 'util'...
r26587 raise error.Abort(_('unknown command %s') % cmd)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
return cmd != ''
Alex Gaynor
style: never use a space before a colon or comma...
r34487 capabilities = {'runcommand': runcommand,
'getencoding': getencoding}
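Putting `serveone()`, `_readstr()` and `_readlist()` together, a `runcommand` request is a command name terminated by a newline, followed by a uint32 length and the NUL-separated arguments. The Python 3 sketch below builds such a request and decodes the argument block again. `encode_runcommand` and `decode_list` are hypothetical client-side helpers, and the `log -l 5` arguments are only an illustration.

```python
import struct


def encode_runcommand(args):
    """Encode a runcommand request: name line, uint32 length, NUL-joined args."""
    data = b'\0'.join(args)
    return b'runcommand\n' + struct.pack('>I', len(data)) + data


def decode_list(block):
    """Mirror of _readstr() + _readlist() for an in-memory block."""
    (length,) = struct.unpack('>I', block[:4])
    data = block[4:4 + length]
    return data.split(b'\0') if data else []


request = encode_runcommand([b'log', b'-l', b'5'])
args = decode_list(request[len(b'runcommand\n'):])
```

The server then replies on the output and error channels and finishes with the return code on the 'r' channel, packed as a signed 32-bit integer (`'>i'`).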
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
def serve(self):
Mads Kiilerich
commandserver: report capabilities sorted
r18359 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
Idan Kamara
cmdserver: write the hello message as one chunk on the 'o' channel...
r14719 hellomsg += '\n'
hellomsg += 'encoding: ' + encoding.encoding
Yuya Nishihara
cmdserver: include pid of server handling requests in hello message...
r23036 hellomsg += '\n'
Yuya Nishihara
commandserver: add experimental option to use separate message channel...
r40625 if self.cmsg:
hellomsg += 'message-encoding: %s\n' % self.cmsg.encoding
Yuya Nishihara
procutil: bulk-replace function calls to point to new module
r37138 hellomsg += 'pid: %d' % procutil.getpid()
Jun Wu
commandserver: send pgid in hello message...
r29580 if util.safehasattr(os, 'getpgid'):
hellomsg += '\n'
hellomsg += 'pgid: %d' % os.getpgid(0)
Idan Kamara
cmdserver: write the hello message as one chunk on the 'o' channel...
r14719
# write the hello msg in -one- chunk
self.cout.write(hellomsg)
Idan Kamara
serve: add --cmdserver option to communicate with hg over a pipe
r14647
try:
while self.serveone():
pass
except EOFError:
# we'll get here if the client disconnected while we were reading
# its request
return 1
return 0
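The hello message written by `serve()` is a set of `key: value` lines sent as a single chunk on the 'o' channel. A client might split it into fields as in the Python 3 sketch below. `parse_hello` is a hypothetical helper, and the sample payload uses made-up values.

```python
def parse_hello(payload):
    """Split a hello payload into a dict of bytes keys and bytes values."""
    fields = {}
    for line in payload.splitlines():
        key, _sep, value = line.partition(b': ')
        fields[key] = value
    return fields


# Made-up payload in the shape serve() produces (values are illustrative).
sample = b'capabilities: getencoding runcommand\nencoding: UTF-8\npid: 1234'
hello = parse_hello(sample)
capabilities = hello[b'capabilities'].split()
```

Checking `capabilities` before sending a command matches the comment in `serveone()`, which expects clients to look at the server's capabilities first.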
Yuya Nishihara
cmdserver: wrap 'pipe' mode server by service object...
r22988
Yuya Nishihara
commandserver: install logger to record server events through canonical API...
r40859 def setuplogging(ui, repo=None, fp=None):
Yuya Nishihara
commandserver: enable logging when server process started...
r40858 """Set up server logging facility
Yuya Nishihara
commandserver: install logger to record server events through canonical API...
r40859 If cmdserver.log is '-', log messages will be sent to the given fp.
It should be the 'd' channel while a client is connected, and otherwise
is the stderr of the server process.
Yuya Nishihara
commandserver: enable logging when server process started...
r40858 """
# developer config: cmdserver.log
logpath = ui.config(b'cmdserver', b'log')
if not logpath:
return
Yuya Nishihara
commandserver: add config knob for various logging options...
r40862 # developer config: cmdserver.track-log
tracked = set(ui.configlist(b'cmdserver', b'track-log'))
Yuya Nishihara
commandserver: enable logging when server process started...
r40858
Yuya Nishihara
commandserver: install logger to record server events through canonical API...
r40859 if logpath == b'-' and fp:
logger = loggingutil.fileobjectlogger(fp, tracked)
elif logpath == b'-':
logger = loggingutil.fileobjectlogger(ui.ferr, tracked)
else:
Yuya Nishihara
commandserver: expand log path for convenience...
r40861 logpath = os.path.abspath(util.expandpath(logpath))
Yuya Nishihara
commandserver: add config knob for various logging options...
r40862 # developer config: cmdserver.max-log-files
maxfiles = ui.configint(b'cmdserver', b'max-log-files')
# developer config: cmdserver.max-log-size
maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
Yuya Nishihara
commandserver: install logger to record server events through canonical API...
r40859 vfs = vfsmod.vfs(os.path.dirname(logpath))
Yuya Nishihara
commandserver: add config knob for various logging options...
r40862 logger = loggingutil.filelogger(vfs, os.path.basename(logpath), tracked,
maxfiles=maxfiles, maxsize=maxsize)
Yuya Nishihara
commandserver: install logger to record server events through canonical API...
r40859
targetuis = {ui}
if repo:
targetuis.add(repo.baseui)
targetuis.add(repo.ui)
for u in targetuis:
u.setlogger(b'cmdserver', logger)
Yuya Nishihara
cmdserver: wrap 'pipe' mode server by service object...
r22988 class pipeservice(object):
def __init__(self, ui, repo, opts):
Yuya Nishihara
cmdserver: postpone creation of pipe server until run()...
r23323 self.ui = ui
self.repo = repo
Yuya Nishihara
cmdserver: wrap 'pipe' mode server by service object...
r22988
def init(self):
pass
def run(self):
Yuya Nishihara
cmdserver: postpone creation of pipe server until run()...
r23323 ui = self.ui
Yuya Nishihara
cmdserver: protect pipe server streams against corruption caused by direct io...
r23324 # redirect stdio to null device so that broken extensions or in-process
# hooks will never cause corruption of channel protocol.
Yuya Nishihara
ui: move protectedstdio() context manager from procutil...
r41320 with ui.protectedfinout() as (fin, fout):
Yuya Nishihara
commandserver: fix reference before assignment error in pipeservice cleanup...
r40624 sv = server(ui, self.repo, fin, fout)
Yuya Nishihara
procutil: introduce context-manager interface for protect/restorestdio...
r37142 try:
return sv.serve()
finally:
sv.cleanup()
Yuya Nishihara
cmdserver: switch service objects by mode...
r22989
Yuya Nishihara
commandserver: separate initialization and cleanup of forked process...
r29586 def _initworkerprocess():
Jun Wu
commandserver: update comment about setpgid...
r29609 # use a different process group from the master process, in order to:
# 1. make the current process group no longer "orphaned" (because the
# parent of this process is in a different process group while
# remains in a same session)
# according to POSIX 2.2.2.52, orphaned process group will ignore
# terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
# cause trouble for things like ncurses.
# 2. the client can use kill(-pgid, sig) to simulate terminal-generated
# SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
# processes like ssh will be killed properly, without affecting
# unrelated processes.
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 os.setpgid(0, 0)
# change random state otherwise forked request handlers would have a
# same state inherited from parent.
random.seed()
Yuya Nishihara
commandserver: manually create file objects from socket...
r29542
Yuya Nishihara
commandserver: pass around option to hook repo instance creation...
r40911 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
Yuya Nishihara
py3: system-stringify file mode in commandserver.py...
r40395 fin = conn.makefile(r'rb')
fout = conn.makefile(r'wb')
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 sv = None
try:
Yuya Nishihara
commandserver: pass around option to hook repo instance creation...
r40911 sv = createcmdserver(repo, conn, fin, fout, prereposetups)
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 try:
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 sv.serve()
# handle exceptions that may be raised by command server. most of
# known exceptions are caught by dispatch.
except error.Abort as inst:
Rodrigo Damazio Bovendorp
dispatch: making all hg abortions be output with a specific label...
r38791 ui.error(_('abort: %s\n') % inst)
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 except IOError as inst:
if inst.errno != errno.EPIPE:
raise
except KeyboardInterrupt:
pass
Yuya Nishihara
commandserver: backport handling of forking server from chgserver...
r29513 finally:
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 sv.cleanup()
except: # re-raises
# also write traceback to error channel. otherwise client cannot
# see it because it is written to server's stderr by default.
if sv:
cerr = sv.cerr
else:
cerr = channeledoutput(fout, 'e')
Yuya Nishihara
py3: don't use traceback.print_exc() in commandserver.py...
r40397 cerr.write(encoding.strtolocal(traceback.format_exc()))
Yuya Nishihara
commandserver: unindent superfluous "if True" blocks
r29585 raise
finally:
fin.close()
try:
fout.close() # implicit flush() may cause another EPIPE
except IOError as inst:
if inst.errno != errno.EPIPE:
raise
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 class unixservicehandler(object):
"""Set of pluggable operations for unix-mode services
Almost all methods except for createcmdserver() are called in the main
process. You can't pass mutable resource back from createcmdserver().
"""
pollinterval = None
def __init__(self, ui):
self.ui = ui
def bindsocket(self, sock, address):
util.bindunixsocket(sock, address)
Jun Wu
commandserver: move "listen" responsibility from service to handler...
r32232 sock.listen(socket.SOMAXCONN)
Jun Wu
commandserver: move printbanner logic to bindsocket...
r32233 self.ui.status(_('listening at %s\n') % address)
self.ui.flush() # avoid buffering of status message
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544
def unlinksocket(self, address):
os.unlink(address)
def shouldexit(self):
"""True if server should shut down; checked per pollinterval"""
return False
def newconnection(self):
"""Called when main process notices new connection"""
Yuya Nishihara
commandserver: pass around option to hook repo instance creation...
r40911 def createcmdserver(self, repo, conn, fin, fout, prereposetups):
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 """Create new command server instance; called in the process that
serves for the current connection"""
Yuya Nishihara
commandserver: pass around option to hook repo instance creation...
r40911 return server(self.ui, repo, fin, fout, prereposetups)
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544
Yuya Nishihara
commandserver: drop old unixservice implementation...
r29548 class unixforkingservice(object):
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 """
Listens on unix domain socket and forks server per connection
"""
Yuya Nishihara
commandserver: drop old unixservice implementation...
r29548
def __init__(self, ui, repo, opts, handler=None):
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 self.ui = ui
self.repo = repo
self.address = opts['address']
Yuya Nishihara
commandserver: drop old unixservice implementation...
r29548 if not util.safehasattr(socket, 'AF_UNIX'):
Pierre-Yves David
error: get Abort from 'error' instead of 'util'...
r26587 raise error.Abort(_('unsupported platform'))
Yuya Nishihara
cmdserver: add service that listens on unix domain socket and forks process...
r22994 if not self.address:
Pierre-Yves David
error: get Abort from 'error' instead of 'util'...
r26587 raise error.Abort(_('no socket path specified with --address'))
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 self._servicehandler = handler or unixservicehandler(ui)
self._sock = None
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 self._mainipc = None
self._workeripc = None
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 self._oldsigchldhandler = None
self._workerpids = set() # updated by signal handler; do not iterate
Jun Wu
commandserver: prevent unlink socket twice...
r30887 self._socketunlinked = None
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 # experimental config: cmdserver.max-repo-cache
maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
if maxlen < 0:
raise error.Abort(_('negative max-repo-cache size not allowed'))
self._repoloader = repocache.repoloader(ui, maxlen)
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544
def init(self):
self._sock = socket.socket(socket.AF_UNIX)
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 # IPC channel from many workers to one main process; this is actually
# a uni-directional pipe, but is backed by a DGRAM socket so each
# message can be easily separated.
o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
self._mainipc, self._workeripc = o
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 self._servicehandler.bindsocket(self._sock, self.address)
Yuya Nishihara
procutil: bulk-replace function calls to point to new module
r37138 if util.safehasattr(procutil, 'unblocksignal'):
procutil.unblocksignal(signal.SIGCHLD)
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
self._oldsigchldhandler = o
Jun Wu
commandserver: prevent unlink socket twice...
r30887 self._socketunlinked = False
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 self._repoloader.start()
Jun Wu
commandserver: prevent unlink socket twice...
r30887
def _unlinksocket(self):
if not self._socketunlinked:
self._servicehandler.unlinksocket(self.address)
self._socketunlinked = True
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544
def _cleanup(self):
signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
self._sock.close()
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 self._mainipc.close()
self._workeripc.close()
Jun Wu
commandserver: prevent unlink socket twice...
r30887 self._unlinksocket()
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 self._repoloader.stop()
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 # don't kill child processes as they have active clients, just wait
self._reapworkers(0)
def run(self):
try:
self._mainloop()
finally:
self._cleanup()
def _mainloop(self):
Jun Wu
commandserver: handle backlog before exiting...
r30891 exiting = False
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 h = self._servicehandler
Augie Fackler
commandserver: prefer first-party selectors module from Python 3 to backport...
r36958 selector = selectors.DefaultSelector()
Yuya Nishihara
commandserver: loop over selector events...
r40914 selector.register(self._sock, selectors.EVENT_READ,
self._acceptnewconnection)
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 selector.register(self._mainipc, selectors.EVENT_READ,
self._handlemainipc)
Jun Wu
commandserver: handle backlog before exiting...
r30891 while True:
if not exiting and h.shouldexit():
# clients can no longer connect() to the domain socket, so
# we stop queuing new requests.
# for requests that are queued (connect()-ed, but haven't been
# accept()-ed), handle them before exit. otherwise, clients
# waiting for recv() will receive ECONNRESET.
self._unlinksocket()
exiting = True
Yuya Nishihara
commandserver: get around ETIMEDOUT raised by selectors2...
r40833 try:
Yuya Nishihara
commandserver: loop over selector events...
r40914 events = selector.select(timeout=h.pollinterval)
Yuya Nishihara
commandserver: get around ETIMEDOUT raised by selectors2...
r40833 except OSError as inst:
# selectors2 raises ETIMEDOUT if timeout exceeded while
# handling signal interrupt. That's probably wrong, but
# we can easily get around it.
if inst.errno != errno.ETIMEDOUT:
raise
Yuya Nishihara
commandserver: loop over selector events...
r40914 events = []
if not events:
Jun Wu
commandserver: do not handle EINTR for selector.select...
r33543 # only exit if we completed all queued requests
if exiting:
break
continue
Yuya Nishihara
commandserver: loop over selector events...
r40914 for key, _mask in events:
key.data(key.fileobj, selector)
Yuya Nishihara
commandserver: extract handler of new socket connection...
r40912 selector.close()
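The main loop above stores a callback as the `data` of each registered socket and calls it for every ready event. A minimal standalone sketch of that dispatch pattern, assuming two plain socket pairs in place of the listening socket and the IPC socket (`run_once` and `make_handler` are hypothetical names, not part of commandserver):

```python
import selectors
import socket


def run_once(timeout=0.1):
    """Register two sockets with callbacks and dispatch one round of events."""
    received = []
    a_recv, a_send = socket.socketpair()
    b_recv, b_send = socket.socketpair()
    selector = selectors.DefaultSelector()

    def make_handler(label):
        # The callback takes (sock, selector), like the handlers registered
        # in the main loop above.
        def handler(sock, sel):
            received.append((label, sock.recv(1024)))
        return handler

    selector.register(a_recv, selectors.EVENT_READ, make_handler('a'))
    selector.register(b_recv, selectors.EVENT_READ, make_handler('b'))
    try:
        a_send.send(b'hello')
        b_send.send(b'world')
        events = selector.select(timeout=timeout)
        for key, _mask in events:
            # key.data is the callback given at registration time
            key.data(key.fileobj, selector)
    finally:
        selector.close()
        for s in (a_recv, a_send, b_recv, b_send):
            s.close()
    return sorted(received)


result = run_once()
```

Keeping the callback in `key.data` means the loop itself needs no per-socket branching; adding another event source is one more `register()` call.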
def _acceptnewconnection(self, sock, selector):
h = self._servicehandler
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 try:
conn, _addr = sock.accept()
except socket.error as inst:
if inst.args[0] == errno.EINTR:
return
raise
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
# to prevent COW memory from being touched by GC.
# https://instagram-engineering.com/
# copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 pid = os.fork()
if pid:
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 try:
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 self.ui.log(b'cmdserver', b'forked worker process (pid=%d)\n',
pid)
self._workerpids.add(pid)
h.newconnection()
finally:
conn.close() # release handle in parent process
else:
try:
selector.close()
sock.close()
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 self._mainipc.close()
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 self._runworker(conn)
conn.close()
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 self._workeripc.close()
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 os._exit(0)
except: # never return, hence no re-raises
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 try:
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 self.ui.traceback(force=True)
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 finally:
Yuya Nishihara
commandserver: remove redundant "if True" block
r40913 os._exit(255)
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 def _handlemainipc(self, sock, selector):
"""Process messages sent from a worker"""
try:
path = sock.recv(32768) # large enough to receive path
except socket.error as inst:
if inst.args[0] == errno.EINTR:
return
raise
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035 self._repoloader.load(path)
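The IPC channel relies on SOCK_DGRAM keeping each `send()` as one separate message, so a single `recv()` returns exactly one path. A small sketch of that property, assuming a Unix platform and made-up paths (`send_paths` is a hypothetical helper):

```python
import socket


def send_paths(paths):
    """Send each path as one datagram and read them back one by one."""
    mainipc, workeripc = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        for p in paths:
            workeripc.send(p)
        # Each recv() returns a single datagram, never two paths glued
        # together, so no framing or delimiter is needed.
        return [mainipc.recv(32768) for _ in paths]
    finally:
        mainipc.close()
        workeripc.close()


received = send_paths([b'/repo/one', b'/repo/two'])
```

With a SOCK_STREAM pair the two paths could arrive merged in one `recv()`, which is why the datagram type is the simpler choice here.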
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 def _sigchldhandler(self, signal, frame):
self._reapworkers(os.WNOHANG)
def _reapworkers(self, options):
while self._workerpids:
try:
pid, _status = os.waitpid(-1, options)
except OSError as inst:
if inst.errno == errno.EINTR:
continue
if inst.errno != errno.ECHILD:
raise
# no child processes at all (reaped by other waitpid()?)
self._workerpids.clear()
return
if pid == 0:
# no waitable child processes
return
Yuya Nishihara
commandserver: turn server debug messages into logs...
r40863 self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 self._workerpids.discard(pid)
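The reaping loop above can be tried in isolation. The following is a sketch only, assuming Python 3 on a Unix platform, where `os.waitpid()` retries EINTR by itself and ECHILD is raised as `ChildProcessError`; `reap_workers` and `spawn` are hypothetical names:

```python
import os


def reap_workers(workerpids, options):
    """Collect exited children and drop their pids from workerpids."""
    while workerpids:
        try:
            pid, _status = os.waitpid(-1, options)
        except ChildProcessError:
            # no child processes at all, so nothing is left to track
            workerpids.clear()
            return
        if pid == 0:
            # only possible with os.WNOHANG: children exist, none exited yet
            return
        workerpids.discard(pid)


def spawn(n):
    """Fork n children that exit immediately; return their pids."""
    pids = set()
    for _ in range(n):
        pid = os.fork()
        if pid == 0:
            os._exit(0)  # child: exit without running parent cleanup
        pids.add(pid)
    return pids


pids = spawn(3)
reap_workers(pids, 0)  # options=0 blocks until every child is reaped
```

Passing `os.WNOHANG` instead of `0` gives the non-blocking variant suited to a signal handler; it returns as soon as no exited child is waiting.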
Yuya Nishihara
commandserver: rename _serveworker() to _runworker()...
r29587 def _runworker(self, conn):
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
Yuya Nishihara
commandserver: separate initialization and cleanup of forked process...
r29586 _initworkerprocess()
Yuya Nishihara
commandserver: add new forking server implemented without using SocketServer...
r29544 h = self._servicehandler
Yuya Nishihara
commandserver: separate initialization and cleanup of forked process...
r29586 try:
Yuya Nishihara
commandserver: pass around option to hook repo instance creation...
r40911 _serverequest(self.ui, self.repo, conn, h.createcmdserver,
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034 prereposetups=[self._reposetup])
Yuya Nishihara
commandserver: separate initialization and cleanup of forked process...
r29586 finally:
gc.collect() # trigger __del__ since worker process uses os._exit
Yuya Nishihara
commandserver: add IPC channel to teach repository path on command finished...
r41034
def _reposetup(self, ui, repo):
if not repo.local():
return
class unixcmdserverrepo(repo.__class__):
def close(self):
super(unixcmdserverrepo, self).close()
try:
self._cmdserveripc.send(self.root)
except socket.error:
self.ui.log(b'cmdserver',
b'failed to send repo root to master\n')
repo.__class__ = unixcmdserverrepo
repo._cmdserveripc = self._workeripc
Yuya Nishihara
commandserver: preload repository in master server and reuse its file cache...
r41035
cachedrepo = self._repoloader.get(repo.root)
if cachedrepo is None:
return
repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
repocache.copycache(cachedrepo, repo)
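`_reposetup()` swaps the class of an existing repository object for a dynamically created subclass whose `close()` does extra work. A minimal sketch of the same technique on a stand-in object (the `Repo` class, `attach_close_hook` and the path are assumptions for illustration, not Mercurial APIs):

```python
class Repo:
    """Stand-in for a local repository object."""

    def __init__(self, root):
        self.root = root
        self.closed = False

    def close(self):
        self.closed = True


def attach_close_hook(repo, notify):
    """Make repo.close() also call notify(repo.root)."""

    class hookedrepo(repo.__class__):
        def close(self):
            super(hookedrepo, self).close()
            notify(self.root)

    # Only this instance changes class; other Repo objects are unaffected.
    repo.__class__ = hookedrepo


seen = []
repo = Repo('/tmp/example')
attach_close_hook(repo, seen.append)
repo.close()
```

Subclassing `repo.__class__` rather than a fixed base keeps any wrapping applied earlier to the same object.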