##// END OF EJS Templates
user-sessions: fix cleanup with corrupted session data.
marcink -
r4483:8eb46532 default
parent child Browse files
Show More
@@ -1,261 +1,264 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2017-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import re
23 23 import time
24 24 import datetime
25 25 import dateutil
26 26 import pickle
27 27
28 28 from rhodecode.model.db import DbSession, Session
29 29
30 30
31 31 class CleanupCommand(Exception):
32 32 pass
33 33
34 34
35 35 class BaseAuthSessions(object):
36 36 SESSION_TYPE = None
37 37 NOT_AVAILABLE = 'NOT AVAILABLE'
38 38
39 39 def __init__(self, config):
40 40 session_conf = {}
41 41 for k, v in config.items():
42 42 if k.startswith('beaker.session'):
43 43 session_conf[k] = v
44 44 self.config = session_conf
45 45
46 46 def get_count(self):
47 47 raise NotImplementedError
48 48
49 49 def get_expired_count(self, older_than_seconds=None):
50 50 raise NotImplementedError
51 51
52 52 def clean_sessions(self, older_than_seconds=None):
53 53 raise NotImplementedError
54 54
55 55 def _seconds_to_date(self, seconds):
56 56 return datetime.datetime.utcnow() - dateutil.relativedelta.relativedelta(
57 57 seconds=seconds)
58 58
59 59
60 60 class DbAuthSessions(BaseAuthSessions):
61 61 SESSION_TYPE = 'ext:database'
62 62
63 63 def get_count(self):
64 64 return DbSession.query().count()
65 65
66 66 def get_expired_count(self, older_than_seconds=None):
67 67 expiry_date = self._seconds_to_date(older_than_seconds)
68 68 return DbSession.query().filter(DbSession.accessed < expiry_date).count()
69 69
70 70 def clean_sessions(self, older_than_seconds=None):
71 71 expiry_date = self._seconds_to_date(older_than_seconds)
72 72 to_remove = DbSession.query().filter(DbSession.accessed < expiry_date).count()
73 73 DbSession.query().filter(DbSession.accessed < expiry_date).delete()
74 74 Session().commit()
75 75 return to_remove
76 76
77 77
78 78 class FileAuthSessions(BaseAuthSessions):
79 79 SESSION_TYPE = 'file sessions'
80 80
81 81 def _get_sessions_dir(self):
82 82 data_dir = self.config.get('beaker.session.data_dir')
83 83 return data_dir
84 84
85 85 def _count_on_filesystem(self, path, older_than=0, callback=None):
86 86 value = dict(percent=0, used=0, total=0, items=0, callbacks=0,
87 87 path=path, text='')
88 88 items_count = 0
89 89 used = 0
90 90 callbacks = 0
91 91 cur_time = time.time()
92 92 for root, dirs, files in os.walk(path):
93 93 for f in files:
94 94 final_path = os.path.join(root, f)
95 95 try:
96 96 mtime = os.stat(final_path).st_mtime
97 97 if (cur_time - mtime) > older_than:
98 98 items_count += 1
99 99 if callback:
100 100 callback_res = callback(final_path)
101 101 callbacks += 1
102 102 else:
103 103 used += os.path.getsize(final_path)
104 104 except OSError:
105 105 pass
106 106 value.update({
107 107 'percent': 100,
108 108 'used': used,
109 109 'total': used,
110 110 'items': items_count,
111 111 'callbacks': callbacks
112 112 })
113 113 return value
114 114
115 115 def get_count(self):
116 116 try:
117 117 sessions_dir = self._get_sessions_dir()
118 118 items_count = self._count_on_filesystem(sessions_dir)['items']
119 119 except Exception:
120 120 items_count = self.NOT_AVAILABLE
121 121 return items_count
122 122
123 123 def get_expired_count(self, older_than_seconds=0):
124 124 try:
125 125 sessions_dir = self._get_sessions_dir()
126 126 items_count = self._count_on_filesystem(
127 127 sessions_dir, older_than=older_than_seconds)['items']
128 128 except Exception:
129 129 items_count = self.NOT_AVAILABLE
130 130 return items_count
131 131
132 132 def clean_sessions(self, older_than_seconds=0):
133 133 # find . -mtime +60 -exec rm {} \;
134 134
135 135 sessions_dir = self._get_sessions_dir()
136 136
137 137 def remove_item(path):
138 138 os.remove(path)
139 139
140 140 stats = self._count_on_filesystem(
141 141 sessions_dir, older_than=older_than_seconds,
142 142 callback=remove_item)
143 143 return stats['callbacks']
144 144
145 145
146 146 class MemcachedAuthSessions(BaseAuthSessions):
147 147 SESSION_TYPE = 'ext:memcached'
148 148 _key_regex = re.compile(r'ITEM (.*_session) \[(.*); (.*)\]')
149 149
150 150 def _get_client(self):
151 151 import memcache
152 152 client = memcache.Client([self.config.get('beaker.session.url')])
153 153 return client
154 154
155 155 def _get_telnet_client(self, host, port):
156 156 import telnetlib
157 157 client = telnetlib.Telnet(host, port, None)
158 158 return client
159 159
160 160 def _run_telnet_cmd(self, client, cmd):
161 161 client.write("%s\n" % cmd)
162 162 return client.read_until('END')
163 163
164 164 def key_details(self, client, slab_ids, limit=100):
165 165 """ Return a list of tuples containing keys and details """
166 166 cmd = 'stats cachedump %s %s'
167 167 for slab_id in slab_ids:
168 168 for key in self._key_regex.finditer(
169 169 self._run_telnet_cmd(client, cmd % (slab_id, limit))):
170 170 yield key
171 171
172 172 def get_count(self):
173 173 client = self._get_client()
174 174 count = self.NOT_AVAILABLE
175 175 try:
176 176 slabs = []
177 177 for server, slabs_data in client.get_slabs():
178 178 slabs.extend(slabs_data.keys())
179 179
180 180 host, port = client.servers[0].address
181 181 telnet_client = self._get_telnet_client(host, port)
182 182 keys = self.key_details(telnet_client, slabs)
183 183 count = 0
184 184 for _k in keys:
185 185 count += 1
186 186 except Exception:
187 187 return count
188 188
189 189 return count
190 190
191 191 def get_expired_count(self, older_than_seconds=None):
192 192 return self.NOT_AVAILABLE
193 193
194 194 def clean_sessions(self, older_than_seconds=None):
195 195 raise CleanupCommand('Cleanup for this session type not yet available')
196 196
197 197
198 198 class RedisAuthSessions(BaseAuthSessions):
199 199 SESSION_TYPE = 'ext:redis'
200 200
201 201 def _get_client(self):
202 202 import redis
203 203 args = {
204 204 'socket_timeout': 60,
205 205 'url': self.config.get('beaker.session.url')
206 206 }
207 207
208 208 client = redis.StrictRedis.from_url(**args)
209 209 return client
210 210
211 211 def get_count(self):
212 212 client = self._get_client()
213 213 return len(client.keys('beaker_cache:*'))
214 214
215 215 def get_expired_count(self, older_than_seconds=None):
216 216 expiry_date = self._seconds_to_date(older_than_seconds)
217 217 return self.NOT_AVAILABLE
218 218
219 219 def clean_sessions(self, older_than_seconds=None):
220 220 client = self._get_client()
221 221 expiry_time = time.time() - older_than_seconds
222 222 deleted_keys = 0
223 223 for key in client.keys('beaker_cache:*'):
224 224 data = client.get(key)
225 225 if data:
226 226 json_data = pickle.loads(data)
227 accessed_time = json_data['_accessed_time']
227 try:
228 accessed_time = json_data['_accessed_time']
229 except KeyError:
230 accessed_time = 0
228 231 if accessed_time < expiry_time:
229 232 client.delete(key)
230 233 deleted_keys += 1
231 234
232 235 return deleted_keys
233 236
234 237
235 238 class MemoryAuthSessions(BaseAuthSessions):
236 239 SESSION_TYPE = 'memory'
237 240
238 241 def get_count(self):
239 242 return self.NOT_AVAILABLE
240 243
241 244 def get_expired_count(self, older_than_seconds=None):
242 245 return self.NOT_AVAILABLE
243 246
244 247 def clean_sessions(self, older_than_seconds=None):
245 248 raise CleanupCommand('Cleanup for this session type not yet available')
246 249
247 250
248 251 def get_session_handler(session_type):
249 252 types = {
250 253 'file': FileAuthSessions,
251 254 'ext:memcached': MemcachedAuthSessions,
252 255 'ext:redis': RedisAuthSessions,
253 256 'ext:database': DbAuthSessions,
254 257 'memory': MemoryAuthSessions
255 258 }
256 259
257 260 try:
258 261 return types[session_type]
259 262 except KeyError:
260 263 raise ValueError(
261 264 'This type {} is not supported'.format(session_type))
General Comments 0
You need to be logged in to leave comments. Login now