tasks.py
411 lines
| 13.4 KiB
| text/x-python
|
PythonLexer
r903 | # -*- coding: utf-8 -*- | |||
""" | ||||
rhodecode.lib.celerylib.tasks | ||||
r1002 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |||
r903 | ||||
RhodeCode task modules, containing all tasks that are supposed to be run | ||||
by the celery daemon | ||||
r1203 | ||||
r903 | :created_on: Oct 6, 2010 | |||
:author: marcink | ||||
r1203 | :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com> | |||
r903 | :license: GPLv3, see COPYING for more details. | |||
""" | ||||
r1206 | # This program is free software: you can redistribute it and/or modify | |||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
r1203 | # | |||
r903 | # This program is distributed in the hope that it will be useful, | |||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
r1203 | # | |||
r903 | # You should have received a copy of the GNU General Public License | |||
r1206 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |||
r547 | from celery.decorators import task | |||
r555 | ||||
r692 | import os | |||
import traceback | ||||
r1002 | import logging | |||
r1354 | from os.path import dirname as dn, join as jn | |||
r1002 | ||||
r692 | from time import mktime | |||
r547 | from operator import itemgetter | |||
r1244 | from string import lower | |||
r776 | ||||
r1417 | from pylons import config, url | |||
r547 | from pylons.i18n.translation import _ | |||
r776 | ||||
r1401 | from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str | |||
r1264 | from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \ | |||
__get_lockkey, LockHeld, DaemonLock | ||||
r547 | from rhodecode.lib.helpers import person | |||
from rhodecode.lib.smtp_mailer import SmtpMailer | ||||
r1337 | from rhodecode.lib.utils import add_cache | |||
r1514 | from rhodecode.lib.compat import json, OrderedDict | |||
r776 | from rhodecode.model import init_model | |||
from rhodecode.model import meta | ||||
r1264 | from rhodecode.model.db import RhodeCodeUi, Statistics, Repository | |||
r776 | ||||
r631 | from vcs.backends import get_repo | |||
r776 | ||||
from sqlalchemy import engine_from_config | ||||
r692 | ||||
# Register the cache regions described by the pylons config.
add_cache(config)

# Names exported by ``from rhodecode.lib.celerylib.tasks import *``.
__all__ = [
    'whoosh_index',
    'get_commits_stats',
    'reset_user_password',
    'send_email',
]

# True when the ``use_celery`` option of the application config is enabled.
CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
r1244 | ||||
def get_session():
    """
    Return a database session for use inside a task.

    When celery is enabled the model layer is (re)initialised first with an
    engine built from the ``sqlalchemy.db1.`` settings of the pylons config.

    :returns: a new ``meta.Session()`` instance
    """
    if CELERY_ON:
        init_model(engine_from_config(config, 'sqlalchemy.db1.'))
    return meta.Session()
r1244 | ||||
def get_repos_path():
    """
    Return the ``ui_value`` of the ``RhodeCodeUi`` row whose key is ``'/'``.

    The callers in this module join repository names onto this value, i.e.
    they treat it as the base directory of all repositories.  ``.one()`` is
    used, so the query raises unless exactly one such row exists.
    """
    session = get_session()
    root_entry = session.query(RhodeCodeUi)\
        .filter(RhodeCodeUi.ui_key == '/')\
        .one()
    return root_entry.ui_value
r1244 | ||||
@task(ignore_result=True)
@locked_task
def whoosh_index(repo_location, full_index):
    """
    Run the whoosh indexing daemon over the given repository location.

    :param repo_location: passed to the daemon as ``repo_location``
    :param full_index: passed to ``run()`` as ``full_index``
    """
    # imported here rather than at module level, as in the original code
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon

    index_location = config['index_dir']
    daemon = WhooshIndexingDaemon(index_location=index_location,
                                  repo_location=repo_location,
                                  sa=get_session())
    daemon.run(full_index=full_index)
r547 | ||||
r1244 | ||||
@task(ignore_result=True)
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
    """
    Incrementally aggregate commit statistics of a repository.

    Each run parses at most ``commit_parse_limit`` (application config)
    changesets, continuing after the revision stored in
    ``Statistics.stat_on_revision``, and stores the per-author/per-day and
    combined per-day aggregates as JSON on the ``Statistics`` row of the
    repository.  When celery is enabled and the repository has more than one
    revision, the task re-queues itself so the remaining changesets get
    parsed by follow-up runs.

    The whole run is guarded by a ``DaemonLock`` keyed on the task arguments;
    the lock is always released, also when the run fails unexpectedly.

    :param repo_name: name of the repository, relative to the repos path
    :param ts_min_y: lower timestamp bound for new per-day entries
    :param ts_max_y: upper timestamp bound for new per-day entries; both
        bounds currently have no effect because ``skip_date_limit`` is
        hard-coded to ``True``
    :returns: ``True`` when the run finished (or there was nothing to do),
        ``False`` when storing the statistics failed, or a message string
        when another run holds the lock
    """
    try:
        log = get_commits_stats.get_logger()
    except Exception:
        log = logging.getLogger(__name__)

    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
                            ts_max_y)
    lockkey_path = config['here']
    log.info('running task with lockkey %s', lockkey)

    try:
        lock = DaemonLock(file_=jn(lockkey_path, lockkey))
    except LockHeld:
        log.info('LockHeld')
        return 'Task with key %s already running' % lockkey

    def author_key(author):
        # for JS data compatibility: strip double quotes from the person key
        return person(author).replace('"', "")

    def new_day_entry(day_ts, cs):
        # fresh per-day counters for a single changeset
        return {
            "time": day_ts,
            "commits": 1,
            "added": len(cs.added),
            "changed": len(cs.changed),
            "removed": len(cs.removed),
        }

    try:
        co_day_auth_aggr = {}
        commits_by_day_aggregate = {}
        repos_path = get_repos_path()
        repo = get_repo(safe_str(os.path.join(repos_path, repo_name)))
        repo_size = len(repo.revisions)
        # nothing to do for a repository without revisions
        if repo_size < 1:
            return True

        skip_date_limit = True
        parse_limit = int(config['app_conf'].get('commit_parse_limit'))
        last_rev = 0
        last_cs = None
        timegetter = itemgetter('time')

        sa = get_session()

        dbrepo = sa.query(Repository)\
            .filter(Repository.repo_name == repo_name).scalar()
        cur_stats = sa.query(Statistics)\
            .filter(Statistics.repository == dbrepo).scalar()

        if cur_stats is not None:
            last_rev = cur_stats.stat_on_revision

        if last_rev == repo.get_changeset().revision and repo_size > 1:
            # pass silently without any work if we're not on the first
            # revision and the db marker already points at the last revision
            return True

        if cur_stats:
            # continue from the aggregates stored by the previous run
            commits_by_day_aggregate = OrderedDict(json.loads(
                cur_stats.commit_activity_combined))
            co_day_auth_aggr = json.loads(cur_stats.commit_activity)

        log.debug('starting parsing %s', parse_limit)
        # NOTE(review): a stored marker of 0 is indistinguishable from
        # "nothing parsed yet", so revision 0 is parsed again in that case
        # -- behaviour kept as is, confirm whether this is intended
        last_rev = last_rev + 1 if last_rev > 0 else last_rev

        for cs in repo[last_rev:last_rev + parse_limit]:
            last_cs = cs  # remember last parsed changeset
            # timestamp of the changeset's day (year, month, day at 00:00:00)
            k = mktime(cs.date.timetuple()[:3] + (0,) * 6)
            author = author_key(cs.author)

            if author in co_day_auth_aggr:
                author_data = co_day_auth_aggr[author]['data']
                # first entry already recorded for this day, if any
                existing = None
                for entry in author_data:
                    if timegetter(entry) == k:
                        existing = entry
                        break

                if existing is not None:
                    existing["commits"] += 1
                    existing["added"] += len(cs.added)
                    existing["changed"] += len(cs.changed)
                    existing["removed"] += len(cs.removed)
                elif k >= ts_min_y and k <= ts_max_y or skip_date_limit:
                    author_data.append(new_day_entry(k, cs))
            elif k >= ts_min_y and k <= ts_max_y or skip_date_limit:
                co_day_auth_aggr[author] = {
                    "label": author,
                    "data": [new_day_entry(k, cs)],
                    "schema": ["commits"],
                }

            # gather all data by day
            commits_by_day_aggregate[k] = \
                commits_by_day_aggregate.get(k, 0) + 1

        overview_data = sorted(commits_by_day_aggregate.items(),
                               key=itemgetter(0))

        if not co_day_auth_aggr:
            # placeholder entry keyed on the repository contact
            contact = author_key(repo.contact)
            co_day_auth_aggr[contact] = {
                "label": contact,
                "data": [0, 1],
                "schema": ["commits"],
            }

        stats = cur_stats if cur_stats else Statistics()
        stats.commit_activity = json.dumps(co_day_auth_aggr)
        stats.commit_activity_combined = json.dumps(overview_data)

        log.debug('last revison %s', last_rev)
        leftovers = len(repo.revisions[last_rev:])
        log.debug('revisions to parse %s', leftovers)

        # language stats are refreshed on the first run and on the run that
        # can cover all remaining revisions
        if last_rev == 0 or leftovers < parse_limit:
            log.debug('getting code trending stats')
            stats.languages = json.dumps(__get_codes_stats(repo_name))

        try:
            stats.repository = dbrepo
            stats.stat_on_revision = last_cs.revision if last_cs else 0
            sa.add(stats)
            sa.commit()
        except Exception:
            log.error(traceback.format_exc())
            sa.rollback()
            return False

        reschedule = len(repo.revisions) > 1 and CELERY_ON
    finally:
        # single release point: covers the early returns, the failed commit
        # and any unexpected error, so a stale lock never blocks later runs
        lock.release()

    # execute another task if celery is enabled; done only after the lock
    # has been released so the follow-up run is able to acquire it
    if reschedule:
        run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
    return True
r547 | ||||
@task(ignore_result=True)
def send_password_link(user_email):
    """
    Mail a password reset confirmation link to the user with given email.

    Nothing is sent when no user matches ``user_email``.

    :param user_email: email address used to look up the user; also the
        recipient of the mail
    :returns: ``False`` when looking up the user or queueing the mail raised,
        ``True`` otherwise (also when no user matched)
    """
    try:
        # use this task's own logger, like every other task in this module
        log = send_password_link.get_logger()
    except Exception:
        log = logging.getLogger(__name__)

    from rhodecode.model.db import User

    try:
        sa = get_session()
        user = sa.query(User).filter(User.email == user_email).scalar()
        if user:
            link = url('reset_password_confirmation', key=user.api_key,
                       qualified=True)
            tmpl = """
Hello %s
We received a request to create a new password for your account.
You can generate it by clicking following URL:
%s
If you didn't request new password please ignore this email.
"""
            run_task(send_email, user_email,
                     "RhodeCode password reset link",
                     tmpl % (user.short_contact, link))
            log.info('send new password mail to %s', user_email)
    except Exception:
        log.error('Failed to send password reset link')
        log.error(traceback.format_exc())
        return False
    return True
r1244 | ||||
@task(ignore_result=True)
def reset_user_password(user_email):
    """
    Generate a new password for the user with given email and mail it.

    The user's password and api key are replaced and committed first; the
    mail with the new password is queued only after that succeeded, so a
    password that was never stored is never mailed (no matching user, failed
    commit, or failed password generation).

    :param user_email: email address used to look up the user; also the
        recipient of the mail
    :returns: always ``True``; failures are only logged
    """
    try:
        log = reset_user_password.get_logger()
    except Exception:
        log = logging.getLogger(__name__)

    from rhodecode.lib import auth
    from rhodecode.model.db import User

    try:
        sa = get_session()
        user = sa.query(User).filter(User.email == user_email).scalar()
        if not user:
            log.error('No user found for %s, password not changed',
                      user_email)
            return True

        new_passwd = auth.PasswordGenerator().gen_password(8,
                        auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
        # validate before the password is hashed and stored
        if new_passwd is None:
            raise Exception('unable to generate new password')

        try:
            user.password = auth.get_crypt_password(new_passwd)
            user.api_key = auth.generate_api_key(user.username)
            sa.add(user)
            sa.commit()
        except Exception:
            # undo the partial change, then let the outer handler log it
            sa.rollback()
            raise
        log.info('change password for %s', user_email)

        run_task(send_email, user_email,
                 "Your new RhodeCode password",
                 'Your new RhodeCode password:%s' % (new_passwd))
        log.info('send new password mail to %s', user_email)
    except Exception:
        log.error('Failed to update user password')
        log.error(traceback.format_exc())

    return True
r1244 | ||||
@task(ignore_result=True)
def send_email(recipients, subject, body):
    """
    Sends an email using the SMTP parameters defined in the .ini files.

    :param recipients: recipients of the mail; if this is empty, the address
        configured in the 'email_to' field is used instead
    :param subject: subject of the mail
    :param body: body of the mail
    :returns: ``True`` when the mailer finished without raising,
        ``False`` otherwise
    """
    try:
        log = send_email.get_logger()
    except:
        log = logging.getLogger(__name__)

    settings = config

    # fall back to the configured default recipient
    if not recipients:
        recipients = [settings.get('email_to')]

    sender = settings.get('app_email_from')
    smtp_user = settings.get('smtp_username')
    smtp_passwd = settings.get('smtp_password')
    smtp_host = settings.get('smtp_server')
    smtp_port = settings.get('smtp_port')
    use_tls = str2bool(settings.get('smtp_use_tls'))
    use_ssl = str2bool(settings.get('smtp_use_ssl'))
    debug_on = str2bool(config.get('debug'))

    try:
        mailer = SmtpMailer(sender, smtp_user, smtp_passwd, smtp_host,
                            smtp_port, use_ssl, use_tls, debug=debug_on)
        mailer.send(recipients, subject, body)
    except:
        log.error('Mail sending failed')
        log.error(traceback.format_exc())
        return False
    else:
        return True
r1244 | ||||
@task(ignore_result=True)
def create_repo_fork(form_data, cur_user):
    """
    Create a fork: the database entry first, then the repository on disk.

    :param form_data: dict read for the keys ``repo_name`` (source
        repository), ``fork_name`` (location of the fork) and ``repo_type``
        (vcs backend alias); it is also passed on to ``RepoModel.create``
    :param cur_user: passed on to ``RepoModel.create``
    """
    from rhodecode.model.repo import RepoModel
    from vcs import get_backend

    try:
        log = create_repo_fork.get_logger()
    except Exception:
        log = logging.getLogger(__name__)

    repo_model = RepoModel(get_session())
    repo_model.create(form_data, cur_user, just_db=True, fork=True)

    repo_name = form_data['repo_name']
    repos_path = get_repos_path()
    repo_path = os.path.join(repos_path, repo_name)
    repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
    alias = form_data['repo_type']

    # report the destination of the fork, not the path of its source
    log.info('creating repo fork %s as %s', repo_name, repo_fork_path)
    backend = get_backend(alias)
    # safe_str instead of str(): plain str() raises UnicodeEncodeError for
    # unicode paths with non-ASCII characters under Python 2
    backend(safe_str(repo_fork_path), create=True,
            src_url=safe_str(repo_path))
r629 | ||||
r1244 | ||||
def __get_codes_stats(repo_name):
    """
    Count the files of the default changeset per known file extension.

    Walks the whole tree of ``repo.get_changeset()`` and counts every
    non-binary file whose lower-cased extension is a key of
    ``LANGUAGES_EXTENSIONS_MAP``.

    :param repo_name: name of the repository, relative to the repos path
    :returns: dict mapping extension to number of files, empty when nothing
        matched
    """
    repos_path = get_repos_path()
    repo = get_repo(safe_str(os.path.join(repos_path, repo_name)))
    tip = repo.get_changeset()
    code_stats = {}

    # explicit loops instead of map(): the walk is done purely for its side
    # effect on code_stats, which a lazy map() would silently skip
    for walked in tip.walk('/'):
        # third item of every walked entry holds the file nodes
        for f in walked[2]:
            ext = f.extension.lower()
            if ext in LANGUAGES_EXTENSIONS_MAP and not f.is_binary:
                code_stats[ext] = code_stats.get(ext, 0) + 1

    return code_stats