diff --git a/configs/development.ini b/configs/development.ini --- a/configs/development.ini +++ b/configs/development.ini @@ -296,28 +296,15 @@ labs_settings_active = true ### CELERY CONFIG #### #################################### use_celery = false -broker.host = localhost -broker.vhost = rabbitmqhost -broker.port = 5672 -broker.user = rabbitmq -broker.password = qweqwe - -celery.imports = rhodecode.lib.celerylib.tasks -celery.result.backend = amqp -celery.result.dburi = amqp:// -celery.result.serialier = json +# connection url to the message broker (default rabbitmq) +celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost -#celery.send.task.error.emails = true -#celery.amqp.task.result.expires = 18000 - -celeryd.concurrency = 2 -#celeryd.log.file = celeryd.log -celeryd.log.level = debug -celeryd.max.tasks.per.child = 1 +# maximum tasks to execute before worker restart +celery.max_tasks_per_child = 100 ## tasks will never be sent to the queue, but executed locally instead. 
-celery.always.eager = false +celery.task_always_eager = false #################################### ### BEAKER CACHE #### @@ -649,7 +636,7 @@ custom.conf = 1 ### LOGGING CONFIGURATION #### ################################ [loggers] -keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper +keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery [handlers] keys = console, console_sql @@ -688,6 +675,11 @@ handlers = qualname = ssh_wrapper propagate = 1 +[logger_celery] +level = DEBUG +handlers = +qualname = celery + ############## ## HANDLERS ## diff --git a/configs/production.ini b/configs/production.ini --- a/configs/production.ini +++ b/configs/production.ini @@ -271,28 +271,15 @@ labs_settings_active = true ### CELERY CONFIG #### #################################### use_celery = false -broker.host = localhost -broker.vhost = rabbitmqhost -broker.port = 5672 -broker.user = rabbitmq -broker.password = qweqwe - -celery.imports = rhodecode.lib.celerylib.tasks -celery.result.backend = amqp -celery.result.dburi = amqp:// -celery.result.serialier = json +# connection url to the message broker (default rabbitmq) +celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost -#celery.send.task.error.emails = true -#celery.amqp.task.result.expires = 18000 - -celeryd.concurrency = 2 -#celeryd.log.file = celeryd.log -celeryd.log.level = debug -celeryd.max.tasks.per.child = 1 +# maximum tasks to execute before worker restart +celery.max_tasks_per_child = 100 ## tasks will never be sent to the queue, but executed locally instead. 
-celery.always.eager = false +celery.task_always_eager = false #################################### ### BEAKER CACHE #### @@ -619,7 +606,7 @@ custom.conf = 1 ### LOGGING CONFIGURATION #### ################################ [loggers] -keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper +keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery [handlers] keys = console, console_sql @@ -658,6 +645,11 @@ handlers = qualname = ssh_wrapper propagate = 1 +[logger_celery] +level = DEBUG +handlers = +qualname = celery + ############## ## HANDLERS ## diff --git a/pkgs/patch-kombu-msgpack.diff b/pkgs/patch-kombu-msgpack.diff deleted file mode 100644 --- a/pkgs/patch-kombu-msgpack.diff +++ /dev/null @@ -1,12 +0,0 @@ -diff -rup kombu-1.5.1-orig/kombu/serialization.py kombu-1.5.1/kombu/serialization.py ---- kombu-1.5.1-orig/kombu/serialization.py 2016-03-09 15:11:34.000000000 +0100 -+++ kombu-1.5.1/kombu/serialization.py 2016-03-09 15:19:20.000000000 +0100 -@@ -318,7 +318,7 @@ def register_msgpack(): - """See http://msgpack.sourceforge.net/""" - try: - import msgpack -- registry.register('msgpack', msgpack.packs, msgpack.unpacks, -+ registry.register('msgpack', msgpack.packb, msgpack.unpackb, - content_type='application/x-msgpack', - content_encoding='binary') - except ImportError: diff --git a/rhodecode/api/views/repo_api.py b/rhodecode/api/views/repo_api.py --- a/rhodecode/api/views/repo_api.py +++ b/rhodecode/api/views/repo_api.py @@ -32,6 +32,7 @@ from rhodecode.api.utils import ( from rhodecode.lib import audit_logger from rhodecode.lib import repo_maintenance from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi +from rhodecode.lib.celerylib.utils import get_task_id from rhodecode.lib.utils2 import str2bool, time_to_datetime from rhodecode.lib.ext_json import json from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError @@ -712,10 +713,7 @@ def create_repo( } task = RepoModel().create(form_data=data, cur_user=owner) - 
from celery.result import BaseAsyncResult - task_id = None - if isinstance(task, BaseAsyncResult): - task_id = task.task_id + task_id = get_task_id(task) # no commit, it's done in RepoModel, or async via celery return { 'msg': "Created new repository `%s`" % (schema_data['repo_name'],), @@ -1105,10 +1103,8 @@ def fork_repo(request, apiuser, repoid, task = RepoModel().create_fork(data, cur_user=owner) # no commit, it's done in RepoModel, or async via celery - from celery.result import BaseAsyncResult - task_id = None - if isinstance(task, BaseAsyncResult): - task_id = task.task_id + task_id = get_task_id(task) + return { 'msg': 'Created fork of `%s` as `%s`' % ( repo.repo_name, schema_data['repo_name']), diff --git a/rhodecode/apps/admin/views/repositories.py b/rhodecode/apps/admin/views/repositories.py --- a/rhodecode/apps/admin/views/repositories.py +++ b/rhodecode/apps/admin/views/repositories.py @@ -28,6 +28,7 @@ from pyramid.renderers import render from pyramid.response import Response from rhodecode.apps._base import BaseAppView, DataGridAppView +from rhodecode.lib.celerylib.utils import get_task_id from rhodecode.lib.ext_json import json from rhodecode.lib.auth import ( @@ -143,22 +144,19 @@ class AdminReposView(BaseAppView, DataGr c = self.load_default_context() form_result = {} + self._load_form_data(c) task_id = None - self._load_form_data(c) - try: # CanWriteToGroup validators checks permissions of this POST form = RepoForm( self.request.translate, repo_groups=c.repo_groups_choices, landing_revs=c.landing_revs_choices)() - form_results = form.to_python(dict(self.request.POST)) + form_result = form.to_python(dict(self.request.POST)) # create is done sometimes async on celery, db transaction # management is handled there. 
task = RepoModel().create(form_result, self._rhodecode_user.user_id) - from celery.result import BaseAsyncResult - if isinstance(task, BaseAsyncResult): - task_id = task.task_id + task_id = get_task_id(task) except formencode.Invalid as errors: data = render('rhodecode:templates/admin/repos/repo_add.mako', self._get_template_context(c), self.request) diff --git a/rhodecode/apps/repository/views/repo_checks.py b/rhodecode/apps/repository/views/repo_checks.py --- a/rhodecode/apps/repository/views/repo_checks.py +++ b/rhodecode/apps/repository/views/repo_checks.py @@ -46,11 +46,9 @@ class RepoChecksView(BaseAppView): repo_name = self.request.matchdict['repo_name'] db_repo = Repository.get_by_repo_name(repo_name) - if not db_repo: - raise HTTPNotFound() # check if maybe repo is already created - if db_repo.repo_state in [Repository.STATE_CREATED]: + if db_repo and db_repo.repo_state in [Repository.STATE_CREATED]: # re-check permissions before redirecting to prevent resource # discovery by checking the 302 code perm_set = ['repository.read', 'repository.write', 'repository.admin'] @@ -80,9 +78,10 @@ class RepoChecksView(BaseAppView): if task_id and task_id not in ['None']: import rhodecode - from celery.result import AsyncResult + from rhodecode.lib.celerylib.loader import celery_app if rhodecode.CELERY_ENABLED: - task = AsyncResult(task_id) + task = celery_app.AsyncResult(task_id) + task.get() if task.failed(): msg = self._log_creation_exception(task.result, repo_name) h.flash(msg, category='error') diff --git a/rhodecode/apps/repository/views/repo_forks.py b/rhodecode/apps/repository/views/repo_forks.py --- a/rhodecode/apps/repository/views/repo_forks.py +++ b/rhodecode/apps/repository/views/repo_forks.py @@ -33,6 +33,7 @@ from rhodecode.lib.auth import ( LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous, HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired) import rhodecode.lib.helpers as h +from rhodecode.lib.celerylib.utils import get_task_id from 
rhodecode.model.db import coalesce, or_, Repository, RepoGroup from rhodecode.model.repo import RepoModel from rhodecode.model.forms import RepoForkForm @@ -226,9 +227,8 @@ class RepoForksView(RepoAppView, DataGri # management is handled there. task = RepoModel().create_fork( form_result, c.rhodecode_user.user_id) - from celery.result import BaseAsyncResult - if isinstance(task, BaseAsyncResult): - task_id = task.task_id + + task_id = get_task_id(task) except formencode.Invalid as errors: c.rhodecode_db_repo = self.db_repo diff --git a/rhodecode/config/environment.py b/rhodecode/config/environment.py --- a/rhodecode/config/environment.py +++ b/rhodecode/config/environment.py @@ -23,14 +23,6 @@ import os import logging import rhodecode -# ------------------------------------------------------------------------------ -# CELERY magic until refactor - issue #4163 - import order matters here: -#from rhodecode.lib import celerypylons # this must be first, celerypylons - # sets config settings upon import - -import rhodecode.integrations # any modules using celery task - # decorators should be added afterwards: -# ------------------------------------------------------------------------------ from rhodecode.config import utils @@ -54,14 +46,6 @@ def load_pyramid_environment(global_conf 'secret': settings_merged.get('channelstream.secret') } - - # TODO(marcink): celery - # # store some globals into rhodecode - # rhodecode.CELERY_ENABLED = str2bool(config['app_conf'].get('use_celery')) - # rhodecode.CELERY_EAGER = str2bool( - # config['app_conf'].get('celery.always.eager')) - - # If this is a test run we prepare the test environment like # creating a test database, test search index and test repositories. # This has to be done before the database connection is initialized. 
diff --git a/rhodecode/config/licenses.json b/rhodecode/config/licenses.json --- a/rhodecode/config/licenses.json +++ b/rhodecode/config/licenses.json @@ -189,7 +189,7 @@ "python2.7-jupyter-core-4.3.0": { "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause" }, - "python2.7-kombu-1.5.1": { + "python2.7-kombu-4.1.0": { "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause" }, "python2.7-mistune-0.7.4": { diff --git a/rhodecode/config/middleware.py b/rhodecode/config/middleware.py --- a/rhodecode/config/middleware.py +++ b/rhodecode/config/middleware.py @@ -42,6 +42,7 @@ from rhodecode.lib.vcs import VCSCommuni from rhodecode.lib.exceptions import VCSServerUnavailable from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled from rhodecode.lib.middleware.https_fixup import HttpsFixup +from rhodecode.lib.celerylib.loader import configure_celery from rhodecode.lib.plugins.utils import register_rhodecode_plugin from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict from rhodecode.subscribers import ( @@ -87,9 +88,11 @@ def make_pyramid_app(global_config, **se pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config) pyramid_app.config = config + config.configure_celery(global_config['__file__']) # creating the app uses a connection - return it after we are done meta.Session.remove() + log.info('Pyramid app %s created and configured.', pyramid_app) return pyramid_app @@ -196,6 +199,8 @@ def includeme(config): config.add_directive( 'register_rhodecode_plugin', register_rhodecode_plugin) + config.add_directive('configure_celery', configure_celery) + if asbool(settings.get('appenlight', 'false')): config.include('appenlight_client.ext.pyramid_tween') diff --git a/rhodecode/integrations/types/hipchat.py b/rhodecode/integrations/types/hipchat.py --- a/rhodecode/integrations/types/hipchat.py +++ b/rhodecode/integrations/types/hipchat.py @@ -20,18 +20,16 @@ from 
__future__ import unicode_literals import deform -import re import logging import requests import colander import textwrap -from celery.task import task from mako.template import Template from rhodecode import events from rhodecode.translation import _ from rhodecode.lib import helpers as h -from rhodecode.lib.celerylib import run_task +from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask from rhodecode.lib.colander_utils import strip_whitespace from rhodecode.integrations.types.base import IntegrationTypeBase @@ -243,7 +241,7 @@ class HipchatIntegrationType(Integration ) -@task(ignore_result=True) +@async_task(ignore_result=True, base=RequestContextTask) def post_text_to_hipchat(settings, text): log.debug('sending %s to hipchat %s' % (text, settings['server_url'])) resp = requests.post(settings['server_url'], json={ diff --git a/rhodecode/integrations/types/slack.py b/rhodecode/integrations/types/slack.py --- a/rhodecode/integrations/types/slack.py +++ b/rhodecode/integrations/types/slack.py @@ -27,13 +27,12 @@ import logging import deform import requests import colander -from celery.task import task from mako.template import Template from rhodecode import events from rhodecode.translation import _ from rhodecode.lib import helpers as h -from rhodecode.lib.celerylib import run_task +from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask from rhodecode.lib.colander_utils import strip_whitespace from rhodecode.integrations.types.base import IntegrationTypeBase @@ -296,7 +295,7 @@ def html_to_slack_links(message): r'<\1|\2>', message) -@task(ignore_result=True) +@async_task(ignore_result=True, base=RequestContextTask) def post_text_to_slack(settings, title, text, fields=None, overrides=None): log.debug('sending %s (%s) to slack %s' % ( title, text, settings['service'])) diff --git a/rhodecode/integrations/types/webhook.py b/rhodecode/integrations/types/webhook.py --- a/rhodecode/integrations/types/webhook.py +++ 
b/rhodecode/integrations/types/webhook.py @@ -28,16 +28,17 @@ import logging import requests import requests.adapters import colander -from celery.task import task from requests.packages.urllib3.util.retry import Retry import rhodecode from rhodecode import events from rhodecode.translation import _ from rhodecode.integrations.types.base import IntegrationTypeBase +from rhodecode.lib.celerylib import async_task, RequestContextTask log = logging.getLogger(__name__) + # updating this required to update the `common_vars` passed in url calling func WEBHOOK_URL_VARS = [ 'repo_name', @@ -315,7 +316,7 @@ class WebhookIntegrationType(Integration post_to_webhook(url_calls, self.settings) -@task(ignore_result=True) +@async_task(ignore_result=True, base=RequestContextTask) def post_to_webhook(url_calls, settings): max_retries = 3 retries = Retry( diff --git a/rhodecode/lib/celerylib/__init__.py b/rhodecode/lib/celerylib/__init__.py --- a/rhodecode/lib/celerylib/__init__.py +++ b/rhodecode/lib/celerylib/__init__.py @@ -17,36 +17,17 @@ # This program is dual-licensed. 
If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ -""" -celery libs for RhodeCode -""" - -import pylons import socket import logging import rhodecode - -from os.path import join as jn -from pylons import config -from celery.task import Task -from pyramid.request import Request -from pyramid.scripting import prepare -from pyramid.threadlocal import get_current_request - -from decorator import decorator +from zope.cachedescriptors.property import Lazy as LazyProperty +from rhodecode.lib.celerylib.loader import ( + celery_app, RequestContextTask, get_logger) -from zope.cachedescriptors.property import Lazy as LazyProperty +async_task = celery_app.task -from rhodecode.config import utils -from rhodecode.lib.utils2 import ( - safe_str, md5_safe, aslist, get_routes_generator_for_server_url, - get_server_url) -from rhodecode.lib.pidlock import DaemonLock, LockHeld -from rhodecode.lib.vcs import connect_vcs -from rhodecode.model import meta -from rhodecode.lib.auth import AuthUser log = logging.getLogger(__name__) @@ -60,95 +41,13 @@ class ResultWrapper(object): return self.task -class RhodecodeCeleryTask(Task): - """ - This is a celery task which will create a rhodecode app instance context - for the task, patch pyramid + pylons threadlocals with the original request - that created the task and also add the user to the context. 
- - This class as a whole should be removed once the pylons port is complete - and a pyramid only solution for celery is implemented as per issue #4139 - """ - - def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, - link=None, link_error=None, **options): - """ queue the job to run (we are in web request context here) """ - - request = get_current_request() - - if hasattr(request, 'user'): - ip_addr = request.user.ip_addr - user_id = request.user.user_id - elif hasattr(request, 'rpc_params'): - # TODO(marcink) remove when migration is finished - # api specific call on Pyramid. - ip_addr = request.rpc_params['apiuser'].ip_addr - user_id = request.rpc_params['apiuser'].user_id - else: - raise Exception('Unable to fetch data from request: {}'.format( - request)) - - if request: - # we hook into kwargs since it is the only way to pass our data to - # the celery worker in celery 2.2 - kwargs.update({ - '_rhodecode_proxy_data': { - 'environ': { - 'PATH_INFO': request.environ['PATH_INFO'], - 'SCRIPT_NAME': request.environ['SCRIPT_NAME'], - 'HTTP_HOST': request.environ.get('HTTP_HOST', - request.environ['SERVER_NAME']), - 'SERVER_NAME': request.environ['SERVER_NAME'], - 'SERVER_PORT': request.environ['SERVER_PORT'], - 'wsgi.url_scheme': request.environ['wsgi.url_scheme'], - }, - 'auth_user': { - 'ip_addr': ip_addr, - 'user_id': user_id - }, - } - }) - return super(RhodecodeCeleryTask, self).apply_async( - args, kwargs, task_id, producer, link, link_error, **options) - - def __call__(self, *args, **kwargs): - """ rebuild the context and then run task on celery worker """ - proxy_data = kwargs.pop('_rhodecode_proxy_data', {}) - - if not proxy_data: - return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) - - log.debug('using celery proxy data to run task: %r', proxy_data) - - from rhodecode.config.routing import make_map - - request = Request.blank('/', environ=proxy_data['environ']) - request.user = 
AuthUser(user_id=proxy_data['auth_user']['user_id'], - ip_addr=proxy_data['auth_user']['ip_addr']) - - pyramid_request = prepare(request) # set pyramid threadlocal request - - # pylons routing - if not rhodecode.CONFIG.get('routes.map'): - rhodecode.CONFIG['routes.map'] = make_map(config) - pylons.url._push_object(get_routes_generator_for_server_url( - get_server_url(request.environ) - )) - - try: - return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) - finally: - pyramid_request['closer']() - pylons.url._pop_object() - - def run_task(task, *args, **kwargs): if rhodecode.CELERY_ENABLED: celery_is_up = False try: t = task.apply_async(args=args, kwargs=kwargs) - log.info('running task %s:%s', t.task_id, task) celery_is_up = True + log.debug('executing task %s:%s in async mode', t.task_id, task) return t except socket.error as e: @@ -164,73 +63,10 @@ def run_task(task, *args, **kwargs): "Fallback to sync execution.") # keep in mind there maybe a subtle race condition where something - # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator + # depending on rhodecode.CELERY_ENABLED # will see CELERY_ENABLED as True before this has a chance to set False rhodecode.CELERY_ENABLED = celery_is_up else: - log.debug('executing task %s in sync mode', task) - return ResultWrapper(task(*args, **kwargs)) - - -def __get_lockkey(func, *fargs, **fkwargs): - params = list(fargs) - params.extend(['%s-%s' % ar for ar in fkwargs.items()]) - - func_name = str(func.__name__) if hasattr(func, '__name__') else str(func) - _lock_key = func_name + '-' + '-'.join(map(safe_str, params)) - return 'task_%s.lock' % (md5_safe(_lock_key),) - - -def locked_task(func): - def __wrapper(func, *fargs, **fkwargs): - lockkey = __get_lockkey(func, *fargs, **fkwargs) - lockkey_path = config['app_conf']['cache_dir'] - - log.info('running task with lockkey %s' % lockkey) - try: - l = DaemonLock(file_=jn(lockkey_path, lockkey)) - ret = func(*fargs, **fkwargs) - l.release() - return ret 
- except LockHeld: - log.info('LockHeld') - return 'Task with key %s already running' % lockkey - - return decorator(__wrapper, func) - + log.debug('executing task %s:%s in sync mode', 'TASK', task) -def get_session(): - if rhodecode.CELERY_ENABLED: - utils.initialize_database(config) - sa = meta.Session() - return sa - - -def dbsession(func): - def __wrapper(func, *fargs, **fkwargs): - try: - ret = func(*fargs, **fkwargs) - return ret - finally: - if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER: - meta.Session.remove() - - return decorator(__wrapper, func) - - -def vcsconnection(func): - def __wrapper(func, *fargs, **fkwargs): - if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER: - settings = rhodecode.PYRAMID_SETTINGS - backends = settings['vcs.backends'] - for alias in rhodecode.BACKENDS.keys(): - if alias not in backends: - del rhodecode.BACKENDS[alias] - utils.configure_vcs(settings) - connect_vcs( - settings['vcs.server'], - utils.get_vcs_server_protocol(settings)) - ret = func(*fargs, **fkwargs) - return ret - - return decorator(__wrapper, func) + return ResultWrapper(task(*args, **kwargs)) diff --git a/rhodecode/lib/celerylib/loader.py b/rhodecode/lib/celerylib/loader.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/celerylib/loader.py @@ -0,0 +1,256 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2010-2017 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# This program is dual-licensed. 
If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ +""" +Celery loader, run with:: + + celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini +""" +import os +import logging + +from celery import Celery +from celery import signals +from celery import Task +from kombu.serialization import register +from pyramid.threadlocal import get_current_request + +import rhodecode + +from rhodecode.lib.auth import AuthUser +from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars +from rhodecode.lib.ext_json import json +from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request +from rhodecode.lib.utils2 import str2bool +from rhodecode.model import meta + + +register('json_ext', json.dumps, json.loads, + content_type='application/x-json-ext', + content_encoding='utf-8') + +log = logging.getLogger('celery.rhodecode.loader') + + +def add_preload_arguments(parser): + parser.add_argument( + '--ini', default=None, + help='Path to ini configuration file.' + ) + parser.add_argument( + '--ini-var', default=None, + help='Comma separated list of key=value to pass to ini.' 
+ ) + + +def get_logger(obj): + custom_log = logging.getLogger( + 'rhodecode.task.{}'.format(obj.__class__.__name__)) + + if rhodecode.CELERY_ENABLED: + try: + custom_log = obj.get_logger() + except Exception: + pass + + return custom_log + + +base_celery_config = { + 'result_backend': 'rpc://', + 'result_expires': 60 * 60 * 24, + 'result_persistent': True, + 'imports': [], + 'worker_max_tasks_per_child': 100, + 'accept_content': ['json_ext'], + 'task_serializer': 'json_ext', + 'result_serializer': 'json_ext', + 'worker_hijack_root_logger': False, +} +# init main celery app +celery_app = Celery() +celery_app.user_options['preload'].add(add_preload_arguments) +ini_file_glob = None + + +@signals.setup_logging.connect +def setup_logging_callback(**kwargs): + setup_logging(ini_file_glob) + + +@signals.user_preload_options.connect +def on_preload_parsed(options, **kwargs): + ini_location = options['ini'] + ini_vars = options['ini_var'] + celery_app.conf['INI_PYRAMID'] = options['ini'] + + if ini_location is None: + print('You must provide the paste --ini argument') + exit(-1) + + options = None + if ini_vars is not None: + options = parse_ini_vars(ini_vars) + + global ini_file_glob + ini_file_glob = ini_location + + log.debug('Bootstrapping RhodeCode application...') + env = bootstrap(ini_location, options=options) + + setup_celery_app( + app=env['app'], root=env['root'], request=env['request'], + registry=env['registry'], closer=env['closer'], + ini_location=ini_location) + + # fix the global flag even if it's disabled via .ini file because this + # is a worker code that doesn't need this to be disabled. 
+ rhodecode.CELERY_ENABLED = True + + +@signals.task_success.connect +def task_success_signal(result, **kwargs): + meta.Session.commit() + celery_app.conf['PYRAMID_CLOSER']() + + +@signals.task_retry.connect +def task_retry_signal( + request, reason, einfo, **kwargs): + meta.Session.remove() + celery_app.conf['PYRAMID_CLOSER']() + + +@signals.task_failure.connect +def task_failure_signal( + task_id, exception, args, kwargs, traceback, einfo, **kargs): + meta.Session.remove() + celery_app.conf['PYRAMID_CLOSER']() + + +@signals.task_revoked.connect +def task_revoked_signal( + request, terminated, signum, expired, **kwargs): + celery_app.conf['PYRAMID_CLOSER']() + + +def setup_celery_app(app, root, request, registry, closer, ini_location): + ini_dir = os.path.dirname(os.path.abspath(ini_location)) + celery_config = base_celery_config + celery_config.update({ + # store celerybeat scheduler db where the .ini file is + 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'), + }) + ini_settings = get_ini_config(ini_location) + log.debug('Got custom celery conf: %s', ini_settings) + + celery_config.update(ini_settings) + celery_app.config_from_object(celery_config) + + celery_app.conf.update({'PYRAMID_APP': app}) + celery_app.conf.update({'PYRAMID_ROOT': root}) + celery_app.conf.update({'PYRAMID_REQUEST': request}) + celery_app.conf.update({'PYRAMID_REGISTRY': registry}) + celery_app.conf.update({'PYRAMID_CLOSER': closer}) + + +def configure_celery(config, ini_location): + """ + Helper that is called from our application creation logic. 
It gives + connection info into running webapp and allows execution of tasks from + RhodeCode itself + """ + # store some globals into rhodecode + rhodecode.CELERY_ENABLED = str2bool( + config.registry.settings.get('use_celery')) + if rhodecode.CELERY_ENABLED: + log.info('Configuring celery based on `%s` file', ini_location) + setup_celery_app( + app=None, root=None, request=None, registry=config.registry, + closer=None, ini_location=ini_location) + + +class RequestContextTask(Task): + """ + This is a celery task which will create a rhodecode app instance context + for the task, patch pyramid with the original request + that created the task and also add the user to the context. + """ + + def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, + link=None, link_error=None, shadow=None, **options): + """ queue the job to run (we are in web request context here) """ + + req = get_current_request() + + # web case + if hasattr(req, 'user'): + ip_addr = req.user.ip_addr + user_id = req.user.user_id + + # api case + elif hasattr(req, 'rpc_user'): + ip_addr = req.rpc_user.ip_addr + user_id = req.rpc_user.user_id + else: + raise Exception( + 'Unable to fetch required data from request: {}. 
\n' + 'This task is required to be executed from context of ' + 'request in a webapp'.format(repr(req))) + + if req: + # we hook into kwargs since it is the only way to pass our data to + # the celery worker + options['headers'] = options.get('headers', {}) + options['headers'].update({ + 'rhodecode_proxy_data': { + 'environ': { + 'PATH_INFO': req.environ['PATH_INFO'], + 'SCRIPT_NAME': req.environ['SCRIPT_NAME'], + 'HTTP_HOST': req.environ.get('HTTP_HOST', + req.environ['SERVER_NAME']), + 'SERVER_NAME': req.environ['SERVER_NAME'], + 'SERVER_PORT': req.environ['SERVER_PORT'], + 'wsgi.url_scheme': req.environ['wsgi.url_scheme'], + }, + 'auth_user': { + 'ip_addr': ip_addr, + 'user_id': user_id + }, + } + }) + + return super(RequestContextTask, self).apply_async( + args, kwargs, task_id, producer, link, link_error, shadow, **options) + + def __call__(self, *args, **kwargs): + """ rebuild the context and then run task on celery worker """ + + proxy_data = getattr(self.request, 'rhodecode_proxy_data', None) + if not proxy_data: + return super(RequestContextTask, self).__call__(*args, **kwargs) + + log.debug('using celery proxy data to run task: %r', proxy_data) + # re-inject and register threadlocals for proper routing support + request = prepare_request(proxy_data['environ']) + request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'], + ip_addr=proxy_data['auth_user']['ip_addr']) + + return super(RequestContextTask, self).__call__(*args, **kwargs) + diff --git a/rhodecode/lib/celerylib/tasks.py b/rhodecode/lib/celerylib/tasks.py --- a/rhodecode/lib/celerylib/tasks.py +++ b/rhodecode/lib/celerylib/tasks.py @@ -23,38 +23,18 @@ RhodeCode task modules, containing all t by celery daemon """ - import os -import logging - -from celery.task import task import rhodecode from rhodecode.lib import audit_logger -from rhodecode.lib.celerylib import ( - run_task, dbsession, __get_lockkey, LockHeld, DaemonLock, - get_session, vcsconnection, RhodecodeCeleryTask) +from 
rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask from rhodecode.lib.hooks_base import log_create_repository from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer -from rhodecode.lib.utils import add_cache from rhodecode.lib.utils2 import safe_int, str2bool -from rhodecode.model.db import Repository, User +from rhodecode.model.db import Session, Repository, User -def get_logger(cls): - if rhodecode.CELERY_ENABLED: - try: - log = cls.get_logger() - except Exception: - log = logging.getLogger(__name__) - else: - log = logging.getLogger(__name__) - - return log - - -@task(ignore_result=True, base=RhodecodeCeleryTask) -@dbsession +@async_task(ignore_result=True, base=RequestContextTask) def send_email(recipients, subject, body='', html_body='', email_config=None): """ Sends an email with defined parameters from the .ini files. @@ -101,18 +81,15 @@ def send_email(recipients, subject, body return True -@task(ignore_result=True, base=RhodecodeCeleryTask) -@dbsession -@vcsconnection +@async_task(ignore_result=True, base=RequestContextTask) def create_repo(form_data, cur_user): from rhodecode.model.repo import RepoModel from rhodecode.model.user import UserModel from rhodecode.model.settings import SettingsModel log = get_logger(create_repo) - DBS = get_session() - cur_user = UserModel(DBS)._get_user(cur_user) + cur_user = UserModel()._get_user(cur_user) owner = cur_user repo_name = form_data['repo_name'] @@ -138,7 +115,7 @@ def create_repo(form_data, cur_user): 'enable_downloads', defs.get('repo_enable_downloads')) try: - repo = RepoModel(DBS)._create_repo( + repo = RepoModel()._create_repo( repo_name=repo_name_full, repo_type=repo_type, description=description, @@ -155,13 +132,13 @@ def create_repo(form_data, cur_user): enable_downloads=enable_downloads, state=state ) - DBS.commit() + Session().commit() # now create this repo on Filesystem - RepoModel(DBS)._create_filesystem_repo( + RepoModel()._create_filesystem_repo( repo_name=repo_name, 
repo_type=repo_type, - repo_group=RepoModel(DBS)._get_repo_group(repo_group), + repo_group=RepoModel()._get_repo_group(repo_group), clone_uri=clone_uri, ) repo = Repository.get_by_repo_name(repo_name_full) @@ -180,7 +157,7 @@ def create_repo(form_data, cur_user): user=cur_user, repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id)) - DBS.commit() + Session().commit() except Exception: log.warning('Exception occurred when creating repository, ' 'doing cleanup...', exc_info=True) @@ -188,8 +165,8 @@ def create_repo(form_data, cur_user): repo = Repository.get_by_repo_name(repo_name_full) if repo: Repository.delete(repo.repo_id) - DBS.commit() - RepoModel(DBS)._delete_filesystem_repo(repo) + Session().commit() + RepoModel()._delete_filesystem_repo(repo) raise # it's an odd fix to make celery fail task when exception occurs @@ -199,23 +176,17 @@ def create_repo(form_data, cur_user): return True -@task(ignore_result=True, base=RhodecodeCeleryTask) -@dbsession -@vcsconnection +@async_task(ignore_result=True, base=RequestContextTask) def create_repo_fork(form_data, cur_user): """ Creates a fork of repository using internal VCS methods - - :param form_data: - :param cur_user: """ from rhodecode.model.repo import RepoModel from rhodecode.model.user import UserModel log = get_logger(create_repo_fork) - DBS = get_session() - cur_user = UserModel(DBS)._get_user(cur_user) + cur_user = UserModel()._get_user(cur_user) owner = cur_user repo_name = form_data['repo_name'] # fork in this case @@ -230,8 +201,8 @@ def create_repo_fork(form_data, cur_user fork_id = safe_int(form_data.get('fork_parent_id')) try: - fork_of = RepoModel(DBS)._get_repo(fork_id) - RepoModel(DBS)._create_repo( + fork_of = RepoModel()._get_repo(fork_id) + RepoModel()._create_repo( repo_name=repo_name_full, repo_type=repo_type, description=description, @@ -244,16 +215,16 @@ def create_repo_fork(form_data, cur_user copy_fork_permissions=copy_fork_permissions ) - DBS.commit() + Session().commit() 
base_path = Repository.base_path() source_repo_path = os.path.join(base_path, fork_of.repo_name) # now create this repo on Filesystem - RepoModel(DBS)._create_filesystem_repo( + RepoModel()._create_filesystem_repo( repo_name=repo_name, repo_type=repo_type, - repo_group=RepoModel(DBS)._get_repo_group(repo_group), + repo_group=RepoModel()._get_repo_group(repo_group), clone_uri=source_repo_path, ) repo = Repository.get_by_repo_name(repo_name_full) @@ -274,7 +245,7 @@ def create_repo_fork(form_data, cur_user user=cur_user, repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id)) - DBS.commit() + Session().commit() except Exception as e: log.warning('Exception %s occurred when forking repository, ' 'doing cleanup...', e) @@ -282,8 +253,8 @@ def create_repo_fork(form_data, cur_user repo = Repository.get_by_repo_name(repo_name_full) if repo: Repository.delete(repo.repo_id) - DBS.commit() - RepoModel(DBS)._delete_filesystem_repo(repo) + Session().commit() + RepoModel()._delete_filesystem_repo(repo) raise # it's an odd fix to make celery fail task when exception occurs @@ -291,3 +262,14 @@ def create_repo_fork(form_data, cur_user pass return True + + +@async_task(ignore_result=True) +def sync_repo(*args, **kwargs): + from rhodecode.model.scm import ScmModel + log = get_logger(sync_repo) + + log.info('Pulling from %s', kwargs['repo_name']) + ScmModel().pull_changes(kwargs['repo_name'], kwargs['username']) + + diff --git a/rhodecode/lib/celerylib/utils.py b/rhodecode/lib/celerylib/utils.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/celerylib/utils.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2010-2017 RhodeCode GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3 +# (only), as published by the Free Software Foundation. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# This program is dual-licensed. If you wish to learn more about the +# RhodeCode Enterprise Edition, including its added features, Support services, +# and proprietary license terms, please see https://rhodecode.com/licenses/ + +import os +import json +import logging +import datetime + +from functools import partial + +from pyramid.compat import configparser +from celery.result import AsyncResult +import celery.loaders.base +import celery.schedules + + +log = logging.getLogger(__name__) + + +def get_task_id(task): + task_id = None + if isinstance(task, AsyncResult): + task_id = task.task_id + + return task_id + + +def crontab(value): + return celery.schedules.crontab(**value) + + +def timedelta(value): + return datetime.timedelta(**value) + + +def safe_json(get, section, key): + value = '' + try: + value = get(key) + json_value = json.loads(value) + except ValueError: + msg = 'The %s=%s is not valid json in section %s' % ( + key, value, section + ) + raise ValueError(msg) + + return json_value + + +def get_beat_config(parser, section): + SCHEDULE_TYPE_MAP = { + 'crontab': crontab, + 'timedelta': timedelta, + 'integer': int + } + get = partial(parser.get, section) + has_option = partial(parser.has_option, section) + + schedule_type = get('type') + schedule_value = safe_json(get, section, 'schedule') + + scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type) + + if scheduler_cls is None: + raise ValueError( + 'schedule type %s in section %s is invalid' % ( + schedule_type, + section + ) + ) + + schedule = scheduler_cls(schedule_value) + + config = { + 'task': get('task'), + 'schedule': schedule, + } + 
+ if has_option('args'): + config['args'] = safe_json(get, section, 'args') + + if has_option('kwargs'): + config['kwargs'] = safe_json(get, section, 'kwargs') + + return config + + +def get_ini_config(ini_location): + """ + Converts basic ini configuration into celery 4.X options + """ + def key_converter(key_name): + pref = 'celery.' + if key_name.startswith(pref): + return key_name[len(pref):].replace('.', '_').lower() + + def type_converter(parsed_key, value): + # cast to int + if value.isdigit(): + return int(value) + + # cast to bool + if value.lower() in ['true', 'false', 'True', 'False']: + return value.lower() == 'true' + return value + + parser = configparser.SafeConfigParser( + defaults={'here': os.path.abspath(ini_location)}) + parser.read(ini_location) + + ini_config = {} + for k, v in parser.items('app:main'): + pref = 'celery.' + if k.startswith(pref): + ini_config[key_converter(k)] = type_converter(key_converter(k), v) + + beat_config = {} + for section in parser.sections(): + if section.startswith('celerybeat:'): + name = section.split(':', 1)[1] + beat_config[name] = get_beat_config(parser, section) + + # final compose of settings + celery_settings = {} + + if ini_config: + celery_settings.update(ini_config) + if beat_config: + celery_settings.update({'beat_schedule': beat_config}) + + return celery_settings + + +def parse_ini_vars(ini_vars): + options = {} + for pairs in ini_vars.split(','): + key, value = pairs.split('=') + options[key] = value + return options diff --git a/rhodecode/lib/celerypylons/__init__.py b/rhodecode/lib/celerypylons/__init__.py deleted file mode 100644 --- a/rhodecode/lib/celerypylons/__init__.py +++ /dev/null @@ -1,37 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2012-2017 RhodeCode GmbH -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License, version 3 -# (only), as published by the Free Software Foundation. 
-# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -# This program is dual-licensed. If you wish to learn more about the -# RhodeCode Enterprise Edition, including its added features, Support services, -# and proprietary license terms, please see https://rhodecode.com/licenses/ - -""" -Automatically sets the environment variable `CELERY_LOADER` to -`celerypylons.loader:PylonsLoader`. This ensures the loader is -specified when accessing the rest of this package, and allows celery -to be installed in a webapp just by importing celerypylons:: - - import celerypylons - -""" - -import os -import warnings - -CELERYPYLONS_LOADER = 'rhodecode.lib.celerypylons.loader.PylonsLoader' -if os.environ.get('CELERY_LOADER', CELERYPYLONS_LOADER) != CELERYPYLONS_LOADER: - warnings.warn("'CELERY_LOADER' environment variable will be overridden by celery-pylons.") -os.environ['CELERY_LOADER'] = CELERYPYLONS_LOADER diff --git a/rhodecode/lib/celerypylons/commands.py b/rhodecode/lib/celerypylons/commands.py deleted file mode 100644 --- a/rhodecode/lib/celerypylons/commands.py +++ /dev/null @@ -1,118 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2012-2017 RhodeCode GmbH -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License, version 3 -# (only), as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. 
-# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -# This program is dual-licensed. If you wish to learn more about the -# RhodeCode Enterprise Edition, including its added features, Support services, -# and proprietary license terms, please see https://rhodecode.com/licenses/ - -import rhodecode -from rhodecode.lib.utils import BasePasterCommand, Command, load_rcextensions -from celery.app import app_or_default -from celery.bin import camqadm, celerybeat, celeryd, celeryev - -from rhodecode.lib.utils2 import str2bool - -__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand', - 'CAMQPAdminCommand', 'CeleryEventCommand'] - - -class CeleryCommand(BasePasterCommand): - """Abstract class implements run methods needed for celery - - Starts the celery worker that uses a paste.deploy configuration - file. - """ - - def update_parser(self): - """ - Abstract method. Allows for the class's parser to be updated - before the superclass's `run` method is called. Necessary to - allow options/arguments to be passed through to the underlying - celery command. - """ - - cmd = self.celery_command(app_or_default()) - for x in cmd.get_options(): - self.parser.add_option(x) - - def command(self): - from pylons import config - try: - CELERY_ENABLED = str2bool(config['app_conf'].get('use_celery')) - except KeyError: - CELERY_ENABLED = False - - if not CELERY_ENABLED: - raise Exception('Please set use_celery = true in .ini config ' - 'file before running celeryd') - rhodecode.CELERY_ENABLED = CELERY_ENABLED - load_rcextensions(config['here']) - cmd = self.celery_command(app_or_default()) - return cmd.run(**vars(self.options)) - - -class CeleryDaemonCommand(CeleryCommand): - """Start the celery worker - - Starts the celery worker that uses a paste.deploy configuration - file. 
- """ - usage = 'CONFIG_FILE [celeryd options...]' - summary = __doc__.splitlines()[0] - description = "".join(__doc__.splitlines()[2:]) - - parser = Command.standard_parser(quiet=True) - celery_command = celeryd.WorkerCommand - - -class CeleryBeatCommand(CeleryCommand): - """Start the celery beat server - - Starts the celery beat server using a paste.deploy configuration - file. - """ - usage = 'CONFIG_FILE [celerybeat options...]' - summary = __doc__.splitlines()[0] - description = "".join(__doc__.splitlines()[2:]) - - parser = Command.standard_parser(quiet=True) - celery_command = celerybeat.BeatCommand - - -class CAMQPAdminCommand(CeleryCommand): - """CAMQP Admin - - CAMQP celery admin tool. - """ - usage = 'CONFIG_FILE [camqadm options...]' - summary = __doc__.splitlines()[0] - description = "".join(__doc__.splitlines()[2:]) - - parser = Command.standard_parser(quiet=True) - celery_command = camqadm.AMQPAdminCommand - - -class CeleryEventCommand(CeleryCommand): - """Celery event command. - - Capture celery events. - """ - usage = 'CONFIG_FILE [celeryev options...]' - summary = __doc__.splitlines()[0] - description = "".join(__doc__.splitlines()[2:]) - - parser = Command.standard_parser(quiet=True) - celery_command = celeryev.EvCommand diff --git a/rhodecode/lib/celerypylons/loader.py b/rhodecode/lib/celerypylons/loader.py deleted file mode 100644 --- a/rhodecode/lib/celerypylons/loader.py +++ /dev/null @@ -1,97 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2012-2017 RhodeCode GmbH -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License, version 3 -# (only), as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. 
-# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -# This program is dual-licensed. If you wish to learn more about the -# RhodeCode Enterprise Edition, including its added features, Support services, -# and proprietary license terms, please see https://rhodecode.com/licenses/ - -import pylons -import rhodecode - -from celery.loaders.base import BaseLoader - -to_pylons = lambda x: x.replace('_', '.').lower() -to_celery = lambda x: x.replace('.', '_').upper() - -LIST_PARAMS = """CELERY_IMPORTS ADMINS ROUTES""".split() - -class PylonsSettingsProxy(object): - """Pylons Settings Proxy - - Proxies settings from pylons.config - - """ - def __getattr__(self, key): - pylons_key = to_pylons(key) - proxy_config = rhodecode.PYRAMID_SETTINGS - try: - value = proxy_config[pylons_key] - if key in LIST_PARAMS: - return value.split() - return self.type_converter(value) - except KeyError: - raise AttributeError(pylons_key) - - def get(self, key): - try: - return self.__getattr__(key) - except AttributeError: - return None - - def __getitem__(self, key): - try: - return self.__getattr__(key) - except AttributeError: - raise KeyError() - - def __setattr__(self, key, value): - pylons_key = to_pylons(key) - proxy_config = rhodecode.PYRAMID_SETTINGS - proxy_config[pylons_key] = value - - def __setitem__(self, key, value): - self.__setattr__(key, value) - - def type_converter(self, value): - #cast to int - if value.isdigit(): - return int(value) - - #cast to bool - if value.lower() in ['true', 'false']: - return value.lower() == 'true' - return value - -class PylonsLoader(BaseLoader): - """Pylons celery loader - - Maps the celery config onto pylons.config - - """ - def read_configuration(self): - self.configured = True - return PylonsSettingsProxy() - - def on_worker_init(self): - """ - Import task modules. 
- """ - self.import_default_modules() - from rhodecode.config.middleware import make_pyramid_app - - # adding to self to keep a reference around - self.pyramid_app = make_pyramid_app( - pylons.config, **pylons.config['app_conf']) diff --git a/rhodecode/lib/pyramid_utils.py b/rhodecode/lib/pyramid_utils.py --- a/rhodecode/lib/pyramid_utils.py +++ b/rhodecode/lib/pyramid_utils.py @@ -22,6 +22,7 @@ import os from pyramid.compat import configparser from pyramid.paster import bootstrap as pyramid_bootstrap, setup_logging # noqa from pyramid.request import Request +from pyramid.scripting import prepare def get_config(ini_path, **kwargs): @@ -47,3 +48,9 @@ def bootstrap(config_uri, request=None, request = request or Request.blank('/', base_url=base_url) return pyramid_bootstrap(config_uri, request=request, options=options) + + +def prepare_request(environ): + request = Request.blank('/', environ=environ) + prepare(request) # set pyramid threadlocal request + return request diff --git a/rhodecode/lib/utils.py b/rhodecode/lib/utils.py --- a/rhodecode/lib/utils.py +++ b/rhodecode/lib/utils.py @@ -38,7 +38,6 @@ from os.path import join as jn import paste import pkg_resources -from paste.script.command import Command, BadCommand from webhelpers.text import collapse, remove_formatting, strip_tags from mako import exceptions from pyramid.threadlocal import get_current_registry @@ -767,85 +766,6 @@ def create_test_repositories(test_path, tar.extractall(jn(test_path, SVN_REPO)) -#============================================================================== -# PASTER COMMANDS -#============================================================================== -class BasePasterCommand(Command): - """ - Abstract Base Class for paster commands. 
- - The celery commands are somewhat aggressive about loading - celery.conf, and since our module sets the `CELERY_LOADER` - environment variable to our loader, we have to bootstrap a bit and - make sure we've had a chance to load the pylons config off of the - command line, otherwise everything fails. - """ - min_args = 1 - min_args_error = "Please provide a paster config file as an argument." - takes_config_file = 1 - requires_config_file = True - - def notify_msg(self, msg, log=False): - """Make a notification to user, additionally if logger is passed - it logs this action using given logger - - :param msg: message that will be printed to user - :param log: logging instance, to use to additionally log this message - - """ - if log and isinstance(log, logging): - log(msg) - - def run(self, args): - """ - Overrides Command.run - - Checks for a config file argument and loads it. - """ - if len(args) < self.min_args: - raise BadCommand( - self.min_args_error % {'min_args': self.min_args, - 'actual_args': len(args)}) - - # Decrement because we're going to lob off the first argument. - # @@ This is hacky - self.min_args -= 1 - self.bootstrap_config(args[0]) - self.update_parser() - return super(BasePasterCommand, self).run(args[1:]) - - def update_parser(self): - """ - Abstract method. Allows for the class' parser to be updated - before the superclass' `run` method is called. Necessary to - allow options/arguments to be passed through to the underlying - celery command. - """ - raise NotImplementedError("Abstract Method.") - - def bootstrap_config(self, conf): - """ - Loads the pylons configuration. 
- """ - from pylons import config as pylonsconfig - - self.path_to_ini_file = os.path.realpath(conf) - conf = paste.deploy.appconfig('config:' + self.path_to_ini_file) - pylonsconfig.init_app(conf.global_conf, conf.local_conf) - - def _init_session(self): - """ - Inits SqlAlchemy Session - """ - logging.config.fileConfig(self.path_to_ini_file) - from pylons import config - from rhodecode.config.utils import initialize_database - - # get to remove repos !! - add_cache(config) - initialize_database(config) - - def password_changed(auth_user, session): # Never report password change in case of default user or anonymous user. if auth_user.username == User.DEFAULT_USER or auth_user.user_id is None: diff --git a/rhodecode/lib/utils2.py b/rhodecode/lib/utils2.py --- a/rhodecode/lib/utils2.py +++ b/rhodecode/lib/utils2.py @@ -23,7 +23,6 @@ Some simple helper functions """ - import collections import datetime import dateutil.relativedelta @@ -42,7 +41,6 @@ import sqlalchemy.engine.url import sqlalchemy.exc import sqlalchemy.sql import webob -import routes.util import pyramid.threadlocal import rhodecode @@ -941,31 +939,6 @@ class Optional(object): return val -def get_routes_generator_for_server_url(server_url): - parsed_url = urlobject.URLObject(server_url) - netloc = safe_str(parsed_url.netloc) - script_name = safe_str(parsed_url.path) - - if ':' in netloc: - server_name, server_port = netloc.split(':') - else: - server_name = netloc - server_port = (parsed_url.scheme == 'https' and '443' or '80') - - environ = { - 'REQUEST_METHOD': 'GET', - 'PATH_INFO': '/', - 'SERVER_NAME': server_name, - 'SERVER_PORT': server_port, - 'SCRIPT_NAME': script_name, - } - if parsed_url.scheme == 'https': - environ['HTTPS'] = 'on' - environ['wsgi.url_scheme'] = 'https' - - return routes.util.URLGenerator(rhodecode.CONFIG['routes.map'], environ) - - def glob2re(pat): """ Translate a shell PATTERN to a regular expression. 
diff --git a/rhodecode/tests/lib/test_utils.py b/rhodecode/tests/lib/test_utils.py --- a/rhodecode/tests/lib/test_utils.py +++ b/rhodecode/tests/lib/test_utils.py @@ -448,3 +448,9 @@ class TestGetEnabledHooks(object): ui_settings = [] result = utils.get_enabled_hook_classes(ui_settings) assert result == [] + + +def test_obfuscate_url_pw(): + from rhodecode.lib.utils2 import obfuscate_url_pw + engine = u'/home/repos/malmö' + assert obfuscate_url_pw(engine) \ No newline at end of file diff --git a/rhodecode/tests/lib/test_utils2.py b/rhodecode/tests/lib/test_utils2.py deleted file mode 100644 --- a/rhodecode/tests/lib/test_utils2.py +++ /dev/null @@ -1,58 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2010-2017 RhodeCode GmbH -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License, version 3 -# (only), as published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -# This program is dual-licensed. 
If you wish to learn more about the -# RhodeCode Enterprise Edition, including its added features, Support services, -# and proprietary license terms, please see https://rhodecode.com/licenses/ - -import pytest - -from rhodecode.lib.utils2 import ( - obfuscate_url_pw, get_routes_generator_for_server_url) - - -def test_obfuscate_url_pw(): - engine = u'/home/repos/malmö' - assert obfuscate_url_pw(engine) - - -@pytest.mark.parametrize('scheme', ['https', 'http']) -@pytest.mark.parametrize('domain', [ - 'www.test.com', 'test.com', 'test.co.uk', '192.168.1.3']) -@pytest.mark.parametrize('port', [None, '80', '443', '999']) -@pytest.mark.parametrize('script_path', [None, '/', '/prefix', '/prefix/more']) -def test_routes_generator(baseapp, scheme, domain, port, script_path): - server_url = '%s://%s' % (scheme, domain) - if port is not None: - server_url += ':' + port - if script_path: - server_url += script_path - - - expected_url = '%s://%s' % (scheme, domain) - if scheme == 'https': - if port not in (None, '443'): - expected_url += ':' + port - elif scheme == 'http': - if port not in ('80', None): - expected_url += ':' + port - - if script_path: - expected_url = (expected_url + script_path).rstrip('/') - - url_generator = get_routes_generator_for_server_url(server_url) - assert url_generator( - '/a_test_path', qualified=True) == expected_url + '/a_test_path' diff --git a/rhodecode/tests/plugin.py b/rhodecode/tests/plugin.py --- a/rhodecode/tests/plugin.py +++ b/rhodecode/tests/plugin.py @@ -108,7 +108,6 @@ def pytest_addoption(parser): def pytest_configure(config): - # Appy the kombu patch early on, needed for test discovery on Python 2.7.11 from rhodecode.config import patches