##// END OF EJS Templates
sessions: fixed more problems with redis sessions cleanup errors
super-admin -
r5157:2ffcea0c default
parent child Browse files
Show More
@@ -1,271 +1,275 b''
1 # Copyright (C) 2017-2023 RhodeCode GmbH
1 # Copyright (C) 2017-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 import os
19 import os
20 import re
20 import re
21 import time
21 import time
22 import datetime
22 import datetime
23
24 import binascii
23 import dateutil
25 import dateutil
24 import dateutil.relativedelta
26 import dateutil.relativedelta
25 import pickle
27 import pickle
26 import base64
28 import base64
27
29
28 from rhodecode.model.db import DbSession, Session
30 from rhodecode.model.db import DbSession, Session
29
31
30
32
31 class CleanupCommand(Exception):
33 class CleanupCommand(Exception):
32 pass
34 pass
33
35
34
36
35 class BaseAuthSessions(object):
37 class BaseAuthSessions(object):
36 SESSION_TYPE = None
38 SESSION_TYPE = None
37 NOT_AVAILABLE = 'NOT AVAILABLE'
39 NOT_AVAILABLE = 'NOT AVAILABLE'
38
40
39 def __init__(self, config):
41 def __init__(self, config):
40 session_conf = {}
42 session_conf = {}
41 for k, v in list(config.items()):
43 for k, v in list(config.items()):
42 if k.startswith('beaker.session'):
44 if k.startswith('beaker.session'):
43 session_conf[k] = v
45 session_conf[k] = v
44 self.config = session_conf
46 self.config = session_conf
45
47
46 def get_count(self):
48 def get_count(self):
47 raise NotImplementedError
49 raise NotImplementedError
48
50
49 def get_expired_count(self, older_than_seconds=None):
51 def get_expired_count(self, older_than_seconds=None):
50 raise NotImplementedError
52 raise NotImplementedError
51
53
52 def clean_sessions(self, older_than_seconds=None):
54 def clean_sessions(self, older_than_seconds=None):
53 raise NotImplementedError
55 raise NotImplementedError
54
56
55 def _seconds_to_date(self, seconds):
57 def _seconds_to_date(self, seconds):
56 return datetime.datetime.utcnow() - dateutil.relativedelta.relativedelta(
58 return datetime.datetime.utcnow() - dateutil.relativedelta.relativedelta(
57 seconds=seconds)
59 seconds=seconds)
58
60
59
61
60 class DbAuthSessions(BaseAuthSessions):
62 class DbAuthSessions(BaseAuthSessions):
61 SESSION_TYPE = 'ext:database'
63 SESSION_TYPE = 'ext:database'
62
64
63 def get_count(self):
65 def get_count(self):
64 return DbSession.query().count()
66 return DbSession.query().count()
65
67
66 def get_expired_count(self, older_than_seconds=None):
68 def get_expired_count(self, older_than_seconds=None):
67 expiry_date = self._seconds_to_date(older_than_seconds)
69 expiry_date = self._seconds_to_date(older_than_seconds)
68 return DbSession.query().filter(DbSession.accessed < expiry_date).count()
70 return DbSession.query().filter(DbSession.accessed < expiry_date).count()
69
71
70 def clean_sessions(self, older_than_seconds=None):
72 def clean_sessions(self, older_than_seconds=None):
71 expiry_date = self._seconds_to_date(older_than_seconds)
73 expiry_date = self._seconds_to_date(older_than_seconds)
72 to_remove = DbSession.query().filter(DbSession.accessed < expiry_date).count()
74 to_remove = DbSession.query().filter(DbSession.accessed < expiry_date).count()
73 DbSession.query().filter(DbSession.accessed < expiry_date).delete()
75 DbSession.query().filter(DbSession.accessed < expiry_date).delete()
74 Session().commit()
76 Session().commit()
75 return to_remove
77 return to_remove
76
78
77
79
78 class FileAuthSessions(BaseAuthSessions):
80 class FileAuthSessions(BaseAuthSessions):
79 SESSION_TYPE = 'file sessions'
81 SESSION_TYPE = 'file sessions'
80
82
81 def _get_sessions_dir(self):
83 def _get_sessions_dir(self):
82 data_dir = self.config.get('beaker.session.data_dir')
84 data_dir = self.config.get('beaker.session.data_dir')
83 return data_dir
85 return data_dir
84
86
85 def _count_on_filesystem(self, path, older_than=0, callback=None):
87 def _count_on_filesystem(self, path, older_than=0, callback=None):
86 value = dict(percent=0, used=0, total=0, items=0, callbacks=0,
88 value = dict(percent=0, used=0, total=0, items=0, callbacks=0,
87 path=path, text='')
89 path=path, text='')
88 items_count = 0
90 items_count = 0
89 used = 0
91 used = 0
90 callbacks = 0
92 callbacks = 0
91 cur_time = time.time()
93 cur_time = time.time()
92 for root, dirs, files in os.walk(path):
94 for root, dirs, files in os.walk(path):
93 for f in files:
95 for f in files:
94 final_path = os.path.join(root, f)
96 final_path = os.path.join(root, f)
95 try:
97 try:
96 mtime = os.stat(final_path).st_mtime
98 mtime = os.stat(final_path).st_mtime
97 if (cur_time - mtime) > older_than:
99 if (cur_time - mtime) > older_than:
98 items_count += 1
100 items_count += 1
99 if callback:
101 if callback:
100 callback_res = callback(final_path)
102 callback_res = callback(final_path)
101 callbacks += 1
103 callbacks += 1
102 else:
104 else:
103 used += os.path.getsize(final_path)
105 used += os.path.getsize(final_path)
104 except OSError:
106 except OSError:
105 pass
107 pass
106 value.update({
108 value.update({
107 'percent': 100,
109 'percent': 100,
108 'used': used,
110 'used': used,
109 'total': used,
111 'total': used,
110 'items': items_count,
112 'items': items_count,
111 'callbacks': callbacks
113 'callbacks': callbacks
112 })
114 })
113 return value
115 return value
114
116
115 def get_count(self):
117 def get_count(self):
116 try:
118 try:
117 sessions_dir = self._get_sessions_dir()
119 sessions_dir = self._get_sessions_dir()
118 items_count = self._count_on_filesystem(sessions_dir)['items']
120 items_count = self._count_on_filesystem(sessions_dir)['items']
119 except Exception:
121 except Exception:
120 items_count = self.NOT_AVAILABLE
122 items_count = self.NOT_AVAILABLE
121 return items_count
123 return items_count
122
124
123 def get_expired_count(self, older_than_seconds=0):
125 def get_expired_count(self, older_than_seconds=0):
124 try:
126 try:
125 sessions_dir = self._get_sessions_dir()
127 sessions_dir = self._get_sessions_dir()
126 items_count = self._count_on_filesystem(
128 items_count = self._count_on_filesystem(
127 sessions_dir, older_than=older_than_seconds)['items']
129 sessions_dir, older_than=older_than_seconds)['items']
128 except Exception:
130 except Exception:
129 items_count = self.NOT_AVAILABLE
131 items_count = self.NOT_AVAILABLE
130 return items_count
132 return items_count
131
133
132 def clean_sessions(self, older_than_seconds=0):
134 def clean_sessions(self, older_than_seconds=0):
133 # find . -mtime +60 -exec rm {} \;
135 # find . -mtime +60 -exec rm {} \;
134
136
135 sessions_dir = self._get_sessions_dir()
137 sessions_dir = self._get_sessions_dir()
136
138
137 def remove_item(path):
139 def remove_item(path):
138 os.remove(path)
140 os.remove(path)
139
141
140 stats = self._count_on_filesystem(
142 stats = self._count_on_filesystem(
141 sessions_dir, older_than=older_than_seconds,
143 sessions_dir, older_than=older_than_seconds,
142 callback=remove_item)
144 callback=remove_item)
143 return stats['callbacks']
145 return stats['callbacks']
144
146
145
147
146 class MemcachedAuthSessions(BaseAuthSessions):
148 class MemcachedAuthSessions(BaseAuthSessions):
147 SESSION_TYPE = 'ext:memcached'
149 SESSION_TYPE = 'ext:memcached'
148 _key_regex = re.compile(r'ITEM (.*_session) \[(.*); (.*)\]')
150 _key_regex = re.compile(r'ITEM (.*_session) \[(.*); (.*)\]')
149
151
150 def _get_client(self):
152 def _get_client(self):
151 import memcache
153 import memcache
152 client = memcache.Client([self.config.get('beaker.session.url')])
154 client = memcache.Client([self.config.get('beaker.session.url')])
153 return client
155 return client
154
156
155 def _get_telnet_client(self, host, port):
157 def _get_telnet_client(self, host, port):
156 import telnetlib
158 import telnetlib
157 client = telnetlib.Telnet(host, port, None)
159 client = telnetlib.Telnet(host, port, None)
158 return client
160 return client
159
161
160 def _run_telnet_cmd(self, client, cmd):
162 def _run_telnet_cmd(self, client, cmd):
161 client.write("%s\n" % cmd)
163 client.write("%s\n" % cmd)
162 return client.read_until('END')
164 return client.read_until('END')
163
165
164 def key_details(self, client, slab_ids, limit=100):
166 def key_details(self, client, slab_ids, limit=100):
165 """ Return a list of tuples containing keys and details """
167 """ Return a list of tuples containing keys and details """
166 cmd = 'stats cachedump %s %s'
168 cmd = 'stats cachedump %s %s'
167 for slab_id in slab_ids:
169 for slab_id in slab_ids:
168 yield from self._key_regex.finditer(
170 yield from self._key_regex.finditer(
169 self._run_telnet_cmd(client, cmd % (slab_id, limit)))
171 self._run_telnet_cmd(client, cmd % (slab_id, limit)))
170
172
171 def get_count(self):
173 def get_count(self):
172 client = self._get_client()
174 client = self._get_client()
173 count = self.NOT_AVAILABLE
175 count = self.NOT_AVAILABLE
174 try:
176 try:
175 slabs = []
177 slabs = []
176 for server, slabs_data in client.get_slabs():
178 for server, slabs_data in client.get_slabs():
177 slabs.extend(list(slabs_data.keys()))
179 slabs.extend(list(slabs_data.keys()))
178
180
179 host, port = client.servers[0].address
181 host, port = client.servers[0].address
180 telnet_client = self._get_telnet_client(host, port)
182 telnet_client = self._get_telnet_client(host, port)
181 keys = self.key_details(telnet_client, slabs)
183 keys = self.key_details(telnet_client, slabs)
182 count = 0
184 count = 0
183 for _k in keys:
185 for _k in keys:
184 count += 1
186 count += 1
185 except Exception:
187 except Exception:
186 return count
188 return count
187
189
188 return count
190 return count
189
191
190 def get_expired_count(self, older_than_seconds=None):
192 def get_expired_count(self, older_than_seconds=None):
191 return self.NOT_AVAILABLE
193 return self.NOT_AVAILABLE
192
194
193 def clean_sessions(self, older_than_seconds=None):
195 def clean_sessions(self, older_than_seconds=None):
194 raise CleanupCommand('Cleanup for this session type not yet available')
196 raise CleanupCommand('Cleanup for this session type not yet available')
195
197
196
198
197 class RedisAuthSessions(BaseAuthSessions):
199 class RedisAuthSessions(BaseAuthSessions):
198 SESSION_TYPE = 'ext:redis'
200 SESSION_TYPE = 'ext:redis'
199
201
200 def _get_client(self):
202 def _get_client(self):
201 import redis
203 import redis
202 args = {
204 args = {
203 'socket_timeout': 60,
205 'socket_timeout': 60,
204 'decode_responses': False,
206 'decode_responses': False,
205 'url': self.config.get('beaker.session.url')
207 'url': self.config.get('beaker.session.url')
206 }
208 }
207
209
208 client = redis.StrictRedis.from_url(**args)
210 client = redis.StrictRedis.from_url(**args)
209 return client
211 return client
210
212
211 def get_count(self):
213 def get_count(self):
212 client = self._get_client()
214 client = self._get_client()
213 return len(client.keys('beaker_cache:*'))
215 return len(client.keys('beaker_cache:*'))
214
216
215 def get_expired_count(self, older_than_seconds=None):
217 def get_expired_count(self, older_than_seconds=None):
216 expiry_date = self._seconds_to_date(older_than_seconds)
218 expiry_date = self._seconds_to_date(older_than_seconds)
217 return self.NOT_AVAILABLE
219 return self.NOT_AVAILABLE
218
220
219 def clean_sessions(self, older_than_seconds=None):
221 def clean_sessions(self, older_than_seconds=None):
220 client = self._get_client()
222 client = self._get_client()
221 expiry_time = time.time() - older_than_seconds
223 expiry_time = time.time() - older_than_seconds
222 deleted_keys = 0
224 deleted_keys = 0
223
225
224 for key in client.keys('beaker_cache:*'):
226 for key in client.keys('beaker_cache:*'):
225 data = client.get(key)
227 data = client.get(key)
226 if data:
228 if data:
227 accessed_time = 0
229 accessed_time = 0
228 data = base64.b64decode(data)
229
230
230 try:
231 try:
232 data = base64.b64decode(data)
231 json_data = pickle.loads(data)
233 json_data = pickle.loads(data)
232 accessed_time = json_data['_accessed_time']
234 accessed_time = json_data['_accessed_time']
235 except binascii.Error:
236 accessed_time = 0
233 except pickle.UnpicklingError:
237 except pickle.UnpicklingError:
234 accessed_time = 0
238 accessed_time = 0
235 except KeyError:
239 except KeyError:
236 accessed_time = 0
240 accessed_time = 0
237
241
238 if accessed_time < expiry_time:
242 if accessed_time < expiry_time:
239 client.delete(key)
243 client.delete(key)
240 deleted_keys += 1
244 deleted_keys += 1
241
245
242 return deleted_keys
246 return deleted_keys
243
247
244
248
245 class MemoryAuthSessions(BaseAuthSessions):
249 class MemoryAuthSessions(BaseAuthSessions):
246 SESSION_TYPE = 'memory'
250 SESSION_TYPE = 'memory'
247
251
248 def get_count(self):
252 def get_count(self):
249 return self.NOT_AVAILABLE
253 return self.NOT_AVAILABLE
250
254
251 def get_expired_count(self, older_than_seconds=None):
255 def get_expired_count(self, older_than_seconds=None):
252 return self.NOT_AVAILABLE
256 return self.NOT_AVAILABLE
253
257
254 def clean_sessions(self, older_than_seconds=None):
258 def clean_sessions(self, older_than_seconds=None):
255 raise CleanupCommand('Cleanup for this session type not yet available')
259 raise CleanupCommand('Cleanup for this session type not yet available')
256
260
257
261
258 def get_session_handler(session_type):
262 def get_session_handler(session_type):
259 types = {
263 types = {
260 'file': FileAuthSessions,
264 'file': FileAuthSessions,
261 'ext:memcached': MemcachedAuthSessions,
265 'ext:memcached': MemcachedAuthSessions,
262 'ext:redis': RedisAuthSessions,
266 'ext:redis': RedisAuthSessions,
263 'ext:database': DbAuthSessions,
267 'ext:database': DbAuthSessions,
264 'memory': MemoryAuthSessions
268 'memory': MemoryAuthSessions
265 }
269 }
266
270
267 try:
271 try:
268 return types[session_type]
272 return types[session_type]
269 except KeyError:
273 except KeyError:
270 raise ValueError(
274 raise ValueError(
271 f'This type {session_type} is not supported')
275 f'This type {session_type} is not supported')
General Comments 0
You need to be logged in to leave comments. Login now