celery: celery 4.X support. Fixes #4169...
marcink - r2359:246f5a4c default
@@ -0,0 +1,256 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
21 Celery loader, run with::
22
23 celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini
24 """
25 import os
26 import logging
27
28 from celery import Celery
29 from celery import signals
30 from celery import Task
31 from kombu.serialization import register
32 from pyramid.threadlocal import get_current_request
33
34 import rhodecode
35
36 from rhodecode.lib.auth import AuthUser
37 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
38 from rhodecode.lib.ext_json import json
39 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
40 from rhodecode.lib.utils2 import str2bool
41 from rhodecode.model import meta
42
43
44 register('json_ext', json.dumps, json.loads,
45 content_type='application/x-json-ext',
46 content_encoding='utf-8')
47
48 log = logging.getLogger('celery.rhodecode.loader')
49
50
51 def add_preload_arguments(parser):
52 parser.add_argument(
53 '--ini', default=None,
54 help='Path to ini configuration file.'
55 )
56 parser.add_argument(
57 '--ini-var', default=None,
58 help='Comma separated list of key=value to pass to ini.'
59 )
60
61
62 def get_logger(obj):
63 custom_log = logging.getLogger(
64 'rhodecode.task.{}'.format(obj.__class__.__name__))
65
66 if rhodecode.CELERY_ENABLED:
67 try:
68 custom_log = obj.get_logger()
69 except Exception:
70 pass
71
72 return custom_log
73
74
75 base_celery_config = {
76 'result_backend': 'rpc://',
77 'result_expires': 60 * 60 * 24,
78 'result_persistent': True,
79 'imports': [],
80 'worker_max_tasks_per_child': 100,
81 'accept_content': ['json_ext'],
82 'task_serializer': 'json_ext',
83 'result_serializer': 'json_ext',
84 'worker_hijack_root_logger': False,
85 }
86 # init main celery app
87 celery_app = Celery()
88 celery_app.user_options['preload'].add(add_preload_arguments)
89 ini_file_glob = None
90
91
92 @signals.setup_logging.connect
93 def setup_logging_callback(**kwargs):
94 setup_logging(ini_file_glob)
95
96
97 @signals.user_preload_options.connect
98 def on_preload_parsed(options, **kwargs):
99 ini_location = options['ini']
100 ini_vars = options['ini_var']
101 celery_app.conf['INI_PYRAMID'] = options['ini']
102
103 if ini_location is None:
104 print('You must provide the paste --ini argument')
105 exit(-1)
106
107 options = None
108 if ini_vars is not None:
109 options = parse_ini_vars(ini_vars)
110
111 global ini_file_glob
112 ini_file_glob = ini_location
113
114 log.debug('Bootstrapping RhodeCode application...')
115 env = bootstrap(ini_location, options=options)
116
117 setup_celery_app(
118 app=env['app'], root=env['root'], request=env['request'],
119 registry=env['registry'], closer=env['closer'],
120 ini_location=ini_location)
121
122 # force the global flag on even if it's disabled via the .ini file, because
123 # this is worker code that doesn't need it disabled.
124 rhodecode.CELERY_ENABLED = True
125
126
127 @signals.task_success.connect
128 def task_success_signal(result, **kwargs):
129 meta.Session.commit()
130 celery_app.conf['PYRAMID_CLOSER']()
131
132
133 @signals.task_retry.connect
134 def task_retry_signal(
135 request, reason, einfo, **kwargs):
136 meta.Session.remove()
137 celery_app.conf['PYRAMID_CLOSER']()
138
139
140 @signals.task_failure.connect
141 def task_failure_signal(
142 task_id, exception, args, kwargs, traceback, einfo, **kargs):
143 meta.Session.remove()
144 celery_app.conf['PYRAMID_CLOSER']()
145
146
147 @signals.task_revoked.connect
148 def task_revoked_signal(
149 request, terminated, signum, expired, **kwargs):
150 celery_app.conf['PYRAMID_CLOSER']()
151
152
153 def setup_celery_app(app, root, request, registry, closer, ini_location):
154 ini_dir = os.path.dirname(os.path.abspath(ini_location))
155 celery_config = base_celery_config
156 celery_config.update({
157 # store celerybeat scheduler db where the .ini file is
158 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
159 })
160 ini_settings = get_ini_config(ini_location)
161 log.debug('Got custom celery conf: %s', ini_settings)
162
163 celery_config.update(ini_settings)
164 celery_app.config_from_object(celery_config)
165
166 celery_app.conf.update({'PYRAMID_APP': app})
167 celery_app.conf.update({'PYRAMID_ROOT': root})
168 celery_app.conf.update({'PYRAMID_REQUEST': request})
169 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
170 celery_app.conf.update({'PYRAMID_CLOSER': closer})
171
172
173 def configure_celery(config, ini_location):
174 """
175 Helper called from our application creation logic. It wires the celery
176 connection info into the running webapp and allows execution of tasks
177 from RhodeCode itself.
178 """
179 # store some globals into rhodecode
180 rhodecode.CELERY_ENABLED = str2bool(
181 config.registry.settings.get('use_celery'))
182 if rhodecode.CELERY_ENABLED:
183 log.info('Configuring celery based on `%s` file', ini_location)
184 setup_celery_app(
185 app=None, root=None, request=None, registry=config.registry,
186 closer=None, ini_location=ini_location)
187
188
189 class RequestContextTask(Task):
190 """
191 A celery task that creates a rhodecode app instance context for the
192 task, patches the pyramid threadlocals with the original request that
193 created the task, and adds the user to that context.
194 """
195
196 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
197 link=None, link_error=None, shadow=None, **options):
198 """ queue the job to run (we are in web request context here) """
199
200 req = get_current_request()
201
202 # web case
203 if hasattr(req, 'user'):
204 ip_addr = req.user.ip_addr
205 user_id = req.user.user_id
206
207 # api case
208 elif hasattr(req, 'rpc_user'):
209 ip_addr = req.rpc_user.ip_addr
210 user_id = req.rpc_user.user_id
211 else:
212 raise Exception(
213 'Unable to fetch required data from request: {}. \n'
214 'This task must be executed from the context of a '
215 'request in a webapp.'.format(repr(req)))
216
217 if req:
218 # we hook into kwargs since it is the only way to pass our data to
219 # the celery worker
220 options['headers'] = options.get('headers', {})
221 options['headers'].update({
222 'rhodecode_proxy_data': {
223 'environ': {
224 'PATH_INFO': req.environ['PATH_INFO'],
225 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
226 'HTTP_HOST': req.environ.get('HTTP_HOST',
227 req.environ['SERVER_NAME']),
228 'SERVER_NAME': req.environ['SERVER_NAME'],
229 'SERVER_PORT': req.environ['SERVER_PORT'],
230 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
231 },
232 'auth_user': {
233 'ip_addr': ip_addr,
234 'user_id': user_id
235 },
236 }
237 })
238
239 return super(RequestContextTask, self).apply_async(
240 args, kwargs, task_id, producer, link, link_error, shadow, **options)
241
242 def __call__(self, *args, **kwargs):
243 """ rebuild the context and then run task on celery worker """
244
245 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
246 if not proxy_data:
247 return super(RequestContextTask, self).__call__(*args, **kwargs)
248
249 log.debug('using celery proxy data to run task: %r', proxy_data)
250 # re-inject and register threadlocals for proper routing support
251 request = prepare_request(proxy_data['environ'])
252 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
253 ip_addr=proxy_data['auth_user']['ip_addr'])
254
255 return super(RequestContextTask, self).__call__(*args, **kwargs)
256
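For reference, a minimal sketch of how a task can build on this loader, assuming the async_task/RequestContextTask helpers re-exported from rhodecode.lib.celerylib later in this commit; the ping_task name and its message are made up::

    from rhodecode.lib.celerylib import async_task, run_task, RequestContextTask

    @async_task(ignore_result=True, base=RequestContextTask)
    def ping_task(message):
        # when queued from a web request, RequestContextTask packs the
        # originating environ + user into the message headers and rebuilds
        # them on the worker before this body runs
        return 'pong: {}'.format(message)

    # inside a web request: goes to the broker when use_celery = true,
    # otherwise run_task falls back to synchronous execution
    run_task(ping_task, 'hello')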
@@ -0,0 +1,156 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import os
22 import json
23 import logging
24 import datetime
25
26 from functools import partial
27
28 from pyramid.compat import configparser
29 from celery.result import AsyncResult
30 import celery.loaders.base
31 import celery.schedules
32
33
34 log = logging.getLogger(__name__)
35
36
37 def get_task_id(task):
38 task_id = None
39 if isinstance(task, AsyncResult):
40 task_id = task.task_id
41
42 return task_id
43
44
45 def crontab(value):
46 return celery.schedules.crontab(**value)
47
48
49 def timedelta(value):
50 return datetime.timedelta(**value)
51
52
53 def safe_json(get, section, key):
54 value = ''
55 try:
56 value = get(key)
57 json_value = json.loads(value)
58 except ValueError:
59 msg = 'The %s=%s is not valid JSON in section %s' % (
60 key, value, section
61 )
62 raise ValueError(msg)
63
64 return json_value
65
66
67 def get_beat_config(parser, section):
68 SCHEDULE_TYPE_MAP = {
69 'crontab': crontab,
70 'timedelta': timedelta,
71 'integer': int
72 }
73 get = partial(parser.get, section)
74 has_option = partial(parser.has_option, section)
75
76 schedule_type = get('type')
77 schedule_value = safe_json(get, section, 'schedule')
78
79 scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type)
80
81 if scheduler_cls is None:
82 raise ValueError(
83 'schedule type %s in section %s is invalid' % (
84 schedule_type,
85 section
86 )
87 )
88
89 schedule = scheduler_cls(schedule_value)
90
91 config = {
92 'task': get('task'),
93 'schedule': schedule,
94 }
95
96 if has_option('args'):
97 config['args'] = safe_json(get, section, 'args')
98
99 if has_option('kwargs'):
100 config['kwargs'] = safe_json(get, section, 'kwargs')
101
102 return config
103
104
105 def get_ini_config(ini_location):
106 """
107 Converts basic ini configuration into celery 4.X options
108 """
109 def key_converter(key_name):
110 pref = 'celery.'
111 if key_name.startswith(pref):
112 return key_name[len(pref):].replace('.', '_').lower()
113
114 def type_converter(parsed_key, value):
115 # cast to int
116 if value.isdigit():
117 return int(value)
118
119 # cast to bool
120 if value.lower() in ['true', 'false']:
121 return value.lower() == 'true'
122 return value
123
124 parser = configparser.SafeConfigParser(
125 defaults={'here': os.path.abspath(ini_location)})
126 parser.read(ini_location)
127
128 ini_config = {}
129 for k, v in parser.items('app:main'):
130 pref = 'celery.'
131 if k.startswith(pref):
132 ini_config[key_converter(k)] = type_converter(key_converter(k), v)
133
134 beat_config = {}
135 for section in parser.sections():
136 if section.startswith('celerybeat:'):
137 name = section.split(':', 1)[1]
138 beat_config[name] = get_beat_config(parser, section)
139
140 # final compose of settings
141 celery_settings = {}
142
143 if ini_config:
144 celery_settings.update(ini_config)
145 if beat_config:
146 celery_settings.update({'beat_schedule': beat_config})
147
148 return celery_settings
149
150
151 def parse_ini_vars(ini_vars):
152 options = {}
153 for pairs in ini_vars.split(','):
154 key, value = pairs.split('=')
155 options[key] = value
156 return options
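A rough illustration of what get_ini_config produces, assuming Python 2.7 (as in this codebase) and an ini file written to a temporary path; every key, section, repo and task name below is made up::

    import tempfile
    from rhodecode.lib.celerylib.utils import get_ini_config

    ini_text = '\n'.join([
        '[app:main]',
        'celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost',
        'celery.max_tasks_per_child = 100',
        'celery.task_always_eager = false',
        '',
        '[celerybeat:sync_example]',
        'type = timedelta',
        'schedule = {"minutes": 60}',
        'task = rhodecode.lib.celerylib.tasks.sync_repo',
        'kwargs = {"repo_name": "example", "username": "admin"}',
    ])

    with tempfile.NamedTemporaryFile(suffix='.ini', delete=False) as f:
        f.write(ini_text)

    # prints the celery 4.x settings derived from the celery.* keys plus a
    # 'beat_schedule' entry built from the [celerybeat:sync_example] section
    print(get_ini_config(f.name))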
@@ -296,28 +296,15 b' labs_settings_active = true'
296 296 ### CELERY CONFIG ####
297 297 ####################################
298 298 use_celery = false
299 broker.host = localhost
300 broker.vhost = rabbitmqhost
301 broker.port = 5672
302 broker.user = rabbitmq
303 broker.password = qweqwe
304
305 celery.imports = rhodecode.lib.celerylib.tasks
306 299
307 celery.result.backend = amqp
308 celery.result.dburi = amqp://
309 celery.result.serialier = json
300 # connection URL to the message broker (default: RabbitMQ)
301 celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
310 302
311 #celery.send.task.error.emails = true
312 #celery.amqp.task.result.expires = 18000
313
314 celeryd.concurrency = 2
315 #celeryd.log.file = celeryd.log
316 celeryd.log.level = debug
317 celeryd.max.tasks.per.child = 1
303 # maximum tasks to execute before worker restart
304 celery.max_tasks_per_child = 100
318 305
319 306 ## tasks will never be sent to the queue, but executed locally instead.
320 celery.always.eager = false
307 celery.task_always_eager = false
321 308
322 309 ####################################
323 310 ### BEAKER CACHE ####
@@ -649,7 +636,7 b' custom.conf = 1'
649 636 ### LOGGING CONFIGURATION ####
650 637 ################################
651 638 [loggers]
652 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
639 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
653 640
654 641 [handlers]
655 642 keys = console, console_sql
@@ -688,6 +675,11 b' handlers ='
688 675 qualname = ssh_wrapper
689 676 propagate = 1
690 677
678 [logger_celery]
679 level = DEBUG
680 handlers =
681 qualname = celery
682
691 683
692 684 ##############
693 685 ## HANDLERS ##
@@ -271,28 +271,15 b' labs_settings_active = true'
271 271 ### CELERY CONFIG ####
272 272 ####################################
273 273 use_celery = false
274 broker.host = localhost
275 broker.vhost = rabbitmqhost
276 broker.port = 5672
277 broker.user = rabbitmq
278 broker.password = qweqwe
279
280 celery.imports = rhodecode.lib.celerylib.tasks
281 274
282 celery.result.backend = amqp
283 celery.result.dburi = amqp://
284 celery.result.serialier = json
275 # connection URL to the message broker (default: RabbitMQ)
276 celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
285 277
286 #celery.send.task.error.emails = true
287 #celery.amqp.task.result.expires = 18000
288
289 celeryd.concurrency = 2
290 #celeryd.log.file = celeryd.log
291 celeryd.log.level = debug
292 celeryd.max.tasks.per.child = 1
278 # maximum tasks to execute before worker restart
279 celery.max_tasks_per_child = 100
293 280
294 281 ## tasks will never be sent to the queue, but executed locally instead.
295 celery.always.eager = false
282 celery.task_always_eager = false
296 283
297 284 ####################################
298 285 ### BEAKER CACHE ####
@@ -619,7 +606,7 b' custom.conf = 1'
619 606 ### LOGGING CONFIGURATION ####
620 607 ################################
621 608 [loggers]
622 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
609 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
623 610
624 611 [handlers]
625 612 keys = console, console_sql
@@ -658,6 +645,11 b' handlers ='
658 645 qualname = ssh_wrapper
659 646 propagate = 1
660 647
648 [logger_celery]
649 level = DEBUG
650 handlers =
651 qualname = celery
652
661 653
662 654 ##############
663 655 ## HANDLERS ##
@@ -32,6 +32,7 b' from rhodecode.api.utils import ('
32 32 from rhodecode.lib import audit_logger
33 33 from rhodecode.lib import repo_maintenance
34 34 from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi
35 from rhodecode.lib.celerylib.utils import get_task_id
35 36 from rhodecode.lib.utils2 import str2bool, time_to_datetime
36 37 from rhodecode.lib.ext_json import json
37 38 from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError
@@ -712,10 +713,7 b' def create_repo('
712 713 }
713 714
714 715 task = RepoModel().create(form_data=data, cur_user=owner)
715 from celery.result import BaseAsyncResult
716 task_id = None
717 if isinstance(task, BaseAsyncResult):
718 task_id = task.task_id
716 task_id = get_task_id(task)
719 717 # no commit, it's done in RepoModel, or async via celery
720 718 return {
721 719 'msg': "Created new repository `%s`" % (schema_data['repo_name'],),
@@ -1105,10 +1103,8 b' def fork_repo(request, apiuser, repoid, '
1105 1103
1106 1104 task = RepoModel().create_fork(data, cur_user=owner)
1107 1105 # no commit, it's done in RepoModel, or async via celery
1108 from celery.result import BaseAsyncResult
1109 task_id = None
1110 if isinstance(task, BaseAsyncResult):
1111 task_id = task.task_id
1106 task_id = get_task_id(task)
1107
1112 1108 return {
1113 1109 'msg': 'Created fork of `%s` as `%s`' % (
1114 1110 repo.repo_name, schema_data['repo_name']),
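Side note: get_task_id (added in rhodecode/lib/celerylib/utils.py above) replaces the repeated isinstance(task, BaseAsyncResult) checks; a tiny sketch of its behaviour, with a made-up task id::

    from celery.result import AsyncResult
    from rhodecode.lib.celerylib.utils import get_task_id

    # async path: RepoModel().create(...) handed back an AsyncResult
    result = AsyncResult('00000000-0000-0000-0000-000000000000')
    assert get_task_id(result) == '00000000-0000-0000-0000-000000000000'

    # sync/eager path: anything else (e.g. the ResultWrapper) yields None
    assert get_task_id(object()) is None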
@@ -28,6 +28,7 b' from pyramid.renderers import render'
28 28 from pyramid.response import Response
29 29
30 30 from rhodecode.apps._base import BaseAppView, DataGridAppView
31 from rhodecode.lib.celerylib.utils import get_task_id
31 32
32 33 from rhodecode.lib.ext_json import json
33 34 from rhodecode.lib.auth import (
@@ -143,22 +144,19 b' class AdminReposView(BaseAppView, DataGr'
143 144 c = self.load_default_context()
144 145
145 146 form_result = {}
147 self._load_form_data(c)
146 148 task_id = None
147 self._load_form_data(c)
148
149 149 try:
150 150 # CanWriteToGroup validators checks permissions of this POST
151 151 form = RepoForm(
152 152 self.request.translate, repo_groups=c.repo_groups_choices,
153 153 landing_revs=c.landing_revs_choices)()
154 form_results = form.to_python(dict(self.request.POST))
154 form_result = form.to_python(dict(self.request.POST))
155 155
156 156 # create is done sometimes async on celery, db transaction
157 157 # management is handled there.
158 158 task = RepoModel().create(form_result, self._rhodecode_user.user_id)
159 from celery.result import BaseAsyncResult
160 if isinstance(task, BaseAsyncResult):
161 task_id = task.task_id
159 task_id = get_task_id(task)
162 160 except formencode.Invalid as errors:
163 161 data = render('rhodecode:templates/admin/repos/repo_add.mako',
164 162 self._get_template_context(c), self.request)
@@ -46,11 +46,9 b' class RepoChecksView(BaseAppView):'
46 46
47 47 repo_name = self.request.matchdict['repo_name']
48 48 db_repo = Repository.get_by_repo_name(repo_name)
49 if not db_repo:
50 raise HTTPNotFound()
51 49
52 50 # check if maybe repo is already created
53 if db_repo.repo_state in [Repository.STATE_CREATED]:
51 if db_repo and db_repo.repo_state in [Repository.STATE_CREATED]:
54 52 # re-check permissions before redirecting to prevent resource
55 53 # discovery by checking the 302 code
56 54 perm_set = ['repository.read', 'repository.write', 'repository.admin']
@@ -80,9 +78,10 b' class RepoChecksView(BaseAppView):'
80 78
81 79 if task_id and task_id not in ['None']:
82 80 import rhodecode
83 from celery.result import AsyncResult
81 from rhodecode.lib.celerylib.loader import celery_app
84 82 if rhodecode.CELERY_ENABLED:
85 task = AsyncResult(task_id)
83 task = celery_app.AsyncResult(task_id)
84 task.get()
86 85 if task.failed():
87 86 msg = self._log_creation_exception(task.result, repo_name)
88 87 h.flash(msg, category='error')
@@ -33,6 +33,7 b' from rhodecode.lib.auth import ('
33 33 LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous,
34 34 HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired)
35 35 import rhodecode.lib.helpers as h
36 from rhodecode.lib.celerylib.utils import get_task_id
36 37 from rhodecode.model.db import coalesce, or_, Repository, RepoGroup
37 38 from rhodecode.model.repo import RepoModel
38 39 from rhodecode.model.forms import RepoForkForm
@@ -226,9 +227,8 b' class RepoForksView(RepoAppView, DataGri'
226 227 # management is handled there.
227 228 task = RepoModel().create_fork(
228 229 form_result, c.rhodecode_user.user_id)
229 from celery.result import BaseAsyncResult
230 if isinstance(task, BaseAsyncResult):
231 task_id = task.task_id
230
231 task_id = get_task_id(task)
232 232 except formencode.Invalid as errors:
233 233 c.rhodecode_db_repo = self.db_repo
234 234
@@ -23,14 +23,6 b' import os'
23 23 import logging
24 24 import rhodecode
25 25
26 # ------------------------------------------------------------------------------
27 # CELERY magic until refactor - issue #4163 - import order matters here:
28 #from rhodecode.lib import celerypylons # this must be first, celerypylons
29 # sets config settings upon import
30
31 import rhodecode.integrations # any modules using celery task
32 # decorators should be added afterwards:
33 # ------------------------------------------------------------------------------
34 26
35 27 from rhodecode.config import utils
36 28
@@ -54,14 +46,6 b' def load_pyramid_environment(global_conf'
54 46 'secret': settings_merged.get('channelstream.secret')
55 47 }
56 48
57
58 # TODO(marcink): celery
59 # # store some globals into rhodecode
60 # rhodecode.CELERY_ENABLED = str2bool(config['app_conf'].get('use_celery'))
61 # rhodecode.CELERY_EAGER = str2bool(
62 # config['app_conf'].get('celery.always.eager'))
63
64
65 49 # If this is a test run we prepare the test environment like
66 50 # creating a test database, test search index and test repositories.
67 51 # This has to be done before the database connection is initialized.
@@ -189,7 +189,7 b''
189 189 "python2.7-jupyter-core-4.3.0": {
190 190 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
191 191 },
192 "python2.7-kombu-1.5.1": {
192 "python2.7-kombu-4.1.0": {
193 193 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
194 194 },
195 195 "python2.7-mistune-0.7.4": {
@@ -42,6 +42,7 b' from rhodecode.lib.vcs import VCSCommuni'
42 42 from rhodecode.lib.exceptions import VCSServerUnavailable
43 43 from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled
44 44 from rhodecode.lib.middleware.https_fixup import HttpsFixup
45 from rhodecode.lib.celerylib.loader import configure_celery
45 46 from rhodecode.lib.plugins.utils import register_rhodecode_plugin
46 47 from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict
47 48 from rhodecode.subscribers import (
@@ -87,9 +88,11 b' def make_pyramid_app(global_config, **se'
87 88 pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config)
88 89 pyramid_app.config = config
89 90
91 config.configure_celery(global_config['__file__'])
90 92 # creating the app uses a connection - return it after we are done
91 93 meta.Session.remove()
92 94
95 log.info('Pyramid app %s created and configured.', pyramid_app)
93 96 return pyramid_app
94 97
95 98
@@ -196,6 +199,8 b' def includeme(config):'
196 199 config.add_directive(
197 200 'register_rhodecode_plugin', register_rhodecode_plugin)
198 201
202 config.add_directive('configure_celery', configure_celery)
203
199 204 if asbool(settings.get('appenlight', 'false')):
200 205 config.include('appenlight_client.ext.pyramid_tween')
201 206
@@ -20,18 +20,16 b''
20 20
21 21 from __future__ import unicode_literals
22 22 import deform
23 import re
24 23 import logging
25 24 import requests
26 25 import colander
27 26 import textwrap
28 from celery.task import task
29 27 from mako.template import Template
30 28
31 29 from rhodecode import events
32 30 from rhodecode.translation import _
33 31 from rhodecode.lib import helpers as h
34 from rhodecode.lib.celerylib import run_task
32 from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
35 33 from rhodecode.lib.colander_utils import strip_whitespace
36 34 from rhodecode.integrations.types.base import IntegrationTypeBase
37 35
@@ -243,7 +241,7 b' class HipchatIntegrationType(Integration'
243 241 )
244 242
245 243
246 @task(ignore_result=True)
244 @async_task(ignore_result=True, base=RequestContextTask)
247 245 def post_text_to_hipchat(settings, text):
248 246 log.debug('sending %s to hipchat %s' % (text, settings['server_url']))
249 247 resp = requests.post(settings['server_url'], json={
@@ -27,13 +27,12 b' import logging'
27 27 import deform
28 28 import requests
29 29 import colander
30 from celery.task import task
31 30 from mako.template import Template
32 31
33 32 from rhodecode import events
34 33 from rhodecode.translation import _
35 34 from rhodecode.lib import helpers as h
36 from rhodecode.lib.celerylib import run_task
35 from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
37 36 from rhodecode.lib.colander_utils import strip_whitespace
38 37 from rhodecode.integrations.types.base import IntegrationTypeBase
39 38
@@ -296,7 +295,7 b' def html_to_slack_links(message):'
296 295 r'<\1|\2>', message)
297 296
298 297
299 @task(ignore_result=True)
298 @async_task(ignore_result=True, base=RequestContextTask)
300 299 def post_text_to_slack(settings, title, text, fields=None, overrides=None):
301 300 log.debug('sending %s (%s) to slack %s' % (
302 301 title, text, settings['service']))
@@ -28,16 +28,17 b' import logging'
28 28 import requests
29 29 import requests.adapters
30 30 import colander
31 from celery.task import task
32 31 from requests.packages.urllib3.util.retry import Retry
33 32
34 33 import rhodecode
35 34 from rhodecode import events
36 35 from rhodecode.translation import _
37 36 from rhodecode.integrations.types.base import IntegrationTypeBase
37 from rhodecode.lib.celerylib import async_task, RequestContextTask
38 38
39 39 log = logging.getLogger(__name__)
40 40
41
41 42 # updating this required to update the `common_vars` passed in url calling func
42 43 WEBHOOK_URL_VARS = [
43 44 'repo_name',
@@ -315,7 +316,7 b' class WebhookIntegrationType(Integration'
315 316 post_to_webhook(url_calls, self.settings)
316 317
317 318
318 @task(ignore_result=True)
319 @async_task(ignore_result=True, base=RequestContextTask)
319 320 def post_to_webhook(url_calls, settings):
320 321 max_retries = 3
321 322 retries = Retry(
@@ -17,36 +17,17 b''
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
21 celery libs for RhodeCode
22 """
23 20
24
25 import pylons
26 21 import socket
27 22 import logging
28 23
29 24 import rhodecode
30
31 from os.path import join as jn
32 from pylons import config
33 from celery.task import Task
34 from pyramid.request import Request
35 from pyramid.scripting import prepare
36 from pyramid.threadlocal import get_current_request
37
38 from decorator import decorator
25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 from rhodecode.lib.celerylib.loader import (
27 celery_app, RequestContextTask, get_logger)
39 28
40 from zope.cachedescriptors.property import Lazy as LazyProperty
29 async_task = celery_app.task
41 30
42 from rhodecode.config import utils
43 from rhodecode.lib.utils2 import (
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
45 get_server_url)
46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
47 from rhodecode.lib.vcs import connect_vcs
48 from rhodecode.model import meta
49 from rhodecode.lib.auth import AuthUser
50 31
51 32 log = logging.getLogger(__name__)
52 33
@@ -60,95 +41,13 b' class ResultWrapper(object):'
60 41 return self.task
61 42
62 43
63 class RhodecodeCeleryTask(Task):
64 """
65 This is a celery task which will create a rhodecode app instance context
66 for the task, patch pyramid + pylons threadlocals with the original request
67 that created the task and also add the user to the context.
68
69 This class as a whole should be removed once the pylons port is complete
70 and a pyramid only solution for celery is implemented as per issue #4139
71 """
72
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
74 link=None, link_error=None, **options):
75 """ queue the job to run (we are in web request context here) """
76
77 request = get_current_request()
78
79 if hasattr(request, 'user'):
80 ip_addr = request.user.ip_addr
81 user_id = request.user.user_id
82 elif hasattr(request, 'rpc_params'):
83 # TODO(marcink) remove when migration is finished
84 # api specific call on Pyramid.
85 ip_addr = request.rpc_params['apiuser'].ip_addr
86 user_id = request.rpc_params['apiuser'].user_id
87 else:
88 raise Exception('Unable to fetch data from request: {}'.format(
89 request))
90
91 if request:
92 # we hook into kwargs since it is the only way to pass our data to
93 # the celery worker in celery 2.2
94 kwargs.update({
95 '_rhodecode_proxy_data': {
96 'environ': {
97 'PATH_INFO': request.environ['PATH_INFO'],
98 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
99 'HTTP_HOST': request.environ.get('HTTP_HOST',
100 request.environ['SERVER_NAME']),
101 'SERVER_NAME': request.environ['SERVER_NAME'],
102 'SERVER_PORT': request.environ['SERVER_PORT'],
103 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
104 },
105 'auth_user': {
106 'ip_addr': ip_addr,
107 'user_id': user_id
108 },
109 }
110 })
111 return super(RhodecodeCeleryTask, self).apply_async(
112 args, kwargs, task_id, producer, link, link_error, **options)
113
114 def __call__(self, *args, **kwargs):
115 """ rebuild the context and then run task on celery worker """
116 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
117
118 if not proxy_data:
119 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
120
121 log.debug('using celery proxy data to run task: %r', proxy_data)
122
123 from rhodecode.config.routing import make_map
124
125 request = Request.blank('/', environ=proxy_data['environ'])
126 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
127 ip_addr=proxy_data['auth_user']['ip_addr'])
128
129 pyramid_request = prepare(request) # set pyramid threadlocal request
130
131 # pylons routing
132 if not rhodecode.CONFIG.get('routes.map'):
133 rhodecode.CONFIG['routes.map'] = make_map(config)
134 pylons.url._push_object(get_routes_generator_for_server_url(
135 get_server_url(request.environ)
136 ))
137
138 try:
139 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
140 finally:
141 pyramid_request['closer']()
142 pylons.url._pop_object()
143
144
145 44 def run_task(task, *args, **kwargs):
146 45 if rhodecode.CELERY_ENABLED:
147 46 celery_is_up = False
148 47 try:
149 48 t = task.apply_async(args=args, kwargs=kwargs)
150 log.info('running task %s:%s', t.task_id, task)
151 49 celery_is_up = True
50 log.debug('executing task %s:%s in async mode', t.task_id, task)
152 51 return t
153 52
154 53 except socket.error as e:
@@ -164,73 +63,10 b' def run_task(task, *args, **kwargs):'
164 63 "Fallback to sync execution.")
165 64
166 65 # keep in mind there may be a subtle race condition where something
167 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
66 # depending on rhodecode.CELERY_ENABLED
168 67 # will see CELERY_ENABLED as True before this has a chance to set False
169 68 rhodecode.CELERY_ENABLED = celery_is_up
170 69 else:
171 log.debug('executing task %s in sync mode', task)
172 return ResultWrapper(task(*args, **kwargs))
173
174
175 def __get_lockkey(func, *fargs, **fkwargs):
176 params = list(fargs)
177 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
178
179 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
180 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
181 return 'task_%s.lock' % (md5_safe(_lock_key),)
182
183
184 def locked_task(func):
185 def __wrapper(func, *fargs, **fkwargs):
186 lockkey = __get_lockkey(func, *fargs, **fkwargs)
187 lockkey_path = config['app_conf']['cache_dir']
188
189 log.info('running task with lockkey %s' % lockkey)
190 try:
191 l = DaemonLock(file_=jn(lockkey_path, lockkey))
192 ret = func(*fargs, **fkwargs)
193 l.release()
194 return ret
195 except LockHeld:
196 log.info('LockHeld')
197 return 'Task with key %s already running' % lockkey
198
199 return decorator(__wrapper, func)
200
70 log.debug('executing task %s:%s in sync mode', 'TASK', task)
201 71
202 def get_session():
203 if rhodecode.CELERY_ENABLED:
204 utils.initialize_database(config)
205 sa = meta.Session()
206 return sa
207
208
209 def dbsession(func):
210 def __wrapper(func, *fargs, **fkwargs):
211 try:
212 ret = func(*fargs, **fkwargs)
213 return ret
214 finally:
215 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
216 meta.Session.remove()
217
218 return decorator(__wrapper, func)
219
220
221 def vcsconnection(func):
222 def __wrapper(func, *fargs, **fkwargs):
223 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
224 settings = rhodecode.PYRAMID_SETTINGS
225 backends = settings['vcs.backends']
226 for alias in rhodecode.BACKENDS.keys():
227 if alias not in backends:
228 del rhodecode.BACKENDS[alias]
229 utils.configure_vcs(settings)
230 connect_vcs(
231 settings['vcs.server'],
232 utils.get_vcs_server_protocol(settings))
233 ret = func(*fargs, **fkwargs)
234 return ret
235
236 return decorator(__wrapper, func)
72 return ResultWrapper(task(*args, **kwargs))
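Calling code keeps using run_task as before; a minimal sketch of the calling pattern with the send_email task from tasks.py below (the recipient address is made up)::

    from rhodecode.lib.celerylib import run_task, tasks

    # queues to the broker and returns an AsyncResult when celery is enabled
    # and reachable; otherwise executes inline and returns a ResultWrapper
    result = run_task(
        tasks.send_email, ['admin@example.com'], 'test subject', 'plain body')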
@@ -23,38 +23,18 b' RhodeCode task modules, containing all t'
23 23 by celery daemon
24 24 """
25 25
26
27 26 import os
28 import logging
29
30 from celery.task import task
31 27
32 28 import rhodecode
33 29 from rhodecode.lib import audit_logger
34 from rhodecode.lib.celerylib import (
35 run_task, dbsession, __get_lockkey, LockHeld, DaemonLock,
36 get_session, vcsconnection, RhodecodeCeleryTask)
30 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask
37 31 from rhodecode.lib.hooks_base import log_create_repository
38 32 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
39 from rhodecode.lib.utils import add_cache
40 33 from rhodecode.lib.utils2 import safe_int, str2bool
41 from rhodecode.model.db import Repository, User
34 from rhodecode.model.db import Session, Repository, User
42 35
43 36
44 def get_logger(cls):
45 if rhodecode.CELERY_ENABLED:
46 try:
47 log = cls.get_logger()
48 except Exception:
49 log = logging.getLogger(__name__)
50 else:
51 log = logging.getLogger(__name__)
52
53 return log
54
55
56 @task(ignore_result=True, base=RhodecodeCeleryTask)
57 @dbsession
37 @async_task(ignore_result=True, base=RequestContextTask)
58 38 def send_email(recipients, subject, body='', html_body='', email_config=None):
59 39 """
60 40 Sends an email with defined parameters from the .ini files.
@@ -101,18 +81,15 b' def send_email(recipients, subject, body'
101 81 return True
102 82
103 83
104 @task(ignore_result=True, base=RhodecodeCeleryTask)
105 @dbsession
106 @vcsconnection
84 @async_task(ignore_result=True, base=RequestContextTask)
107 85 def create_repo(form_data, cur_user):
108 86 from rhodecode.model.repo import RepoModel
109 87 from rhodecode.model.user import UserModel
110 88 from rhodecode.model.settings import SettingsModel
111 89
112 90 log = get_logger(create_repo)
113 DBS = get_session()
114 91
115 cur_user = UserModel(DBS)._get_user(cur_user)
92 cur_user = UserModel()._get_user(cur_user)
116 93 owner = cur_user
117 94
118 95 repo_name = form_data['repo_name']
@@ -138,7 +115,7 b' def create_repo(form_data, cur_user):'
138 115 'enable_downloads', defs.get('repo_enable_downloads'))
139 116
140 117 try:
141 repo = RepoModel(DBS)._create_repo(
118 repo = RepoModel()._create_repo(
142 119 repo_name=repo_name_full,
143 120 repo_type=repo_type,
144 121 description=description,
@@ -155,13 +132,13 b' def create_repo(form_data, cur_user):'
155 132 enable_downloads=enable_downloads,
156 133 state=state
157 134 )
158 DBS.commit()
135 Session().commit()
159 136
160 137 # now create this repo on Filesystem
161 RepoModel(DBS)._create_filesystem_repo(
138 RepoModel()._create_filesystem_repo(
162 139 repo_name=repo_name,
163 140 repo_type=repo_type,
164 repo_group=RepoModel(DBS)._get_repo_group(repo_group),
141 repo_group=RepoModel()._get_repo_group(repo_group),
165 142 clone_uri=clone_uri,
166 143 )
167 144 repo = Repository.get_by_repo_name(repo_name_full)
@@ -180,7 +157,7 b' def create_repo(form_data, cur_user):'
180 157 user=cur_user,
181 158 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
182 159
183 DBS.commit()
160 Session().commit()
184 161 except Exception:
185 162 log.warning('Exception occurred when creating repository, '
186 163 'doing cleanup...', exc_info=True)
@@ -188,8 +165,8 b' def create_repo(form_data, cur_user):'
188 165 repo = Repository.get_by_repo_name(repo_name_full)
189 166 if repo:
190 167 Repository.delete(repo.repo_id)
191 DBS.commit()
192 RepoModel(DBS)._delete_filesystem_repo(repo)
168 Session().commit()
169 RepoModel()._delete_filesystem_repo(repo)
193 170 raise
194 171
195 172 # it's an odd fix to make celery fail task when exception occurs
@@ -199,23 +176,17 b' def create_repo(form_data, cur_user):'
199 176 return True
200 177
201 178
202 @task(ignore_result=True, base=RhodecodeCeleryTask)
203 @dbsession
204 @vcsconnection
179 @async_task(ignore_result=True, base=RequestContextTask)
205 180 def create_repo_fork(form_data, cur_user):
206 181 """
207 182 Creates a fork of repository using internal VCS methods
208
209 :param form_data:
210 :param cur_user:
211 183 """
212 184 from rhodecode.model.repo import RepoModel
213 185 from rhodecode.model.user import UserModel
214 186
215 187 log = get_logger(create_repo_fork)
216 DBS = get_session()
217 188
218 cur_user = UserModel(DBS)._get_user(cur_user)
189 cur_user = UserModel()._get_user(cur_user)
219 190 owner = cur_user
220 191
221 192 repo_name = form_data['repo_name'] # fork in this case
@@ -230,8 +201,8 b' def create_repo_fork(form_data, cur_user'
230 201 fork_id = safe_int(form_data.get('fork_parent_id'))
231 202
232 203 try:
233 fork_of = RepoModel(DBS)._get_repo(fork_id)
234 RepoModel(DBS)._create_repo(
204 fork_of = RepoModel()._get_repo(fork_id)
205 RepoModel()._create_repo(
235 206 repo_name=repo_name_full,
236 207 repo_type=repo_type,
237 208 description=description,
@@ -244,16 +215,16 b' def create_repo_fork(form_data, cur_user'
244 215 copy_fork_permissions=copy_fork_permissions
245 216 )
246 217
247 DBS.commit()
218 Session().commit()
248 219
249 220 base_path = Repository.base_path()
250 221 source_repo_path = os.path.join(base_path, fork_of.repo_name)
251 222
252 223 # now create this repo on Filesystem
253 RepoModel(DBS)._create_filesystem_repo(
224 RepoModel()._create_filesystem_repo(
254 225 repo_name=repo_name,
255 226 repo_type=repo_type,
256 repo_group=RepoModel(DBS)._get_repo_group(repo_group),
227 repo_group=RepoModel()._get_repo_group(repo_group),
257 228 clone_uri=source_repo_path,
258 229 )
259 230 repo = Repository.get_by_repo_name(repo_name_full)
@@ -274,7 +245,7 b' def create_repo_fork(form_data, cur_user'
274 245 user=cur_user,
275 246 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
276 247
277 DBS.commit()
248 Session().commit()
278 249 except Exception as e:
279 250 log.warning('Exception %s occurred when forking repository, '
280 251 'doing cleanup...', e)
@@ -282,8 +253,8 b' def create_repo_fork(form_data, cur_user'
282 253 repo = Repository.get_by_repo_name(repo_name_full)
283 254 if repo:
284 255 Repository.delete(repo.repo_id)
285 DBS.commit()
286 RepoModel(DBS)._delete_filesystem_repo(repo)
256 Session().commit()
257 RepoModel()._delete_filesystem_repo(repo)
287 258 raise
288 259
289 260 # it's an odd fix to make celery fail task when exception occurs
@@ -291,3 +262,14 b' def create_repo_fork(form_data, cur_user'
291 262 pass
292 263
293 264 return True
265
266
267 @async_task(ignore_result=True)
268 def sync_repo(*args, **kwargs):
269 from rhodecode.model.scm import ScmModel
270 log = get_logger(sync_repo)
271
272 log.info('Pulling from %s', kwargs['repo_name'])
273 ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
274
275
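The new sync_repo task is meant to be driven by celerybeat; a hedged sketch of the equivalent programmatic wiring (in practice this comes from a [celerybeat:*] ini section parsed by get_ini_config), with made-up repo/user names and assuming celery's default module.function task naming::

    from datetime import timedelta
    from rhodecode.lib.celerylib.loader import celery_app

    celery_app.conf.beat_schedule = {
        'sync_example': {
            'task': 'rhodecode.lib.celerylib.tasks.sync_repo',
            'schedule': timedelta(minutes=60),
            'kwargs': {'repo_name': 'example', 'username': 'admin'},
        },
    }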
@@ -22,6 +22,7 b' import os'
22 22 from pyramid.compat import configparser
23 23 from pyramid.paster import bootstrap as pyramid_bootstrap, setup_logging # noqa
24 24 from pyramid.request import Request
25 from pyramid.scripting import prepare
25 26
26 27
27 28 def get_config(ini_path, **kwargs):
@@ -47,3 +48,9 b' def bootstrap(config_uri, request=None, '
47 48 request = request or Request.blank('/', base_url=base_url)
48 49
49 50 return pyramid_bootstrap(config_uri, request=request, options=options)
51
52
53 def prepare_request(environ):
54 request = Request.blank('/', environ=environ)
55 prepare(request) # set pyramid threadlocal request
56 return request
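prepare_request is what RequestContextTask calls on the worker side; a small sketch assuming the pyramid app was already bootstrapped (as the celery loader does) and using a made-up environ::

    from rhodecode.lib.pyramid_utils import prepare_request

    environ = {
        'PATH_INFO': '/',
        'SCRIPT_NAME': '',
        'HTTP_HOST': 'rhodecode.local',
        'SERVER_NAME': 'rhodecode.local',
        'SERVER_PORT': '80',
        'wsgi.url_scheme': 'http',
    }
    # builds a pyramid Request from the proxied environ and registers it as
    # the threadlocal request, so url generation works inside the task
    request = prepare_request(environ)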
@@ -38,7 +38,6 b' from os.path import join as jn'
38 38
39 39 import paste
40 40 import pkg_resources
41 from paste.script.command import Command, BadCommand
42 41 from webhelpers.text import collapse, remove_formatting, strip_tags
43 42 from mako import exceptions
44 43 from pyramid.threadlocal import get_current_registry
@@ -767,85 +766,6 b' def create_test_repositories(test_path, '
767 766 tar.extractall(jn(test_path, SVN_REPO))
768 767
769 768
770 #==============================================================================
771 # PASTER COMMANDS
772 #==============================================================================
773 class BasePasterCommand(Command):
774 """
775 Abstract Base Class for paster commands.
776
777 The celery commands are somewhat aggressive about loading
778 celery.conf, and since our module sets the `CELERY_LOADER`
779 environment variable to our loader, we have to bootstrap a bit and
780 make sure we've had a chance to load the pylons config off of the
781 command line, otherwise everything fails.
782 """
783 min_args = 1
784 min_args_error = "Please provide a paster config file as an argument."
785 takes_config_file = 1
786 requires_config_file = True
787
788 def notify_msg(self, msg, log=False):
789 """Make a notification to user, additionally if logger is passed
790 it logs this action using given logger
791
792 :param msg: message that will be printed to user
793 :param log: logging instance, to use to additionally log this message
794
795 """
796 if log and isinstance(log, logging):
797 log(msg)
798
799 def run(self, args):
800 """
801 Overrides Command.run
802
803 Checks for a config file argument and loads it.
804 """
805 if len(args) < self.min_args:
806 raise BadCommand(
807 self.min_args_error % {'min_args': self.min_args,
808 'actual_args': len(args)})
809
810 # Decrement because we're going to lob off the first argument.
811 # @@ This is hacky
812 self.min_args -= 1
813 self.bootstrap_config(args[0])
814 self.update_parser()
815 return super(BasePasterCommand, self).run(args[1:])
816
817 def update_parser(self):
818 """
819 Abstract method. Allows for the class' parser to be updated
820 before the superclass' `run` method is called. Necessary to
821 allow options/arguments to be passed through to the underlying
822 celery command.
823 """
824 raise NotImplementedError("Abstract Method.")
825
826 def bootstrap_config(self, conf):
827 """
828 Loads the pylons configuration.
829 """
830 from pylons import config as pylonsconfig
831
832 self.path_to_ini_file = os.path.realpath(conf)
833 conf = paste.deploy.appconfig('config:' + self.path_to_ini_file)
834 pylonsconfig.init_app(conf.global_conf, conf.local_conf)
835
836 def _init_session(self):
837 """
838 Inits SqlAlchemy Session
839 """
840 logging.config.fileConfig(self.path_to_ini_file)
841 from pylons import config
842 from rhodecode.config.utils import initialize_database
843
844 # get to remove repos !!
845 add_cache(config)
846 initialize_database(config)
847
848
849 769 def password_changed(auth_user, session):
850 770 # Never report password change in case of default user or anonymous user.
851 771 if auth_user.username == User.DEFAULT_USER or auth_user.user_id is None:
@@ -23,7 +23,6 b''
23 23 Some simple helper functions
24 24 """
25 25
26
27 26 import collections
28 27 import datetime
29 28 import dateutil.relativedelta
@@ -42,7 +41,6 b' import sqlalchemy.engine.url'
42 41 import sqlalchemy.exc
43 42 import sqlalchemy.sql
44 43 import webob
45 import routes.util
46 44 import pyramid.threadlocal
47 45
48 46 import rhodecode
@@ -941,31 +939,6 b' class Optional(object):'
941 939 return val
942 940
943 941
944 def get_routes_generator_for_server_url(server_url):
945 parsed_url = urlobject.URLObject(server_url)
946 netloc = safe_str(parsed_url.netloc)
947 script_name = safe_str(parsed_url.path)
948
949 if ':' in netloc:
950 server_name, server_port = netloc.split(':')
951 else:
952 server_name = netloc
953 server_port = (parsed_url.scheme == 'https' and '443' or '80')
954
955 environ = {
956 'REQUEST_METHOD': 'GET',
957 'PATH_INFO': '/',
958 'SERVER_NAME': server_name,
959 'SERVER_PORT': server_port,
960 'SCRIPT_NAME': script_name,
961 }
962 if parsed_url.scheme == 'https':
963 environ['HTTPS'] = 'on'
964 environ['wsgi.url_scheme'] = 'https'
965
966 return routes.util.URLGenerator(rhodecode.CONFIG['routes.map'], environ)
967
968
969 942 def glob2re(pat):
970 943 """
971 944 Translate a shell PATTERN to a regular expression.
@@ -448,3 +448,9 b' class TestGetEnabledHooks(object):'
448 448 ui_settings = []
449 449 result = utils.get_enabled_hook_classes(ui_settings)
450 450 assert result == []
451
452
453 def test_obfuscate_url_pw():
454 from rhodecode.lib.utils2 import obfuscate_url_pw
455 engine = u'/home/repos/malmö'
456 assert obfuscate_url_pw(engine)
\ No newline at end of file
@@ -108,7 +108,6 b' def pytest_addoption(parser):'
108 108
109 109
110 110 def pytest_configure(config):
111 # Appy the kombu patch early on, needed for test discovery on Python 2.7.11
112 111 from rhodecode.config import patches
113 112
114 113
NO CONTENT: 5 files were removed