tasks.py
504 lines
| 17.6 KiB
| text/x-python
|
PythonLexer
|
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.lib.celerylib.tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Kallithea task modules, containing all tasks that are supposed to be run
by the celery daemon

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Oct 6, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""
import os | ||||
import traceback | ||||
import logging | ||||
|
r5455 | import rfc822 | ||
|
r4187 | |||
from time import mktime | ||||
from operator import itemgetter | ||||
from string import lower | ||||
|
r6508 | from tg import config | ||
|
r4187 | |||
|
r4422 | from kallithea import CELERY_ON | ||
|
r6133 | from kallithea.lib import celerylib | ||
|
r4187 | from kallithea.lib.helpers import person | ||
from kallithea.lib.rcmail.smtp_mailer import SmtpMailer | ||||
|
r6727 | from kallithea.lib.utils import action_logger | ||
|
r6134 | from kallithea.lib.utils2 import str2bool | ||
|
r5455 | from kallithea.lib.vcs.utils import author_email | ||
|
r4187 | from kallithea.lib.compat import json, OrderedDict | ||
from kallithea.lib.hooks import log_create_repository | ||||
|
r6426 | from kallithea.model.db import Statistics, RepoGroup, Repository, User | ||
|
r4187 | |||
|
r4424 | __all__ = ['whoosh_index', 'get_commits_stats', 'send_email'] | ||
|
r4187 | |||
|
r6129 | log = logging.getLogger(__name__) | ||
|
r4187 | |||
|
@celerylib.task
@celerylib.locked_task
@celerylib.dbsession
def whoosh_index(repo_location, full_index):
    """Run the Whoosh full-text indexing daemon for the configured index.

    :param repo_location: filesystem location of the repositories to index
    :param full_index: when True, rebuild the whole index instead of doing
        an incremental update
    """
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon

    celerylib.get_session()  # initialize database connection

    daemon = WhooshIndexingDaemon(index_location=config['index_dir'],
                                  repo_location=repo_location)
    daemon.run(full_index=full_index)
|
@celerylib.task
@celerylib.dbsession
def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
    """Aggregate commit statistics for a repository and persist them.

    Parses up to ``commit_parse_limit`` changesets past the last revision
    recorded in the ``Statistics`` row (if any), accumulating per-author and
    per-day activity, then stores the result. When more revisions remain and
    celery is enabled, re-invokes itself until ``recurse_limit`` is exhausted.
    Concurrent runs for the same arguments are prevented with a file lock.

    :param repo_name: name of the repository to gather statistics for
    :param ts_min_y: lower timestamp bound (currently bypassed via
        ``skip_date_limit``)
    :param ts_max_y: upper timestamp bound (idem)
    :param recurse_limit: maximum number of chained re-invocations
    """
    DBS = celerylib.get_session()
    lockkey = celerylib.__get_lockkey('get_commits_stats', repo_name, ts_min_y,
                                      ts_max_y)
    lockkey_path = config['app_conf']['cache_dir']
    log.info('running task with lockkey %s', lockkey)

    try:
        lock = celerylib.DaemonLock(os.path.join(lockkey_path, lockkey))

        # for js data compatibility cleans the key for person from '
        akc = lambda k: person(k).replace('"', "")

        co_day_auth_aggr = {}
        commits_by_day_aggregate = {}
        repo = Repository.get_by_repo_name(repo_name)
        if repo is None:
            return True
        repo = repo.scm_instance
        repo_size = repo.count()
        # return if repo have no revisions
        if repo_size < 1:
            lock.release()
            return True
        skip_date_limit = True
        parse_limit = int(config['app_conf'].get('commit_parse_limit'))
        last_rev = None
        last_cs = None
        timegetter = itemgetter('time')

        dbrepo = DBS.query(Repository) \
            .filter(Repository.repo_name == repo_name).scalar()
        cur_stats = DBS.query(Statistics) \
            .filter(Statistics.repository == dbrepo).scalar()

        if cur_stats is not None:
            last_rev = cur_stats.stat_on_revision

        if last_rev == repo.get_changeset().revision and repo_size > 1:
            # pass silently without any work if we're not on first revision or
            # current state of parsing revision(from db marker) is the
            # last revision
            lock.release()
            return True

        if cur_stats:
            commits_by_day_aggregate = OrderedDict(json.loads(
                cur_stats.commit_activity_combined))
            co_day_auth_aggr = json.loads(cur_stats.commit_activity)

        log.debug('starting parsing %s', parse_limit)
        lmktime = mktime  # hoist the lookup out of the parse loop

        # explicit None check: ``None >= 0`` only "works" (is False) on
        # Python 2's arbitrary ordering and raises TypeError on Python 3
        last_rev = last_rev + 1 if last_rev is not None and last_rev >= 0 else 0
        log.debug('Getting revisions from %s to %s',
                  last_rev, last_rev + parse_limit)
        for cs in repo[last_rev:last_rev + parse_limit]:
            log.debug('parsing %s', cs)
            last_cs = cs  # remember last parsed changeset
            k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
                         cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])

            if akc(cs.author) in co_day_auth_aggr:
                try:
                    l = [timegetter(x) for x in
                         co_day_auth_aggr[akc(cs.author)]['data']]
                    time_pos = l.index(k)
                except ValueError:
                    time_pos = None

                # test for None *before* comparing with 0 (see note above)
                if time_pos is not None and time_pos >= 0:
                    datadict = \
                        co_day_auth_aggr[akc(cs.author)]['data'][time_pos]

                    datadict["commits"] += 1
                    datadict["added"] += len(cs.added)
                    datadict["changed"] += len(cs.changed)
                    datadict["removed"] += len(cs.removed)
                else:
                    if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
                        datadict = {"time": k,
                                    "commits": 1,
                                    "added": len(cs.added),
                                    "changed": len(cs.changed),
                                    "removed": len(cs.removed),
                                    }
                        co_day_auth_aggr[akc(cs.author)]['data'] \
                            .append(datadict)
            else:
                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
                    co_day_auth_aggr[akc(cs.author)] = {
                        "label": akc(cs.author),
                        "data": [{"time": k,
                                  "commits": 1,
                                  "added": len(cs.added),
                                  "changed": len(cs.changed),
                                  "removed": len(cs.removed),
                                  }],
                        "schema": ["commits"],
                    }

            # gather all data by day
            if k in commits_by_day_aggregate:
                commits_by_day_aggregate[k] += 1
            else:
                commits_by_day_aggregate[k] = 1

        overview_data = sorted(commits_by_day_aggregate.items(),
                               key=itemgetter(0))

        if not co_day_auth_aggr:
            co_day_auth_aggr[akc(repo.contact)] = {
                "label": akc(repo.contact),
                "data": [0, 1],
                "schema": ["commits"],
            }

        stats = cur_stats if cur_stats else Statistics()
        stats.commit_activity = json.dumps(co_day_auth_aggr)
        stats.commit_activity_combined = json.dumps(overview_data)

        log.debug('last revision %s', last_rev)
        leftovers = len(repo.revisions[last_rev:])
        log.debug('revisions to parse %s', leftovers)

        if last_rev == 0 or leftovers < parse_limit:
            log.debug('getting code trending stats')
            stats.languages = json.dumps(__get_codes_stats(repo_name))

        try:
            stats.repository = dbrepo
            stats.stat_on_revision = last_cs.revision if last_cs else 0
            DBS.add(stats)
            DBS.commit()
        except Exception:  # was a bare except:, which also caught SystemExit
            log.error(traceback.format_exc())
            DBS.rollback()
            lock.release()
            return False

        # final release
        lock.release()

        # execute another task if celery is enabled
        if len(repo.revisions) > 1 and CELERY_ON and recurse_limit > 0:
            get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit - 1)
        elif recurse_limit <= 0:
            log.debug('Not recursing - limit has been reached')
        else:
            log.debug('Not recursing')
    except celerylib.LockHeld:
        log.info('Task with key %s already running', lockkey)
        return 'Task with key %s already running' % lockkey
|
@celerylib.task
@celerylib.dbsession
def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
    """
    Sends an email with defined parameters from the .ini files.

    :param recipients: list of recipients, if this is None, the defined email
        address from field 'email_to' and all admins is used instead
    :param subject: subject of the mail
    :param body: body of the mail
    :param html_body: html version of body
    :param headers: dictionary of prepopulated e-mail headers
    :param author: User object of the author of this mail, if known and relevant

    :return: True on success, False when sending failed or was impossible
    """
    assert isinstance(recipients, list), recipients
    if headers is None:
        headers = {}
    else:
        # do not modify the original headers object passed by the caller
        headers = headers.copy()

    email_config = config
    email_prefix = email_config.get('email_prefix', '')
    if email_prefix:
        subject = "%s %s" % (email_prefix, subject)

    if not recipients:
        # if recipients are not defined we send to email_config + all admins
        recipients = [u.email for u in User.query()
                      .filter(User.admin == True).all()]
        if email_config.get('email_to') is not None:
            recipients += [email_config.get('email_to')]

        # If there are still no recipients, there are no admins and no address
        # configured in email_to, so return.
        if not recipients:
            log.error("No recipients specified and no fallback available.")
            return False

        log.warning("No recipients specified for '%s' - sending to admins %s", subject, ' '.join(recipients))

    # SMTP sender
    envelope_from = email_config.get('app_email_from', 'Kallithea')
    # 'From' header
    if author is not None:
        # set From header based on author but with a generic e-mail address
        # In case app_email_from is in "Some Name <e-mail>" format, we first
        # extract the e-mail address.
        envelope_addr = author_email(envelope_from)
        headers['From'] = '"%s" <%s>' % (
            rfc822.quote('%s (no-reply)' % author.full_name_or_username),
            envelope_addr)

    user = email_config.get('smtp_username')
    passwd = email_config.get('smtp_password')
    mail_server = email_config.get('smtp_server')
    mail_port = email_config.get('smtp_port')
    tls = str2bool(email_config.get('smtp_use_tls'))
    ssl = str2bool(email_config.get('smtp_use_ssl'))
    debug = str2bool(email_config.get('debug'))
    smtp_auth = email_config.get('smtp_auth')

    logmsg = ("Mail details:\n"
              "recipients: %s\n"
              "headers: %s\n"
              "subject: %s\n"
              "body:\n%s\n"
              "html:\n%s\n"
              % (' '.join(recipients), headers, subject, body, html_body))

    if mail_server:
        log.debug("Sending e-mail. " + logmsg)
    else:
        log.error("SMTP mail server not configured - cannot send e-mail.")
        log.warning(logmsg)
        return False

    try:
        m = SmtpMailer(envelope_from, user, passwd, mail_server, smtp_auth,
                       mail_port, ssl, tls, debug=debug)
        m.send(recipients, subject, body, html_body, headers=headers)
    except Exception:  # was a bare except:, which also caught KeyboardInterrupt
        log.error('Mail sending failed')
        log.error(traceback.format_exc())
        return False
    return True
|
r6789 | |||
|
@celerylib.task
@celerylib.dbsession
def create_repo(form_data, cur_user):
    """Create a repository: database record first, then the on-disk repo.

    On any failure, the partially created database record and filesystem repo
    are rolled back manually before the exception is re-raised.

    :param form_data: validated form dictionary with the repository settings
    :param cur_user: User instance (or something ``guess_instance`` accepts)
        that becomes the owner of the new repository
    :return: True on success
    """
    from kallithea.model.repo import RepoModel
    from kallithea.model.db import Setting

    DBS = celerylib.get_session()

    cur_user = User.guess_instance(cur_user)

    owner = cur_user
    repo_name = form_data['repo_name']
    repo_name_full = form_data['repo_name_full']
    repo_type = form_data['repo_type']
    description = form_data['repo_description']
    private = form_data['repo_private']
    clone_uri = form_data.get('clone_uri')
    repo_group = form_data['repo_group']
    landing_rev = form_data['repo_landing_rev']
    copy_fork_permissions = form_data.get('copy_permissions')
    copy_group_permissions = form_data.get('repo_copy_permissions')
    fork_of = form_data.get('fork_parent_id')
    state = form_data.get('repo_state', Repository.STATE_PENDING)

    # repo creation defaults, private and repo_type are filled in form
    defs = Setting.get_default_repo_settings(strip_prefix=True)
    enable_statistics = defs.get('repo_enable_statistics')
    enable_locking = defs.get('repo_enable_locking')
    enable_downloads = defs.get('repo_enable_downloads')

    try:
        repo = RepoModel()._create_repo(
            repo_name=repo_name_full,
            repo_type=repo_type,
            description=description,
            owner=owner,
            private=private,
            clone_uri=clone_uri,
            repo_group=repo_group,
            landing_rev=landing_rev,
            fork_of=fork_of,
            copy_fork_permissions=copy_fork_permissions,
            copy_group_permissions=copy_group_permissions,
            enable_statistics=enable_statistics,
            enable_locking=enable_locking,
            enable_downloads=enable_downloads,
            state=state
        )

        action_logger(cur_user, 'user_created_repo',
                      form_data['repo_name_full'], '')

        DBS.commit()
        # now create this repo on Filesystem
        RepoModel()._create_filesystem_repo(
            repo_name=repo_name,
            repo_type=repo_type,
            repo_group=RepoGroup.guess_instance(repo_group),
            clone_uri=clone_uri,
        )
        repo = Repository.get_by_repo_name(repo_name_full)
        log_create_repository(repo.get_dict(), created_by=owner.username)

        # update repo changeset caches initially
        repo.update_changeset_cache()

        # set new created state
        repo.set_state(Repository.STATE_CREATED)
        DBS.commit()
    except Exception as e:
        # message fixed: this is repository *creation*, not forking;
        # lazy logging args instead of eager %-interpolation
        log.warning('Exception %s occurred when creating repository, '
                    'doing cleanup...', e)
        # rollback things manually !
        repo = Repository.get_by_repo_name(repo_name_full)
        if repo:
            Repository.delete(repo.repo_id)
            DBS.commit()
            RepoModel()._delete_filesystem_repo(repo)
        raise
    return True
|
@celerylib.task
@celerylib.dbsession
def create_repo_fork(form_data, cur_user):
    """
    Creates a fork of repository using interval VCS methods

    On any failure, the partially created database record and filesystem repo
    are rolled back manually before the exception is re-raised.

    :param form_data: validated form dictionary describing the fork,
        including 'fork_parent_id' pointing at the source repository
    :param cur_user: User instance (or something ``guess_instance`` accepts)
        that becomes the owner of the fork
    :return: True on success
    """
    from kallithea.model.repo import RepoModel

    DBS = celerylib.get_session()

    base_path = Repository.base_path()
    cur_user = User.guess_instance(cur_user)

    repo_name = form_data['repo_name']  # fork in this case
    repo_name_full = form_data['repo_name_full']
    repo_type = form_data['repo_type']
    owner = cur_user
    private = form_data['private']
    clone_uri = form_data.get('clone_uri')
    repo_group = form_data['repo_group']
    landing_rev = form_data['landing_rev']
    copy_fork_permissions = form_data.get('copy_permissions')

    try:
        fork_of = Repository.guess_instance(form_data.get('fork_parent_id'))

        RepoModel()._create_repo(
            repo_name=repo_name_full,
            repo_type=repo_type,
            description=form_data['description'],
            owner=owner,
            private=private,
            clone_uri=clone_uri,
            repo_group=repo_group,
            landing_rev=landing_rev,
            fork_of=fork_of,
            copy_fork_permissions=copy_fork_permissions
        )
        action_logger(cur_user, 'user_forked_repo:%s' % repo_name_full,
                      fork_of.repo_name, '')
        DBS.commit()

        source_repo_path = os.path.join(base_path, fork_of.repo_name)

        # now create this repo on Filesystem
        RepoModel()._create_filesystem_repo(
            repo_name=repo_name,
            repo_type=repo_type,
            repo_group=RepoGroup.guess_instance(repo_group),
            clone_uri=source_repo_path,
        )
        repo = Repository.get_by_repo_name(repo_name_full)
        log_create_repository(repo.get_dict(), created_by=owner.username)

        # update repo changeset caches initially
        repo.update_changeset_cache()

        # set new created state
        repo.set_state(Repository.STATE_CREATED)
        DBS.commit()
    except Exception as e:
        # lazy logging args instead of eager %-interpolation: safe even when
        # the exception text itself contains % placeholders
        log.warning('Exception %s occurred when forking repository, '
                    'doing cleanup...', e)
        # rollback things manually !
        repo = Repository.get_by_repo_name(repo_name_full)
        if repo:
            Repository.delete(repo.repo_id)
            DBS.commit()
            RepoModel()._delete_filesystem_repo(repo)
        raise
    return True
def __get_codes_stats(repo_name):
    """Count files per known language extension at the repository tip.

    :param repo_name: name of the repository to scan
    :return: dict mapping lowercased file extension to number of files
    """
    from kallithea.config.conf import LANGUAGES_EXTENSIONS_MAP
    repo = Repository.get_by_repo_name(repo_name).scm_instance

    tip = repo.get_changeset()
    code_stats = {}

    # walk the whole tree at tip; cs[2] is the list of file nodes in each dir
    for cs in tip.walk('/'):
        for f in cs[2]:
            ext = lower(f.extension)
            # direct dict membership instead of ``in ...keys()`` (which
            # builds a list on Python 2); skip binary files
            if ext in LANGUAGES_EXTENSIONS_MAP and not f.is_binary:
                code_stats[ext] = code_stats.get(ext, 0) + 1
    return code_stats