fixes #340 session cleanup for celery tasks
marcink
r1929:cd8a7e36 beta
rhodecode/lib/celerylib/__init__.py
@@ -1,105 +1,127 @@
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.celerylib.__init__
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 celery libs for RhodeCode
7 7
8 8 :created_on: Nov 27, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import os
27 27 import sys
28 28 import socket
29 29 import traceback
30 30 import logging
31 31 from os.path import dirname as dn, join as jn
32 from pylons import config
32 33
33 34 from hashlib import md5
34 35 from decorator import decorator
35 36
36 37 from vcs.utils.lazy import LazyProperty
37 38 from rhodecode import CELERY_ON
38 39 from rhodecode.lib import str2bool, safe_str
39 40 from rhodecode.lib.pidlock import DaemonLock, LockHeld
41 from rhodecode.model import init_model
42 from rhodecode.model import meta
43 from rhodecode.model.db import Statistics, Repository, User
44
45 from sqlalchemy import engine_from_config
40 46
41 47 from celery.messaging import establish_connection
42 48
43
44 49 log = logging.getLogger(__name__)
45 50
46 51
47
48
49 52 class ResultWrapper(object):
50 53 def __init__(self, task):
51 54 self.task = task
52 55
53 56 @LazyProperty
54 57 def result(self):
55 58 return self.task
56 59
57 60
58 61 def run_task(task, *args, **kwargs):
59 62 if CELERY_ON:
60 63 try:
61 64 t = task.apply_async(args=args, kwargs=kwargs)
62 65 log.info('running task %s:%s', t.task_id, task)
63 66 return t
64 67
65 68 except socket.error, e:
66 69 if isinstance(e, IOError) and e.errno == 111:
67 70 log.debug('Unable to connect to celeryd. Sync execution')
68 71 else:
69 72 log.error(traceback.format_exc())
70 73 except KeyError, e:
71 74 log.debug('Unable to connect to celeryd. Sync execution')
72 75 except Exception, e:
73 76 log.error(traceback.format_exc())
74 77
75 78 log.debug('executing task %s in sync mode', task)
76 79 return ResultWrapper(task(*args, **kwargs))
77 80
78 81
79 82 def __get_lockkey(func, *fargs, **fkwargs):
80 83 params = list(fargs)
81 84 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
82 85
83 86 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
84 87
85 88 lockkey = 'task_%s.lock' % \
86 89 md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
87 90 return lockkey
88 91
89 92
90 93 def locked_task(func):
91 94 def __wrapper(func, *fargs, **fkwargs):
92 95 lockkey = __get_lockkey(func, *fargs, **fkwargs)
93 96 lockkey_path = config['here']
94 97
95 98 log.info('running task with lockkey %s', lockkey)
96 99 try:
97 100 l = DaemonLock(file_=jn(lockkey_path, lockkey))
98 101 ret = func(*fargs, **fkwargs)
99 102 l.release()
100 103 return ret
101 104 except LockHeld:
102 105 log.info('LockHeld')
103 106 return 'Task with key %s already running' % lockkey
104 107
105 108 return decorator(__wrapper, func)
109
110
111 def get_session():
112 if CELERY_ON:
113 engine = engine_from_config(config, 'sqlalchemy.db1.')
114 init_model(engine)
115 sa = meta.Session
116 return sa
117
118
119 def dbsession(func):
120 def __wrapper(func, *fargs, **fkwargs):
121 try:
122 ret = func(*fargs, **fkwargs)
123 return ret
124 finally:
125 meta.Session.remove()
126
127 return decorator(__wrapper, func)
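
The new dbsession decorator and the module-level get_session() above are what this changeset uses to make sure every celery task releases its scoped SQLAlchemy session once it finishes. A minimal sketch of the intended pattern, assuming a hypothetical count_admins task that is not part of the commit:

    from celery.decorators import task

    from rhodecode.lib.celerylib import dbsession, get_session, run_task
    from rhodecode.model.db import User


    @task(ignore_result=True)
    @dbsession      # meta.Session.remove() runs in the decorator's finally block
    def count_admins():
        # under celeryd (CELERY_ON) get_session() re-initialises the engine, so
        # the worker process gets its own scoped session
        DBS = get_session()
        return DBS.query(User).filter(User.admin == True).count()

    # run_task() prefers apply_async() and falls back to calling the task
    # synchronously (wrapped in ResultWrapper) when celeryd is unreachable
    result = run_task(count_admins)
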
rhodecode/lib/celerylib/tasks.py
@@ -1,418 +1,414 @@
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.celerylib.tasks
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 RhodeCode task modules, containing all tasks that are supposed to be run
7 7 by the celery daemon
8 8
9 9 :created_on: Oct 6, 2010
10 10 :author: marcink
11 11 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
12 12 :license: GPLv3, see COPYING for more details.
13 13 """
14 14 # This program is free software: you can redistribute it and/or modify
15 15 # it under the terms of the GNU General Public License as published by
16 16 # the Free Software Foundation, either version 3 of the License, or
17 17 # (at your option) any later version.
18 18 #
19 19 # This program is distributed in the hope that it will be useful,
20 20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 22 # GNU General Public License for more details.
23 23 #
24 24 # You should have received a copy of the GNU General Public License
25 25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 26 from celery.decorators import task
27 27
28 28 import os
29 29 import traceback
30 30 import logging
31 31 from os.path import join as jn
32 32
33 33 from time import mktime
34 34 from operator import itemgetter
35 35 from string import lower
36 36
37 37 from pylons import config, url
38 38 from pylons.i18n.translation import _
39 39
40 40 from vcs import get_backend
41 41
42 42 from rhodecode import CELERY_ON
43 43 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
44 from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
45 __get_lockkey, LockHeld, DaemonLock
44 from rhodecode.lib.celerylib import run_task, locked_task, dbsession, \
45 str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
46 46 from rhodecode.lib.helpers import person
47 47 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
48 48 from rhodecode.lib.utils import add_cache, action_logger
49 49 from rhodecode.lib.compat import json, OrderedDict
50 50
51 from rhodecode.model import init_model
52 from rhodecode.model import meta
53 51 from rhodecode.model.db import Statistics, Repository, User
54 52
55 from sqlalchemy import engine_from_config
56 53
57 54 add_cache(config)
58 55
59 56 __all__ = ['whoosh_index', 'get_commits_stats',
60 57 'reset_user_password', 'send_email']
61 58
62 59
63 def get_session():
64 if CELERY_ON:
65 engine = engine_from_config(config, 'sqlalchemy.db1.')
66 init_model(engine)
67 sa = meta.Session
68 return sa
69
70 60 def get_logger(cls):
71 61 if CELERY_ON:
72 62 try:
73 63 log = cls.get_logger()
74 64 except:
75 65 log = logging.getLogger(__name__)
76 66 else:
77 67 log = logging.getLogger(__name__)
78 68
79 69 return log
80 70
81 71
82 72 @task(ignore_result=True)
83 73 @locked_task
74 @dbsession
84 75 def whoosh_index(repo_location, full_index):
85 76 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
86
87 # log = whoosh_index.get_logger(whoosh_index)
77 log = whoosh_index.get_logger(whoosh_index)
78 DBS = get_session()
88 79
89 80 index_location = config['index_dir']
90 81 WhooshIndexingDaemon(index_location=index_location,
91 repo_location=repo_location, sa=get_session())\
82 repo_location=repo_location, sa=DBS)\
92 83 .run(full_index=full_index)
93 84
94 85
95 86 @task(ignore_result=True)
87 @dbsession
96 88 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
97 89 log = get_logger(get_commits_stats)
98
90 DBS = get_session()
99 91 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
100 92 ts_max_y)
101 93 lockkey_path = config['here']
102 94
103 95 log.info('running task with lockkey %s', lockkey)
104 96
105 97 try:
106 sa = get_session()
107 98 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
108 99
109 100 # for js data compatibility cleans the key for person from '
110 101 akc = lambda k: person(k).replace('"', "")
111 102
112 103 co_day_auth_aggr = {}
113 104 commits_by_day_aggregate = {}
114 105 repo = Repository.get_by_repo_name(repo_name)
115 106 if repo is None:
116 107 return True
117 108
118 109 repo = repo.scm_instance
119 110 repo_size = repo.count()
121 112 # return if repo has no revisions
121 112 if repo_size < 1:
122 113 lock.release()
123 114 return True
124 115
125 116 skip_date_limit = True
126 117 parse_limit = int(config['app_conf'].get('commit_parse_limit'))
127 118 last_rev = None
128 119 last_cs = None
129 120 timegetter = itemgetter('time')
130 121
131 dbrepo = sa.query(Repository)\
122 dbrepo = DBS.query(Repository)\
132 123 .filter(Repository.repo_name == repo_name).scalar()
133 cur_stats = sa.query(Statistics)\
124 cur_stats = DBS.query(Statistics)\
134 125 .filter(Statistics.repository == dbrepo).scalar()
135 126
136 127 if cur_stats is not None:
137 128 last_rev = cur_stats.stat_on_revision
138 129
139 130 if last_rev == repo.get_changeset().revision and repo_size > 1:
140 131 # pass silently without any work if we're not on first revision or
141 132 # current state of parsing revision(from db marker) is the
142 133 # last revision
143 134 lock.release()
144 135 return True
145 136
146 137 if cur_stats:
147 138 commits_by_day_aggregate = OrderedDict(json.loads(
148 139 cur_stats.commit_activity_combined))
149 140 co_day_auth_aggr = json.loads(cur_stats.commit_activity)
150 141
151 142 log.debug('starting parsing %s', parse_limit)
152 143 lmktime = mktime
153 144
154 145 last_rev = last_rev + 1 if last_rev >= 0 else 0
155 146 log.debug('Getting revisions from %s to %s' % (
156 147 last_rev, last_rev + parse_limit)
157 148 )
158 149 for cs in repo[last_rev:last_rev + parse_limit]:
159 150 last_cs = cs # remember last parsed changeset
160 151 k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
161 152 cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
162 153
163 154 if akc(cs.author) in co_day_auth_aggr:
164 155 try:
165 156 l = [timegetter(x) for x in
166 157 co_day_auth_aggr[akc(cs.author)]['data']]
167 158 time_pos = l.index(k)
168 159 except ValueError:
169 160 time_pos = False
170 161
171 162 if time_pos >= 0 and time_pos is not False:
172 163
173 164 datadict = \
174 165 co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
175 166
176 167 datadict["commits"] += 1
177 168 datadict["added"] += len(cs.added)
178 169 datadict["changed"] += len(cs.changed)
179 170 datadict["removed"] += len(cs.removed)
180 171
181 172 else:
182 173 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
183 174
184 175 datadict = {"time": k,
185 176 "commits": 1,
186 177 "added": len(cs.added),
187 178 "changed": len(cs.changed),
188 179 "removed": len(cs.removed),
189 180 }
190 181 co_day_auth_aggr[akc(cs.author)]['data']\
191 182 .append(datadict)
192 183
193 184 else:
194 185 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
195 186 co_day_auth_aggr[akc(cs.author)] = {
196 187 "label": akc(cs.author),
197 188 "data": [{"time":k,
198 189 "commits":1,
199 190 "added":len(cs.added),
200 191 "changed":len(cs.changed),
201 192 "removed":len(cs.removed),
202 193 }],
203 194 "schema": ["commits"],
204 195 }
205 196
206 197 #gather all data by day
207 198 if k in commits_by_day_aggregate:
208 199 commits_by_day_aggregate[k] += 1
209 200 else:
210 201 commits_by_day_aggregate[k] = 1
211 202
212 203 overview_data = sorted(commits_by_day_aggregate.items(),
213 204 key=itemgetter(0))
214 205
215 206 if not co_day_auth_aggr:
216 207 co_day_auth_aggr[akc(repo.contact)] = {
217 208 "label": akc(repo.contact),
218 209 "data": [0, 1],
219 210 "schema": ["commits"],
220 211 }
221 212
222 213 stats = cur_stats if cur_stats else Statistics()
223 214 stats.commit_activity = json.dumps(co_day_auth_aggr)
224 215 stats.commit_activity_combined = json.dumps(overview_data)
225 216
226 217 log.debug('last revision %s', last_rev)
227 218 leftovers = len(repo.revisions[last_rev:])
228 219 log.debug('revisions to parse %s', leftovers)
229 220
230 221 if last_rev == 0 or leftovers < parse_limit:
231 222 log.debug('getting code trending stats')
232 223 stats.languages = json.dumps(__get_codes_stats(repo_name))
233 224
234 225 try:
235 226 stats.repository = dbrepo
236 227 stats.stat_on_revision = last_cs.revision if last_cs else 0
237 sa.add(stats)
238 sa.commit()
228 DBS.add(stats)
229 DBS.commit()
239 230 except:
240 231 log.error(traceback.format_exc())
241 sa.rollback()
232 DBS.rollback()
242 233 lock.release()
243 234 return False
244 235
245 236 #final release
246 237 lock.release()
247 238
248 239 #execute another task if celery is enabled
249 240 if len(repo.revisions) > 1 and CELERY_ON:
250 241 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
251 242 return True
252 243 except LockHeld:
253 244 log.info('LockHeld')
254 245 return 'Task with key %s already running' % lockkey
255 246
256 247 @task(ignore_result=True)
248 @dbsession
257 249 def send_password_link(user_email):
258 250 from rhodecode.model.notification import EmailNotificationModel
259 251
260 252 log = get_logger(send_password_link)
253 DBS = get_session()
261 254
262 255 try:
263 sa = get_session()
264 256 user = User.get_by_email(user_email)
265 257 if user:
266 258 log.debug('password reset user found %s' % user)
267 259 link = url('reset_password_confirmation', key=user.api_key,
268 260 qualified=True)
269 261 reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
270 262 body = EmailNotificationModel().get_email_tmpl(reg_type,
271 263 **{'user':user.short_contact,
272 264 'reset_url':link})
273 265 log.debug('sending email')
274 266 run_task(send_email, user_email,
275 267 _("password reset link"), body)
276 268 log.info('send new password mail to %s', user_email)
277 269 else:
278 270 log.debug("password reset email %s not found" % user_email)
279 271 except:
280 272 log.error(traceback.format_exc())
281 273 return False
282 274
283 275 return True
284 276
285 277 @task(ignore_result=True)
278 @dbsession
286 279 def reset_user_password(user_email):
287 280 from rhodecode.lib import auth
288 281
289 282 log = get_logger(reset_user_password)
283 DBS = get_session()
290 284
291 285 try:
292 286 try:
293 sa = get_session()
294 287 user = User.get_by_email(user_email)
295 288 new_passwd = auth.PasswordGenerator().gen_password(8,
296 289 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
297 290 if user:
298 291 user.password = auth.get_crypt_password(new_passwd)
299 292 user.api_key = auth.generate_api_key(user.username)
300 sa.add(user)
301 sa.commit()
293 DBS.add(user)
294 DBS.commit()
302 295 log.info('change password for %s', user_email)
303 296 if new_passwd is None:
304 297 raise Exception('unable to generate new password')
305 298 except:
306 299 log.error(traceback.format_exc())
307 sa.rollback()
300 DBS.rollback()
308 301
309 302 run_task(send_email, user_email,
310 303 'Your new password',
311 304 'Your new RhodeCode password:%s' % (new_passwd))
312 305 log.info('send new password mail to %s', user_email)
313 306
314 307 except:
315 308 log.error('Failed to update user password')
316 309 log.error(traceback.format_exc())
317 310
318 311 return True
319 312
320 313
321 314 @task(ignore_result=True)
315 @dbsession
322 316 def send_email(recipients, subject, body, html_body=''):
323 317 """
324 318 Sends an email with defined parameters from the .ini files.
325 319
326 320 :param recipients: list of recipients, if this is empty the defined email
327 321 address from field 'email_to' is used instead
328 322 :param subject: subject of the mail
329 323 :param body: body of the mail
330 324 :param html_body: html version of body
331 325 """
332 326 log = get_logger(send_email)
333 sa = get_session()
327 DBS = get_session()
328
334 329 email_config = config
335 330 subject = "%s %s" % (email_config.get('email_prefix'), subject)
336 331 if not recipients:
337 332 # if recipients are not defined we send to email_config + all admins
338 333 admins = [u.email for u in User.query()
339 334 .filter(User.admin == True).all()]
340 335 recipients = [email_config.get('email_to')] + admins
341 336
342 337 mail_from = email_config.get('app_email_from', 'RhodeCode')
343 338 user = email_config.get('smtp_username')
344 339 passwd = email_config.get('smtp_password')
345 340 mail_server = email_config.get('smtp_server')
346 341 mail_port = email_config.get('smtp_port')
347 342 tls = str2bool(email_config.get('smtp_use_tls'))
348 343 ssl = str2bool(email_config.get('smtp_use_ssl'))
349 344 debug = str2bool(config.get('debug'))
350 345 smtp_auth = email_config.get('smtp_auth')
351 346
352 347 try:
353 348 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
354 349 mail_port, ssl, tls, debug=debug)
355 350 m.send(recipients, subject, body, html_body)
356 351 except:
357 352 log.error('Mail sending failed')
358 353 log.error(traceback.format_exc())
359 354 return False
360 355 return True
361 356
362 357
363 358 @task(ignore_result=True)
359 @dbsession
364 360 def create_repo_fork(form_data, cur_user):
365 361 """
366 362 Creates a fork of a repository using internal VCS methods
367 363
368 364 :param form_data:
369 365 :param cur_user:
370 366 """
371 367 from rhodecode.model.repo import RepoModel
372 368
373 369 log = get_logger(create_repo_fork)
370 DBS = create_repo_fork.DBS
374 371
375 Session = get_session()
376 372 base_path = Repository.base_path()
377 373
378 RepoModel(Session).create(form_data, cur_user, just_db=True, fork=True)
374 RepoModel(DBS).create(form_data, cur_user, just_db=True, fork=True)
379 375
380 376 alias = form_data['repo_type']
381 377 org_repo_name = form_data['org_path']
382 378 fork_name = form_data['repo_name_full']
383 379 update_after_clone = form_data['update_after_clone']
384 380 source_repo_path = os.path.join(base_path, org_repo_name)
385 381 destination_fork_path = os.path.join(base_path, fork_name)
386 382
387 383 log.info('creating fork of %s as %s', source_repo_path,
388 384 destination_fork_path)
389 385 backend = get_backend(alias)
390 386 backend(safe_str(destination_fork_path), create=True,
391 387 src_url=safe_str(source_repo_path),
392 388 update_after_clone=update_after_clone)
393 389 action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
394 org_repo_name, '', Session)
390 org_repo_name, '', DBS)
395 391
396 392 action_logger(cur_user, 'user_created_fork:%s' % fork_name,
397 fork_name, '', Session)
393 fork_name, '', DBS)
398 394 # finally commit at latest possible stage
399 Session.commit()
395 DBS.commit()
400 396
401 397 def __get_codes_stats(repo_name):
402 398 repo = Repository.get_by_repo_name(repo_name).scm_instance
403 399
404 400 tip = repo.get_changeset()
405 401 code_stats = {}
406 402
407 403 def aggregate(cs):
408 404 for f in cs[2]:
409 405 ext = lower(f.extension)
410 406 if ext in LANGUAGES_EXTENSIONS_MAP.keys() and not f.is_binary:
411 407 if ext in code_stats:
412 408 code_stats[ext] += 1
413 409 else:
414 410 code_stats[ext] = 1
415 411
416 412 map(aggregate, tip.walk('/'))
417 413
418 414 return code_stats or {}
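
For completeness, the calling convention the reworked tasks rely on is unchanged: callers go through run_task(), which tries apply_async() and quietly drops to in-process execution when no celeryd answers, and get_commits_stats still serialises concurrent runs through the DaemonLock key built by __get_lockkey(). A rough usage sketch, with made-up argument values:

    from rhodecode.lib.celerylib import run_task
    from rhodecode.lib.celerylib.tasks import get_commits_stats, send_email

    # returns a celery AsyncResult when the daemon is reachable, otherwise a
    # ResultWrapper around the synchronous return value
    run_task(send_email,
             ['admin@example.com'],           # hypothetical recipient list
             'test subject',
             'plain text body')

    # a second call with the same repo name and time range while the first is
    # still running hits LockHeld and gets the 'Task with key ... already
    # running' message instead of doing duplicate work
    run_task(get_commits_stats, 'my-repo', 0, 0)  # arguments are illustrative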