##// END OF EJS Templates
fixes #340 session cleanup for celery tasks
marcink -
r1929:cd8a7e36 beta
parent child Browse files
Show More
@@ -29,6 +29,7 b' import socket'
29 29 import traceback
30 30 import logging
31 31 from os.path import dirname as dn, join as jn
32 from pylons import config
32 33
33 34 from hashlib import md5
34 35 from decorator import decorator
@@ -37,15 +38,17 b' from vcs.utils.lazy import LazyProperty'
37 38 from rhodecode import CELERY_ON
38 39 from rhodecode.lib import str2bool, safe_str
39 40 from rhodecode.lib.pidlock import DaemonLock, LockHeld
41 from rhodecode.model import init_model
42 from rhodecode.model import meta
43 from rhodecode.model.db import Statistics, Repository, User
44
45 from sqlalchemy import engine_from_config
40 46
41 47 from celery.messaging import establish_connection
42 48
43
44 49 log = logging.getLogger(__name__)
45 50
46 51
47
48
49 52 class ResultWrapper(object):
50 53 def __init__(self, task):
51 54 self.task = task
@@ -103,3 +106,22 b' def locked_task(func):'
103 106 return 'Task with key %s already running' % lockkey
104 107
105 108 return decorator(__wrapper, func)
109
110
111 def get_session():
112 if CELERY_ON:
113 engine = engine_from_config(config, 'sqlalchemy.db1.')
114 init_model(engine)
115 sa = meta.Session
116 return sa
117
118
119 def dbsession(func):
120 def __wrapper(func, *fargs, **fkwargs):
121 try:
122 ret = func(*fargs, **fkwargs)
123 return ret
124 finally:
125 meta.Session.remove()
126
127 return decorator(__wrapper, func)
@@ -41,18 +41,15 b' from vcs import get_backend'
41 41
42 42 from rhodecode import CELERY_ON
43 43 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
44 from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
45 __get_lockkey, LockHeld, DaemonLock
44 from rhodecode.lib.celerylib import run_task, locked_task, dbsession, \
45 str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
46 46 from rhodecode.lib.helpers import person
47 47 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
48 48 from rhodecode.lib.utils import add_cache, action_logger
49 49 from rhodecode.lib.compat import json, OrderedDict
50 50
51 from rhodecode.model import init_model
52 from rhodecode.model import meta
53 51 from rhodecode.model.db import Statistics, Repository, User
54 52
55 from sqlalchemy import engine_from_config
56 53
57 54 add_cache(config)
58 55
@@ -60,13 +57,6 b' add_cache(config)'
60 57 'reset_user_password', 'send_email']
61 58
62 59
63 def get_session():
64 if CELERY_ON:
65 engine = engine_from_config(config, 'sqlalchemy.db1.')
66 init_model(engine)
67 sa = meta.Session
68 return sa
69
70 60 def get_logger(cls):
71 61 if CELERY_ON:
72 62 try:
@@ -81,21 +71,23 b' def get_logger(cls):'
81 71
82 72 @task(ignore_result=True)
83 73 @locked_task
74 @dbsession
84 75 def whoosh_index(repo_location, full_index):
85 76 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
86
87 # log = whoosh_index.get_logger(whoosh_index)
77 log = whoosh_index.get_logger(whoosh_index)
78 DBS = get_session()
88 79
89 80 index_location = config['index_dir']
90 81 WhooshIndexingDaemon(index_location=index_location,
91 repo_location=repo_location, sa=get_session())\
82 repo_location=repo_location, sa=DBS)\
92 83 .run(full_index=full_index)
93 84
94 85
95 86 @task(ignore_result=True)
87 @dbsession
96 88 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
97 89 log = get_logger(get_commits_stats)
98
90 DBS = get_session()
99 91 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
100 92 ts_max_y)
101 93 lockkey_path = config['here']
@@ -103,7 +95,6 b' def get_commits_stats(repo_name, ts_min_'
103 95 log.info('running task with lockkey %s', lockkey)
104 96
105 97 try:
106 sa = get_session()
107 98 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
108 99
109 100 # for js data compatibilty cleans the key for person from '
@@ -128,9 +119,9 b' def get_commits_stats(repo_name, ts_min_'
128 119 last_cs = None
129 120 timegetter = itemgetter('time')
130 121
131 dbrepo = sa.query(Repository)\
122 dbrepo = DBS.query(Repository)\
132 123 .filter(Repository.repo_name == repo_name).scalar()
133 cur_stats = sa.query(Statistics)\
124 cur_stats = DBS.query(Statistics)\
134 125 .filter(Statistics.repository == dbrepo).scalar()
135 126
136 127 if cur_stats is not None:
@@ -234,11 +225,11 b' def get_commits_stats(repo_name, ts_min_'
234 225 try:
235 226 stats.repository = dbrepo
236 227 stats.stat_on_revision = last_cs.revision if last_cs else 0
237 sa.add(stats)
238 sa.commit()
228 DBS.add(stats)
229 DBS.commit()
239 230 except:
240 231 log.error(traceback.format_exc())
241 sa.rollback()
232 DBS.rollback()
242 233 lock.release()
243 234 return False
244 235
@@ -254,13 +245,14 b' def get_commits_stats(repo_name, ts_min_'
254 245 return 'Task with key %s already running' % lockkey
255 246
256 247 @task(ignore_result=True)
248 @dbsession
257 249 def send_password_link(user_email):
258 250 from rhodecode.model.notification import EmailNotificationModel
259 251
260 252 log = get_logger(send_password_link)
261
253 DBS = get_session()
254
262 255 try:
263 sa = get_session()
264 256 user = User.get_by_email(user_email)
265 257 if user:
266 258 log.debug('password reset user found %s' % user)
@@ -283,28 +275,29 b' def send_password_link(user_email):'
283 275 return True
284 276
285 277 @task(ignore_result=True)
278 @dbsession
286 279 def reset_user_password(user_email):
287 280 from rhodecode.lib import auth
288 281
289 282 log = get_logger(reset_user_password)
290
283 DBS = get_session()
284
291 285 try:
292 286 try:
293 sa = get_session()
294 287 user = User.get_by_email(user_email)
295 288 new_passwd = auth.PasswordGenerator().gen_password(8,
296 289 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
297 290 if user:
298 291 user.password = auth.get_crypt_password(new_passwd)
299 292 user.api_key = auth.generate_api_key(user.username)
300 sa.add(user)
301 sa.commit()
293 DBS.add(user)
294 DBS.commit()
302 295 log.info('change password for %s', user_email)
303 296 if new_passwd is None:
304 297 raise Exception('unable to generate new password')
305 298 except:
306 299 log.error(traceback.format_exc())
307 sa.rollback()
300 DBS.rollback()
308 301
309 302 run_task(send_email, user_email,
310 303 'Your new password',
@@ -319,6 +312,7 b' def reset_user_password(user_email):'
319 312
320 313
321 314 @task(ignore_result=True)
315 @dbsession
322 316 def send_email(recipients, subject, body, html_body=''):
323 317 """
324 318 Sends an email with defined parameters from the .ini files.
@@ -330,7 +324,8 b' def send_email(recipients, subject, body'
330 324 :param html_body: html version of body
331 325 """
332 326 log = get_logger(send_email)
333 sa = get_session()
327 DBS = get_session()
328
334 329 email_config = config
335 330 subject = "%s %s" % (email_config.get('email_prefix'), subject)
336 331 if not recipients:
@@ -361,6 +356,7 b' def send_email(recipients, subject, body'
361 356
362 357
363 358 @task(ignore_result=True)
359 @dbsession
364 360 def create_repo_fork(form_data, cur_user):
365 361 """
366 362 Creates a fork of repository using interval VCS methods
@@ -371,11 +367,11 b' def create_repo_fork(form_data, cur_user'
371 367 from rhodecode.model.repo import RepoModel
372 368
373 369 log = get_logger(create_repo_fork)
374
375 Session = get_session()
370 DBS = create_repo_fork.DBS
371
376 372 base_path = Repository.base_path()
377 373
378 RepoModel(Session).create(form_data, cur_user, just_db=True, fork=True)
374 RepoModel(DBS).create(form_data, cur_user, just_db=True, fork=True)
379 375
380 376 alias = form_data['repo_type']
381 377 org_repo_name = form_data['org_path']
@@ -391,12 +387,12 b' def create_repo_fork(form_data, cur_user'
391 387 src_url=safe_str(source_repo_path),
392 388 update_after_clone=update_after_clone)
393 389 action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
394 org_repo_name, '', Session)
390 org_repo_name, '', DBS)
395 391
396 392 action_logger(cur_user, 'user_created_fork:%s' % fork_name,
397 fork_name, '', Session)
393 fork_name, '', DBS)
398 394 # finally commit at latest possible stage
399 Session.commit()
395 DBS.commit()
400 396
401 397 def __get_codes_stats(repo_name):
402 398 repo = Repository.get_by_repo_name(repo_name).scm_instance
General Comments 0
You need to be logged in to leave comments. Login now