##// END OF EJS Templates
fixed email prefix sent as None
marcink -
r2154:747c1c70 beta
parent child Browse files
Show More
@@ -1,416 +1,416 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.celerylib.tasks
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 RhodeCode task modules, containing all task that suppose to be run
7 7 by celery daemon
8 8
9 9 :created_on: Oct 6, 2010
10 10 :author: marcink
11 11 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
12 12 :license: GPLv3, see COPYING for more details.
13 13 """
14 14 # This program is free software: you can redistribute it and/or modify
15 15 # it under the terms of the GNU General Public License as published by
16 16 # the Free Software Foundation, either version 3 of the License, or
17 17 # (at your option) any later version.
18 18 #
19 19 # This program is distributed in the hope that it will be useful,
20 20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 22 # GNU General Public License for more details.
23 23 #
24 24 # You should have received a copy of the GNU General Public License
25 25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 26 from celery.decorators import task
27 27
28 28 import os
29 29 import traceback
30 30 import logging
31 31 from os.path import join as jn
32 32
33 33 from time import mktime
34 34 from operator import itemgetter
35 35 from string import lower
36 36
37 37 from pylons import config, url
38 38 from pylons.i18n.translation import _
39 39
40 40 from rhodecode.lib.vcs import get_backend
41 41
42 42 from rhodecode import CELERY_ON
43 43 from rhodecode.lib.utils2 import safe_str
44 44 from rhodecode.lib.celerylib import run_task, locked_task, dbsession, \
45 45 str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
46 46 from rhodecode.lib.helpers import person
47 47 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
48 48 from rhodecode.lib.utils import add_cache, action_logger
49 49 from rhodecode.lib.compat import json, OrderedDict
50 50
51 51 from rhodecode.model.db import Statistics, Repository, User
52 52
53 53
54 54 add_cache(config)
55 55
56 56 __all__ = ['whoosh_index', 'get_commits_stats',
57 57 'reset_user_password', 'send_email']
58 58
59 59
60 60 def get_logger(cls):
61 61 if CELERY_ON:
62 62 try:
63 63 log = cls.get_logger()
64 64 except:
65 65 log = logging.getLogger(__name__)
66 66 else:
67 67 log = logging.getLogger(__name__)
68 68
69 69 return log
70 70
71 71
72 72 @task(ignore_result=True)
73 73 @locked_task
74 74 @dbsession
75 75 def whoosh_index(repo_location, full_index):
76 76 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
77 77 log = whoosh_index.get_logger(whoosh_index)
78 78 DBS = get_session()
79 79
80 80 index_location = config['index_dir']
81 81 WhooshIndexingDaemon(index_location=index_location,
82 82 repo_location=repo_location, sa=DBS)\
83 83 .run(full_index=full_index)
84 84
85 85
86 86 @task(ignore_result=True)
87 87 @dbsession
88 88 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
89 89 log = get_logger(get_commits_stats)
90 90 DBS = get_session()
91 91 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
92 92 ts_max_y)
93 93 lockkey_path = config['here']
94 94
95 95 log.info('running task with lockkey %s' % lockkey)
96 96
97 97 try:
98 98 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
99 99
100 100 # for js data compatibility cleans the key for person from '
101 101 akc = lambda k: person(k).replace('"', "")
102 102
103 103 co_day_auth_aggr = {}
104 104 commits_by_day_aggregate = {}
105 105 repo = Repository.get_by_repo_name(repo_name)
106 106 if repo is None:
107 107 return True
108 108
109 109 repo = repo.scm_instance
110 110 repo_size = repo.count()
111 111 # return if repo have no revisions
112 112 if repo_size < 1:
113 113 lock.release()
114 114 return True
115 115
116 116 skip_date_limit = True
117 117 parse_limit = int(config['app_conf'].get('commit_parse_limit'))
118 118 last_rev = None
119 119 last_cs = None
120 120 timegetter = itemgetter('time')
121 121
122 122 dbrepo = DBS.query(Repository)\
123 123 .filter(Repository.repo_name == repo_name).scalar()
124 124 cur_stats = DBS.query(Statistics)\
125 125 .filter(Statistics.repository == dbrepo).scalar()
126 126
127 127 if cur_stats is not None:
128 128 last_rev = cur_stats.stat_on_revision
129 129
130 130 if last_rev == repo.get_changeset().revision and repo_size > 1:
131 131 # pass silently without any work if we're not on first revision or
132 132 # current state of parsing revision(from db marker) is the
133 133 # last revision
134 134 lock.release()
135 135 return True
136 136
137 137 if cur_stats:
138 138 commits_by_day_aggregate = OrderedDict(json.loads(
139 139 cur_stats.commit_activity_combined))
140 140 co_day_auth_aggr = json.loads(cur_stats.commit_activity)
141 141
142 142 log.debug('starting parsing %s' % parse_limit)
143 143 lmktime = mktime
144 144
145 145 last_rev = last_rev + 1 if last_rev >= 0 else 0
146 146 log.debug('Getting revisions from %s to %s' % (
147 147 last_rev, last_rev + parse_limit)
148 148 )
149 149 for cs in repo[last_rev:last_rev + parse_limit]:
150 150 log.debug('parsing %s' % cs)
151 151 last_cs = cs # remember last parsed changeset
152 152 k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
153 153 cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
154 154
155 155 if akc(cs.author) in co_day_auth_aggr:
156 156 try:
157 157 l = [timegetter(x) for x in
158 158 co_day_auth_aggr[akc(cs.author)]['data']]
159 159 time_pos = l.index(k)
160 160 except ValueError:
161 161 time_pos = False
162 162
163 163 if time_pos >= 0 and time_pos is not False:
164 164
165 165 datadict = \
166 166 co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
167 167
168 168 datadict["commits"] += 1
169 169 datadict["added"] += len(cs.added)
170 170 datadict["changed"] += len(cs.changed)
171 171 datadict["removed"] += len(cs.removed)
172 172
173 173 else:
174 174 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
175 175
176 176 datadict = {"time": k,
177 177 "commits": 1,
178 178 "added": len(cs.added),
179 179 "changed": len(cs.changed),
180 180 "removed": len(cs.removed),
181 181 }
182 182 co_day_auth_aggr[akc(cs.author)]['data']\
183 183 .append(datadict)
184 184
185 185 else:
186 186 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
187 187 co_day_auth_aggr[akc(cs.author)] = {
188 188 "label": akc(cs.author),
189 189 "data": [{"time":k,
190 190 "commits":1,
191 191 "added":len(cs.added),
192 192 "changed":len(cs.changed),
193 193 "removed":len(cs.removed),
194 194 }],
195 195 "schema": ["commits"],
196 196 }
197 197
198 198 #gather all data by day
199 199 if k in commits_by_day_aggregate:
200 200 commits_by_day_aggregate[k] += 1
201 201 else:
202 202 commits_by_day_aggregate[k] = 1
203 203
204 204 overview_data = sorted(commits_by_day_aggregate.items(),
205 205 key=itemgetter(0))
206 206
207 207 if not co_day_auth_aggr:
208 208 co_day_auth_aggr[akc(repo.contact)] = {
209 209 "label": akc(repo.contact),
210 210 "data": [0, 1],
211 211 "schema": ["commits"],
212 212 }
213 213
214 214 stats = cur_stats if cur_stats else Statistics()
215 215 stats.commit_activity = json.dumps(co_day_auth_aggr)
216 216 stats.commit_activity_combined = json.dumps(overview_data)
217 217
218 218 log.debug('last revison %s' % last_rev)
219 219 leftovers = len(repo.revisions[last_rev:])
220 220 log.debug('revisions to parse %s' % leftovers)
221 221
222 222 if last_rev == 0 or leftovers < parse_limit:
223 223 log.debug('getting code trending stats')
224 224 stats.languages = json.dumps(__get_codes_stats(repo_name))
225 225
226 226 try:
227 227 stats.repository = dbrepo
228 228 stats.stat_on_revision = last_cs.revision if last_cs else 0
229 229 DBS.add(stats)
230 230 DBS.commit()
231 231 except:
232 232 log.error(traceback.format_exc())
233 233 DBS.rollback()
234 234 lock.release()
235 235 return False
236 236
237 237 # final release
238 238 lock.release()
239 239
240 240 # execute another task if celery is enabled
241 241 if len(repo.revisions) > 1 and CELERY_ON:
242 242 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
243 243 return True
244 244 except LockHeld:
245 245 log.info('LockHeld')
246 246 return 'Task with key %s already running' % lockkey
247 247
248 248 @task(ignore_result=True)
249 249 @dbsession
250 250 def send_password_link(user_email):
251 251 from rhodecode.model.notification import EmailNotificationModel
252 252
253 253 log = get_logger(send_password_link)
254 254 DBS = get_session()
255 255
256 256 try:
257 257 user = User.get_by_email(user_email)
258 258 if user:
259 259 log.debug('password reset user found %s' % user)
260 260 link = url('reset_password_confirmation', key=user.api_key,
261 261 qualified=True)
262 262 reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
263 263 body = EmailNotificationModel().get_email_tmpl(reg_type,
264 264 **{'user':user.short_contact,
265 265 'reset_url':link})
266 266 log.debug('sending email')
267 267 run_task(send_email, user_email,
268 268 _("password reset link"), body)
269 269 log.info('send new password mail to %s' % user_email)
270 270 else:
271 271 log.debug("password reset email %s not found" % user_email)
272 272 except:
273 273 log.error(traceback.format_exc())
274 274 return False
275 275
276 276 return True
277 277
278 278 @task(ignore_result=True)
279 279 @dbsession
280 280 def reset_user_password(user_email):
281 281 from rhodecode.lib import auth
282 282
283 283 log = get_logger(reset_user_password)
284 284 DBS = get_session()
285 285
286 286 try:
287 287 try:
288 288 user = User.get_by_email(user_email)
289 289 new_passwd = auth.PasswordGenerator().gen_password(8,
290 290 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
291 291 if user:
292 292 user.password = auth.get_crypt_password(new_passwd)
293 293 user.api_key = auth.generate_api_key(user.username)
294 294 DBS.add(user)
295 295 DBS.commit()
296 296 log.info('change password for %s' % user_email)
297 297 if new_passwd is None:
298 298 raise Exception('unable to generate new password')
299 299 except:
300 300 log.error(traceback.format_exc())
301 301 DBS.rollback()
302 302
303 303 run_task(send_email, user_email,
304 304 'Your new password',
305 305 'Your new RhodeCode password:%s' % (new_passwd))
306 306 log.info('send new password mail to %s' % user_email)
307 307
308 308 except:
309 309 log.error('Failed to update user password')
310 310 log.error(traceback.format_exc())
311 311
312 312 return True
313 313
314 314
315 315 @task(ignore_result=True)
316 316 @dbsession
317 317 def send_email(recipients, subject, body, html_body=''):
318 318 """
319 319 Sends an email with defined parameters from the .ini files.
320 320
321 321 :param recipients: list of recipients, it this is empty the defined email
322 322 address from field 'email_to' is used instead
323 323 :param subject: subject of the mail
324 324 :param body: body of the mail
325 325 :param html_body: html version of body
326 326 """
327 327 log = get_logger(send_email)
328 328 DBS = get_session()
329 329
330 330 email_config = config
331 subject = "%s %s" % (email_config.get('email_prefix'), subject)
331 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
332 332 if not recipients:
333 333 # if recipients are not defined we send to email_config + all admins
334 334 admins = [u.email for u in User.query()
335 335 .filter(User.admin == True).all()]
336 336 recipients = [email_config.get('email_to')] + admins
337 337
338 338 mail_from = email_config.get('app_email_from', 'RhodeCode')
339 339 user = email_config.get('smtp_username')
340 340 passwd = email_config.get('smtp_password')
341 341 mail_server = email_config.get('smtp_server')
342 342 mail_port = email_config.get('smtp_port')
343 343 tls = str2bool(email_config.get('smtp_use_tls'))
344 344 ssl = str2bool(email_config.get('smtp_use_ssl'))
345 345 debug = str2bool(config.get('debug'))
346 346 smtp_auth = email_config.get('smtp_auth')
347 347
348 348 try:
349 349 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
350 350 mail_port, ssl, tls, debug=debug)
351 351 m.send(recipients, subject, body, html_body)
352 352 except:
353 353 log.error('Mail sending failed')
354 354 log.error(traceback.format_exc())
355 355 return False
356 356 return True
357 357
358 358
359 359 @task(ignore_result=True)
360 360 @dbsession
361 361 def create_repo_fork(form_data, cur_user):
362 362 """
363 363 Creates a fork of repository using interval VCS methods
364 364
365 365 :param form_data:
366 366 :param cur_user:
367 367 """
368 368 from rhodecode.model.repo import RepoModel
369 369
370 370 log = get_logger(create_repo_fork)
371 371 DBS = get_session()
372 372
373 373 base_path = Repository.base_path()
374 374
375 375 RepoModel(DBS).create(form_data, cur_user, just_db=True, fork=True)
376 376
377 377 alias = form_data['repo_type']
378 378 org_repo_name = form_data['org_path']
379 379 fork_name = form_data['repo_name_full']
380 380 update_after_clone = form_data['update_after_clone']
381 381 source_repo_path = os.path.join(base_path, org_repo_name)
382 382 destination_fork_path = os.path.join(base_path, fork_name)
383 383
384 384 log.info('creating fork of %s as %s', source_repo_path,
385 385 destination_fork_path)
386 386 backend = get_backend(alias)
387 387 backend(safe_str(destination_fork_path), create=True,
388 388 src_url=safe_str(source_repo_path),
389 389 update_after_clone=update_after_clone)
390 390 action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
391 391 org_repo_name, '', DBS)
392 392
393 393 action_logger(cur_user, 'user_created_fork:%s' % fork_name,
394 394 fork_name, '', DBS)
395 395 # finally commit at latest possible stage
396 396 DBS.commit()
397 397
398 398 def __get_codes_stats(repo_name):
399 399 from rhodecode.config.conf import LANGUAGES_EXTENSIONS_MAP
400 400 repo = Repository.get_by_repo_name(repo_name).scm_instance
401 401
402 402 tip = repo.get_changeset()
403 403 code_stats = {}
404 404
405 405 def aggregate(cs):
406 406 for f in cs[2]:
407 407 ext = lower(f.extension)
408 408 if ext in LANGUAGES_EXTENSIONS_MAP.keys() and not f.is_binary:
409 409 if ext in code_stats:
410 410 code_stats[ext] += 1
411 411 else:
412 412 code_stats[ext] = 1
413 413
414 414 map(aggregate, tip.walk('/'))
415 415
416 416 return code_stats or {}
General Comments 0
You need to be logged in to leave comments. Login now