##// END OF EJS Templates
implements #291 email notification sent to all admin users
marcink -
r1642:c0d8171a default
parent child Browse files
Show More
@@ -1,412 +1,414
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.celerylib.tasks
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 RhodeCode task modules, containing all task that suppose to be run
7 7 by celery daemon
8 8
9 9 :created_on: Oct 6, 2010
10 10 :author: marcink
11 11 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
12 12 :license: GPLv3, see COPYING for more details.
13 13 """
14 14 # This program is free software: you can redistribute it and/or modify
15 15 # it under the terms of the GNU General Public License as published by
16 16 # the Free Software Foundation, either version 3 of the License, or
17 17 # (at your option) any later version.
18 18 #
19 19 # This program is distributed in the hope that it will be useful,
20 20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 22 # GNU General Public License for more details.
23 23 #
24 24 # You should have received a copy of the GNU General Public License
25 25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 26 from celery.decorators import task
27 27
28 28 import os
29 29 import traceback
30 30 import logging
31 31 from os.path import dirname as dn, join as jn
32 32
33 33 from time import mktime
34 34 from operator import itemgetter
35 35 from string import lower
36 36
37 37 from pylons import config, url
38 38 from pylons.i18n.translation import _
39 39
40 40 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
41 41 from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
42 42 __get_lockkey, LockHeld, DaemonLock
43 43 from rhodecode.lib.helpers import person
44 44 from rhodecode.lib.smtp_mailer import SmtpMailer
45 45 from rhodecode.lib.utils import add_cache
46 46 from rhodecode.lib.compat import json, OrderedDict
47 47
48 48 from rhodecode.model import init_model
49 49 from rhodecode.model import meta
50 from rhodecode.model.db import RhodeCodeUi, Statistics, Repository
50 from rhodecode.model.db import RhodeCodeUi, Statistics, Repository, User
51 51
52 52 from vcs.backends import get_repo
53 53
54 54 from sqlalchemy import engine_from_config
55 55
56 56 add_cache(config)
57 57
58 58
59 59
60 60 __all__ = ['whoosh_index', 'get_commits_stats',
61 61 'reset_user_password', 'send_email']
62 62
63 63 CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
64 64
65 65
66 66 def get_session():
67 67 if CELERY_ON:
68 68 engine = engine_from_config(config, 'sqlalchemy.db1.')
69 69 init_model(engine)
70 70 sa = meta.Session()
71 71 return sa
72 72
73 73
74 74 def get_repos_path():
75 75 sa = get_session()
76 76 q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
77 77 return q.ui_value
78 78
79 79
80 80 @task(ignore_result=True)
81 81 @locked_task
82 82 def whoosh_index(repo_location, full_index):
83 83 #log = whoosh_index.get_logger()
84 84 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
85 85 index_location = config['index_dir']
86 86 WhooshIndexingDaemon(index_location=index_location,
87 87 repo_location=repo_location, sa=get_session())\
88 88 .run(full_index=full_index)
89 89
90 90
91 91 @task(ignore_result=True)
92 92 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
93 93 try:
94 94 log = get_commits_stats.get_logger()
95 95 except:
96 96 log = logging.getLogger(__name__)
97 97
98 98 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
99 99 ts_max_y)
100 100 lockkey_path = config['here']
101 101
102 102 log.info('running task with lockkey %s', lockkey)
103 103 try:
104 104 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
105 105
106 106 #for js data compatibilty cleans the key for person from '
107 107 akc = lambda k: person(k).replace('"', "")
108 108
109 109 co_day_auth_aggr = {}
110 110 commits_by_day_aggregate = {}
111 111 repos_path = get_repos_path()
112 112 repo = get_repo(safe_str(os.path.join(repos_path, repo_name)))
113 113 repo_size = len(repo.revisions)
114 114 #return if repo have no revisions
115 115 if repo_size < 1:
116 116 lock.release()
117 117 return True
118 118
119 119 skip_date_limit = True
120 120 parse_limit = int(config['app_conf'].get('commit_parse_limit'))
121 121 last_rev = 0
122 122 last_cs = None
123 123 timegetter = itemgetter('time')
124 124
125 125 sa = get_session()
126 126
127 127 dbrepo = sa.query(Repository)\
128 128 .filter(Repository.repo_name == repo_name).scalar()
129 129 cur_stats = sa.query(Statistics)\
130 130 .filter(Statistics.repository == dbrepo).scalar()
131 131
132 132 if cur_stats is not None:
133 133 last_rev = cur_stats.stat_on_revision
134 134
135 135 if last_rev == repo.get_changeset().revision and repo_size > 1:
136 136 #pass silently without any work if we're not on first revision or
137 137 #current state of parsing revision(from db marker) is the
138 138 #last revision
139 139 lock.release()
140 140 return True
141 141
142 142 if cur_stats:
143 143 commits_by_day_aggregate = OrderedDict(json.loads(
144 144 cur_stats.commit_activity_combined))
145 145 co_day_auth_aggr = json.loads(cur_stats.commit_activity)
146 146
147 147 log.debug('starting parsing %s', parse_limit)
148 148 lmktime = mktime
149 149
150 150 last_rev = last_rev + 1 if last_rev > 0 else last_rev
151 151
152 152 for cs in repo[last_rev:last_rev + parse_limit]:
153 153 last_cs = cs # remember last parsed changeset
154 154 k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
155 155 cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
156 156
157 157 if akc(cs.author) in co_day_auth_aggr:
158 158 try:
159 159 l = [timegetter(x) for x in
160 160 co_day_auth_aggr[akc(cs.author)]['data']]
161 161 time_pos = l.index(k)
162 162 except ValueError:
163 163 time_pos = False
164 164
165 165 if time_pos >= 0 and time_pos is not False:
166 166
167 167 datadict = \
168 168 co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
169 169
170 170 datadict["commits"] += 1
171 171 datadict["added"] += len(cs.added)
172 172 datadict["changed"] += len(cs.changed)
173 173 datadict["removed"] += len(cs.removed)
174 174
175 175 else:
176 176 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
177 177
178 178 datadict = {"time": k,
179 179 "commits": 1,
180 180 "added": len(cs.added),
181 181 "changed": len(cs.changed),
182 182 "removed": len(cs.removed),
183 183 }
184 184 co_day_auth_aggr[akc(cs.author)]['data']\
185 185 .append(datadict)
186 186
187 187 else:
188 188 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
189 189 co_day_auth_aggr[akc(cs.author)] = {
190 190 "label": akc(cs.author),
191 191 "data": [{"time":k,
192 192 "commits":1,
193 193 "added":len(cs.added),
194 194 "changed":len(cs.changed),
195 195 "removed":len(cs.removed),
196 196 }],
197 197 "schema": ["commits"],
198 198 }
199 199
200 200 #gather all data by day
201 201 if k in commits_by_day_aggregate:
202 202 commits_by_day_aggregate[k] += 1
203 203 else:
204 204 commits_by_day_aggregate[k] = 1
205 205
206 206 overview_data = sorted(commits_by_day_aggregate.items(),
207 207 key=itemgetter(0))
208 208
209 209 if not co_day_auth_aggr:
210 210 co_day_auth_aggr[akc(repo.contact)] = {
211 211 "label": akc(repo.contact),
212 212 "data": [0, 1],
213 213 "schema": ["commits"],
214 214 }
215 215
216 216 stats = cur_stats if cur_stats else Statistics()
217 217 stats.commit_activity = json.dumps(co_day_auth_aggr)
218 218 stats.commit_activity_combined = json.dumps(overview_data)
219 219
220 220 log.debug('last revison %s', last_rev)
221 221 leftovers = len(repo.revisions[last_rev:])
222 222 log.debug('revisions to parse %s', leftovers)
223 223
224 224 if last_rev == 0 or leftovers < parse_limit:
225 225 log.debug('getting code trending stats')
226 226 stats.languages = json.dumps(__get_codes_stats(repo_name))
227 227
228 228 try:
229 229 stats.repository = dbrepo
230 230 stats.stat_on_revision = last_cs.revision if last_cs else 0
231 231 sa.add(stats)
232 232 sa.commit()
233 233 except:
234 234 log.error(traceback.format_exc())
235 235 sa.rollback()
236 236 lock.release()
237 237 return False
238 238
239 239 #final release
240 240 lock.release()
241 241
242 242 #execute another task if celery is enabled
243 243 if len(repo.revisions) > 1 and CELERY_ON:
244 244 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
245 245 return True
246 246 except LockHeld:
247 247 log.info('LockHeld')
248 248 return 'Task with key %s already running' % lockkey
249 249
250 250 @task(ignore_result=True)
251 251 def send_password_link(user_email):
252 252 try:
253 253 log = reset_user_password.get_logger()
254 254 except:
255 255 log = logging.getLogger(__name__)
256 256
257 257 from rhodecode.lib import auth
258 258 from rhodecode.model.db import User
259 259
260 260 try:
261 261 sa = get_session()
262 262 user = sa.query(User).filter(User.email == user_email).scalar()
263 263
264 264 if user:
265 265 link = url('reset_password_confirmation', key=user.api_key,
266 266 qualified=True)
267 267 tmpl = """
268 268 Hello %s
269 269
270 270 We received a request to create a new password for your account.
271 271
272 272 You can generate it by clicking following URL:
273 273
274 274 %s
275 275
276 276 If you didn't request new password please ignore this email.
277 277 """
278 278 run_task(send_email, user_email,
279 279 "RhodeCode password reset link",
280 280 tmpl % (user.short_contact, link))
281 281 log.info('send new password mail to %s', user_email)
282 282
283 283 except:
284 284 log.error('Failed to update user password')
285 285 log.error(traceback.format_exc())
286 286 return False
287 287
288 288 return True
289 289
290 290 @task(ignore_result=True)
291 291 def reset_user_password(user_email):
292 292 try:
293 293 log = reset_user_password.get_logger()
294 294 except:
295 295 log = logging.getLogger(__name__)
296 296
297 297 from rhodecode.lib import auth
298 298 from rhodecode.model.db import User
299 299
300 300 try:
301 301 try:
302 302 sa = get_session()
303 303 user = sa.query(User).filter(User.email == user_email).scalar()
304 304 new_passwd = auth.PasswordGenerator().gen_password(8,
305 305 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
306 306 if user:
307 307 user.password = auth.get_crypt_password(new_passwd)
308 308 user.api_key = auth.generate_api_key(user.username)
309 309 sa.add(user)
310 310 sa.commit()
311 311 log.info('change password for %s', user_email)
312 312 if new_passwd is None:
313 313 raise Exception('unable to generate new password')
314 314
315 315 except:
316 316 log.error(traceback.format_exc())
317 317 sa.rollback()
318 318
319 319 run_task(send_email, user_email,
320 320 "Your new RhodeCode password",
321 321 'Your new RhodeCode password:%s' % (new_passwd))
322 322 log.info('send new password mail to %s', user_email)
323 323
324 324 except:
325 325 log.error('Failed to update user password')
326 326 log.error(traceback.format_exc())
327 327
328 328 return True
329 329
330 330
331 331 @task(ignore_result=True)
332 332 def send_email(recipients, subject, body):
333 333 """
334 334 Sends an email with defined parameters from the .ini files.
335 335
336 336 :param recipients: list of recipients, it this is empty the defined email
337 337 address from field 'email_to' is used instead
338 338 :param subject: subject of the mail
339 339 :param body: body of the mail
340 340 """
341 341 try:
342 342 log = send_email.get_logger()
343 343 except:
344 344 log = logging.getLogger(__name__)
345 345
346 346 email_config = config
347 347
348 348 if not recipients:
349 recipients = [email_config.get('email_to')]
349 # if recipients are not defined we send to email_config + all admins
350 admins = [u.email for u in User.query().filter(User.admin==True).all()]
351 recipients = [email_config.get('email_to')] + admins
350 352
351 353 mail_from = email_config.get('app_email_from')
352 354 user = email_config.get('smtp_username')
353 355 passwd = email_config.get('smtp_password')
354 356 mail_server = email_config.get('smtp_server')
355 357 mail_port = email_config.get('smtp_port')
356 358 tls = str2bool(email_config.get('smtp_use_tls'))
357 359 ssl = str2bool(email_config.get('smtp_use_ssl'))
358 360 debug = str2bool(config.get('debug'))
359 361 smtp_auth = email_config.get('smtp_auth')
360 362
361 363 try:
362 364 m = SmtpMailer(mail_from, user, passwd, mail_server,smtp_auth,
363 365 mail_port, ssl, tls, debug=debug)
364 366 m.send(recipients, subject, body)
365 367 except:
366 368 log.error('Mail sending failed')
367 369 log.error(traceback.format_exc())
368 370 return False
369 371 return True
370 372
371 373
372 374 @task(ignore_result=True)
373 375 def create_repo_fork(form_data, cur_user):
374 376 from rhodecode.model.repo import RepoModel
375 377 from vcs import get_backend
376 378
377 379 try:
378 380 log = create_repo_fork.get_logger()
379 381 except:
380 382 log = logging.getLogger(__name__)
381 383
382 384 repo_model = RepoModel(get_session())
383 385 repo_model.create(form_data, cur_user, just_db=True, fork=True)
384 386 repo_name = form_data['repo_name']
385 387 repos_path = get_repos_path()
386 388 repo_path = os.path.join(repos_path, repo_name)
387 389 repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
388 390 alias = form_data['repo_type']
389 391
390 392 log.info('creating repo fork %s as %s', repo_name, repo_path)
391 393 backend = get_backend(alias)
392 394 backend(str(repo_fork_path), create=True, src_url=str(repo_path))
393 395
394 396
395 397 def __get_codes_stats(repo_name):
396 398 repos_path = get_repos_path()
397 399 repo = get_repo(safe_str(os.path.join(repos_path, repo_name)))
398 400 tip = repo.get_changeset()
399 401 code_stats = {}
400 402
401 403 def aggregate(cs):
402 404 for f in cs[2]:
403 405 ext = lower(f.extension)
404 406 if ext in LANGUAGES_EXTENSIONS_MAP.keys() and not f.is_binary:
405 407 if ext in code_stats:
406 408 code_stats[ext] += 1
407 409 else:
408 410 code_stats[ext] = 1
409 411
410 412 map(aggregate, tip.walk('/'))
411 413
412 414 return code_stats or {}
General Comments 0
You need to be logged in to leave comments. Login now