tasks.py
270 lines
| 9.3 KiB
| text/x-python
|
PythonLexer
r467 | from celery.decorators import task | |||
r474 | from celery.task.sets import subtask | |||
r483 | from celeryconfig import PYLONS_CONFIG as config | |||
r474 | from pylons.i18n.translation import _ | |||
r502 | from pylons_app.lib.celerylib import run_task, locked_task | |||
r467 | from pylons_app.lib.helpers import person | |||
r474 | from pylons_app.lib.smtp_mailer import SmtpMailer | |||
r467 | from pylons_app.lib.utils import OrderedDict | |||
r487 | from operator import itemgetter | |||
from vcs.backends.hg import MercurialRepository | ||||
r467 | from time import mktime | |||
r474 | import traceback | |||
r486 | import json | |||
r474 | ||||
# Names exported by ``from tasks import *``.
__all__ = [
    'whoosh_index',
    'get_commits_stats',
    'reset_user_password',
    'send_email',
]
def get_session():
    """Build and return a new scoped SQLAlchemy session.

    The engine is created from the ``app:main`` section of the Pylons
    config, using the options prefixed with ``sqlalchemy.db1.``. A fresh
    engine and session registry are constructed on every call.
    """
    from sqlalchemy import engine_from_config
    from sqlalchemy.orm import scoped_session, sessionmaker

    app_settings = dict(config.items('app:main'))
    db_engine = engine_from_config(app_settings, 'sqlalchemy.db1.')
    session_factory = sessionmaker(bind=db_engine)
    return scoped_session(session_factory)
r467 | ||||
def get_hg_settings():
    """Load the application settings stored in the database.

    Returns a dict with one entry per ``HgAppSettings`` row, keyed by
    ``'hg_app_' + app_settings_name`` and holding ``app_settings_value``.

    Raises ``Exception`` when the query returns no rows.
    """
    from pylons_app.model.db import HgAppSettings

    # Acquire the session *before* entering the ``try``: if get_session()
    # itself fails, the original error must propagate instead of being
    # masked by a NameError from ``sa.remove()`` on a still-unbound ``sa``.
    sa = get_session()
    try:
        ret = sa.query(HgAppSettings).all()
    finally:
        sa.remove()

    if not ret:
        raise Exception('Could not get application settings !')

    return dict(('hg_app_' + each.app_settings_name, each.app_settings_value)
                for each in ret)
r467 | ||||
def get_hg_ui_settings():
    """Load the ui settings stored in the database.

    Returns a dict with one entry per ``HgAppUi`` row. The key is
    ``ui_section + '_' + ui_key`` where a ``'/'`` key is renamed to
    ``'root_path'`` and dots in the key are replaced by underscores. The
    value is ``ui_value``, except for rows of the ``'hooks'`` section,
    which store ``ui_active`` instead.

    Raises ``Exception`` when the query returns no rows.
    """
    from pylons_app.model.db import HgAppUi

    # Acquire the session *before* entering the ``try``: if get_session()
    # itself fails, the original error must propagate instead of being
    # masked by a NameError from ``sa.remove()`` on a still-unbound ``sa``.
    sa = get_session()
    try:
        ret = sa.query(HgAppUi).all()
    finally:
        sa.remove()

    if not ret:
        raise Exception('Could not get application ui settings !')

    settings = {}
    for each in ret:
        k = each.ui_key
        v = each.ui_value
        if k == '/':
            k = 'root_path'

        # replace() is a no-op when there is no dot, so no find() guard
        k = k.replace('.', '_')

        if each.ui_section == 'hooks':
            v = each.ui_active

        settings[each.ui_section + '_' + k] = v

    return settings
@task
def whoosh_index(repo_location, full_index):
    """Run the whoosh indexing daemon for ``repo_location``.

    :param repo_location: passed to ``WhooshIndexingDaemon`` as
        ``repo_location``
    :param full_index: passed to ``WhooshIndexingDaemon.run`` as
        ``full_index``
    :returns: ``'Done'`` when indexing finished, ``'LockHeld'`` when
        ``LockHeld`` was raised (presumably by ``DaemonLock()`` when
        another indexer is already running -- confirm against pidlock)
    """
    log = whoosh_index.get_logger()
    from pylons_app.lib.pidlock import DaemonLock
    from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld
    try:
        lock = DaemonLock()
        try:
            WhooshIndexingDaemon(repo_location=repo_location)\
                .run(full_index=full_index)
        finally:
            # always give the lock back, otherwise a failed indexing run
            # would leave it held and block every later run
            lock.release()
        return 'Done'
    except LockHeld:
        log.info('LockHeld')
        return 'LockHeld'
r497 | ||||
@task
@locked_task
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
    """Incrementally aggregate commit statistics for ``repo_name``.

    Changesets that were not parsed yet are walked in batches (bounded by
    ``parse_limit``) and folded into two aggregates that are stored as
    JSON on the repository's ``Statistics`` row:

    * ``commit_activity`` -- per author, a list of per-day dicts with the
      keys ``time``, ``commits``, ``added``, ``changed`` and ``removed``
    * ``commit_activity_combined`` -- ``[day_timestamp, commits]`` pairs
      sorted by timestamp

    After a batch has been stored the task is scheduled again through
    ``run_task`` until the stored revision marker reaches the last
    revision of the repository.

    :param repo_name: name of the repository, appended to the configured
        repositories root path
    :param ts_min_y: lower bound for the per-day timestamps; currently
        without effect because ``skip_date_limit`` is hard-coded to True
    :param ts_max_y: upper bound for the per-day timestamps; currently
        without effect for the same reason
    :returns: ``True`` when there was nothing to do or the batch was
        stored, ``False`` when storing the statistics failed
    """
    from pylons_app.model.db import Statistics, Repository

    log = get_commits_stats.get_logger()

    def author_key_cleaner(k):
        # strip double quotes for js data compatibility
        return person(k).replace('"', "")

    skip_date_limit = True
    parse_limit = 350  # limit for single task changeset parsing
    timegetter = itemgetter('time')

    commits_by_day_author_aggregate = {}
    commits_by_day_aggregate = {}
    last_cs = None

    repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
    repo = MercurialRepository(repos_path + repo_name)

    sa = get_session()
    try:
        dbrepo = sa.query(Repository)\
            .filter(Repository.repo_name == repo_name).scalar()
        cur_stats = sa.query(Statistics)\
            .filter(Statistics.repository == dbrepo).scalar()

        if not repo.revisions:
            # empty repository, nothing to parse (and no revisions[-1])
            return True

        first_new = 0
        if cur_stats:
            last_rev = cur_stats.stat_on_revision
            if last_rev == repo.revisions[-1]:
                # pass silently without any work
                return True

            commits_by_day_aggregate = OrderedDict(
                json.loads(cur_stats.commit_activity_combined))
            commits_by_day_author_aggregate = json.loads(
                cur_stats.commit_activity)
            # stat_on_revision is the last changeset that is already part
            # of the stored aggregates; resume right after it so it is not
            # counted a second time
            first_new = last_rev + 1

        for cnt, rev in enumerate(repo.revisions[first_new:]):
            last_cs = cs = repo.get_changeset(rev)

            # timestamp of midnight of the commit day; time fields and the
            # weekday/yearday/dst slots are zeroed as mktime input
            k = mktime(tuple(cs.date.timetuple()[:3]) + (0,) * 6)
            author = author_key_cleaner(cs.author)
            in_date_range = (ts_min_y <= k <= ts_max_y) or skip_date_limit

            if author in commits_by_day_author_aggregate:
                author_data = commits_by_day_author_aggregate[author]['data']

                # first entry already recorded for this author on this day
                datadict = None
                for entry in author_data:
                    if timegetter(entry) == k:
                        datadict = entry
                        break

                if datadict is not None:
                    datadict["commits"] += 1
                    datadict["added"] += len(cs.added)
                    datadict["changed"] += len(cs.changed)
                    datadict["removed"] += len(cs.removed)
                elif in_date_range:
                    author_data.append({"time": k,
                                        "commits": 1,
                                        "added": len(cs.added),
                                        "changed": len(cs.changed),
                                        "removed": len(cs.removed),
                                        })
            elif in_date_range:
                commits_by_day_author_aggregate[author] = {
                    "label": author,
                    "data": [{"time": k,
                              "commits": 1,
                              "added": len(cs.added),
                              "changed": len(cs.changed),
                              "removed": len(cs.removed),
                              }],
                    "schema": ["commits"],
                }

            # gather all data by day
            if k in commits_by_day_aggregate:
                commits_by_day_aggregate[k] += 1
            else:
                commits_by_day_aggregate[k] = 1

            if cnt >= parse_limit:
                # don't fetch too much data since we can freeze application
                break

        if last_cs is None:
            # no changeset was parsed (e.g. the stored marker points past
            # the end of the repository); there is nothing new to store
            return True

        overview_data = sorted(
            ([day, commits]
             for day, commits in commits_by_day_aggregate.items()),
            key=itemgetter(0))

        if not commits_by_day_author_aggregate:
            contact = author_key_cleaner(repo.contact)
            commits_by_day_author_aggregate[contact] = {
                "label": contact,
                "data": [0, 1],
                "schema": ["commits"],
            }

        stats = cur_stats if cur_stats else Statistics()
        stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
        stats.commit_activity_combined = json.dumps(overview_data)
        stats.repository = dbrepo
        stats.stat_on_revision = last_cs.revision
        stats.languages = json.dumps({'_TOTAL_': 0, '': 0})

        try:
            sa.add(stats)
            sa.commit()
        except Exception:
            log.error(traceback.format_exc())
            sa.rollback()
            return False
    finally:
        # release the session on every exit path, as the settings helpers do
        sa.remove()

    # schedule the next batch; it returns early once everything is parsed
    run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)

    return True
r474 | ||||
@task
def reset_user_password(user_email):
    """Set a new generated password for a user and mail it to them.

    A new 8 character password is generated, stored (encrypted through
    ``auth.get_crypt_password``) on the user whose e-mail equals
    ``user_email``, and then sent to that address via ``send_email``.

    The mail is only sent after the new password was committed: when no
    user matches, or when storing the password fails, no mail goes out, so
    nobody receives a password that does not work.

    :param user_email: e-mail address identifying the user; also the
        recipient of the new password
    :returns: always ``True``; failures are only logged
    """
    log = reset_user_password.get_logger()
    from pylons_app.lib import auth
    from pylons_app.model.db import User

    try:
        new_passwd = auth.PasswordGenerator().gen_password(8,
                         auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
        # validate before anything is written to the database
        if new_passwd is None:
            raise Exception('unable to generate new password')

        sa = get_session()
        try:
            user = sa.query(User).filter(User.email == user_email).scalar()
            if not user:
                log.error('no user found for %s, password not changed',
                          user_email)
                return True

            try:
                user.password = auth.get_crypt_password(new_passwd)
                sa.add(user)
                sa.commit()
            except Exception:
                # undo the partial change; the outer handler logs the error
                sa.rollback()
                raise
            log.info('change password for %s', user_email)
        finally:
            sa.remove()

        run_task(send_email, user_email,
                 "Your new hg-app password",
                 'Your new hg-app password:%s' % (new_passwd))
        log.info('send new password mail to %s', user_email)

    except Exception:
        log.error('Failed to update user password')
        log.error(traceback.format_exc())
    return True
@task
def send_email(recipients, subject, body):
    """Send a mail through the SMTP server configured in ``DEFAULT``.

    The sender address and the SMTP credentials, server, port and TLS
    flag are read from the ``DEFAULT`` section of the Pylons config; SSL
    is always passed as ``False``.

    :param recipients: forwarded to ``SmtpMailer.send``
    :param subject: forwarded to ``SmtpMailer.send``
    :param body: forwarded to ``SmtpMailer.send``
    :returns: ``True`` when sending raised nothing, ``False`` otherwise
        (the failure is logged with its traceback)
    """
    log = send_email.get_logger()
    option = dict(config.items('DEFAULT')).get

    sender = option('app_email_from')
    smtp_user = option('smtp_username')
    smtp_passwd = option('smtp_password')
    smtp_host = option('smtp_server')
    smtp_port = option('smtp_port')
    # NOTE(review): the TLS flag is handed over exactly as read from the
    # config -- confirm SmtpMailer interprets that value as intended.
    use_tls = option('smtp_use_tls')
    use_ssl = False

    try:
        mailer = SmtpMailer(sender, smtp_user, smtp_passwd, smtp_host,
                            smtp_port, use_ssl, use_tls)
        mailer.send(recipients, subject, body)
    except:
        log.error('Mail sending failed')
        log.error(traceback.format_exc())
        return False
    return True