diff --git a/celeryconfig.py b/celeryconfig.py --- a/celeryconfig.py +++ b/celeryconfig.py @@ -16,6 +16,8 @@ CELERY_IMPORTS = ("pylons_app.lib.celery ## Result store settings. CELERY_RESULT_BACKEND = "database" CELERY_RESULT_DBURI = dict(config.items('app:main'))['sqlalchemy.db1.url'] +CELERY_RESULT_SERIALIZER = 'json' + BROKER_CONNECTION_MAX_RETRIES = 30 @@ -36,7 +38,37 @@ CELERYD_CONCURRENCY = 2 CELERYD_LOG_LEVEL = "DEBUG" CELERYD_MAX_TASKS_PER_CHILD = 1 -#CELERY_ALWAYS_EAGER = True -#rabbitmqctl add_user rabbitmq qweqwe -#rabbitmqctl add_vhost rabbitmqhost -#rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*" +#Tasks will never be sent to the queue, but executed locally instead. +CELERY_ALWAYS_EAGER = False + +#=============================================================================== +# EMAIL SETTINGS +#=============================================================================== +pylons_email_config = dict(config.items('DEFAULT')) + +CELERY_SEND_TASK_ERROR_EMAILS = True + +#List of (name, email_address) tuples for the admins that should receive error e-mails. +ADMINS = [('Administrator', pylons_email_config.get('email_to'))] + +#The e-mail address this worker sends e-mails from. Default is "celery@localhost". +SERVER_EMAIL = pylons_email_config.get('error_email_from') + +#The mail server to use. Default is "localhost". +MAIL_HOST = pylons_email_config.get('smtp_server') + +#Username (if required) to log on to the mail server with. +MAIL_HOST_USER = pylons_email_config.get('smtp_username') + +#Password (if required) to log on to the mail server with. 
+MAIL_HOST_PASSWORD = pylons_email_config.get('smtp_password') + +MAIL_PORT = pylons_email_config.get('smtp_port') + + +#=============================================================================== +# INSTRUCTIONS FOR RABBITMQ +#=============================================================================== +# rabbitmqctl add_user rabbitmq qweqwe +# rabbitmqctl add_vhost rabbitmqhost +# rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*" diff --git a/pylons_app/controllers/summary.py b/pylons_app/controllers/summary.py --- a/pylons_app/controllers/summary.py +++ b/pylons_app/controllers/summary.py @@ -27,9 +27,13 @@ from pylons_app.lib.auth import LoginReq from pylons_app.lib.base import BaseController, render from pylons_app.lib.utils import OrderedDict from pylons_app.model.hg_model import HgModel +from pylons_app.model.db import Statistics from webhelpers.paginate import Page from pylons_app.lib.celerylib import run_task from pylons_app.lib.celerylib.tasks import get_commits_stats +from datetime import datetime, timedelta +from time import mktime +import calendar import logging log = logging.getLogger(__name__) @@ -61,11 +65,32 @@ class SummaryController(BaseController): for name, hash in c.repo_info.branches.items()[:10]: c.repo_branches[name] = c.repo_info.get_changeset(hash) - task = run_task(get_commits_stats, c.repo_info.name) - c.ts_min = task.result[0] - c.ts_max = task.result[1] - c.commit_data = task.result[2] - c.overview_data = task.result[3] + td = datetime.today() + timedelta(days=1) + y, m, d = td.year, td.month, td.day + + ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month, + d, 0, 0, 0, 0, 0, 0,)) + ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month, + d, 0, 0, 0, 0, 0, 0,)) + + ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,)) + + run_task(get_commits_stats, c.repo_info.name, ts_min_y, ts_max_y) + c.ts_min = ts_min_m + c.ts_max = ts_max_y + + + stats = self.sa.query(Statistics)\ + 
.filter(Statistics.repository == c.repo_info.dbrepo)\ + .scalar() + + if stats: + c.commit_data = stats.commit_activity + c.overview_data = stats.commit_activity_combined + else: + import json + c.commit_data = json.dumps({}) + c.overview_data = json.dumps([[ts_min_y, 0], [ts_max_y, 0] ]) return render('summary/summary.html') diff --git a/pylons_app/lib/celerylib/tasks.py b/pylons_app/lib/celerylib/tasks.py --- a/pylons_app/lib/celerylib/tasks.py +++ b/pylons_app/lib/celerylib/tasks.py @@ -1,7 +1,6 @@ from celery.decorators import task from celery.task.sets import subtask from celeryconfig import PYLONS_CONFIG as config -from datetime import datetime, timedelta from pylons.i18n.translation import _ from pylons_app.lib.celerylib import run_task from pylons_app.lib.helpers import person @@ -10,7 +9,6 @@ from pylons_app.lib.utils import Ordered from operator import itemgetter from vcs.backends.hg import MercurialRepository from time import mktime -import calendar import traceback import json @@ -83,94 +81,132 @@ def whoosh_index(repo_location, full_ind return 'LockHeld' @task -def get_commits_stats(repo): +def get_commits_stats(repo_name, ts_min_y, ts_max_y): + author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibility + + from pylons_app.model.db import Statistics, Repository log = get_commits_stats.get_logger() - aggregate = OrderedDict() - overview_aggregate = OrderedDict() + commits_by_day_author_aggregate = {} + commits_by_day_aggregate = {} repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '') - repo = MercurialRepository(repos_path + repo) - #graph range - td = datetime.today() + timedelta(days=1) - y, m, d = td.year, td.month, td.day + repo = MercurialRepository(repos_path + repo_name) + + skip_date_limit = True + parse_limit = 500 #limit for single task changeset parsing + last_rev = 0 + last_cs = None + timegetter = itemgetter('time') + + sa = get_session() - ts_min_y = mktime((y - 1, (td - 
timedelta(days=calendar.mdays[m])).month, - d, 0, 0, 0, 0, 0, 0,)) - ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month, - d, 0, 0, 0, 0, 0, 0,)) + dbrepo = sa.query(Repository)\ + .filter(Repository.repo_name == repo_name).scalar() + cur_stats = sa.query(Statistics)\ + .filter(Statistics.repository == dbrepo).scalar() + if cur_stats: + last_rev = cur_stats.stat_on_revision - ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,)) - skip_date_limit = True + if last_rev == repo.revisions[-1]: + #pass silently without any work + return True - def author_key_cleaner(k): - k = person(k) - k = k.replace('"', "") #for js data compatibilty - return k - - for cs in repo[:200]:#added limit 200 until fix #29 is made + if cur_stats: + commits_by_day_aggregate = OrderedDict( + json.loads( + cur_stats.commit_activity_combined)) + commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity) + + for cnt, rev in enumerate(repo.revisions[last_rev:]): + last_cs = cs = repo.get_changeset(rev) k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1], cs.date.timetuple()[2]) timetupple = [int(x) for x in k.split('-')] timetupple.extend([0 for _ in xrange(6)]) k = mktime(timetupple) - if aggregate.has_key(author_key_cleaner(cs.author)): - if aggregate[author_key_cleaner(cs.author)].has_key(k): - aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1 - aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added) - aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed) - aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed) + if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)): + try: + l = [timegetter(x) for x in commits_by_day_author_aggregate\ + [author_key_cleaner(cs.author)]['data']] + time_pos = l.index(k) + except ValueError: + time_pos = False + + if time_pos >= 0 and time_pos is not False: + + datadict = commits_by_day_author_aggregate\ + 
[author_key_cleaner(cs.author)]['data'][time_pos] + + datadict["commits"] += 1 + datadict["added"] += len(cs.added) + datadict["changed"] += len(cs.changed) + datadict["removed"] += len(cs.removed) + #print datadict else: - #aggregate[author_key_cleaner(cs.author)].update(dates_range) + #print 'ELSE !!!!' if k >= ts_min_y and k <= ts_max_y or skip_date_limit: - aggregate[author_key_cleaner(cs.author)][k] = {} - aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1 - aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added) - aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed) - aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) + + datadict = {"time":k, + "commits":1, + "added":len(cs.added), + "changed":len(cs.changed), + "removed":len(cs.removed), + } + commits_by_day_author_aggregate\ + [author_key_cleaner(cs.author)]['data'].append(datadict) else: + #print k, 'nokey ADDING' if k >= ts_min_y and k <= ts_max_y or skip_date_limit: - aggregate[author_key_cleaner(cs.author)] = OrderedDict() - #aggregate[author_key_cleaner(cs.author)].update(dates_range) - aggregate[author_key_cleaner(cs.author)][k] = {} - aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1 - aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added) - aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed) - aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) + commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = { + "label":author_key_cleaner(cs.author), + "data":[{"time":k, + "commits":1, + "added":len(cs.added), + "changed":len(cs.changed), + "removed":len(cs.removed), + }], + "schema":["commits"], + } - - if overview_aggregate.has_key(k): - overview_aggregate[k] += 1 +# #gather all data by day + if commits_by_day_aggregate.has_key(k): + commits_by_day_aggregate[k] += 1 else: - overview_aggregate[k] = 1 - + commits_by_day_aggregate[k] = 1 + + if cnt >= parse_limit: + 
#don't fetch too much data since we can freeze application + break + overview_data = [] - for k, v in overview_aggregate.items(): + for k, v in commits_by_day_aggregate.items(): overview_data.append([k, v]) overview_data = sorted(overview_data, key=itemgetter(0)) - data = {} - for author in aggregate: - commit_data = sorted([{"time":x, - "commits":aggregate[author][x]['commits'], - "added":aggregate[author][x]['added'], - "changed":aggregate[author][x]['changed'], - "removed":aggregate[author][x]['removed'], - } for x in aggregate[author]], - key=itemgetter('time')) - data[author] = {"label":author, - "data":commit_data, - "schema":["commits"] - } - - if not data: - data[author_key_cleaner(repo.contact)] = { + if not commits_by_day_author_aggregate: + commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = { "label":author_key_cleaner(repo.contact), "data":[0, 1], "schema":["commits"], } - - return (ts_min_m, ts_max_y, json.dumps(data), json.dumps(overview_data)) + + stats = cur_stats if cur_stats else Statistics() + stats.commit_activity = json.dumps(commits_by_day_author_aggregate) + stats.commit_activity_combined = json.dumps(overview_data) + stats.repository = dbrepo + stats.stat_on_revision = last_cs.revision + stats.languages = json.dumps({'_TOTAL_':0, '':0}) + + try: + sa.add(stats) + sa.commit() + except: + log.error(traceback.format_exc()) + sa.rollback() + return False + + return True @task def reset_user_password(user_email): @@ -184,10 +220,11 @@ def reset_user_password(user_email): user = sa.query(User).filter(User.email == user_email).scalar() new_passwd = auth.PasswordGenerator().gen_password(8, auth.PasswordGenerator.ALPHABETS_BIG_SMALL) - user.password = auth.get_crypt_password(new_passwd) - sa.add(user) - sa.commit() - log.info('change password for %s', user_email) + if user: + user.password = auth.get_crypt_password(new_passwd) + sa.add(user) + sa.commit() + log.info('change password for %s', user_email) if new_passwd is None: raise 
Exception('unable to generate new password') diff --git a/pylons_app/model/db.py b/pylons_app/model/db.py --- a/pylons_app/model/db.py +++ b/pylons_app/model/db.py @@ -120,6 +120,15 @@ class UserToPerm(Base): user = relation('User') permission = relation('Permission') - +class Statistics(Base): + __tablename__ = 'statistics' + __table_args__ = (UniqueConstraint('repository_id'), {'useexisting':True}) + stat_id = Column("stat_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True) + repository_id = Column("repository_id", INTEGER(), ForeignKey(u'repositories.repo_id'), nullable=False, unique=True, default=None) + stat_on_revision = Column("stat_on_revision", INTEGER(), nullable=False) + commit_activity = Column("commit_activity", BLOB(), nullable=False)#JSON data + commit_activity_combined = Column("commit_activity_combined", BLOB(), nullable=False)#JSON data + languages = Column("languages", BLOB(), nullable=False)#JSON data + + repository = relation('Repository') - diff --git a/pylons_app/templates/summary/summary.html b/pylons_app/templates/summary/summary.html --- a/pylons_app/templates/summary/summary.html +++ b/pylons_app/templates/summary/summary.html @@ -123,7 +123,7 @@ E.onDOMReady(function(e){