python3: fixed some compat problems
super-admin
r4916:86c3a981 default
@@ -1,364 +1,364 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import errno
23 23 import logging
24 24
25 25 import msgpack
26 import redis
26 27 import gevent
27 import redis
28 28
29 29 from dogpile.cache.api import CachedValue
30 30 from dogpile.cache.backends import memory as memory_backend
31 31 from dogpile.cache.backends import file as file_backend
32 32 from dogpile.cache.backends import redis as redis_backend
33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
33 from dogpile.cache.backends.file import NO_VALUE, FileLock
34 34 from dogpile.cache.util import memoized_property
35 35
36 36 from pyramid.settings import asbool
37 37
38 38 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
39 from rhodecode.lib.utils import safe_str, safe_unicode
39 from rhodecode.lib.utils import safe_str
40 40
41 41
42 42 _default_max_size = 1024
43 43
44 44 log = logging.getLogger(__name__)
45 45
46 46
47 47 class LRUMemoryBackend(memory_backend.MemoryBackend):
48 48 key_prefix = 'lru_mem_backend'
49 49 pickle_values = False
50 50
51 51 def __init__(self, arguments):
52 52 max_size = arguments.pop('max_size', _default_max_size)
53 53
54 54 LRUDictClass = LRUDict
55 55 if arguments.pop('log_key_count', None):
56 56 LRUDictClass = LRUDictDebug
57 57
58 58 arguments['cache_dict'] = LRUDictClass(max_size)
59 59 super(LRUMemoryBackend, self).__init__(arguments)
60 60
61 61 def delete(self, key):
62 62 try:
63 63 del self._cache[key]
64 64 except KeyError:
65 65 # we don't care if key isn't there at deletion
66 66 pass
67 67
68 68 def delete_multi(self, keys):
69 69 for key in keys:
70 70 self.delete(key)
71 71
72 72
73 73 class PickleSerializer(object):
74 74
75 75 def _dumps(self, value, safe=False):
76 76 try:
77 return compat.pickle.dumps(value)
77 return pickle.dumps(value)
78 78 except Exception:
79 79 if safe:
80 80 return NO_VALUE
81 81 else:
82 82 raise
83 83
84 84 def _loads(self, value, safe=True):
85 85 try:
86 return compat.pickle.loads(value)
86 return pickle.loads(value)
87 87 except Exception:
88 88 if safe:
89 89 return NO_VALUE
90 90 else:
91 91 raise
92 92
93 93
94 94 class MsgPackSerializer(object):
95 95
96 96 def _dumps(self, value, safe=False):
97 97 try:
98 98 return msgpack.packb(value)
99 99 except Exception:
100 100 if safe:
101 101 return NO_VALUE
102 102 else:
103 103 raise
104 104
105 105 def _loads(self, value, safe=True):
106 106 """
107 107 pickle maintained the `CachedValue` wrapper of the tuple
108 108 msgpack does not, so it must be added back in.
109 109 """
110 110 try:
111 111 value = msgpack.unpackb(value, use_list=False)
112 112 return CachedValue(*value)
113 113 except Exception:
114 114 if safe:
115 115 return NO_VALUE
116 116 else:
117 117 raise
118 118
119 119
120 120 import fcntl
121 121 flock_org = fcntl.flock
122 122
123 123
124 124 class CustomLockFactory(FileLock):
125 125
126 126 @memoized_property
127 127 def _module(self):
128 128
129 129 def gevent_flock(fd, operation):
130 130 """
131 131 Gevent compatible flock
132 132 """
133 133 # set non-blocking, this will cause an exception if we cannot acquire a lock
134 134 operation |= fcntl.LOCK_NB
135 135 start_lock_time = time.time()
136 136 timeout = 60 * 15 # 15min
137 137 while True:
138 138 try:
139 139 flock_org(fd, operation)
140 140 # lock has been acquired
141 141 break
142 142 except (OSError, IOError) as e:
143 143 # raise on errors other than 'Resource temporarily unavailable'
144 144 if e.errno != errno.EAGAIN:
145 145 raise
146 146 elif (time.time() - start_lock_time) > timeout:
147 147 # waited too much time on a lock, better to fail than loop forever
148 148 log.error('Failed to acquire lock on `%s` after waiting %ss',
149 149 self.filename, timeout)
150 150 raise
151 151 wait_timeout = 0.03
152 152 log.debug('Failed to acquire lock on `%s`, retry in %ss',
153 153 self.filename, wait_timeout)
154 154 gevent.sleep(wait_timeout)
155 155
156 156 fcntl.flock = gevent_flock
157 157 return fcntl
158 158
159 159
160 160 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
161 161 key_prefix = 'file_backend'
162 162
163 163 def __init__(self, arguments):
164 164 arguments['lock_factory'] = CustomLockFactory
165 165 db_file = arguments.get('filename')
166 166
167 167 log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
168 168 try:
169 169 super(FileNamespaceBackend, self).__init__(arguments)
170 170 except Exception:
171 171 log.exception('Failed to initialize db at: %s', db_file)
172 172 raise
173 173
174 174 def __repr__(self):
175 175 return '{} `{}`'.format(self.__class__, self.filename)
176 176
177 177 def list_keys(self, prefix=''):
178 178 prefix = '{}:{}'.format(self.key_prefix, prefix)
179 179
180 180 def cond(v):
181 181 if not prefix:
182 182 return True
183 183
184 184 if v.startswith(prefix):
185 185 return True
186 186 return False
187 187
188 188 with self._dbm_file(True) as dbm:
189 189 try:
190 return filter(cond, dbm.keys())
190 return list(filter(cond, list(dbm.keys())))
191 191 except Exception:
192 192 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
193 193 raise
194 194
195 195 def get_store(self):
196 196 return self.filename
197 197
198 198 def _dbm_get(self, key):
199 199 with self._dbm_file(False) as dbm:
200 200 if hasattr(dbm, 'get'):
201 201 value = dbm.get(key, NO_VALUE)
202 202 else:
203 203 # gdbm objects lack a .get method
204 204 try:
205 205 value = dbm[key]
206 206 except KeyError:
207 207 value = NO_VALUE
208 208 if value is not NO_VALUE:
209 209 value = self._loads(value)
210 210 return value
211 211
212 212 def get(self, key):
213 213 try:
214 214 return self._dbm_get(key)
215 215 except Exception:
216 216 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
217 217 raise
218 218
219 219 def set(self, key, value):
220 220 with self._dbm_file(True) as dbm:
221 221 dbm[key] = self._dumps(value)
222 222
223 223 def set_multi(self, mapping):
224 224 with self._dbm_file(True) as dbm:
225 225 for key, value in mapping.items():
226 226 dbm[key] = self._dumps(value)
227 227
228 228
229 229 class BaseRedisBackend(redis_backend.RedisBackend):
230 230 key_prefix = ''
231 231
232 232 def __init__(self, arguments):
233 233 super(BaseRedisBackend, self).__init__(arguments)
234 234 self._lock_timeout = self.lock_timeout
235 235 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
236 236
237 237 if self._lock_auto_renewal and not self._lock_timeout:
238 238 # set default timeout for auto_renewal
239 239 self._lock_timeout = 30
240 240
241 241 def _create_client(self):
242 242 args = {}
243 243
244 244 if self.url is not None:
245 245 args.update(url=self.url)
246 246
247 247 else:
248 248 args.update(
249 249 host=self.host, password=self.password,
250 250 port=self.port, db=self.db
251 251 )
252 252
253 253 connection_pool = redis.ConnectionPool(**args)
254 254
255 255 return redis.StrictRedis(connection_pool=connection_pool)
256 256
257 257 def list_keys(self, prefix=''):
258 258 prefix = '{}:{}*'.format(self.key_prefix, prefix)
259 259 return self.client.keys(prefix)
260 260
261 261 def get_store(self):
262 262 return self.client.connection_pool
263 263
264 264 def get(self, key):
265 265 value = self.client.get(key)
266 266 if value is None:
267 267 return NO_VALUE
268 268 return self._loads(value)
269 269
270 270 def get_multi(self, keys):
271 271 if not keys:
272 272 return []
273 273 values = self.client.mget(keys)
274 274 loads = self._loads
275 275 return [
276 276 loads(v) if v is not None else NO_VALUE
277 277 for v in values]
278 278
279 279 def set(self, key, value):
280 280 if self.redis_expiration_time:
281 281 self.client.setex(key, self.redis_expiration_time,
282 282 self._dumps(value))
283 283 else:
284 284 self.client.set(key, self._dumps(value))
285 285
286 286 def set_multi(self, mapping):
287 287 dumps = self._dumps
288 288 mapping = dict(
289 289 (k, dumps(v))
290 290 for k, v in mapping.items()
291 291 )
292 292
293 293 if not self.redis_expiration_time:
294 294 self.client.mset(mapping)
295 295 else:
296 296 pipe = self.client.pipeline()
297 297 for key, value in mapping.items():
298 298 pipe.setex(key, self.redis_expiration_time, value)
299 299 pipe.execute()
300 300
301 301 def get_mutex(self, key):
302 302 if self.distributed_lock:
303 lock_key = u'_lock_{0}'.format(safe_unicode(key))
303 lock_key = '_lock_{0}'.format(safe_str(key))
304 304 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
305 305 auto_renewal=self._lock_auto_renewal)
306 306 else:
307 307 return None
308 308
309 309
310 310 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
311 311 key_prefix = 'redis_pickle_backend'
312 312 pass
313 313
314 314
315 315 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
316 316 key_prefix = 'redis_msgpack_backend'
317 317 pass
318 318
319 319
320 320 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
321 321 import redis_lock
322 322
323 323 class _RedisLockWrapper(object):
324 324 """LockWrapper for redis_lock"""
325 325
326 326 @classmethod
327 327 def get_lock(cls):
328 328 return redis_lock.Lock(
329 329 redis_client=client,
330 330 name=lock_key,
331 331 expire=lock_timeout,
332 332 auto_renewal=auto_renewal,
333 333 strict=True,
334 334 )
335 335
336 336 def __repr__(self):
337 337 return "{}:{}".format(self.__class__.__name__, lock_key)
338 338
339 339 def __str__(self):
340 340 return "{}:{}".format(self.__class__.__name__, lock_key)
341 341
342 342 def __init__(self):
343 343 self.lock = self.get_lock()
344 344 self.lock_key = lock_key
345 345
346 346 def acquire(self, wait=True):
347 347 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
348 348 try:
349 349 acquired = self.lock.acquire(wait)
350 350 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
351 351 return acquired
352 352 except redis_lock.AlreadyAcquired:
353 353 return False
354 354 except redis_lock.AlreadyStarted:
355 355 # refresh thread exists, but it also means we acquired the lock
356 356 return True
357 357
358 358 def release(self):
359 359 try:
360 360 self.lock.release()
361 361 except redis_lock.NotAcquired:
362 362 pass
363 363
364 364 return _RedisLockWrapper()
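
For reference, a minimal standalone sketch (not part of the commit) of the pickle-based serializer pattern this backend file now relies on after dropping dogpile's removed compat.pickle shim. The class name and the NO_VALUE stand-in below are hypothetical; in the real code NO_VALUE comes from dogpile and the mixin is combined with a cache backend class.

    import pickle

    NO_VALUE = object()  # stand-in for dogpile's NO_VALUE sentinel

    class ExamplePickleSerializer(object):
        # illustration only: same _dumps/_loads protocol as PickleSerializer above

        def _dumps(self, value, safe=False):
            try:
                # plain stdlib pickle is enough on Python 3, no compat shim needed
                return pickle.dumps(value)
            except Exception:
                if safe:
                    return NO_VALUE
                raise

        def _loads(self, value, safe=True):
            try:
                return pickle.loads(value)
            except Exception:
                if safe:
                    return NO_VALUE
                raise

    # round-trip sanity check
    serializer = ExamplePickleSerializer()
    assert serializer._loads(serializer._dumps({'a': 1})) == {'a': 1}
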
@@ -1,423 +1,368 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import time
22 22 import logging
23 23 import functools
24 import decorator
24 25 import threading
25 26
26 27 from dogpile.cache import CacheRegion
27 from dogpile.cache.util import compat
28 28
29 29 import rhodecode
30 from rhodecode.lib.utils import safe_str, sha1
30 from rhodecode.lib.utils import safe_bytes, sha1
31 31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33 33
34 34 from rhodecode.lib.rc_cache import cache_key_meta
35 35 from rhodecode.lib.rc_cache import region_meta
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
40 40 def isCython(func):
41 41 """
42 42 Private helper that checks if a function is a cython function.
43 43 """
44 44 return func.__class__.__name__ == 'cython_function_or_method'
45 45
46 46
47 47 class RhodeCodeCacheRegion(CacheRegion):
48 48
49 49 def conditional_cache_on_arguments(
50 50 self, namespace=None,
51 51 expiration_time=None,
52 52 should_cache_fn=None,
53 to_str=compat.string_type,
53 to_str=str,
54 54 function_key_generator=None,
55 55 condition=True):
56 56 """
57 57 Custom conditional decorator that will not touch any dogpile internals if
58 58 the condition isn't met. This works a bit differently than should_cache_fn,
59 59 and it's faster in cases where we don't ever want to compute cached values.
60 60 """
61 expiration_time_is_callable = compat.callable(expiration_time)
61 expiration_time_is_callable = callable(expiration_time)
62 62
63 63 if function_key_generator is None:
64 64 function_key_generator = self.function_key_generator
65 65
66 # workaround for py2 and cython problems, this block should be removed
67 # once we've migrated to py3
68 if 'cython' == 'cython':
69 def decorator(fn):
70 if to_str is compat.string_type:
71 # backwards compatible
72 key_generator = function_key_generator(namespace, fn)
73 else:
74 key_generator = function_key_generator(namespace, fn, to_str=to_str)
75
76 @functools.wraps(fn)
77 def decorate(*arg, **kw):
78 key = key_generator(*arg, **kw)
79
80 @functools.wraps(fn)
81 def creator():
82 return fn(*arg, **kw)
83
84 if not condition:
85 return creator()
86
87 timeout = expiration_time() if expiration_time_is_callable \
88 else expiration_time
89
90 return self.get_or_create(key, creator, timeout, should_cache_fn)
91
92 def invalidate(*arg, **kw):
93 key = key_generator(*arg, **kw)
94 self.delete(key)
95
96 def set_(value, *arg, **kw):
97 key = key_generator(*arg, **kw)
98 self.set(key, value)
99
100 def get(*arg, **kw):
101 key = key_generator(*arg, **kw)
102 return self.get(key)
103
104 def refresh(*arg, **kw):
105 key = key_generator(*arg, **kw)
106 value = fn(*arg, **kw)
107 self.set(key, value)
108 return value
109
110 decorate.set = set_
111 decorate.invalidate = invalidate
112 decorate.refresh = refresh
113 decorate.get = get
114 decorate.original = fn
115 decorate.key_generator = key_generator
116 decorate.__wrapped__ = fn
117
118 return decorate
119 return decorator
120
121 66 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
122 67
123 68 if not condition:
124 log.debug('Calling un-cached method:%s', user_func.func_name)
69 log.debug('Calling un-cached method:%s', user_func.__name__)
125 70 start = time.time()
126 71 result = user_func(*arg, **kw)
127 72 total = time.time() - start
128 log.debug('un-cached method:%s took %.4fs', user_func.func_name, total)
73 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
129 74 return result
130 75
131 76 key = key_generator(*arg, **kw)
132 77
133 78 timeout = expiration_time() if expiration_time_is_callable \
134 79 else expiration_time
135 80
136 log.debug('Calling cached method:`%s`', user_func.func_name)
81 log.debug('Calling cached method:`%s`', user_func.__name__)
137 82 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
138 83
139 84 def cache_decorator(user_func):
140 if to_str is compat.string_type:
85 if to_str is str:
141 86 # backwards compatible
142 87 key_generator = function_key_generator(namespace, user_func)
143 88 else:
144 89 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
145 90
146 91 def refresh(*arg, **kw):
147 92 """
148 93 Like invalidate, but regenerates the value instead
149 94 """
150 95 key = key_generator(*arg, **kw)
151 96 value = user_func(*arg, **kw)
152 97 self.set(key, value)
153 98 return value
154 99
155 100 def invalidate(*arg, **kw):
156 101 key = key_generator(*arg, **kw)
157 102 self.delete(key)
158 103
159 104 def set_(value, *arg, **kw):
160 105 key = key_generator(*arg, **kw)
161 106 self.set(key, value)
162 107
163 108 def get(*arg, **kw):
164 109 key = key_generator(*arg, **kw)
165 110 return self.get(key)
166 111
167 112 user_func.set = set_
168 113 user_func.invalidate = invalidate
169 114 user_func.get = get
170 115 user_func.refresh = refresh
171 116 user_func.key_generator = key_generator
172 117 user_func.original = user_func
173 118
174 119 # Use `decorate` to preserve the signature of :param:`user_func`.
175 120 return decorator.decorate(user_func, functools.partial(
176 121 get_or_create_for_user_func, key_generator))
177 122
178 123 return cache_decorator
179 124
180 125
181 126 def make_region(*arg, **kw):
182 127 return RhodeCodeCacheRegion(*arg, **kw)
183 128
184 129
185 130 def get_default_cache_settings(settings, prefixes=None):
186 131 prefixes = prefixes or []
187 132 cache_settings = {}
188 133 for key in settings.keys():
189 134 for prefix in prefixes:
190 135 if key.startswith(prefix):
191 136 name = key.split(prefix)[1].strip()
192 137 val = settings[key]
193 138 if isinstance(val, str):
194 139 val = val.strip()
195 140 cache_settings[name] = val
196 141 return cache_settings
197 142
198 143
199 144 def compute_key_from_params(*args):
200 145 """
201 146 Helper to compute a key from the given params, to be used in the cache manager
202 147 """
203 return sha1("_".join(map(safe_str, args)))
148 return sha1(safe_bytes("_".join(map(str, args))))
204 149
205 150
206 151 def backend_key_generator(backend):
207 152 """
208 153 Special wrapper that also sends over the backend to the key generator
209 154 """
210 155 def wrapper(namespace, fn):
211 156 return key_generator(backend, namespace, fn)
212 157 return wrapper
213 158
214 159
215 160 def key_generator(backend, namespace, fn):
216 161 fname = fn.__name__
217 162
218 163 def generate_key(*args):
219 164 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
220 165 namespace_pref = namespace or 'default_namespace'
221 166 arg_key = compute_key_from_params(*args)
222 167 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
223 168
224 169 return final_key
225 170
226 171 return generate_key
227 172
228 173
229 174 def get_or_create_region(region_name, region_namespace=None):
230 175 from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
231 176 region_obj = region_meta.dogpile_cache_regions.get(region_name)
232 177 if not region_obj:
233 178 raise EnvironmentError(
234 179 'Region `{}` not in configured: {}.'.format(
235 region_name, region_meta.dogpile_cache_regions.keys()))
180 region_name, list(region_meta.dogpile_cache_regions.keys())))
236 181
237 182 region_uid_name = '{}:{}'.format(region_name, region_namespace)
238 183 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
239 184 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
240 185 if region_exist:
241 186 log.debug('Using already configured region: %s', region_namespace)
242 187 return region_exist
243 188 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
244 189 expiration_time = region_obj.expiration_time
245 190
246 191 if not os.path.isdir(cache_dir):
247 192 os.makedirs(cache_dir)
248 193 new_region = make_region(
249 194 name=region_uid_name,
250 195 function_key_generator=backend_key_generator(region_obj.actual_backend)
251 196 )
252 197 namespace_filename = os.path.join(
253 198 cache_dir, "{}.cache.dbm".format(region_namespace))
254 199 # special type that allows one DB file per namespace
255 200 new_region.configure(
256 201 backend='dogpile.cache.rc.file_namespace',
257 202 expiration_time=expiration_time,
258 203 arguments={"filename": namespace_filename}
259 204 )
260 205
261 206 # create and save in region caches
262 207 log.debug('configuring new region: %s', region_uid_name)
263 208 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
264 209
265 210 return region_obj
266 211
267 212
268 213 def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
269 214 region = get_or_create_region(cache_region, cache_namespace_uid)
270 215 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
271 216 num_delete_keys = len(cache_keys)
272 217 if invalidate:
273 218 region.invalidate(hard=False)
274 219 else:
275 220 if num_delete_keys:
276 221 region.delete_multi(cache_keys)
277 222 return num_delete_keys
278 223
279 224
280 225 class ActiveRegionCache(object):
281 226 def __init__(self, context, cache_data):
282 227 self.context = context
283 228 self.cache_data = cache_data
284 229
285 230 def should_invalidate(self):
286 231 return False
287 232
288 233
289 234 class FreshRegionCache(object):
290 235 def __init__(self, context, cache_data):
291 236 self.context = context
292 237 self.cache_data = cache_data
293 238
294 239 def should_invalidate(self):
295 240 return True
296 241
297 242
298 243 class InvalidationContext(object):
299 244 """
300 245 usage::
301 246
302 247 from rhodecode.lib import rc_cache
303 248
304 249 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
305 250 region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)
306 251
307 252 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
308 253 def heavy_compute(cache_name, param1, param2):
309 254 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
310 255
311 256 # invalidation namespace is shared namespace key for all process caches
312 257 # we use it to send a global signal
313 258 invalidation_namespace = 'repo_cache:1'
314 259
315 260 inv_context_manager = rc_cache.InvalidationContext(
316 261 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
317 262 with inv_context_manager as invalidation_context:
318 263 args = ('one', 'two')
319 264 # re-compute and store cache if we get invalidate signal
320 265 if invalidation_context.should_invalidate():
321 266 result = heavy_compute.refresh(*args)
322 267 else:
323 268 result = heavy_compute(*args)
324 269
325 270 compute_time = inv_context_manager.compute_time
326 271 log.debug('result computed in %.4fs', compute_time)
327 272
328 273 # To send global invalidation signal, simply run
329 274 CacheKey.set_invalidate(invalidation_namespace)
330 275
331 276 """
332 277
333 278 def __repr__(self):
334 279 return '<InvalidationContext:{}[{}]>'.format(
335 280 safe_str(self.cache_key), safe_str(self.uid))
336 281
337 282 def __init__(self, uid, invalidation_namespace='',
338 283 raise_exception=False, thread_scoped=None):
339 284 self.uid = uid
340 285 self.invalidation_namespace = invalidation_namespace
341 286 self.raise_exception = raise_exception
342 287 self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
343 288 self.thread_id = 'global'
344 289
345 290 if thread_scoped is None:
346 291 # if we set "default" we can override this via .ini settings
347 292 thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
348 293
349 294 # Append the thread id to the cache key if this invalidation context
350 295 # should be scoped to the current thread.
351 296 if thread_scoped is True:
352 297 self.thread_id = threading.current_thread().ident
353 298
354 299 self.cache_key = compute_key_from_params(uid)
355 300 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
356 301 self.proc_id, self.thread_id, self.cache_key)
357 302 self.proc_key = 'proc:{}'.format(self.proc_id)
358 303 self.compute_time = 0
359 304
360 305 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
361 306 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
362 307 # fetch all cache keys for this namespace and convert them to a map to find if we
363 308 # have a specific cache_key object registered. We do this because we want to have
364 309 # a consistent cache_state_uid for all newly registered objects
365 310 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
366 311 cache_obj = cache_obj_map.get(self.cache_key)
367 312 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
368 313 if not cache_obj:
369 314 new_cache_args = invalidation_namespace
370 315 first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
371 316 cache_state_uid = None
372 317 if first_cache_obj:
373 318 cache_state_uid = first_cache_obj.cache_state_uid
374 319 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
375 320 cache_state_uid=cache_state_uid)
376 321 cache_key_meta.cache_keys_by_pid.add(self.proc_key)
377 322
378 323 return cache_obj
379 324
380 325 def __enter__(self):
381 326 """
382 327 Test if current object is valid, and return CacheRegion function
383 328 that does invalidation and calculation
384 329 """
385 330 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
386 331 # register or get a new key based on uid
387 332 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
388 333 cache_data = self.cache_obj.get_dict()
389 334 self._start_time = time.time()
390 335 if self.cache_obj.cache_active:
391 336 # means our cache obj exists and is marked active, i.e. its
392 337 # cache is not outdated, so we return ActiveRegionCache
393 338 self.skip_cache_active_change = True
394 339
395 340 return ActiveRegionCache(context=self, cache_data=cache_data)
396 341
397 342 # the key either does not exist or is set to False, so we return
398 343 # the real invalidator which re-computes the value. We additionally set
399 344 # the flag to actually update the database objects
400 345 self.skip_cache_active_change = False
401 346 return FreshRegionCache(context=self, cache_data=cache_data)
402 347
403 348 def __exit__(self, exc_type, exc_val, exc_tb):
404 349 # save compute time
405 350 self.compute_time = time.time() - self._start_time
406 351
407 352 if self.skip_cache_active_change:
408 353 return
409 354
410 355 try:
411 356 self.cache_obj.cache_active = True
412 357 Session().add(self.cache_obj)
413 358 Session().commit()
414 359 except IntegrityError:
415 360 # if we catch an integrity error, it means we inserted this object;
416 361 # the assumption is that this is really an edge race-condition case and
417 362 # it's safe to skip it
418 363 Session().rollback()
419 364 except Exception:
420 365 log.exception('Failed to commit on cache key update')
421 366 Session().rollback()
422 367 if self.raise_exception:
423 368 raise
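
A side note on the compute_key_from_params() change above: on Python 3, hashlib digests accept only bytes, which is why the joined key string is now passed through safe_bytes() before hashing. A minimal stdlib-only sketch of the same idea is below; encode() stands in for RhodeCode's safe_bytes() helper, and returning a hexdigest is an assumption about what the project's sha1() wrapper yields.

    import hashlib

    def compute_key_from_params(*args):
        # join the str() form of every argument into one key string
        raw = "_".join(map(str, args))
        # hashlib.sha1 needs bytes on Python 3; passing str raises TypeError
        return hashlib.sha1(raw.encode('utf8')).hexdigest()

    # e.g. a stable cache key for ('repo', 1, True)
    key = compute_key_from_params('repo', 1, True)
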
@@ -1,190 +1,185 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Various Version Control System (vcs) management abstraction layer
23 23 for Python. Built with a client-server architecture.
24 24 """
25 25 import atexit
26 26 import logging
27 import urlparse
28 from cStringIO import StringIO
27 from io import StringIO
29 28
30 29 import rhodecode
31 30 from rhodecode.lib.vcs.conf import settings
32 31 from rhodecode.lib.vcs.backends import get_vcs_instance, get_backend
33 32 from rhodecode.lib.vcs.exceptions import (
34 33 VCSError, RepositoryError, CommitError, VCSCommunicationError)
35 34
36 VERSION = (0, 5, 0, 'dev')
37
38 __version__ = '.'.join((str(each) for each in VERSION[:4]))
39
40 35 __all__ = [
41 36 'get_version', 'get_vcs_instance', 'get_backend',
42 37 'VCSError', 'RepositoryError', 'CommitError', 'VCSCommunicationError'
43 38 ]
44 39
45 40 log = logging.getLogger(__name__)
46 41
47 42 # The pycurl library directly accesses C API functions and is not patched by
48 43 # gevent. This will potentially lead to deadlocks due to incompatibility with
49 44 # gevent. Therefore we check if gevent is active and import a gevent-compatible
50 45 # wrapper in that case.
51 46 try:
52 47 from gevent import monkey
53 48 if monkey.is_module_patched('__builtin__'):
54 49 import geventcurl as pycurl
55 50 log.debug('Using gevent compatible pycurl: %s', pycurl)
56 51 else:
57 52 import pycurl
58 53 except ImportError:
59 54 import pycurl
60 55
61 56
62 57 def get_version():
63 58 """
64 59 Returns shorter version (digit parts only) as string.
65 60 """
66 61 return '.'.join((str(each) for each in VERSION[:3]))
67 62
68 63
69 64 def connect_http(server_and_port):
70 65 from rhodecode.lib.vcs import connection, client_http
71 66 from rhodecode.lib.middleware.utils import scm_app
72 67
73 68 session_factory = client_http.ThreadlocalSessionFactory()
74 69
75 70 connection.Git = client_http.RemoteVCSMaker(
76 71 server_and_port, '/git', 'git', session_factory)
77 72 connection.Hg = client_http.RemoteVCSMaker(
78 73 server_and_port, '/hg', 'hg', session_factory)
79 74 connection.Svn = client_http.RemoteVCSMaker(
80 75 server_and_port, '/svn', 'svn', session_factory)
81 76 connection.Service = client_http.ServiceConnection(
82 77 server_and_port, '/_service', session_factory)
83 78
84 79 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
85 80 server_and_port, '/proxy/hg')
86 81 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
87 82 server_and_port, '/proxy/git')
88 83
89 84 @atexit.register
90 85 def free_connection_resources():
91 86 connection.Git = None
92 87 connection.Hg = None
93 88 connection.Svn = None
94 89 connection.Service = None
95 90
96 91
97 92 def connect_vcs(server_and_port, protocol):
98 93 """
99 94 Initializes the connection to the vcs server.
100 95
101 96 :param server_and_port: str, e.g. "localhost:9900"
102 97 :param protocol: str, e.g. "http"
103 98 """
104 99 if protocol == 'http':
105 100 connect_http(server_and_port)
106 101 else:
107 102 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
108 103
109 104
110 105 class CurlSession(object):
111 106 """
112 107 Modeled so that it provides a subset of the requests interface.
113 108
114 109 This has been created so that it only provides a minimal API for our
115 110 needs. The parts which it provides are based on the API of the library
116 111 `requests` which allows us to easily benchmark against it.
117 112
118 113 Please have a look at the class :class:`requests.Session` when you extend
119 114 it.
120 115 """
121 116
122 117 def __init__(self):
123 118 curl = pycurl.Curl()
124 119 # TODO: johbo: I tested with libcurl 7.19. This version has
125 120 # trouble with '100-continue' being set in the Expect header. This
126 121 # can lead to massive performance drops, so it is switched off here.
127 122
128 123 curl.setopt(curl.TCP_NODELAY, True)
129 124 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
130 125 curl.setopt(curl.USERAGENT, 'RhodeCode HTTP {}'.format(rhodecode.__version__))
131 126 curl.setopt(curl.SSL_VERIFYPEER, 0)
132 127 curl.setopt(curl.SSL_VERIFYHOST, 0)
133 128 self._curl = curl
134 129
135 130 def post(self, url, data, allow_redirects=False, headers=None):
136 131 headers = headers or {}
137 132 # format is ['header_name1: header_value1', 'header_name2: header_value2'])
138 133 headers_list = ["Expect:"] + ['{}: {}'.format(k, v) for k, v in headers.items()]
139 134 response_buffer = StringIO()
140 135
141 136 curl = self._curl
142 137 curl.setopt(curl.URL, url)
143 138 curl.setopt(curl.POST, True)
144 139 curl.setopt(curl.POSTFIELDS, data)
145 140 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
146 141 curl.setopt(curl.WRITEDATA, response_buffer)
147 142 curl.setopt(curl.HTTPHEADER, headers_list)
148 143 curl.perform()
149 144
150 145 status_code = curl.getinfo(pycurl.HTTP_CODE)
151 146
152 147 return CurlResponse(response_buffer, status_code)
153 148
154 149
155 150 class CurlResponse(object):
156 151 """
157 152 The response of a request, modeled after the requests API.
158 153
159 154 This class provides a subset of the response interface known from the
160 155 library `requests`. It is intentionally kept similar, so that we can use
161 156 `requests` as a drop in replacement for benchmarking purposes.
162 157 """
163 158
164 159 def __init__(self, response_buffer, status_code):
165 160 self._response_buffer = response_buffer
166 161 self._status_code = status_code
167 162
168 163 @property
169 164 def content(self):
170 165 try:
171 166 return self._response_buffer.getvalue()
172 167 finally:
173 168 self._response_buffer.close()
174 169
175 170 @property
176 171 def status_code(self):
177 172 return self._status_code
178 173
179 174 def iter_content(self, chunk_size):
180 175 self._response_buffer.seek(0)
181 176 while 1:
182 177 chunk = self._response_buffer.read(chunk_size)
183 178 if not chunk:
184 179 break
185 180 yield chunk
186 181
187 182
188 183 def _create_http_rpc_session():
189 184 session = CurlSession()
190 185 return session
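
One of the changes above swaps cStringIO.StringIO for io.StringIO as the pycurl response buffer. The chunked-read pattern behind CurlResponse.iter_content() is independent of that choice; the sketch below uses io.BytesIO on the assumption that the buffered payload is bytes, while the commit itself keeps a text StringIO buffer.

    from io import BytesIO

    def iter_content(buffer, chunk_size):
        # rewind and stream the buffered response in fixed-size chunks
        buffer.seek(0)
        while True:
            chunk = buffer.read(chunk_size)
            if not chunk:
                break
            yield chunk

    buf = BytesIO(b'x' * 10)
    assert list(iter_content(buf, 4)) == [b'xxxx', b'xxxx', b'xx']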