##// END OF EJS Templates
fix SyntaxWarning: name 'CELERY_ON' is used prior to global declaration
Mads Kiilerich -
r3460:27525c5f beta
parent child Browse files
Show More
@@ -1,130 +1,130 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.lib.celerylib.__init__
3 rhodecode.lib.celerylib.__init__
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 celery libs for RhodeCode
6 celery libs for RhodeCode
7
7
8 :created_on: Nov 27, 2010
8 :created_on: Nov 27, 2010
9 :author: marcink
9 :author: marcink
10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 :license: GPLv3, see COPYING for more details.
11 :license: GPLv3, see COPYING for more details.
12 """
12 """
13 # This program is free software: you can redistribute it and/or modify
13 # This program is free software: you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation, either version 3 of the License, or
15 # the Free Software Foundation, either version 3 of the License, or
16 # (at your option) any later version.
16 # (at your option) any later version.
17 #
17 #
18 # This program is distributed in the hope that it will be useful,
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # GNU General Public License for more details.
21 # GNU General Public License for more details.
22 #
22 #
23 # You should have received a copy of the GNU General Public License
23 # You should have received a copy of the GNU General Public License
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25
25
26 import os
26 import os
27 import sys
27 import sys
28 import socket
28 import socket
29 import traceback
29 import traceback
30 import logging
30 import logging
31 from os.path import dirname as dn, join as jn
31 from os.path import dirname as dn, join as jn
32 from pylons import config
32 from pylons import config
33
33
34 from hashlib import md5
34 from hashlib import md5
35 from decorator import decorator
35 from decorator import decorator
36
36
37 from rhodecode.lib.vcs.utils.lazy import LazyProperty
37 from rhodecode.lib.vcs.utils.lazy import LazyProperty
38 from rhodecode import CELERY_ON, CELERY_EAGER
38 from rhodecode import CELERY_ON, CELERY_EAGER
39 from rhodecode.lib.utils2 import str2bool, safe_str
39 from rhodecode.lib.utils2 import str2bool, safe_str
40 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 from rhodecode.lib.pidlock import DaemonLock, LockHeld
41 from rhodecode.model import init_model
41 from rhodecode.model import init_model
42 from rhodecode.model import meta
42 from rhodecode.model import meta
43 from rhodecode.model.db import Statistics, Repository, User
43 from rhodecode.model.db import Statistics, Repository, User
44
44
45 from sqlalchemy import engine_from_config
45 from sqlalchemy import engine_from_config
46
46
47 from celery.messaging import establish_connection
47 from celery.messaging import establish_connection
48
48
49 log = logging.getLogger(__name__)
49 log = logging.getLogger(__name__)
50
50
51
51
52 class ResultWrapper(object):
52 class ResultWrapper(object):
53 def __init__(self, task):
53 def __init__(self, task):
54 self.task = task
54 self.task = task
55
55
56 @LazyProperty
56 @LazyProperty
57 def result(self):
57 def result(self):
58 return self.task
58 return self.task
59
59
60
60
61 def run_task(task, *args, **kwargs):
61 def run_task(task, *args, **kwargs):
62 global CELERY_ON
62 if CELERY_ON:
63 if CELERY_ON:
63 try:
64 try:
64 t = task.apply_async(args=args, kwargs=kwargs)
65 t = task.apply_async(args=args, kwargs=kwargs)
65 log.info('running task %s:%s' % (t.task_id, task))
66 log.info('running task %s:%s' % (t.task_id, task))
66 return t
67 return t
67
68
68 except socket.error, e:
69 except socket.error, e:
69 if isinstance(e, IOError) and e.errno == 111:
70 if isinstance(e, IOError) and e.errno == 111:
70 log.debug('Unable to connect to celeryd. Sync execution')
71 log.debug('Unable to connect to celeryd. Sync execution')
71 global CELERY_ON
72 CELERY_ON = False
72 CELERY_ON = False
73 else:
73 else:
74 log.error(traceback.format_exc())
74 log.error(traceback.format_exc())
75 except KeyError, e:
75 except KeyError, e:
76 log.debug('Unable to connect to celeryd. Sync execution')
76 log.debug('Unable to connect to celeryd. Sync execution')
77 except Exception, e:
77 except Exception, e:
78 log.error(traceback.format_exc())
78 log.error(traceback.format_exc())
79
79
80 log.debug('executing task %s in sync mode' % task)
80 log.debug('executing task %s in sync mode' % task)
81 return ResultWrapper(task(*args, **kwargs))
81 return ResultWrapper(task(*args, **kwargs))
82
82
83
83
84 def __get_lockkey(func, *fargs, **fkwargs):
84 def __get_lockkey(func, *fargs, **fkwargs):
85 params = list(fargs)
85 params = list(fargs)
86 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
86 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
87
87
88 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
88 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
89
89
90 lockkey = 'task_%s.lock' % \
90 lockkey = 'task_%s.lock' % \
91 md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
91 md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
92 return lockkey
92 return lockkey
93
93
94
94
95 def locked_task(func):
95 def locked_task(func):
96 def __wrapper(func, *fargs, **fkwargs):
96 def __wrapper(func, *fargs, **fkwargs):
97 lockkey = __get_lockkey(func, *fargs, **fkwargs)
97 lockkey = __get_lockkey(func, *fargs, **fkwargs)
98 lockkey_path = config['app_conf']['cache_dir']
98 lockkey_path = config['app_conf']['cache_dir']
99
99
100 log.info('running task with lockkey %s' % lockkey)
100 log.info('running task with lockkey %s' % lockkey)
101 try:
101 try:
102 l = DaemonLock(file_=jn(lockkey_path, lockkey))
102 l = DaemonLock(file_=jn(lockkey_path, lockkey))
103 ret = func(*fargs, **fkwargs)
103 ret = func(*fargs, **fkwargs)
104 l.release()
104 l.release()
105 return ret
105 return ret
106 except LockHeld:
106 except LockHeld:
107 log.info('LockHeld')
107 log.info('LockHeld')
108 return 'Task with key %s already running' % lockkey
108 return 'Task with key %s already running' % lockkey
109
109
110 return decorator(__wrapper, func)
110 return decorator(__wrapper, func)
111
111
112
112
113 def get_session():
113 def get_session():
114 if CELERY_ON:
114 if CELERY_ON:
115 engine = engine_from_config(config, 'sqlalchemy.db1.')
115 engine = engine_from_config(config, 'sqlalchemy.db1.')
116 init_model(engine)
116 init_model(engine)
117 sa = meta.Session()
117 sa = meta.Session()
118 return sa
118 return sa
119
119
120
120
121 def dbsession(func):
121 def dbsession(func):
122 def __wrapper(func, *fargs, **fkwargs):
122 def __wrapper(func, *fargs, **fkwargs):
123 try:
123 try:
124 ret = func(*fargs, **fkwargs)
124 ret = func(*fargs, **fkwargs)
125 return ret
125 return ret
126 finally:
126 finally:
127 if CELERY_ON and CELERY_EAGER is False:
127 if CELERY_ON and CELERY_EAGER is False:
128 meta.Session.remove()
128 meta.Session.remove()
129
129
130 return decorator(__wrapper, func)
130 return decorator(__wrapper, func)
General Comments 0
You need to be logged in to leave comments. Login now