##// END OF EJS Templates
audit-logs: expose download user audit logs as JSON file....
audit-logs: expose download user audit logs as JSON file. Primarly in larger organizations often this needs to be performed for some employees. Before we delegated users to API or DB fetch.

File last commit:

r3933:37529f8b default
r3970:36c4e038 default
Show More
backends.py
288 lines | 8.3 KiB | text/x-python | PythonLexer
caches: rewrite of auth/permission caches to dogpile.
r2845 # -*- coding: utf-8 -*-
docs: updated copyrights to 2019
r3363 # Copyright (C) 2015-2019 RhodeCode GmbH
caches: rewrite of auth/permission caches to dogpile.
r2845 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
caches: updated cache backend to new vcsserver caches implementation.
r3848
caches: use a gevent compatible file-lock mechanism....
r2878 import time
import errno
import logging
caches: synced cache logic with vcsserver.
r3851 import msgpack
caches: use a gevent compatible file-lock mechanism....
r2878 import gevent
dogpile: use connection_pool for redis backend which is faster in gevent scenarios
r3930 import redis
caches: rewrite of auth/permission caches to dogpile.
r2845
caches: synced cache logic with vcsserver.
r3851 from dogpile.cache.api import CachedValue
caches: rewrite of auth/permission caches to dogpile.
r2845 from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import file as file_backend
from dogpile.cache.backends import redis as redis_backend
caches: use a gevent compatible file-lock mechanism....
r2878 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
from dogpile.cache.util import memoized_property
caches: use repo.lru based Dict cache. This LRUDict uses Timing Algo to not have to use locking...
r2945
from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
caches: rewrite of auth/permission caches to dogpile.
r2845
_default_max_size = 1024
caches: use a gevent compatible file-lock mechanism....
r2878 log = logging.getLogger(__name__)
caches: rewrite of auth/permission caches to dogpile.
r2845
class LRUMemoryBackend(memory_backend.MemoryBackend):
caches: synced cache logic with vcsserver.
r3851 key_prefix = 'lru_mem_backend'
caches: use a faster LRUDict implementation for LRUMemoryCache
r2882 pickle_values = False
caches: rewrite of auth/permission caches to dogpile.
r2845
def __init__(self, arguments):
max_size = arguments.pop('max_size', _default_max_size)
caches: allow debug of max_size reached by the MemoryLRU cache.
r2887
caches: use repo.lru based Dict cache. This LRUDict uses Timing Algo to not have to use locking...
r2945 LRUDictClass = LRUDict
if arguments.pop('log_key_count', None):
LRUDictClass = LRUDictDebug
arguments['cache_dict'] = LRUDictClass(max_size)
caches: rewrite of auth/permission caches to dogpile.
r2845 super(LRUMemoryBackend, self).__init__(arguments)
caches: use a faster LRUDict implementation for LRUMemoryCache
r2882 def delete(self, key):
caches: use safer method of purging keys from memory dict. During some concurrency tests...
r2931 try:
caches: use a faster LRUDict implementation for LRUMemoryCache
r2882 del self._cache[key]
caches: use safer method of purging keys from memory dict. During some concurrency tests...
r2931 except KeyError:
# we don't care if key isn't there at deletion
pass
caches: use a faster LRUDict implementation for LRUMemoryCache
r2882
def delete_multi(self, keys):
for key in keys:
caches: use safer method of purging keys from memory dict. During some concurrency tests...
r2931 self.delete(key)
caches: use a faster LRUDict implementation for LRUMemoryCache
r2882
caches: rewrite of auth/permission caches to dogpile.
r2845
caches: synced cache logic with vcsserver.
r3851 class PickleSerializer(object):
caches: use safest possible pickle for cache backends....
r2909 def _dumps(self, value, safe=False):
try:
return compat.pickle.dumps(value)
except Exception:
if safe:
return NO_VALUE
else:
raise
caches: rewrite of auth/permission caches to dogpile.
r2845
caches: use safest possible pickle for cache backends....
r2909 def _loads(self, value, safe=True):
try:
return compat.pickle.loads(value)
except Exception:
if safe:
return NO_VALUE
else:
raise
caches: rewrite of auth/permission caches to dogpile.
r2845
caches: synced cache logic with vcsserver.
r3851 class MsgPackSerializer(object):
def _dumps(self, value, safe=False):
try:
return msgpack.packb(value)
except Exception:
if safe:
return NO_VALUE
else:
raise
def _loads(self, value, safe=True):
"""
pickle maintained the `CachedValue` wrapper of the tuple
msgpack does not, so it must be added back in.
"""
try:
value = msgpack.unpackb(value, use_list=False)
return CachedValue(*value)
except Exception:
if safe:
return NO_VALUE
else:
raise
cache: use global flock to prevent recursion when using gevent workers.
r3417 import fcntl
flock_org = fcntl.flock
caches: use a gevent compatible file-lock mechanism....
r2878 class CustomLockFactory(FileLock):
@memoized_property
def _module(self):
def gevent_flock(fd, operation):
"""
Gevent compatible flock
"""
# set non-blocking, this will cause an exception if we cannot acquire a lock
operation |= fcntl.LOCK_NB
start_lock_time = time.time()
summary: don't load size on container expand, only on manual action....
r3334 timeout = 60 * 15 # 15min
caches: use a gevent compatible file-lock mechanism....
r2878 while True:
try:
flock_org(fd, operation)
# lock has been acquired
break
except (OSError, IOError) as e:
# raise on other errors than Resource temporarily unavailable
if e.errno != errno.EAGAIN:
raise
elif (time.time() - start_lock_time) > timeout:
# waited to much time on a lock, better fail than loop for ever
cache: fix overwrite of flock timeout, and improve logging.
r2947 log.error('Failed to acquire lock on `%s` after waiting %ss',
self.filename, timeout)
caches: use a gevent compatible file-lock mechanism....
r2878 raise
cache: fix overwrite of flock timeout, and improve logging.
r2947 wait_timeout = 0.03
log.debug('Failed to acquire lock on `%s`, retry in %ss',
self.filename, wait_timeout)
gevent.sleep(wait_timeout)
caches: use a gevent compatible file-lock mechanism....
r2878
fcntl.flock = gevent_flock
return fcntl
caches: synced cache logic with vcsserver.
r3851 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
key_prefix = 'file_backend'
caches: rewrite of auth/permission caches to dogpile.
r2845
def __init__(self, arguments):
caches: use a gevent compatible file-lock mechanism....
r2878 arguments['lock_factory'] = CustomLockFactory
caches: rewrite of auth/permission caches to dogpile.
r2845 super(FileNamespaceBackend, self).__init__(arguments)
caches: added reprs
r3933 def __repr__(self):
return '{} `{}`'.format(self.__class__, self.filename)
caches: don't use beaker for file caches anymore
r2846 def list_keys(self, prefix=''):
caches: synced cache logic with vcsserver.
r3851 prefix = '{}:{}'.format(self.key_prefix, prefix)
caches: don't use beaker for file caches anymore
r2846 def cond(v):
if not prefix:
return True
if v.startswith(prefix):
return True
return False
caches: rewrite of auth/permission caches to dogpile.
r2845 with self._dbm_file(True) as dbm:
caches: don't use beaker for file caches anymore
r2846
return filter(cond, dbm.keys())
caches: rewrite of auth/permission caches to dogpile.
r2845
def get_store(self):
return self.filename
def get(self, key):
with self._dbm_file(False) as dbm:
if hasattr(dbm, 'get'):
value = dbm.get(key, NO_VALUE)
else:
# gdbm objects lack a .get method
try:
value = dbm[key]
except KeyError:
value = NO_VALUE
if value is not NO_VALUE:
value = self._loads(value)
return value
def set(self, key, value):
with self._dbm_file(True) as dbm:
dbm[key] = self._dumps(value)
def set_multi(self, mapping):
with self._dbm_file(True) as dbm:
for key, value in mapping.items():
dbm[key] = self._dumps(value)
caches: synced cache logic with vcsserver.
r3851 class BaseRedisBackend(redis_backend.RedisBackend):
dogpile: use connection_pool for redis backend which is faster in gevent scenarios
r3930
def _create_client(self):
args = {}
if self.url is not None:
args.update(url=self.url)
else:
args.update(
host=self.host, password=self.password,
port=self.port, db=self.db
)
connection_pool = redis.ConnectionPool(**args)
return redis.StrictRedis(connection_pool=connection_pool)
caches: don't use beaker for file caches anymore
r2846 def list_keys(self, prefix=''):
caches: synced cache logic with vcsserver.
r3851 prefix = '{}:{}*'.format(self.key_prefix, prefix)
caches: don't use beaker for file caches anymore
r2846 return self.client.keys(prefix)
caches: rewrite of auth/permission caches to dogpile.
r2845
def get_store(self):
return self.client.connection_pool
caches: use safest possible pickle for cache backends....
r2909 def get(self, key):
value = self.client.get(key)
if value is None:
return NO_VALUE
return self._loads(value)
caches: synced cache logic with vcsserver.
r3851 def get_multi(self, keys):
if not keys:
return []
values = self.client.mget(keys)
loads = self._loads
return [
loads(v) if v is not None else NO_VALUE
for v in values]
caches: rewrite of auth/permission caches to dogpile.
r2845 def set(self, key, value):
if self.redis_expiration_time:
self.client.setex(key, self.redis_expiration_time,
self._dumps(value))
else:
self.client.set(key, self._dumps(value))
def set_multi(self, mapping):
caches: synced cache logic with vcsserver.
r3851 dumps = self._dumps
caches: rewrite of auth/permission caches to dogpile.
r2845 mapping = dict(
caches: synced cache logic with vcsserver.
r3851 (k, dumps(v))
caches: rewrite of auth/permission caches to dogpile.
r2845 for k, v in mapping.items()
)
if not self.redis_expiration_time:
self.client.mset(mapping)
else:
pipe = self.client.pipeline()
for key, value in mapping.items():
pipe.setex(key, self.redis_expiration_time, value)
pipe.execute()
redis: add logging about lock acquire, this should help in case of locked row debugging
r3466
def get_mutex(self, key):
u = redis_backend.u
if self.distributed_lock:
lock_key = u('_lock_{0}').format(key)
log.debug('Trying to acquire Redis lock for key %s', lock_key)
return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
else:
return None
caches: synced cache logic with vcsserver.
r3851
class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
key_prefix = 'redis_pickle_backend'
pass
class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
key_prefix = 'redis_msgpack_backend'
pass