Show More
@@ -0,0 +1,131 b'' | |||
|
1 | # repocache.py - in-memory repository cache for long-running services | |
|
2 | # | |
|
3 | # Copyright 2018 Yuya Nishihara <yuya@tcha.org> | |
|
4 | # | |
|
5 | # This software may be used and distributed according to the terms of the | |
|
6 | # GNU General Public License version 2 or any later version. | |
|
7 | ||
|
8 | from __future__ import absolute_import | |
|
9 | ||
|
10 | import collections | |
|
11 | import gc | |
|
12 | import threading | |
|
13 | ||
|
14 | from . import ( | |
|
15 | error, | |
|
16 | hg, | |
|
17 | obsolete, | |
|
18 | scmutil, | |
|
19 | util, | |
|
20 | ) | |
|
21 | ||
|
22 | class repoloader(object): | |
|
23 | """Load repositories in background thread | |
|
24 | ||
|
25 | This is designed for a forking server. A cached repo cannot be obtained | |
|
26 | until the server fork()s a worker and the loader thread stops. | |
|
27 | """ | |
|
28 | ||
|
29 | def __init__(self, ui, maxlen): | |
|
30 | self._ui = ui.copy() | |
|
31 | self._cache = util.lrucachedict(max=maxlen) | |
|
32 | # use deque and Event instead of Queue since deque can discard | |
|
33 | # old items to keep at most maxlen items. | |
|
34 | self._inqueue = collections.deque(maxlen=maxlen) | |
|
35 | self._accepting = False | |
|
36 | self._newentry = threading.Event() | |
|
37 | self._thread = None | |
|
38 | ||
|
39 | def start(self): | |
|
40 | assert not self._thread | |
|
41 | if self._inqueue.maxlen == 0: | |
|
42 | # no need to spawn loader thread as the cache is disabled | |
|
43 | return | |
|
44 | self._accepting = True | |
|
45 | self._thread = threading.Thread(target=self._mainloop) | |
|
46 | self._thread.start() | |
|
47 | ||
|
48 | def stop(self): | |
|
49 | if not self._thread: | |
|
50 | return | |
|
51 | self._accepting = False | |
|
52 | self._newentry.set() | |
|
53 | self._thread.join() | |
|
54 | self._thread = None | |
|
55 | self._cache.clear() | |
|
56 | self._inqueue.clear() | |
|
57 | ||
|
58 | def load(self, path): | |
|
59 | """Request to load the specified repository in background""" | |
|
60 | self._inqueue.append(path) | |
|
61 | self._newentry.set() | |
|
62 | ||
|
63 | def get(self, path): | |
|
64 | """Return a cached repo if available | |
|
65 | ||
|
66 | This function must be called after fork(), where the loader thread | |
|
67 | is stopped. Otherwise, the returned repo might be updated by the | |
|
68 | loader thread. | |
|
69 | """ | |
|
70 | if self._thread and self._thread.is_alive(): | |
|
71 | raise error.ProgrammingError(b'cannot obtain cached repo while ' | |
|
72 | b'loader is active') | |
|
73 | return self._cache.peek(path, None) | |
|
74 | ||
|
75 | def _mainloop(self): | |
|
76 | while self._accepting: | |
|
77 | # Avoid heavy GC after fork(), which would cancel the benefit of | |
|
78 | # COW. We assume that GIL is acquired while GC is underway in the | |
|
79 | # loader thread. If that isn't true, we might have to move | |
|
80 | # gc.collect() to the main thread so that fork() would never stop | |
|
81 | # the thread where GC is in progress. | |
|
82 | gc.collect() | |
|
83 | ||
|
84 | self._newentry.wait() | |
|
85 | while self._accepting: | |
|
86 | self._newentry.clear() | |
|
87 | try: | |
|
88 | path = self._inqueue.popleft() | |
|
89 | except IndexError: | |
|
90 | break | |
|
91 | scmutil.callcatch(self._ui, lambda: self._load(path)) | |
|
92 | ||
|
93 | def _load(self, path): | |
|
94 | start = util.timer() | |
|
95 | # TODO: repo should be recreated if storage configuration changed | |
|
96 | try: | |
|
97 | # pop before loading so inconsistent state wouldn't be exposed | |
|
98 | repo = self._cache.pop(path) | |
|
99 | except KeyError: | |
|
100 | repo = hg.repository(self._ui, path).unfiltered() | |
|
101 | _warmupcache(repo) | |
|
102 | repo.ui.log(b'repocache', b'loaded repo into cache: %s (in %.3fs)\n', | |
|
103 | path, util.timer() - start) | |
|
104 | self._cache.insert(path, repo) | |
|
105 | ||
|
106 | # TODO: think about proper API of preloading cache | |
|
107 | def _warmupcache(repo): | |
|
108 | repo.invalidateall() | |
|
109 | repo.changelog | |
|
110 | repo.obsstore._all | |
|
111 | repo.obsstore.successors | |
|
112 | repo.obsstore.predecessors | |
|
113 | repo.obsstore.children | |
|
114 | for name in obsolete.cachefuncs: | |
|
115 | obsolete.getrevs(repo, name) | |
|
116 | repo._phasecache.loadphaserevs(repo) | |
|
117 | ||
|
118 | # TODO: think about proper API of attaching preloaded attributes | |
|
119 | def copycache(srcrepo, destrepo): | |
|
120 | """Copy cached attributes from srcrepo to destrepo""" | |
|
121 | destfilecache = destrepo._filecache | |
|
122 | srcfilecache = srcrepo._filecache | |
|
123 | if 'changelog' in srcfilecache: | |
|
124 | destfilecache['changelog'] = ce = srcfilecache['changelog'] | |
|
125 | ce.obj.opener = ce.obj._realopener = destrepo.svfs | |
|
126 | if 'obsstore' in srcfilecache: | |
|
127 | destfilecache['obsstore'] = ce = srcfilecache['obsstore'] | |
|
128 | ce.obj.svfs = destrepo.svfs | |
|
129 | if '_phasecache' in srcfilecache: | |
|
130 | destfilecache['_phasecache'] = ce = srcfilecache['_phasecache'] | |
|
131 | ce.obj.opener = destrepo.svfs |
@@ -28,6 +28,7 b' from . import (' | |||
|
28 | 28 | error, |
|
29 | 29 | loggingutil, |
|
30 | 30 | pycompat, |
|
31 | repocache, | |
|
31 | 32 | util, |
|
32 | 33 | vfs as vfsmod, |
|
33 | 34 | ) |
@@ -511,6 +512,11 b' class unixforkingservice(object):' | |||
|
511 | 512 | self._oldsigchldhandler = None |
|
512 | 513 | self._workerpids = set() # updated by signal handler; do not iterate |
|
513 | 514 | self._socketunlinked = None |
|
515 | # experimental config: cmdserver.max-repo-cache | |
|
516 | maxlen = ui.configint(b'cmdserver', b'max-repo-cache') | |
|
517 | if maxlen < 0: | |
|
518 | raise error.Abort(_('negative max-repo-cache size not allowed')) | |
|
519 | self._repoloader = repocache.repoloader(ui, maxlen) | |
|
514 | 520 | |
|
515 | 521 | def init(self): |
|
516 | 522 | self._sock = socket.socket(socket.AF_UNIX) |
@@ -525,6 +531,7 b' class unixforkingservice(object):' | |||
|
525 | 531 | o = signal.signal(signal.SIGCHLD, self._sigchldhandler) |
|
526 | 532 | self._oldsigchldhandler = o |
|
527 | 533 | self._socketunlinked = False |
|
534 | self._repoloader.start() | |
|
528 | 535 | |
|
529 | 536 | def _unlinksocket(self): |
|
530 | 537 | if not self._socketunlinked: |
@@ -537,6 +544,7 b' class unixforkingservice(object):' | |||
|
537 | 544 | self._mainipc.close() |
|
538 | 545 | self._workeripc.close() |
|
539 | 546 | self._unlinksocket() |
|
547 | self._repoloader.stop() | |
|
540 | 548 | # don't kill child processes as they have active clients, just wait |
|
541 | 549 | self._reapworkers(0) |
|
542 | 550 | |
@@ -590,6 +598,10 b' class unixforkingservice(object):' | |||
|
590 | 598 | return |
|
591 | 599 | raise |
|
592 | 600 | |
|
601 | # Future improvement: On Python 3.7, maybe gc.freeze() can be used | |
|
602 | # to prevent COW memory from being touched by GC. | |
|
603 | # https://instagram-engineering.com/ | |
|
604 | # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf | |
|
593 | 605 | pid = os.fork() |
|
594 | 606 | if pid: |
|
595 | 607 | try: |
@@ -622,8 +634,7 b' class unixforkingservice(object):' | |||
|
622 | 634 | if inst.args[0] == errno.EINTR: |
|
623 | 635 | return |
|
624 | 636 | raise |
|
625 | ||
|
626 | self.ui.log(b'cmdserver', b'repository: %s\n', path) | |
|
637 | self._repoloader.load(path) | |
|
627 | 638 | |
|
628 | 639 | def _sigchldhandler(self, signal, frame): |
|
629 | 640 | self._reapworkers(os.WNOHANG) |
@@ -671,3 +682,9 b' class unixforkingservice(object):' | |||
|
671 | 682 | |
|
672 | 683 | repo.__class__ = unixcmdserverrepo |
|
673 | 684 | repo._cmdserveripc = self._workeripc |
|
685 | ||
|
686 | cachedrepo = self._repoloader.get(repo.root) | |
|
687 | if cachedrepo is None: | |
|
688 | return | |
|
689 | repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root) | |
|
690 | repocache.copycache(cachedrepo, repo) |
@@ -179,11 +179,14 b" coreconfigitem('cmdserver', 'max-log-fil" | |||
|
179 | 179 | coreconfigitem('cmdserver', 'max-log-size', |
|
180 | 180 | default='1 MB', |
|
181 | 181 | ) |
|
182 | coreconfigitem('cmdserver', 'max-repo-cache', | |
|
183 | default=0, | |
|
184 | ) | |
|
182 | 185 | coreconfigitem('cmdserver', 'message-encodings', |
|
183 | 186 | default=list, |
|
184 | 187 | ) |
|
185 | 188 | coreconfigitem('cmdserver', 'track-log', |
|
186 | default=lambda: ['chgserver', 'cmdserver'], | |
|
189 | default=lambda: ['chgserver', 'cmdserver', 'repocache'], | |
|
187 | 190 | ) |
|
188 | 191 | coreconfigitem('color', '.*', |
|
189 | 192 | default=None, |
@@ -1,6 +1,7 b'' | |||
|
1 | 1 | #require chg |
|
2 | 2 | |
|
3 | 3 | $ mkdir log |
|
4 | $ cp $HGRCPATH $HGRCPATH.unconfigured | |
|
4 | 5 | $ cat <<'EOF' >> $HGRCPATH |
|
5 | 6 | > [cmdserver] |
|
6 | 7 | > log = $TESTTMP/log/server.log |
@@ -13,6 +14,7 b'' | |||
|
13 | 14 | > sed -e 's!^[0-9/]* [0-9:]* ([0-9]*)>!YYYY/MM/DD HH:MM:SS (PID)>!' \ |
|
14 | 15 | > -e 's!\(setprocname\|received fds\|setenv\): .*!\1: ...!' \ |
|
15 | 16 | > -e 's!\(confighash\|mtimehash\) = [0-9a-f]*!\1 = ...!g' \ |
|
17 | > -e 's!\(in \)[0-9.]*s\b!\1 ...s!g' \ | |
|
16 | 18 | > -e 's!\(pid\)=[0-9]*!\1=...!g' \ |
|
17 | 19 | > -e 's!\(/server-\)[0-9a-f]*!\1...!g' |
|
18 | 20 | > } |
@@ -230,6 +232,7 b' print only the last 10 lines, since we a' | |||
|
230 | 232 | preserved: |
|
231 | 233 | |
|
232 | 234 | $ cat log/server.log.1 log/server.log | tail -10 | filterlog |
|
235 | YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...) | |
|
233 | 236 | YYYY/MM/DD HH:MM:SS (PID)> setprocname: ... |
|
234 | 237 | YYYY/MM/DD HH:MM:SS (PID)> received fds: ... |
|
235 | 238 | YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload' |
@@ -237,6 +240,92 b' preserved:' | |||
|
237 | 240 | YYYY/MM/DD HH:MM:SS (PID)> setenv: ... |
|
238 | 241 | YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ... |
|
239 | 242 | YYYY/MM/DD HH:MM:SS (PID)> validate: [] |
|
240 | YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload | |
|
241 | 243 | YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...) |
|
242 | 244 | YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting. |
|
245 | ||
|
246 | repository cache | |
|
247 | ---------------- | |
|
248 | ||
|
249 | $ rm log/server.log* | |
|
250 | $ cp $HGRCPATH.unconfigured $HGRCPATH | |
|
251 | $ cat <<'EOF' >> $HGRCPATH | |
|
252 | > [cmdserver] | |
|
253 | > log = $TESTTMP/log/server.log | |
|
254 | > max-repo-cache = 1 | |
|
255 | > track-log = command, repocache | |
|
256 | > EOF | |
|
257 | ||
|
258 | isolate socket directory for stable result: | |
|
259 | ||
|
260 | $ OLDCHGSOCKNAME=$CHGSOCKNAME | |
|
261 | $ mkdir chgsock | |
|
262 | $ CHGSOCKNAME=`pwd`/chgsock/server | |
|
263 | ||
|
264 | create empty repo and cache it: | |
|
265 | ||
|
266 | $ hg init cached | |
|
267 | $ hg id -R cached | |
|
268 | 000000000000 tip | |
|
269 | $ sleep 1 | |
|
270 | ||
|
271 | modify repo (and cache will be invalidated): | |
|
272 | ||
|
273 | $ touch cached/a | |
|
274 | $ hg ci -R cached -Am 'add a' | |
|
275 | adding a | |
|
276 | $ sleep 1 | |
|
277 | ||
|
278 | read cached repo: | |
|
279 | ||
|
280 | $ hg log -R cached | |
|
281 | changeset: 0:ac82d8b1f7c4 | |
|
282 | tag: tip | |
|
283 | user: test | |
|
284 | date: Thu Jan 01 00:00:00 1970 +0000 | |
|
285 | summary: add a | |
|
286 | ||
|
287 | $ sleep 1 | |
|
288 | ||
|
289 | discard cached from LRU cache: | |
|
290 | ||
|
291 | $ hg clone cached cached2 | |
|
292 | updating to branch default | |
|
293 | 1 files updated, 0 files merged, 0 files removed, 0 files unresolved | |
|
294 | $ hg id -R cached2 | |
|
295 | ac82d8b1f7c4 tip | |
|
296 | $ sleep 1 | |
|
297 | ||
|
298 | read uncached repo: | |
|
299 | ||
|
300 | $ hg log -R cached | |
|
301 | changeset: 0:ac82d8b1f7c4 | |
|
302 | tag: tip | |
|
303 | user: test | |
|
304 | date: Thu Jan 01 00:00:00 1970 +0000 | |
|
305 | summary: add a | |
|
306 | ||
|
307 | $ sleep 1 | |
|
308 | ||
|
309 | shut down servers and restore environment: | |
|
310 | ||
|
311 | $ rm -R chgsock | |
|
312 | $ sleep 2 | |
|
313 | $ CHGSOCKNAME=$OLDCHGSOCKNAME | |
|
314 | ||
|
315 | check server log: | |
|
316 | ||
|
317 | $ cat log/server.log | filterlog | |
|
318 | YYYY/MM/DD HH:MM:SS (PID)> init cached | |
|
319 | YYYY/MM/DD HH:MM:SS (PID)> id -R cached | |
|
320 | YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s) | |
|
321 | YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached | |
|
322 | YYYY/MM/DD HH:MM:SS (PID)> ci -R cached -Am 'add a' | |
|
323 | YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s) | |
|
324 | YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached | |
|
325 | YYYY/MM/DD HH:MM:SS (PID)> log -R cached | |
|
326 | YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s) | |
|
327 | YYYY/MM/DD HH:MM:SS (PID)> clone cached cached2 | |
|
328 | YYYY/MM/DD HH:MM:SS (PID)> id -R cached2 | |
|
329 | YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached2 (in ...s) | |
|
330 | YYYY/MM/DD HH:MM:SS (PID)> log -R cached | |
|
331 | YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s) |
General Comments 0
You need to be logged in to leave comments.
Login now