Show More
commands.py
143 lines
| 4.3 KiB
| text/x-python
|
PythonLexer
r776 | import os | |||
from paste.script.command import Command, BadCommand | ||||
import paste.deploy | ||||
from pylons import config | ||||
__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand', | ||||
'CAMQPAdminCommand', 'CeleryEventCommand'] | ||||
class CeleryCommand(Command):
    """
    Abstract Base Class for celery commands.

    The celery commands are somewhat aggressive about loading
    celery.conf, and since our module sets the `CELERY_LOADER`
    environment variable to our loader, we have to bootstrap a bit and
    make sure we've had a chance to load the pylons config off of the
    command line, otherwise everything fails.

    The first positional argument is always the paster config file; it
    is consumed by `run` and the remaining arguments are handed to the
    superclass. Subclasses are expected to override `update_parser`.
    """

    # Minimum number of positional arguments, *including* the config file.
    min_args = 1
    min_args_error = "Please provide a paster config file as an argument."
    takes_config_file = 1
    requires_config_file = True

    def run(self, args):
        """
        Overrides Command.run

        Checks for a config file argument, loads it, lets the subclass
        extend the option parser and then delegates the remaining
        arguments to the superclass.

        :param args: command line arguments; ``args[0]`` must be the
            path to the paster config file.
        :raises BadCommand: if fewer than `min_args` arguments are given.
        """
        # Always start from the class-level value. The previous in-place
        # ``self.min_args -= 1`` made every additional call on the same
        # instance lower the threshold again.
        required = type(self).min_args
        supplied = len(args)
        if supplied < required:
            raise BadCommand(
                self.min_args_error % {'min_args': required,
                                       'actual_args': supplied})

        # The config file argument is lopped off before delegating, so the
        # superclass has to expect one positional argument fewer.
        self.min_args = required - 1

        self.bootstrap_config(args[0])
        self.update_parser()
        return super(CeleryCommand, self).run(args[1:])

    def update_parser(self):
        """
        Abstract method. Allows for the class's parser to be updated
        before the superclass's `run` method is called. Necessary to
        allow options/arguments to be passed through to the underlying
        celery command.

        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError("Abstract Method.")

    def bootstrap_config(self, conf):
        """
        Loads the pylons configuration.

        :param conf: path (absolute or relative) to the paster ini file.
        """
        path_to_ini_file = os.path.realpath(conf)
        app_conf = paste.deploy.appconfig('config:' + path_to_ini_file)
        config.init_app(app_conf.global_conf, app_conf.local_conf)
class CeleryDaemonCommand(CeleryCommand):
    """Start the celery worker

    Starts the celery worker that uses a paste.deploy configuration
    file.
    """
    # NOTE: the class docstring above is runtime data -- `summary` and
    # `description` below are sliced out of it, so edit it with care.
    usage = 'CONFIG_FILE [celeryd options...]'
    summary = __doc__.splitlines()[0]
    description = "".join(__doc__.splitlines()[2:])

    # Class-level parser; `update_parser` adds the celeryd options to it.
    parser = Command.standard_parser(quiet=True)

    def update_parser(self):
        """Register every option of celeryd's WorkerCommand on the parser."""
        from celery.bin import celeryd
        worker_options = celeryd.WorkerCommand().get_options()
        for option in worker_options:
            self.parser.add_option(option)

    def command(self):
        """Run the celery worker, passing the parsed options as keywords."""
        from celery.bin import celeryd
        worker = celeryd.WorkerCommand()
        return worker.run(**vars(self.options))
class CeleryBeatCommand(CeleryCommand):
    """Start the celery beat server

    Starts the celery beat server using a paste.deploy configuration
    file.
    """
    # NOTE: the class docstring above is runtime data -- `summary` and
    # `description` below are sliced out of it, so edit it with care.
    usage = 'CONFIG_FILE [celerybeat options...]'
    summary = __doc__.splitlines()[0]
    description = "".join(__doc__.splitlines()[2:])

    # Class-level parser; `update_parser` adds the celerybeat options to it.
    parser = Command.standard_parser(quiet=True)

    def update_parser(self):
        """Register every option of celerybeat's BeatCommand on the parser."""
        from celery.bin import celerybeat
        for x in celerybeat.BeatCommand().get_options():
            self.parser.add_option(x)

    def command(self):
        """Run the celery beat server, passing the parsed options as keywords.

        The parsed options are handed to ``BeatCommand.run`` (mirroring
        ``CeleryDaemonCommand.command``). Previously they were passed to
        the ``BeatCommand`` constructor, which never started beat.
        """
        from celery.bin import celerybeat
        return celerybeat.BeatCommand().run(**vars(self.options))
class CAMQPAdminCommand(CeleryCommand):
    """CAMQP Admin

    CAMQP celery admin tool.
    """
    # NOTE: the class docstring above is runtime data -- `summary` and
    # `description` below are sliced out of it, so edit it with care.
    usage = 'CONFIG_FILE [camqadm options...]'
    summary = __doc__.splitlines()[0]
    description = "".join(__doc__.splitlines()[2:])

    # Class-level parser; `update_parser` adds the camqadm options to it.
    parser = Command.standard_parser(quiet=True)

    def update_parser(self):
        """Register every entry of camqadm's OPTION_LIST on the parser."""
        from celery.bin import camqadm
        admin_options = camqadm.OPTION_LIST
        for option in admin_options:
            self.parser.add_option(option)

    def command(self):
        """Invoke camqadm with the leftover arguments and parsed options."""
        from celery.bin import camqadm
        entry_point = camqadm.camqadm
        return entry_point(*self.args, **vars(self.options))
class CeleryEventCommand(CeleryCommand):
    """Celery event command.

    Capture celery events.
    """
    # NOTE: the class docstring above is runtime data -- `summary` and
    # `description` below are sliced out of it. Its first line used to
    # read "commandd.", which surfaced as a typo in the command summary.
    usage = 'CONFIG_FILE [celeryev options...]'
    summary = __doc__.splitlines()[0]
    description = "".join(__doc__.splitlines()[2:])

    # Class-level parser; `update_parser` adds the celeryev options to it.
    parser = Command.standard_parser(quiet=True)

    def update_parser(self):
        """Register every entry of celeryev's OPTION_LIST on the parser."""
        from celery.bin import celeryev
        for x in celeryev.OPTION_LIST:
            self.parser.add_option(x)

    def command(self):
        """Run celeryev, passing the parsed options as keywords."""
        from celery.bin import celeryev
        return celeryev.run_celeryev(**vars(self.options))