tasks.py
324 lines
| 10.9 KiB
| text/x-python
|
PythonLexer
r547 | from celery.decorators import task | |||
r555 | ||||
r692 | import os | |||
import traceback | ||||
from time import mktime | ||||
r547 | from operator import itemgetter | |||
from pylons.i18n.translation import _ | ||||
from rhodecode.lib.celerylib import run_task, locked_task | ||||
from rhodecode.lib.helpers import person | ||||
from rhodecode.lib.smtp_mailer import SmtpMailer | ||||
from rhodecode.lib.utils import OrderedDict | ||||
r631 | from vcs.backends import get_repo | |||
r692 | from rhodecode.model.db import RhodeCodeUi | |||
r555 | try: | |||
r564 | import json | |||
except ImportError: | ||||
#python 2.5 compatibility | ||||
import simplejson as json | ||||
try: | ||||
r555 | from celeryconfig import PYLONS_CONFIG as config | |||
r559 | celery_on = True | |||
r555 | except ImportError: | |||
#if celeryconfig is not present let's just load our pylons | ||||
#config instead | ||||
from pylons import config | ||||
r559 | celery_on = False | |||
r555 | ||||
r547 | __all__ = ['whoosh_index', 'get_commits_stats', | |||
'reset_user_password', 'send_email'] | ||||
def get_session():
    """
    Returns the database session object to use in the current run mode.

    With celery enabled (``celery_on``) a new engine is built from the
    ``sqlalchemy.db1.`` settings of the ``app:main`` config section and a
    ``scoped_session`` bound to it is returned. Without celery an instance
    of the application's own ``Session`` is returned.
    """
    if not celery_on:
        #If we don't use celery reuse our current application Session
        from rhodecode.model.meta import Session
        return Session()

    from sqlalchemy import engine_from_config
    from sqlalchemy.orm import sessionmaker, scoped_session
    app_settings = dict(config.items('app:main'))
    db_engine = engine_from_config(app_settings, 'sqlalchemy.db1.')
    return scoped_session(sessionmaker(bind=db_engine))
def get_repos_path():
    """
    Returns the ``ui_value`` of the single ``RhodeCodeUi`` row stored under
    the ``'/'`` key. Callers in this module join repository names onto it.
    ``one()`` raises if there is no such row or more than one.
    """
    ui_query = get_session().query(RhodeCodeUi)
    root_entry = ui_query.filter(RhodeCodeUi.ui_key == '/').one()
    return root_entry.ui_value
@task
@locked_task
def whoosh_index(repo_location, full_index):
    """
    Runs the whoosh indexing daemon for the given repositories location.

    :param repo_location: passed through to ``WhooshIndexingDaemon`` as
        ``repo_location``
    :param full_index: passed through to the daemon's ``run()`` as
        ``full_index``
    """
    log = whoosh_index.get_logger()
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
    #the index directory is configured in the app:main section
    app_settings = dict(config.items('app:main'))
    daemon = WhooshIndexingDaemon(index_location=app_settings['index_dir'],
                                  repo_location=repo_location)
    daemon.run(full_index=full_index)
r547 | ||||
@task
@locked_task
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
    """
    Incrementally aggregates commit statistics of a repository and stores
    them, JSON encoded, on its ``Statistics`` row.

    A single run parses a bounded batch of changesets (see ``parse_limit``),
    remembers the last parsed one in ``stat_on_revision`` and re-schedules
    itself through ``run_task`` until the marker reaches the last revision.

    Two aggregates are maintained:

    * per author: ``{author: {"label", "data": [{"time", "commits",
      "added", "changed", "removed"}, ...], "schema"}}``
    * per day: number of commits, stored as a list of ``[time, count]``
      pairs sorted by time

    where ``time`` is the ``mktime`` timestamp of the commit day at 00:00.

    :param repo_name: name of the repository, joined onto the path returned
        by ``get_repos_path()``
    :param ts_min_y: lower timestamp bound for per-author entries; not
        enforced while ``skip_date_limit`` is True
    :param ts_max_y: upper timestamp bound for per-author entries; not
        enforced while ``skip_date_limit`` is True
    :returns: True when there was nothing to do or the statistics were
        stored, False when storing them failed
    """
    from rhodecode.model.db import Statistics, Repository
    log = get_commits_stats.get_logger()

    #for js data compatibilty
    author_key_cleaner = lambda k: person(k).replace('"', "")

    commits_by_day_author_aggregate = {}
    commits_by_day_aggregate = {}
    repos_path = get_repos_path()
    repo = get_repo(os.path.join(repos_path, repo_name))

    skip_date_limit = True
    #limit of changesets parsed by a single task run, so one run cannot
    #freeze the application
    parse_limit = 250
    last_rev = 0
    last_cs = None
    timegetter = itemgetter('time')

    sa = get_session()

    dbrepo = sa.query(Repository)\
        .filter(Repository.repo_name == repo_name).scalar()
    cur_stats = sa.query(Statistics)\
        .filter(Statistics.repository == dbrepo).scalar()
    if cur_stats:
        last_rev = cur_stats.stat_on_revision
    if not repo.revisions:
        return True

    if last_rev == repo.revisions[-1] and len(repo.revisions) > 1:
        #pass silently without any work if we're not on first revision or
        #current state of parsing revision(from db marker) is the last revision
        return True

    if last_rev:
        #the changeset at the marker is already part of the stored
        #aggregates, continue with the one after it so that it is not
        #counted twice
        start_rev = last_rev + 1
    else:
        start_rev = 0

    if cur_stats and last_rev:
        commits_by_day_aggregate = OrderedDict(
            json.loads(cur_stats.commit_activity_combined))
        commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
    #else: parsing (re)starts from the first revision, so stored aggregates
    #are not loaded - adding to them would count those changesets again

    log.debug('starting parsing %s', parse_limit)
    lmktime = mktime
    for cnt, rev in enumerate(repo.revisions[start_rev:]):
        last_cs = cs = repo.get_changeset(rev)

        #timestamp of the commit day: (year, month, day) padded with zeros
        #to the 9 items expected by mktime
        k = lmktime(tuple(cs.date.timetuple()[:3]) + (0,) * 6)

        author = author_key_cleaner(cs.author)
        in_date_range = skip_date_limit or (k >= ts_min_y and k <= ts_max_y)

        if author in commits_by_day_author_aggregate:
            day_entries = commits_by_day_author_aggregate[author]['data']
            try:
                time_pos = [timegetter(x) for x in day_entries].index(k)
            except ValueError:
                #this author has no entry for that day yet
                time_pos = None

            if time_pos is not None:
                datadict = day_entries[time_pos]
                datadict["commits"] += 1
                datadict["added"] += len(cs.added)
                datadict["changed"] += len(cs.changed)
                datadict["removed"] += len(cs.removed)
            elif in_date_range:
                day_entries.append({"time": k,
                                    "commits": 1,
                                    "added": len(cs.added),
                                    "changed": len(cs.changed),
                                    "removed": len(cs.removed),
                                    })
        elif in_date_range:
            commits_by_day_author_aggregate[author] = {
                "label": author,
                "data": [{"time": k,
                          "commits": 1,
                          "added": len(cs.added),
                          "changed": len(cs.changed),
                          "removed": len(cs.removed),
                          }],
                "schema": ["commits"],
            }

        #gather all data by day
        if k in commits_by_day_aggregate:
            commits_by_day_aggregate[k] += 1
        else:
            commits_by_day_aggregate[k] = 1

        if cnt >= parse_limit:
            #don't fetch to much data since we can freeze application
            break

    if last_cs is None:
        #nothing was parsed (marker points at or past the last revision),
        #so there is no new marker to store
        log.debug('no new revisions to parse after %s', last_rev)
        return True

    overview_data = sorted(([k, v] for k, v
                            in commits_by_day_aggregate.items()),
                           key=itemgetter(0))
    if not commits_by_day_author_aggregate:
        commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
            "label": author_key_cleaner(repo.contact),
            "data": [0, 1],
            "schema": ["commits"],
        }

    stats = cur_stats if cur_stats else Statistics()
    stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
    stats.commit_activity_combined = json.dumps(overview_data)

    log.debug('last revison %s', last_rev)
    leftovers = len(repo.revisions[last_rev:])
    log.debug('revisions to parse %s', leftovers)

    #language statistics are only (re)computed on the first run and on the
    #run that reaches the end of the history
    if last_rev == 0 or leftovers < parse_limit:
        stats.languages = json.dumps(__get_codes_stats(repo_name))

    stats.repository = dbrepo
    stats.stat_on_revision = last_cs.revision

    try:
        sa.add(stats)
        sa.commit()
    except Exception:
        log.error(traceback.format_exc())
        sa.rollback()
        return False
    if len(repo.revisions) > 1:
        #schedule the next batch, it returns early once the marker is
        #on the last revision
        run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)

    return True
@task
def reset_user_password(user_email):
    """
    Generates a new random password for the user registered under
    `user_email`, stores it and queues a mail with it to that address.

    The mail is only queued after the new password was committed. When no
    user matches the address or storing the password fails, nothing is
    sent, so a mailed password is always the one that is actually set.

    :param user_email: email address the user is looked up by and the new
        password is sent to
    :returns: always True, failures are logged instead of raised
    """
    log = reset_user_password.get_logger()
    from rhodecode.lib import auth
    from rhodecode.model.db import User

    try:
        sa = get_session()
        new_passwd = auth.PasswordGenerator().gen_password(8,
                         auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
        #check before the password is used for anything
        if new_passwd is None:
            raise Exception('unable to generate new password')

        try:
            user = sa.query(User).filter(User.email == user_email).scalar()
            if not user:
                log.error('no user found for %s, password not changed',
                          user_email)
                return True
            user.password = auth.get_crypt_password(new_passwd)
            sa.add(user)
            sa.commit()
            log.info('change password for %s', user_email)
        except Exception:
            log.error('Failed to update user password')
            log.error(traceback.format_exc())
            sa.rollback()
            #the old password is still valid, don't mail the new one
            return True

        run_task(send_email, user_email,
                 "Your new rhodecode password",
                 'Your new rhodecode password:%s' % (new_passwd))
        log.info('send new password mail to %s', user_email)

    except Exception:
        log.error('Failed to update user password')
        log.error(traceback.format_exc())
    return True
@task
def send_email(recipients, subject, body):
    """
    Sends an email using the mail settings read from the ``DEFAULT``
    section of the .ini configuration.

    :param recipients: list of recipients, if this is empty the address
        defined in field 'email_to' is used instead
    :param subject: subject of the mail
    :param body: body of the mail
    :returns: True when the mailer finished without raising, False when
        sending failed (the error is logged)
    """
    log = send_email.get_logger()
    settings = dict(config.items('DEFAULT'))

    if not recipients:
        recipients = [settings.get('email_to')]

    def as_flag(raw):
        #missing or empty settings stay None, anything else becomes a bool
        if not raw:
            return None
        return raw.lower() in ["yes", "true", "t", "1"]

    sender = settings.get('app_email_from')
    smtp_user = settings.get('smtp_username')
    smtp_passwd = settings.get('smtp_password')
    smtp_host = settings.get('smtp_server')
    smtp_port = settings.get('smtp_port')
    use_tls = as_flag(settings.get('smtp_use_tls'))
    use_ssl = as_flag(settings.get('smtp_use_ssl'))

    try:
        mailer = SmtpMailer(sender, smtp_user, smtp_passwd, smtp_host,
                            smtp_port, use_ssl, use_tls)
        mailer.send(recipients, subject, body)
    except:
        log.error('Mail sending failed')
        log.error(traceback.format_exc())
        return False
    return True
@task
def create_repo_fork(form_data, cur_user):
    """
    Creates a fork of a repository: first the database entry, then the
    repository itself on the filesystem.

    :param form_data: dict, the keys read here are 'repo_name' (repository
        used as the source of the fork), 'fork_name' (name the fork is
        created under) and 'repo_type' (backend alias); it is also passed
        as a whole to ``RepoModel.create``
    :param cur_user: passed through to ``RepoModel.create``
    """
    from rhodecode.model.repo import RepoModel
    from vcs import get_backend
    log = create_repo_fork.get_logger()
    repo_model = RepoModel()
    #just_db: only the database record, the repository is created below
    repo_model.create(form_data, cur_user, just_db=True, fork=True)
    repo_name = form_data['repo_name']
    repos_path = get_repos_path()
    repo_path = os.path.join(repos_path, repo_name)
    repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
    alias = form_data['repo_type']

    #report the destination of the fork, not the path of its source
    log.info('creating repo fork %s as %s', repo_name, repo_fork_path)
    backend = get_backend(alias)
    backend(str(repo_fork_path), create=True, src_url=str(repo_path))
r629 | ||||
def __get_codes_stats(repo_name):
    """
    Counts the files of the repository's default changeset (the one
    returned by ``get_changeset()`` without arguments) per mimetype.

    Only files whose extension is listed in ``LANGUAGES_EXTENSIONS`` are
    counted.

    :param repo_name: name of the repository, joined onto the path returned
        by ``get_repos_path()``
    :returns: dict mapping mimetype to the number of matching files, empty
        when nothing matched
    """
    #frozenset gives constant time lookups for the per-file check below
    LANGUAGES_EXTENSIONS = frozenset(['action', 'adp', 'ashx', 'asmx',
    'aspx', 'asx', 'axd', 'c', 'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el',
    'erl', 'h', 'java', 'js', 'jsp', 'jspx', 'lisp', 'lua', 'm', 'mako', 'ml',
    'pas', 'patch', 'php', 'php3', 'php4', 'phtml', 'pm', 'py', 'rb', 'rst',
    's', 'sh', 'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt',
    'yaws'])
    repos_path = get_repos_path()
    p = os.path.join(repos_path, repo_name)
    repo = get_repo(p)
    tip = repo.get_changeset()
    code_stats = {}

    #explicit loops instead of map() called only for its side effects
    for walked in tip.walk('/'):
        #the third item of every walked entry holds the file nodes
        for f in walked[2]:
            if f.extension in LANGUAGES_EXTENSIONS:
                k = f.mimetype
                code_stats[k] = code_stats.get(k, 0) + 1

    return code_stats