##// END OF EJS Templates
caches: updated cache backend to new vcsserver caches implementation.
marcink -
r3848:fdb508f1 default
parent child Browse files
Show More
@@ -0,0 +1,37 b''
1 # -*- coding: utf-8 -*-
2
3 import logging
4
5 from alembic.migration import MigrationContext
6 from alembic.operations import Operations
7 from sqlalchemy import Column, String
8
9 from rhodecode.lib.dbmigrate.versions import _reset_base
10 from rhodecode.model import init_model_encryption
11
12
13 log = logging.getLogger(__name__)
14
15
16 def upgrade(migrate_engine):
17 """
18 Upgrade operations go here.
19 Don't create your own engine; bind migrate_engine to your metadata
20 """
21 _reset_base(migrate_engine)
22 from rhodecode.lib.dbmigrate.schema import db_4_16_0_2
23
24 init_model_encryption(db_4_16_0_2)
25
26 context = MigrationContext.configure(migrate_engine.connect())
27 op = Operations(context)
28
29 cache_key = db_4_16_0_2.CacheKey.__table__
30
31 with op.batch_alter_table(cache_key.name) as batch_op:
32 batch_op.add_column(
33 Column("cache_state_uid", String(255), nullable=True, unique=None, default=None))
34
35
36 def downgrade(migrate_engine):
37 pass
@@ -45,7 +45,7 b' PYRAMID_SETTINGS = {}'
45 45 EXTENSIONS = {}
46 46
47 47 __version__ = ('.'.join((str(each) for each in VERSION[:3])))
48 __dbversion__ = 98 # defines current db version for migrations
48 __dbversion__ = 99 # defines current db version for migrations
49 49 __platform__ = platform.system()
50 50 __license__ = 'AGPLv3, and Commercial License'
51 51 __author__ = 'RhodeCode GmbH'
@@ -57,9 +57,6 b' def _update_with_GET(params, request):'
57 57 params[k] += request.GET.getall(k)
58 58
59 59
60
61
62
63 60 class RepoCommitsView(RepoAppView):
64 61 def load_default_context(self):
65 62 c = self._get_local_tmpl_context(include_app_defaults=True)
@@ -93,6 +90,8 b' class RepoCommitsView(RepoAppView):'
93 90 try:
94 91 pre_load = ['affected_files', 'author', 'branch', 'date',
95 92 'message', 'parents']
93 if self.rhodecode_vcs_repo.alias == 'hg':
94 pre_load += ['hidden', 'obsolete', 'phase']
96 95
97 96 if len(commit_range) == 2:
98 97 commits = self.rhodecode_vcs_repo.get_commits(
@@ -3667,7 +3667,7 b' class PullRequest(Base, _PullRequestBase'
3667 3667 vcs_obj = self.target_repo.scm_instance()
3668 3668 shadow_repository_path = vcs_obj._get_shadow_repository_path(
3669 3669 workspace_id)
3670 return vcs_obj._get_shadow_instance(shadow_repository_path)
3670 return vcs_obj.get_shadow_instance(shadow_repository_path)
3671 3671
3672 3672
3673 3673 class PullRequestVersion(Base, _PullRequestBase):
@@ -3750,7 +3750,7 b' class PullRequest(Base, _PullRequestBase'
3750 3750 vcs_obj = self.target_repo.scm_instance()
3751 3751 shadow_repository_path = vcs_obj._get_shadow_repository_path(
3752 3752 workspace_id)
3753 return vcs_obj._get_shadow_instance(shadow_repository_path)
3753 return vcs_obj.get_shadow_instance(shadow_repository_path)
3754 3754
3755 3755
3756 3756 class PullRequestVersion(Base, _PullRequestBase):
@@ -3900,7 +3900,7 b' class PullRequest(Base, _PullRequestBase'
3900 3900 shadow_repository_path = vcs_obj._get_shadow_repository_path(
3901 3901 self.target_repo.repo_id, workspace_id)
3902 3902 if os.path.isdir(shadow_repository_path):
3903 return vcs_obj._get_shadow_instance(shadow_repository_path)
3903 return vcs_obj.get_shadow_instance(shadow_repository_path)
3904 3904
3905 3905
3906 3906 class PullRequestVersion(Base, _PullRequestBase):
@@ -3974,7 +3974,7 b' class PullRequest(Base, _PullRequestBase'
3974 3974 shadow_repository_path = vcs_obj._get_shadow_repository_path(
3975 3975 self.target_repo.repo_id, workspace_id)
3976 3976 if os.path.isdir(shadow_repository_path):
3977 return vcs_obj._get_shadow_instance(shadow_repository_path)
3977 return vcs_obj.get_shadow_instance(shadow_repository_path)
3978 3978
3979 3979
3980 3980 class PullRequestVersion(Base, _PullRequestBase):
@@ -3975,7 +3975,7 b' class PullRequest(Base, _PullRequestBase'
3975 3975 shadow_repository_path = vcs_obj._get_shadow_repository_path(
3976 3976 self.target_repo.repo_id, workspace_id)
3977 3977 if os.path.isdir(shadow_repository_path):
3978 return vcs_obj._get_shadow_instance(shadow_repository_path)
3978 return vcs_obj.get_shadow_instance(shadow_repository_path)
3979 3979
3980 3980
3981 3981 class PullRequestVersion(Base, _PullRequestBase):
@@ -3343,7 +3343,7 b' class PullRequest(Base, _PullRequestBase'
3343 3343 vcs_obj = self.target_repo.scm_instance()
3344 3344 shadow_repository_path = vcs_obj._get_shadow_repository_path(
3345 3345 workspace_id)
3346 return vcs_obj._get_shadow_instance(shadow_repository_path)
3346 return vcs_obj.get_shadow_instance(shadow_repository_path)
3347 3347
3348 3348
3349 3349 class PullRequestVersion(Base, _PullRequestBase):
@@ -3344,7 +3344,7 b' class PullRequest(Base, _PullRequestBase'
3344 3344 vcs_obj = self.target_repo.scm_instance()
3345 3345 shadow_repository_path = vcs_obj._get_shadow_repository_path(
3346 3346 workspace_id)
3347 return vcs_obj._get_shadow_instance(shadow_repository_path)
3347 return vcs_obj.get_shadow_instance(shadow_repository_path)
3348 3348
3349 3349
3350 3350 class PullRequestVersion(Base, _PullRequestBase):
@@ -3602,7 +3602,7 b' class PullRequest(Base, _PullRequestBase'
3602 3602 vcs_obj = self.target_repo.scm_instance()
3603 3603 shadow_repository_path = vcs_obj._get_shadow_repository_path(
3604 3604 workspace_id)
3605 return vcs_obj._get_shadow_instance(shadow_repository_path)
3605 return vcs_obj.get_shadow_instance(shadow_repository_path)
3606 3606
3607 3607
3608 3608 class PullRequestVersion(Base, _PullRequestBase):
@@ -17,6 +17,7 b''
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20 21 import time
21 22 import errno
22 23 import logging
@@ -195,16 +195,18 b' def clear_cache_namespace(cache_region, '
195 195
196 196
197 197 class ActiveRegionCache(object):
198 def __init__(self, context):
198 def __init__(self, context, cache_data):
199 199 self.context = context
200 self.cache_data = cache_data
200 201
201 202 def should_invalidate(self):
202 203 return False
203 204
204 205
205 206 class FreshRegionCache(object):
206 def __init__(self, context):
207 def __init__(self, context, cache_data):
207 208 self.context = context
209 self.cache_data = cache_data
208 210
209 211 def should_invalidate(self):
210 212 return True
@@ -267,7 +269,7 b' class InvalidationContext(object):'
267 269 self.thread_id = threading.current_thread().ident
268 270
269 271 self.cache_key = compute_key_from_params(uid)
270 self.cache_key = 'proc:{}_thread:{}_{}'.format(
272 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
271 273 self.proc_id, self.thread_id, self.cache_key)
272 274 self.compute_time = 0
273 275
@@ -284,21 +286,23 b' class InvalidationContext(object):'
284 286 Test if current object is valid, and return CacheRegion function
285 287 that does invalidation and calculation
286 288 """
289 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
287 290 # register or get a new key based on uid
288 291 self.cache_obj = self.get_or_create_cache_obj(uid=self.uid)
292 cache_data = self.cache_obj.get_dict()
289 293 self._start_time = time.time()
290 294 if self.cache_obj.cache_active:
291 295 # means our cache obj is existing and marked as it's
292 296 # cache is not outdated, we return ActiveRegionCache
293 297 self.skip_cache_active_change = True
294 298
295 return ActiveRegionCache(context=self)
299 return ActiveRegionCache(context=self, cache_data=cache_data)
296 300
297 301 # the key is either not existing or set to False, we return
298 302 # the real invalidator which re-computes value. We additionally set
299 303 # the flag to actually update the Database objects
300 304 self.skip_cache_active_change = False
301 return FreshRegionCache(context=self)
305 return FreshRegionCache(context=self, cache_data=cache_data)
302 306
303 307 def __exit__(self, exc_type, exc_val, exc_tb):
304 308 # save compute time
@@ -63,7 +63,7 b' class GitRepository(BaseRepository):'
63 63
64 64 self.path = safe_str(os.path.abspath(repo_path))
65 65 self.config = config if config else self.get_default_config()
66 self.with_wire = with_wire
66 self.with_wire = with_wire or {"cache": False} # default should not use cache
67 67
68 68 self._init_repo(create, src_url, do_workspace_checkout, bare)
69 69
@@ -72,7 +72,8 b' class GitRepository(BaseRepository):'
72 72
73 73 @LazyProperty
74 74 def _remote(self):
75 return connection.Git(self.path, self.config, with_wire=self.with_wire)
75 repo_id = self.path
76 return connection.Git(self.path, repo_id, self.config, with_wire=self.with_wire)
76 77
77 78 @LazyProperty
78 79 def bare(self):
@@ -354,7 +355,6 b' class GitRepository(BaseRepository):'
354 355
355 356 :raises TagAlreadyExistError: if tag with same name already exists
356 357 """
357 print self._refs
358 358 if name in self.tags:
359 359 raise TagAlreadyExistError("Tag %s already exists" % name)
360 360 commit = self.get_commit(commit_id=commit_id)
@@ -804,8 +804,8 b' class GitRepository(BaseRepository):'
804 804
805 805 return heads
806 806
807 def _get_shadow_instance(self, shadow_repository_path, enable_hooks=False):
808 return GitRepository(shadow_repository_path)
807 def get_shadow_instance(self, shadow_repository_path, enable_hooks=False, cache=False):
808 return GitRepository(shadow_repository_path, with_wire={"cache": cache})
809 809
810 810 def _local_pull(self, repository_path, branch_name, ff_only=True):
811 811 """
@@ -913,8 +913,8 b' class GitRepository(BaseRepository):'
913 913 if not os.path.exists(shadow_repository_path):
914 914 self._local_clone(
915 915 shadow_repository_path, target_ref.name, source_ref.name)
916 log.debug(
917 'Prepared shadow repository in %s', shadow_repository_path)
916 log.debug('Prepared %s shadow repository in %s',
917 self.alias, shadow_repository_path)
918 918
919 919 return shadow_repository_path
920 920
@@ -934,7 +934,7 b' class GitRepository(BaseRepository):'
934 934
935 935 shadow_repository_path = self._maybe_prepare_merge_workspace(
936 936 repo_id, workspace_id, target_ref, source_ref)
937 shadow_repo = self._get_shadow_instance(shadow_repository_path)
937 shadow_repo = self.get_shadow_instance(shadow_repository_path)
938 938
939 939 # checkout source, if it's different. Otherwise we could not
940 940 # fetch proper commits for merge testing
@@ -952,7 +952,7 b' class GitRepository(BaseRepository):'
952 952
953 953 # Need to reload repo to invalidate the cache, or otherwise we cannot
954 954 # retrieve the last target commit.
955 shadow_repo = self._get_shadow_instance(shadow_repository_path)
955 shadow_repo = self.get_shadow_instance(shadow_repository_path)
956 956 if target_ref.commit_id != shadow_repo.branches[target_ref.name]:
957 957 log.warning('Shadow Target ref %s commit mismatch %s vs %s',
958 958 target_ref, target_ref.commit_id,
@@ -984,9 +984,9 b' class GitRepository(BaseRepository):'
984 984 [source_ref.commit_id])
985 985 merge_possible = True
986 986
987 # Need to reload repo to invalidate the cache, or otherwise we
987 # Need to invalidate the cache, or otherwise we
988 988 # cannot retrieve the merge commit.
989 shadow_repo = GitRepository(shadow_repository_path)
989 shadow_repo = shadow_repo.get_shadow_instance(shadow_repository_path)
990 990 merge_commit_id = shadow_repo.branches[pr_branch]
991 991
992 992 # Set a reference pointing to the merge commit. This reference may
@@ -236,7 +236,7 b' class MercurialCommit(base.BaseCommit):'
236 236 Returns content of the file at given ``path``.
237 237 """
238 238 path = self._get_filectx(path)
239 return self._remote.fctx_data(self.idx, path)
239 return self._remote.fctx_node_data(self.idx, path)
240 240
241 241 def get_file_size(self, path):
242 242 """
@@ -79,7 +79,7 b' class MercurialRepository(BaseRepository'
79 79 # special requirements
80 80 self.config = config if config else self.get_default_config(
81 81 default=[('extensions', 'largefiles', '1')])
82 self.with_wire = with_wire
82 self.with_wire = with_wire or {"cache": False} # default should not use cache
83 83
84 84 self._init_repo(create, src_url, do_workspace_checkout)
85 85
@@ -88,7 +88,8 b' class MercurialRepository(BaseRepository'
88 88
89 89 @LazyProperty
90 90 def _remote(self):
91 return connection.Hg(self.path, self.config, with_wire=self.with_wire)
91 repo_id = self.path
92 return connection.Hg(self.path, repo_id, self.config, with_wire=self.with_wire)
92 93
93 94 @CachedProperty
94 95 def commit_ids(self):
@@ -185,7 +186,7 b' class MercurialRepository(BaseRepository'
185 186 self._remote.invalidate_vcs_cache()
186 187
187 188 # Reinitialize tags
188 self.tags = self._get_tags()
189 self._invalidate_prop_cache('tags')
189 190 tag_id = self.tags[name]
190 191
191 192 return self.get_commit(commit_id=tag_id)
@@ -212,7 +213,7 b' class MercurialRepository(BaseRepository'
212 213
213 214 self._remote.tag(name, nullid, message, local, user, date, tz)
214 215 self._remote.invalidate_vcs_cache()
215 self.tags = self._get_tags()
216 self._invalidate_prop_cache('tags')
216 217
217 218 @LazyProperty
218 219 def bookmarks(self):
@@ -359,7 +360,6 b' class MercurialRepository(BaseRepository'
359 360
360 361 if create:
361 362 os.makedirs(self.path, mode=0o755)
362
363 363 self._remote.localrepository(create)
364 364
365 365 @LazyProperty
@@ -738,7 +738,7 b' class MercurialRepository(BaseRepository'
738 738
739 739 shadow_repository_path = self._maybe_prepare_merge_workspace(
740 740 repo_id, workspace_id, target_ref, source_ref)
741 shadow_repo = self._get_shadow_instance(shadow_repository_path)
741 shadow_repo = self.get_shadow_instance(shadow_repository_path)
742 742
743 743 log.debug('Pulling in target reference %s', target_ref)
744 744 self._validate_pull_reference(target_ref)
@@ -818,7 +818,7 b' class MercurialRepository(BaseRepository'
818 818 shadow_repo.bookmark(
819 819 target_ref.name, revision=merge_commit_id)
820 820 try:
821 shadow_repo_with_hooks = self._get_shadow_instance(
821 shadow_repo_with_hooks = self.get_shadow_instance(
822 822 shadow_repository_path,
823 823 enable_hooks=True)
824 824 # This is the actual merge action, we push from shadow
@@ -854,11 +854,11 b' class MercurialRepository(BaseRepository'
854 854 merge_possible, merge_succeeded, merge_ref, merge_failure_reason,
855 855 metadata=metadata)
856 856
857 def _get_shadow_instance(self, shadow_repository_path, enable_hooks=False):
857 def get_shadow_instance(self, shadow_repository_path, enable_hooks=False, cache=False):
858 858 config = self.config.copy()
859 859 if not enable_hooks:
860 860 config.clear_section('hooks')
861 return MercurialRepository(shadow_repository_path, config)
861 return MercurialRepository(shadow_repository_path, config, with_wire={"cache": cache})
862 862
863 863 def _validate_pull_reference(self, reference):
864 864 if not (reference.name in self.bookmarks or
@@ -69,20 +69,21 b' class SubversionRepository(base.BaseRepo'
69 69 contact = base.BaseRepository.DEFAULT_CONTACT
70 70 description = base.BaseRepository.DEFAULT_DESCRIPTION
71 71
72 def __init__(self, repo_path, config=None, create=False, src_url=None, bare=False,
73 **kwargs):
72 def __init__(self, repo_path, config=None, create=False, src_url=None, with_wire=None,
73 bare=False, **kwargs):
74 74 self.path = safe_str(os.path.abspath(repo_path))
75 75 self.config = config if config else self.get_default_config()
76 self.with_wire = with_wire or {"cache": False} # default should not use cache
76 77
77 78 self._init_repo(create, src_url)
78 79
79 80 # caches
80 81 self._commit_ids = {}
81 82
82
83 83 @LazyProperty
84 84 def _remote(self):
85 return connection.Svn(self.path, self.config)
85 repo_id = self.path
86 return connection.Svn(self.path, repo_id, self.config, with_wire=self.with_wire)
86 87
87 88 def _init_repo(self, create, src_url):
88 89 if create and os.path.exists(self.path):
@@ -25,6 +25,7 b' Client for the VCSServer implemented bas'
25 25 import copy
26 26 import logging
27 27 import threading
28 import time
28 29 import urllib2
29 30 import urlparse
30 31 import uuid
@@ -39,7 +40,6 b' import rhodecode'
39 40 from rhodecode.lib.system_info import get_cert_path
40 41 from rhodecode.lib.vcs import exceptions, CurlSession
41 42
42
43 43 log = logging.getLogger(__name__)
44 44
45 45
@@ -54,15 +54,13 b' EXCEPTIONS_MAP = {'
54 54 class RepoMaker(object):
55 55
56 56 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
57 self.url = urlparse.urljoin(
58 'http://%s' % server_and_port, backend_endpoint)
57 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
59 58 self._session_factory = session_factory
60 59 self.backend_type = backend_type
61 60
62 def __call__(self, path, config, with_wire=None):
63 log.debug('RepoMaker call on %s', path)
64 return RemoteRepo(
65 path, config, self.url, self._session_factory(),
61 def __call__(self, path, repo_id, config, with_wire=None):
62 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
63 return RemoteRepo(path, repo_id, config, self.url, self._session_factory(),
66 64 with_wire=with_wire)
67 65
68 66 def __getattr__(self, name):
@@ -84,8 +82,7 b' class RepoMaker(object):'
84 82
85 83 class ServiceConnection(object):
86 84 def __init__(self, server_and_port, backend_endpoint, session_factory):
87 self.url = urlparse.urljoin(
88 'http://%s' % server_and_port, backend_endpoint)
85 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
89 86 self._session_factory = session_factory
90 87
91 88 def __getattr__(self, name):
@@ -107,21 +104,27 b' class ServiceConnection(object):'
107 104
108 105 class RemoteRepo(object):
109 106
110 def __init__(self, path, config, url, session, with_wire=None):
107 def __init__(self, path, repo_id, config, url, session, with_wire=None):
111 108 self.url = url
112 109 self._session = session
110 with_wire = with_wire or {}
111
112 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
113 113 self._wire = {
114 "path": path,
114 "path": path, # repo path
115 "repo_id": repo_id,
115 116 "config": config,
116 "context": self._create_vcs_cache_context(),
117 "repo_state_uid": repo_state_uid,
118 "context": self._create_vcs_cache_context(path, repo_state_uid)
117 119 }
120
118 121 if with_wire:
119 122 self._wire.update(with_wire)
120 123
121 # johbo: Trading complexity for performance. Avoiding the call to
124 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
122 125 # log.debug brings a few percent gain even if is is not active.
123 126 if log.isEnabledFor(logging.DEBUG):
124 self._call = self._call_with_logging
127 self._call_with_logging = True
125 128
126 129 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
127 130
@@ -136,30 +139,35 b' class RemoteRepo(object):'
136 139 # config object is being changed for hooking scenarios
137 140 wire = copy.deepcopy(self._wire)
138 141 wire["config"] = wire["config"].serialize()
142 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
139 143
140 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
141 144 payload = {
142 145 'id': str(uuid.uuid4()),
143 146 'method': name,
144 147 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
145 148 }
146 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
147 149
148 def _call_with_logging(self, name, *args, **kwargs):
149 context_uid = self._wire.get('context')
150 if self._call_with_logging:
151 start = time.time()
152 context_uid = wire.get('context')
150 153 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
151 154 self.url, name, args, context_uid)
152 return RemoteRepo._call(self, name, *args, **kwargs)
155 result = _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
156 if self._call_with_logging:
157 log.debug('Call %s@%s took: %.3fs. wire_context: %s',
158 self.url, name, time.time()-start, context_uid)
159 return result
153 160
154 161 def __getitem__(self, key):
155 162 return self.revision(key)
156 163
157 def _create_vcs_cache_context(self):
164 def _create_vcs_cache_context(self, *args):
158 165 """
159 166 Creates a unique string which is passed to the VCSServer on every
160 167 remote call. It is used as cache key in the VCSServer.
161 168 """
162 return str(uuid.uuid4())
169 hash_key = '-'.join(map(str, args))
170 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
163 171
164 172 def invalidate_vcs_cache(self):
165 173 """
@@ -167,7 +175,7 b' class RemoteRepo(object):'
167 175 call to a remote method. It forces the VCSServer to create a fresh
168 176 repository instance on the next call to a remote method.
169 177 """
170 self._wire['context'] = self._create_vcs_cache_context()
178 self._wire['context'] = str(uuid.uuid4())
171 179
172 180
173 181 class RemoteObject(object):
@@ -254,8 +262,7 b' class VcsHttpProxy(object):'
254 262 retries = Retry(total=5, connect=None, read=None, redirect=None)
255 263
256 264 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
257 self.base_url = urlparse.urljoin(
258 'http://%s' % server_and_port, backend_endpoint)
265 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
259 266 self.session = requests.Session()
260 267 self.session.mount('http://', adapter)
261 268
@@ -29,6 +29,7 b' import string'
29 29 import hashlib
30 30 import logging
31 31 import datetime
32 import uuid
32 33 import warnings
33 34 import ipaddress
34 35 import functools
@@ -2437,11 +2438,18 b' class Repository(Base, BaseModel):'
2437 2438 # for repo2dbmapper
2438 2439 config = kwargs.pop('config', None)
2439 2440 cache = kwargs.pop('cache', None)
2441 vcs_full_cache = kwargs.pop('vcs_full_cache', None)
2442 if vcs_full_cache is not None:
2443 # allows override global config
2444 full_cache = vcs_full_cache
2445 else:
2440 2446 full_cache = str2bool(rhodecode.CONFIG.get('vcs_full_cache'))
2441 2447 # if cache is NOT defined use default global, else we have a full
2442 2448 # control over cache behaviour
2443 2449 if cache is None and full_cache and not config:
2450 log.debug('Initializing pure cached instance for %s', self.repo_path)
2444 2451 return self._get_instance_cached()
2452
2445 2453 # cache here is sent to the "vcs server"
2446 2454 return self._get_instance(cache=bool(cache), config=config)
2447 2455
@@ -2454,8 +2462,8 b' class Repository(Base, BaseModel):'
2454 2462 region = rc_cache.get_or_create_region('cache_repo_longterm', cache_namespace_uid)
2455 2463
2456 2464 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid)
2457 def get_instance_cached(repo_id, context_id):
2458 return self._get_instance()
2465 def get_instance_cached(repo_id, context_id, _cache_state_uid):
2466 return self._get_instance(repo_state_uid=_cache_state_uid)
2459 2467
2460 2468 # we must use thread scoped cache here,
2461 2469 # because each thread of gevent needs it's own not shared connection and cache
@@ -2464,7 +2472,9 b' class Repository(Base, BaseModel):'
2464 2472 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace,
2465 2473 thread_scoped=True)
2466 2474 with inv_context_manager as invalidation_context:
2467 args = (self.repo_id, inv_context_manager.cache_key)
2475 cache_state_uid = invalidation_context.cache_data['cache_state_uid']
2476 args = (self.repo_id, inv_context_manager.cache_key, cache_state_uid)
2477
2468 2478 # re-compute and store cache if we get invalidate signal
2469 2479 if invalidation_context.should_invalidate():
2470 2480 instance = get_instance_cached.refresh(*args)
@@ -2474,10 +2484,13 b' class Repository(Base, BaseModel):'
2474 2484 log.debug('Repo instance fetched in %.3fs', inv_context_manager.compute_time)
2475 2485 return instance
2476 2486
2477 def _get_instance(self, cache=True, config=None):
2487 def _get_instance(self, cache=True, config=None, repo_state_uid=None):
2488 log.debug('Initializing %s instance `%s` with cache flag set to: %s',
2489 self.repo_type, self.repo_path, cache)
2478 2490 config = config or self._config
2479 2491 custom_wire = {
2480 'cache': cache # controls the vcs.remote cache
2492 'cache': cache, # controls the vcs.remote cache
2493 'repo_state_uid': repo_state_uid
2481 2494 }
2482 2495 repo = get_vcs_instance(
2483 2496 repo_path=safe_str(self.repo_full_path),
@@ -3497,12 +3510,15 b' class CacheKey(Base, BaseModel):'
3497 3510 cache_id = Column("cache_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
3498 3511 cache_key = Column("cache_key", String(255), nullable=True, unique=None, default=None)
3499 3512 cache_args = Column("cache_args", String(255), nullable=True, unique=None, default=None)
3513 cache_state_uid = Column("cache_state_uid", String(255), nullable=True, unique=None, default=None)
3500 3514 cache_active = Column("cache_active", Boolean(), nullable=True, unique=None, default=False)
3501 3515
3502 def __init__(self, cache_key, cache_args=''):
3516 def __init__(self, cache_key, cache_args='', cache_state_uid=None):
3503 3517 self.cache_key = cache_key
3504 3518 self.cache_args = cache_args
3505 3519 self.cache_active = False
3520 # first key should be same for all entries, since all workers should share it
3521 self.cache_state_uid = cache_state_uid or self.generate_new_state_uid(based_on=cache_args)
3506 3522
3507 3523 def __unicode__(self):
3508 3524 return u"<%s('%s:%s[%s]')>" % (
@@ -3531,6 +3547,13 b' class CacheKey(Base, BaseModel):'
3531 3547 return self._cache_key_partition()[2]
3532 3548
3533 3549 @classmethod
3550 def generate_new_state_uid(cls, based_on=None):
3551 if based_on:
3552 return str(uuid.uuid5(uuid.NAMESPACE_URL, safe_str(based_on)))
3553 else:
3554 return str(uuid.uuid4())
3555
3556 @classmethod
3534 3557 def delete_all_cache(cls):
3535 3558 """
3536 3559 Delete all cache keys from database.
@@ -3553,7 +3576,8 b' class CacheKey(Base, BaseModel):'
3553 3576 log.debug('cache objects deleted for cache args %s',
3554 3577 safe_str(cache_uid))
3555 3578 else:
3556 qry.update({"cache_active": False})
3579 qry.update({"cache_active": False,
3580 "cache_state_uid": cls.generate_new_state_uid()})
3557 3581 log.debug('cache objects marked as invalid for cache args %s',
3558 3582 safe_str(cache_uid))
3559 3583
@@ -4166,7 +4190,7 b' class PullRequest(Base, _PullRequestBase'
4166 4190 shadow_repository_path = vcs_obj._get_shadow_repository_path(
4167 4191 self.target_repo.repo_id, workspace_id)
4168 4192 if os.path.isdir(shadow_repository_path):
4169 return vcs_obj._get_shadow_instance(shadow_repository_path)
4193 return vcs_obj.get_shadow_instance(shadow_repository_path)
4170 4194
4171 4195
4172 4196 class PullRequestVersion(Base, _PullRequestBase):
@@ -37,12 +37,14 b''
37 37 <table class="rctable edit_cache">
38 38 <tr>
39 39 <th>${_('Key')}</th>
40 <th>${_('State UID')}</th>
40 41 <th>${_('Namespace')}</th>
41 42 <th>${_('Active')}</th>
42 43 </tr>
43 44 %for cache in c.rhodecode_db_repo.cache_keys:
44 45 <tr>
45 46 <td class="td-prefix"><code>${cache.cache_key}</code></td>
47 <td class="td-cachekey"><code>${cache.cache_state_uid}</code></td>
46 48 <td class="td-cachekey"><code>${cache.cache_args}</code></td>
47 49 <td class="td-active">${h.bool2icon(cache.cache_active)}</td>
48 50 </tr>
@@ -49,7 +49,7 b' def test_scm_instance_config(backend):'
49 49 mocks['_get_instance_cached'].assert_called()
50 50
51 51
52 def test__get_instance_config(backend):
52 def test_get_instance_config(backend):
53 53 repo = backend.create_repo()
54 54 vcs_class = Mock()
55 55 with patch.multiple('rhodecode.lib.vcs.backends',
@@ -61,13 +61,13 b' def test__get_instance_config(backend):'
61 61 repo._get_instance()
62 62 vcs_class.assert_called_with(
63 63 repo_path=repo.repo_full_path, config=config_mock,
64 create=False, with_wire={'cache': True})
64 create=False, with_wire={'cache': True, 'repo_state_uid': None})
65 65
66 66 new_config = {'override': 'old_config'}
67 67 repo._get_instance(config=new_config)
68 68 vcs_class.assert_called_with(
69 69 repo_path=repo.repo_full_path, config=new_config, create=False,
70 with_wire={'cache': True})
70 with_wire={'cache': True, 'repo_state_uid': None})
71 71
72 72
73 73 def test_mark_for_invalidation_config(backend):
@@ -133,8 +133,7 b' class RcVCSServer(ServerBase):'
133 133
134 134 def __init__(self, config_file, log_file=None):
135 135 super(RcVCSServer, self).__init__(config_file, log_file)
136 self._args = [
137 'gunicorn', '--paste', self.config_file]
136 self._args = ['gunicorn', '--paste', self.config_file]
138 137
139 138 def start(self):
140 139 env = os.environ.copy()
@@ -145,6 +144,7 b' class RcVCSServer(ServerBase):'
145 144 host_url = self.host_url()
146 145 assert_no_running_instance(host_url)
147 146
147 print('rhodecode-vcsserver start command: {}'.format(' '.join(self._args)))
148 148 print('rhodecode-vcsserver starting at: {}'.format(host_url))
149 149 print('rhodecode-vcsserver command: {}'.format(self.command))
150 150 print('rhodecode-vcsserver logfile: {}'.format(self.log_file))
@@ -107,7 +107,7 b' def test_repo_maker_uses_session_for_ins'
107 107 stub_session_factory, config):
108 108 repo_maker = client_http.RepoMaker(
109 109 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_factory)
110 repo = repo_maker('stub_path', config)
110 repo = repo_maker('stub_path', 'stub_repo_id', config)
111 111 repo.example_call()
112 112 stub_session_factory().post.assert_called_with(
113 113 'http://server_and_port/endpoint', data=mock.ANY)
@@ -127,7 +127,7 b' def test_repo_maker_uses_session_that_th'
127 127 stub_session_failing_factory, config):
128 128 repo_maker = client_http.RepoMaker(
129 129 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_failing_factory)
130 repo = repo_maker('stub_path', config)
130 repo = repo_maker('stub_path', 'stub_repo_id', config)
131 131
132 132 with pytest.raises(exceptions.HttpVCSCommunicationError):
133 133 repo.example_call()
@@ -751,15 +751,15 b' class TestGetShadowInstance(object):'
751 751 return repo
752 752
753 753 def test_passes_config(self, repo):
754 shadow = repo._get_shadow_instance(repo.path)
754 shadow = repo.get_shadow_instance(repo.path)
755 755 assert shadow.config == repo.config.copy()
756 756
757 757 def test_disables_hooks(self, repo):
758 shadow = repo._get_shadow_instance(repo.path)
758 shadow = repo.get_shadow_instance(repo.path)
759 759 shadow.config.clear_section.assert_called_once_with('hooks')
760 760
761 761 def test_allows_to_keep_hooks(self, repo):
762 shadow = repo._get_shadow_instance(repo.path, enable_hooks=True)
762 shadow = repo.get_shadow_instance(repo.path, enable_hooks=True)
763 763 assert not shadow.config.clear_section.called
764 764
765 765
@@ -108,7 +108,7 b' class TestMercurialRemoteRepoInvalidatio'
108 108 workspace_id = pr._workspace_id(pull_request)
109 109 shadow_repository_path = target_vcs._maybe_prepare_merge_workspace(
110 110 repo_id, workspace_id, target_ref, source_ref)
111 shadow_repo = target_vcs._get_shadow_instance(shadow_repository_path)
111 shadow_repo = target_vcs.get_shadow_instance(shadow_repository_path, cache=True)
112 112
113 113 # This will populate the cache of the mercurial repository object
114 114 # inside of the VCSServer.
@@ -127,32 +127,38 b' class TestMercurialRemoteRepoInvalidatio'
127 127 from rhodecode.lib.vcs.exceptions import CommitDoesNotExistError
128 128
129 129 pull_request = pr_util.create_pull_request()
130
130 131 target_vcs = pull_request.target_repo.scm_instance()
131 132 source_vcs = pull_request.source_repo.scm_instance()
132 shadow_repo, source_ref, target_ref = self._prepare_shadow_repo(
133 pull_request)
133 shadow_repo, source_ref, target_ref = self._prepare_shadow_repo(pull_request)
134
135 initial_cache_uid = shadow_repo._remote._wire['context']
136 initial_commit_ids = shadow_repo._remote.get_all_commit_ids('visible')
134 137
135 138 # Pull from target and source references but without invalidation of
136 # RemoteRepo objects and without VCSServer caching of mercurial
137 # repository objects.
139 # RemoteRepo objects and without VCSServer caching of mercurial repository objects.
138 140 with patch.object(shadow_repo._remote, 'invalidate_vcs_cache'):
139 141 # NOTE: Do not use patch.dict() to disable the cache because it
140 142 # restores the WHOLE dict and not only the patched keys.
141 143 shadow_repo._remote._wire['cache'] = False
142 144 shadow_repo._local_pull(target_vcs.path, target_ref)
143 145 shadow_repo._local_pull(source_vcs.path, source_ref)
144 shadow_repo._remote._wire.pop('cache')
146 shadow_repo._remote._wire['cache'] = True
145 147
146 148 # Try to lookup the target_ref in shadow repo. This should work because
149 # test_repo_maker_uses_session_for_instance_methods
147 150 # the shadow repo is a clone of the target and always contains all off
148 151 # it's commits in the initial cache.
149 152 shadow_repo.get_commit(target_ref.commit_id)
150 153
151 # If we try to lookup the source_ref it should fail because the shadow
154 # we ensure that call context has not changed, this is what
155 # `invalidate_vcs_cache` does
156 assert initial_cache_uid == shadow_repo._remote._wire['context']
157
158 # If we try to lookup all commits.
152 159 # repo commit cache doesn't get invalidated. (Due to patched
153 160 # invalidation and caching above).
154 with pytest.raises(CommitDoesNotExistError):
155 shadow_repo.get_commit(source_ref.commit_id)
161 assert initial_commit_ids == shadow_repo._remote.get_all_commit_ids('visible')
156 162
157 163 @pytest.mark.backends('hg')
158 164 def test_commit_does_not_exist_error_does_not_happen(self, pr_util, app):
@@ -166,8 +172,7 b' class TestMercurialRemoteRepoInvalidatio'
166 172 pull_request = pr_util.create_pull_request()
167 173 target_vcs = pull_request.target_repo.scm_instance()
168 174 source_vcs = pull_request.source_repo.scm_instance()
169 shadow_repo, source_ref, target_ref = self._prepare_shadow_repo(
170 pull_request)
175 shadow_repo, source_ref, target_ref = self._prepare_shadow_repo(pull_request)
171 176
172 177 # Pull from target and source references without without VCSServer
173 178 # caching of mercurial repository objects but with active invalidation
@@ -177,7 +182,7 b' class TestMercurialRemoteRepoInvalidatio'
177 182 shadow_repo._remote._wire['cache'] = False
178 183 shadow_repo._local_pull(target_vcs.path, target_ref)
179 184 shadow_repo._local_pull(source_vcs.path, source_ref)
180 shadow_repo._remote._wire.pop('cache')
185 shadow_repo._remote._wire['cache'] = True
181 186
182 187 # Try to lookup the target and source references in shadow repo. This
183 188 # should work because the RemoteRepo object gets invalidated during the
@@ -65,7 +65,7 b' propagate = 1'
65 65 [handler_console]
66 66 class = StreamHandler
67 67 args = (sys.stderr,)
68 level = INFO
68 level = DEBUG
69 69 formatter = generic
70 70
71 71 ################
General Comments 0
You need to be logged in to leave comments. Login now