import os

from paste.script.command import Command, BadCommand
import paste.deploy
from pylons import config

__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand',
           'CAMQPAdminCommand', 'CeleryEventCommand']


class CeleryCommand(Command):
    """
    Abstract Base Class for celery commands.

    The celery commands are somewhat aggressive about loading
    celery.conf, and since our module sets the `CELERY_LOADER`
    environment variable to our loader, we have to bootstrap a bit and
    make sure we've had a chance to load the pylons config off of the
    command line, otherwise everything fails.

    Subclasses must implement `update_parser` and `command`. They import
    the celery modules inside those methods (not at module level) so the
    import only happens after `run` has called `bootstrap_config`.
    """
    min_args = 1
    min_args_error = "Please provide a paster config file as an argument."
    takes_config_file = 1
    requires_config_file = True

    def run(self, args):
        """
        Overrides Command.run

        Checks for a config file argument, loads it, lets the subclass
        extend the option parser, and then delegates to the superclass
        `run` with the remaining arguments.

        :param args: command-line arguments; ``args[0]`` must be the
            path to the paster config file.
        :raises BadCommand: if fewer than ``min_args`` arguments are given.
        :returns: whatever the superclass ``run`` returns.
        """
        if len(args) < self.min_args:
            raise BadCommand(
                self.min_args_error % {'min_args': self.min_args,
                                       'actual_args': len(args)})
        # Decrement because we're going to lob off the first argument.
        # @@ This is hacky
        # NOTE: this assignment creates an instance attribute that shadows
        # the class-level default; the class attribute itself is untouched.
        self.min_args -= 1
        # The pylons config must be loaded before any celery module is
        # imported by update_parser()/command().
        self.bootstrap_config(args[0])
        self.update_parser()
        return super(CeleryCommand, self).run(args[1:])

    def update_parser(self):
        """
        Abstract method.  Allows for the class's parser to be updated
        before the superclass's `run` method is called.  Necessary to
        allow options/arguments to be passed through to the underlying
        celery command.

        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError("Abstract Method.")

    def bootstrap_config(self, conf):
        """
        Loads the pylons configuration.

        :param conf: path to the paster ``.ini`` file; it is resolved
            with ``os.path.realpath`` before being handed to
            ``paste.deploy.appconfig``.
        """
        path_to_ini_file = os.path.realpath(conf)
        conf = paste.deploy.appconfig('config:' + path_to_ini_file)
        config.init_app(conf.global_conf, conf.local_conf)


# NOTE: in the concrete commands below the class docstring is runtime
# data: its first line becomes `summary` and everything from the third
# line on becomes `description`.  Keep the "summary / blank / description"
# layout when editing them.

class CeleryDaemonCommand(CeleryCommand):
    """Start the celery worker

    Starts the celery worker that uses a paste.deploy configuration file.
    """
    usage = 'CONFIG_FILE [celeryd options...]'
    summary = __doc__.splitlines()[0]
    description = "".join(__doc__.splitlines()[2:])

    parser = Command.standard_parser(quiet=True)

    def update_parser(self):
        """Add every option of celery's worker command to our parser."""
        from celery.bin import celeryd
        for x in celeryd.WorkerCommand().get_options():
            self.parser.add_option(x)

    def command(self):
        """Run celery's worker command with the parsed options as kwargs."""
        from celery.bin import celeryd
        return celeryd.WorkerCommand().run(**vars(self.options))


class CeleryBeatCommand(CeleryCommand):
    """Start the celery beat server

    Starts the celery beat server using a paste.deploy configuration file.
    """
    usage = 'CONFIG_FILE [celerybeat options...]'
    summary = __doc__.splitlines()[0]
    description = "".join(__doc__.splitlines()[2:])

    parser = Command.standard_parser(quiet=True)

    def update_parser(self):
        """Add every option of celery's beat command to our parser."""
        from celery.bin import celerybeat
        for x in celerybeat.BeatCommand().get_options():
            self.parser.add_option(x)

    def command(self):
        """Run celery's beat command with the parsed options as kwargs."""
        from celery.bin import celerybeat
        # The parsed options belong to run(), not to the constructor:
        # instantiate the command without arguments (as update_parser
        # does) and invoke it, mirroring CeleryDaemonCommand.command.
        return celerybeat.BeatCommand().run(**vars(self.options))


class CAMQPAdminCommand(CeleryCommand):
    """CAMQP Admin

    CAMQP celery admin tool.
    """
    usage = 'CONFIG_FILE [camqadm options...]'
    summary = __doc__.splitlines()[0]
    description = "".join(__doc__.splitlines()[2:])

    parser = Command.standard_parser(quiet=True)

    def update_parser(self):
        """Add every entry of ``camqadm.OPTION_LIST`` to our parser."""
        from celery.bin import camqadm
        for x in camqadm.OPTION_LIST:
            self.parser.add_option(x)

    def command(self):
        """Call ``camqadm.camqadm`` with the positional args and options."""
        from celery.bin import camqadm
        return camqadm.camqadm(*self.args, **vars(self.options))


class CeleryEventCommand(CeleryCommand):
    """Celery event commandd.

    Capture celery events.
    """
    usage = 'CONFIG_FILE [celeryev options...]'
    summary = __doc__.splitlines()[0]
    description = "".join(__doc__.splitlines()[2:])

    parser = Command.standard_parser(quiet=True)

    def update_parser(self):
        """Add every entry of ``celeryev.OPTION_LIST`` to our parser."""
        from celery.bin import celeryev
        for x in celeryev.OPTION_LIST:
            self.parser.add_option(x)

    def command(self):
        """Call ``celeryev.run_celeryev`` with the parsed options as kwargs."""
        from celery.bin import celeryev
        return celeryev.run_celeryev(**vars(self.options))