commandserver: preload repository in master server and reuse its file cache...
Yuya Nishihara - r41035:dcac24ec default
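The repository cache is disabled by default (cmdserver.max-repo-cache = 0). A minimal configuration sketch for trying the feature with a chg/unix command server; the cache size of 4 is only an illustrative value, and listing 'repocache' in track-log (already part of the new default) makes the preload activity visible in the server log:

    [cmdserver]
    max-repo-cache = 4
    track-log = chgserver, cmdserver, repocache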
@@ -0,0 +1,131 b''
# repocache.py - in-memory repository cache for long-running services
#
# Copyright 2018 Yuya Nishihara <yuya@tcha.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import gc
import threading

from . import (
    error,
    hg,
    obsolete,
    scmutil,
    util,
)

class repoloader(object):
    """Load repositories in background thread

    This is designed for a forking server. A cached repo cannot be obtained
    until the server fork()s a worker and the loader thread stops.
    """

    def __init__(self, ui, maxlen):
        self._ui = ui.copy()
        self._cache = util.lrucachedict(max=maxlen)
        # use deque and Event instead of Queue since deque can discard
        # old items to keep at most maxlen items.
        self._inqueue = collections.deque(maxlen=maxlen)
        self._accepting = False
        self._newentry = threading.Event()
        self._thread = None

    def start(self):
        assert not self._thread
        if self._inqueue.maxlen == 0:
            # no need to spawn loader thread as the cache is disabled
            return
        self._accepting = True
        self._thread = threading.Thread(target=self._mainloop)
        self._thread.start()

    def stop(self):
        if not self._thread:
            return
        self._accepting = False
        self._newentry.set()
        self._thread.join()
        self._thread = None
        self._cache.clear()
        self._inqueue.clear()

    def load(self, path):
        """Request to load the specified repository in background"""
        self._inqueue.append(path)
        self._newentry.set()

    def get(self, path):
        """Return a cached repo if available

        This function must be called after fork(), where the loader thread
        is stopped. Otherwise, the returned repo might be updated by the
        loader thread.
        """
        if self._thread and self._thread.is_alive():
            raise error.ProgrammingError(b'cannot obtain cached repo while '
                                         b'loader is active')
        return self._cache.peek(path, None)

    def _mainloop(self):
        while self._accepting:
            # Avoid heavy GC after fork(), which would cancel the benefit of
            # COW. We assume that GIL is acquired while GC is underway in the
            # loader thread. If that isn't true, we might have to move
            # gc.collect() to the main thread so that fork() would never stop
            # the thread where GC is in progress.
            gc.collect()

            self._newentry.wait()
            while self._accepting:
                self._newentry.clear()
                try:
                    path = self._inqueue.popleft()
                except IndexError:
                    break
                scmutil.callcatch(self._ui, lambda: self._load(path))

    def _load(self, path):
        start = util.timer()
        # TODO: repo should be recreated if storage configuration changed
        try:
            # pop before loading so inconsistent state wouldn't be exposed
            repo = self._cache.pop(path)
        except KeyError:
            repo = hg.repository(self._ui, path).unfiltered()
        _warmupcache(repo)
        repo.ui.log(b'repocache', b'loaded repo into cache: %s (in %.3fs)\n',
                    path, util.timer() - start)
        self._cache.insert(path, repo)

# TODO: think about proper API of preloading cache
def _warmupcache(repo):
    repo.invalidateall()
    repo.changelog
    repo.obsstore._all
    repo.obsstore.successors
    repo.obsstore.predecessors
    repo.obsstore.children
    for name in obsolete.cachefuncs:
        obsolete.getrevs(repo, name)
    repo._phasecache.loadphaserevs(repo)

# TODO: think about proper API of attaching preloaded attributes
def copycache(srcrepo, destrepo):
    """Copy cached attributes from srcrepo to destrepo"""
    destfilecache = destrepo._filecache
    srcfilecache = srcrepo._filecache
    if 'changelog' in srcfilecache:
        destfilecache['changelog'] = ce = srcfilecache['changelog']
        ce.obj.opener = ce.obj._realopener = destrepo.svfs
    if 'obsstore' in srcfilecache:
        destfilecache['obsstore'] = ce = srcfilecache['obsstore']
        ce.obj.svfs = destrepo.svfs
    if '_phasecache' in srcfilecache:
        destfilecache['_phasecache'] = ce = srcfilecache['_phasecache']
        ce.obj.opener = destrepo.svfs
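For orientation, a minimal usage sketch of repoloader as a forking server would drive it (the repository path and cache size here are hypothetical; the real wiring is in the commandserver changes below). The master process loads repositories in the background thread, and a worker may only call get() after fork(), when the loader thread no longer runs in the child:

    import os

    from mercurial import repocache, ui as uimod

    ui = uimod.ui.load()
    loader = repocache.repoloader(ui, 4)      # keep at most 4 repos cached
    loader.start()
    loader.load(b'/srv/repos/example')        # hypothetical repository path

    pid = os.fork()
    if pid == 0:
        # child (worker): the loader thread does not survive fork(), so a
        # cached repo can be picked up and its warmed file cache reused
        repo = loader.get(b'/srv/repos/example')
        if repo is not None:
            ui.write(b'preloaded: %s\n' % repo.root)
        os._exit(0)
    os.waitpid(pid, 0)
    loader.stop()                             # master: join thread, drop cache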
@@ -28,6 +28,7 b' from . import ('
     error,
     loggingutil,
     pycompat,
+    repocache,
     util,
     vfs as vfsmod,
 )
@@ -511,6 +512,11 b' class unixforkingservice(object):'
         self._oldsigchldhandler = None
         self._workerpids = set() # updated by signal handler; do not iterate
         self._socketunlinked = None
+        # experimental config: cmdserver.max-repo-cache
+        maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
+        if maxlen < 0:
+            raise error.Abort(_('negative max-repo-cache size not allowed'))
+        self._repoloader = repocache.repoloader(ui, maxlen)
 
     def init(self):
         self._sock = socket.socket(socket.AF_UNIX)
@@ -525,6 +531,7 b' class unixforkingservice(object):'
         o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
         self._oldsigchldhandler = o
         self._socketunlinked = False
+        self._repoloader.start()
 
     def _unlinksocket(self):
         if not self._socketunlinked:
@@ -537,6 +544,7 b' class unixforkingservice(object):'
         self._mainipc.close()
         self._workeripc.close()
         self._unlinksocket()
+        self._repoloader.stop()
         # don't kill child processes as they have active clients, just wait
         self._reapworkers(0)
 
@@ -590,6 +598,10 b' class unixforkingservice(object):'
                 return
             raise
 
+        # Future improvement: On Python 3.7, maybe gc.freeze() can be used
+        # to prevent COW memory from being touched by GC.
+        # https://instagram-engineering.com/
+        # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
         pid = os.fork()
         if pid:
             try:
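gc.freeze() mentioned above is only a possible future improvement, not something this patch does. A rough sketch of the idea on Python 3.7+, where objects moved into the permanent generation are skipped by later collections, so their copy-on-write pages are more likely to stay shared with forked workers:

    import gc
    import os

    gc.disable()      # don't collect while building up the long-lived state
    # ... preload repositories here ...
    gc.freeze()       # move surviving objects to the permanent generation

    pid = os.fork()
    if pid == 0:
        gc.enable()   # the child collects only its own garbage; frozen
                      # objects are never traversed by the collector
        os._exit(0)
    os.waitpid(pid, 0)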
@@ -622,8 +634,7 b' class unixforkingservice(object):'
             if inst.args[0] == errno.EINTR:
                 return
             raise
-
-        self.ui.log(b'cmdserver', b'repository: %s\n', path)
+        self._repoloader.load(path)
 
     def _sigchldhandler(self, signal, frame):
         self._reapworkers(os.WNOHANG)
@@ -671,3 +682,9 b' class unixforkingservice(object):'
 
         repo.__class__ = unixcmdserverrepo
         repo._cmdserveripc = self._workeripc
+
+        cachedrepo = self._repoloader.get(repo.root)
+        if cachedrepo is None:
+            return
+        repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
+        repocache.copycache(cachedrepo, repo)
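copycache() (from repocache.py above) transplants entries of localrepository._filecache, so only the attributes that _warmupcache() populated in the master (changelog, obsstore, _phasecache) are handed over to the worker's repo. A quick interactive sketch (the repository path is hypothetical) to see which _filecache entries a warmed-up repo carries:

    from mercurial import hg, ui as uimod

    ui = uimod.ui.load()
    repo = hg.repository(ui, b'/srv/repos/example').unfiltered()
    repo.changelog                   # accessing the property fills _filecache
    print(sorted(repo._filecache))   # expect 'changelog' among the keys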
@@ -179,11 +179,14 b" coreconfigitem('cmdserver', 'max-log-fil"
 coreconfigitem('cmdserver', 'max-log-size',
     default='1 MB',
 )
+coreconfigitem('cmdserver', 'max-repo-cache',
+    default=0,
+)
 coreconfigitem('cmdserver', 'message-encodings',
     default=list,
 )
 coreconfigitem('cmdserver', 'track-log',
-    default=lambda: ['chgserver', 'cmdserver'],
+    default=lambda: ['chgserver', 'cmdserver', 'repocache'],
 )
 coreconfigitem('color', '.*',
     default=None,
@@ -1,6 +1,7 b''
 #require chg
 
   $ mkdir log
+  $ cp $HGRCPATH $HGRCPATH.unconfigured
   $ cat <<'EOF' >> $HGRCPATH
   > [cmdserver]
   > log = $TESTTMP/log/server.log
@@ -13,6 +14,7 b''
   > sed -e 's!^[0-9/]* [0-9:]* ([0-9]*)>!YYYY/MM/DD HH:MM:SS (PID)>!' \
   > -e 's!\(setprocname\|received fds\|setenv\): .*!\1: ...!' \
   > -e 's!\(confighash\|mtimehash\) = [0-9a-f]*!\1 = ...!g' \
+  > -e 's!\(in \)[0-9.]*s\b!\1 ...s!g' \
   > -e 's!\(pid\)=[0-9]*!\1=...!g' \
   > -e 's!\(/server-\)[0-9a-f]*!\1...!g'
   > }
@@ -230,6 +232,7 b' print only the last 10 lines, since we a'
 preserved:
 
   $ cat log/server.log.1 log/server.log | tail -10 | filterlog
+  YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> setprocname: ...
   YYYY/MM/DD HH:MM:SS (PID)> received fds: ...
   YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload'
@@ -237,6 +240,92 b' preserved:'
   YYYY/MM/DD HH:MM:SS (PID)> setenv: ...
   YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ...
   YYYY/MM/DD HH:MM:SS (PID)> validate: []
-  YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload
   YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting.
+
+repository cache
+----------------
+
+  $ rm log/server.log*
+  $ cp $HGRCPATH.unconfigured $HGRCPATH
+  $ cat <<'EOF' >> $HGRCPATH
+  > [cmdserver]
+  > log = $TESTTMP/log/server.log
+  > max-repo-cache = 1
+  > track-log = command, repocache
+  > EOF
+
+isolate socket directory for stable result:
+
+  $ OLDCHGSOCKNAME=$CHGSOCKNAME
+  $ mkdir chgsock
+  $ CHGSOCKNAME=`pwd`/chgsock/server
+
+create empty repo and cache it:
+
+  $ hg init cached
+  $ hg id -R cached
+  000000000000 tip
+  $ sleep 1
+
+modify repo (and cache will be invalidated):
+
+  $ touch cached/a
+  $ hg ci -R cached -Am 'add a'
+  adding a
+  $ sleep 1
+
+read cached repo:
+
+  $ hg log -R cached
+  changeset: 0:ac82d8b1f7c4
+  tag: tip
+  user: test
+  date: Thu Jan 01 00:00:00 1970 +0000
+  summary: add a
+
+  $ sleep 1
+
+discard cached from LRU cache:
+
+  $ hg clone cached cached2
+  updating to branch default
+  1 files updated, 0 files merged, 0 files removed, 0 files unresolved
+  $ hg id -R cached2
+  ac82d8b1f7c4 tip
+  $ sleep 1
+
+read uncached repo:
+
+  $ hg log -R cached
+  changeset: 0:ac82d8b1f7c4
+  tag: tip
+  user: test
+  date: Thu Jan 01 00:00:00 1970 +0000
+  summary: add a
+
+  $ sleep 1
+
+shut down servers and restore environment:
+
+  $ rm -R chgsock
+  $ sleep 2
+  $ CHGSOCKNAME=$OLDCHGSOCKNAME
+
+check server log:
+
+  $ cat log/server.log | filterlog
+  YYYY/MM/DD HH:MM:SS (PID)> init cached
+  YYYY/MM/DD HH:MM:SS (PID)> id -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached
+  YYYY/MM/DD HH:MM:SS (PID)> ci -R cached -Am 'add a'
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> repo from cache: $TESTTMP/cached
+  YYYY/MM/DD HH:MM:SS (PID)> log -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> clone cached cached2
+  YYYY/MM/DD HH:MM:SS (PID)> id -R cached2
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached2 (in ...s)
+  YYYY/MM/DD HH:MM:SS (PID)> log -R cached
+  YYYY/MM/DD HH:MM:SS (PID)> loaded repo into cache: $TESTTMP/cached (in ...s)