tasks.py
414 lines
| 14.1 KiB
| text/x-python
|
PythonLexer
r903 | # -*- coding: utf-8 -*- | |||
""" | ||||
rhodecode.lib.celerylib.tasks | ||||
r1002 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |||
r903 | ||||
RhodeCode task modules, containing all tasks that are supposed to be run
by the celery daemon
r1203 | ||||
r903 | :created_on: Oct 6, 2010 | |||
:author: marcink | ||||
r1203 | :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com> | |||
r903 | :license: GPLv3, see COPYING for more details. | |||
""" | ||||
r1206 | # This program is free software: you can redistribute it and/or modify | |||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
r1203 | # | |||
r903 | # This program is distributed in the hope that it will be useful, | |||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
r1203 | # | |||
r903 | # You should have received a copy of the GNU General Public License | |||
r1206 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |||
r547 | from celery.decorators import task | |||
r555 | ||||
r692 | import os | |||
import traceback | ||||
r1002 | import logging | |||
r1717 | from os.path import join as jn | |||
r1002 | ||||
r692 | from time import mktime | |||
r547 | from operator import itemgetter | |||
r1244 | from string import lower | |||
r776 | ||||
r1417 | from pylons import config, url | |||
r547 | from pylons.i18n.translation import _ | |||
r776 | ||||
r1722 | from vcs import get_backend | |||
r1717 | ||||
r1723 | from rhodecode import CELERY_ON | |||
r1401 | from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str | |||
r1264 | from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \ | |||
__get_lockkey, LockHeld, DaemonLock | ||||
r547 | from rhodecode.lib.helpers import person | |||
r1717 | from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer | |||
r1722 | from rhodecode.lib.utils import add_cache, action_logger | |||
r1514 | from rhodecode.lib.compat import json, OrderedDict | |||
r776 | from rhodecode.model import init_model | |||
from rhodecode.model import meta | ||||
r1722 | from rhodecode.model.db import Statistics, Repository, User | |||
r776 | ||||
from sqlalchemy import engine_from_config | ||||
r692 | ||||
r777 | add_cache(config) | |||
r547 | __all__ = ['whoosh_index', 'get_commits_stats', | |||
'reset_user_password', 'send_email'] | ||||
r1722 | ||||
def get_session():
    """
    Return the SQLAlchemy session object used by the tasks.

    When celery is enabled a new engine is built from the
    ``sqlalchemy.db1.`` keys of the pylons config and the model is
    initialised with it before ``meta.Session`` is handed back.
    """
    if CELERY_ON:
        init_model(engine_from_config(config, 'sqlalchemy.db1.'))
    return meta.Session
def get_logger(cls):
    """
    Return the logger a task should write to.

    With celery enabled the task's own ``get_logger()`` is preferred; if
    celery is disabled, or asking the task for its logger fails, the
    module level logger is returned instead.

    :param cls: task object expected to expose a ``get_logger()`` callable
    """
    if CELERY_ON:
        try:
            return cls.get_logger()
        except Exception:
            # deliberate best-effort: any failure to obtain the task logger
            # falls through to the plain module logger below (but we no
            # longer swallow SystemExit / KeyboardInterrupt)
            pass
    return logging.getLogger(__name__)
r1244 | ||||
@task(ignore_result=True)
@locked_task
def whoosh_index(repo_location, full_index):
    """
    Run the whoosh indexing daemon over the given repositories location.

    :param repo_location: passed to the daemon as ``repo_location``
    :param full_index: passed to the daemon's ``run()`` as ``full_index``
    """
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon

    # the index directory comes from the ``index_dir`` config entry
    daemon = WhooshIndexingDaemon(index_location=config['index_dir'],
                                  repo_location=repo_location,
                                  sa=get_session())
    daemon.run(full_index=full_index)
r547 | ||||
r1244 | ||||
def _day_entry(day_ts, cs):
    """
    Build the per-day statistics record for a single changeset.

    :param day_ts: timestamp identifying the day of the changeset
    :param cs: changeset exposing ``added``, ``changed`` and ``removed``
    """
    return {"time": day_ts,
            "commits": 1,
            "added": len(cs.added),
            "changed": len(cs.changed),
            "removed": len(cs.removed),
            }


def _update_commits_stats(sa, log, repo_name, ts_min_y, ts_max_y):
    """
    Parse the next batch of changesets of a repository and store the
    aggregated statistics. Does not touch the task lock; the caller owns it.

    :param sa: database session used for querying and committing
    :param log: logger to report progress to
    :param repo_name: name of the repository to gather statistics for
    :param ts_min_y: lower timestamp bound for new per-day entries
    :param ts_max_y: upper timestamp bound for new per-day entries
    :returns: tuple ``(result, reschedule)``; ``result`` is the value the
        task should return, ``reschedule`` tells whether a follow-up run
        should be queued
    """
    def akc(k):
        # for js data compatibility strips double quotes from the person key
        return person(k).replace('"', "")

    co_day_auth_aggr = {}
    commits_by_day_aggregate = {}
    repo = Repository.get_by_repo_name(repo_name)
    if repo is None:
        return True, False
    repo = repo.scm_instance
    repo_size = len(repo.revisions)
    # nothing to do if the repo has no revisions
    if repo_size < 1:
        return True, False

    # hard-coded: the ts_min_y / ts_max_y window is currently not enforced
    skip_date_limit = True
    parse_limit = int(config['app_conf'].get('commit_parse_limit'))
    last_rev = 0
    last_cs = None
    timegetter = itemgetter('time')

    dbrepo = sa.query(Repository)\
        .filter(Repository.repo_name == repo_name).scalar()
    cur_stats = sa.query(Statistics)\
        .filter(Statistics.repository == dbrepo).scalar()

    if cur_stats is not None:
        last_rev = cur_stats.stat_on_revision

    if last_rev == repo.get_changeset().revision and repo_size > 1:
        # pass silently without any work if we're not on first revision or
        # current state of parsing revision (from db marker) is the
        # last revision
        return True, False

    if cur_stats:
        commits_by_day_aggregate = OrderedDict(json.loads(
            cur_stats.commit_activity_combined))
        co_day_auth_aggr = json.loads(cur_stats.commit_activity)

    log.debug('starting parsing %s', parse_limit)
    # resume right after the last stored revision (0 means a fresh start)
    last_rev = last_rev + 1 if last_rev > 0 else last_rev

    for cs in repo[last_rev:last_rev + parse_limit]:
        last_cs = cs  # remember last parsed changeset
        # timestamp of the commit's (year, month, day) at 00:00:00; a tuple
        # is used because time.mktime rejects lists on Python 3
        k = mktime(cs.date.timetuple()[:3] + (0, 0, 0, 0, 0, 0))
        author = akc(cs.author)
        in_date_range = skip_date_limit or ts_min_y <= k <= ts_max_y

        if author in co_day_auth_aggr:
            author_data = co_day_auth_aggr[author]['data']
            try:
                time_pos = [timegetter(x) for x in author_data].index(k)
            except ValueError:
                # no entry for that day yet
                time_pos = None

            if time_pos is not None:
                datadict = author_data[time_pos]
                datadict["commits"] += 1
                datadict["added"] += len(cs.added)
                datadict["changed"] += len(cs.changed)
                datadict["removed"] += len(cs.removed)
            elif in_date_range:
                author_data.append(_day_entry(k, cs))
        elif in_date_range:
            co_day_auth_aggr[author] = {
                "label": author,
                "data": [_day_entry(k, cs)],
                "schema": ["commits"],
            }

        # gather all data by day
        commits_by_day_aggregate[k] = commits_by_day_aggregate.get(k, 0) + 1

    overview_data = sorted(commits_by_day_aggregate.items(),
                           key=itemgetter(0))
    if not co_day_auth_aggr:
        # placeholder entry so the stored structure is never empty
        contact = akc(repo.contact)
        co_day_auth_aggr[contact] = {
            "label": contact,
            "data": [0, 1],
            "schema": ["commits"],
        }

    stats = cur_stats if cur_stats else Statistics()
    stats.commit_activity = json.dumps(co_day_auth_aggr)
    stats.commit_activity_combined = json.dumps(overview_data)

    log.debug('last revison %s', last_rev)
    leftovers = len(repo.revisions[last_rev:])
    log.debug('revisions to parse %s', leftovers)

    if last_rev == 0 or leftovers < parse_limit:
        log.debug('getting code trending stats')
        stats.languages = json.dumps(__get_codes_stats(repo_name))

    try:
        stats.repository = dbrepo
        stats.stat_on_revision = last_cs.revision if last_cs else 0
        sa.add(stats)
        sa.commit()
    except Exception:
        log.error(traceback.format_exc())
        sa.rollback()
        return False, False

    return True, len(repo.revisions) > 1


@task(ignore_result=True)
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
    """
    Gather commit activity statistics for a repository.

    Each run parses at most ``commit_parse_limit`` changesets, starting
    after the revision stored by the previous run, and saves the result in
    the ``Statistics`` row of the repository. With celery enabled the task
    queues itself again once it has finished. A file lock derived from the
    arguments prevents concurrent runs for the same repository.

    :param repo_name: name of the repository to gather statistics for
    :param ts_min_y: lower timestamp bound for new per-day entries
    :param ts_max_y: upper timestamp bound for new per-day entries
    :returns: ``True`` when finished or nothing had to be done, ``False``
        when storing the statistics failed, or a message string when
        another run already holds the lock
    """
    log = get_logger(get_commits_stats)

    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
                            ts_max_y)
    lockkey_path = config['here']
    log.info('running task with lockkey %s', lockkey)
    try:
        sa = get_session()
        lock = DaemonLock(file_=jn(lockkey_path, lockkey))
        try:
            result, reschedule = _update_commits_stats(sa, log, repo_name,
                                                       ts_min_y, ts_max_y)
        finally:
            # release on every exit path, including early returns and
            # unexpected errors, so the lock cannot be left behind
            lock.release()

        # execute another task if celery is enabled; the lock is already
        # released at this point so the follow-up run can acquire it
        if reschedule and CELERY_ON:
            run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
        return result
    except LockHeld:
        log.info('LockHeld')
        return 'Task with key %s already running' % lockkey
r547 | ||||
@task(ignore_result=True)
def send_password_link(user_email):
    """
    Send a password reset link to the user registered under `user_email`.

    The link points at the ``reset_password_confirmation`` route and
    carries the user's api key. Nothing is sent when no user with that
    email exists.

    :param user_email: email address of the account to reset
    :returns: ``True`` unless an error occurred, in which case the
        traceback is logged and ``False`` is returned
    """
    from rhodecode.model.notification import EmailNotificationModel
    log = get_logger(send_password_link)

    try:
        # the returned session is not used here; get_session() is called
        # because it initialises the model when celery is enabled
        get_session()
        user = User.get_by_email(user_email)
        if user:
            log.debug('password reset user found %s', user)
            link = url('reset_password_confirmation', key=user.api_key,
                       qualified=True)
            reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
            body = EmailNotificationModel().get_email_tmpl(
                reg_type, user=user.short_contact, reset_url=link)
            log.debug('sending email')
            run_task(send_email, user_email,
                     _("password reset link"), body)
            log.info('send new password mail to %s', user_email)
        else:
            log.debug("password reset email %s not found", user_email)
    except Exception:
        log.error(traceback.format_exc())
        return False
    return True
r1244 | ||||
@task(ignore_result=True)
def reset_user_password(user_email):
    """
    Generate a new password (and api key) for the user registered under
    `user_email` and mail the new password to that address.

    The mail is only sent after the new password has been stored
    successfully; for an unknown email, or when storing fails, nothing is
    sent. Errors are logged and never propagated.

    :param user_email: email address of the account to reset
    :returns: always ``True``
    """
    from rhodecode.lib import auth

    log = get_logger(reset_user_password)

    try:
        # obtained outside of the inner try so that ``sa`` is always bound
        # when the inner error handler rolls back
        sa = get_session()
        new_passwd = None
        password_changed = False
        try:
            user = User.get_by_email(user_email)
            new_passwd = auth.PasswordGenerator().gen_password(
                8, auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
            # validate before anything is written to the database
            if new_passwd is None:
                raise Exception('unable to generate new password')
            if user:
                user.password = auth.get_crypt_password(new_passwd)
                user.api_key = auth.generate_api_key(user.username)
                sa.add(user)
                sa.commit()
                password_changed = True
                log.info('change password for %s', user_email)
            else:
                log.info('password reset user %s not found', user_email)
        except Exception:
            log.error(traceback.format_exc())
            sa.rollback()

        # never mail a password that was not actually set
        if password_changed:
            run_task(send_email, user_email,
                     'Your new password',
                     'Your new RhodeCode password:%s' % (new_passwd))
            log.info('send new password mail to %s', user_email)

    except Exception:
        log.error('Failed to update user password')
        log.error(traceback.format_exc())

    return True
r1244 | ||||
@task(ignore_result=True)
def send_email(recipients, subject, body, html_body=''):
    """
    Sends an email with defined parameters from the .ini files.

    :param recipients: list of recipients, if this is empty the defined email
        address from field 'email_to' plus all admin users is used instead
    :param subject: subject of the mail, prefixed with 'email_prefix' when
        that option is set
    :param body: body of the mail
    :param html_body: html version of body
    :returns: ``True`` when the mailer finished without raising, ``False``
        otherwise (the traceback is logged)
    """
    log = get_logger(send_email)
    # the returned session is not used here; get_session() is called
    # because it initialises the model when celery is enabled
    get_session()
    email_config = config

    # only prepend the prefix when one is configured, otherwise the subject
    # would start with the literal text "None"
    prefix = email_config.get('email_prefix')
    if prefix:
        subject = "%s %s" % (prefix, subject)

    if not recipients:
        # if recipients are not defined we send to email_config + all admins
        # (``== True`` is required: it builds the SQLAlchemy filter clause)
        admins = [u.email for u in User.query()
                  .filter(User.admin == True).all()]  # noqa: E712
        candidates = [email_config.get('email_to')] + admins
        # drop unset addresses, e.g. a missing 'email_to' option
        recipients = [address for address in candidates if address]

    mail_from = email_config.get('app_email_from', 'RhodeCode')
    user = email_config.get('smtp_username')
    passwd = email_config.get('smtp_password')
    mail_server = email_config.get('smtp_server')
    mail_port = email_config.get('smtp_port')
    use_tls = str2bool(email_config.get('smtp_use_tls'))
    use_ssl = str2bool(email_config.get('smtp_use_ssl'))
    debug = str2bool(config.get('debug'))
    smtp_auth = email_config.get('smtp_auth')

    try:
        m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
                       mail_port, use_ssl, use_tls, debug=debug)
        m.send(recipients, subject, body, html_body)
    except Exception:
        log.error('Mail sending failed')
        log.error(traceback.format_exc())
        return False
    return True
r1244 | ||||
@task(ignore_result=True)
def create_repo_fork(form_data, cur_user):
    """
    Creates a fork of a repository using internal VCS methods.

    The database entry is created first, then the repository is cloned on
    disk, both journal entries are written and finally the session is
    committed.

    :param form_data: dict read for the keys ``repo_type``, ``org_path``,
        ``repo_name_full`` and ``update_after_clone``
    :param cur_user: user performing the fork, handed to the repo model
        and the action logger
    """
    from rhodecode.model.repo import RepoModel
    log = get_logger(create_repo_fork)
    db_session = get_session()
    repos_root = Repository.base_path()
    RepoModel(db_session).create(form_data, cur_user,
                                 just_db=True, fork=True)

    vcs_alias = form_data['repo_type']
    parent_name = form_data['org_path']
    fork_name = form_data['repo_name_full']
    do_update = form_data['update_after_clone']
    parent_path = jn(repos_root, parent_name)
    fork_path = jn(repos_root, fork_name)

    log.info('creating fork of %s as %s', parent_path, fork_path)
    backend_cls = get_backend(vcs_alias)
    clone_target = safe_str(fork_path)
    clone_source = safe_str(parent_path)
    backend_cls(clone_target, create=True, src_url=clone_source,
                update_after_clone=do_update)

    # one journal entry on the parent repository, one on the new fork
    journal = (('user_forked_repo:%s' % fork_name, parent_name),
               ('user_created_fork:%s' % fork_name, fork_name))
    for action, target_repo in journal:
        action_logger(cur_user, action, target_repo, '', db_session)

    # finally commit at latest possible stage
    db_session.commit()
r1244 | ||||
def __get_codes_stats(repo_name):
    """
    Count the files of the repository's tip changeset per file extension.

    Only non-binary files whose lower-cased extension is a key of
    ``LANGUAGES_EXTENSIONS_MAP`` are counted.

    :param repo_name: name of the repository to inspect
    :returns: dict mapping extension to number of files, empty when the
        repository is unknown or nothing matched
    """
    db_repo = Repository.get_by_repo_name(repo_name)
    if db_repo is None:
        return {}

    tip = db_repo.scm_instance.get_changeset()
    code_stats = {}

    # explicit loops instead of map(): map() is lazy on Python 3 and would
    # silently count nothing when used only for its side effects
    for entry in tip.walk('/'):
        # index 2 holds the file nodes -- presumably an os.walk-like
        # (top, dirs, files) triple; verify against the vcs backend
        for f in entry[2]:
            ext = f.extension.lower()
            if ext in LANGUAGES_EXTENSIONS_MAP and not f.is_binary:
                code_stats[ext] = code_stats.get(ext, 0) + 1

    return code_stats