celery: celery 4.X support. Fixes #4169...
marcink - r2359:246f5a4c default
@@ -0,0 +1,256 rhodecode/lib/celerylib/loader.py (new file)
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
21 Celery loader, run with::
22
23 celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini
24 """
25 import os
26 import logging
27
28 from celery import Celery
29 from celery import signals
30 from celery import Task
31 from kombu.serialization import register
32 from pyramid.threadlocal import get_current_request
33
34 import rhodecode
35
36 from rhodecode.lib.auth import AuthUser
37 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
38 from rhodecode.lib.ext_json import json
39 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
40 from rhodecode.lib.utils2 import str2bool
41 from rhodecode.model import meta
42
43
44 register('json_ext', json.dumps, json.loads,
45 content_type='application/x-json-ext',
46 content_encoding='utf-8')
47
48 log = logging.getLogger('celery.rhodecode.loader')
49
50
51 def add_preload_arguments(parser):
52 parser.add_argument(
53 '--ini', default=None,
54 help='Path to ini configuration file.'
55 )
56 parser.add_argument(
57 '--ini-var', default=None,
58 help='Comma separated list of key=value to pass to ini.'
59 )
60
61
62 def get_logger(obj):
63 custom_log = logging.getLogger(
64 'rhodecode.task.{}'.format(obj.__class__.__name__))
65
66 if rhodecode.CELERY_ENABLED:
67 try:
68 custom_log = obj.get_logger()
69 except Exception:
70 pass
71
72 return custom_log
73
74
75 base_celery_config = {
76 'result_backend': 'rpc://',
77 'result_expires': 60 * 60 * 24,
78 'result_persistent': True,
79 'imports': [],
80 'worker_max_tasks_per_child': 100,
81 'accept_content': ['json_ext'],
82 'task_serializer': 'json_ext',
83 'result_serializer': 'json_ext',
84 'worker_hijack_root_logger': False,
85 }
86 # init main celery app
87 celery_app = Celery()
88 celery_app.user_options['preload'].add(add_preload_arguments)
89 ini_file_glob = None
90
91
92 @signals.setup_logging.connect
93 def setup_logging_callback(**kwargs):
94 setup_logging(ini_file_glob)
95
96
97 @signals.user_preload_options.connect
98 def on_preload_parsed(options, **kwargs):
99 ini_location = options['ini']
100 ini_vars = options['ini_var']
101 celery_app.conf['INI_PYRAMID'] = options['ini']
102
103 if ini_location is None:
104 print('You must provide the paste --ini argument')
105 exit(-1)
106
107 options = None
108 if ini_vars is not None:
109 options = parse_ini_vars(ini_vars)
110
111 global ini_file_glob
112 ini_file_glob = ini_location
113
114 log.debug('Bootstrapping RhodeCode application...')
115 env = bootstrap(ini_location, options=options)
116
117 setup_celery_app(
118 app=env['app'], root=env['root'], request=env['request'],
119 registry=env['registry'], closer=env['closer'],
120 ini_location=ini_location)
121
122 # force the global flag on even if it's disabled via the .ini file,
123 # because this is worker code and the worker always needs celery enabled.
124 rhodecode.CELERY_ENABLED = True
125
126
127 @signals.task_success.connect
128 def task_success_signal(result, **kwargs):
129 meta.Session.commit()
130 celery_app.conf['PYRAMID_CLOSER']()
131
132
133 @signals.task_retry.connect
134 def task_retry_signal(
135 request, reason, einfo, **kwargs):
136 meta.Session.remove()
137 celery_app.conf['PYRAMID_CLOSER']()
138
139
140 @signals.task_failure.connect
141 def task_failure_signal(
142 task_id, exception, args, kwargs, traceback, einfo, **kargs):
143 meta.Session.remove()
144 celery_app.conf['PYRAMID_CLOSER']()
145
146
147 @signals.task_revoked.connect
148 def task_revoked_signal(
149 request, terminated, signum, expired, **kwargs):
150 celery_app.conf['PYRAMID_CLOSER']()
151
152
153 def setup_celery_app(app, root, request, registry, closer, ini_location):
154 ini_dir = os.path.dirname(os.path.abspath(ini_location))
155 celery_config = base_celery_config
156 celery_config.update({
157 # store celerybeat scheduler db where the .ini file is
158 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
159 })
160 ini_settings = get_ini_config(ini_location)
161 log.debug('Got custom celery conf: %s', ini_settings)
162
163 celery_config.update(ini_settings)
164 celery_app.config_from_object(celery_config)
165
166 celery_app.conf.update({'PYRAMID_APP': app})
167 celery_app.conf.update({'PYRAMID_ROOT': root})
168 celery_app.conf.update({'PYRAMID_REQUEST': request})
169 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
170 celery_app.conf.update({'PYRAMID_CLOSER': closer})
171
172
173 def configure_celery(config, ini_location):
174 """
175 Helper that is called from our application creation logic. It wires
176 celery connection info into the running webapp and allows execution of
177 tasks from RhodeCode itself
178 """
179 # store some globals into rhodecode
180 rhodecode.CELERY_ENABLED = str2bool(
181 config.registry.settings.get('use_celery'))
182 if rhodecode.CELERY_ENABLED:
183 log.info('Configuring celery based on `%s` file', ini_location)
184 setup_celery_app(
185 app=None, root=None, request=None, registry=config.registry,
186 closer=None, ini_location=ini_location)
187
188
189 class RequestContextTask(Task):
190 """
191 This is a celery task which will create a rhodecode app instance context
192 for the task, patch pyramid with the original request
193 that created the task and also add the user to the context.
194 """
195
196 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
197 link=None, link_error=None, shadow=None, **options):
198 """ queue the job to run (we are in web request context here) """
199
200 req = get_current_request()
201
202 # web case
203 if hasattr(req, 'user'):
204 ip_addr = req.user.ip_addr
205 user_id = req.user.user_id
206
207 # api case
208 elif hasattr(req, 'rpc_user'):
209 ip_addr = req.rpc_user.ip_addr
210 user_id = req.rpc_user.user_id
211 else:
212 raise Exception(
213 'Unable to fetch required data from request: {}. \n'
214 'This task is required to be executed from the context of '
215 'a request in a webapp'.format(repr(req)))
216
217 if req:
218 # we hook into the message headers since it is the only way to pass
219 # our data to the celery worker
220 options['headers'] = options.get('headers', {})
221 options['headers'].update({
222 'rhodecode_proxy_data': {
223 'environ': {
224 'PATH_INFO': req.environ['PATH_INFO'],
225 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
226 'HTTP_HOST': req.environ.get('HTTP_HOST',
227 req.environ['SERVER_NAME']),
228 'SERVER_NAME': req.environ['SERVER_NAME'],
229 'SERVER_PORT': req.environ['SERVER_PORT'],
230 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
231 },
232 'auth_user': {
233 'ip_addr': ip_addr,
234 'user_id': user_id
235 },
236 }
237 })
238
239 return super(RequestContextTask, self).apply_async(
240 args, kwargs, task_id, producer, link, link_error, shadow, **options)
241
242 def __call__(self, *args, **kwargs):
243 """ rebuild the context and then run task on celery worker """
244
245 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
246 if not proxy_data:
247 return super(RequestContextTask, self).__call__(*args, **kwargs)
248
249 log.debug('using celery proxy data to run task: %r', proxy_data)
250 # re-inject and register threadlocals for proper routing support
251 request = prepare_request(proxy_data['environ'])
252 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
253 ip_addr=proxy_data['auth_user']['ip_addr'])
254
255 return super(RequestContextTask, self).__call__(*args, **kwargs)
256
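
For orientation, a minimal sketch of how a task could be declared and dispatched against this loader. RequestContextTask is defined in the loader above; async_task and run_task come from the rhodecode.lib.celerylib changes further down; the task itself (ping_repo) and its argument are invented for this example.

    from rhodecode.lib.celerylib import async_task, run_task, RequestContextTask

    @async_task(ignore_result=True, base=RequestContextTask)
    def ping_repo(repo_name):
        # runs on the worker; RequestContextTask.__call__ has already rebuilt the
        # pyramid request from the 'rhodecode_proxy_data' header at this point
        return 'pong from {}'.format(repo_name)

    # called from inside a web request: apply_async() captures PATH_INFO, HTTP_HOST
    # and the authenticated user's id/ip and ships them to the worker as task headers
    run_task(ping_repo, 'some-repo')
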
@@ -0,0 +1,156 rhodecode/lib/celerylib/utils.py (new file)
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import os
22 import json
23 import logging
24 import datetime
25
26 from functools import partial
27
28 from pyramid.compat import configparser
29 from celery.result import AsyncResult
30 import celery.loaders.base
31 import celery.schedules
32
33
34 log = logging.getLogger(__name__)
35
36
37 def get_task_id(task):
38 task_id = None
39 if isinstance(task, AsyncResult):
40 task_id = task.task_id
41
42 return task_id
43
44
45 def crontab(value):
46 return celery.schedules.crontab(**value)
47
48
49 def timedelta(value):
50 return datetime.timedelta(**value)
51
52
53 def safe_json(get, section, key):
54 value = ''
55 try:
56 value = get(key)
57 json_value = json.loads(value)
58 except ValueError:
59 msg = 'The %s=%s is not valid json in section %s' % (
60 key, value, section
61 )
62 raise ValueError(msg)
63
64 return json_value
65
66
67 def get_beat_config(parser, section):
68 SCHEDULE_TYPE_MAP = {
69 'crontab': crontab,
70 'timedelta': timedelta,
71 'integer': int
72 }
73 get = partial(parser.get, section)
74 has_option = partial(parser.has_option, section)
75
76 schedule_type = get('type')
77 schedule_value = safe_json(get, section, 'schedule')
78
79 scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type)
80
81 if scheduler_cls is None:
82 raise ValueError(
83 'schedule type %s in section %s is invalid' % (
84 schedule_type,
85 section
86 )
87 )
88
89 schedule = scheduler_cls(schedule_value)
90
91 config = {
92 'task': get('task'),
93 'schedule': schedule,
94 }
95
96 if has_option('args'):
97 config['args'] = safe_json(get, section, 'args')
98
99 if has_option('kwargs'):
100 config['kwargs'] = safe_json(get, section, 'kwargs')
101
102 return config
103
104
105 def get_ini_config(ini_location):
106 """
107 Converts basic ini configuration into celery 4.X options
108 """
109 def key_converter(key_name):
110 pref = 'celery.'
111 if key_name.startswith(pref):
112 return key_name[len(pref):].replace('.', '_').lower()
113
114 def type_converter(parsed_key, value):
115 # cast to int
116 if value.isdigit():
117 return int(value)
118
119 # cast to bool
120 if value.lower() in ['true', 'false']:
121 return value.lower() == 'true'
122 return value
123
124 parser = configparser.SafeConfigParser(
125 defaults={'here': os.path.abspath(ini_location)})
126 parser.read(ini_location)
127
128 ini_config = {}
129 for k, v in parser.items('app:main'):
130 pref = 'celery.'
131 if k.startswith(pref):
132 ini_config[key_converter(k)] = type_converter(key_converter(k), v)
133
134 beat_config = {}
135 for section in parser.sections():
136 if section.startswith('celerybeat:'):
137 name = section.split(':', 1)[1]
138 beat_config[name] = get_beat_config(parser, section)
139
140 # final compose of settings
141 celery_settings = {}
142
143 if ini_config:
144 celery_settings.update(ini_config)
145 if beat_config:
146 celery_settings.update({'beat_schedule': beat_config})
147
148 return celery_settings
149
150
151 def parse_ini_vars(ini_vars):
152 options = {}
153 for pairs in ini_vars.split(','):
154 key, value = pairs.split('=')
155 options[key] = value
156 return options
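
To make the conversion concrete, here is a hedged example of what get_ini_config() would return for an .ini file carrying the options below. The broker URL mirrors the sample config further down; the result_expires value and the check_repos beat task are invented for illustration.

    # [app:main]
    # celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
    # celery.result_expires = 86400
    # celery.task_always_eager = false
    #
    # [celerybeat:check_repos]
    # task = rhodecode.lib.celerylib.tasks.check_repos
    # type = timedelta
    # schedule = {"hours": 1}

    import datetime

    expected = {
        'broker_url': 'amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost',  # 'celery.' prefix stripped
        'result_expires': 86400,       # digit strings are cast to int
        'task_always_eager': False,    # 'true'/'false' strings are cast to bool
        'beat_schedule': {
            'check_repos': {
                'task': 'rhodecode.lib.celerylib.tasks.check_repos',
                'schedule': datetime.timedelta(hours=1),  # type=timedelta + JSON schedule kwargs
            },
        },
    }
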
@@ -296,28 +296,15 labs_settings_active = true
296 296 ### CELERY CONFIG ####
297 297 ####################################
298 298 use_celery = false
299 broker.host = localhost
300 broker.vhost = rabbitmqhost
301 broker.port = 5672
302 broker.user = rabbitmq
303 broker.password = qweqwe
304
305 celery.imports = rhodecode.lib.celerylib.tasks
306 299
307 celery.result.backend = amqp
308 celery.result.dburi = amqp://
309 celery.result.serialier = json
300 # connection url to the message broker (default rabbitmq)
301 celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
310 302
311 #celery.send.task.error.emails = true
312 #celery.amqp.task.result.expires = 18000
313
314 celeryd.concurrency = 2
315 #celeryd.log.file = celeryd.log
316 celeryd.log.level = debug
317 celeryd.max.tasks.per.child = 1
303 # maximum tasks to execute before worker restart
304 celery.max_tasks_per_child = 100
318 305
319 306 ## tasks will never be sent to the queue, but executed locally instead.
320 celery.always.eager = false
307 celery.task_always_eager = false
321 308
322 309 ####################################
323 310 ### BEAKER CACHE ####
@@ -649,7 +636,7 custom.conf = 1
649 636 ### LOGGING CONFIGURATION ####
650 637 ################################
651 638 [loggers]
652 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
639 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
653 640
654 641 [handlers]
655 642 keys = console, console_sql
@@ -688,6 +675,11 handlers =
688 675 qualname = ssh_wrapper
689 676 propagate = 1
690 677
678 [logger_celery]
679 level = DEBUG
680 handlers =
681 qualname = celery
682
691 683
692 684 ##############
693 685 ## HANDLERS ##
@@ -271,28 +271,15 labs_settings_active = true
271 271 ### CELERY CONFIG ####
272 272 ####################################
273 273 use_celery = false
274 broker.host = localhost
275 broker.vhost = rabbitmqhost
276 broker.port = 5672
277 broker.user = rabbitmq
278 broker.password = qweqwe
279
280 celery.imports = rhodecode.lib.celerylib.tasks
281 274
282 celery.result.backend = amqp
283 celery.result.dburi = amqp://
284 celery.result.serialier = json
275 # connection url to the message broker (default rabbitmq)
276 celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
285 277
286 #celery.send.task.error.emails = true
287 #celery.amqp.task.result.expires = 18000
288
289 celeryd.concurrency = 2
290 #celeryd.log.file = celeryd.log
291 celeryd.log.level = debug
292 celeryd.max.tasks.per.child = 1
278 # maximum tasks to execute before worker restart
279 celery.max_tasks_per_child = 100
293 280
294 281 ## tasks will never be sent to the queue, but executed locally instead.
295 celery.always.eager = false
282 celery.task_always_eager = false
296 283
297 284 ####################################
298 285 ### BEAKER CACHE ####
@@ -619,7 +606,7 custom.conf = 1
619 606 ### LOGGING CONFIGURATION ####
620 607 ################################
621 608 [loggers]
622 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
609 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
623 610
624 611 [handlers]
625 612 keys = console, console_sql
@@ -658,6 +645,11 handlers =
658 645 qualname = ssh_wrapper
659 646 propagate = 1
660 647
648 [logger_celery]
649 level = DEBUG
650 handlers =
651 qualname = celery
652
661 653
662 654 ##############
663 655 ## HANDLERS ##
@@ -32,6 +32,7 from rhodecode.api.utils import (
32 32 from rhodecode.lib import audit_logger
33 33 from rhodecode.lib import repo_maintenance
34 34 from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi
35 from rhodecode.lib.celerylib.utils import get_task_id
35 36 from rhodecode.lib.utils2 import str2bool, time_to_datetime
36 37 from rhodecode.lib.ext_json import json
37 38 from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError
@@ -712,10 +713,7 def create_repo(
712 713 }
713 714
714 715 task = RepoModel().create(form_data=data, cur_user=owner)
715 from celery.result import BaseAsyncResult
716 task_id = None
717 if isinstance(task, BaseAsyncResult):
718 task_id = task.task_id
716 task_id = get_task_id(task)
719 717 # no commit, it's done in RepoModel, or async via celery
720 718 return {
721 719 'msg': "Created new repository `%s`" % (schema_data['repo_name'],),
@@ -1105,10 +1103,8 def fork_repo(request, apiuser, repoid,
1105 1103
1106 1104 task = RepoModel().create_fork(data, cur_user=owner)
1107 1105 # no commit, it's done in RepoModel, or async via celery
1108 from celery.result import BaseAsyncResult
1109 task_id = None
1110 if isinstance(task, BaseAsyncResult):
1111 task_id = task.task_id
1106 task_id = get_task_id(task)
1107
1112 1108 return {
1113 1109 'msg': 'Created fork of `%s` as `%s`' % (
1114 1110 repo.repo_name, schema_data['repo_name']),
@@ -28,6 +28,7 from pyramid.renderers import render
28 28 from pyramid.response import Response
29 29
30 30 from rhodecode.apps._base import BaseAppView, DataGridAppView
31 from rhodecode.lib.celerylib.utils import get_task_id
31 32
32 33 from rhodecode.lib.ext_json import json
33 34 from rhodecode.lib.auth import (
@@ -143,22 +144,19 class AdminReposView(BaseAppView, DataGr
143 144 c = self.load_default_context()
144 145
145 146 form_result = {}
147 self._load_form_data(c)
146 148 task_id = None
147 self._load_form_data(c)
148
149 149 try:
150 150 # CanWriteToGroup validators checks permissions of this POST
151 151 form = RepoForm(
152 152 self.request.translate, repo_groups=c.repo_groups_choices,
153 153 landing_revs=c.landing_revs_choices)()
154 form_results = form.to_python(dict(self.request.POST))
154 form_result = form.to_python(dict(self.request.POST))
155 155
156 156 # create is done sometimes async on celery, db transaction
157 157 # management is handled there.
158 158 task = RepoModel().create(form_result, self._rhodecode_user.user_id)
159 from celery.result import BaseAsyncResult
160 if isinstance(task, BaseAsyncResult):
161 task_id = task.task_id
159 task_id = get_task_id(task)
162 160 except formencode.Invalid as errors:
163 161 data = render('rhodecode:templates/admin/repos/repo_add.mako',
164 162 self._get_template_context(c), self.request)
@@ -46,11 +46,9 class RepoChecksView(BaseAppView):
46 46
47 47 repo_name = self.request.matchdict['repo_name']
48 48 db_repo = Repository.get_by_repo_name(repo_name)
49 if not db_repo:
50 raise HTTPNotFound()
51 49
52 50 # check if maybe repo is already created
53 if db_repo.repo_state in [Repository.STATE_CREATED]:
51 if db_repo and db_repo.repo_state in [Repository.STATE_CREATED]:
54 52 # re-check permissions before redirecting to prevent resource
55 53 # discovery by checking the 302 code
56 54 perm_set = ['repository.read', 'repository.write', 'repository.admin']
@@ -80,9 +78,10 class RepoChecksView(BaseAppView):
80 78
81 79 if task_id and task_id not in ['None']:
82 80 import rhodecode
83 from celery.result import AsyncResult
81 from rhodecode.lib.celerylib.loader import celery_app
84 82 if rhodecode.CELERY_ENABLED:
85 task = AsyncResult(task_id)
83 task = celery_app.AsyncResult(task_id)
84 task.get()
86 85 if task.failed():
87 86 msg = self._log_creation_exception(task.result, repo_name)
88 87 h.flash(msg, category='error')
@@ -33,6 +33,7 from rhodecode.lib.auth import (
33 33 LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous,
34 34 HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired)
35 35 import rhodecode.lib.helpers as h
36 from rhodecode.lib.celerylib.utils import get_task_id
36 37 from rhodecode.model.db import coalesce, or_, Repository, RepoGroup
37 38 from rhodecode.model.repo import RepoModel
38 39 from rhodecode.model.forms import RepoForkForm
@@ -226,9 +227,8 class RepoForksView(RepoAppView, DataGri
226 227 # management is handled there.
227 228 task = RepoModel().create_fork(
228 229 form_result, c.rhodecode_user.user_id)
229 from celery.result import BaseAsyncResult
230 if isinstance(task, BaseAsyncResult):
231 task_id = task.task_id
230
231 task_id = get_task_id(task)
232 232 except formencode.Invalid as errors:
233 233 c.rhodecode_db_repo = self.db_repo
234 234
@@ -23,14 +23,6 import os
23 23 import logging
24 24 import rhodecode
25 25
26 # ------------------------------------------------------------------------------
27 # CELERY magic until refactor - issue #4163 - import order matters here:
28 #from rhodecode.lib import celerypylons # this must be first, celerypylons
29 # sets config settings upon import
30
31 import rhodecode.integrations # any modules using celery task
32 # decorators should be added afterwards:
33 # ------------------------------------------------------------------------------
34 26
35 27 from rhodecode.config import utils
36 28
@@ -54,14 +46,6 def load_pyramid_environment(global_conf
54 46 'secret': settings_merged.get('channelstream.secret')
55 47 }
56 48
57
58 # TODO(marcink): celery
59 # # store some globals into rhodecode
60 # rhodecode.CELERY_ENABLED = str2bool(config['app_conf'].get('use_celery'))
61 # rhodecode.CELERY_EAGER = str2bool(
62 # config['app_conf'].get('celery.always.eager'))
63
64
65 49 # If this is a test run we prepare the test environment like
66 50 # creating a test database, test search index and test repositories.
67 51 # This has to be done before the database connection is initialized.
@@ -189,7 +189,7
189 189 "python2.7-jupyter-core-4.3.0": {
190 190 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
191 191 },
192 "python2.7-kombu-1.5.1": {
192 "python2.7-kombu-4.1.0": {
193 193 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
194 194 },
195 195 "python2.7-mistune-0.7.4": {
@@ -42,6 +42,7 from rhodecode.lib.vcs import VCSCommuni
42 42 from rhodecode.lib.exceptions import VCSServerUnavailable
43 43 from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled
44 44 from rhodecode.lib.middleware.https_fixup import HttpsFixup
45 from rhodecode.lib.celerylib.loader import configure_celery
45 46 from rhodecode.lib.plugins.utils import register_rhodecode_plugin
46 47 from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict
47 48 from rhodecode.subscribers import (
@@ -87,9 +88,11 def make_pyramid_app(global_config, **se
87 88 pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config)
88 89 pyramid_app.config = config
89 90
91 config.configure_celery(global_config['__file__'])
90 92 # creating the app uses a connection - return it after we are done
91 93 meta.Session.remove()
92 94
95 log.info('Pyramid app %s created and configured.', pyramid_app)
93 96 return pyramid_app
94 97
95 98
@@ -196,6 +199,8 def includeme(config):
196 199 config.add_directive(
197 200 'register_rhodecode_plugin', register_rhodecode_plugin)
198 201
202 config.add_directive('configure_celery', configure_celery)
203
199 204 if asbool(settings.get('appenlight', 'false')):
200 205 config.include('appenlight_client.ext.pyramid_tween')
201 206
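
The add_directive registration above is what makes the config.configure_celery(...) call in make_pyramid_app work: Pyramid binds the callable to the Configurator and passes the Configurator as the first argument. A short sketch of the equivalence, assuming standard Pyramid directive semantics:

    # once the directive is registered ...
    config.add_directive('configure_celery', configure_celery)

    # ... these two calls are equivalent (as in make_pyramid_app above)
    config.configure_celery(global_config['__file__'])    # directive form
    configure_celery(config, global_config['__file__'])   # plain function call
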
@@ -20,18 +20,16
20 20
21 21 from __future__ import unicode_literals
22 22 import deform
23 import re
24 23 import logging
25 24 import requests
26 25 import colander
27 26 import textwrap
28 from celery.task import task
29 27 from mako.template import Template
30 28
31 29 from rhodecode import events
32 30 from rhodecode.translation import _
33 31 from rhodecode.lib import helpers as h
34 from rhodecode.lib.celerylib import run_task
32 from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
35 33 from rhodecode.lib.colander_utils import strip_whitespace
36 34 from rhodecode.integrations.types.base import IntegrationTypeBase
37 35
@@ -243,7 +241,7 class HipchatIntegrationType(Integration
243 241 )
244 242
245 243
246 @task(ignore_result=True)
244 @async_task(ignore_result=True, base=RequestContextTask)
247 245 def post_text_to_hipchat(settings, text):
248 246 log.debug('sending %s to hipchat %s' % (text, settings['server_url']))
249 247 resp = requests.post(settings['server_url'], json={
@@ -27,13 +27,12 import logging
27 27 import deform
28 28 import requests
29 29 import colander
30 from celery.task import task
31 30 from mako.template import Template
32 31
33 32 from rhodecode import events
34 33 from rhodecode.translation import _
35 34 from rhodecode.lib import helpers as h
36 from rhodecode.lib.celerylib import run_task
35 from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
37 36 from rhodecode.lib.colander_utils import strip_whitespace
38 37 from rhodecode.integrations.types.base import IntegrationTypeBase
39 38
@@ -296,7 +295,7 def html_to_slack_links(message):
296 295 r'<\1|\2>', message)
297 296
298 297
299 @task(ignore_result=True)
298 @async_task(ignore_result=True, base=RequestContextTask)
300 299 def post_text_to_slack(settings, title, text, fields=None, overrides=None):
301 300 log.debug('sending %s (%s) to slack %s' % (
302 301 title, text, settings['service']))
@@ -28,16 +28,17 import logging
28 28 import requests
29 29 import requests.adapters
30 30 import colander
31 from celery.task import task
32 31 from requests.packages.urllib3.util.retry import Retry
33 32
34 33 import rhodecode
35 34 from rhodecode import events
36 35 from rhodecode.translation import _
37 36 from rhodecode.integrations.types.base import IntegrationTypeBase
37 from rhodecode.lib.celerylib import async_task, RequestContextTask
38 38
39 39 log = logging.getLogger(__name__)
40 40
41
41 42 # updating this required to update the `common_vars` passed in url calling func
42 43 WEBHOOK_URL_VARS = [
43 44 'repo_name',
@@ -315,7 +316,7 class WebhookIntegrationType(Integration
315 316 post_to_webhook(url_calls, self.settings)
316 317
317 318
318 @task(ignore_result=True)
319 @async_task(ignore_result=True, base=RequestContextTask)
319 320 def post_to_webhook(url_calls, settings):
320 321 max_retries = 3
321 322 retries = Retry(
@@ -17,36 +17,17
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
21 celery libs for RhodeCode
22 """
23 20
24
25 import pylons
26 21 import socket
27 22 import logging
28 23
29 24 import rhodecode
30
31 from os.path import join as jn
32 from pylons import config
33 from celery.task import Task
34 from pyramid.request import Request
35 from pyramid.scripting import prepare
36 from pyramid.threadlocal import get_current_request
37
38 from decorator import decorator
25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 from rhodecode.lib.celerylib.loader import (
27 celery_app, RequestContextTask, get_logger)
39 28
40 from zope.cachedescriptors.property import Lazy as LazyProperty
29 async_task = celery_app.task
41 30
42 from rhodecode.config import utils
43 from rhodecode.lib.utils2 import (
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
45 get_server_url)
46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
47 from rhodecode.lib.vcs import connect_vcs
48 from rhodecode.model import meta
49 from rhodecode.lib.auth import AuthUser
50 31
51 32 log = logging.getLogger(__name__)
52 33
@@ -60,95 +41,13 class ResultWrapper(object):
60 41 return self.task
61 42
62 43
63 class RhodecodeCeleryTask(Task):
64 """
65 This is a celery task which will create a rhodecode app instance context
66 for the task, patch pyramid + pylons threadlocals with the original request
67 that created the task and also add the user to the context.
68
69 This class as a whole should be removed once the pylons port is complete
70 and a pyramid only solution for celery is implemented as per issue #4139
71 """
72
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
74 link=None, link_error=None, **options):
75 """ queue the job to run (we are in web request context here) """
76
77 request = get_current_request()
78
79 if hasattr(request, 'user'):
80 ip_addr = request.user.ip_addr
81 user_id = request.user.user_id
82 elif hasattr(request, 'rpc_params'):
83 # TODO(marcink) remove when migration is finished
84 # api specific call on Pyramid.
85 ip_addr = request.rpc_params['apiuser'].ip_addr
86 user_id = request.rpc_params['apiuser'].user_id
87 else:
88 raise Exception('Unable to fetch data from request: {}'.format(
89 request))
90
91 if request:
92 # we hook into kwargs since it is the only way to pass our data to
93 # the celery worker in celery 2.2
94 kwargs.update({
95 '_rhodecode_proxy_data': {
96 'environ': {
97 'PATH_INFO': request.environ['PATH_INFO'],
98 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
99 'HTTP_HOST': request.environ.get('HTTP_HOST',
100 request.environ['SERVER_NAME']),
101 'SERVER_NAME': request.environ['SERVER_NAME'],
102 'SERVER_PORT': request.environ['SERVER_PORT'],
103 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
104 },
105 'auth_user': {
106 'ip_addr': ip_addr,
107 'user_id': user_id
108 },
109 }
110 })
111 return super(RhodecodeCeleryTask, self).apply_async(
112 args, kwargs, task_id, producer, link, link_error, **options)
113
114 def __call__(self, *args, **kwargs):
115 """ rebuild the context and then run task on celery worker """
116 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
117
118 if not proxy_data:
119 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
120
121 log.debug('using celery proxy data to run task: %r', proxy_data)
122
123 from rhodecode.config.routing import make_map
124
125 request = Request.blank('/', environ=proxy_data['environ'])
126 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
127 ip_addr=proxy_data['auth_user']['ip_addr'])
128
129 pyramid_request = prepare(request) # set pyramid threadlocal request
130
131 # pylons routing
132 if not rhodecode.CONFIG.get('routes.map'):
133 rhodecode.CONFIG['routes.map'] = make_map(config)
134 pylons.url._push_object(get_routes_generator_for_server_url(
135 get_server_url(request.environ)
136 ))
137
138 try:
139 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
140 finally:
141 pyramid_request['closer']()
142 pylons.url._pop_object()
143
144
145 44 def run_task(task, *args, **kwargs):
146 45 if rhodecode.CELERY_ENABLED:
147 46 celery_is_up = False
148 47 try:
149 48 t = task.apply_async(args=args, kwargs=kwargs)
150 log.info('running task %s:%s', t.task_id, task)
151 49 celery_is_up = True
50 log.debug('executing task %s:%s in async mode', t.task_id, task)
152 51 return t
153 52
154 53 except socket.error as e:
@@ -164,73 +63,10 def run_task(task, *args, **kwargs):
164 63 "Fallback to sync execution.")
165 64
166 65 # keep in mind there may be a subtle race condition where something
167 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
66 # depending on rhodecode.CELERY_ENABLED
168 67 # will see CELERY_ENABLED as True before this has a chance to set False
169 68 rhodecode.CELERY_ENABLED = celery_is_up
170 69 else:
171 log.debug('executing task %s in sync mode', task)
172 return ResultWrapper(task(*args, **kwargs))
173
174
175 def __get_lockkey(func, *fargs, **fkwargs):
176 params = list(fargs)
177 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
178
179 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
180 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
181 return 'task_%s.lock' % (md5_safe(_lock_key),)
182
183
184 def locked_task(func):
185 def __wrapper(func, *fargs, **fkwargs):
186 lockkey = __get_lockkey(func, *fargs, **fkwargs)
187 lockkey_path = config['app_conf']['cache_dir']
188
189 log.info('running task with lockkey %s' % lockkey)
190 try:
191 l = DaemonLock(file_=jn(lockkey_path, lockkey))
192 ret = func(*fargs, **fkwargs)
193 l.release()
194 return ret
195 except LockHeld:
196 log.info('LockHeld')
197 return 'Task with key %s already running' % lockkey
198
199 return decorator(__wrapper, func)
200
70 log.debug('executing task %s:%s in sync mode', 'TASK', task)
201 71
202 def get_session():
203 if rhodecode.CELERY_ENABLED:
204 utils.initialize_database(config)
205 sa = meta.Session()
206 return sa
207
208
209 def dbsession(func):
210 def __wrapper(func, *fargs, **fkwargs):
211 try:
212 ret = func(*fargs, **fkwargs)
213 return ret
214 finally:
215 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
216 meta.Session.remove()
217
218 return decorator(__wrapper, func)
219
220
221 def vcsconnection(func):
222 def __wrapper(func, *fargs, **fkwargs):
223 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
224 settings = rhodecode.PYRAMID_SETTINGS
225 backends = settings['vcs.backends']
226 for alias in rhodecode.BACKENDS.keys():
227 if alias not in backends:
228 del rhodecode.BACKENDS[alias]
229 utils.configure_vcs(settings)
230 connect_vcs(
231 settings['vcs.server'],
232 utils.get_vcs_server_protocol(settings))
233 ret = func(*fargs, **fkwargs)
234 return ret
235