##// END OF EJS Templates
fixes #340 session cleanup for celery tasks
marcink -
r1929:cd8a7e36 beta
parent child Browse files
Show More
@@ -1,105 +1,127 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.lib.celerylib.__init__
3 rhodecode.lib.celerylib.__init__
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 celery libs for RhodeCode
6 celery libs for RhodeCode
7
7
8 :created_on: Nov 27, 2010
8 :created_on: Nov 27, 2010
9 :author: marcink
9 :author: marcink
10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 :license: GPLv3, see COPYING for more details.
11 :license: GPLv3, see COPYING for more details.
12 """
12 """
13 # This program is free software: you can redistribute it and/or modify
13 # This program is free software: you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation, either version 3 of the License, or
15 # the Free Software Foundation, either version 3 of the License, or
16 # (at your option) any later version.
16 # (at your option) any later version.
17 #
17 #
18 # This program is distributed in the hope that it will be useful,
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # GNU General Public License for more details.
21 # GNU General Public License for more details.
22 #
22 #
23 # You should have received a copy of the GNU General Public License
23 # You should have received a copy of the GNU General Public License
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25
25
26 import os
26 import os
27 import sys
27 import sys
28 import socket
28 import socket
29 import traceback
29 import traceback
30 import logging
30 import logging
31 from os.path import dirname as dn, join as jn
31 from os.path import dirname as dn, join as jn
32 from pylons import config
32
33
33 from hashlib import md5
34 from hashlib import md5
34 from decorator import decorator
35 from decorator import decorator
35
36
36 from vcs.utils.lazy import LazyProperty
37 from vcs.utils.lazy import LazyProperty
37 from rhodecode import CELERY_ON
38 from rhodecode import CELERY_ON
38 from rhodecode.lib import str2bool, safe_str
39 from rhodecode.lib import str2bool, safe_str
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 from rhodecode.lib.pidlock import DaemonLock, LockHeld
41 from rhodecode.model import init_model
42 from rhodecode.model import meta
43 from rhodecode.model.db import Statistics, Repository, User
44
45 from sqlalchemy import engine_from_config
40
46
41 from celery.messaging import establish_connection
47 from celery.messaging import establish_connection
42
48
43
44 log = logging.getLogger(__name__)
49 log = logging.getLogger(__name__)
45
50
46
51
47
48
49 class ResultWrapper(object):
52 class ResultWrapper(object):
50 def __init__(self, task):
53 def __init__(self, task):
51 self.task = task
54 self.task = task
52
55
53 @LazyProperty
56 @LazyProperty
54 def result(self):
57 def result(self):
55 return self.task
58 return self.task
56
59
57
60
58 def run_task(task, *args, **kwargs):
61 def run_task(task, *args, **kwargs):
59 if CELERY_ON:
62 if CELERY_ON:
60 try:
63 try:
61 t = task.apply_async(args=args, kwargs=kwargs)
64 t = task.apply_async(args=args, kwargs=kwargs)
62 log.info('running task %s:%s', t.task_id, task)
65 log.info('running task %s:%s', t.task_id, task)
63 return t
66 return t
64
67
65 except socket.error, e:
68 except socket.error, e:
66 if isinstance(e, IOError) and e.errno == 111:
69 if isinstance(e, IOError) and e.errno == 111:
67 log.debug('Unable to connect to celeryd. Sync execution')
70 log.debug('Unable to connect to celeryd. Sync execution')
68 else:
71 else:
69 log.error(traceback.format_exc())
72 log.error(traceback.format_exc())
70 except KeyError, e:
73 except KeyError, e:
71 log.debug('Unable to connect to celeryd. Sync execution')
74 log.debug('Unable to connect to celeryd. Sync execution')
72 except Exception, e:
75 except Exception, e:
73 log.error(traceback.format_exc())
76 log.error(traceback.format_exc())
74
77
75 log.debug('executing task %s in sync mode', task)
78 log.debug('executing task %s in sync mode', task)
76 return ResultWrapper(task(*args, **kwargs))
79 return ResultWrapper(task(*args, **kwargs))
77
80
78
81
79 def __get_lockkey(func, *fargs, **fkwargs):
82 def __get_lockkey(func, *fargs, **fkwargs):
80 params = list(fargs)
83 params = list(fargs)
81 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
84 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
82
85
83 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
86 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
84
87
85 lockkey = 'task_%s.lock' % \
88 lockkey = 'task_%s.lock' % \
86 md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
89 md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
87 return lockkey
90 return lockkey
88
91
89
92
90 def locked_task(func):
93 def locked_task(func):
91 def __wrapper(func, *fargs, **fkwargs):
94 def __wrapper(func, *fargs, **fkwargs):
92 lockkey = __get_lockkey(func, *fargs, **fkwargs)
95 lockkey = __get_lockkey(func, *fargs, **fkwargs)
93 lockkey_path = config['here']
96 lockkey_path = config['here']
94
97
95 log.info('running task with lockkey %s', lockkey)
98 log.info('running task with lockkey %s', lockkey)
96 try:
99 try:
97 l = DaemonLock(file_=jn(lockkey_path, lockkey))
100 l = DaemonLock(file_=jn(lockkey_path, lockkey))
98 ret = func(*fargs, **fkwargs)
101 ret = func(*fargs, **fkwargs)
99 l.release()
102 l.release()
100 return ret
103 return ret
101 except LockHeld:
104 except LockHeld:
102 log.info('LockHeld')
105 log.info('LockHeld')
103 return 'Task with key %s already running' % lockkey
106 return 'Task with key %s already running' % lockkey
104
107
105 return decorator(__wrapper, func)
108 return decorator(__wrapper, func)
109
110
111 def get_session():
112 if CELERY_ON:
113 engine = engine_from_config(config, 'sqlalchemy.db1.')
114 init_model(engine)
115 sa = meta.Session
116 return sa
117
118
119 def dbsession(func):
120 def __wrapper(func, *fargs, **fkwargs):
121 try:
122 ret = func(*fargs, **fkwargs)
123 return ret
124 finally:
125 meta.Session.remove()
126
127 return decorator(__wrapper, func)
@@ -1,418 +1,414 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.lib.celerylib.tasks
3 rhodecode.lib.celerylib.tasks
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 RhodeCode task modules, containing all task that suppose to be run
6 RhodeCode task modules, containing all task that suppose to be run
7 by celery daemon
7 by celery daemon
8
8
9 :created_on: Oct 6, 2010
9 :created_on: Oct 6, 2010
10 :author: marcink
10 :author: marcink
11 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
12 :license: GPLv3, see COPYING for more details.
12 :license: GPLv3, see COPYING for more details.
13 """
13 """
14 # This program is free software: you can redistribute it and/or modify
14 # This program is free software: you can redistribute it and/or modify
15 # it under the terms of the GNU General Public License as published by
15 # it under the terms of the GNU General Public License as published by
16 # the Free Software Foundation, either version 3 of the License, or
16 # the Free Software Foundation, either version 3 of the License, or
17 # (at your option) any later version.
17 # (at your option) any later version.
18 #
18 #
19 # This program is distributed in the hope that it will be useful,
19 # This program is distributed in the hope that it will be useful,
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 # GNU General Public License for more details.
22 # GNU General Public License for more details.
23 #
23 #
24 # You should have received a copy of the GNU General Public License
24 # You should have received a copy of the GNU General Public License
25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 from celery.decorators import task
26 from celery.decorators import task
27
27
28 import os
28 import os
29 import traceback
29 import traceback
30 import logging
30 import logging
31 from os.path import join as jn
31 from os.path import join as jn
32
32
33 from time import mktime
33 from time import mktime
34 from operator import itemgetter
34 from operator import itemgetter
35 from string import lower
35 from string import lower
36
36
37 from pylons import config, url
37 from pylons import config, url
38 from pylons.i18n.translation import _
38 from pylons.i18n.translation import _
39
39
40 from vcs import get_backend
40 from vcs import get_backend
41
41
42 from rhodecode import CELERY_ON
42 from rhodecode import CELERY_ON
43 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
43 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
44 from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
44 from rhodecode.lib.celerylib import run_task, locked_task, dbsession, \
45 __get_lockkey, LockHeld, DaemonLock
45 str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
46 from rhodecode.lib.helpers import person
46 from rhodecode.lib.helpers import person
47 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
47 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
48 from rhodecode.lib.utils import add_cache, action_logger
48 from rhodecode.lib.utils import add_cache, action_logger
49 from rhodecode.lib.compat import json, OrderedDict
49 from rhodecode.lib.compat import json, OrderedDict
50
50
51 from rhodecode.model import init_model
52 from rhodecode.model import meta
53 from rhodecode.model.db import Statistics, Repository, User
51 from rhodecode.model.db import Statistics, Repository, User
54
52
55 from sqlalchemy import engine_from_config
56
53
57 add_cache(config)
54 add_cache(config)
58
55
59 __all__ = ['whoosh_index', 'get_commits_stats',
56 __all__ = ['whoosh_index', 'get_commits_stats',
60 'reset_user_password', 'send_email']
57 'reset_user_password', 'send_email']
61
58
62
59
63 def get_session():
64 if CELERY_ON:
65 engine = engine_from_config(config, 'sqlalchemy.db1.')
66 init_model(engine)
67 sa = meta.Session
68 return sa
69
70 def get_logger(cls):
60 def get_logger(cls):
71 if CELERY_ON:
61 if CELERY_ON:
72 try:
62 try:
73 log = cls.get_logger()
63 log = cls.get_logger()
74 except:
64 except:
75 log = logging.getLogger(__name__)
65 log = logging.getLogger(__name__)
76 else:
66 else:
77 log = logging.getLogger(__name__)
67 log = logging.getLogger(__name__)
78
68
79 return log
69 return log
80
70
81
71
82 @task(ignore_result=True)
72 @task(ignore_result=True)
83 @locked_task
73 @locked_task
74 @dbsession
84 def whoosh_index(repo_location, full_index):
75 def whoosh_index(repo_location, full_index):
85 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
76 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
86
77 log = whoosh_index.get_logger(whoosh_index)
87 # log = whoosh_index.get_logger(whoosh_index)
78 DBS = get_session()
88
79
89 index_location = config['index_dir']
80 index_location = config['index_dir']
90 WhooshIndexingDaemon(index_location=index_location,
81 WhooshIndexingDaemon(index_location=index_location,
91 repo_location=repo_location, sa=get_session())\
82 repo_location=repo_location, sa=DBS)\
92 .run(full_index=full_index)
83 .run(full_index=full_index)
93
84
94
85
95 @task(ignore_result=True)
86 @task(ignore_result=True)
87 @dbsession
96 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
88 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
97 log = get_logger(get_commits_stats)
89 log = get_logger(get_commits_stats)
98
90 DBS = get_session()
99 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
91 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
100 ts_max_y)
92 ts_max_y)
101 lockkey_path = config['here']
93 lockkey_path = config['here']
102
94
103 log.info('running task with lockkey %s', lockkey)
95 log.info('running task with lockkey %s', lockkey)
104
96
105 try:
97 try:
106 sa = get_session()
107 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
98 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
108
99
109 # for js data compatibilty cleans the key for person from '
100 # for js data compatibilty cleans the key for person from '
110 akc = lambda k: person(k).replace('"', "")
101 akc = lambda k: person(k).replace('"', "")
111
102
112 co_day_auth_aggr = {}
103 co_day_auth_aggr = {}
113 commits_by_day_aggregate = {}
104 commits_by_day_aggregate = {}
114 repo = Repository.get_by_repo_name(repo_name)
105 repo = Repository.get_by_repo_name(repo_name)
115 if repo is None:
106 if repo is None:
116 return True
107 return True
117
108
118 repo = repo.scm_instance
109 repo = repo.scm_instance
119 repo_size = repo.count()
110 repo_size = repo.count()
120 # return if repo have no revisions
111 # return if repo have no revisions
121 if repo_size < 1:
112 if repo_size < 1:
122 lock.release()
113 lock.release()
123 return True
114 return True
124
115
125 skip_date_limit = True
116 skip_date_limit = True
126 parse_limit = int(config['app_conf'].get('commit_parse_limit'))
117 parse_limit = int(config['app_conf'].get('commit_parse_limit'))
127 last_rev = None
118 last_rev = None
128 last_cs = None
119 last_cs = None
129 timegetter = itemgetter('time')
120 timegetter = itemgetter('time')
130
121
131 dbrepo = sa.query(Repository)\
122 dbrepo = DBS.query(Repository)\
132 .filter(Repository.repo_name == repo_name).scalar()
123 .filter(Repository.repo_name == repo_name).scalar()
133 cur_stats = sa.query(Statistics)\
124 cur_stats = DBS.query(Statistics)\
134 .filter(Statistics.repository == dbrepo).scalar()
125 .filter(Statistics.repository == dbrepo).scalar()
135
126
136 if cur_stats is not None:
127 if cur_stats is not None:
137 last_rev = cur_stats.stat_on_revision
128 last_rev = cur_stats.stat_on_revision
138
129
139 if last_rev == repo.get_changeset().revision and repo_size > 1:
130 if last_rev == repo.get_changeset().revision and repo_size > 1:
140 # pass silently without any work if we're not on first revision or
131 # pass silently without any work if we're not on first revision or
141 # current state of parsing revision(from db marker) is the
132 # current state of parsing revision(from db marker) is the
142 # last revision
133 # last revision
143 lock.release()
134 lock.release()
144 return True
135 return True
145
136
146 if cur_stats:
137 if cur_stats:
147 commits_by_day_aggregate = OrderedDict(json.loads(
138 commits_by_day_aggregate = OrderedDict(json.loads(
148 cur_stats.commit_activity_combined))
139 cur_stats.commit_activity_combined))
149 co_day_auth_aggr = json.loads(cur_stats.commit_activity)
140 co_day_auth_aggr = json.loads(cur_stats.commit_activity)
150
141
151 log.debug('starting parsing %s', parse_limit)
142 log.debug('starting parsing %s', parse_limit)
152 lmktime = mktime
143 lmktime = mktime
153
144
154 last_rev = last_rev + 1 if last_rev >= 0 else 0
145 last_rev = last_rev + 1 if last_rev >= 0 else 0
155 log.debug('Getting revisions from %s to %s' % (
146 log.debug('Getting revisions from %s to %s' % (
156 last_rev, last_rev + parse_limit)
147 last_rev, last_rev + parse_limit)
157 )
148 )
158 for cs in repo[last_rev:last_rev + parse_limit]:
149 for cs in repo[last_rev:last_rev + parse_limit]:
159 last_cs = cs # remember last parsed changeset
150 last_cs = cs # remember last parsed changeset
160 k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
151 k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
161 cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
152 cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
162
153
163 if akc(cs.author) in co_day_auth_aggr:
154 if akc(cs.author) in co_day_auth_aggr:
164 try:
155 try:
165 l = [timegetter(x) for x in
156 l = [timegetter(x) for x in
166 co_day_auth_aggr[akc(cs.author)]['data']]
157 co_day_auth_aggr[akc(cs.author)]['data']]
167 time_pos = l.index(k)
158 time_pos = l.index(k)
168 except ValueError:
159 except ValueError:
169 time_pos = False
160 time_pos = False
170
161
171 if time_pos >= 0 and time_pos is not False:
162 if time_pos >= 0 and time_pos is not False:
172
163
173 datadict = \
164 datadict = \
174 co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
165 co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
175
166
176 datadict["commits"] += 1
167 datadict["commits"] += 1
177 datadict["added"] += len(cs.added)
168 datadict["added"] += len(cs.added)
178 datadict["changed"] += len(cs.changed)
169 datadict["changed"] += len(cs.changed)
179 datadict["removed"] += len(cs.removed)
170 datadict["removed"] += len(cs.removed)
180
171
181 else:
172 else:
182 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
173 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
183
174
184 datadict = {"time": k,
175 datadict = {"time": k,
185 "commits": 1,
176 "commits": 1,
186 "added": len(cs.added),
177 "added": len(cs.added),
187 "changed": len(cs.changed),
178 "changed": len(cs.changed),
188 "removed": len(cs.removed),
179 "removed": len(cs.removed),
189 }
180 }
190 co_day_auth_aggr[akc(cs.author)]['data']\
181 co_day_auth_aggr[akc(cs.author)]['data']\
191 .append(datadict)
182 .append(datadict)
192
183
193 else:
184 else:
194 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
185 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
195 co_day_auth_aggr[akc(cs.author)] = {
186 co_day_auth_aggr[akc(cs.author)] = {
196 "label": akc(cs.author),
187 "label": akc(cs.author),
197 "data": [{"time":k,
188 "data": [{"time":k,
198 "commits":1,
189 "commits":1,
199 "added":len(cs.added),
190 "added":len(cs.added),
200 "changed":len(cs.changed),
191 "changed":len(cs.changed),
201 "removed":len(cs.removed),
192 "removed":len(cs.removed),
202 }],
193 }],
203 "schema": ["commits"],
194 "schema": ["commits"],
204 }
195 }
205
196
206 #gather all data by day
197 #gather all data by day
207 if k in commits_by_day_aggregate:
198 if k in commits_by_day_aggregate:
208 commits_by_day_aggregate[k] += 1
199 commits_by_day_aggregate[k] += 1
209 else:
200 else:
210 commits_by_day_aggregate[k] = 1
201 commits_by_day_aggregate[k] = 1
211
202
212 overview_data = sorted(commits_by_day_aggregate.items(),
203 overview_data = sorted(commits_by_day_aggregate.items(),
213 key=itemgetter(0))
204 key=itemgetter(0))
214
205
215 if not co_day_auth_aggr:
206 if not co_day_auth_aggr:
216 co_day_auth_aggr[akc(repo.contact)] = {
207 co_day_auth_aggr[akc(repo.contact)] = {
217 "label": akc(repo.contact),
208 "label": akc(repo.contact),
218 "data": [0, 1],
209 "data": [0, 1],
219 "schema": ["commits"],
210 "schema": ["commits"],
220 }
211 }
221
212
222 stats = cur_stats if cur_stats else Statistics()
213 stats = cur_stats if cur_stats else Statistics()
223 stats.commit_activity = json.dumps(co_day_auth_aggr)
214 stats.commit_activity = json.dumps(co_day_auth_aggr)
224 stats.commit_activity_combined = json.dumps(overview_data)
215 stats.commit_activity_combined = json.dumps(overview_data)
225
216
226 log.debug('last revison %s', last_rev)
217 log.debug('last revison %s', last_rev)
227 leftovers = len(repo.revisions[last_rev:])
218 leftovers = len(repo.revisions[last_rev:])
228 log.debug('revisions to parse %s', leftovers)
219 log.debug('revisions to parse %s', leftovers)
229
220
230 if last_rev == 0 or leftovers < parse_limit:
221 if last_rev == 0 or leftovers < parse_limit:
231 log.debug('getting code trending stats')
222 log.debug('getting code trending stats')
232 stats.languages = json.dumps(__get_codes_stats(repo_name))
223 stats.languages = json.dumps(__get_codes_stats(repo_name))
233
224
234 try:
225 try:
235 stats.repository = dbrepo
226 stats.repository = dbrepo
236 stats.stat_on_revision = last_cs.revision if last_cs else 0
227 stats.stat_on_revision = last_cs.revision if last_cs else 0
237 sa.add(stats)
228 DBS.add(stats)
238 sa.commit()
229 DBS.commit()
239 except:
230 except:
240 log.error(traceback.format_exc())
231 log.error(traceback.format_exc())
241 sa.rollback()
232 DBS.rollback()
242 lock.release()
233 lock.release()
243 return False
234 return False
244
235
245 #final release
236 #final release
246 lock.release()
237 lock.release()
247
238
248 #execute another task if celery is enabled
239 #execute another task if celery is enabled
249 if len(repo.revisions) > 1 and CELERY_ON:
240 if len(repo.revisions) > 1 and CELERY_ON:
250 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
241 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
251 return True
242 return True
252 except LockHeld:
243 except LockHeld:
253 log.info('LockHeld')
244 log.info('LockHeld')
254 return 'Task with key %s already running' % lockkey
245 return 'Task with key %s already running' % lockkey
255
246
256 @task(ignore_result=True)
247 @task(ignore_result=True)
248 @dbsession
257 def send_password_link(user_email):
249 def send_password_link(user_email):
258 from rhodecode.model.notification import EmailNotificationModel
250 from rhodecode.model.notification import EmailNotificationModel
259
251
260 log = get_logger(send_password_link)
252 log = get_logger(send_password_link)
261
253 DBS = get_session()
254
262 try:
255 try:
263 sa = get_session()
264 user = User.get_by_email(user_email)
256 user = User.get_by_email(user_email)
265 if user:
257 if user:
266 log.debug('password reset user found %s' % user)
258 log.debug('password reset user found %s' % user)
267 link = url('reset_password_confirmation', key=user.api_key,
259 link = url('reset_password_confirmation', key=user.api_key,
268 qualified=True)
260 qualified=True)
269 reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
261 reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
270 body = EmailNotificationModel().get_email_tmpl(reg_type,
262 body = EmailNotificationModel().get_email_tmpl(reg_type,
271 **{'user':user.short_contact,
263 **{'user':user.short_contact,
272 'reset_url':link})
264 'reset_url':link})
273 log.debug('sending email')
265 log.debug('sending email')
274 run_task(send_email, user_email,
266 run_task(send_email, user_email,
275 _("password reset link"), body)
267 _("password reset link"), body)
276 log.info('send new password mail to %s', user_email)
268 log.info('send new password mail to %s', user_email)
277 else:
269 else:
278 log.debug("password reset email %s not found" % user_email)
270 log.debug("password reset email %s not found" % user_email)
279 except:
271 except:
280 log.error(traceback.format_exc())
272 log.error(traceback.format_exc())
281 return False
273 return False
282
274
283 return True
275 return True
284
276
285 @task(ignore_result=True)
277 @task(ignore_result=True)
278 @dbsession
286 def reset_user_password(user_email):
279 def reset_user_password(user_email):
287 from rhodecode.lib import auth
280 from rhodecode.lib import auth
288
281
289 log = get_logger(reset_user_password)
282 log = get_logger(reset_user_password)
290
283 DBS = get_session()
284
291 try:
285 try:
292 try:
286 try:
293 sa = get_session()
294 user = User.get_by_email(user_email)
287 user = User.get_by_email(user_email)
295 new_passwd = auth.PasswordGenerator().gen_password(8,
288 new_passwd = auth.PasswordGenerator().gen_password(8,
296 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
289 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
297 if user:
290 if user:
298 user.password = auth.get_crypt_password(new_passwd)
291 user.password = auth.get_crypt_password(new_passwd)
299 user.api_key = auth.generate_api_key(user.username)
292 user.api_key = auth.generate_api_key(user.username)
300 sa.add(user)
293 DBS.add(user)
301 sa.commit()
294 DBS.commit()
302 log.info('change password for %s', user_email)
295 log.info('change password for %s', user_email)
303 if new_passwd is None:
296 if new_passwd is None:
304 raise Exception('unable to generate new password')
297 raise Exception('unable to generate new password')
305 except:
298 except:
306 log.error(traceback.format_exc())
299 log.error(traceback.format_exc())
307 sa.rollback()
300 DBS.rollback()
308
301
309 run_task(send_email, user_email,
302 run_task(send_email, user_email,
310 'Your new password',
303 'Your new password',
311 'Your new RhodeCode password:%s' % (new_passwd))
304 'Your new RhodeCode password:%s' % (new_passwd))
312 log.info('send new password mail to %s', user_email)
305 log.info('send new password mail to %s', user_email)
313
306
314 except:
307 except:
315 log.error('Failed to update user password')
308 log.error('Failed to update user password')
316 log.error(traceback.format_exc())
309 log.error(traceback.format_exc())
317
310
318 return True
311 return True
319
312
320
313
321 @task(ignore_result=True)
314 @task(ignore_result=True)
315 @dbsession
322 def send_email(recipients, subject, body, html_body=''):
316 def send_email(recipients, subject, body, html_body=''):
323 """
317 """
324 Sends an email with defined parameters from the .ini files.
318 Sends an email with defined parameters from the .ini files.
325
319
326 :param recipients: list of recipients, it this is empty the defined email
320 :param recipients: list of recipients, it this is empty the defined email
327 address from field 'email_to' is used instead
321 address from field 'email_to' is used instead
328 :param subject: subject of the mail
322 :param subject: subject of the mail
329 :param body: body of the mail
323 :param body: body of the mail
330 :param html_body: html version of body
324 :param html_body: html version of body
331 """
325 """
332 log = get_logger(send_email)
326 log = get_logger(send_email)
333 sa = get_session()
327 DBS = get_session()
328
334 email_config = config
329 email_config = config
335 subject = "%s %s" % (email_config.get('email_prefix'), subject)
330 subject = "%s %s" % (email_config.get('email_prefix'), subject)
336 if not recipients:
331 if not recipients:
337 # if recipients are not defined we send to email_config + all admins
332 # if recipients are not defined we send to email_config + all admins
338 admins = [u.email for u in User.query()
333 admins = [u.email for u in User.query()
339 .filter(User.admin == True).all()]
334 .filter(User.admin == True).all()]
340 recipients = [email_config.get('email_to')] + admins
335 recipients = [email_config.get('email_to')] + admins
341
336
342 mail_from = email_config.get('app_email_from', 'RhodeCode')
337 mail_from = email_config.get('app_email_from', 'RhodeCode')
343 user = email_config.get('smtp_username')
338 user = email_config.get('smtp_username')
344 passwd = email_config.get('smtp_password')
339 passwd = email_config.get('smtp_password')
345 mail_server = email_config.get('smtp_server')
340 mail_server = email_config.get('smtp_server')
346 mail_port = email_config.get('smtp_port')
341 mail_port = email_config.get('smtp_port')
347 tls = str2bool(email_config.get('smtp_use_tls'))
342 tls = str2bool(email_config.get('smtp_use_tls'))
348 ssl = str2bool(email_config.get('smtp_use_ssl'))
343 ssl = str2bool(email_config.get('smtp_use_ssl'))
349 debug = str2bool(config.get('debug'))
344 debug = str2bool(config.get('debug'))
350 smtp_auth = email_config.get('smtp_auth')
345 smtp_auth = email_config.get('smtp_auth')
351
346
352 try:
347 try:
353 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
348 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
354 mail_port, ssl, tls, debug=debug)
349 mail_port, ssl, tls, debug=debug)
355 m.send(recipients, subject, body, html_body)
350 m.send(recipients, subject, body, html_body)
356 except:
351 except:
357 log.error('Mail sending failed')
352 log.error('Mail sending failed')
358 log.error(traceback.format_exc())
353 log.error(traceback.format_exc())
359 return False
354 return False
360 return True
355 return True
361
356
362
357
363 @task(ignore_result=True)
358 @task(ignore_result=True)
359 @dbsession
364 def create_repo_fork(form_data, cur_user):
360 def create_repo_fork(form_data, cur_user):
365 """
361 """
366 Creates a fork of repository using interval VCS methods
362 Creates a fork of repository using interval VCS methods
367
363
368 :param form_data:
364 :param form_data:
369 :param cur_user:
365 :param cur_user:
370 """
366 """
371 from rhodecode.model.repo import RepoModel
367 from rhodecode.model.repo import RepoModel
372
368
373 log = get_logger(create_repo_fork)
369 log = get_logger(create_repo_fork)
374
370 DBS = create_repo_fork.DBS
375 Session = get_session()
371
376 base_path = Repository.base_path()
372 base_path = Repository.base_path()
377
373
378 RepoModel(Session).create(form_data, cur_user, just_db=True, fork=True)
374 RepoModel(DBS).create(form_data, cur_user, just_db=True, fork=True)
379
375
380 alias = form_data['repo_type']
376 alias = form_data['repo_type']
381 org_repo_name = form_data['org_path']
377 org_repo_name = form_data['org_path']
382 fork_name = form_data['repo_name_full']
378 fork_name = form_data['repo_name_full']
383 update_after_clone = form_data['update_after_clone']
379 update_after_clone = form_data['update_after_clone']
384 source_repo_path = os.path.join(base_path, org_repo_name)
380 source_repo_path = os.path.join(base_path, org_repo_name)
385 destination_fork_path = os.path.join(base_path, fork_name)
381 destination_fork_path = os.path.join(base_path, fork_name)
386
382
387 log.info('creating fork of %s as %s', source_repo_path,
383 log.info('creating fork of %s as %s', source_repo_path,
388 destination_fork_path)
384 destination_fork_path)
389 backend = get_backend(alias)
385 backend = get_backend(alias)
390 backend(safe_str(destination_fork_path), create=True,
386 backend(safe_str(destination_fork_path), create=True,
391 src_url=safe_str(source_repo_path),
387 src_url=safe_str(source_repo_path),
392 update_after_clone=update_after_clone)
388 update_after_clone=update_after_clone)
393 action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
389 action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
394 org_repo_name, '', Session)
390 org_repo_name, '', DBS)
395
391
396 action_logger(cur_user, 'user_created_fork:%s' % fork_name,
392 action_logger(cur_user, 'user_created_fork:%s' % fork_name,
397 fork_name, '', Session)
393 fork_name, '', DBS)
398 # finally commit at latest possible stage
394 # finally commit at latest possible stage
399 Session.commit()
395 DBS.commit()
400
396
401 def __get_codes_stats(repo_name):
397 def __get_codes_stats(repo_name):
402 repo = Repository.get_by_repo_name(repo_name).scm_instance
398 repo = Repository.get_by_repo_name(repo_name).scm_instance
403
399
404 tip = repo.get_changeset()
400 tip = repo.get_changeset()
405 code_stats = {}
401 code_stats = {}
406
402
407 def aggregate(cs):
403 def aggregate(cs):
408 for f in cs[2]:
404 for f in cs[2]:
409 ext = lower(f.extension)
405 ext = lower(f.extension)
410 if ext in LANGUAGES_EXTENSIONS_MAP.keys() and not f.is_binary:
406 if ext in LANGUAGES_EXTENSIONS_MAP.keys() and not f.is_binary:
411 if ext in code_stats:
407 if ext in code_stats:
412 code_stats[ext] += 1
408 code_stats[ext] += 1
413 else:
409 else:
414 code_stats[ext] = 1
410 code_stats[ext] = 1
415
411
416 map(aggregate, tip.walk('/'))
412 map(aggregate, tip.walk('/'))
417
413
418 return code_stats or {}
414 return code_stats or {}
General Comments 0
You need to be logged in to leave comments. Login now