commandserver: preload repository in master server and reuse its file cache...
Yuya Nishihara
r41035:dcac24ec default
@@ -0,0 +1,131 @@
+# repocache.py - in-memory repository cache for long-running services
+#
+# Copyright 2018 Yuya Nishihara <yuya@tcha.org>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+import collections
+import gc
+import threading
+
+from . import (
+    error,
+    hg,
+    obsolete,
+    scmutil,
+    util,
+)
+
+class repoloader(object):
+    """Load repositories in background thread
+
+    This is designed for a forking server. A cached repo cannot be obtained
+    until the server fork()s a worker and the loader thread stops.
+    """
+
+    def __init__(self, ui, maxlen):
+        self._ui = ui.copy()
+        self._cache = util.lrucachedict(max=maxlen)
+        # use deque and Event instead of Queue since deque can discard
+        # old items to keep at most maxlen items.
+        self._inqueue = collections.deque(maxlen=maxlen)
+        self._accepting = False
+        self._newentry = threading.Event()
+        self._thread = None
+
+    def start(self):
+        assert not self._thread
+        if self._inqueue.maxlen == 0:
+            # no need to spawn loader thread as the cache is disabled
+            return
+        self._accepting = True
+        self._thread = threading.Thread(target=self._mainloop)
+        self._thread.start()
+
+    def stop(self):
+        if not self._thread:
+            return
+        self._accepting = False
+        self._newentry.set()
+        self._thread.join()
+        self._thread = None
+        self._cache.clear()
+        self._inqueue.clear()
+
+    def load(self, path):
+        """Request to load the specified repository in background"""
+        self._inqueue.append(path)
+        self._newentry.set()
+
+    def get(self, path):
+        """Return a cached repo if available
+
+        This function must be called after fork(), where the loader thread
+        is stopped. Otherwise, the returned repo might be updated by the
+        loader thread.
+        """
+        if self._thread and self._thread.is_alive():
+            raise error.ProgrammingError(b'cannot obtain cached repo while '
+                                         b'loader is active')
+        return self._cache.peek(path, None)
+
+    def _mainloop(self):
+        while self._accepting:
+            # Avoid heavy GC after fork(), which would cancel the benefit of
+            # COW. We assume that GIL is acquired while GC is underway in the
+            # loader thread. If that isn't true, we might have to move
+            # gc.collect() to the main thread so that fork() would never stop
+            # the thread where GC is in progress.
+            gc.collect()
+
+            self._newentry.wait()
+            while self._accepting:
+                self._newentry.clear()
+                try:
+                    path = self._inqueue.popleft()
+                except IndexError:
+                    break
+                scmutil.callcatch(self._ui, lambda: self._load(path))
+
+    def _load(self, path):
+        start = util.timer()
+        # TODO: repo should be recreated if storage configuration changed
+        try:
+            # pop before loading so inconsistent state wouldn't be exposed
+            repo = self._cache.pop(path)
+        except KeyError:
+            repo = hg.repository(self._ui, path).unfiltered()
+        _warmupcache(repo)
+        repo.ui.log(b'repocache', b'loaded repo into cache: %s (in %.3fs)\n',
+                    path, util.timer() - start)
+        self._cache.insert(path, repo)
+
+# TODO: think about proper API of preloading cache
+def _warmupcache(repo):
+    repo.invalidateall()
+    repo.changelog
+    repo.obsstore._all
+    repo.obsstore.successors
+    repo.obsstore.predecessors
+    repo.obsstore.children
+    for name in obsolete.cachefuncs:
+        obsolete.getrevs(repo, name)
+    repo._phasecache.loadphaserevs(repo)
+
+# TODO: think about proper API of attaching preloaded attributes
+def copycache(srcrepo, destrepo):
+    """Copy cached attributes from srcrepo to destrepo"""
+    destfilecache = destrepo._filecache
+    srcfilecache = srcrepo._filecache
+    if 'changelog' in srcfilecache:
+        destfilecache['changelog'] = ce = srcfilecache['changelog']
+        ce.obj.opener = ce.obj._realopener = destrepo.svfs
+    if 'obsstore' in srcfilecache:
+        destfilecache['obsstore'] = ce = srcfilecache['obsstore']
+        ce.obj.svfs = destrepo.svfs
+    if '_phasecache' in srcfilecache:
+        destfilecache['_phasecache'] = ce = srcfilecache['_phasecache']
+        ce.obj.opener = destrepo.svfs
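
To make the intended lifecycle concrete, here is a minimal usage sketch (not part of the patch) of how a forking service could drive repoloader; the cache size of 4 and the bare request loop are illustrative placeholders, and the real wiring into unixforkingservice follows in the hunks below.

  import os

  from mercurial import hg, repocache, ui as uimod

  def serve(paths):
      ui = uimod.ui.load()
      loader = repocache.repoloader(ui, maxlen=4)  # 0 would disable the cache
      loader.start()                 # spawn the background loader thread
      try:
          for path in paths:
              loader.load(path)      # schedule (re)loading in the master
              pid = os.fork()
              if pid == 0:
                  # child: the loader thread does not survive fork(), so
                  # get() is allowed here and may return a warmed-up repo
                  repo = hg.repository(ui, path)
                  cached = loader.get(path)
                  if cached is not None:
                      repocache.copycache(cached, repo)
                  # ... serve the client request against "repo" ...
                  os._exit(0)
      finally:
          loader.stop()              # join the thread and drop the cache

The key constraint is the one stated in the get() docstring: the cached repo is only touched after fork(), when the loader thread no longer runs in the child, and get() may legitimately return None if the background load has not finished yet.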
@@ -28,6 +28,7 @@ from . import (
     error,
     loggingutil,
     pycompat,
+    repocache,
     util,
     vfs as vfsmod,
 )
@@ -511,6 +512,11 @@ class unixforkingservice(object):
         self._oldsigchldhandler = None
         self._workerpids = set() # updated by signal handler; do not iterate
         self._socketunlinked = None
+        # experimental config: cmdserver.max-repo-cache
+        maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
+        if maxlen < 0:
+            raise error.Abort(_('negative max-repo-cache size not allowed'))
+        self._repoloader = repocache.repoloader(ui, maxlen)

     def init(self):
         self._sock = socket.socket(socket.AF_UNIX)
@@ -525,6 +531,7 @@ class unixforkingservice(object):
         o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
         self._oldsigchldhandler = o
         self._socketunlinked = False
+        self._repoloader.start()

     def _unlinksocket(self):
         if not self._socketunlinked:
@@ -537,6 +544,7 @@ class unixforkingservice(object):
         self._mainipc.close()
         self._workeripc.close()
         self._unlinksocket()
+        self._repoloader.stop()
         # don't kill child processes as they have active clients, just wait
         self._reapworkers(0)

@@ -590,6 +598,10 @@ class unixforkingservice(object):
                 return
             raise

+        # Future improvement: On Python 3.7, maybe gc.freeze() can be used
+        # to prevent COW memory from being touched by GC.
+        # https://instagram-engineering.com/
+        # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
         pid = os.fork()
         if pid:
             try:
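
The gc.freeze() idea mentioned in the comment above comes from the linked Instagram article and is only a possible future improvement, not something this patch implements. A rough sketch of that approach on Python 3.7+ might look like the following; the function name is made up for illustration.

  import gc
  import os

  def forkworker():
      gc.collect()   # collect garbage once while still in the master
      gc.freeze()    # move survivors to the permanent generation (3.7+)
      pid = os.fork()
      if pid == 0:
          # child: frozen objects are never traversed by the cyclic GC,
          # so their pages stay shared with the master under COW
          return 0
      gc.unfreeze()  # master can resume collecting normally
      return pid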
@@ -622,8 +634,7 @@ class unixforkingservice(object):
             if inst.args[0] == errno.EINTR:
                 return
             raise
-
-        self.ui.log(b'cmdserver', b'repository: %s\n', path)
+        self._repoloader.load(path)

     def _sigchldhandler(self, signal, frame):
         self._reapworkers(os.WNOHANG)
@@ -671,3 +682,9 @@ class unixforkingservice(object):

         repo.__class__ = unixcmdserverrepo
         repo._cmdserveripc = self._workeripc
+
+        cachedrepo = self._repoloader.get(repo.root)
+        if cachedrepo is None:
+            return
+        repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
+        repocache.copycache(cachedrepo, repo)
@@ -179,11 +179,14 @@ coreconfigitem('cmdserver', 'max-log-fil
 coreconfigitem('cmdserver', 'max-log-size',
     default='1 MB',
 )
+coreconfigitem('cmdserver', 'max-repo-cache',
+    default=0,
+)
 coreconfigitem('cmdserver', 'message-encodings',
     default=list,
 )
 coreconfigitem('cmdserver', 'track-log',
-    default=lambda: ['chgserver', 'cmdserver'],
+    default=lambda: ['chgserver', 'cmdserver', 'repocache'],
 )
 coreconfigitem('color', '.*',
     default=None,
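
cmdserver.max-repo-cache defaults to 0, which keeps the cache disabled (repoloader.start() returns early when maxlen is 0). Enabling it for a command server or chg setup would look roughly like the hgrc fragment below; the value 4 is just an illustrative cache size, not a recommendation from the patch.

  [cmdserver]
  max-repo-cache = 4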
@@ -1,6 +1,7 @@
 #require chg

   $ mkdir log
+  $ cp $HGRCPATH $HGRCPATH.unconfigured
   $ cat <<'EOF' >> $HGRCPATH
   > [cmdserver]
   > log = $TESTTMP/log/server.log
@@ -13,6 +14,7 @@
   > sed -e 's!^[0-9/]* [0-9:]* ([0-9]*)>!YYYY/MM/DD HH:MM:SS (PID)>!' \
   > -e 's!\(setprocname\|received fds\|setenv\): .*!\1: ...!' \
   > -e 's!\(confighash\|mtimehash\) = [0-9a-f]*!\1 = ...!g' \
+  > -e 's!\(in \)[0-9.]*s\b!\1 ...s!g' \
   > -e 's!\(pid\)=[0-9]*!\1=...!g' \
   > -e 's!\(/server-\)[0-9a-f]*!\1...!g'
   > }
@@ -230,6 +232,7 @@ print only the last 10 lines, since we a
 preserved:

   $ cat log/server.log.1 log/server.log | tail -10 | filterlog
+  YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> setprocname: ...
   YYYY/MM/DD HH:MM:SS (PID)> received fds: ...
   YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload'
@@ -237,6 +240,92 @@ preserved:
   YYYY/MM/DD HH:MM:SS (PID)> setenv: ...
   YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ...
   YYYY/MM/DD HH:MM:SS (PID)> validate: []
-  YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload
   YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting.
+
+repository cache
+----------------
+
+  $ rm log/server.log*
+  $ cp $HGRCPATH.unconfigured $HGRCPATH
+  $ cat <<'EOF' >> $HGRCPATH
+  > [cmdserver]
+  > log = $TESTTMP/log/server.log
+  > max-repo-cache = 1
+  > track-log = command, repocache
+  > EOF
+
+isolate socket directory for stable result:
+
+  $ OLDCHGSOCKNAME=$CHGSOCKNAME
+  $ mkdir chgsock
+  $ CHGSOCKNAME=`pwd`/chgsock/server
+
+create empty repo and cache it:
+
+  $ hg init cached
+  $ hg id -R cached
+  000000000000 tip
+  $ sleep 1
+
+modify repo (and cache will be invalidated):
+
+  $ touch cached/a
+  $ hg ci -R cached -Am 'add a'
+  adding a
+  $ sleep 1
+
+read cached repo:
+
+  $ hg log -R cached
+  changeset:   0:ac82d8b1f7c4
+  tag:         tip
+  user:        test
+  date:        Thu Jan 01 00:00:00 1970 +0000
+  summary:     add a
+
+  $ sleep 1
+
+discard cached from LRU cache:
+
+  $ hg clone cached cached2
+  updating to branch default
+  1 files updated, 0 files merged, 0 files removed, 0 files unresolved
+  $ hg id -R cached2
+  ac82d8b1f7c4 tip
+  $ sleep 1
+
+read uncached repo:
+
+  $ hg log -R cached
+  changeset:   0:ac82d8b1f7c4
+  tag:         tip
+  user:        test
+  date:        Thu Jan 01 00:00:00 1970 +0000
+  summary:     add a
+
+  $ sleep 1
+
+shut down servers and restore environment:
+
+  $ rm -R chgsock
+  $ sleep 2
+  $ CHGSOCKNAME=$OLDCHGSOCKNAME
+
+check server log:
+
+  $ cat log/server.log | filterlog
+  YYYY/MM/DD HH:MM:SS (PID)> init cached
+  YYYY/MM/DD HH:MM:SS (PID)> id -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached
+  YYYY/MM/DD HH:MM:SS (PID)> ci -R cached -Am 'add a'
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached
+  YYYY/MM/DD HH:MM:SS (PID)> log -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> clone cached cached2
+  YYYY/MM/DD HH:MM:SS (PID)> id -R cached2
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached2 (in ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> log -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)