diff --git a/celeryconfig.py b/celeryconfig.py deleted file mode 100644 --- a/celeryconfig.py +++ /dev/null @@ -1,79 +0,0 @@ -# List of modules to import when celery starts. -import sys -import os -import ConfigParser -root = os.getcwd() - -PYLONS_CONFIG_NAME = 'production.ini' - -sys.path.append(root) -config = ConfigParser.ConfigParser({'here':root}) -config.read('%s/%s' % (root, PYLONS_CONFIG_NAME)) -PYLONS_CONFIG = config - -CELERY_IMPORTS = ("rhodecode.lib.celerylib.tasks",) - -## Result store settings. -CELERY_RESULT_BACKEND = "amqp" -CELERY_AMQP_TASK_RESULT_EXPIRES = 18000 # 5 hours. - -#CELERY_RESULT_DBURI = dict(config.items('app:main'))['sqlalchemy.db1.url'] -CELERY_RESULT_SERIALIZER = 'json' - - -BROKER_CONNECTION_MAX_RETRIES = 30 - -## Broker settings. -BROKER_HOST = "localhost" -BROKER_PORT = 5672 -BROKER_VHOST = "rabbitmqhost" -BROKER_USER = "rabbitmq" -BROKER_PASSWORD = "qweqwe" - -## Worker settings -## If you're doing mostly I/O you can have more processes, -## but if mostly spending CPU, try to keep it close to the -## number of CPUs on your machine. If not set, the number of CPUs/cores -## available will be used. -CELERYD_CONCURRENCY = 2 -# CELERYD_LOG_FILE = "celeryd.log" -CELERYD_LOG_LEVEL = "DEBUG" -CELERYD_MAX_TASKS_PER_CHILD = 3 - -#Tasks will never be sent to the queue, but executed locally instead. -CELERY_ALWAYS_EAGER = False -if PYLONS_CONFIG_NAME == 'test.ini': - #auto eager for tests - CELERY_ALWAYS_EAGER = True - -#=============================================================================== -# EMAIL SETTINGS -#=============================================================================== -pylons_email_config = dict(config.items('DEFAULT')) - -CELERY_SEND_TASK_ERROR_EMAILS = True - -#List of (name, email_address) tuples for the admins that should receive error e-mails. -ADMINS = [('Administrator', pylons_email_config.get('email_to'))] - -#The e-mail address this worker sends e-mails from. Default is "celery@localhost". -SERVER_EMAIL = pylons_email_config.get('error_email_from') - -#The mail server to use. Default is "localhost". -MAIL_HOST = pylons_email_config.get('smtp_server') - -#Username (if required) to log on to the mail server with. -MAIL_HOST_USER = pylons_email_config.get('smtp_username') - -#Password (if required) to log on to the mail server with. -MAIL_HOST_PASSWORD = pylons_email_config.get('smtp_password') - -MAIL_PORT = pylons_email_config.get('smtp_port') - - -#=============================================================================== -# INSTRUCTIONS FOR RABBITMQ -#=============================================================================== -# rabbitmqctl add_user rabbitmq qweqwe -# rabbitmqctl add_vhost rabbitmqhost -# rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*" diff --git a/development.ini b/development.ini --- a/development.ini +++ b/development.ini @@ -1,6 +1,6 @@ ################################################################################ ################################################################################ -# rhodecode - Pylons environment configuration # +# RhodeCode - Pylons environment configuration # # # # The %(here)s variable will be replaced with the parent directory of this file# ################################################################################ @@ -9,8 +9,8 @@ debug = true ################################################################################ ## Uncomment and replace with the address which should receive ## -## any error reports after application crash ## -## Additionally those settings will be used by rhodecode mailing system ## +## any error reports after application crash ## +## Additionally those settings will be used by RhodeCode mailing system ## ################################################################################ #email_to = admin@localhost #error_email_from = paste_error@localhost @@ -19,15 +19,16 @@ debug = true #smtp_server = mail.server.com #smtp_username = -#smtp_password = +#smtp_password = #smtp_port = -#smtp_use_tls = +#smtp_use_tls = false +#smtp_use_ssl = true [server:main] ##nr of threads to spawn threadpool_workers = 5 -##max request before +##max request before thread respawn threadpool_max_requests = 6 ##option to use threads of process @@ -46,6 +47,33 @@ cache_dir = %(here)s/data index_dir = %(here)s/data/index #################################### +### CELERY CONFIG #### +#################################### +use_celery = false +broker.host = localhost +broker.vhost = rabbitmqhost +broker.port = 5672 +broker.user = rabbitmq +broker.password = qweqwe + +celery.imports = rhodecode.lib.celerylib.tasks + +celery.result.backend = amqp +celery.result.dburi = amqp:// +celery.result.serialier = json + +#celery.send.task.error.emails = true +#celery.amqp.task.result.expires = 18000 + +celeryd.concurrency = 2 +#celeryd.log.file = celeryd.log +celeryd.log.level = debug +celeryd.max.tasks.per.child = 3 + +#tasks will never be sent to the queue, but executed locally instead. +celery.always.eager = false + +#################################### ### BEAKER CACHE #### #################################### beaker.cache.data_dir=/%(here)s/data/cache/data @@ -61,9 +89,8 @@ beaker.cache.short_term.expire=60 beaker.cache.long_term.type=memory beaker.cache.long_term.expire=36000 - beaker.cache.sql_cache_short.type=memory -beaker.cache.sql_cache_short.expire=5 +beaker.cache.sql_cache_short.expire=10 beaker.cache.sql_cache_med.type=memory beaker.cache.sql_cache_med.expire=360 @@ -75,7 +102,7 @@ beaker.cache.sql_cache_long.expire=3600 ### BEAKER SESSION #### #################################### ## Type of storage used for the session, current types are -## "dbm", "file", "memcached", "database", and "memory". +## dbm, file, memcached, database, and memory. ## The storage uses the Container API ##that is also used by the cache system. beaker.session.type = file diff --git a/production.ini b/production.ini --- a/production.ini +++ b/production.ini @@ -47,6 +47,33 @@ cache_dir = %(here)s/data index_dir = %(here)s/data/index #################################### +### CELERY CONFIG #### +#################################### +use_celery = false +broker.host = localhost +broker.vhost = rabbitmqhost +broker.port = 5672 +broker.user = rabbitmq +broker.password = qweqwe + +celery.imports = rhodecode.lib.celerylib.tasks + +celery.result.backend = amqp +celery.result.dburi = amqp:// +celery.result.serialier = json + +#celery.send.task.error.emails = true +#celery.amqp.task.result.expires = 18000 + +celeryd.concurrency = 2 +#celeryd.log.file = celeryd.log +celeryd.log.level = debug +celeryd.max.tasks.per.child = 3 + +#tasks will never be sent to the queue, but executed locally instead. +celery.always.eager = false + +#################################### ### BEAKER CACHE #### #################################### beaker.cache.data_dir=/%(here)s/data/cache/data diff --git a/rhodecode/config/deployment.ini_tmpl b/rhodecode/config/deployment.ini_tmpl --- a/rhodecode/config/deployment.ini_tmpl +++ b/rhodecode/config/deployment.ini_tmpl @@ -1,6 +1,6 @@ ################################################################################ ################################################################################ -# rhodecode - Pylons environment configuration # +# RhodeCode - Pylons environment configuration # # # # The %(here)s variable will be replaced with the parent directory of this file# ################################################################################ @@ -10,7 +10,7 @@ debug = true ################################################################################ ## Uncomment and replace with the address which should receive ## ## any error reports after application crash ## -## Additionally those settings will be used by rhodecode mailing system ## +## Additionally those settings will be used by RhodeCode mailing system ## ################################################################################ #email_to = admin@localhost #error_email_from = paste_error@localhost @@ -48,6 +48,33 @@ index_dir = %(here)s/data/index app_instance_uuid = ${app_instance_uuid} #################################### +### CELERY CONFIG #### +#################################### +use_celery = false +broker.host = localhost +broker.vhost = rabbitmqhost +broker.port = 5672 +broker.user = rabbitmq +broker.password = qweqwe + +celery.imports = rhodecode.lib.celerylib.tasks + +celery.result.backend = amqp +celery.result.dburi = amqp:// +celery.result.serialier = json + +#celery.send.task.error.emails = true +#celery.amqp.task.result.expires = 18000 + +celeryd.concurrency = 2 +#celeryd.log.file = celeryd.log +celeryd.log.level = debug +celeryd.max.tasks.per.child = 3 + +#tasks will never be sent to the queue, but executed locally instead. +celery.always.eager = false + +#################################### ### BEAKER CACHE #### #################################### beaker.cache.data_dir=/%(here)s/data/cache/data @@ -64,7 +91,7 @@ beaker.cache.long_term.type=memory beaker.cache.long_term.expire=36000 beaker.cache.sql_cache_short.type=memory -beaker.cache.sql_cache_short.expire=5 +beaker.cache.sql_cache_short.expire=10 beaker.cache.sql_cache_med.type=memory beaker.cache.sql_cache_med.expire=360 diff --git a/rhodecode/lib/celerylib/__init__.py b/rhodecode/lib/celerylib/__init__.py --- a/rhodecode/lib/celerylib/__init__.py +++ b/rhodecode/lib/celerylib/__init__.py @@ -1,37 +1,47 @@ +import os +import sys +import socket +import traceback +import logging + from rhodecode.lib.pidlock import DaemonLock, LockHeld from vcs.utils.lazy import LazyProperty from decorator import decorator -import logging -import os -import sys -import traceback from hashlib import md5 -import socket +from pylons import config + log = logging.getLogger(__name__) +def str2bool(v): + return v.lower() in ["yes", "true", "t", "1"] if v else None + +CELERY_ON = str2bool(config['app_conf'].get('use_celery')) + class ResultWrapper(object): def __init__(self, task): self.task = task - + @LazyProperty def result(self): return self.task def run_task(task, *args, **kwargs): - try: - t = task.delay(*args, **kwargs) - log.info('running task %s', t.task_id) - return t - except socket.error, e: - if e.errno == 111: - log.debug('Unable to connect to celeryd. Sync execution') - else: - log.error(traceback.format_exc()) - except KeyError, e: - log.debug('Unable to connect to celeryd. Sync execution') - except Exception, e: - log.error(traceback.format_exc()) - + if CELERY_ON: + try: + t = task.delay(*args, **kwargs) + log.info('running task %s:%s', t.task_id, task) + return t + except socket.error, e: + if e.errno == 111: + log.debug('Unable to connect to celeryd. Sync execution') + else: + log.error(traceback.format_exc()) + except KeyError, e: + log.debug('Unable to connect to celeryd. Sync execution') + except Exception, e: + log.error(traceback.format_exc()) + + log.debug('executing task %s in sync mode', task) return ResultWrapper(task(*args, **kwargs)) @@ -39,7 +49,7 @@ def locked_task(func): def __wrapper(func, *fargs, **fkwargs): params = list(fargs) params.extend(['%s-%s' % ar for ar in fkwargs.items()]) - + lockkey = 'task_%s' % \ md5(str(func.__name__) + '-' + \ '-'.join(map(str, params))).hexdigest() @@ -51,14 +61,14 @@ def locked_task(func): return ret except LockHeld: log.info('LockHeld') - return 'Task with key %s already running' % lockkey + return 'Task with key %s already running' % lockkey - return decorator(__wrapper, func) - + return decorator(__wrapper, func) + + - - - - - - + + + + + diff --git a/rhodecode/lib/celerylib/tasks.py b/rhodecode/lib/celerylib/tasks.py --- a/rhodecode/lib/celerylib/tasks.py +++ b/rhodecode/lib/celerylib/tasks.py @@ -2,16 +2,24 @@ from celery.decorators import task import os import traceback +import beaker from time import mktime - from operator import itemgetter + +from pylons import config from pylons.i18n.translation import _ -from rhodecode.lib.celerylib import run_task, locked_task + +from rhodecode.lib.celerylib import run_task, locked_task, str2bool from rhodecode.lib.helpers import person from rhodecode.lib.smtp_mailer import SmtpMailer from rhodecode.lib.utils import OrderedDict +from rhodecode.model import init_model +from rhodecode.model import meta +from rhodecode.model.db import RhodeCodeUi + from vcs.backends import get_repo -from rhodecode.model.db import RhodeCodeUi + +from sqlalchemy import engine_from_config try: import json @@ -19,31 +27,16 @@ except ImportError: #python 2.5 compatibility import simplejson as json -try: - from celeryconfig import PYLONS_CONFIG as config - celery_on = True -except ImportError: - #if celeryconfig is not present let's just load our pylons - #config instead - from pylons import config - celery_on = False - - __all__ = ['whoosh_index', 'get_commits_stats', 'reset_user_password', 'send_email'] +CELERY_ON = str2bool(config['app_conf'].get('use_celery')) + def get_session(): - if celery_on: - from sqlalchemy import engine_from_config - from sqlalchemy.orm import sessionmaker, scoped_session - engine = engine_from_config(dict(config.items('app:main')), - 'sqlalchemy.db1.') - sa = scoped_session(sessionmaker(bind=engine)) - else: - #If we don't use celery reuse our current application Session - from rhodecode.model.meta import Session - sa = Session() - + if CELERY_ON: + engine = engine_from_config(config, 'sqlalchemy.db1.') + init_model(engine) + sa = meta.Session() return sa def get_repos_path(): @@ -56,7 +49,7 @@ def get_repos_path(): def whoosh_index(repo_location, full_index): log = whoosh_index.get_logger() from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon - index_location = dict(config.items('app:main'))['index_dir'] + index_location = config['index_dir'] WhooshIndexingDaemon(index_location=index_location, repo_location=repo_location).run(full_index=full_index) @@ -235,6 +228,7 @@ def reset_user_password(user_email): except: log.error('Failed to update user password') log.error(traceback.format_exc()) + return True @task @@ -249,14 +243,11 @@ def send_email(recipients, subject, body :param body: body of the mail """ log = send_email.get_logger() - email_config = dict(config.items('DEFAULT')) + email_config = config if not recipients: recipients = [email_config.get('email_to')] - def str2bool(v): - return v.lower() in ["yes", "true", "t", "1"] if v else None - mail_from = email_config.get('app_email_from') user = email_config.get('smtp_username') passwd = email_config.get('smtp_password') @@ -293,12 +284,58 @@ def create_repo_fork(form_data, cur_user backend(str(repo_fork_path), create=True, src_url=str(repo_path)) def __get_codes_stats(repo_name): - LANGUAGES_EXTENSIONS = ['action', 'adp', 'ashx', 'asmx', - 'aspx', 'asx', 'axd', 'c', 'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el', - 'erl', 'h', 'java', 'js', 'jsp', 'jspx', 'lisp', 'lua', 'm', 'mako', 'ml', - 'pas', 'patch', 'php', 'php3', 'php4', 'phtml', 'pm', 'py', 'rb', 'rst', - 's', 'sh', 'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt', 'yaws'] - + LANGUAGES_EXTENSIONS_MAP = {'scm': 'Scheme', 'asmx': 'VbNetAspx', 'Rout': + 'RConsole', 'rest': 'Rst', 'abap': 'ABAP', 'go': 'Go', 'phtml': 'HtmlPhp', + 'ns2': 'Newspeak', 'xml': 'EvoqueXml', 'sh-session': 'BashSession', 'ads': + 'Ada', 'clj': 'Clojure', 'll': 'Llvm', 'ebuild': 'Bash', 'adb': 'Ada', + 'ada': 'Ada', 'c++-objdump': 'CppObjdump', 'aspx': + 'VbNetAspx', 'ksh': 'Bash', 'coffee': 'CoffeeScript', 'vert': 'GLShader', + 'Makefile.*': 'Makefile', 'di': 'D', 'dpatch': 'DarcsPatch', 'rake': + 'Ruby', 'moo': 'MOOCode', 'erl-sh': 'ErlangShell', 'geo': 'GLShader', + 'pov': 'Povray', 'bas': 'VbNet', 'bat': 'Batch', 'd': 'D', 'lisp': + 'CommonLisp', 'h': 'C', 'rbx': 'Ruby', 'tcl': 'Tcl', 'c++': 'Cpp', 'md': + 'MiniD', '.vimrc': 'Vim', 'xsd': 'Xml', 'ml': 'Ocaml', 'el': 'CommonLisp', + 'befunge': 'Befunge', 'xsl': 'Xslt', 'pyx': 'Cython', 'cfm': + 'ColdfusionHtml', 'evoque': 'Evoque', 'cfg': 'Ini', 'htm': 'Html', + 'Makefile': 'Makefile', 'cfc': 'ColdfusionHtml', 'tex': 'Tex', 'cs': + 'CSharp', 'mxml': 'Mxml', 'patch': 'Diff', 'apache.conf': 'ApacheConf', + 'scala': 'Scala', 'applescript': 'AppleScript', 'GNUmakefile': 'Makefile', + 'c-objdump': 'CObjdump', 'lua': 'Lua', 'apache2.conf': 'ApacheConf', 'rb': + 'Ruby', 'gemspec': 'Ruby', 'rl': 'RagelObjectiveC', 'vala': 'Vala', 'tmpl': + 'Cheetah', 'bf': 'Brainfuck', 'plt': 'Gnuplot', 'G': 'AntlrRuby', 'xslt': + 'Xslt', 'flxh': 'Felix', 'asax': 'VbNetAspx', 'Rakefile': 'Ruby', 'S': 'S', + 'wsdl': 'Xml', 'js': 'Javascript', 'autodelegate': 'Myghty', 'properties': + 'Ini', 'bash': 'Bash', 'c': 'C', 'g': 'AntlrRuby', 'r3': 'Rebol', 's': + 'Gas', 'ashx': 'VbNetAspx', 'cxx': 'Cpp', 'boo': 'Boo', 'prolog': 'Prolog', + 'sqlite3-console': 'SqliteConsole', 'cl': 'CommonLisp', 'cc': 'Cpp', 'pot': + 'Gettext', 'vim': 'Vim', 'pxi': 'Cython', 'yaml': 'Yaml', 'SConstruct': + 'Python', 'diff': 'Diff', 'txt': 'Text', 'cw': 'Redcode', 'pxd': 'Cython', + 'plot': 'Gnuplot', 'java': 'Java', 'hrl': 'Erlang', 'py': 'Python', + 'makefile': 'Makefile', 'squid.conf': 'SquidConf', 'asm': 'Nasm', 'toc': + 'Tex', 'kid': 'Genshi', 'rhtml': 'Rhtml', 'po': 'Gettext', 'pl': 'Prolog', + 'pm': 'Perl', 'hx': 'Haxe', 'ascx': 'VbNetAspx', 'ooc': 'Ooc', 'asy': + 'Asymptote', 'hs': 'Haskell', 'SConscript': 'Python', 'pytb': + 'PythonTraceback', 'myt': 'Myghty', 'hh': 'Cpp', 'R': 'S', 'aux': 'Tex', + 'rst': 'Rst', 'cpp-objdump': 'CppObjdump', 'lgt': 'Logtalk', 'rss': 'Xml', + 'flx': 'Felix', 'b': 'Brainfuck', 'f': 'Fortran', 'rbw': 'Ruby', + '.htaccess': 'ApacheConf', 'cxx-objdump': 'CppObjdump', 'j': 'ObjectiveJ', + 'mll': 'Ocaml', 'yml': 'Yaml', 'mu': 'MuPAD', 'r': 'Rebol', 'ASM': 'Nasm', + 'erl': 'Erlang', 'mly': 'Ocaml', 'mo': 'Modelica', 'def': 'Modula2', 'ini': + 'Ini', 'control': 'DebianControl', 'vb': 'VbNet', 'vapi': 'Vala', 'pro': + 'Prolog', 'spt': 'Cheetah', 'mli': 'Ocaml', 'as': 'ActionScript3', 'cmd': + 'Batch', 'cpp': 'Cpp', 'io': 'Io', 'tac': 'Python', 'haml': 'Haml', 'rkt': + 'Racket', 'st':'Smalltalk', 'inc': 'Povray', 'pas': 'Delphi', 'cmake': + 'CMake', 'csh':'Tcsh', 'hpp': 'Cpp', 'feature': 'Gherkin', 'html': 'Html', + 'php':'Php', 'php3':'Php', 'php4':'Php', 'php5':'Php', 'xhtml': 'Html', + 'hxx': 'Cpp', 'eclass': 'Bash', 'css': 'Css', + 'frag': 'GLShader', 'd-objdump': 'DObjdump', 'weechatlog': 'IrcLogs', + 'tcsh': 'Tcsh', 'objdump': 'Objdump', 'pyw': 'Python', 'h++': 'Cpp', + 'py3tb': 'Python3Traceback', 'jsp': 'Jsp', 'sql': 'Sql', 'mak': 'Makefile', + 'php': 'Php', 'mao': 'Mako', 'man': 'Groff', 'dylan': 'Dylan', 'sass': + 'Sass', 'cfml': 'ColdfusionHtml', 'darcspatch': 'DarcsPatch', 'tpl': + 'Smarty', 'm': 'ObjectiveC', 'f90': 'Fortran', 'mod': 'Modula2', 'sh': + 'Bash', 'lhs': 'LiterateHaskell', 'sources.list': 'SourcesList', 'axd': + 'VbNetAspx', 'sc': 'Python'} repos_path = get_repos_path() p = os.path.join(repos_path, repo_name) @@ -308,12 +345,14 @@ def __get_codes_stats(repo_name): def aggregate(cs): for f in cs[2]: - k = f.mimetype - if f.extension in LANGUAGES_EXTENSIONS: - if code_stats.has_key(k): - code_stats[k] += 1 + ext = f.extension + key = LANGUAGES_EXTENSIONS_MAP.get(ext, ext) + key = key or ext + if ext in LANGUAGES_EXTENSIONS_MAP.keys(): + if code_stats.has_key(key): + code_stats[key] += 1 else: - code_stats[k] = 1 + code_stats[key] = 1 map(aggregate, tip.walk('/')) diff --git a/rhodecode/lib/celerypylons/__init__.py b/rhodecode/lib/celerypylons/__init__.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/celerypylons/__init__.py @@ -0,0 +1,16 @@ +""" +Automatically sets the environment variable `CELERY_LOADER` to +`celerypylons.loader:PylonsLoader`. This ensures the loader is +specified when accessing the rest of this package, and allows celery +to be installed in a webapp just by importing celerypylons:: + + import celerypylons + +""" +import os +import warnings + +CELERYPYLONS_LOADER = 'rhodecode.lib.celerypylons.loader.PylonsLoader' +if os.environ.get('CELERY_LOADER', CELERYPYLONS_LOADER) != CELERYPYLONS_LOADER: + warnings.warn("'CELERY_LOADER' environment variable will be overridden by celery-pylons.") +os.environ['CELERY_LOADER'] = CELERYPYLONS_LOADER diff --git a/rhodecode/lib/celerypylons/commands.py b/rhodecode/lib/celerypylons/commands.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/celerypylons/commands.py @@ -0,0 +1,143 @@ +import os +from paste.script.command import Command, BadCommand +import paste.deploy +from pylons import config + + +__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand', + 'CAMQPAdminCommand', 'CeleryEventCommand'] + + +class CeleryCommand(Command): + """ + Abstract Base Class for celery commands. + + The celery commands are somewhat aggressive about loading + celery.conf, and since our module sets the `CELERY_LOADER` + environment variable to our loader, we have to bootstrap a bit and + make sure we've had a chance to load the pylons config off of the + command line, otherwise everything fails. + """ + min_args = 1 + min_args_error = "Please provide a paster config file as an argument." + takes_config_file = 1 + requires_config_file = True + + def run(self, args): + """ + Overrides Command.run + + Checks for a config file argument and loads it. + """ + if len(args) < self.min_args: + raise BadCommand( + self.min_args_error % {'min_args': self.min_args, + 'actual_args': len(args)}) + # Decrement because we're going to lob off the first argument. + # @@ This is hacky + self.min_args -= 1 + self.bootstrap_config(args[0]) + self.update_parser() + return super(CeleryCommand, self).run(args[1:]) + + def update_parser(self): + """ + Abstract method. Allows for the class's parser to be updated + before the superclass's `run` method is called. Necessary to + allow options/arguments to be passed through to the underlying + celery command. + """ + raise NotImplementedError("Abstract Method.") + + def bootstrap_config(self, conf): + """ + Loads the pylons configuration. + """ + path_to_ini_file = os.path.realpath(conf) + conf = paste.deploy.appconfig('config:' + path_to_ini_file) + config.init_app(conf.global_conf, conf.local_conf) + + +class CeleryDaemonCommand(CeleryCommand): + """Start the celery worker + + Starts the celery worker that uses a paste.deploy configuration + file. + """ + usage = 'CONFIG_FILE [celeryd options...]' + summary = __doc__.splitlines()[0] + description = "".join(__doc__.splitlines()[2:]) + + parser = Command.standard_parser(quiet=True) + + def update_parser(self): + from celery.bin import celeryd + for x in celeryd.WorkerCommand().get_options(): + self.parser.add_option(x) + + def command(self): + from celery.bin import celeryd + return celeryd.WorkerCommand().run(**vars(self.options)) + + +class CeleryBeatCommand(CeleryCommand): + """Start the celery beat server + + Starts the celery beat server using a paste.deploy configuration + file. + """ + usage = 'CONFIG_FILE [celerybeat options...]' + summary = __doc__.splitlines()[0] + description = "".join(__doc__.splitlines()[2:]) + + parser = Command.standard_parser(quiet=True) + + def update_parser(self): + from celery.bin import celerybeat + for x in celerybeat.BeatCommand().get_options(): + self.parser.add_option(x) + + def command(self): + from celery.bin import celerybeat + return celerybeat.BeatCommand(**vars(self.options)) + +class CAMQPAdminCommand(CeleryCommand): + """CAMQP Admin + + CAMQP celery admin tool. + """ + usage = 'CONFIG_FILE [camqadm options...]' + summary = __doc__.splitlines()[0] + description = "".join(__doc__.splitlines()[2:]) + + parser = Command.standard_parser(quiet=True) + + def update_parser(self): + from celery.bin import camqadm + for x in camqadm.OPTION_LIST: + self.parser.add_option(x) + + def command(self): + from celery.bin import camqadm + return camqadm.camqadm(*self.args, **vars(self.options)) + + +class CeleryEventCommand(CeleryCommand): + """Celery event commandd. + + Capture celery events. + """ + usage = 'CONFIG_FILE [celeryev options...]' + summary = __doc__.splitlines()[0] + description = "".join(__doc__.splitlines()[2:]) + + parser = Command.standard_parser(quiet=True) + + def update_parser(self): + from celery.bin import celeryev + for x in celeryev.OPTION_LIST: + self.parser.add_option(x) + + def command(self): + from celery.bin import celeryev + return celeryev.run_celeryev(**vars(self.options)) diff --git a/rhodecode/lib/celerypylons/loader.py b/rhodecode/lib/celerypylons/loader.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/celerypylons/loader.py @@ -0,0 +1,55 @@ +from celery.loaders.base import BaseLoader +from pylons import config + +to_pylons = lambda x: x.replace('_', '.').lower() +to_celery = lambda x: x.replace('.', '_').upper() + +LIST_PARAMS = """CELERY_IMPORTS ADMINS ROUTES""".split() + + +class PylonsSettingsProxy(object): + """Pylons Settings Proxy + + Proxies settings from pylons.config + + """ + def __getattr__(self, key): + pylons_key = to_pylons(key) + try: + value = config[pylons_key] + if key in LIST_PARAMS: return value.split() + return self.type_converter(value) + except KeyError: + raise AttributeError(pylons_key) + + def __setattr__(self, key, value): + pylons_key = to_pylons(key) + config[pylons_key] = value + + + def type_converter(self, value): + #cast to int + if value.isdigit(): + return int(value) + + #cast to bool + if value.lower() in ['true', 'false']: + return value.lower() == 'true' + + return value + +class PylonsLoader(BaseLoader): + """Pylons celery loader + + Maps the celery config onto pylons.config + + """ + def read_configuration(self): + self.configured = True + return PylonsSettingsProxy() + + def on_worker_init(self): + """ + Import task modules. + """ + self.import_default_modules() diff --git a/test.ini b/test.ini --- a/test.ini +++ b/test.ini @@ -46,6 +46,33 @@ cache_dir = %(here)s/data index_dir = /tmp/index #################################### +### CELERY CONFIG #### +#################################### +use_celery = false +broker.host = localhost +broker.vhost = rabbitmqhost +broker.port = 5672 +broker.user = rabbitmq +broker.password = qweqwe + +celery.imports = rhodecode.lib.celerylib.tasks + +celery.result.backend = amqp +celery.result.dburi = amqp:// +celery.result.serialier = json + +#celery.send.task.error.emails = true +#celery.amqp.task.result.expires = 18000 + +celeryd.concurrency = 2 +#celeryd.log.file = celeryd.log +celeryd.log.level = debug +celeryd.max.tasks.per.child = 3 + +#tasks will never be sent to the queue, but executed locally instead. +celery.always.eager = false + +#################################### ### BEAKER CACHE #### #################################### beaker.cache.data_dir=/%(here)s/data/cache/data