diff --git a/configs/development.ini b/configs/development.ini
--- a/configs/development.ini
+++ b/configs/development.ini
@@ -296,28 +296,15 @@ labs_settings_active = true
### CELERY CONFIG ####
####################################
use_celery = false
-broker.host = localhost
-broker.vhost = rabbitmqhost
-broker.port = 5672
-broker.user = rabbitmq
-broker.password = qweqwe
-
-celery.imports = rhodecode.lib.celerylib.tasks
-celery.result.backend = amqp
-celery.result.dburi = amqp://
-celery.result.serialier = json
+# connection url to the message broker (default rabbitmq)
+celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
-#celery.send.task.error.emails = true
-#celery.amqp.task.result.expires = 18000
-
-celeryd.concurrency = 2
-#celeryd.log.file = celeryd.log
-celeryd.log.level = debug
-celeryd.max.tasks.per.child = 1
+# maximum tasks to execute before worker restart
+celery.max_tasks_per_child = 100
## tasks will never be sent to the queue, but executed locally instead.
-celery.always.eager = false
+celery.task_always_eager = false
####################################
### BEAKER CACHE ####
@@ -649,7 +636,7 @@ custom.conf = 1
### LOGGING CONFIGURATION ####
################################
[loggers]
-keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
+keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
[handlers]
keys = console, console_sql
@@ -688,6 +675,11 @@ handlers =
qualname = ssh_wrapper
propagate = 1
+[logger_celery]
+level = DEBUG
+handlers =
+qualname = celery
+
##############
## HANDLERS ##
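Note: the single celery.broker_url line folds the five removed broker.* keys into one transport URL of the form amqp://user:password@host:port/vhost. Any celery.* key in the ini is later translated by the new loader code (rhodecode/lib/celerylib/utils.py in this diff) into the matching Celery 4 option by stripping the prefix and turning dots into underscores; an illustrative line with made-up host and credentials:

    # becomes the broker_url option once the celery. prefix is stripped
    celery.broker_url = amqp://rabbitmq:secret@mq.internal:5672/rhodecode
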
diff --git a/configs/production.ini b/configs/production.ini
--- a/configs/production.ini
+++ b/configs/production.ini
@@ -271,28 +271,15 @@ labs_settings_active = true
### CELERY CONFIG ####
####################################
use_celery = false
-broker.host = localhost
-broker.vhost = rabbitmqhost
-broker.port = 5672
-broker.user = rabbitmq
-broker.password = qweqwe
-
-celery.imports = rhodecode.lib.celerylib.tasks
-celery.result.backend = amqp
-celery.result.dburi = amqp://
-celery.result.serialier = json
+# connection url to the message broker (default rabbitmq)
+celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
-#celery.send.task.error.emails = true
-#celery.amqp.task.result.expires = 18000
-
-celeryd.concurrency = 2
-#celeryd.log.file = celeryd.log
-celeryd.log.level = debug
-celeryd.max.tasks.per.child = 1
+# maximum tasks to execute before worker restart
+celery.max_tasks_per_child = 100
## tasks will never be sent to the queue, but executed locally instead.
-celery.always.eager = false
+celery.task_always_eager = false
####################################
### BEAKER CACHE ####
@@ -619,7 +606,7 @@ custom.conf = 1
### LOGGING CONFIGURATION ####
################################
[loggers]
-keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
+keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
[handlers]
keys = console, console_sql
@@ -658,6 +645,11 @@ handlers =
qualname = ssh_wrapper
propagate = 1
+[logger_celery]
+level = DEBUG
+handlers =
+qualname = celery
+
##############
## HANDLERS ##
diff --git a/pkgs/patch-kombu-msgpack.diff b/pkgs/patch-kombu-msgpack.diff
deleted file mode 100644
--- a/pkgs/patch-kombu-msgpack.diff
+++ /dev/null
@@ -1,12 +0,0 @@
-diff -rup kombu-1.5.1-orig/kombu/serialization.py kombu-1.5.1/kombu/serialization.py
---- kombu-1.5.1-orig/kombu/serialization.py 2016-03-09 15:11:34.000000000 +0100
-+++ kombu-1.5.1/kombu/serialization.py 2016-03-09 15:19:20.000000000 +0100
-@@ -318,7 +318,7 @@ def register_msgpack():
- """See http://msgpack.sourceforge.net/"""
- try:
- import msgpack
-- registry.register('msgpack', msgpack.packs, msgpack.unpacks,
-+ registry.register('msgpack', msgpack.packb, msgpack.unpackb,
- content_type='application/x-msgpack',
- content_encoding='binary')
- except ImportError:
diff --git a/rhodecode/api/views/repo_api.py b/rhodecode/api/views/repo_api.py
--- a/rhodecode/api/views/repo_api.py
+++ b/rhodecode/api/views/repo_api.py
@@ -32,6 +32,7 @@ from rhodecode.api.utils import (
from rhodecode.lib import audit_logger
from rhodecode.lib import repo_maintenance
from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi
+from rhodecode.lib.celerylib.utils import get_task_id
from rhodecode.lib.utils2 import str2bool, time_to_datetime
from rhodecode.lib.ext_json import json
from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError
@@ -712,10 +713,7 @@ def create_repo(
}
task = RepoModel().create(form_data=data, cur_user=owner)
- from celery.result import BaseAsyncResult
- task_id = None
- if isinstance(task, BaseAsyncResult):
- task_id = task.task_id
+ task_id = get_task_id(task)
# no commit, it's done in RepoModel, or async via celery
return {
'msg': "Created new repository `%s`" % (schema_data['repo_name'],),
@@ -1105,10 +1103,8 @@ def fork_repo(request, apiuser, repoid,
task = RepoModel().create_fork(data, cur_user=owner)
# no commit, it's done in RepoModel, or async via celery
- from celery.result import BaseAsyncResult
- task_id = None
- if isinstance(task, BaseAsyncResult):
- task_id = task.task_id
+ task_id = get_task_id(task)
+
return {
'msg': 'Created fork of `%s` as `%s`' % (
repo.repo_name, schema_data['repo_name']),
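The repeated inline BaseAsyncResult checks across the API and admin views are replaced by the shared get_task_id helper added in rhodecode/lib/celerylib/utils.py later in this diff. A minimal sketch of the contract it normalises:

    from celery.result import AsyncResult
    from rhodecode.lib.celerylib.utils import get_task_id

    # async path: the model handed back an AsyncResult from the queue
    assert get_task_id(AsyncResult('a-task-id')) == 'a-task-id'
    # sync/eager path: anything else simply yields None, keeping the views simple
    assert get_task_id(object()) is None
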
diff --git a/rhodecode/apps/admin/views/repositories.py b/rhodecode/apps/admin/views/repositories.py
--- a/rhodecode/apps/admin/views/repositories.py
+++ b/rhodecode/apps/admin/views/repositories.py
@@ -28,6 +28,7 @@ from pyramid.renderers import render
from pyramid.response import Response
from rhodecode.apps._base import BaseAppView, DataGridAppView
+from rhodecode.lib.celerylib.utils import get_task_id
from rhodecode.lib.ext_json import json
from rhodecode.lib.auth import (
@@ -143,22 +144,19 @@ class AdminReposView(BaseAppView, DataGr
c = self.load_default_context()
form_result = {}
+ self._load_form_data(c)
task_id = None
- self._load_form_data(c)
-
try:
# CanWriteToGroup validators checks permissions of this POST
form = RepoForm(
self.request.translate, repo_groups=c.repo_groups_choices,
landing_revs=c.landing_revs_choices)()
- form_results = form.to_python(dict(self.request.POST))
+ form_result = form.to_python(dict(self.request.POST))
# create is done sometimes async on celery, db transaction
# management is handled there.
task = RepoModel().create(form_result, self._rhodecode_user.user_id)
- from celery.result import BaseAsyncResult
- if isinstance(task, BaseAsyncResult):
- task_id = task.task_id
+ task_id = get_task_id(task)
except formencode.Invalid as errors:
data = render('rhodecode:templates/admin/repos/repo_add.mako',
self._get_template_context(c), self.request)
diff --git a/rhodecode/apps/repository/views/repo_checks.py b/rhodecode/apps/repository/views/repo_checks.py
--- a/rhodecode/apps/repository/views/repo_checks.py
+++ b/rhodecode/apps/repository/views/repo_checks.py
@@ -46,11 +46,9 @@ class RepoChecksView(BaseAppView):
repo_name = self.request.matchdict['repo_name']
db_repo = Repository.get_by_repo_name(repo_name)
- if not db_repo:
- raise HTTPNotFound()
# check if maybe repo is already created
- if db_repo.repo_state in [Repository.STATE_CREATED]:
+ if db_repo and db_repo.repo_state in [Repository.STATE_CREATED]:
# re-check permissions before redirecting to prevent resource
# discovery by checking the 302 code
perm_set = ['repository.read', 'repository.write', 'repository.admin']
@@ -80,9 +78,10 @@ class RepoChecksView(BaseAppView):
if task_id and task_id not in ['None']:
import rhodecode
- from celery.result import AsyncResult
+ from rhodecode.lib.celerylib.loader import celery_app
if rhodecode.CELERY_ENABLED:
- task = AsyncResult(task_id)
+ task = celery_app.AsyncResult(task_id)
+ task.get()
if task.failed():
msg = self._log_creation_exception(task.result, repo_name)
h.flash(msg, category='error')
diff --git a/rhodecode/apps/repository/views/repo_forks.py b/rhodecode/apps/repository/views/repo_forks.py
--- a/rhodecode/apps/repository/views/repo_forks.py
+++ b/rhodecode/apps/repository/views/repo_forks.py
@@ -33,6 +33,7 @@ from rhodecode.lib.auth import (
LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous,
HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired)
import rhodecode.lib.helpers as h
+from rhodecode.lib.celerylib.utils import get_task_id
from rhodecode.model.db import coalesce, or_, Repository, RepoGroup
from rhodecode.model.repo import RepoModel
from rhodecode.model.forms import RepoForkForm
@@ -226,9 +227,8 @@ class RepoForksView(RepoAppView, DataGri
# management is handled there.
task = RepoModel().create_fork(
form_result, c.rhodecode_user.user_id)
- from celery.result import BaseAsyncResult
- if isinstance(task, BaseAsyncResult):
- task_id = task.task_id
+
+ task_id = get_task_id(task)
except formencode.Invalid as errors:
c.rhodecode_db_repo = self.db_repo
diff --git a/rhodecode/config/environment.py b/rhodecode/config/environment.py
--- a/rhodecode/config/environment.py
+++ b/rhodecode/config/environment.py
@@ -23,14 +23,6 @@ import os
import logging
import rhodecode
-# ------------------------------------------------------------------------------
-# CELERY magic until refactor - issue #4163 - import order matters here:
-#from rhodecode.lib import celerypylons # this must be first, celerypylons
- # sets config settings upon import
-
-import rhodecode.integrations # any modules using celery task
- # decorators should be added afterwards:
-# ------------------------------------------------------------------------------
from rhodecode.config import utils
@@ -54,14 +46,6 @@ def load_pyramid_environment(global_conf
'secret': settings_merged.get('channelstream.secret')
}
-
- # TODO(marcink): celery
- # # store some globals into rhodecode
- # rhodecode.CELERY_ENABLED = str2bool(config['app_conf'].get('use_celery'))
- # rhodecode.CELERY_EAGER = str2bool(
- # config['app_conf'].get('celery.always.eager'))
-
-
# If this is a test run we prepare the test environment like
# creating a test database, test search index and test repositories.
# This has to be done before the database connection is initialized.
diff --git a/rhodecode/config/licenses.json b/rhodecode/config/licenses.json
--- a/rhodecode/config/licenses.json
+++ b/rhodecode/config/licenses.json
@@ -189,7 +189,7 @@
"python2.7-jupyter-core-4.3.0": {
"BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
},
- "python2.7-kombu-1.5.1": {
+ "python2.7-kombu-4.1.0": {
"BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
},
"python2.7-mistune-0.7.4": {
diff --git a/rhodecode/config/middleware.py b/rhodecode/config/middleware.py
--- a/rhodecode/config/middleware.py
+++ b/rhodecode/config/middleware.py
@@ -42,6 +42,7 @@ from rhodecode.lib.vcs import VCSCommuni
from rhodecode.lib.exceptions import VCSServerUnavailable
from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled
from rhodecode.lib.middleware.https_fixup import HttpsFixup
+from rhodecode.lib.celerylib.loader import configure_celery
from rhodecode.lib.plugins.utils import register_rhodecode_plugin
from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict
from rhodecode.subscribers import (
@@ -87,9 +88,11 @@ def make_pyramid_app(global_config, **se
pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config)
pyramid_app.config = config
+ config.configure_celery(global_config['__file__'])
# creating the app uses a connection - return it after we are done
meta.Session.remove()
+ log.info('Pyramid app %s created and configured.', pyramid_app)
return pyramid_app
@@ -196,6 +199,8 @@ def includeme(config):
config.add_directive(
'register_rhodecode_plugin', register_rhodecode_plugin)
+ config.add_directive('configure_celery', configure_celery)
+
if asbool(settings.get('appenlight', 'false')):
config.include('appenlight_client.ext.pyramid_tween')
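configure_celery is registered as a Pyramid directive and invoked once the app object exists, so the web process configures the same celery_app the worker uses. A rough sketch of that wiring, assuming a real RhodeCode ini exists at the illustrative path:

    from pyramid.config import Configurator
    from rhodecode.lib.celerylib.loader import configure_celery

    config = Configurator(settings={'use_celery': 'true'})
    config.add_directive('configure_celery', configure_celery)
    # reads the celery.* keys from the ini and configures the shared celery_app
    config.configure_celery('/etc/rhodecode/rhodecode.ini')
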
diff --git a/rhodecode/integrations/types/hipchat.py b/rhodecode/integrations/types/hipchat.py
--- a/rhodecode/integrations/types/hipchat.py
+++ b/rhodecode/integrations/types/hipchat.py
@@ -20,18 +20,16 @@
from __future__ import unicode_literals
import deform
-import re
import logging
import requests
import colander
import textwrap
-from celery.task import task
from mako.template import Template
from rhodecode import events
from rhodecode.translation import _
from rhodecode.lib import helpers as h
-from rhodecode.lib.celerylib import run_task
+from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
from rhodecode.lib.colander_utils import strip_whitespace
from rhodecode.integrations.types.base import IntegrationTypeBase
@@ -243,7 +241,7 @@ class HipchatIntegrationType(Integration
)
-@task(ignore_result=True)
+@async_task(ignore_result=True, base=RequestContextTask)
def post_text_to_hipchat(settings, text):
log.debug('sending %s to hipchat %s' % (text, settings['server_url']))
resp = requests.post(settings['server_url'], json={
diff --git a/rhodecode/integrations/types/slack.py b/rhodecode/integrations/types/slack.py
--- a/rhodecode/integrations/types/slack.py
+++ b/rhodecode/integrations/types/slack.py
@@ -27,13 +27,12 @@ import logging
import deform
import requests
import colander
-from celery.task import task
from mako.template import Template
from rhodecode import events
from rhodecode.translation import _
from rhodecode.lib import helpers as h
-from rhodecode.lib.celerylib import run_task
+from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
from rhodecode.lib.colander_utils import strip_whitespace
from rhodecode.integrations.types.base import IntegrationTypeBase
@@ -296,7 +295,7 @@ def html_to_slack_links(message):
r'<\1|\2>', message)
-@task(ignore_result=True)
+@async_task(ignore_result=True, base=RequestContextTask)
def post_text_to_slack(settings, title, text, fields=None, overrides=None):
log.debug('sending %s (%s) to slack %s' % (
title, text, settings['service']))
diff --git a/rhodecode/integrations/types/webhook.py b/rhodecode/integrations/types/webhook.py
--- a/rhodecode/integrations/types/webhook.py
+++ b/rhodecode/integrations/types/webhook.py
@@ -28,16 +28,17 @@ import logging
import requests
import requests.adapters
import colander
-from celery.task import task
from requests.packages.urllib3.util.retry import Retry
import rhodecode
from rhodecode import events
from rhodecode.translation import _
from rhodecode.integrations.types.base import IntegrationTypeBase
+from rhodecode.lib.celerylib import async_task, RequestContextTask
log = logging.getLogger(__name__)
+
# updating this required to update the `common_vars` passed in url calling func
WEBHOOK_URL_VARS = [
'repo_name',
@@ -315,7 +316,7 @@ class WebhookIntegrationType(Integration
post_to_webhook(url_calls, self.settings)
-@task(ignore_result=True)
+@async_task(ignore_result=True, base=RequestContextTask)
def post_to_webhook(url_calls, settings):
max_retries = 3
retries = Retry(
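All three integration types migrate the same way: the old celery.task.task decorator becomes async_task with the RequestContextTask base, so the queued job carries the originating web request along, while dispatch still goes through run_task. A condensed sketch of the pattern with a made-up task (the real ones are post_text_to_hipchat, post_text_to_slack and post_to_webhook above):

    from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask

    @async_task(ignore_result=True, base=RequestContextTask)
    def post_text_to_example(settings, text):
        # executes on the worker with the original request context rebuilt
        return text

    # called from inside a web/API request, exactly like the integrations do
    run_task(post_text_to_example, {'server_url': 'https://example.invalid'}, 'hello')
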
diff --git a/rhodecode/lib/celerylib/__init__.py b/rhodecode/lib/celerylib/__init__.py
--- a/rhodecode/lib/celerylib/__init__.py
+++ b/rhodecode/lib/celerylib/__init__.py
@@ -17,36 +17,17 @@
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
-"""
-celery libs for RhodeCode
-"""
-
-import pylons
import socket
import logging
import rhodecode
-
-from os.path import join as jn
-from pylons import config
-from celery.task import Task
-from pyramid.request import Request
-from pyramid.scripting import prepare
-from pyramid.threadlocal import get_current_request
-
-from decorator import decorator
+from zope.cachedescriptors.property import Lazy as LazyProperty
+from rhodecode.lib.celerylib.loader import (
+ celery_app, RequestContextTask, get_logger)
-from zope.cachedescriptors.property import Lazy as LazyProperty
+async_task = celery_app.task
-from rhodecode.config import utils
-from rhodecode.lib.utils2 import (
- safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
- get_server_url)
-from rhodecode.lib.pidlock import DaemonLock, LockHeld
-from rhodecode.lib.vcs import connect_vcs
-from rhodecode.model import meta
-from rhodecode.lib.auth import AuthUser
log = logging.getLogger(__name__)
@@ -60,95 +41,13 @@ class ResultWrapper(object):
return self.task
-class RhodecodeCeleryTask(Task):
- """
- This is a celery task which will create a rhodecode app instance context
- for the task, patch pyramid + pylons threadlocals with the original request
- that created the task and also add the user to the context.
-
- This class as a whole should be removed once the pylons port is complete
- and a pyramid only solution for celery is implemented as per issue #4139
- """
-
- def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
- link=None, link_error=None, **options):
- """ queue the job to run (we are in web request context here) """
-
- request = get_current_request()
-
- if hasattr(request, 'user'):
- ip_addr = request.user.ip_addr
- user_id = request.user.user_id
- elif hasattr(request, 'rpc_params'):
- # TODO(marcink) remove when migration is finished
- # api specific call on Pyramid.
- ip_addr = request.rpc_params['apiuser'].ip_addr
- user_id = request.rpc_params['apiuser'].user_id
- else:
- raise Exception('Unable to fetch data from request: {}'.format(
- request))
-
- if request:
- # we hook into kwargs since it is the only way to pass our data to
- # the celery worker in celery 2.2
- kwargs.update({
- '_rhodecode_proxy_data': {
- 'environ': {
- 'PATH_INFO': request.environ['PATH_INFO'],
- 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
- 'HTTP_HOST': request.environ.get('HTTP_HOST',
- request.environ['SERVER_NAME']),
- 'SERVER_NAME': request.environ['SERVER_NAME'],
- 'SERVER_PORT': request.environ['SERVER_PORT'],
- 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
- },
- 'auth_user': {
- 'ip_addr': ip_addr,
- 'user_id': user_id
- },
- }
- })
- return super(RhodecodeCeleryTask, self).apply_async(
- args, kwargs, task_id, producer, link, link_error, **options)
-
- def __call__(self, *args, **kwargs):
- """ rebuild the context and then run task on celery worker """
- proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
-
- if not proxy_data:
- return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
-
- log.debug('using celery proxy data to run task: %r', proxy_data)
-
- from rhodecode.config.routing import make_map
-
- request = Request.blank('/', environ=proxy_data['environ'])
- request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
- ip_addr=proxy_data['auth_user']['ip_addr'])
-
- pyramid_request = prepare(request) # set pyramid threadlocal request
-
- # pylons routing
- if not rhodecode.CONFIG.get('routes.map'):
- rhodecode.CONFIG['routes.map'] = make_map(config)
- pylons.url._push_object(get_routes_generator_for_server_url(
- get_server_url(request.environ)
- ))
-
- try:
- return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
- finally:
- pyramid_request['closer']()
- pylons.url._pop_object()
-
-
def run_task(task, *args, **kwargs):
if rhodecode.CELERY_ENABLED:
celery_is_up = False
try:
t = task.apply_async(args=args, kwargs=kwargs)
- log.info('running task %s:%s', t.task_id, task)
celery_is_up = True
+ log.debug('executing task %s:%s in async mode', t.task_id, task)
return t
except socket.error as e:
@@ -164,73 +63,10 @@ def run_task(task, *args, **kwargs):
"Fallback to sync execution.")
    # keep in mind there may be a subtle race condition where something
- # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
+ # depending on rhodecode.CELERY_ENABLED
# will see CELERY_ENABLED as True before this has a chance to set False
rhodecode.CELERY_ENABLED = celery_is_up
else:
- log.debug('executing task %s in sync mode', task)
- return ResultWrapper(task(*args, **kwargs))
-
-
-def __get_lockkey(func, *fargs, **fkwargs):
- params = list(fargs)
- params.extend(['%s-%s' % ar for ar in fkwargs.items()])
-
- func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
- _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
- return 'task_%s.lock' % (md5_safe(_lock_key),)
-
-
-def locked_task(func):
- def __wrapper(func, *fargs, **fkwargs):
- lockkey = __get_lockkey(func, *fargs, **fkwargs)
- lockkey_path = config['app_conf']['cache_dir']
-
- log.info('running task with lockkey %s' % lockkey)
- try:
- l = DaemonLock(file_=jn(lockkey_path, lockkey))
- ret = func(*fargs, **fkwargs)
- l.release()
- return ret
- except LockHeld:
- log.info('LockHeld')
- return 'Task with key %s already running' % lockkey
-
- return decorator(__wrapper, func)
-
+ log.debug('executing task %s:%s in sync mode', 'TASK', task)
-def get_session():
- if rhodecode.CELERY_ENABLED:
- utils.initialize_database(config)
- sa = meta.Session()
- return sa
-
-
-def dbsession(func):
- def __wrapper(func, *fargs, **fkwargs):
- try:
- ret = func(*fargs, **fkwargs)
- return ret
- finally:
- if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
- meta.Session.remove()
-
- return decorator(__wrapper, func)
-
-
-def vcsconnection(func):
- def __wrapper(func, *fargs, **fkwargs):
- if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
- settings = rhodecode.PYRAMID_SETTINGS
- backends = settings['vcs.backends']
- for alias in rhodecode.BACKENDS.keys():
- if alias not in backends:
- del rhodecode.BACKENDS[alias]
- utils.configure_vcs(settings)
- connect_vcs(
- settings['vcs.server'],
- utils.get_vcs_server_protocol(settings))
- ret = func(*fargs, **fkwargs)
- return ret
-
- return decorator(__wrapper, func)
+ return ResultWrapper(task(*args, **kwargs))
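run_task keeps its previous contract: with use_celery enabled and the broker reachable it queues the task and returns the AsyncResult, and on a socket error it flips rhodecode.CELERY_ENABLED off and falls back to running the function inline, wrapped in ResultWrapper. A small sketch, assuming a configured RhodeCode environment and a placeholder recipient:

    import rhodecode
    from rhodecode.lib.celerylib import run_task
    from rhodecode.lib.celerylib import tasks

    result = run_task(tasks.send_email, ['admin@example.invalid'], 'subject', 'body')
    # AsyncResult when queued, ResultWrapper around the direct return value when
    # celery is disabled or the broker is unreachable
    print(type(result), rhodecode.CELERY_ENABLED)
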
diff --git a/rhodecode/lib/celerylib/loader.py b/rhodecode/lib/celerylib/loader.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/celerylib/loader.py
@@ -0,0 +1,256 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2010-2017 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+"""
+Celery loader, run with::
+
+ celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini
+"""
+import os
+import logging
+
+from celery import Celery
+from celery import signals
+from celery import Task
+from kombu.serialization import register
+from pyramid.threadlocal import get_current_request
+
+import rhodecode
+
+from rhodecode.lib.auth import AuthUser
+from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
+from rhodecode.lib.ext_json import json
+from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
+from rhodecode.lib.utils2 import str2bool
+from rhodecode.model import meta
+
+
+register('json_ext', json.dumps, json.loads,
+ content_type='application/x-json-ext',
+ content_encoding='utf-8')
+
+log = logging.getLogger('celery.rhodecode.loader')
+
+
+def add_preload_arguments(parser):
+ parser.add_argument(
+ '--ini', default=None,
+ help='Path to ini configuration file.'
+ )
+ parser.add_argument(
+ '--ini-var', default=None,
+ help='Comma separated list of key=value to pass to ini.'
+ )
+
+
+def get_logger(obj):
+ custom_log = logging.getLogger(
+ 'rhodecode.task.{}'.format(obj.__class__.__name__))
+
+ if rhodecode.CELERY_ENABLED:
+ try:
+ custom_log = obj.get_logger()
+ except Exception:
+ pass
+
+ return custom_log
+
+
+base_celery_config = {
+ 'result_backend': 'rpc://',
+ 'result_expires': 60 * 60 * 24,
+ 'result_persistent': True,
+ 'imports': [],
+ 'worker_max_tasks_per_child': 100,
+ 'accept_content': ['json_ext'],
+ 'task_serializer': 'json_ext',
+ 'result_serializer': 'json_ext',
+ 'worker_hijack_root_logger': False,
+}
+# init main celery app
+celery_app = Celery()
+celery_app.user_options['preload'].add(add_preload_arguments)
+ini_file_glob = None
+
+
+@signals.setup_logging.connect
+def setup_logging_callback(**kwargs):
+ setup_logging(ini_file_glob)
+
+
+@signals.user_preload_options.connect
+def on_preload_parsed(options, **kwargs):
+ ini_location = options['ini']
+ ini_vars = options['ini_var']
+ celery_app.conf['INI_PYRAMID'] = options['ini']
+
+ if ini_location is None:
+ print('You must provide the paste --ini argument')
+ exit(-1)
+
+ options = None
+ if ini_vars is not None:
+ options = parse_ini_vars(ini_vars)
+
+ global ini_file_glob
+ ini_file_glob = ini_location
+
+ log.debug('Bootstrapping RhodeCode application...')
+ env = bootstrap(ini_location, options=options)
+
+ setup_celery_app(
+ app=env['app'], root=env['root'], request=env['request'],
+ registry=env['registry'], closer=env['closer'],
+ ini_location=ini_location)
+
+    # force the global flag on even if it's disabled via the .ini file, because
+    # this is worker code and celery must not be disabled here.
+ rhodecode.CELERY_ENABLED = True
+
+
+@signals.task_success.connect
+def task_success_signal(result, **kwargs):
+ meta.Session.commit()
+ celery_app.conf['PYRAMID_CLOSER']()
+
+
+@signals.task_retry.connect
+def task_retry_signal(
+ request, reason, einfo, **kwargs):
+ meta.Session.remove()
+ celery_app.conf['PYRAMID_CLOSER']()
+
+
+@signals.task_failure.connect
+def task_failure_signal(
+ task_id, exception, args, kwargs, traceback, einfo, **kargs):
+ meta.Session.remove()
+ celery_app.conf['PYRAMID_CLOSER']()
+
+
+@signals.task_revoked.connect
+def task_revoked_signal(
+ request, terminated, signum, expired, **kwargs):
+ celery_app.conf['PYRAMID_CLOSER']()
+
+
+def setup_celery_app(app, root, request, registry, closer, ini_location):
+ ini_dir = os.path.dirname(os.path.abspath(ini_location))
+ celery_config = base_celery_config
+ celery_config.update({
+ # store celerybeat scheduler db where the .ini file is
+ 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
+ })
+ ini_settings = get_ini_config(ini_location)
+ log.debug('Got custom celery conf: %s', ini_settings)
+
+ celery_config.update(ini_settings)
+ celery_app.config_from_object(celery_config)
+
+ celery_app.conf.update({'PYRAMID_APP': app})
+ celery_app.conf.update({'PYRAMID_ROOT': root})
+ celery_app.conf.update({'PYRAMID_REQUEST': request})
+ celery_app.conf.update({'PYRAMID_REGISTRY': registry})
+ celery_app.conf.update({'PYRAMID_CLOSER': closer})
+
+
+def configure_celery(config, ini_location):
+ """
+    Helper called from our application creation logic. It provides broker
+    connection info to the running webapp and allows tasks to be executed
+    from RhodeCode itself
+ """
+ # store some globals into rhodecode
+ rhodecode.CELERY_ENABLED = str2bool(
+ config.registry.settings.get('use_celery'))
+ if rhodecode.CELERY_ENABLED:
+ log.info('Configuring celery based on `%s` file', ini_location)
+ setup_celery_app(
+ app=None, root=None, request=None, registry=config.registry,
+ closer=None, ini_location=ini_location)
+
+
+class RequestContextTask(Task):
+ """
+ This is a celery task which will create a rhodecode app instance context
+ for the task, patch pyramid with the original request
+ that created the task and also add the user to the context.
+ """
+
+ def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
+ link=None, link_error=None, shadow=None, **options):
+ """ queue the job to run (we are in web request context here) """
+
+ req = get_current_request()
+
+ # web case
+ if hasattr(req, 'user'):
+ ip_addr = req.user.ip_addr
+ user_id = req.user.user_id
+
+ # api case
+ elif hasattr(req, 'rpc_user'):
+ ip_addr = req.rpc_user.ip_addr
+ user_id = req.rpc_user.user_id
+ else:
+ raise Exception(
+ 'Unable to fetch required data from request: {}. \n'
+ 'This task is required to be executed from context of '
+ 'request in a webapp'.format(repr(req)))
+
+ if req:
+ # we hook into kwargs since it is the only way to pass our data to
+ # the celery worker
+ options['headers'] = options.get('headers', {})
+ options['headers'].update({
+ 'rhodecode_proxy_data': {
+ 'environ': {
+ 'PATH_INFO': req.environ['PATH_INFO'],
+ 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
+ 'HTTP_HOST': req.environ.get('HTTP_HOST',
+ req.environ['SERVER_NAME']),
+ 'SERVER_NAME': req.environ['SERVER_NAME'],
+ 'SERVER_PORT': req.environ['SERVER_PORT'],
+ 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
+ },
+ 'auth_user': {
+ 'ip_addr': ip_addr,
+ 'user_id': user_id
+ },
+ }
+ })
+
+ return super(RequestContextTask, self).apply_async(
+ args, kwargs, task_id, producer, link, link_error, shadow, **options)
+
+ def __call__(self, *args, **kwargs):
+ """ rebuild the context and then run task on celery worker """
+
+ proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
+ if not proxy_data:
+ return super(RequestContextTask, self).__call__(*args, **kwargs)
+
+ log.debug('using celery proxy data to run task: %r', proxy_data)
+ # re-inject and register threadlocals for proper routing support
+ request = prepare_request(proxy_data['environ'])
+ request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
+ ip_addr=proxy_data['auth_user']['ip_addr'])
+
+ return super(RequestContextTask, self).__call__(*args, **kwargs)
+
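On the worker side the loader module is the celery entry point; besides --ini, the preload argument --ini-var accepts a comma separated key=value list (split by parse_ini_vars in utils.py below) that is handed to the pyramid bootstrap as config variables. An illustrative invocation, where the variable names are placeholders for %(...)s interpolation keys in the ini:

    celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG \
        --ini=._dev/dev.ini --ini-var=some_key=dev,other_key=1
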
diff --git a/rhodecode/lib/celerylib/tasks.py b/rhodecode/lib/celerylib/tasks.py
--- a/rhodecode/lib/celerylib/tasks.py
+++ b/rhodecode/lib/celerylib/tasks.py
@@ -23,38 +23,18 @@ RhodeCode task modules, containing all t
by celery daemon
"""
-
import os
-import logging
-
-from celery.task import task
import rhodecode
from rhodecode.lib import audit_logger
-from rhodecode.lib.celerylib import (
- run_task, dbsession, __get_lockkey, LockHeld, DaemonLock,
- get_session, vcsconnection, RhodecodeCeleryTask)
+from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask
from rhodecode.lib.hooks_base import log_create_repository
from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
-from rhodecode.lib.utils import add_cache
from rhodecode.lib.utils2 import safe_int, str2bool
-from rhodecode.model.db import Repository, User
+from rhodecode.model.db import Session, Repository, User
-def get_logger(cls):
- if rhodecode.CELERY_ENABLED:
- try:
- log = cls.get_logger()
- except Exception:
- log = logging.getLogger(__name__)
- else:
- log = logging.getLogger(__name__)
-
- return log
-
-
-@task(ignore_result=True, base=RhodecodeCeleryTask)
-@dbsession
+@async_task(ignore_result=True, base=RequestContextTask)
def send_email(recipients, subject, body='', html_body='', email_config=None):
"""
Sends an email with defined parameters from the .ini files.
@@ -101,18 +81,15 @@ def send_email(recipients, subject, body
return True
-@task(ignore_result=True, base=RhodecodeCeleryTask)
-@dbsession
-@vcsconnection
+@async_task(ignore_result=True, base=RequestContextTask)
def create_repo(form_data, cur_user):
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
from rhodecode.model.settings import SettingsModel
log = get_logger(create_repo)
- DBS = get_session()
- cur_user = UserModel(DBS)._get_user(cur_user)
+ cur_user = UserModel()._get_user(cur_user)
owner = cur_user
repo_name = form_data['repo_name']
@@ -138,7 +115,7 @@ def create_repo(form_data, cur_user):
'enable_downloads', defs.get('repo_enable_downloads'))
try:
- repo = RepoModel(DBS)._create_repo(
+ repo = RepoModel()._create_repo(
repo_name=repo_name_full,
repo_type=repo_type,
description=description,
@@ -155,13 +132,13 @@ def create_repo(form_data, cur_user):
enable_downloads=enable_downloads,
state=state
)
- DBS.commit()
+ Session().commit()
# now create this repo on Filesystem
- RepoModel(DBS)._create_filesystem_repo(
+ RepoModel()._create_filesystem_repo(
repo_name=repo_name,
repo_type=repo_type,
- repo_group=RepoModel(DBS)._get_repo_group(repo_group),
+ repo_group=RepoModel()._get_repo_group(repo_group),
clone_uri=clone_uri,
)
repo = Repository.get_by_repo_name(repo_name_full)
@@ -180,7 +157,7 @@ def create_repo(form_data, cur_user):
user=cur_user,
repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
- DBS.commit()
+ Session().commit()
except Exception:
log.warning('Exception occurred when creating repository, '
'doing cleanup...', exc_info=True)
@@ -188,8 +165,8 @@ def create_repo(form_data, cur_user):
repo = Repository.get_by_repo_name(repo_name_full)
if repo:
Repository.delete(repo.repo_id)
- DBS.commit()
- RepoModel(DBS)._delete_filesystem_repo(repo)
+ Session().commit()
+ RepoModel()._delete_filesystem_repo(repo)
raise
# it's an odd fix to make celery fail task when exception occurs
@@ -199,23 +176,17 @@ def create_repo(form_data, cur_user):
return True
-@task(ignore_result=True, base=RhodecodeCeleryTask)
-@dbsession
-@vcsconnection
+@async_task(ignore_result=True, base=RequestContextTask)
def create_repo_fork(form_data, cur_user):
"""
Creates a fork of repository using internal VCS methods
-
- :param form_data:
- :param cur_user:
"""
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
log = get_logger(create_repo_fork)
- DBS = get_session()
- cur_user = UserModel(DBS)._get_user(cur_user)
+ cur_user = UserModel()._get_user(cur_user)
owner = cur_user
repo_name = form_data['repo_name'] # fork in this case
@@ -230,8 +201,8 @@ def create_repo_fork(form_data, cur_user
fork_id = safe_int(form_data.get('fork_parent_id'))
try:
- fork_of = RepoModel(DBS)._get_repo(fork_id)
- RepoModel(DBS)._create_repo(
+ fork_of = RepoModel()._get_repo(fork_id)
+ RepoModel()._create_repo(
repo_name=repo_name_full,
repo_type=repo_type,
description=description,
@@ -244,16 +215,16 @@ def create_repo_fork(form_data, cur_user
copy_fork_permissions=copy_fork_permissions
)
- DBS.commit()
+ Session().commit()
base_path = Repository.base_path()
source_repo_path = os.path.join(base_path, fork_of.repo_name)
# now create this repo on Filesystem
- RepoModel(DBS)._create_filesystem_repo(
+ RepoModel()._create_filesystem_repo(
repo_name=repo_name,
repo_type=repo_type,
- repo_group=RepoModel(DBS)._get_repo_group(repo_group),
+ repo_group=RepoModel()._get_repo_group(repo_group),
clone_uri=source_repo_path,
)
repo = Repository.get_by_repo_name(repo_name_full)
@@ -274,7 +245,7 @@ def create_repo_fork(form_data, cur_user
user=cur_user,
repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
- DBS.commit()
+ Session().commit()
except Exception as e:
log.warning('Exception %s occurred when forking repository, '
'doing cleanup...', e)
@@ -282,8 +253,8 @@ def create_repo_fork(form_data, cur_user
repo = Repository.get_by_repo_name(repo_name_full)
if repo:
Repository.delete(repo.repo_id)
- DBS.commit()
- RepoModel(DBS)._delete_filesystem_repo(repo)
+ Session().commit()
+ RepoModel()._delete_filesystem_repo(repo)
raise
# it's an odd fix to make celery fail task when exception occurs
@@ -291,3 +262,14 @@ def create_repo_fork(form_data, cur_user
pass
return True
+
+
+@async_task(ignore_result=True)
+def sync_repo(*args, **kwargs):
+ from rhodecode.model.scm import ScmModel
+ log = get_logger(sync_repo)
+
+ log.info('Pulling from %s', kwargs['repo_name'])
+ ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
+
+
diff --git a/rhodecode/lib/celerylib/utils.py b/rhodecode/lib/celerylib/utils.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/lib/celerylib/utils.py
@@ -0,0 +1,156 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2010-2017 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import os
+import json
+import logging
+import datetime
+
+from functools import partial
+
+from pyramid.compat import configparser
+from celery.result import AsyncResult
+import celery.loaders.base
+import celery.schedules
+
+
+log = logging.getLogger(__name__)
+
+
+def get_task_id(task):
+ task_id = None
+ if isinstance(task, AsyncResult):
+ task_id = task.task_id
+
+ return task_id
+
+
+def crontab(value):
+ return celery.schedules.crontab(**value)
+
+
+def timedelta(value):
+ return datetime.timedelta(**value)
+
+
+def safe_json(get, section, key):
+ value = ''
+ try:
+ value = get(key)
+ json_value = json.loads(value)
+ except ValueError:
+ msg = 'The %s=%s is not valid json in section %s' % (
+ key, value, section
+ )
+ raise ValueError(msg)
+
+ return json_value
+
+
+def get_beat_config(parser, section):
+ SCHEDULE_TYPE_MAP = {
+ 'crontab': crontab,
+ 'timedelta': timedelta,
+ 'integer': int
+ }
+ get = partial(parser.get, section)
+ has_option = partial(parser.has_option, section)
+
+ schedule_type = get('type')
+ schedule_value = safe_json(get, section, 'schedule')
+
+ scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type)
+
+ if scheduler_cls is None:
+ raise ValueError(
+ 'schedule type %s in section %s is invalid' % (
+ schedule_type,
+ section
+ )
+ )
+
+ schedule = scheduler_cls(schedule_value)
+
+ config = {
+ 'task': get('task'),
+ 'schedule': schedule,
+ }
+
+ if has_option('args'):
+ config['args'] = safe_json(get, section, 'args')
+
+ if has_option('kwargs'):
+ config['kwargs'] = safe_json(get, section, 'kwargs')
+
+ return config
+
+
+def get_ini_config(ini_location):
+ """
+ Converts basic ini configuration into celery 4.X options
+ """
+ def key_converter(key_name):
+ pref = 'celery.'
+ if key_name.startswith(pref):
+ return key_name[len(pref):].replace('.', '_').lower()
+
+ def type_converter(parsed_key, value):
+ # cast to int
+ if value.isdigit():
+ return int(value)
+
+ # cast to bool
+ if value.lower() in ['true', 'false', 'True', 'False']:
+ return value.lower() == 'true'
+ return value
+
+ parser = configparser.SafeConfigParser(
+ defaults={'here': os.path.abspath(ini_location)})
+ parser.read(ini_location)
+
+ ini_config = {}
+ for k, v in parser.items('app:main'):
+ pref = 'celery.'
+ if k.startswith(pref):
+ ini_config[key_converter(k)] = type_converter(key_converter(k), v)
+
+ beat_config = {}
+ for section in parser.sections():
+ if section.startswith('celerybeat:'):
+ name = section.split(':', 1)[1]
+ beat_config[name] = get_beat_config(parser, section)
+
+ # final compose of settings
+ celery_settings = {}
+
+ if ini_config:
+ celery_settings.update(ini_config)
+ if beat_config:
+ celery_settings.update({'beat_schedule': beat_config})
+
+ return celery_settings
+
+
+def parse_ini_vars(ini_vars):
+ options = {}
+ for pairs in ini_vars.split(','):
+ key, value = pairs.split('=')
+ options[key] = value
+ return options
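The new celerybeat: ini sections drive Celery 4's beat_schedule: type selects crontab, timedelta or integer, schedule/args/kwargs are parsed as JSON, and the section name becomes the schedule entry name. An illustrative section scheduling the new sync_repo task every 30 minutes (repository and user are placeholders):

    [celerybeat:sync_example_repo]
    type = timedelta
    schedule = {"minutes": 30}
    task = rhodecode.lib.celerylib.tasks.sync_repo
    kwargs = {"repo_name": "example-repo", "username": "admin"}
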
diff --git a/rhodecode/lib/celerypylons/__init__.py b/rhodecode/lib/celerypylons/__init__.py
deleted file mode 100644
--- a/rhodecode/lib/celerypylons/__init__.py
+++ /dev/null
@@ -1,37 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2012-2017 RhodeCode GmbH
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License, version 3
-# (only), as published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# This program is dual-licensed. If you wish to learn more about the
-# RhodeCode Enterprise Edition, including its added features, Support services,
-# and proprietary license terms, please see https://rhodecode.com/licenses/
-
-"""
-Automatically sets the environment variable `CELERY_LOADER` to
-`celerypylons.loader:PylonsLoader`. This ensures the loader is
-specified when accessing the rest of this package, and allows celery
-to be installed in a webapp just by importing celerypylons::
-
- import celerypylons
-
-"""
-
-import os
-import warnings
-
-CELERYPYLONS_LOADER = 'rhodecode.lib.celerypylons.loader.PylonsLoader'
-if os.environ.get('CELERY_LOADER', CELERYPYLONS_LOADER) != CELERYPYLONS_LOADER:
- warnings.warn("'CELERY_LOADER' environment variable will be overridden by celery-pylons.")
-os.environ['CELERY_LOADER'] = CELERYPYLONS_LOADER
diff --git a/rhodecode/lib/celerypylons/commands.py b/rhodecode/lib/celerypylons/commands.py
deleted file mode 100644
--- a/rhodecode/lib/celerypylons/commands.py
+++ /dev/null
@@ -1,118 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2012-2017 RhodeCode GmbH
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License, version 3
-# (only), as published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# This program is dual-licensed. If you wish to learn more about the
-# RhodeCode Enterprise Edition, including its added features, Support services,
-# and proprietary license terms, please see https://rhodecode.com/licenses/
-
-import rhodecode
-from rhodecode.lib.utils import BasePasterCommand, Command, load_rcextensions
-from celery.app import app_or_default
-from celery.bin import camqadm, celerybeat, celeryd, celeryev
-
-from rhodecode.lib.utils2 import str2bool
-
-__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand',
- 'CAMQPAdminCommand', 'CeleryEventCommand']
-
-
-class CeleryCommand(BasePasterCommand):
- """Abstract class implements run methods needed for celery
-
- Starts the celery worker that uses a paste.deploy configuration
- file.
- """
-
- def update_parser(self):
- """
- Abstract method. Allows for the class's parser to be updated
- before the superclass's `run` method is called. Necessary to
- allow options/arguments to be passed through to the underlying
- celery command.
- """
-
- cmd = self.celery_command(app_or_default())
- for x in cmd.get_options():
- self.parser.add_option(x)
-
- def command(self):
- from pylons import config
- try:
- CELERY_ENABLED = str2bool(config['app_conf'].get('use_celery'))
- except KeyError:
- CELERY_ENABLED = False
-
- if not CELERY_ENABLED:
- raise Exception('Please set use_celery = true in .ini config '
- 'file before running celeryd')
- rhodecode.CELERY_ENABLED = CELERY_ENABLED
- load_rcextensions(config['here'])
- cmd = self.celery_command(app_or_default())
- return cmd.run(**vars(self.options))
-
-
-class CeleryDaemonCommand(CeleryCommand):
- """Start the celery worker
-
- Starts the celery worker that uses a paste.deploy configuration
- file.
- """
- usage = 'CONFIG_FILE [celeryd options...]'
- summary = __doc__.splitlines()[0]
- description = "".join(__doc__.splitlines()[2:])
-
- parser = Command.standard_parser(quiet=True)
- celery_command = celeryd.WorkerCommand
-
-
-class CeleryBeatCommand(CeleryCommand):
- """Start the celery beat server
-
- Starts the celery beat server using a paste.deploy configuration
- file.
- """
- usage = 'CONFIG_FILE [celerybeat options...]'
- summary = __doc__.splitlines()[0]
- description = "".join(__doc__.splitlines()[2:])
-
- parser = Command.standard_parser(quiet=True)
- celery_command = celerybeat.BeatCommand
-
-
-class CAMQPAdminCommand(CeleryCommand):
- """CAMQP Admin
-
- CAMQP celery admin tool.
- """
- usage = 'CONFIG_FILE [camqadm options...]'
- summary = __doc__.splitlines()[0]
- description = "".join(__doc__.splitlines()[2:])
-
- parser = Command.standard_parser(quiet=True)
- celery_command = camqadm.AMQPAdminCommand
-
-
-class CeleryEventCommand(CeleryCommand):
- """Celery event command.
-
- Capture celery events.
- """
- usage = 'CONFIG_FILE [celeryev options...]'
- summary = __doc__.splitlines()[0]
- description = "".join(__doc__.splitlines()[2:])
-
- parser = Command.standard_parser(quiet=True)
- celery_command = celeryev.EvCommand
diff --git a/rhodecode/lib/celerypylons/loader.py b/rhodecode/lib/celerypylons/loader.py
deleted file mode 100644
--- a/rhodecode/lib/celerypylons/loader.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2012-2017 RhodeCode GmbH
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License, version 3
-# (only), as published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# This program is dual-licensed. If you wish to learn more about the
-# RhodeCode Enterprise Edition, including its added features, Support services,
-# and proprietary license terms, please see https://rhodecode.com/licenses/
-
-import pylons
-import rhodecode
-
-from celery.loaders.base import BaseLoader
-
-to_pylons = lambda x: x.replace('_', '.').lower()
-to_celery = lambda x: x.replace('.', '_').upper()
-
-LIST_PARAMS = """CELERY_IMPORTS ADMINS ROUTES""".split()
-
-class PylonsSettingsProxy(object):
- """Pylons Settings Proxy
-
- Proxies settings from pylons.config
-
- """
- def __getattr__(self, key):
- pylons_key = to_pylons(key)
- proxy_config = rhodecode.PYRAMID_SETTINGS
- try:
- value = proxy_config[pylons_key]
- if key in LIST_PARAMS:
- return value.split()
- return self.type_converter(value)
- except KeyError:
- raise AttributeError(pylons_key)
-
- def get(self, key):
- try:
- return self.__getattr__(key)
- except AttributeError:
- return None
-
- def __getitem__(self, key):
- try:
- return self.__getattr__(key)
- except AttributeError:
- raise KeyError()
-
- def __setattr__(self, key, value):
- pylons_key = to_pylons(key)
- proxy_config = rhodecode.PYRAMID_SETTINGS
- proxy_config[pylons_key] = value
-
- def __setitem__(self, key, value):
- self.__setattr__(key, value)
-
- def type_converter(self, value):
- #cast to int
- if value.isdigit():
- return int(value)
-
- #cast to bool
- if value.lower() in ['true', 'false']:
- return value.lower() == 'true'
- return value
-
-class PylonsLoader(BaseLoader):
- """Pylons celery loader
-
- Maps the celery config onto pylons.config
-
- """
- def read_configuration(self):
- self.configured = True
- return PylonsSettingsProxy()
-
- def on_worker_init(self):
- """
- Import task modules.
- """
- self.import_default_modules()
- from rhodecode.config.middleware import make_pyramid_app
-
- # adding to self to keep a reference around
- self.pyramid_app = make_pyramid_app(
- pylons.config, **pylons.config['app_conf'])
diff --git a/rhodecode/lib/pyramid_utils.py b/rhodecode/lib/pyramid_utils.py
--- a/rhodecode/lib/pyramid_utils.py
+++ b/rhodecode/lib/pyramid_utils.py
@@ -22,6 +22,7 @@ import os
from pyramid.compat import configparser
from pyramid.paster import bootstrap as pyramid_bootstrap, setup_logging # noqa
from pyramid.request import Request
+from pyramid.scripting import prepare
def get_config(ini_path, **kwargs):
@@ -47,3 +48,9 @@ def bootstrap(config_uri, request=None,
request = request or Request.blank('/', base_url=base_url)
return pyramid_bootstrap(config_uri, request=request, options=options)
+
+
+def prepare_request(environ):
+ request = Request.blank('/', environ=environ)
+ prepare(request) # set pyramid threadlocal request
+ return request
diff --git a/rhodecode/lib/utils.py b/rhodecode/lib/utils.py
--- a/rhodecode/lib/utils.py
+++ b/rhodecode/lib/utils.py
@@ -38,7 +38,6 @@ from os.path import join as jn
import paste
import pkg_resources
-from paste.script.command import Command, BadCommand
from webhelpers.text import collapse, remove_formatting, strip_tags
from mako import exceptions
from pyramid.threadlocal import get_current_registry
@@ -767,85 +766,6 @@ def create_test_repositories(test_path,
tar.extractall(jn(test_path, SVN_REPO))
-#==============================================================================
-# PASTER COMMANDS
-#==============================================================================
-class BasePasterCommand(Command):
- """
- Abstract Base Class for paster commands.
-
- The celery commands are somewhat aggressive about loading
- celery.conf, and since our module sets the `CELERY_LOADER`
- environment variable to our loader, we have to bootstrap a bit and
- make sure we've had a chance to load the pylons config off of the
- command line, otherwise everything fails.
- """
- min_args = 1
- min_args_error = "Please provide a paster config file as an argument."
- takes_config_file = 1
- requires_config_file = True
-
- def notify_msg(self, msg, log=False):
- """Make a notification to user, additionally if logger is passed
- it logs this action using given logger
-
- :param msg: message that will be printed to user
- :param log: logging instance, to use to additionally log this message
-
- """
- if log and isinstance(log, logging):
- log(msg)
-
- def run(self, args):
- """
- Overrides Command.run
-
- Checks for a config file argument and loads it.
- """
- if len(args) < self.min_args:
- raise BadCommand(
- self.min_args_error % {'min_args': self.min_args,
- 'actual_args': len(args)})
-
- # Decrement because we're going to lob off the first argument.
- # @@ This is hacky
- self.min_args -= 1
- self.bootstrap_config(args[0])
- self.update_parser()
- return super(BasePasterCommand, self).run(args[1:])
-
- def update_parser(self):
- """
- Abstract method. Allows for the class' parser to be updated
- before the superclass' `run` method is called. Necessary to
- allow options/arguments to be passed through to the underlying
- celery command.
- """
- raise NotImplementedError("Abstract Method.")
-
- def bootstrap_config(self, conf):
- """
- Loads the pylons configuration.
- """
- from pylons import config as pylonsconfig
-
- self.path_to_ini_file = os.path.realpath(conf)
- conf = paste.deploy.appconfig('config:' + self.path_to_ini_file)
- pylonsconfig.init_app(conf.global_conf, conf.local_conf)
-
- def _init_session(self):
- """
- Inits SqlAlchemy Session
- """
- logging.config.fileConfig(self.path_to_ini_file)
- from pylons import config
- from rhodecode.config.utils import initialize_database
-
- # get to remove repos !!
- add_cache(config)
- initialize_database(config)
-
-
def password_changed(auth_user, session):
# Never report password change in case of default user or anonymous user.
if auth_user.username == User.DEFAULT_USER or auth_user.user_id is None:
diff --git a/rhodecode/lib/utils2.py b/rhodecode/lib/utils2.py
--- a/rhodecode/lib/utils2.py
+++ b/rhodecode/lib/utils2.py
@@ -23,7 +23,6 @@
Some simple helper functions
"""
-
import collections
import datetime
import dateutil.relativedelta
@@ -42,7 +41,6 @@ import sqlalchemy.engine.url
import sqlalchemy.exc
import sqlalchemy.sql
import webob
-import routes.util
import pyramid.threadlocal
import rhodecode
@@ -941,31 +939,6 @@ class Optional(object):
return val
-def get_routes_generator_for_server_url(server_url):
- parsed_url = urlobject.URLObject(server_url)
- netloc = safe_str(parsed_url.netloc)
- script_name = safe_str(parsed_url.path)
-
- if ':' in netloc:
- server_name, server_port = netloc.split(':')
- else:
- server_name = netloc
- server_port = (parsed_url.scheme == 'https' and '443' or '80')
-
- environ = {
- 'REQUEST_METHOD': 'GET',
- 'PATH_INFO': '/',
- 'SERVER_NAME': server_name,
- 'SERVER_PORT': server_port,
- 'SCRIPT_NAME': script_name,
- }
- if parsed_url.scheme == 'https':
- environ['HTTPS'] = 'on'
- environ['wsgi.url_scheme'] = 'https'
-
- return routes.util.URLGenerator(rhodecode.CONFIG['routes.map'], environ)
-
-
def glob2re(pat):
"""
Translate a shell PATTERN to a regular expression.
diff --git a/rhodecode/tests/lib/test_utils.py b/rhodecode/tests/lib/test_utils.py
--- a/rhodecode/tests/lib/test_utils.py
+++ b/rhodecode/tests/lib/test_utils.py
@@ -448,3 +448,9 @@ class TestGetEnabledHooks(object):
ui_settings = []
result = utils.get_enabled_hook_classes(ui_settings)
assert result == []
+
+
+def test_obfuscate_url_pw():
+ from rhodecode.lib.utils2 import obfuscate_url_pw
+ engine = u'/home/repos/malmö'
+ assert obfuscate_url_pw(engine)
\ No newline at end of file
diff --git a/rhodecode/tests/lib/test_utils2.py b/rhodecode/tests/lib/test_utils2.py
deleted file mode 100644
--- a/rhodecode/tests/lib/test_utils2.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2010-2017 RhodeCode GmbH
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License, version 3
-# (only), as published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-# This program is dual-licensed. If you wish to learn more about the
-# RhodeCode Enterprise Edition, including its added features, Support services,
-# and proprietary license terms, please see https://rhodecode.com/licenses/
-
-import pytest
-
-from rhodecode.lib.utils2 import (
- obfuscate_url_pw, get_routes_generator_for_server_url)
-
-
-def test_obfuscate_url_pw():
- engine = u'/home/repos/malmö'
- assert obfuscate_url_pw(engine)
-
-
-@pytest.mark.parametrize('scheme', ['https', 'http'])
-@pytest.mark.parametrize('domain', [
- 'www.test.com', 'test.com', 'test.co.uk', '192.168.1.3'])
-@pytest.mark.parametrize('port', [None, '80', '443', '999'])
-@pytest.mark.parametrize('script_path', [None, '/', '/prefix', '/prefix/more'])
-def test_routes_generator(baseapp, scheme, domain, port, script_path):
- server_url = '%s://%s' % (scheme, domain)
- if port is not None:
- server_url += ':' + port
- if script_path:
- server_url += script_path
-
-
- expected_url = '%s://%s' % (scheme, domain)
- if scheme == 'https':
- if port not in (None, '443'):
- expected_url += ':' + port
- elif scheme == 'http':
- if port not in ('80', None):
- expected_url += ':' + port
-
- if script_path:
- expected_url = (expected_url + script_path).rstrip('/')
-
- url_generator = get_routes_generator_for_server_url(server_url)
- assert url_generator(
- '/a_test_path', qualified=True) == expected_url + '/a_test_path'
diff --git a/rhodecode/tests/plugin.py b/rhodecode/tests/plugin.py
--- a/rhodecode/tests/plugin.py
+++ b/rhodecode/tests/plugin.py
@@ -108,7 +108,6 @@ def pytest_addoption(parser):
def pytest_configure(config):
- # Appy the kombu patch early on, needed for test discovery on Python 2.7.11
from rhodecode.config import patches