##// END OF EJS Templates
Add Celery upstart file and fix post-stop for rhodecode upstart
Add Celery upstart file and fix post-stop for rhodecode upstart

File last commit:

r1807:1635a214 beta
r1811:58df0b3e beta
Show More
tasks.py
418 lines | 14.3 KiB | text/x-python | PythonLexer
code docs, updates
r903 # -*- coding: utf-8 -*-
"""
rhodecode.lib.celerylib.tasks
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)...
r1002 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
code docs, updates
r903
RhodeCode task modules, containing all task that suppose to be run
by celery daemon
source code cleanup: remove trailing white space, normalize file endings
r1203
code docs, updates
r903 :created_on: Oct 6, 2010
:author: marcink
source code cleanup: remove trailing white space, normalize file endings
r1203 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
code docs, updates
r903 :license: GPLv3, see COPYING for more details.
"""
fixed license issue #149
r1206 # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
source code cleanup: remove trailing white space, normalize file endings
r1203 #
code docs, updates
r903 # This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
source code cleanup: remove trailing white space, normalize file endings
r1203 #
code docs, updates
r903 # You should have received a copy of the GNU General Public License
fixed license issue #149
r1206 # along with this program. If not, see <http://www.gnu.org/licenses/>.
renamed project to rhodecode
r547 from celery.decorators import task
added fault tolerant case when celeryconfig is not present in the directory....
r555
#50 on point cache invalidation changes....
r692 import os
import traceback
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)...
r1002 import logging
Notification fixes...
r1717 from os.path import join as jn
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)...
r1002
#50 on point cache invalidation changes....
r692 from time import mktime
renamed project to rhodecode
r547 from operator import itemgetter
fixed issue #165 trending source files are now stored in cache as ext only, and translated to description only when displaying, so future changes of mappings will take affect right away....
r1244 from string import lower
Celery is configured by the .ini files and run from paster now...
r776
fixes #223 improve password reset form
r1417 from pylons import config, url
renamed project to rhodecode
r547 from pylons.i18n.translation import _
Celery is configured by the .ini files and run from paster now...
r776
#235 forking page repo group selection...
r1722 from vcs import get_backend
Notification fixes...
r1717
- refactoring to overcome poor usage of global pylons config...
r1723 from rhodecode import CELERY_ON
Unicode fixes, added safe_str method for global str() operations +better test sandboxing
r1401 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
__get_lockkey, LockHeld, DaemonLock
renamed project to rhodecode
r547 from rhodecode.lib.helpers import person
Notification fixes...
r1717 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
#235 forking page repo group selection...
r1722 from rhodecode.lib.utils import add_cache, action_logger
fixed issues with python2.5...
r1514 from rhodecode.lib.compat import json, OrderedDict
Celery is configured by the .ini files and run from paster now...
r776 from rhodecode.model import init_model
from rhodecode.model import meta
#235 forking page repo group selection...
r1722 from rhodecode.model.db import Statistics, Repository, User
Celery is configured by the .ini files and run from paster now...
r776
from sqlalchemy import engine_from_config
#50 on point cache invalidation changes....
r692
fixed cache problem,...
r777 add_cache(config)
renamed project to rhodecode
r547 __all__ = ['whoosh_index', 'get_commits_stats',
'reset_user_password', 'send_email']
#235 forking page repo group selection...
r1722
renamed project to rhodecode
r547 def get_session():
Celery is configured by the .ini files and run from paster now...
r776 if CELERY_ON:
engine = engine_from_config(config, 'sqlalchemy.db1.')
init_model(engine)
commit less models...
r1749 sa = meta.Session
renamed project to rhodecode
r547 return sa
Notification fixes...
r1717 def get_logger(cls):
if CELERY_ON:
try:
log = cls.get_logger()
except:
log = logging.getLogger(__name__)
else:
log = logging.getLogger(__name__)
return log
fixed issue #165 trending source files are now stored in cache as ext only, and translated to description only when displaying, so future changes of mappings will take affect right away....
r1244
- fixed issue with missing commits on some repos commands...
r1807
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)...
r1002 @task(ignore_result=True)
renamed project to rhodecode
r547 @locked_task
def whoosh_index(repo_location, full_index):
#235 forking page repo group selection...
r1722 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
- fixed issue with missing commits on some repos commands...
r1807 # log = whoosh_index.get_logger(whoosh_index)
#235 forking page repo group selection...
r1722
Celery is configured by the .ini files and run from paster now...
r776 index_location = config['index_dir']
fixed annotation bug, added history to annotation....
r662 WhooshIndexingDaemon(index_location=index_location,
fixed cache problem,...
r777 repo_location=repo_location, sa=get_session())\
.run(full_index=full_index)
renamed project to rhodecode
r547
fixed issue #165 trending source files are now stored in cache as ext only, and translated to description only when displaying, so future changes of mappings will take affect right away....
r1244
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)...
r1002 @task(ignore_result=True)
renamed project to rhodecode
r547 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
Notification fixes...
r1717 log = get_logger(get_commits_stats)
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)...
r1002
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
ts_max_y)
fixes #258 RhodeCode 1.2 assumes egg folder is writable
r1540 lockkey_path = config['here']
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 log.info('running task with lockkey %s', lockkey)
- fixed issue with missing commits on some repos commands...
r1807
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 try:
Tests updates, Session refactoring
r1713 sa = get_session()
fixes #258 RhodeCode 1.2 assumes egg folder is writable
r1540 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
Code refactor for auth func, preparing for ldap support...
r699
#235 forking page repo group selection...
r1722 # for js data compatibilty cleans the key for person from '
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 akc = lambda k: person(k).replace('"', "")
Code refactoring,models renames...
r629
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 co_day_auth_aggr = {}
commits_by_day_aggregate = {}
commit less models...
r1749 repo = Repository.get_by_repo_name(repo_name)
if repo is None:
return True
repo = repo.scm_instance
- fixed issue with missing commits on some repos commands...
r1807 repo_size = repo.count()
# return if repo have no revisions
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 if repo_size < 1:
lock.release()
return True
renamed project to rhodecode
r547
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 skip_date_limit = True
parse_limit = int(config['app_conf'].get('commit_parse_limit'))
- fixed issue with missing commits on some repos commands...
r1807 last_rev = None
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 last_cs = None
timegetter = itemgetter('time')
Code refactoring,models renames...
r629
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 dbrepo = sa.query(Repository)\
.filter(Repository.repo_name == repo_name).scalar()
cur_stats = sa.query(Statistics)\
.filter(Statistics.repository == dbrepo).scalar()
Added branch filter to repo pager...
r1105
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 if cur_stats is not None:
last_rev = cur_stats.stat_on_revision
Added branch filter to repo pager...
r1105
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 if last_rev == repo.get_changeset().revision and repo_size > 1:
#235 forking page repo group selection...
r1722 # pass silently without any work if we're not on first revision or
# current state of parsing revision(from db marker) is the
# last revision
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 lock.release()
return True
Code refactoring,models renames...
r629
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 if cur_stats:
commits_by_day_aggregate = OrderedDict(json.loads(
cur_stats.commit_activity_combined))
co_day_auth_aggr = json.loads(cur_stats.commit_activity)
Code refactoring,models renames...
r629
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 log.debug('starting parsing %s', parse_limit)
lmktime = mktime
- fixed issue with missing commits on some repos commands...
r1807 last_rev = last_rev + 1 if last_rev >= 0 else 0
log.debug('Getting revisions from %s to %s' % (
last_rev, last_rev + parse_limit)
)
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 for cs in repo[last_rev:last_rev + parse_limit]:
last_cs = cs # remember last parsed changeset
k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
moved statistics parse_limit into .ini files...
r1076
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 if akc(cs.author) in co_day_auth_aggr:
try:
l = [timegetter(x) for x in
co_day_auth_aggr[akc(cs.author)]['data']]
time_pos = l.index(k)
except ValueError:
time_pos = False
if time_pos >= 0 and time_pos is not False:
datadict = \
co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
code stats speed improvments
r804
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 datadict["commits"] += 1
datadict["added"] += len(cs.added)
datadict["changed"] += len(cs.changed)
datadict["removed"] += len(cs.removed)
else:
if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
Code refactoring,models renames...
r629
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 datadict = {"time": k,
"commits": 1,
"added": len(cs.added),
"changed": len(cs.changed),
"removed": len(cs.removed),
}
co_day_auth_aggr[akc(cs.author)]['data']\
.append(datadict)
Code refactoring,models renames...
r629
renamed project to rhodecode
r547 else:
if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 co_day_auth_aggr[akc(cs.author)] = {
"label": akc(cs.author),
"data": [{"time":k,
"commits":1,
"added":len(cs.added),
"changed":len(cs.changed),
"removed":len(cs.removed),
}],
"schema": ["commits"],
}
Code refactoring,models renames...
r629
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 #gather all data by day
if k in commits_by_day_aggregate:
commits_by_day_aggregate[k] += 1
else:
commits_by_day_aggregate[k] = 1
Code refactoring,models renames...
r629
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 overview_data = sorted(commits_by_day_aggregate.items(),
key=itemgetter(0))
if not co_day_auth_aggr:
co_day_auth_aggr[akc(repo.contact)] = {
"label": akc(repo.contact),
"data": [0, 1],
"schema": ["commits"],
}
Code refactoring,models renames...
r629
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 stats = cur_stats if cur_stats else Statistics()
stats.commit_activity = json.dumps(co_day_auth_aggr)
stats.commit_activity_combined = json.dumps(overview_data)
renamed project to rhodecode
r547
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 log.debug('last revison %s', last_rev)
leftovers = len(repo.revisions[last_rev:])
log.debug('revisions to parse %s', leftovers)
renamed project to rhodecode
r547
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 if last_rev == 0 or leftovers < parse_limit:
log.debug('getting code trending stats')
stats.languages = json.dumps(__get_codes_stats(repo_name))
Code refactoring,models renames...
r629
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 try:
stats.repository = dbrepo
stats.stat_on_revision = last_cs.revision if last_cs else 0
sa.add(stats)
sa.commit()
except:
log.error(traceback.format_exc())
sa.rollback()
lock.release()
return False
Code refactoring,models renames...
r629
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 #final release
lock.release()
Code refactoring,models renames...
r629
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264 #execute another task if celery is enabled
if len(repo.revisions) > 1 and CELERY_ON:
run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
return True
except LockHeld:
log.info('LockHeld')
return 'Task with key %s already running' % lockkey
renamed project to rhodecode
r547
fixes #223 improve password reset form
r1417 @task(ignore_result=True)
def send_password_link(user_email):
#235 forking page repo group selection...
r1722 from rhodecode.model.notification import EmailNotificationModel
Notification fixes...
r1717 log = get_logger(send_password_link)
fixes #223 improve password reset form
r1417
try:
sa = get_session()
Notification fixes...
r1717 user = User.get_by_email(user_email)
fixes #223 improve password reset form
r1417 if user:
Notification fixes...
r1717 log.debug('password reset user found %s' % user)
fixes #223 improve password reset form
r1417 link = url('reset_password_confirmation', key=user.api_key,
qualified=True)
Notification fixes...
r1717 reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
body = EmailNotificationModel().get_email_tmpl(reg_type,
**{'user':user.short_contact,
'reset_url':link})
log.debug('sending email')
fixes #223 improve password reset form
r1417 run_task(send_email, user_email,
Notification fixes...
r1717 _("password reset link"), body)
fixes #223 improve password reset form
r1417 log.info('send new password mail to %s', user_email)
Notification fixes...
r1717 else:
log.debug("password reset email %s not found" % user_email)
fixes #223 improve password reset form
r1417 except:
log.error(traceback.format_exc())
return False
return True
fixed issue #165 trending source files are now stored in cache as ext only, and translated to description only when displaying, so future changes of mappings will take affect right away....
r1244
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)...
r1002 @task(ignore_result=True)
renamed project to rhodecode
r547 def reset_user_password(user_email):
#235 forking page repo group selection...
r1722 from rhodecode.lib import auth
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)...
r1002
#235 forking page repo group selection...
r1722 log = get_logger(reset_user_password)
Code refactoring,models renames...
r629
renamed project to rhodecode
r547 try:
try:
sa = get_session()
Notification fixes...
r1717 user = User.get_by_email(user_email)
renamed project to rhodecode
r547 new_passwd = auth.PasswordGenerator().gen_password(8,
auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
if user:
user.password = auth.get_crypt_password(new_passwd)
Added api_key into user, api key get's generated again after password change...
r1116 user.api_key = auth.generate_api_key(user.username)
renamed project to rhodecode
r547 sa.add(user)
sa.commit()
log.info('change password for %s', user_email)
if new_passwd is None:
raise Exception('unable to generate new password')
except:
log.error(traceback.format_exc())
sa.rollback()
Code refactoring,models renames...
r629
renamed project to rhodecode
r547 run_task(send_email, user_email,
Notification fixes...
r1717 'Your new password',
fixes #223 improve password reset form
r1417 'Your new RhodeCode password:%s' % (new_passwd))
renamed project to rhodecode
r547 log.info('send new password mail to %s', user_email)
Code refactoring,models renames...
r629
renamed project to rhodecode
r547 except:
log.error('Failed to update user password')
log.error(traceback.format_exc())
Celery is configured by the .ini files and run from paster now...
r776
renamed project to rhodecode
r547 return True
fixed issue #165 trending source files are now stored in cache as ext only, and translated to description only when displaying, so future changes of mappings will take affect right away....
r1244
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)...
r1002 @task(ignore_result=True)
Notification fixes...
r1717 def send_email(recipients, subject, body, html_body=''):
fixes #59, notifications for user registrations + some changes to mailer
r689 """
Sends an email with defined parameters from the .ini files.
source code cleanup: remove trailing white space, normalize file endings
r1203
fixes #59, notifications for user registrations + some changes to mailer
r689 :param recipients: list of recipients, it this is empty the defined email
address from field 'email_to' is used instead
:param subject: subject of the mail
:param body: body of the mail
Notification fixes...
r1717 :param html_body: html version of body
fixes #59, notifications for user registrations + some changes to mailer
r689 """
Notification fixes...
r1717 log = get_logger(send_email)
- fixes celery sqlalchemy session issues for async forking...
r1728 sa = get_session()
Celery is configured by the .ini files and run from paster now...
r776 email_config = config
Notification fixes...
r1717 subject = "%s %s" % (email_config.get('email_prefix'), subject)
fixes #59, notifications for user registrations + some changes to mailer
r689 if not recipients:
implements #291 email notification sent to all admin users
r1612 # if recipients are not defined we send to email_config + all admins
Notification fixes...
r1717 admins = [u.email for u in User.query()
.filter(User.admin == True).all()]
implements #291 email notification sent to all admin users
r1612 recipients = [email_config.get('email_to')] + admins
fixes #59, notifications for user registrations + some changes to mailer
r689
Notification fixes...
r1717 mail_from = email_config.get('app_email_from', 'RhodeCode')
renamed project to rhodecode
r547 user = email_config.get('smtp_username')
passwd = email_config.get('smtp_password')
mail_server = email_config.get('smtp_server')
mail_port = email_config.get('smtp_port')
fixes #59, notifications for user registrations + some changes to mailer
r689 tls = str2bool(email_config.get('smtp_use_tls'))
ssl = str2bool(email_config.get('smtp_use_ssl'))
control mailer debug with the .ini file
r1169 debug = str2bool(config.get('debug'))
Les Peabody
applied smth_auth options update patch
r1581 smtp_auth = email_config.get('smtp_auth')
Code refactoring,models renames...
r629
renamed project to rhodecode
r547 try:
Added email sending test site
r1673 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
control mailer debug with the .ini file
r1169 mail_port, ssl, tls, debug=debug)
Notification fixes...
r1717 m.send(recipients, subject, body, html_body)
renamed project to rhodecode
r547 except:
log.error('Mail sending failed')
log.error(traceback.format_exc())
return False
return True
fixed issue #165 trending source files are now stored in cache as ext only, and translated to description only when displaying, so future changes of mappings will take affect right away....
r1244
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)...
r1002 @task(ignore_result=True)
renamed project to rhodecode
r547 def create_repo_fork(form_data, cur_user):
#235 forking page repo group selection...
r1722 """
Creates a fork of repository using interval VCS methods
:param form_data:
:param cur_user:
"""
from rhodecode.model.repo import RepoModel
Notification fixes...
r1717 log = get_logger(create_repo_fork)
#235 forking page repo group selection...
r1722 Session = get_session()
base_path = Repository.base_path()
RepoModel(Session).create(form_data, cur_user, just_db=True, fork=True)
alias = form_data['repo_type']
org_repo_name = form_data['org_path']
fixed fork journal entry
r1730 fork_name = form_data['repo_name_full']
added option to do a checkout after cloning a repository
r1742 update_after_clone = form_data['update_after_clone']
#235 forking page repo group selection...
r1722 source_repo_path = os.path.join(base_path, org_repo_name)
fixed fork journal entry
r1730 destination_fork_path = os.path.join(base_path, fork_name)
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
r1264
#235 forking page repo group selection...
r1722 log.info('creating fork of %s as %s', source_repo_path,
destination_fork_path)
extended repo creation by repo type. fixed fork creation to maintain repo type.
r659 backend = get_backend(alias)
#235 forking page repo group selection...
r1722 backend(safe_str(destination_fork_path), create=True,
added option to do a checkout after cloning a repository
r1742 src_url=safe_str(source_repo_path),
update_after_clone=update_after_clone)
fixed fork journal entry
r1730 action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
#235 forking page repo group selection...
r1722 org_repo_name, '', Session)
implements #193 journal stores information about deleting of repos...
r1747
action_logger(cur_user, 'user_created_fork:%s' % fork_name,
fork_name, '', Session)
#235 forking page repo group selection...
r1722 # finally commit at latest possible stage
Session.commit()
fixed issue #165 trending source files are now stored in cache as ext only, and translated to description only when displaying, so future changes of mappings will take affect right away....
r1244
renamed project to rhodecode
r547 def __get_codes_stats(repo_name):
#235 forking page repo group selection...
r1722 repo = Repository.get_by_repo_name(repo_name).scm_instance
removed soon deprecated walk method on repository instance
r603 tip = repo.get_changeset()
renamed project to rhodecode
r547 code_stats = {}
some small fixes
r630
def aggregate(cs):
for f in cs[2]:
fixed issue #165 trending source files are now stored in cache as ext only, and translated to description only when displaying, so future changes of mappings will take affect right away....
r1244 ext = lower(f.extension)
removed binary files from trending sources
r789 if ext in LANGUAGES_EXTENSIONS_MAP.keys() and not f.is_binary:
fixed issue #165 trending source files are now stored in cache as ext only, and translated to description only when displaying, so future changes of mappings will take affect right away....
r1244 if ext in code_stats:
code_stats[ext] += 1
renamed project to rhodecode
r547 else:
fixed issue #165 trending source files are now stored in cache as ext only, and translated to description only when displaying, so future changes of mappings will take affect right away....
r1244 code_stats[ext] = 1
Code refactoring,models renames...
r629
some small fixes
r630 map(aggregate, tip.walk('/'))
renamed project to rhodecode
r547 return code_stats or {}