Show More
@@ -0,0 +1,131 b'' | |||||
|
1 | # repocache.py - in-memory repository cache for long-running services | |||
|
2 | # | |||
|
3 | # Copyright 2018 Yuya Nishihara <yuya@tcha.org> | |||
|
4 | # | |||
|
5 | # This software may be used and distributed according to the terms of the | |||
|
6 | # GNU General Public License version 2 or any later version. | |||
|
7 | ||||
|
8 | from __future__ import absolute_import | |||
|
9 | ||||
|
10 | import collections | |||
|
11 | import gc | |||
|
12 | import threading | |||
|
13 | ||||
|
14 | from . import ( | |||
|
15 | error, | |||
|
16 | hg, | |||
|
17 | obsolete, | |||
|
18 | scmutil, | |||
|
19 | util, | |||
|
20 | ) | |||
|
21 | ||||
|
class repoloader(object):
    """Load repositories in background thread

    This is designed for a forking server. A cached repo cannot be obtained
    until the server fork()s a worker and the loader thread stops.
    """

    def __init__(self, ui, maxlen):
        # private copy so the loader thread does not share the caller's ui
        # object (presumably to avoid cross-thread mutation — confirm)
        self._ui = ui.copy()
        # LRU map of path -> unfiltered repo, bounded to maxlen entries
        self._cache = util.lrucachedict(max=maxlen)
        # use deque and Event instead of Queue since deque can discard
        # old items to keep at most maxlen items.
        self._inqueue = collections.deque(maxlen=maxlen)
        # polled by _mainloop(); cleared by stop() to terminate the thread
        self._accepting = False
        # signaled by load() when a path is queued, and by stop() to wake
        # the loader so it can observe _accepting == False
        self._newentry = threading.Event()
        self._thread = None

    def start(self):
        """Spawn the background loader thread"""
        assert not self._thread
        if self._inqueue.maxlen == 0:
            # no need to spawn loader thread as the cache is disabled
            return
        self._accepting = True
        self._thread = threading.Thread(target=self._mainloop)
        self._thread.start()

    def stop(self):
        """Stop the loader thread and discard all cached repositories"""
        if not self._thread:
            return
        self._accepting = False
        # wake the thread even if no entry is pending, so it can exit
        self._newentry.set()
        self._thread.join()
        self._thread = None
        self._cache.clear()
        self._inqueue.clear()

    def load(self, path):
        """Request to load the specified repository in background"""
        self._inqueue.append(path)
        self._newentry.set()

    def get(self, path):
        """Return a cached repo if available

        This function must be called after fork(), where the loader thread
        is stopped. Otherwise, the returned repo might be updated by the
        loader thread.
        """
        if self._thread and self._thread.is_alive():
            raise error.ProgrammingError(b'cannot obtain cached repo while '
                                         b'loader is active')
        # peek() so a lookup does not reorder the LRU cache
        return self._cache.peek(path, None)

    def _mainloop(self):
        # runs in the loader thread until stop() clears _accepting
        while self._accepting:
            # Avoid heavy GC after fork(), which would cancel the benefit of
            # COW. We assume that GIL is acquired while GC is underway in the
            # loader thread. If that isn't true, we might have to move
            # gc.collect() to the main thread so that fork() would never stop
            # the thread where GC is in progress.
            gc.collect()

            self._newentry.wait()
            # drain the queue; inner loop re-checks _accepting so stop()
            # takes effect between entries
            while self._accepting:
                self._newentry.clear()
                try:
                    path = self._inqueue.popleft()
                except IndexError:
                    # queue drained; go back to waiting
                    break
                # callcatch logs/absorbs errors so one bad repo does not
                # kill the loader thread
                scmutil.callcatch(self._ui, lambda: self._load(path))

    def _load(self, path):
        """(Re)load the repo at path and (re)insert it into the cache"""
        start = util.timer()
        # TODO: repo should be recreated if storage configuration changed
        try:
            # pop before loading so inconsistent state wouldn't be exposed
            repo = self._cache.pop(path)
        except KeyError:
            repo = hg.repository(self._ui, path).unfiltered()
        _warmupcache(repo)
        repo.ui.log(b'repocache', b'loaded repo into cache: %s (in %.3fs)\n',
                    path, util.timer() - start)
        self._cache.insert(path, repo)
|
105 | ||||
|
# TODO: think about proper API of preloading cache
def _warmupcache(repo):
    """Pre-populate in-memory caches of the given (unfiltered) repo

    The bare attribute accesses below are evaluated only for their side
    effects: each presumably triggers lazy computation of the corresponding
    cache so that forked workers inherit it via COW — confirm against the
    repo/obsstore implementations.
    """
    # drop any stale cached state before rebuilding it
    repo.invalidateall()
    repo.changelog
    repo.obsstore._all
    repo.obsstore.successors
    repo.obsstore.predecessors
    repo.obsstore.children
    # compute every registered obsolescence revset cache
    for name in obsolete.cachefuncs:
        obsolete.getrevs(repo, name)
    repo._phasecache.loadphaserevs(repo)
|
117 | ||||
|
# TODO: think about proper API of attaching preloaded attributes
def copycache(srcrepo, destrepo):
    """Copy cached attributes from srcrepo to destrepo

    The filecache entries are shared (not duplicated); each copied object
    is repointed at destrepo's store vfs so it reads from the destination
    repository.
    """
    src = srcrepo._filecache
    dest = destrepo._filecache
    svfs = destrepo.svfs
    if 'changelog' in src:
        entry = dest['changelog'] = src['changelog']
        entry.obj.opener = entry.obj._realopener = svfs
    if 'obsstore' in src:
        entry = dest['obsstore'] = src['obsstore']
        entry.obj.svfs = svfs
    if '_phasecache' in src:
        entry = dest['_phasecache'] = src['_phasecache']
        entry.obj.opener = svfs
@@ -28,6 +28,7 b' from . import (' | |||||
28 | error, |
|
28 | error, | |
29 | loggingutil, |
|
29 | loggingutil, | |
30 | pycompat, |
|
30 | pycompat, | |
|
31 | repocache, | |||
31 | util, |
|
32 | util, | |
32 | vfs as vfsmod, |
|
33 | vfs as vfsmod, | |
33 | ) |
|
34 | ) | |
@@ -511,6 +512,11 b' class unixforkingservice(object):' | |||||
511 | self._oldsigchldhandler = None |
|
512 | self._oldsigchldhandler = None | |
512 | self._workerpids = set() # updated by signal handler; do not iterate |
|
513 | self._workerpids = set() # updated by signal handler; do not iterate | |
513 | self._socketunlinked = None |
|
514 | self._socketunlinked = None | |
|
515 | # experimental config: cmdserver.max-repo-cache | |||
|
516 | maxlen = ui.configint(b'cmdserver', b'max-repo-cache') | |||
|
517 | if maxlen < 0: | |||
|
518 | raise error.Abort(_('negative max-repo-cache size not allowed')) | |||
|
519 | self._repoloader = repocache.repoloader(ui, maxlen) | |||
514 |
|
520 | |||
515 | def init(self): |
|
521 | def init(self): | |
516 | self._sock = socket.socket(socket.AF_UNIX) |
|
522 | self._sock = socket.socket(socket.AF_UNIX) | |
@@ -525,6 +531,7 b' class unixforkingservice(object):' | |||||
525 | o = signal.signal(signal.SIGCHLD, self._sigchldhandler) |
|
531 | o = signal.signal(signal.SIGCHLD, self._sigchldhandler) | |
526 | self._oldsigchldhandler = o |
|
532 | self._oldsigchldhandler = o | |
527 | self._socketunlinked = False |
|
533 | self._socketunlinked = False | |
|
534 | self._repoloader.start() | |||
528 |
|
535 | |||
529 | def _unlinksocket(self): |
|
536 | def _unlinksocket(self): | |
530 | if not self._socketunlinked: |
|
537 | if not self._socketunlinked: | |
@@ -537,6 +544,7 b' class unixforkingservice(object):' | |||||
537 | self._mainipc.close() |
|
544 | self._mainipc.close() | |
538 | self._workeripc.close() |
|
545 | self._workeripc.close() | |
539 | self._unlinksocket() |
|
546 | self._unlinksocket() | |
|
547 | self._repoloader.stop() | |||
540 | # don't kill child processes as they have active clients, just wait |
|
548 | # don't kill child processes as they have active clients, just wait | |
541 | self._reapworkers(0) |
|
549 | self._reapworkers(0) | |
542 |
|
550 | |||
@@ -590,6 +598,10 b' class unixforkingservice(object):' | |||||
590 | return |
|
598 | return | |
591 | raise |
|
599 | raise | |
592 |
|
600 | |||
|
601 | # Future improvement: On Python 3.7, maybe gc.freeze() can be used | |||
|
602 | # to prevent COW memory from being touched by GC. | |||
|
603 | # https://instagram-engineering.com/ | |||
|
604 | # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf | |||
593 | pid = os.fork() |
|
605 | pid = os.fork() | |
594 | if pid: |
|
606 | if pid: | |
595 | try: |
|
607 | try: | |
@@ -622,8 +634,7 b' class unixforkingservice(object):' | |||||
622 | if inst.args[0] == errno.EINTR: |
|
634 | if inst.args[0] == errno.EINTR: | |
623 | return |
|
635 | return | |
624 | raise |
|
636 | raise | |
625 |
|
637 | self._repoloader.load(path) | ||
626 | self.ui.log(b'cmdserver', b'repository: %s\n', path) |
|
|||
627 |
|
638 | |||
628 | def _sigchldhandler(self, signal, frame): |
|
639 | def _sigchldhandler(self, signal, frame): | |
629 | self._reapworkers(os.WNOHANG) |
|
640 | self._reapworkers(os.WNOHANG) | |
@@ -671,3 +682,9 b' class unixforkingservice(object):' | |||||
671 |
|
682 | |||
672 | repo.__class__ = unixcmdserverrepo |
|
683 | repo.__class__ = unixcmdserverrepo | |
673 | repo._cmdserveripc = self._workeripc |
|
684 | repo._cmdserveripc = self._workeripc | |
|
685 | ||||
|
686 | cachedrepo = self._repoloader.get(repo.root) | |||
|
687 | if cachedrepo is None: | |||
|
688 | return | |||
|
689 | repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root) | |||
|
690 | repocache.copycache(cachedrepo, repo) |
@@ -179,11 +179,14 b" coreconfigitem('cmdserver', 'max-log-fil" | |||||
179 | coreconfigitem('cmdserver', 'max-log-size', |
|
179 | coreconfigitem('cmdserver', 'max-log-size', | |
180 | default='1 MB', |
|
180 | default='1 MB', | |
181 | ) |
|
181 | ) | |
|
182 | coreconfigitem('cmdserver', 'max-repo-cache', | |||
|
183 | default=0, | |||
|
184 | ) | |||
182 | coreconfigitem('cmdserver', 'message-encodings', |
|
185 | coreconfigitem('cmdserver', 'message-encodings', | |
183 | default=list, |
|
186 | default=list, | |
184 | ) |
|
187 | ) | |
185 | coreconfigitem('cmdserver', 'track-log', |
|
188 | coreconfigitem('cmdserver', 'track-log', | |
186 | default=lambda: ['chgserver', 'cmdserver'], |
|
189 | default=lambda: ['chgserver', 'cmdserver', 'repocache'], | |
187 | ) |
|
190 | ) | |
188 | coreconfigitem('color', '.*', |
|
191 | coreconfigitem('color', '.*', | |
189 | default=None, |
|
192 | default=None, |
@@ -1,6 +1,7 b'' | |||||
1 | #require chg |
|
1 | #require chg | |
2 |
|
2 | |||
3 | $ mkdir log |
|
3 | $ mkdir log | |
|
4 | $ cp $HGRCPATH $HGRCPATH.unconfigured | |||
4 | $ cat <<'EOF' >> $HGRCPATH |
|
5 | $ cat <<'EOF' >> $HGRCPATH | |
5 | > [cmdserver] |
|
6 | > [cmdserver] | |
6 | > log = $TESTTMP/log/server.log |
|
7 | > log = $TESTTMP/log/server.log | |
@@ -13,6 +14,7 b'' | |||||
13 | > sed -e 's!^[0-9/]* [0-9:]* ([0-9]*)>!YYYY/MM/DD HH:MM:SS (PID)>!' \ |
|
14 | > sed -e 's!^[0-9/]* [0-9:]* ([0-9]*)>!YYYY/MM/DD HH:MM:SS (PID)>!' \ | |
14 | > -e 's!\(setprocname\|received fds\|setenv\): .*!\1: ...!' \ |
|
15 | > -e 's!\(setprocname\|received fds\|setenv\): .*!\1: ...!' \ | |
15 | > -e 's!\(confighash\|mtimehash\) = [0-9a-f]*!\1 = ...!g' \ |
|
16 | > -e 's!\(confighash\|mtimehash\) = [0-9a-f]*!\1 = ...!g' \ | |
|
17 | > -e 's!\(in \)[0-9.]*s\b!\1 ...s!g' \ | |||
16 | > -e 's!\(pid\)=[0-9]*!\1=...!g' \ |
|
18 | > -e 's!\(pid\)=[0-9]*!\1=...!g' \ | |
17 | > -e 's!\(/server-\)[0-9a-f]*!\1...!g' |
|
19 | > -e 's!\(/server-\)[0-9a-f]*!\1...!g' | |
18 | > } |
|
20 | > } | |
@@ -230,6 +232,7 b' print only the last 10 lines, since we a' | |||||
230 | preserved: |
|
232 | preserved: | |
231 |
|
233 | |||
232 | $ cat log/server.log.1 log/server.log | tail -10 | filterlog |
|
234 | $ cat log/server.log.1 log/server.log | tail -10 | filterlog | |
|
235 | YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...) | |||
233 | YYYY/MM/DD HH:MM:SS (PID)> setprocname: ... |
|
236 | YYYY/MM/DD HH:MM:SS (PID)> setprocname: ... | |
234 | YYYY/MM/DD HH:MM:SS (PID)> received fds: ... |
|
237 | YYYY/MM/DD HH:MM:SS (PID)> received fds: ... | |
235 | YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload' |
|
238 | YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload' | |
@@ -237,6 +240,92 b' preserved:' | |||||
237 | YYYY/MM/DD HH:MM:SS (PID)> setenv: ... |
|
240 | YYYY/MM/DD HH:MM:SS (PID)> setenv: ... | |
238 | YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ... |
|
241 | YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ... | |
239 | YYYY/MM/DD HH:MM:SS (PID)> validate: [] |
|
242 | YYYY/MM/DD HH:MM:SS (PID)> validate: [] | |
240 | YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload |
|
|||
241 | YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...) |
|
243 | YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...) | |
242 | YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting. |
|
244 | YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting. | |
|
245 | ||||
|
246 | repository cache | |||
|
247 | ---------------- | |||
|
248 | ||||
|
249 | $ rm log/server.log* | |||
|
250 | $ cp $HGRCPATH.unconfigured $HGRCPATH | |||
|
251 | $ cat <<'EOF' >> $HGRCPATH | |||
|
252 | > [cmdserver] | |||
|
253 | > log = $TESTTMP/log/server.log | |||
|
254 | > max-repo-cache = 1 | |||
|
255 | > track-log = command, repocache | |||
|
256 | > EOF | |||
|
257 | ||||
|
258 | isolate socket directory for stable result: | |||
|
259 | ||||
|
260 | $ OLDCHGSOCKNAME=$CHGSOCKNAME | |||
|
261 | $ mkdir chgsock | |||
|
262 | $ CHGSOCKNAME=`pwd`/chgsock/server | |||
|
263 | ||||
|
264 | create empty repo and cache it: | |||
|
265 | ||||
|
266 | $ hg init cached | |||
|
267 | $ hg id -R cached | |||
|
268 | 000000000000 tip | |||
|
269 | $ sleep 1 | |||
|
270 | ||||
|
271 | modify repo (and cache will be invalidated): | |||
|
272 | ||||
|
273 | $ touch cached/a | |||
|
274 | $ hg ci -R cached -Am 'add a' | |||
|
275 | adding a | |||
|
276 | $ sleep 1 | |||
|
277 | ||||
|
278 | read cached repo: | |||
|
279 | ||||
|
280 | $ hg log -R cached | |||
|
281 | changeset: 0:ac82d8b1f7c4 | |||
|
282 | tag: tip | |||
|
283 | user: test | |||
|
284 | date: Thu Jan 01 00:00:00 1970 +0000 | |||
|
285 | summary: add a | |||
|
286 | ||||
|
287 | $ sleep 1 | |||
|
288 | ||||
|
289 | discard cached from LRU cache: | |||
|
290 | ||||
|
291 | $ hg clone cached cached2 | |||
|
292 | updating to branch default | |||
|
293 | 1 files updated, 0 files merged, 0 files removed, 0 files unresolved | |||
|
294 | $ hg id -R cached2 | |||
|
295 | ac82d8b1f7c4 tip | |||
|
296 | $ sleep 1 | |||
|
297 | ||||
|
298 | read uncached repo: | |||
|
299 | ||||
|
300 | $ hg log -R cached | |||
|
301 | changeset: 0:ac82d8b1f7c4 | |||
|
302 | tag: tip | |||
|
303 | user: test | |||
|
304 | date: Thu Jan 01 00:00:00 1970 +0000 | |||
|
305 | summary: add a | |||
|
306 | ||||
|
307 | $ sleep 1 | |||
|
308 | ||||
|
309 | shut down servers and restore environment: | |||
|
310 | ||||
|
311 | $ rm -R chgsock | |||
|
312 | $ sleep 2 | |||
|
313 | $ CHGSOCKNAME=$OLDCHGSOCKNAME | |||
|
314 | ||||
|
315 | check server log: | |||
|
316 | ||||
|
317 | $ cat log/server.log | filterlog | |||
|
318 | YYYY/MM/DD HH:MM:SS (PID)> init cached | |||
|
319 | YYYY/MM/DD HH:MM:SS (PID)> id -R cached | |||
|
320 | YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s) | |||
|
321 | YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached | |||
|
322 | YYYY/MM/DD HH:MM:SS (PID)> ci -R cached -Am 'add a' | |||
|
323 | YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s) | |||
|
324 | YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached | |||
|
325 | YYYY/MM/DD HH:MM:SS (PID)> log -R cached | |||
|
326 | YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s) | |||
|
327 | YYYY/MM/DD HH:MM:SS (PID)> clone cached cached2 | |||
|
328 | YYYY/MM/DD HH:MM:SS (PID)> id -R cached2 | |||
|
329 | YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached2 (in ...s) | |||
|
330 | YYYY/MM/DD HH:MM:SS (PID)> log -R cached | |||
|
331 | YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s) |
General Comments 0
You need to be logged in to leave comments.
Login now