# HG changeset patch
# User Yuya Nishihara
# Date 2018-10-31 13:43:08
# Node ID dcac24ec935bbd1c6232988edab4ff5d873ef6d7
# Parent 042ed354b9eb6209b09dbdd19018462431cb464c
commandserver: preload repository in master server and reuse its file cache

This greatly speeds up repository operation with lots of obsolete markers:

  $ ls -lh .hg/store/obsstore
  -rw-r--r-- 1 yuya yuya 21M Dec  2 17:55 .hg/store/obsstore
  $ time hg log -G -l10 --pager no
  (hg) 1.79s user 0.13s system 99% cpu 1.919 total
  (chg uncached) 0.00s user 0.01s system 0% cpu 1.328 total
  (chg cached) 0.00s user 0.00s system 3% cpu 0.180 total

As you can see, the implementation of the preloader function is highly
experimental. It works, but I'm yet to be sure how things can be organized.
So I don't want to formalize the API at this point.

diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py
--- a/mercurial/commandserver.py
+++ b/mercurial/commandserver.py
@@ -28,6 +28,7 @@ from . import (
     error,
     loggingutil,
     pycompat,
+    repocache,
     util,
     vfs as vfsmod,
 )
@@ -511,6 +512,11 @@ class unixforkingservice(object):
         self._oldsigchldhandler = None
         self._workerpids = set()  # updated by signal handler; do not iterate
         self._socketunlinked = None
+        # experimental config: cmdserver.max-repo-cache
+        maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
+        if maxlen < 0:
+            raise error.Abort(_('negative max-repo-cache size not allowed'))
+        self._repoloader = repocache.repoloader(ui, maxlen)
 
     def init(self):
         self._sock = socket.socket(socket.AF_UNIX)
@@ -525,6 +531,7 @@ class unixforkingservice(object):
         o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
         self._oldsigchldhandler = o
         self._socketunlinked = False
+        self._repoloader.start()
 
     def _unlinksocket(self):
         if not self._socketunlinked:
@@ -537,6 +544,7 @@ class unixforkingservice(object):
         self._mainipc.close()
         self._workeripc.close()
         self._unlinksocket()
+        self._repoloader.stop()
         # don't kill child processes as they have active clients, just wait
         self._reapworkers(0)
 
@@ -590,6 +598,10 @@ class unixforkingservice(object):
                 return
             raise
 
+        # Future improvement: On Python 3.7, maybe gc.freeze() can be used
+        # to prevent COW memory from being touched by GC.
+        # https://instagram-engineering.com/
+        #   copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
         pid = os.fork()
         if pid:
             try:
@@ -622,8 +634,7 @@ class unixforkingservice(object):
             if inst.args[0] == errno.EINTR:
                 return
             raise
-
-        self.ui.log(b'cmdserver', b'repository: %s\n', path)
+        self._repoloader.load(path)  # preload in the loader thread
 
     def _sigchldhandler(self, signal, frame):
         self._reapworkers(os.WNOHANG)
@@ -671,3 +682,9 @@ class unixforkingservice(object):
 
         repo.__class__ = unixcmdserverrepo
         repo._cmdserveripc = self._workeripc
+
+        cachedrepo = self._repoloader.get(repo.root)
+        if cachedrepo is None:
+            return
+        repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
+        repocache.copycache(cachedrepo, repo)  # reuse preloaded caches
diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -179,11 +179,14 @@ coreconfigitem('cmdserver', 'max-log-fil
 coreconfigitem('cmdserver', 'max-log-size',
     default='1 MB',
 )
+coreconfigitem('cmdserver', 'max-repo-cache',
+    default=0,
+)
 coreconfigitem('cmdserver', 'message-encodings',
     default=list,
 )
 coreconfigitem('cmdserver', 'track-log',
-    default=lambda: ['chgserver', 'cmdserver'],
+    default=lambda: ['chgserver', 'cmdserver', 'repocache'],
 )
 coreconfigitem('color', '.*',
     default=None,
diff --git a/mercurial/repocache.py b/mercurial/repocache.py
new file mode 100644
--- /dev/null
+++ b/mercurial/repocache.py
@@ -0,0 +1,131 @@
+# repocache.py - in-memory repository cache for long-running services
+#
+# Copyright 2018 Yuya Nishihara
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+import collections
+import gc
+import threading
+
+from . import (
+    error,
+    hg,
+    obsolete,
+    scmutil,
+    util,
+)
+
+class repoloader(object):
+    """Load repositories in background thread
+
+    This is designed for a forking server. A cached repo cannot be obtained
+    until the server fork()s a worker and the loader thread stops.
+    """
+
+    def __init__(self, ui, maxlen):
+        self._ui = ui.copy()
+        self._cache = util.lrucachedict(max=maxlen)
+        # use deque and Event instead of Queue since deque can discard
+        # old items to keep at most maxlen items.
+        self._inqueue = collections.deque(maxlen=maxlen)
+        self._accepting = False  # _mainloop() runs while this is True
+        self._newentry = threading.Event()  # set to wake up _mainloop()
+        self._thread = None  # the loader thread while running
+
+    def start(self):
+        assert not self._thread
+        if self._inqueue.maxlen == 0:
+            # no need to spawn loader thread as the cache is disabled
+            return
+        self._accepting = True
+        self._thread = threading.Thread(target=self._mainloop)
+        self._thread.start()
+
+    def stop(self):
+        if not self._thread:
+            return
+        self._accepting = False
+        self._newentry.set()
+        self._thread.join()
+        self._thread = None
+        self._cache.clear()  # drops every preloaded repo
+        self._inqueue.clear()
+
+    def load(self, path):
+        """Request to load the specified repository in background"""
+        self._inqueue.append(path)
+        self._newentry.set()
+
+    def get(self, path):
+        """Return a cached repo if available
+
+        This function must be called after fork(), where the loader thread
+        is stopped. Otherwise, the returned repo might be updated by the
+        loader thread.
+        """
+        if self._thread and self._thread.is_alive():
+            raise error.ProgrammingError(b'cannot obtain cached repo while '
+                                         b'loader is active')
+        return self._cache.peek(path, None)  # None if not cached
+
+    def _mainloop(self):
+        while self._accepting:
+            # Avoid heavy GC after fork(), which would cancel the benefit of
+            # COW. We assume that GIL is acquired while GC is underway in the
+            # loader thread. If that isn't true, we might have to move
+            # gc.collect() to the main thread so that fork() would never stop
+            # the thread where GC is in progress.
+            gc.collect()
+
+            self._newentry.wait()  # block until load() or stop() is called
+            while self._accepting:
+                self._newentry.clear()
+                try:
+                    path = self._inqueue.popleft()
+                except IndexError:
+                    break  # queue drained; wait for more
+                scmutil.callcatch(self._ui, lambda: self._load(path))
+
+    def _load(self, path):
+        start = util.timer()
+        # TODO: repo should be recreated if storage configuration changed
+        try:
+            # pop before loading so inconsistent state wouldn't be exposed
+            repo = self._cache.pop(path)
+        except KeyError:
+            repo = hg.repository(self._ui, path).unfiltered()
+        _warmupcache(repo)  # also refreshes a repo reused from the cache
+        repo.ui.log(b'repocache', b'loaded repo into cache: %s (in %.3fs)\n',
+                    path, util.timer() - start)
+        self._cache.insert(path, repo)
+
+# TODO: think about proper API of preloading cache
+def _warmupcache(repo):
+    repo.invalidateall()
+    repo.changelog  # bare lookups here exist only to force loading
+    repo.obsstore._all
+    repo.obsstore.successors
+    repo.obsstore.predecessors
+    repo.obsstore.children
+    for name in obsolete.cachefuncs:
+        obsolete.getrevs(repo, name)
+    repo._phasecache.loadphaserevs(repo)
+
+# TODO: think about proper API of attaching preloaded attributes
+def copycache(srcrepo, destrepo):
+    """Copy cached attributes from srcrepo to destrepo"""
+    destfilecache = destrepo._filecache
+    srcfilecache = srcrepo._filecache
+    if 'changelog' in srcfilecache:
+        destfilecache['changelog'] = ce = srcfilecache['changelog']
+        ce.obj.opener = ce.obj._realopener = destrepo.svfs
+    if 'obsstore' in srcfilecache:
+        destfilecache['obsstore'] = ce = srcfilecache['obsstore']
+        ce.obj.svfs = destrepo.svfs
+    if '_phasecache' in srcfilecache:
+        destfilecache['_phasecache'] = ce = srcfilecache['_phasecache']
+        ce.obj.opener = destrepo.svfs
diff --git a/tests/test-chg.t b/tests/test-chg.t
--- a/tests/test-chg.t
+++ b/tests/test-chg.t
@@ -1,6 +1,7 @@
 #require chg
 
   $ mkdir log
+  $ cp $HGRCPATH $HGRCPATH.unconfigured
   $ cat <<'EOF' >> $HGRCPATH
   > [cmdserver]
   > log = $TESTTMP/log/server.log
@@ -13,6 +14,7 @@
   >   sed -e 's!^[0-9/]* [0-9:]* ([0-9]*)>!YYYY/MM/DD HH:MM:SS (PID)>!' \
   >       -e 's!\(setprocname\|received fds\|setenv\): .*!\1: ...!' \
   >       -e 's!\(confighash\|mtimehash\) = [0-9a-f]*!\1 = ...!g' \
+  >       -e 's!\(in \)[0-9.]*s\b!\1 ...s!g' \
   >       -e 's!\(pid\)=[0-9]*!\1=...!g' \
   >       -e 's!\(/server-\)[0-9a-f]*!\1...!g'
   > }
@@ -230,6 +232,7 @@ print only the last 10 lines, since we a
 preserved:
 
   $ cat log/server.log.1 log/server.log | tail -10 | filterlog
+  YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> setprocname: ...
   YYYY/MM/DD HH:MM:SS (PID)> received fds: ...
   YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload'
@@ -237,6 +240,92 @@ preserved:
   YYYY/MM/DD HH:MM:SS (PID)> setenv: ...
   YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ...
   YYYY/MM/DD HH:MM:SS (PID)> validate: []
-  YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload
   YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting.
+
+repository cache
+----------------
+
+  $ rm log/server.log*
+  $ cp $HGRCPATH.unconfigured $HGRCPATH
+  $ cat <<'EOF' >> $HGRCPATH
+  > [cmdserver]
+  > log = $TESTTMP/log/server.log
+  > max-repo-cache = 1
+  > track-log = command, repocache
+  > EOF
+
+isolate socket directory for stable result:
+
+  $ OLDCHGSOCKNAME=$CHGSOCKNAME
+  $ mkdir chgsock
+  $ CHGSOCKNAME=`pwd`/chgsock/server
+
+create empty repo and cache it:
+
+  $ hg init cached
+  $ hg id -R cached
+  000000000000 tip
+  $ sleep 1
+
+modify repo (and cache will be invalidated):
+
+  $ touch cached/a
+  $ hg ci -R cached -Am 'add a'
+  adding a
+  $ sleep 1
+
+read cached repo:
+
+  $ hg log -R cached
+  changeset:   0:ac82d8b1f7c4
+  tag:         tip
+  user:        test
+  date:        Thu Jan 01 00:00:00 1970 +0000
+  summary:     add a
+
+  $ sleep 1
+
+discard cached from LRU cache by loading cached2 (max-repo-cache = 1):
+
+  $ hg clone cached cached2
+  updating to branch default
+  1 files updated, 0 files merged, 0 files removed, 0 files unresolved
+  $ hg id -R cached2
+  ac82d8b1f7c4 tip
+  $ sleep 1
+
+read uncached repo (evicted above, so not served from cache):
+
+  $ hg log -R cached
+  changeset:   0:ac82d8b1f7c4
+  tag:         tip
+  user:        test
+  date:        Thu Jan 01 00:00:00 1970 +0000
+  summary:     add a
+
+  $ sleep 1
+
+shut down servers and restore environment:
+
+  $ rm -R chgsock
+  $ sleep 2
+  $ CHGSOCKNAME=$OLDCHGSOCKNAME
+
+check server log:
+
+  $ cat log/server.log | filterlog
+  YYYY/MM/DD HH:MM:SS (PID)> init cached
+  YYYY/MM/DD HH:MM:SS (PID)> id -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in  ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached
+  YYYY/MM/DD HH:MM:SS (PID)> ci -R cached -Am 'add a'
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in  ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached
+  YYYY/MM/DD HH:MM:SS (PID)> log -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in  ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> clone cached cached2
+  YYYY/MM/DD HH:MM:SS (PID)> id -R cached2
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached2 (in  ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> log -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in  ...s)