repocache.py
139 lines
| 4.5 KiB
| text/x-python
|
PythonLexer
/ mercurial / repocache.py
Yuya Nishihara
|
r41035 | # repocache.py - in-memory repository cache for long-running services | ||
# | ||||
# Copyright 2018 Yuya Nishihara <yuya@tcha.org> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
import collections | ||||
import gc | ||||
import threading | ||||
from . import ( | ||||
error, | ||||
hg, | ||||
obsolete, | ||||
scmutil, | ||||
util, | ||||
) | ||||
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
r41035 | class repoloader(object): | ||
"""Load repositories in background thread | ||||
This is designed for a forking server. A cached repo cannot be obtained | ||||
until the server fork()s a worker and the loader thread stops. | ||||
""" | ||||
def __init__(self, ui, maxlen): | ||||
self._ui = ui.copy() | ||||
self._cache = util.lrucachedict(max=maxlen) | ||||
# use deque and Event instead of Queue since deque can discard | ||||
# old items to keep at most maxlen items. | ||||
self._inqueue = collections.deque(maxlen=maxlen) | ||||
self._accepting = False | ||||
self._newentry = threading.Event() | ||||
self._thread = None | ||||
def start(self): | ||||
assert not self._thread | ||||
if self._inqueue.maxlen == 0: | ||||
# no need to spawn loader thread as the cache is disabled | ||||
return | ||||
self._accepting = True | ||||
self._thread = threading.Thread(target=self._mainloop) | ||||
self._thread.start() | ||||
def stop(self): | ||||
if not self._thread: | ||||
return | ||||
self._accepting = False | ||||
self._newentry.set() | ||||
self._thread.join() | ||||
self._thread = None | ||||
self._cache.clear() | ||||
self._inqueue.clear() | ||||
def load(self, path): | ||||
"""Request to load the specified repository in background""" | ||||
self._inqueue.append(path) | ||||
self._newentry.set() | ||||
def get(self, path): | ||||
"""Return a cached repo if available | ||||
This function must be called after fork(), where the loader thread | ||||
is stopped. Otherwise, the returned repo might be updated by the | ||||
loader thread. | ||||
""" | ||||
if self._thread and self._thread.is_alive(): | ||||
Augie Fackler
|
r43346 | raise error.ProgrammingError( | ||
Martin von Zweigbergk
|
r43387 | b'cannot obtain cached repo while loader is active' | ||
Augie Fackler
|
r43346 | ) | ||
Yuya Nishihara
|
r41035 | return self._cache.peek(path, None) | ||
def _mainloop(self): | ||||
while self._accepting: | ||||
# Avoid heavy GC after fork(), which would cancel the benefit of | ||||
# COW. We assume that GIL is acquired while GC is underway in the | ||||
# loader thread. If that isn't true, we might have to move | ||||
# gc.collect() to the main thread so that fork() would never stop | ||||
# the thread where GC is in progress. | ||||
gc.collect() | ||||
self._newentry.wait() | ||||
while self._accepting: | ||||
self._newentry.clear() | ||||
try: | ||||
path = self._inqueue.popleft() | ||||
except IndexError: | ||||
break | ||||
scmutil.callcatch(self._ui, lambda: self._load(path)) | ||||
def _load(self, path): | ||||
start = util.timer() | ||||
# TODO: repo should be recreated if storage configuration changed | ||||
try: | ||||
# pop before loading so inconsistent state wouldn't be exposed | ||||
repo = self._cache.pop(path) | ||||
except KeyError: | ||||
repo = hg.repository(self._ui, path).unfiltered() | ||||
_warmupcache(repo) | ||||
Augie Fackler
|
r43346 | repo.ui.log( | ||
b'repocache', | ||||
b'loaded repo into cache: %s (in %.3fs)\n', | ||||
path, | ||||
util.timer() - start, | ||||
) | ||||
Yuya Nishihara
|
r41035 | self._cache.insert(path, repo) | ||
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
r41035 | # TODO: think about proper API of preloading cache | ||
def _warmupcache(repo): | ||||
repo.invalidateall() | ||||
repo.changelog | ||||
repo.obsstore._all | ||||
repo.obsstore.successors | ||||
repo.obsstore.predecessors | ||||
repo.obsstore.children | ||||
for name in obsolete.cachefuncs: | ||||
obsolete.getrevs(repo, name) | ||||
repo._phasecache.loadphaserevs(repo) | ||||
Augie Fackler
|
r43346 | |||
Yuya Nishihara
|
r41035 | # TODO: think about proper API of attaching preloaded attributes | ||
def copycache(srcrepo, destrepo): | ||||
"""Copy cached attributes from srcrepo to destrepo""" | ||||
destfilecache = destrepo._filecache | ||||
srcfilecache = srcrepo._filecache | ||||
Augie Fackler
|
r43347 | if b'changelog' in srcfilecache: | ||
destfilecache[b'changelog'] = ce = srcfilecache[b'changelog'] | ||||
Yuya Nishihara
|
r41035 | ce.obj.opener = ce.obj._realopener = destrepo.svfs | ||
Augie Fackler
|
r43347 | if b'obsstore' in srcfilecache: | ||
destfilecache[b'obsstore'] = ce = srcfilecache[b'obsstore'] | ||||
Yuya Nishihara
|
r41035 | ce.obj.svfs = destrepo.svfs | ||
Augie Fackler
|
r43347 | if b'_phasecache' in srcfilecache: | ||
destfilecache[b'_phasecache'] = ce = srcfilecache[b'_phasecache'] | ||||
Yuya Nishihara
|
r41035 | ce.obj.opener = destrepo.svfs | ||