##// END OF EJS Templates
user-sessions: fix cleanup with corrupted session data.
marcink -
r4483:8eb46532 default
parent child Browse files
Show More
@@ -1,261 +1,264 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2017-2020 RhodeCode GmbH
3 # Copyright (C) 2017-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import re
22 import re
23 import time
23 import time
24 import datetime
24 import datetime
25 import dateutil
25 import dateutil
26 import pickle
26 import pickle
27
27
28 from rhodecode.model.db import DbSession, Session
28 from rhodecode.model.db import DbSession, Session
29
29
30
30
31 class CleanupCommand(Exception):
31 class CleanupCommand(Exception):
32 pass
32 pass
33
33
34
34
35 class BaseAuthSessions(object):
35 class BaseAuthSessions(object):
36 SESSION_TYPE = None
36 SESSION_TYPE = None
37 NOT_AVAILABLE = 'NOT AVAILABLE'
37 NOT_AVAILABLE = 'NOT AVAILABLE'
38
38
39 def __init__(self, config):
39 def __init__(self, config):
40 session_conf = {}
40 session_conf = {}
41 for k, v in config.items():
41 for k, v in config.items():
42 if k.startswith('beaker.session'):
42 if k.startswith('beaker.session'):
43 session_conf[k] = v
43 session_conf[k] = v
44 self.config = session_conf
44 self.config = session_conf
45
45
46 def get_count(self):
46 def get_count(self):
47 raise NotImplementedError
47 raise NotImplementedError
48
48
49 def get_expired_count(self, older_than_seconds=None):
49 def get_expired_count(self, older_than_seconds=None):
50 raise NotImplementedError
50 raise NotImplementedError
51
51
52 def clean_sessions(self, older_than_seconds=None):
52 def clean_sessions(self, older_than_seconds=None):
53 raise NotImplementedError
53 raise NotImplementedError
54
54
55 def _seconds_to_date(self, seconds):
55 def _seconds_to_date(self, seconds):
56 return datetime.datetime.utcnow() - dateutil.relativedelta.relativedelta(
56 return datetime.datetime.utcnow() - dateutil.relativedelta.relativedelta(
57 seconds=seconds)
57 seconds=seconds)
58
58
59
59
60 class DbAuthSessions(BaseAuthSessions):
60 class DbAuthSessions(BaseAuthSessions):
61 SESSION_TYPE = 'ext:database'
61 SESSION_TYPE = 'ext:database'
62
62
63 def get_count(self):
63 def get_count(self):
64 return DbSession.query().count()
64 return DbSession.query().count()
65
65
66 def get_expired_count(self, older_than_seconds=None):
66 def get_expired_count(self, older_than_seconds=None):
67 expiry_date = self._seconds_to_date(older_than_seconds)
67 expiry_date = self._seconds_to_date(older_than_seconds)
68 return DbSession.query().filter(DbSession.accessed < expiry_date).count()
68 return DbSession.query().filter(DbSession.accessed < expiry_date).count()
69
69
70 def clean_sessions(self, older_than_seconds=None):
70 def clean_sessions(self, older_than_seconds=None):
71 expiry_date = self._seconds_to_date(older_than_seconds)
71 expiry_date = self._seconds_to_date(older_than_seconds)
72 to_remove = DbSession.query().filter(DbSession.accessed < expiry_date).count()
72 to_remove = DbSession.query().filter(DbSession.accessed < expiry_date).count()
73 DbSession.query().filter(DbSession.accessed < expiry_date).delete()
73 DbSession.query().filter(DbSession.accessed < expiry_date).delete()
74 Session().commit()
74 Session().commit()
75 return to_remove
75 return to_remove
76
76
77
77
78 class FileAuthSessions(BaseAuthSessions):
78 class FileAuthSessions(BaseAuthSessions):
79 SESSION_TYPE = 'file sessions'
79 SESSION_TYPE = 'file sessions'
80
80
81 def _get_sessions_dir(self):
81 def _get_sessions_dir(self):
82 data_dir = self.config.get('beaker.session.data_dir')
82 data_dir = self.config.get('beaker.session.data_dir')
83 return data_dir
83 return data_dir
84
84
85 def _count_on_filesystem(self, path, older_than=0, callback=None):
85 def _count_on_filesystem(self, path, older_than=0, callback=None):
86 value = dict(percent=0, used=0, total=0, items=0, callbacks=0,
86 value = dict(percent=0, used=0, total=0, items=0, callbacks=0,
87 path=path, text='')
87 path=path, text='')
88 items_count = 0
88 items_count = 0
89 used = 0
89 used = 0
90 callbacks = 0
90 callbacks = 0
91 cur_time = time.time()
91 cur_time = time.time()
92 for root, dirs, files in os.walk(path):
92 for root, dirs, files in os.walk(path):
93 for f in files:
93 for f in files:
94 final_path = os.path.join(root, f)
94 final_path = os.path.join(root, f)
95 try:
95 try:
96 mtime = os.stat(final_path).st_mtime
96 mtime = os.stat(final_path).st_mtime
97 if (cur_time - mtime) > older_than:
97 if (cur_time - mtime) > older_than:
98 items_count += 1
98 items_count += 1
99 if callback:
99 if callback:
100 callback_res = callback(final_path)
100 callback_res = callback(final_path)
101 callbacks += 1
101 callbacks += 1
102 else:
102 else:
103 used += os.path.getsize(final_path)
103 used += os.path.getsize(final_path)
104 except OSError:
104 except OSError:
105 pass
105 pass
106 value.update({
106 value.update({
107 'percent': 100,
107 'percent': 100,
108 'used': used,
108 'used': used,
109 'total': used,
109 'total': used,
110 'items': items_count,
110 'items': items_count,
111 'callbacks': callbacks
111 'callbacks': callbacks
112 })
112 })
113 return value
113 return value
114
114
115 def get_count(self):
115 def get_count(self):
116 try:
116 try:
117 sessions_dir = self._get_sessions_dir()
117 sessions_dir = self._get_sessions_dir()
118 items_count = self._count_on_filesystem(sessions_dir)['items']
118 items_count = self._count_on_filesystem(sessions_dir)['items']
119 except Exception:
119 except Exception:
120 items_count = self.NOT_AVAILABLE
120 items_count = self.NOT_AVAILABLE
121 return items_count
121 return items_count
122
122
123 def get_expired_count(self, older_than_seconds=0):
123 def get_expired_count(self, older_than_seconds=0):
124 try:
124 try:
125 sessions_dir = self._get_sessions_dir()
125 sessions_dir = self._get_sessions_dir()
126 items_count = self._count_on_filesystem(
126 items_count = self._count_on_filesystem(
127 sessions_dir, older_than=older_than_seconds)['items']
127 sessions_dir, older_than=older_than_seconds)['items']
128 except Exception:
128 except Exception:
129 items_count = self.NOT_AVAILABLE
129 items_count = self.NOT_AVAILABLE
130 return items_count
130 return items_count
131
131
132 def clean_sessions(self, older_than_seconds=0):
132 def clean_sessions(self, older_than_seconds=0):
133 # find . -mtime +60 -exec rm {} \;
133 # find . -mtime +60 -exec rm {} \;
134
134
135 sessions_dir = self._get_sessions_dir()
135 sessions_dir = self._get_sessions_dir()
136
136
137 def remove_item(path):
137 def remove_item(path):
138 os.remove(path)
138 os.remove(path)
139
139
140 stats = self._count_on_filesystem(
140 stats = self._count_on_filesystem(
141 sessions_dir, older_than=older_than_seconds,
141 sessions_dir, older_than=older_than_seconds,
142 callback=remove_item)
142 callback=remove_item)
143 return stats['callbacks']
143 return stats['callbacks']
144
144
145
145
146 class MemcachedAuthSessions(BaseAuthSessions):
146 class MemcachedAuthSessions(BaseAuthSessions):
147 SESSION_TYPE = 'ext:memcached'
147 SESSION_TYPE = 'ext:memcached'
148 _key_regex = re.compile(r'ITEM (.*_session) \[(.*); (.*)\]')
148 _key_regex = re.compile(r'ITEM (.*_session) \[(.*); (.*)\]')
149
149
150 def _get_client(self):
150 def _get_client(self):
151 import memcache
151 import memcache
152 client = memcache.Client([self.config.get('beaker.session.url')])
152 client = memcache.Client([self.config.get('beaker.session.url')])
153 return client
153 return client
154
154
155 def _get_telnet_client(self, host, port):
155 def _get_telnet_client(self, host, port):
156 import telnetlib
156 import telnetlib
157 client = telnetlib.Telnet(host, port, None)
157 client = telnetlib.Telnet(host, port, None)
158 return client
158 return client
159
159
160 def _run_telnet_cmd(self, client, cmd):
160 def _run_telnet_cmd(self, client, cmd):
161 client.write("%s\n" % cmd)
161 client.write("%s\n" % cmd)
162 return client.read_until('END')
162 return client.read_until('END')
163
163
164 def key_details(self, client, slab_ids, limit=100):
164 def key_details(self, client, slab_ids, limit=100):
165 """ Return a list of tuples containing keys and details """
165 """ Return a list of tuples containing keys and details """
166 cmd = 'stats cachedump %s %s'
166 cmd = 'stats cachedump %s %s'
167 for slab_id in slab_ids:
167 for slab_id in slab_ids:
168 for key in self._key_regex.finditer(
168 for key in self._key_regex.finditer(
169 self._run_telnet_cmd(client, cmd % (slab_id, limit))):
169 self._run_telnet_cmd(client, cmd % (slab_id, limit))):
170 yield key
170 yield key
171
171
172 def get_count(self):
172 def get_count(self):
173 client = self._get_client()
173 client = self._get_client()
174 count = self.NOT_AVAILABLE
174 count = self.NOT_AVAILABLE
175 try:
175 try:
176 slabs = []
176 slabs = []
177 for server, slabs_data in client.get_slabs():
177 for server, slabs_data in client.get_slabs():
178 slabs.extend(slabs_data.keys())
178 slabs.extend(slabs_data.keys())
179
179
180 host, port = client.servers[0].address
180 host, port = client.servers[0].address
181 telnet_client = self._get_telnet_client(host, port)
181 telnet_client = self._get_telnet_client(host, port)
182 keys = self.key_details(telnet_client, slabs)
182 keys = self.key_details(telnet_client, slabs)
183 count = 0
183 count = 0
184 for _k in keys:
184 for _k in keys:
185 count += 1
185 count += 1
186 except Exception:
186 except Exception:
187 return count
187 return count
188
188
189 return count
189 return count
190
190
191 def get_expired_count(self, older_than_seconds=None):
191 def get_expired_count(self, older_than_seconds=None):
192 return self.NOT_AVAILABLE
192 return self.NOT_AVAILABLE
193
193
194 def clean_sessions(self, older_than_seconds=None):
194 def clean_sessions(self, older_than_seconds=None):
195 raise CleanupCommand('Cleanup for this session type not yet available')
195 raise CleanupCommand('Cleanup for this session type not yet available')
196
196
197
197
198 class RedisAuthSessions(BaseAuthSessions):
198 class RedisAuthSessions(BaseAuthSessions):
199 SESSION_TYPE = 'ext:redis'
199 SESSION_TYPE = 'ext:redis'
200
200
201 def _get_client(self):
201 def _get_client(self):
202 import redis
202 import redis
203 args = {
203 args = {
204 'socket_timeout': 60,
204 'socket_timeout': 60,
205 'url': self.config.get('beaker.session.url')
205 'url': self.config.get('beaker.session.url')
206 }
206 }
207
207
208 client = redis.StrictRedis.from_url(**args)
208 client = redis.StrictRedis.from_url(**args)
209 return client
209 return client
210
210
211 def get_count(self):
211 def get_count(self):
212 client = self._get_client()
212 client = self._get_client()
213 return len(client.keys('beaker_cache:*'))
213 return len(client.keys('beaker_cache:*'))
214
214
215 def get_expired_count(self, older_than_seconds=None):
215 def get_expired_count(self, older_than_seconds=None):
216 expiry_date = self._seconds_to_date(older_than_seconds)
216 expiry_date = self._seconds_to_date(older_than_seconds)
217 return self.NOT_AVAILABLE
217 return self.NOT_AVAILABLE
218
218
219 def clean_sessions(self, older_than_seconds=None):
219 def clean_sessions(self, older_than_seconds=None):
220 client = self._get_client()
220 client = self._get_client()
221 expiry_time = time.time() - older_than_seconds
221 expiry_time = time.time() - older_than_seconds
222 deleted_keys = 0
222 deleted_keys = 0
223 for key in client.keys('beaker_cache:*'):
223 for key in client.keys('beaker_cache:*'):
224 data = client.get(key)
224 data = client.get(key)
225 if data:
225 if data:
226 json_data = pickle.loads(data)
226 json_data = pickle.loads(data)
227 accessed_time = json_data['_accessed_time']
227 try:
228 accessed_time = json_data['_accessed_time']
229 except KeyError:
230 accessed_time = 0
228 if accessed_time < expiry_time:
231 if accessed_time < expiry_time:
229 client.delete(key)
232 client.delete(key)
230 deleted_keys += 1
233 deleted_keys += 1
231
234
232 return deleted_keys
235 return deleted_keys
233
236
234
237
235 class MemoryAuthSessions(BaseAuthSessions):
238 class MemoryAuthSessions(BaseAuthSessions):
236 SESSION_TYPE = 'memory'
239 SESSION_TYPE = 'memory'
237
240
238 def get_count(self):
241 def get_count(self):
239 return self.NOT_AVAILABLE
242 return self.NOT_AVAILABLE
240
243
241 def get_expired_count(self, older_than_seconds=None):
244 def get_expired_count(self, older_than_seconds=None):
242 return self.NOT_AVAILABLE
245 return self.NOT_AVAILABLE
243
246
244 def clean_sessions(self, older_than_seconds=None):
247 def clean_sessions(self, older_than_seconds=None):
245 raise CleanupCommand('Cleanup for this session type not yet available')
248 raise CleanupCommand('Cleanup for this session type not yet available')
246
249
247
250
248 def get_session_handler(session_type):
251 def get_session_handler(session_type):
249 types = {
252 types = {
250 'file': FileAuthSessions,
253 'file': FileAuthSessions,
251 'ext:memcached': MemcachedAuthSessions,
254 'ext:memcached': MemcachedAuthSessions,
252 'ext:redis': RedisAuthSessions,
255 'ext:redis': RedisAuthSessions,
253 'ext:database': DbAuthSessions,
256 'ext:database': DbAuthSessions,
254 'memory': MemoryAuthSessions
257 'memory': MemoryAuthSessions
255 }
258 }
256
259
257 try:
260 try:
258 return types[session_type]
261 return types[session_type]
259 except KeyError:
262 except KeyError:
260 raise ValueError(
263 raise ValueError(
261 'This type {} is not supported'.format(session_type))
264 'This type {} is not supported'.format(session_type))
General Comments 0
You need to be logged in to leave comments. Login now