##// END OF EJS Templates
fixes #340 session cleanup for celery tasks
marcink -
r1929:cd8a7e36 beta
parent child Browse files
Show More
@@ -29,6 +29,7 b' import socket'
29 import traceback
29 import traceback
30 import logging
30 import logging
31 from os.path import dirname as dn, join as jn
31 from os.path import dirname as dn, join as jn
32 from pylons import config
32
33
33 from hashlib import md5
34 from hashlib import md5
34 from decorator import decorator
35 from decorator import decorator
@@ -37,15 +38,17 b' from vcs.utils.lazy import LazyProperty'
37 from rhodecode import CELERY_ON
38 from rhodecode import CELERY_ON
38 from rhodecode.lib import str2bool, safe_str
39 from rhodecode.lib import str2bool, safe_str
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 from rhodecode.lib.pidlock import DaemonLock, LockHeld
41 from rhodecode.model import init_model
42 from rhodecode.model import meta
43 from rhodecode.model.db import Statistics, Repository, User
44
45 from sqlalchemy import engine_from_config
40
46
41 from celery.messaging import establish_connection
47 from celery.messaging import establish_connection
42
48
43
44 log = logging.getLogger(__name__)
49 log = logging.getLogger(__name__)
45
50
46
51
47
48
49 class ResultWrapper(object):
52 class ResultWrapper(object):
50 def __init__(self, task):
53 def __init__(self, task):
51 self.task = task
54 self.task = task
@@ -103,3 +106,22 b' def locked_task(func):'
103 return 'Task with key %s already running' % lockkey
106 return 'Task with key %s already running' % lockkey
104
107
105 return decorator(__wrapper, func)
108 return decorator(__wrapper, func)
109
110
111 def get_session():
112 if CELERY_ON:
113 engine = engine_from_config(config, 'sqlalchemy.db1.')
114 init_model(engine)
115 sa = meta.Session
116 return sa
117
118
119 def dbsession(func):
120 def __wrapper(func, *fargs, **fkwargs):
121 try:
122 ret = func(*fargs, **fkwargs)
123 return ret
124 finally:
125 meta.Session.remove()
126
127 return decorator(__wrapper, func)
@@ -41,18 +41,15 b' from vcs import get_backend'
41
41
42 from rhodecode import CELERY_ON
42 from rhodecode import CELERY_ON
43 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
43 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
44 from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
44 from rhodecode.lib.celerylib import run_task, locked_task, dbsession, \
45 __get_lockkey, LockHeld, DaemonLock
45 str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
46 from rhodecode.lib.helpers import person
46 from rhodecode.lib.helpers import person
47 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
47 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
48 from rhodecode.lib.utils import add_cache, action_logger
48 from rhodecode.lib.utils import add_cache, action_logger
49 from rhodecode.lib.compat import json, OrderedDict
49 from rhodecode.lib.compat import json, OrderedDict
50
50
51 from rhodecode.model import init_model
52 from rhodecode.model import meta
53 from rhodecode.model.db import Statistics, Repository, User
51 from rhodecode.model.db import Statistics, Repository, User
54
52
55 from sqlalchemy import engine_from_config
56
53
57 add_cache(config)
54 add_cache(config)
58
55
@@ -60,13 +57,6 b' add_cache(config)'
60 'reset_user_password', 'send_email']
57 'reset_user_password', 'send_email']
61
58
62
59
63 def get_session():
64 if CELERY_ON:
65 engine = engine_from_config(config, 'sqlalchemy.db1.')
66 init_model(engine)
67 sa = meta.Session
68 return sa
69
70 def get_logger(cls):
60 def get_logger(cls):
71 if CELERY_ON:
61 if CELERY_ON:
72 try:
62 try:
@@ -81,21 +71,23 b' def get_logger(cls):'
81
71
82 @task(ignore_result=True)
72 @task(ignore_result=True)
83 @locked_task
73 @locked_task
74 @dbsession
84 def whoosh_index(repo_location, full_index):
75 def whoosh_index(repo_location, full_index):
85 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
76 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
86
77 log = whoosh_index.get_logger(whoosh_index)
87 # log = whoosh_index.get_logger(whoosh_index)
78 DBS = get_session()
88
79
89 index_location = config['index_dir']
80 index_location = config['index_dir']
90 WhooshIndexingDaemon(index_location=index_location,
81 WhooshIndexingDaemon(index_location=index_location,
91 repo_location=repo_location, sa=get_session())\
82 repo_location=repo_location, sa=DBS)\
92 .run(full_index=full_index)
83 .run(full_index=full_index)
93
84
94
85
95 @task(ignore_result=True)
86 @task(ignore_result=True)
87 @dbsession
96 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
88 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
97 log = get_logger(get_commits_stats)
89 log = get_logger(get_commits_stats)
98
90 DBS = get_session()
99 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
91 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
100 ts_max_y)
92 ts_max_y)
101 lockkey_path = config['here']
93 lockkey_path = config['here']
@@ -103,7 +95,6 b' def get_commits_stats(repo_name, ts_min_'
103 log.info('running task with lockkey %s', lockkey)
95 log.info('running task with lockkey %s', lockkey)
104
96
105 try:
97 try:
106 sa = get_session()
107 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
98 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
108
99
109 # for js data compatibilty cleans the key for person from '
100 # for js data compatibilty cleans the key for person from '
@@ -128,9 +119,9 b' def get_commits_stats(repo_name, ts_min_'
128 last_cs = None
119 last_cs = None
129 timegetter = itemgetter('time')
120 timegetter = itemgetter('time')
130
121
131 dbrepo = sa.query(Repository)\
122 dbrepo = DBS.query(Repository)\
132 .filter(Repository.repo_name == repo_name).scalar()
123 .filter(Repository.repo_name == repo_name).scalar()
133 cur_stats = sa.query(Statistics)\
124 cur_stats = DBS.query(Statistics)\
134 .filter(Statistics.repository == dbrepo).scalar()
125 .filter(Statistics.repository == dbrepo).scalar()
135
126
136 if cur_stats is not None:
127 if cur_stats is not None:
@@ -234,11 +225,11 b' def get_commits_stats(repo_name, ts_min_'
234 try:
225 try:
235 stats.repository = dbrepo
226 stats.repository = dbrepo
236 stats.stat_on_revision = last_cs.revision if last_cs else 0
227 stats.stat_on_revision = last_cs.revision if last_cs else 0
237 sa.add(stats)
228 DBS.add(stats)
238 sa.commit()
229 DBS.commit()
239 except:
230 except:
240 log.error(traceback.format_exc())
231 log.error(traceback.format_exc())
241 sa.rollback()
232 DBS.rollback()
242 lock.release()
233 lock.release()
243 return False
234 return False
244
235
@@ -254,13 +245,14 b' def get_commits_stats(repo_name, ts_min_'
254 return 'Task with key %s already running' % lockkey
245 return 'Task with key %s already running' % lockkey
255
246
256 @task(ignore_result=True)
247 @task(ignore_result=True)
248 @dbsession
257 def send_password_link(user_email):
249 def send_password_link(user_email):
258 from rhodecode.model.notification import EmailNotificationModel
250 from rhodecode.model.notification import EmailNotificationModel
259
251
260 log = get_logger(send_password_link)
252 log = get_logger(send_password_link)
253 DBS = get_session()
261
254
262 try:
255 try:
263 sa = get_session()
264 user = User.get_by_email(user_email)
256 user = User.get_by_email(user_email)
265 if user:
257 if user:
266 log.debug('password reset user found %s' % user)
258 log.debug('password reset user found %s' % user)
@@ -283,28 +275,29 b' def send_password_link(user_email):'
283 return True
275 return True
284
276
285 @task(ignore_result=True)
277 @task(ignore_result=True)
278 @dbsession
286 def reset_user_password(user_email):
279 def reset_user_password(user_email):
287 from rhodecode.lib import auth
280 from rhodecode.lib import auth
288
281
289 log = get_logger(reset_user_password)
282 log = get_logger(reset_user_password)
283 DBS = get_session()
290
284
291 try:
285 try:
292 try:
286 try:
293 sa = get_session()
294 user = User.get_by_email(user_email)
287 user = User.get_by_email(user_email)
295 new_passwd = auth.PasswordGenerator().gen_password(8,
288 new_passwd = auth.PasswordGenerator().gen_password(8,
296 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
289 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
297 if user:
290 if user:
298 user.password = auth.get_crypt_password(new_passwd)
291 user.password = auth.get_crypt_password(new_passwd)
299 user.api_key = auth.generate_api_key(user.username)
292 user.api_key = auth.generate_api_key(user.username)
300 sa.add(user)
293 DBS.add(user)
301 sa.commit()
294 DBS.commit()
302 log.info('change password for %s', user_email)
295 log.info('change password for %s', user_email)
303 if new_passwd is None:
296 if new_passwd is None:
304 raise Exception('unable to generate new password')
297 raise Exception('unable to generate new password')
305 except:
298 except:
306 log.error(traceback.format_exc())
299 log.error(traceback.format_exc())
307 sa.rollback()
300 DBS.rollback()
308
301
309 run_task(send_email, user_email,
302 run_task(send_email, user_email,
310 'Your new password',
303 'Your new password',
@@ -319,6 +312,7 b' def reset_user_password(user_email):'
319
312
320
313
321 @task(ignore_result=True)
314 @task(ignore_result=True)
315 @dbsession
322 def send_email(recipients, subject, body, html_body=''):
316 def send_email(recipients, subject, body, html_body=''):
323 """
317 """
324 Sends an email with defined parameters from the .ini files.
318 Sends an email with defined parameters from the .ini files.
@@ -330,7 +324,8 b' def send_email(recipients, subject, body'
330 :param html_body: html version of body
324 :param html_body: html version of body
331 """
325 """
332 log = get_logger(send_email)
326 log = get_logger(send_email)
333 sa = get_session()
327 DBS = get_session()
328
334 email_config = config
329 email_config = config
335 subject = "%s %s" % (email_config.get('email_prefix'), subject)
330 subject = "%s %s" % (email_config.get('email_prefix'), subject)
336 if not recipients:
331 if not recipients:
@@ -361,6 +356,7 b' def send_email(recipients, subject, body'
361
356
362
357
363 @task(ignore_result=True)
358 @task(ignore_result=True)
359 @dbsession
364 def create_repo_fork(form_data, cur_user):
360 def create_repo_fork(form_data, cur_user):
365 """
361 """
366 Creates a fork of repository using interval VCS methods
362 Creates a fork of repository using interval VCS methods
@@ -371,11 +367,11 b' def create_repo_fork(form_data, cur_user'
371 from rhodecode.model.repo import RepoModel
367 from rhodecode.model.repo import RepoModel
372
368
373 log = get_logger(create_repo_fork)
369 log = get_logger(create_repo_fork)
370 DBS = create_repo_fork.DBS
374
371
375 Session = get_session()
376 base_path = Repository.base_path()
372 base_path = Repository.base_path()
377
373
378 RepoModel(Session).create(form_data, cur_user, just_db=True, fork=True)
374 RepoModel(DBS).create(form_data, cur_user, just_db=True, fork=True)
379
375
380 alias = form_data['repo_type']
376 alias = form_data['repo_type']
381 org_repo_name = form_data['org_path']
377 org_repo_name = form_data['org_path']
@@ -391,12 +387,12 b' def create_repo_fork(form_data, cur_user'
391 src_url=safe_str(source_repo_path),
387 src_url=safe_str(source_repo_path),
392 update_after_clone=update_after_clone)
388 update_after_clone=update_after_clone)
393 action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
389 action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
394 org_repo_name, '', Session)
390 org_repo_name, '', DBS)
395
391
396 action_logger(cur_user, 'user_created_fork:%s' % fork_name,
392 action_logger(cur_user, 'user_created_fork:%s' % fork_name,
397 fork_name, '', Session)
393 fork_name, '', DBS)
398 # finally commit at latest possible stage
394 # finally commit at latest possible stage
399 Session.commit()
395 DBS.commit()
400
396
401 def __get_codes_stats(repo_name):
397 def __get_codes_stats(repo_name):
402 repo = Repository.get_by_repo_name(repo_name).scm_instance
398 repo = Repository.get_by_repo_name(repo_name).scm_instance
General Comments 0
You need to be logged in to leave comments. Login now