python3: fixed some compat problems
super-admin
r4916:86c3a981 default
@@ -1,364 +1,364 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2020 RhodeCode GmbH
3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import time
21 import time
22 import errno
22 import errno
23 import logging
23 import logging
24
24
25 import msgpack
25 import msgpack
26 import redis
26 import gevent
27 import gevent
27 import redis
28
28
29 from dogpile.cache.api import CachedValue
29 from dogpile.cache.api import CachedValue
30 from dogpile.cache.backends import memory as memory_backend
30 from dogpile.cache.backends import memory as memory_backend
31 from dogpile.cache.backends import file as file_backend
31 from dogpile.cache.backends import file as file_backend
32 from dogpile.cache.backends import redis as redis_backend
32 from dogpile.cache.backends import redis as redis_backend
33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
33 from dogpile.cache.backends.file import NO_VALUE, FileLock
34 from dogpile.cache.util import memoized_property
34 from dogpile.cache.util import memoized_property
35
35
36 from pyramid.settings import asbool
36 from pyramid.settings import asbool
37
37
38 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
38 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
39 from rhodecode.lib.utils import safe_str, safe_unicode
39 from rhodecode.lib.utils import safe_str
40
40
41
41
42 _default_max_size = 1024
42 _default_max_size = 1024
43
43
44 log = logging.getLogger(__name__)
44 log = logging.getLogger(__name__)
45
45
46
46
47 class LRUMemoryBackend(memory_backend.MemoryBackend):
47 class LRUMemoryBackend(memory_backend.MemoryBackend):
48 key_prefix = 'lru_mem_backend'
48 key_prefix = 'lru_mem_backend'
49 pickle_values = False
49 pickle_values = False
50
50
51 def __init__(self, arguments):
51 def __init__(self, arguments):
52 max_size = arguments.pop('max_size', _default_max_size)
52 max_size = arguments.pop('max_size', _default_max_size)
53
53
54 LRUDictClass = LRUDict
54 LRUDictClass = LRUDict
55 if arguments.pop('log_key_count', None):
55 if arguments.pop('log_key_count', None):
56 LRUDictClass = LRUDictDebug
56 LRUDictClass = LRUDictDebug
57
57
58 arguments['cache_dict'] = LRUDictClass(max_size)
58 arguments['cache_dict'] = LRUDictClass(max_size)
59 super(LRUMemoryBackend, self).__init__(arguments)
59 super(LRUMemoryBackend, self).__init__(arguments)
60
60
61 def delete(self, key):
61 def delete(self, key):
62 try:
62 try:
63 del self._cache[key]
63 del self._cache[key]
64 except KeyError:
64 except KeyError:
65 # we don't care if key isn't there at deletion
65 # we don't care if key isn't there at deletion
66 pass
66 pass
67
67
68 def delete_multi(self, keys):
68 def delete_multi(self, keys):
69 for key in keys:
69 for key in keys:
70 self.delete(key)
70 self.delete(key)
71
71
72
72
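
Editor's note: a minimal sketch of wiring this LRU backend into a dogpile region. The registration name 'dogpile.cache.example.lru' is illustrative (RhodeCode registers its own backends under 'dogpile.cache.rc.*' names, as the file_namespace configuration later in this commit shows); max_size and log_key_count are the two arguments the constructor above pops.

from dogpile.cache import make_region
from dogpile.cache.region import register_backend

# registration name is illustrative; RhodeCode performs this in its own setup code
register_backend('dogpile.cache.example.lru',
                 'rhodecode.lib.rc_cache.backends', 'LRUMemoryBackend')

region = make_region().configure(
    'dogpile.cache.example.lru',
    arguments={
        'max_size': 1024,        # popped by __init__ above
        'log_key_count': False,  # True swaps in LRUDictDebug
    },
)
region.set('some_key', {'answer': 42})
assert region.get('some_key') == {'answer': 42}
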
73 class PickleSerializer(object):
73 class PickleSerializer(object):
74
74
75 def _dumps(self, value, safe=False):
75 def _dumps(self, value, safe=False):
76 try:
76 try:
77 return compat.pickle.dumps(value)
77 return pickle.dumps(value)
78 except Exception:
78 except Exception:
79 if safe:
79 if safe:
80 return NO_VALUE
80 return NO_VALUE
81 else:
81 else:
82 raise
82 raise
83
83
84 def _loads(self, value, safe=True):
84 def _loads(self, value, safe=True):
85 try:
85 try:
86 return compat.pickle.loads(value)
86 return pickle.loads(value)
87 except Exception:
87 except Exception:
88 if safe:
88 if safe:
89 return NO_VALUE
89 return NO_VALUE
90 else:
90 else:
91 raise
91 raise
92
92
93
93
94 class MsgPackSerializer(object):
94 class MsgPackSerializer(object):
95
95
96 def _dumps(self, value, safe=False):
96 def _dumps(self, value, safe=False):
97 try:
97 try:
98 return msgpack.packb(value)
98 return msgpack.packb(value)
99 except Exception:
99 except Exception:
100 if safe:
100 if safe:
101 return NO_VALUE
101 return NO_VALUE
102 else:
102 else:
103 raise
103 raise
104
104
105 def _loads(self, value, safe=True):
105 def _loads(self, value, safe=True):
106 """
106 """
107 pickle maintains the `CachedValue` wrapper of the tuple;
107 pickle maintains the `CachedValue` wrapper of the tuple;
108 msgpack does not, so it must be added back in.
108 msgpack does not, so it must be added back in.
109 """
109 """
110 try:
110 try:
111 value = msgpack.unpackb(value, use_list=False)
111 value = msgpack.unpackb(value, use_list=False)
112 return CachedValue(*value)
112 return CachedValue(*value)
113 except Exception:
113 except Exception:
114 if safe:
114 if safe:
115 return NO_VALUE
115 return NO_VALUE
116 else:
116 else:
117 raise
117 raise
118
118
119
119
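
Editor's note: the docstring above is the crux of MsgPackSerializer: pickle round-trips dogpile's CachedValue wrapper, while msgpack flattens it to a plain tuple, so _loads has to rebuild it. A minimal sketch, assuming CachedValue behaves as a (payload, metadata) tuple, which is how the code above treats it; the metadata values are made up:

import msgpack
from dogpile.cache.api import CachedValue

original = CachedValue(42, {'ct': 1234567890.0, 'v': 1})
packed = msgpack.packb(tuple(original))         # wrapper type is lost here
raw = msgpack.unpackb(packed, use_list=False)   # a plain tuple comes back
restored = CachedValue(*raw)                    # re-wrap, exactly as _loads does
assert restored.payload == original.payload
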
120 import fcntl
120 import fcntl
121 flock_org = fcntl.flock
121 flock_org = fcntl.flock
122
122
123
123
124 class CustomLockFactory(FileLock):
124 class CustomLockFactory(FileLock):
125
125
126 @memoized_property
126 @memoized_property
127 def _module(self):
127 def _module(self):
128
128
129 def gevent_flock(fd, operation):
129 def gevent_flock(fd, operation):
130 """
130 """
131 Gevent compatible flock
131 Gevent compatible flock
132 """
132 """
133 # set non-blocking, this will cause an exception if we cannot acquire a lock
133 # set non-blocking, this will cause an exception if we cannot acquire a lock
134 operation |= fcntl.LOCK_NB
134 operation |= fcntl.LOCK_NB
135 start_lock_time = time.time()
135 start_lock_time = time.time()
136 timeout = 60 * 15 # 15min
136 timeout = 60 * 15 # 15min
137 while True:
137 while True:
138 try:
138 try:
139 flock_org(fd, operation)
139 flock_org(fd, operation)
140 # lock has been acquired
140 # lock has been acquired
141 break
141 break
142 except (OSError, IOError) as e:
142 except (OSError, IOError) as e:
143 # raise on errors other than "Resource temporarily unavailable"
143 # raise on errors other than "Resource temporarily unavailable"
144 if e.errno != errno.EAGAIN:
144 if e.errno != errno.EAGAIN:
145 raise
145 raise
146 elif (time.time() - start_lock_time) > timeout:
146 elif (time.time() - start_lock_time) > timeout:
147 # waited too much time on a lock; better to fail than loop forever
147 # waited too much time on a lock; better to fail than loop forever
148 log.error('Failed to acquire lock on `%s` after waiting %ss',
148 log.error('Failed to acquire lock on `%s` after waiting %ss',
149 self.filename, timeout)
149 self.filename, timeout)
150 raise
150 raise
151 wait_timeout = 0.03
151 wait_timeout = 0.03
152 log.debug('Failed to acquire lock on `%s`, retry in %ss',
152 log.debug('Failed to acquire lock on `%s`, retry in %ss',
153 self.filename, wait_timeout)
153 self.filename, wait_timeout)
154 gevent.sleep(wait_timeout)
154 gevent.sleep(wait_timeout)
155
155
156 fcntl.flock = gevent_flock
156 fcntl.flock = gevent_flock
157 return fcntl
157 return fcntl
158
158
159
159
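
Editor's note: CustomLockFactory above monkey-patches fcntl.flock so that a contended file lock spins cooperatively instead of blocking the whole gevent hub. A stripped-down sketch of the same pattern as a standalone helper (POSIX-only; the timeout and poll interval are illustrative, the real code uses 15 minutes and 0.03s):

import errno
import fcntl
import time
import gevent

def cooperative_flock(fd, operation, timeout=5.0, poll=0.03):
    """flock that yields to other greenlets while waiting for the lock."""
    operation |= fcntl.LOCK_NB          # fail fast instead of blocking the hub
    deadline = time.time() + timeout
    while True:
        try:
            fcntl.flock(fd, operation)  # raises EAGAIN while the lock is held
            return
        except OSError as e:
            if e.errno != errno.EAGAIN:
                raise
            if time.time() > deadline:
                raise                   # give up rather than loop forever
            gevent.sleep(poll)          # cooperative wait
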
160 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
160 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
161 key_prefix = 'file_backend'
161 key_prefix = 'file_backend'
162
162
163 def __init__(self, arguments):
163 def __init__(self, arguments):
164 arguments['lock_factory'] = CustomLockFactory
164 arguments['lock_factory'] = CustomLockFactory
165 db_file = arguments.get('filename')
165 db_file = arguments.get('filename')
166
166
167 log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
167 log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
168 try:
168 try:
169 super(FileNamespaceBackend, self).__init__(arguments)
169 super(FileNamespaceBackend, self).__init__(arguments)
170 except Exception:
170 except Exception:
171 log.exception('Failed to initialize db at: %s', db_file)
171 log.exception('Failed to initialize db at: %s', db_file)
172 raise
172 raise
173
173
174 def __repr__(self):
174 def __repr__(self):
175 return '{} `{}`'.format(self.__class__, self.filename)
175 return '{} `{}`'.format(self.__class__, self.filename)
176
176
177 def list_keys(self, prefix=''):
177 def list_keys(self, prefix=''):
178 prefix = '{}:{}'.format(self.key_prefix, prefix)
178 prefix = '{}:{}'.format(self.key_prefix, prefix)
179
179
180 def cond(v):
180 def cond(v):
181 if not prefix:
181 if not prefix:
182 return True
182 return True
183
183
184 if v.startswith(prefix):
184 if v.startswith(prefix):
185 return True
185 return True
186 return False
186 return False
187
187
188 with self._dbm_file(True) as dbm:
188 with self._dbm_file(True) as dbm:
189 try:
189 try:
190 return filter(cond, dbm.keys())
190 return list(filter(cond, list(dbm.keys())))
191 except Exception:
191 except Exception:
192 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
192 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
193 raise
193 raise
194
194
195 def get_store(self):
195 def get_store(self):
196 return self.filename
196 return self.filename
197
197
198 def _dbm_get(self, key):
198 def _dbm_get(self, key):
199 with self._dbm_file(False) as dbm:
199 with self._dbm_file(False) as dbm:
200 if hasattr(dbm, 'get'):
200 if hasattr(dbm, 'get'):
201 value = dbm.get(key, NO_VALUE)
201 value = dbm.get(key, NO_VALUE)
202 else:
202 else:
203 # gdbm objects lack a .get method
203 # gdbm objects lack a .get method
204 try:
204 try:
205 value = dbm[key]
205 value = dbm[key]
206 except KeyError:
206 except KeyError:
207 value = NO_VALUE
207 value = NO_VALUE
208 if value is not NO_VALUE:
208 if value is not NO_VALUE:
209 value = self._loads(value)
209 value = self._loads(value)
210 return value
210 return value
211
211
212 def get(self, key):
212 def get(self, key):
213 try:
213 try:
214 return self._dbm_get(key)
214 return self._dbm_get(key)
215 except Exception:
215 except Exception:
216 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
216 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
217 raise
217 raise
218
218
219 def set(self, key, value):
219 def set(self, key, value):
220 with self._dbm_file(True) as dbm:
220 with self._dbm_file(True) as dbm:
221 dbm[key] = self._dumps(value)
221 dbm[key] = self._dumps(value)
222
222
223 def set_multi(self, mapping):
223 def set_multi(self, mapping):
224 with self._dbm_file(True) as dbm:
224 with self._dbm_file(True) as dbm:
225 for key, value in mapping.items():
225 for key, value in mapping.items():
226 dbm[key] = self._dumps(value)
226 dbm[key] = self._dumps(value)
227
227
228
228
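
Editor's note: the list_keys change above (filter(...) wrapped in list(...)) is one of the genuine py3 fixes in this commit: py3's filter returns a lazy iterator, and callers such as clear_cache_namespace in the next hunk call len() on the result, which raises TypeError on an iterator. A tiny illustration:

keys = ['file_backend:a', 'file_backend:b', 'other:c']

lazy = filter(lambda k: k.startswith('file_backend:'), keys)
try:
    len(lazy)            # py3: TypeError -- filter objects have no len()
except TypeError:
    pass

eager = list(filter(lambda k: k.startswith('file_backend:'), keys))
assert len(eager) == 2   # what len()-based callers expect
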
229 class BaseRedisBackend(redis_backend.RedisBackend):
229 class BaseRedisBackend(redis_backend.RedisBackend):
230 key_prefix = ''
230 key_prefix = ''
231
231
232 def __init__(self, arguments):
232 def __init__(self, arguments):
233 super(BaseRedisBackend, self).__init__(arguments)
233 super(BaseRedisBackend, self).__init__(arguments)
234 self._lock_timeout = self.lock_timeout
234 self._lock_timeout = self.lock_timeout
235 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
235 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
236
236
237 if self._lock_auto_renewal and not self._lock_timeout:
237 if self._lock_auto_renewal and not self._lock_timeout:
238 # set default timeout for auto_renewal
238 # set default timeout for auto_renewal
239 self._lock_timeout = 30
239 self._lock_timeout = 30
240
240
241 def _create_client(self):
241 def _create_client(self):
242 args = {}
242 args = {}
243
243
244 if self.url is not None:
244 if self.url is not None:
245 args.update(url=self.url)
245 args.update(url=self.url)
246
246
247 else:
247 else:
248 args.update(
248 args.update(
249 host=self.host, password=self.password,
249 host=self.host, password=self.password,
250 port=self.port, db=self.db
250 port=self.port, db=self.db
251 )
251 )
252
252
253 connection_pool = redis.ConnectionPool(**args)
253 connection_pool = redis.ConnectionPool(**args)
254
254
255 return redis.StrictRedis(connection_pool=connection_pool)
255 return redis.StrictRedis(connection_pool=connection_pool)
256
256
257 def list_keys(self, prefix=''):
257 def list_keys(self, prefix=''):
258 prefix = '{}:{}*'.format(self.key_prefix, prefix)
258 prefix = '{}:{}*'.format(self.key_prefix, prefix)
259 return self.client.keys(prefix)
259 return self.client.keys(prefix)
260
260
261 def get_store(self):
261 def get_store(self):
262 return self.client.connection_pool
262 return self.client.connection_pool
263
263
264 def get(self, key):
264 def get(self, key):
265 value = self.client.get(key)
265 value = self.client.get(key)
266 if value is None:
266 if value is None:
267 return NO_VALUE
267 return NO_VALUE
268 return self._loads(value)
268 return self._loads(value)
269
269
270 def get_multi(self, keys):
270 def get_multi(self, keys):
271 if not keys:
271 if not keys:
272 return []
272 return []
273 values = self.client.mget(keys)
273 values = self.client.mget(keys)
274 loads = self._loads
274 loads = self._loads
275 return [
275 return [
276 loads(v) if v is not None else NO_VALUE
276 loads(v) if v is not None else NO_VALUE
277 for v in values]
277 for v in values]
278
278
279 def set(self, key, value):
279 def set(self, key, value):
280 if self.redis_expiration_time:
280 if self.redis_expiration_time:
281 self.client.setex(key, self.redis_expiration_time,
281 self.client.setex(key, self.redis_expiration_time,
282 self._dumps(value))
282 self._dumps(value))
283 else:
283 else:
284 self.client.set(key, self._dumps(value))
284 self.client.set(key, self._dumps(value))
285
285
286 def set_multi(self, mapping):
286 def set_multi(self, mapping):
287 dumps = self._dumps
287 dumps = self._dumps
288 mapping = dict(
288 mapping = dict(
289 (k, dumps(v))
289 (k, dumps(v))
290 for k, v in mapping.items()
290 for k, v in mapping.items()
291 )
291 )
292
292
293 if not self.redis_expiration_time:
293 if not self.redis_expiration_time:
294 self.client.mset(mapping)
294 self.client.mset(mapping)
295 else:
295 else:
296 pipe = self.client.pipeline()
296 pipe = self.client.pipeline()
297 for key, value in mapping.items():
297 for key, value in mapping.items():
298 pipe.setex(key, self.redis_expiration_time, value)
298 pipe.setex(key, self.redis_expiration_time, value)
299 pipe.execute()
299 pipe.execute()
300
300
301 def get_mutex(self, key):
301 def get_mutex(self, key):
302 if self.distributed_lock:
302 if self.distributed_lock:
303 lock_key = u'_lock_{0}'.format(safe_unicode(key))
303 lock_key = '_lock_{0}'.format(safe_str(key))
304 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
304 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
305 auto_renewal=self._lock_auto_renewal)
305 auto_renewal=self._lock_auto_renewal)
306 else:
306 else:
307 return None
307 return None
308
308
309
309
310 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
310 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
311 key_prefix = 'redis_pickle_backend'
311 key_prefix = 'redis_pickle_backend'
312 pass
312 pass
313
313
314
314
315 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
315 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
316 key_prefix = 'redis_msgpack_backend'
316 key_prefix = 'redis_msgpack_backend'
317 pass
317 pass
318
318
319
319
320 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
320 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
321 import redis_lock
321 import redis_lock
322
322
323 class _RedisLockWrapper(object):
323 class _RedisLockWrapper(object):
324 """LockWrapper for redis_lock"""
324 """LockWrapper for redis_lock"""
325
325
326 @classmethod
326 @classmethod
327 def get_lock(cls):
327 def get_lock(cls):
328 return redis_lock.Lock(
328 return redis_lock.Lock(
329 redis_client=client,
329 redis_client=client,
330 name=lock_key,
330 name=lock_key,
331 expire=lock_timeout,
331 expire=lock_timeout,
332 auto_renewal=auto_renewal,
332 auto_renewal=auto_renewal,
333 strict=True,
333 strict=True,
334 )
334 )
335
335
336 def __repr__(self):
336 def __repr__(self):
337 return "{}:{}".format(self.__class__.__name__, lock_key)
337 return "{}:{}".format(self.__class__.__name__, lock_key)
338
338
339 def __str__(self):
339 def __str__(self):
340 return "{}:{}".format(self.__class__.__name__, lock_key)
340 return "{}:{}".format(self.__class__.__name__, lock_key)
341
341
342 def __init__(self):
342 def __init__(self):
343 self.lock = self.get_lock()
343 self.lock = self.get_lock()
344 self.lock_key = lock_key
344 self.lock_key = lock_key
345
345
346 def acquire(self, wait=True):
346 def acquire(self, wait=True):
347 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
347 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
348 try:
348 try:
349 acquired = self.lock.acquire(wait)
349 acquired = self.lock.acquire(wait)
350 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
350 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
351 return acquired
351 return acquired
352 except redis_lock.AlreadyAcquired:
352 except redis_lock.AlreadyAcquired:
353 return False
353 return False
354 except redis_lock.AlreadyStarted:
354 except redis_lock.AlreadyStarted:
355 # refresh thread exists, but it also means we acquired the lock
355 # refresh thread exists, but it also means we acquired the lock
356 return True
356 return True
357
357
358 def release(self):
358 def release(self):
359 try:
359 try:
360 self.lock.release()
360 self.lock.release()
361 except redis_lock.NotAcquired:
361 except redis_lock.NotAcquired:
362 pass
362 pass
363
363
364 return _RedisLockWrapper()
364 return _RedisLockWrapper()
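
Editor's note: a hedged usage sketch of the lock factory above, roughly how dogpile drives the mutex returned by get_mutex. The Redis URL is illustrative, and this requires the python-redis-lock package the function imports as redis_lock:

import redis

client = redis.StrictRedis.from_url('redis://localhost:6379/0')  # illustrative URL
mutex = get_mutex_lock(client, '_lock_example_key',
                       lock_timeout=30, auto_renewal=True)

if mutex.acquire(wait=True):
    try:
        pass  # compute and store the cached value here
    finally:
        mutex.release()
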
@@ -1,423 +1,368 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2020 RhodeCode GmbH
3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import os
20 import os
21 import time
21 import time
22 import logging
22 import logging
23 import functools
23 import functools
24 import decorator
24 import threading
25 import threading
25
26
26 from dogpile.cache import CacheRegion
27 from dogpile.cache import CacheRegion
27 from dogpile.cache.util import compat
28
28
29 import rhodecode
29 import rhodecode
30 from rhodecode.lib.utils import safe_str, sha1
30 from rhodecode.lib.utils import safe_bytes, sha1
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33
33
34 from rhodecode.lib.rc_cache import cache_key_meta
34 from rhodecode.lib.rc_cache import cache_key_meta
35 from rhodecode.lib.rc_cache import region_meta
35 from rhodecode.lib.rc_cache import region_meta
36
36
37 log = logging.getLogger(__name__)
37 log = logging.getLogger(__name__)
38
38
39
39
40 def isCython(func):
40 def isCython(func):
41 """
41 """
42 Private helper that checks if a function is a cython function.
42 Private helper that checks if a function is a cython function.
43 """
43 """
44 return func.__class__.__name__ == 'cython_function_or_method'
44 return func.__class__.__name__ == 'cython_function_or_method'
45
45
46
46
47 class RhodeCodeCacheRegion(CacheRegion):
47 class RhodeCodeCacheRegion(CacheRegion):
48
48
49 def conditional_cache_on_arguments(
49 def conditional_cache_on_arguments(
50 self, namespace=None,
50 self, namespace=None,
51 expiration_time=None,
51 expiration_time=None,
52 should_cache_fn=None,
52 should_cache_fn=None,
53 to_str=compat.string_type,
53 to_str=str,
54 function_key_generator=None,
54 function_key_generator=None,
55 condition=True):
55 condition=True):
56 """
56 """
57 Custom conditional decorator that will not touch any dogpile internals if
57 Custom conditional decorator that will not touch any dogpile internals if
58 the condition isn't met. This works a bit differently than should_cache_fn,
58 the condition isn't met. This works a bit differently than should_cache_fn,
59 and it's faster in cases where we never want to compute cached values.
59 and it's faster in cases where we never want to compute cached values.
60 """
60 """
61 expiration_time_is_callable = compat.callable(expiration_time)
61 expiration_time_is_callable = callable(expiration_time)
62
62
63 if function_key_generator is None:
63 if function_key_generator is None:
64 function_key_generator = self.function_key_generator
64 function_key_generator = self.function_key_generator
65
65
66 # workaround for py2 and cython problems, this block should be removed
67 # once we've migrated to py3
68 if 'cython' == 'cython':
69 def decorator(fn):
70 if to_str is compat.string_type:
71 # backwards compatible
72 key_generator = function_key_generator(namespace, fn)
73 else:
74 key_generator = function_key_generator(namespace, fn, to_str=to_str)
75
76 @functools.wraps(fn)
77 def decorate(*arg, **kw):
78 key = key_generator(*arg, **kw)
79
80 @functools.wraps(fn)
81 def creator():
82 return fn(*arg, **kw)
83
84 if not condition:
85 return creator()
86
87 timeout = expiration_time() if expiration_time_is_callable \
88 else expiration_time
89
90 return self.get_or_create(key, creator, timeout, should_cache_fn)
91
92 def invalidate(*arg, **kw):
93 key = key_generator(*arg, **kw)
94 self.delete(key)
95
96 def set_(value, *arg, **kw):
97 key = key_generator(*arg, **kw)
98 self.set(key, value)
99
100 def get(*arg, **kw):
101 key = key_generator(*arg, **kw)
102 return self.get(key)
103
104 def refresh(*arg, **kw):
105 key = key_generator(*arg, **kw)
106 value = fn(*arg, **kw)
107 self.set(key, value)
108 return value
109
110 decorate.set = set_
111 decorate.invalidate = invalidate
112 decorate.refresh = refresh
113 decorate.get = get
114 decorate.original = fn
115 decorate.key_generator = key_generator
116 decorate.__wrapped__ = fn
117
118 return decorate
119 return decorator
120
121 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
66 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
122
67
123 if not condition:
68 if not condition:
124 log.debug('Calling un-cached method:%s', user_func.func_name)
69 log.debug('Calling un-cached method:%s', user_func.__name__)
125 start = time.time()
70 start = time.time()
126 result = user_func(*arg, **kw)
71 result = user_func(*arg, **kw)
127 total = time.time() - start
72 total = time.time() - start
128 log.debug('un-cached method:%s took %.4fs', user_func.func_name, total)
73 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
129 return result
74 return result
130
75
131 key = key_generator(*arg, **kw)
76 key = key_generator(*arg, **kw)
132
77
133 timeout = expiration_time() if expiration_time_is_callable \
78 timeout = expiration_time() if expiration_time_is_callable \
134 else expiration_time
79 else expiration_time
135
80
136 log.debug('Calling cached method:`%s`', user_func.func_name)
81 log.debug('Calling cached method:`%s`', user_func.__name__)
137 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
82 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
138
83
139 def cache_decorator(user_func):
84 def cache_decorator(user_func):
140 if to_str is compat.string_type:
85 if to_str is str:
141 # backwards compatible
86 # backwards compatible
142 key_generator = function_key_generator(namespace, user_func)
87 key_generator = function_key_generator(namespace, user_func)
143 else:
88 else:
144 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
89 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
145
90
146 def refresh(*arg, **kw):
91 def refresh(*arg, **kw):
147 """
92 """
148 Like invalidate, but regenerates the value instead
93 Like invalidate, but regenerates the value instead
149 """
94 """
150 key = key_generator(*arg, **kw)
95 key = key_generator(*arg, **kw)
151 value = user_func(*arg, **kw)
96 value = user_func(*arg, **kw)
152 self.set(key, value)
97 self.set(key, value)
153 return value
98 return value
154
99
155 def invalidate(*arg, **kw):
100 def invalidate(*arg, **kw):
156 key = key_generator(*arg, **kw)
101 key = key_generator(*arg, **kw)
157 self.delete(key)
102 self.delete(key)
158
103
159 def set_(value, *arg, **kw):
104 def set_(value, *arg, **kw):
160 key = key_generator(*arg, **kw)
105 key = key_generator(*arg, **kw)
161 self.set(key, value)
106 self.set(key, value)
162
107
163 def get(*arg, **kw):
108 def get(*arg, **kw):
164 key = key_generator(*arg, **kw)
109 key = key_generator(*arg, **kw)
165 return self.get(key)
110 return self.get(key)
166
111
167 user_func.set = set_
112 user_func.set = set_
168 user_func.invalidate = invalidate
113 user_func.invalidate = invalidate
169 user_func.get = get
114 user_func.get = get
170 user_func.refresh = refresh
115 user_func.refresh = refresh
171 user_func.key_generator = key_generator
116 user_func.key_generator = key_generator
172 user_func.original = user_func
117 user_func.original = user_func
173
118
174 # Use `decorate` to preserve the signature of :param:`user_func`.
119 # Use `decorate` to preserve the signature of :param:`user_func`.
175 return decorator.decorate(user_func, functools.partial(
120 return decorator.decorate(user_func, functools.partial(
176 get_or_create_for_user_func, key_generator))
121 get_or_create_for_user_func, key_generator))
177
122
178 return cache_decorator
123 return cache_decorator
179
124
180
125
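
Editor's note: a usage sketch of conditional_cache_on_arguments, mirroring the InvalidationContext docstring further down; the region and namespace names are illustrative. With condition=False the wrapper above short-circuits to a plain function call without touching dogpile at all, which is the point of this custom decorator:

region = get_or_create_region('cache_perms', 'example_namespace')  # names illustrative

@region.conditional_cache_on_arguments(namespace='example_namespace',
                                       condition=True)
def heavy_compute(cache_name, param1, param2):
    return (cache_name, param1 + param2)

heavy_compute('demo', 1, 2)             # cached via dogpile get_or_create
heavy_compute.refresh('demo', 1, 2)     # force recompute and store
heavy_compute.invalidate('demo', 1, 2)  # drop the cached value
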
181 def make_region(*arg, **kw):
126 def make_region(*arg, **kw):
182 return RhodeCodeCacheRegion(*arg, **kw)
127 return RhodeCodeCacheRegion(*arg, **kw)
183
128
184
129
185 def get_default_cache_settings(settings, prefixes=None):
130 def get_default_cache_settings(settings, prefixes=None):
186 prefixes = prefixes or []
131 prefixes = prefixes or []
187 cache_settings = {}
132 cache_settings = {}
188 for key in settings.keys():
133 for key in settings.keys():
189 for prefix in prefixes:
134 for prefix in prefixes:
190 if key.startswith(prefix):
135 if key.startswith(prefix):
191 name = key.split(prefix)[1].strip()
136 name = key.split(prefix)[1].strip()
192 val = settings[key]
137 val = settings[key]
193 if isinstance(val, str):
138 if isinstance(val, str):
194 val = val.strip()
139 val = val.strip()
195 cache_settings[name] = val
140 cache_settings[name] = val
196 return cache_settings
141 return cache_settings
197
142
198
143
199 def compute_key_from_params(*args):
144 def compute_key_from_params(*args):
200 """
145 """
201 Helper to compute key from given params to be used in cache manager
146 Helper to compute key from given params to be used in cache manager
202 """
147 """
203 return sha1("_".join(map(safe_str, args)))
148 return sha1(safe_bytes("_".join(map(str, args))))
204
149
205
150
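
Editor's note: the safe_bytes wrapper added above exists because py3's hashlib rejects str input, which py2's accepted. Assuming sha1 here is a thin wrapper over hashlib (it is imported from rhodecode.lib.utils and not shown), the underlying behaviour is:

import hashlib

key = "_".join(map(str, ('repo', 1, 'readme')))
digest = hashlib.sha1(key.encode('utf8')).hexdigest()  # ok: bytes in

# hashlib.sha1(key)  # py3: TypeError: Unicode-objects must be encoded before hashing
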
206 def backend_key_generator(backend):
151 def backend_key_generator(backend):
207 """
152 """
208 Special wrapper that also sends over the backend to the key generator
153 Special wrapper that also sends over the backend to the key generator
209 """
154 """
210 def wrapper(namespace, fn):
155 def wrapper(namespace, fn):
211 return key_generator(backend, namespace, fn)
156 return key_generator(backend, namespace, fn)
212 return wrapper
157 return wrapper
213
158
214
159
215 def key_generator(backend, namespace, fn):
160 def key_generator(backend, namespace, fn):
216 fname = fn.__name__
161 fname = fn.__name__
217
162
218 def generate_key(*args):
163 def generate_key(*args):
219 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
164 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
220 namespace_pref = namespace or 'default_namespace'
165 namespace_pref = namespace or 'default_namespace'
221 arg_key = compute_key_from_params(*args)
166 arg_key = compute_key_from_params(*args)
222 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
167 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
223
168
224 return final_key
169 return final_key
225
170
226 return generate_key
171 return generate_key
227
172
228
173
229 def get_or_create_region(region_name, region_namespace=None):
174 def get_or_create_region(region_name, region_namespace=None):
230 from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
175 from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
231 region_obj = region_meta.dogpile_cache_regions.get(region_name)
176 region_obj = region_meta.dogpile_cache_regions.get(region_name)
232 if not region_obj:
177 if not region_obj:
233 raise EnvironmentError(
178 raise EnvironmentError(
234 'Region `{}` not in configured: {}.'.format(
179 'Region `{}` not in configured: {}.'.format(
235 region_name, region_meta.dogpile_cache_regions.keys()))
180 region_name, list(region_meta.dogpile_cache_regions.keys())))
236
181
237 region_uid_name = '{}:{}'.format(region_name, region_namespace)
182 region_uid_name = '{}:{}'.format(region_name, region_namespace)
238 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
183 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
239 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
184 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
240 if region_exist:
185 if region_exist:
241 log.debug('Using already configured region: %s', region_namespace)
186 log.debug('Using already configured region: %s', region_namespace)
242 return region_exist
187 return region_exist
243 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
188 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
244 expiration_time = region_obj.expiration_time
189 expiration_time = region_obj.expiration_time
245
190
246 if not os.path.isdir(cache_dir):
191 if not os.path.isdir(cache_dir):
247 os.makedirs(cache_dir)
192 os.makedirs(cache_dir)
248 new_region = make_region(
193 new_region = make_region(
249 name=region_uid_name,
194 name=region_uid_name,
250 function_key_generator=backend_key_generator(region_obj.actual_backend)
195 function_key_generator=backend_key_generator(region_obj.actual_backend)
251 )
196 )
252 namespace_filename = os.path.join(
197 namespace_filename = os.path.join(
253 cache_dir, "{}.cache.dbm".format(region_namespace))
198 cache_dir, "{}.cache.dbm".format(region_namespace))
254 # special type that allows 1db per namespace
199 # special type that allows 1db per namespace
255 new_region.configure(
200 new_region.configure(
256 backend='dogpile.cache.rc.file_namespace',
201 backend='dogpile.cache.rc.file_namespace',
257 expiration_time=expiration_time,
202 expiration_time=expiration_time,
258 arguments={"filename": namespace_filename}
203 arguments={"filename": namespace_filename}
259 )
204 )
260
205
261 # create and save in region caches
206 # create and save in region caches
262 log.debug('configuring new region: %s', region_uid_name)
207 log.debug('configuring new region: %s', region_uid_name)
263 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
208 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
264
209
265 return region_obj
210 return region_obj
266
211
267
212
268 def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
213 def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
269 region = get_or_create_region(cache_region, cache_namespace_uid)
214 region = get_or_create_region(cache_region, cache_namespace_uid)
270 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
215 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
271 num_delete_keys = len(cache_keys)
216 num_delete_keys = len(cache_keys)
272 if invalidate:
217 if invalidate:
273 region.invalidate(hard=False)
218 region.invalidate(hard=False)
274 else:
219 else:
275 if num_delete_keys:
220 if num_delete_keys:
276 region.delete_multi(cache_keys)
221 region.delete_multi(cache_keys)
277 return num_delete_keys
222 return num_delete_keys
278
223
279
224
280 class ActiveRegionCache(object):
225 class ActiveRegionCache(object):
281 def __init__(self, context, cache_data):
226 def __init__(self, context, cache_data):
282 self.context = context
227 self.context = context
283 self.cache_data = cache_data
228 self.cache_data = cache_data
284
229
285 def should_invalidate(self):
230 def should_invalidate(self):
286 return False
231 return False
287
232
288
233
289 class FreshRegionCache(object):
234 class FreshRegionCache(object):
290 def __init__(self, context, cache_data):
235 def __init__(self, context, cache_data):
291 self.context = context
236 self.context = context
292 self.cache_data = cache_data
237 self.cache_data = cache_data
293
238
294 def should_invalidate(self):
239 def should_invalidate(self):
295 return True
240 return True
296
241
297
242
298 class InvalidationContext(object):
243 class InvalidationContext(object):
299 """
244 """
300 usage::
245 usage::
301
246
302 from rhodecode.lib import rc_cache
247 from rhodecode.lib import rc_cache
303
248
304 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
249 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
305 region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)
250 region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)
306
251
307 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
252 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
308 def heavy_compute(cache_name, param1, param2):
253 def heavy_compute(cache_name, param1, param2):
309 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
254 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
310
255
311 # invalidation namespace is shared namespace key for all process caches
256 # invalidation namespace is shared namespace key for all process caches
312 # we use it to send a global signal
257 # we use it to send a global signal
313 invalidation_namespace = 'repo_cache:1'
258 invalidation_namespace = 'repo_cache:1'
314
259
315 inv_context_manager = rc_cache.InvalidationContext(
260 inv_context_manager = rc_cache.InvalidationContext(
316 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
261 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
317 with inv_context_manager as invalidation_context:
262 with inv_context_manager as invalidation_context:
318 args = ('one', 'two')
263 args = ('one', 'two')
319 # re-compute and store cache if we get invalidate signal
264 # re-compute and store cache if we get invalidate signal
320 if invalidation_context.should_invalidate():
265 if invalidation_context.should_invalidate():
321 result = heavy_compute.refresh(*args)
266 result = heavy_compute.refresh(*args)
322 else:
267 else:
323 result = heavy_compute(*args)
268 result = heavy_compute(*args)
324
269
325 compute_time = inv_context_manager.compute_time
270 compute_time = inv_context_manager.compute_time
326 log.debug('result computed in %.4fs', compute_time)
271 log.debug('result computed in %.4fs', compute_time)
327
272
328 # To send global invalidation signal, simply run
273 # To send global invalidation signal, simply run
329 CacheKey.set_invalidate(invalidation_namespace)
274 CacheKey.set_invalidate(invalidation_namespace)
330
275
331 """
276 """
332
277
333 def __repr__(self):
278 def __repr__(self):
334 return '<InvalidationContext:{}[{}]>'.format(
279 return '<InvalidationContext:{}[{}]>'.format(
335 safe_str(self.cache_key), safe_str(self.uid))
280 safe_str(self.cache_key), safe_str(self.uid))
336
281
337 def __init__(self, uid, invalidation_namespace='',
282 def __init__(self, uid, invalidation_namespace='',
338 raise_exception=False, thread_scoped=None):
283 raise_exception=False, thread_scoped=None):
339 self.uid = uid
284 self.uid = uid
340 self.invalidation_namespace = invalidation_namespace
285 self.invalidation_namespace = invalidation_namespace
341 self.raise_exception = raise_exception
286 self.raise_exception = raise_exception
342 self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
287 self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
343 self.thread_id = 'global'
288 self.thread_id = 'global'
344
289
345 if thread_scoped is None:
290 if thread_scoped is None:
346 # if we set "default" we can override this via .ini settings
291 # if we set "default" we can override this via .ini settings
347 thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
292 thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
348
293
349 # Append the thread id to the cache key if this invalidation context
294 # Append the thread id to the cache key if this invalidation context
350 # should be scoped to the current thread.
295 # should be scoped to the current thread.
351 if thread_scoped is True:
296 if thread_scoped is True:
352 self.thread_id = threading.current_thread().ident
297 self.thread_id = threading.current_thread().ident
353
298
354 self.cache_key = compute_key_from_params(uid)
299 self.cache_key = compute_key_from_params(uid)
355 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
300 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
356 self.proc_id, self.thread_id, self.cache_key)
301 self.proc_id, self.thread_id, self.cache_key)
357 self.proc_key = 'proc:{}'.format(self.proc_id)
302 self.proc_key = 'proc:{}'.format(self.proc_id)
358 self.compute_time = 0
303 self.compute_time = 0
359
304
360 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
305 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
361 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
306 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
362 # fetch all cache keys for this namespace and convert them to a map to find if we
307 # fetch all cache keys for this namespace and convert them to a map to find if we
363 # have a specific cache_key object registered. We do this because we want a
308 # have a specific cache_key object registered. We do this because we want a
364 # consistent cache_state_uid for newly registered objects
309 # consistent cache_state_uid for newly registered objects
365 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
310 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
366 cache_obj = cache_obj_map.get(self.cache_key)
311 cache_obj = cache_obj_map.get(self.cache_key)
367 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
312 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
368 if not cache_obj:
313 if not cache_obj:
369 new_cache_args = invalidation_namespace
314 new_cache_args = invalidation_namespace
370 first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
315 first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
371 cache_state_uid = None
316 cache_state_uid = None
372 if first_cache_obj:
317 if first_cache_obj:
373 cache_state_uid = first_cache_obj.cache_state_uid
318 cache_state_uid = first_cache_obj.cache_state_uid
374 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
319 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
375 cache_state_uid=cache_state_uid)
320 cache_state_uid=cache_state_uid)
376 cache_key_meta.cache_keys_by_pid.add(self.proc_key)
321 cache_key_meta.cache_keys_by_pid.add(self.proc_key)
377
322
378 return cache_obj
323 return cache_obj
379
324
380 def __enter__(self):
325 def __enter__(self):
381 """
326 """
382 Test if current object is valid, and return CacheRegion function
327 Test if current object is valid, and return CacheRegion function
383 that does invalidation and calculation
328 that does invalidation and calculation
384 """
329 """
385 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
330 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
386 # register or get a new key based on uid
331 # register or get a new key based on uid
387 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
332 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
388 cache_data = self.cache_obj.get_dict()
333 cache_data = self.cache_obj.get_dict()
389 self._start_time = time.time()
334 self._start_time = time.time()
390 if self.cache_obj.cache_active:
335 if self.cache_obj.cache_active:
391 # means our cache obj exists and is marked as not
336 # means our cache obj exists and is marked as not
392 # outdated; we return ActiveRegionCache
337 # outdated; we return ActiveRegionCache
393 self.skip_cache_active_change = True
338 self.skip_cache_active_change = True
394
339
395 return ActiveRegionCache(context=self, cache_data=cache_data)
340 return ActiveRegionCache(context=self, cache_data=cache_data)
396
341
397 # the key is either not existing or set to False, we return
342 # the key is either not existing or set to False, we return
398 # the real invalidator which re-computes value. We additionally set
343 # the real invalidator which re-computes value. We additionally set
399 # the flag to actually update the Database objects
344 # the flag to actually update the Database objects
400 self.skip_cache_active_change = False
345 self.skip_cache_active_change = False
401 return FreshRegionCache(context=self, cache_data=cache_data)
346 return FreshRegionCache(context=self, cache_data=cache_data)
402
347
403 def __exit__(self, exc_type, exc_val, exc_tb):
348 def __exit__(self, exc_type, exc_val, exc_tb):
404 # save compute time
349 # save compute time
405 self.compute_time = time.time() - self._start_time
350 self.compute_time = time.time() - self._start_time
406
351
407 if self.skip_cache_active_change:
352 if self.skip_cache_active_change:
408 return
353 return
409
354
410 try:
355 try:
411 self.cache_obj.cache_active = True
356 self.cache_obj.cache_active = True
412 Session().add(self.cache_obj)
357 Session().add(self.cache_obj)
413 Session().commit()
358 Session().commit()
414 except IntegrityError:
359 except IntegrityError:
415 # if we catch integrity error, it means we inserted this object
360 # if we catch integrity error, it means we inserted this object
416 # the assumption is that it's really an edge race-condition case and
361 # the assumption is that it's really an edge race-condition case and
417 # it's safe to skip it
362 # it's safe to skip it
418 Session().rollback()
363 Session().rollback()
419 except Exception:
364 except Exception:
420 log.exception('Failed to commit on cache key update')
365 log.exception('Failed to commit on cache key update')
421 Session().rollback()
366 Session().rollback()
422 if self.raise_exception:
367 if self.raise_exception:
423 raise
368 raise
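
Editor's note: taken together, most of this commit is mechanical py2-to-py3 renaming; for quick reference, the substitutions visible across the hunks above are:

# py2 spelling                          -> py3 spelling used in this commit
# dogpile's compat.string_type          -> str
# compat.callable(expiration_time)      -> callable(expiration_time)
# user_func.func_name                   -> user_func.__name__
# compat.pickle.dumps / loads           -> pickle.dumps / loads
#   (the new code assumes a top-level `import pickle`, not visible in the hunk)
# filter(cond, keys)                    -> list(filter(cond, list(keys)))
# safe_unicode(key)                     -> safe_str(key)
# dict.keys() in error messages         -> list(dict.keys())
#
# one py2-only spelling survives above: cache_obj_map.itervalues() in
# get_or_create_cache_obj; py3 would need iter(cache_obj_map.values())
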
@@ -1,190 +1,185 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2014-2020 RhodeCode GmbH
3 # Copyright (C) 2014-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Abstraction layer for managing various Version Control Systems (vcs)
22 Abstraction layer for managing various Version Control Systems (vcs)
23 in Python. Built with a client-server architecture.
23 in Python. Built with a client-server architecture.
24 """
24 """
25 import atexit
25 import atexit
26 import logging
26 import logging
27 import urlparse
27 from io import StringIO
28 from cStringIO import StringIO
29
28
30 import rhodecode
29 import rhodecode
31 from rhodecode.lib.vcs.conf import settings
30 from rhodecode.lib.vcs.conf import settings
32 from rhodecode.lib.vcs.backends import get_vcs_instance, get_backend
31 from rhodecode.lib.vcs.backends import get_vcs_instance, get_backend
33 from rhodecode.lib.vcs.exceptions import (
32 from rhodecode.lib.vcs.exceptions import (
34 VCSError, RepositoryError, CommitError, VCSCommunicationError)
33 VCSError, RepositoryError, CommitError, VCSCommunicationError)
35
34
36 VERSION = (0, 5, 0, 'dev')
37
38 __version__ = '.'.join((str(each) for each in VERSION[:4]))
39
40 __all__ = [
35 __all__ = [
41 'get_version', 'get_vcs_instance', 'get_backend',
36 'get_version', 'get_vcs_instance', 'get_backend',
42 'VCSError', 'RepositoryError', 'CommitError', 'VCSCommunicationError'
37 'VCSError', 'RepositoryError', 'CommitError', 'VCSCommunicationError'
43 ]
38 ]
44
39
45 log = logging.getLogger(__name__)
40 log = logging.getLogger(__name__)
46
41
47 # The pycurl library directly accesses C API functions and is not patched by
42 # The pycurl library directly accesses C API functions and is not patched by
48 # gevent. This can potentially lead to deadlocks due to incompatibility with
43 # gevent. This can potentially lead to deadlocks due to incompatibility with
49 # gevent. Therefore we check if gevent is active and import a gevent-compatible
44 # gevent. Therefore we check if gevent is active and import a gevent-compatible
50 # wrapper in that case.
45 # wrapper in that case.
51 try:
46 try:
52 from gevent import monkey
47 from gevent import monkey
53 if monkey.is_module_patched('__builtin__'):
48 if monkey.is_module_patched('__builtin__'):
54 import geventcurl as pycurl
49 import geventcurl as pycurl
55 log.debug('Using gevent compatible pycurl: %s', pycurl)
50 log.debug('Using gevent compatible pycurl: %s', pycurl)
56 else:
51 else:
57 import pycurl
52 import pycurl
58 except ImportError:
53 except ImportError:
59 import pycurl
54 import pycurl
60
55
61
56
62 def get_version():
57 def get_version():
63 """
58 """
64 Returns shorter version (digit parts only) as string.
59 Returns shorter version (digit parts only) as string.
65 """
60 """
66 return '.'.join((str(each) for each in VERSION[:3]))
61 return '.'.join((str(each) for each in VERSION[:3]))
67
62
68
63
69 def connect_http(server_and_port):
64 def connect_http(server_and_port):
70 from rhodecode.lib.vcs import connection, client_http
65 from rhodecode.lib.vcs import connection, client_http
71 from rhodecode.lib.middleware.utils import scm_app
66 from rhodecode.lib.middleware.utils import scm_app
72
67
73 session_factory = client_http.ThreadlocalSessionFactory()
68 session_factory = client_http.ThreadlocalSessionFactory()
74
69
75 connection.Git = client_http.RemoteVCSMaker(
70 connection.Git = client_http.RemoteVCSMaker(
76 server_and_port, '/git', 'git', session_factory)
71 server_and_port, '/git', 'git', session_factory)
77 connection.Hg = client_http.RemoteVCSMaker(
72 connection.Hg = client_http.RemoteVCSMaker(
78 server_and_port, '/hg', 'hg', session_factory)
73 server_and_port, '/hg', 'hg', session_factory)
79 connection.Svn = client_http.RemoteVCSMaker(
74 connection.Svn = client_http.RemoteVCSMaker(
80 server_and_port, '/svn', 'svn', session_factory)
75 server_and_port, '/svn', 'svn', session_factory)
81 connection.Service = client_http.ServiceConnection(
76 connection.Service = client_http.ServiceConnection(
82 server_and_port, '/_service', session_factory)
77 server_and_port, '/_service', session_factory)
83
78
84 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
79 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
85 server_and_port, '/proxy/hg')
80 server_and_port, '/proxy/hg')
86 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
81 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
87 server_and_port, '/proxy/git')
82 server_and_port, '/proxy/git')
88
83
89 @atexit.register
84 @atexit.register
90 def free_connection_resources():
85 def free_connection_resources():
91 connection.Git = None
86 connection.Git = None
92 connection.Hg = None
87 connection.Hg = None
93 connection.Svn = None
88 connection.Svn = None
94 connection.Service = None
89 connection.Service = None
95
90
96
91
97 def connect_vcs(server_and_port, protocol):
92 def connect_vcs(server_and_port, protocol):
98 """
93 """
99 Initializes the connection to the vcs server.
94 Initializes the connection to the vcs server.
100
95
101 :param server_and_port: str, e.g. "localhost:9900"
96 :param server_and_port: str, e.g. "localhost:9900"
102 :param protocol: str or "http"
97 :param protocol: str or "http"
103 """
98 """
104 if protocol == 'http':
99 if protocol == 'http':
105 connect_http(server_and_port)
100 connect_http(server_and_port)
106 else:
101 else:
107 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
102 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
108
103
109
104
110 class CurlSession(object):
105 class CurlSession(object):
111 """
106 """
112 Modeled so that it provides a subset of the requests interface.
107 Modeled so that it provides a subset of the requests interface.
113
108
114 This has been created so that it provides only a minimal API for our
109 This has been created so that it provides only a minimal API for our
115 needs. The parts which it provides are based on the API of the library
110 needs. The parts which it provides are based on the API of the library
116 `requests` which allows us to easily benchmark against it.
111 `requests` which allows us to easily benchmark against it.
117
112
118 Please have a look at the class :class:`requests.Session` when you extend
113 Please have a look at the class :class:`requests.Session` when you extend
119 it.
114 it.
120 """
115 """
121
116
122 def __init__(self):
117 def __init__(self):
123 curl = pycurl.Curl()
118 curl = pycurl.Curl()
124 # TODO: johbo: I did test with 7.19 of libcurl. This version has
119 # TODO: johbo: I did test with 7.19 of libcurl. This version has
125 # trouble with 100 - continue being set in the expect header. This
120 # trouble with 100 - continue being set in the expect header. This
126 # can lead to massive performance drops, switching it off here.
121 # can lead to massive performance drops, switching it off here.
127
122
128 curl.setopt(curl.TCP_NODELAY, True)
123 curl.setopt(curl.TCP_NODELAY, True)
129 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
124 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
130 curl.setopt(curl.USERAGENT, 'RhodeCode HTTP {}'.format(rhodecode.__version__))
125 curl.setopt(curl.USERAGENT, 'RhodeCode HTTP {}'.format(rhodecode.__version__))
131 curl.setopt(curl.SSL_VERIFYPEER, 0)
126 curl.setopt(curl.SSL_VERIFYPEER, 0)
132 curl.setopt(curl.SSL_VERIFYHOST, 0)
127 curl.setopt(curl.SSL_VERIFYHOST, 0)
133 self._curl = curl
128 self._curl = curl
134
129
135 def post(self, url, data, allow_redirects=False, headers=None):
130 def post(self, url, data, allow_redirects=False, headers=None):
136 headers = headers or {}
131 headers = headers or {}
137 # format is ['header_name1: header_value1', 'header_name2: header_value2'])
132 # format is ['header_name1: header_value1', 'header_name2: header_value2'])
138 headers_list = ["Expect:"] + ['{}: {}'.format(k, v) for k, v in headers.items()]
133 headers_list = ["Expect:"] + ['{}: {}'.format(k, v) for k, v in headers.items()]
139 response_buffer = StringIO()
134 response_buffer = StringIO()
140
135
141 curl = self._curl
136 curl = self._curl
142 curl.setopt(curl.URL, url)
137 curl.setopt(curl.URL, url)
143 curl.setopt(curl.POST, True)
138 curl.setopt(curl.POST, True)
144 curl.setopt(curl.POSTFIELDS, data)
139 curl.setopt(curl.POSTFIELDS, data)
145 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
140 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
146 curl.setopt(curl.WRITEDATA, response_buffer)
141 curl.setopt(curl.WRITEDATA, response_buffer)
147 curl.setopt(curl.HTTPHEADER, headers_list)
142 curl.setopt(curl.HTTPHEADER, headers_list)
148 curl.perform()
143 curl.perform()
149
144
150 status_code = curl.getinfo(pycurl.HTTP_CODE)
145 status_code = curl.getinfo(pycurl.HTTP_CODE)
151
146
152 return CurlResponse(response_buffer, status_code)
147 return CurlResponse(response_buffer, status_code)
153
148
154
149
155 class CurlResponse(object):
150 class CurlResponse(object):
156 """
151 """
157 The response of a request, modeled after the requests API.
152 The response of a request, modeled after the requests API.
158
153
159 This class provides a subset of the response interface known from the
154 This class provides a subset of the response interface known from the
160 library `requests`. It is intentionally kept similar, so that we can use
155 library `requests`. It is intentionally kept similar, so that we can use
161 `requests` as a drop in replacement for benchmarking purposes.
156 `requests` as a drop in replacement for benchmarking purposes.
162 """
157 """
163
158
164 def __init__(self, response_buffer, status_code):
159 def __init__(self, response_buffer, status_code):
165 self._response_buffer = response_buffer
160 self._response_buffer = response_buffer
166 self._status_code = status_code
161 self._status_code = status_code
167
162
168 @property
163 @property
169 def content(self):
164 def content(self):
170 try:
165 try:
171 return self._response_buffer.getvalue()
166 return self._response_buffer.getvalue()
172 finally:
167 finally:
173 self._response_buffer.close()
168 self._response_buffer.close()
174
169
175 @property
170 @property
176 def status_code(self):
171 def status_code(self):
177 return self._status_code
172 return self._status_code
178
173
179 def iter_content(self, chunk_size):
174 def iter_content(self, chunk_size):
180 self._response_buffer.seek(0)
175 self._response_buffer.seek(0)
181 while 1:
176 while 1:
182 chunk = self._response_buffer.read(chunk_size)
177 chunk = self._response_buffer.read(chunk_size)
183 if not chunk:
178 if not chunk:
184 break
179 break
185 yield chunk
180 yield chunk
186
181
187
182
188 def _create_http_rpc_session():
183 def _create_http_rpc_session():
189 session = CurlSession()
184 session = CurlSession()
190 return session
185 return session
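
Editor's note: this file's change swaps py2's cStringIO for io.StringIO. One caveat, flagged as an assumption about follow-up work rather than a statement about this commit: under py3, pycurl writes raw bytes to WRITEDATA, so a text-mode StringIO buffer will fail at curl.perform(); the bytes-safe equivalent is io.BytesIO. A sketch of the distinction:

from io import BytesIO, StringIO

text_buf = StringIO()
text_buf.write('status: ok')       # str only; text_buf.write(b'...') raises TypeError

byte_buf = BytesIO()
byte_buf.write(b'status: ok')      # what pycurl's WRITEDATA actually receives
body = byte_buf.getvalue().decode('utf-8')
assert body == 'status: ok'
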