##// END OF EJS Templates
celery: celery 4.X support. Fixes #4169...
marcink -
r2359:246f5a4c default
parent child
Show More
@@ -0,0 +1,256
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
21 Celery loader, run with::
22
23 celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini
24 """
25 import os
26 import logging
27
28 from celery import Celery
29 from celery import signals
30 from celery import Task
31 from kombu.serialization import register
32 from pyramid.threadlocal import get_current_request
33
34 import rhodecode
35
36 from rhodecode.lib.auth import AuthUser
37 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
38 from rhodecode.lib.ext_json import json
39 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
40 from rhodecode.lib.utils2 import str2bool
41 from rhodecode.model import meta
42
43
44 register('json_ext', json.dumps, json.loads,
45 content_type='application/x-json-ext',
46 content_encoding='utf-8')
47
48 log = logging.getLogger('celery.rhodecode.loader')
49
50
51 def add_preload_arguments(parser):
52 parser.add_argument(
53 '--ini', default=None,
54 help='Path to ini configuration file.'
55 )
56 parser.add_argument(
57 '--ini-var', default=None,
58 help='Comma separated list of key=value to pass to ini.'
59 )
60
61
62 def get_logger(obj):
63 custom_log = logging.getLogger(
64 'rhodecode.task.{}'.format(obj.__class__.__name__))
65
66 if rhodecode.CELERY_ENABLED:
67 try:
68 custom_log = obj.get_logger()
69 except Exception:
70 pass
71
72 return custom_log
73
74
75 base_celery_config = {
76 'result_backend': 'rpc://',
77 'result_expires': 60 * 60 * 24,
78 'result_persistent': True,
79 'imports': [],
80 'worker_max_tasks_per_child': 100,
81 'accept_content': ['json_ext'],
82 'task_serializer': 'json_ext',
83 'result_serializer': 'json_ext',
84 'worker_hijack_root_logger': False,
85 }
86 # init main celery app
87 celery_app = Celery()
88 celery_app.user_options['preload'].add(add_preload_arguments)
89 ini_file_glob = None
90
91
92 @signals.setup_logging.connect
93 def setup_logging_callback(**kwargs):
94 setup_logging(ini_file_glob)
95
96
97 @signals.user_preload_options.connect
98 def on_preload_parsed(options, **kwargs):
99 ini_location = options['ini']
100 ini_vars = options['ini_var']
101 celery_app.conf['INI_PYRAMID'] = options['ini']
102
103 if ini_location is None:
104 print('You must provide the paste --ini argument')
105 exit(-1)
106
107 options = None
108 if ini_vars is not None:
109 options = parse_ini_vars(ini_vars)
110
111 global ini_file_glob
112 ini_file_glob = ini_location
113
114 log.debug('Bootstrapping RhodeCode application...')
115 env = bootstrap(ini_location, options=options)
116
117 setup_celery_app(
118 app=env['app'], root=env['root'], request=env['request'],
119 registry=env['registry'], closer=env['closer'],
120 ini_location=ini_location)
121
122 # fix the global flag even if it's disabled via .ini file because this
123 # is a worker code that doesn't need this to be disabled.
124 rhodecode.CELERY_ENABLED = True
125
126
127 @signals.task_success.connect
128 def task_success_signal(result, **kwargs):
129 meta.Session.commit()
130 celery_app.conf['PYRAMID_CLOSER']()
131
132
133 @signals.task_retry.connect
134 def task_retry_signal(
135 request, reason, einfo, **kwargs):
136 meta.Session.remove()
137 celery_app.conf['PYRAMID_CLOSER']()
138
139
140 @signals.task_failure.connect
141 def task_failure_signal(
142 task_id, exception, args, kwargs, traceback, einfo, **kargs):
143 meta.Session.remove()
144 celery_app.conf['PYRAMID_CLOSER']()
145
146
147 @signals.task_revoked.connect
148 def task_revoked_signal(
149 request, terminated, signum, expired, **kwargs):
150 celery_app.conf['PYRAMID_CLOSER']()
151
152
153 def setup_celery_app(app, root, request, registry, closer, ini_location):
154 ini_dir = os.path.dirname(os.path.abspath(ini_location))
155 celery_config = base_celery_config
156 celery_config.update({
157 # store celerybeat scheduler db where the .ini file is
158 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
159 })
160 ini_settings = get_ini_config(ini_location)
161 log.debug('Got custom celery conf: %s', ini_settings)
162
163 celery_config.update(ini_settings)
164 celery_app.config_from_object(celery_config)
165
166 celery_app.conf.update({'PYRAMID_APP': app})
167 celery_app.conf.update({'PYRAMID_ROOT': root})
168 celery_app.conf.update({'PYRAMID_REQUEST': request})
169 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
170 celery_app.conf.update({'PYRAMID_CLOSER': closer})
171
172
173 def configure_celery(config, ini_location):
174 """
175 Helper that is called from our application creation logic. It gives
176 connection info into running webapp and allows execution of tasks from
177 RhodeCode itself
178 """
179 # store some globals into rhodecode
180 rhodecode.CELERY_ENABLED = str2bool(
181 config.registry.settings.get('use_celery'))
182 if rhodecode.CELERY_ENABLED:
183 log.info('Configuring celery based on `%s` file', ini_location)
184 setup_celery_app(
185 app=None, root=None, request=None, registry=config.registry,
186 closer=None, ini_location=ini_location)
187
188
189 class RequestContextTask(Task):
190 """
191 This is a celery task which will create a rhodecode app instance context
192 for the task, patch pyramid with the original request
193 that created the task and also add the user to the context.
194 """
195
196 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
197 link=None, link_error=None, shadow=None, **options):
198 """ queue the job to run (we are in web request context here) """
199
200 req = get_current_request()
201
202 # web case
203 if hasattr(req, 'user'):
204 ip_addr = req.user.ip_addr
205 user_id = req.user.user_id
206
207 # api case
208 elif hasattr(req, 'rpc_user'):
209 ip_addr = req.rpc_user.ip_addr
210 user_id = req.rpc_user.user_id
211 else:
212 raise Exception(
213 'Unable to fetch required data from request: {}. \n'
214 'This task is required to be executed from context of '
215 'request in a webapp'.format(repr(req)))
216
217 if req:
218 # we hook into kwargs since it is the only way to pass our data to
219 # the celery worker
220 options['headers'] = options.get('headers', {})
221 options['headers'].update({
222 'rhodecode_proxy_data': {
223 'environ': {
224 'PATH_INFO': req.environ['PATH_INFO'],
225 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
226 'HTTP_HOST': req.environ.get('HTTP_HOST',
227 req.environ['SERVER_NAME']),
228 'SERVER_NAME': req.environ['SERVER_NAME'],
229 'SERVER_PORT': req.environ['SERVER_PORT'],
230 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
231 },
232 'auth_user': {
233 'ip_addr': ip_addr,
234 'user_id': user_id
235 },
236 }
237 })
238
239 return super(RequestContextTask, self).apply_async(
240 args, kwargs, task_id, producer, link, link_error, shadow, **options)
241
242 def __call__(self, *args, **kwargs):
243 """ rebuild the context and then run task on celery worker """
244
245 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
246 if not proxy_data:
247 return super(RequestContextTask, self).__call__(*args, **kwargs)
248
249 log.debug('using celery proxy data to run task: %r', proxy_data)
250 # re-inject and register threadlocals for proper routing support
251 request = prepare_request(proxy_data['environ'])
252 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
253 ip_addr=proxy_data['auth_user']['ip_addr'])
254
255 return super(RequestContextTask, self).__call__(*args, **kwargs)
256
@@ -0,0 +1,156
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import os
22 import json
23 import logging
24 import datetime
25
26 from functools import partial
27
28 from pyramid.compat import configparser
29 from celery.result import AsyncResult
30 import celery.loaders.base
31 import celery.schedules
32
33
34 log = logging.getLogger(__name__)
35
36
37 def get_task_id(task):
38 task_id = None
39 if isinstance(task, AsyncResult):
40 task_id = task.task_id
41
42 return task_id
43
44
45 def crontab(value):
46 return celery.schedules.crontab(**value)
47
48
49 def timedelta(value):
50 return datetime.timedelta(**value)
51
52
53 def safe_json(get, section, key):
54 value = ''
55 try:
56 value = get(key)
57 json_value = json.loads(value)
58 except ValueError:
59 msg = 'The %s=%s is not valid json in section %s' % (
60 key, value, section
61 )
62 raise ValueError(msg)
63
64 return json_value
65
66
67 def get_beat_config(parser, section):
68 SCHEDULE_TYPE_MAP = {
69 'crontab': crontab,
70 'timedelta': timedelta,
71 'integer': int
72 }
73 get = partial(parser.get, section)
74 has_option = partial(parser.has_option, section)
75
76 schedule_type = get('type')
77 schedule_value = safe_json(get, section, 'schedule')
78
79 scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type)
80
81 if scheduler_cls is None:
82 raise ValueError(
83 'schedule type %s in section %s is invalid' % (
84 schedule_type,
85 section
86 )
87 )
88
89 schedule = scheduler_cls(schedule_value)
90
91 config = {
92 'task': get('task'),
93 'schedule': schedule,
94 }
95
96 if has_option('args'):
97 config['args'] = safe_json(get, section, 'args')
98
99 if has_option('kwargs'):
100 config['kwargs'] = safe_json(get, section, 'kwargs')
101
102 return config
103
104
105 def get_ini_config(ini_location):
106 """
107 Converts basic ini configuration into celery 4.X options
108 """
109 def key_converter(key_name):
110 pref = 'celery.'
111 if key_name.startswith(pref):
112 return key_name[len(pref):].replace('.', '_').lower()
113
114 def type_converter(parsed_key, value):
115 # cast to int
116 if value.isdigit():
117 return int(value)
118
119 # cast to bool
120 if value.lower() in ['true', 'false', 'True', 'False']:
121 return value.lower() == 'true'
122 return value
123
124 parser = configparser.SafeConfigParser(
125 defaults={'here': os.path.abspath(ini_location)})
126 parser.read(ini_location)
127
128 ini_config = {}
129 for k, v in parser.items('app:main'):
130 pref = 'celery.'
131 if k.startswith(pref):
132 ini_config[key_converter(k)] = type_converter(key_converter(k), v)
133
134 beat_config = {}
135 for section in parser.sections():
136 if section.startswith('celerybeat:'):
137 name = section.split(':', 1)[1]
138 beat_config[name] = get_beat_config(parser, section)
139
140 # final compose of settings
141 celery_settings = {}
142
143 if ini_config:
144 celery_settings.update(ini_config)
145 if beat_config:
146 celery_settings.update({'beat_schedule': beat_config})
147
148 return celery_settings
149
150
151 def parse_ini_vars(ini_vars):
152 options = {}
153 for pairs in ini_vars.split(','):
154 key, value = pairs.split('=')
155 options[key] = value
156 return options
@@ -296,28 +296,15 labs_settings_active = true
296 ### CELERY CONFIG ####
296 ### CELERY CONFIG ####
297 ####################################
297 ####################################
298 use_celery = false
298 use_celery = false
299 broker.host = localhost
300 broker.vhost = rabbitmqhost
301 broker.port = 5672
302 broker.user = rabbitmq
303 broker.password = qweqwe
304
305 celery.imports = rhodecode.lib.celerylib.tasks
306
299
307 celery.result.backend = amqp
300 # connection url to the message broker (default rabbitmq)
308 celery.result.dburi = amqp://
301 celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
309 celery.result.serialier = json
310
302
311 #celery.send.task.error.emails = true
303 # maximum tasks to execute before worker restart
312 #celery.amqp.task.result.expires = 18000
304 celery.max_tasks_per_child = 100
313
314 celeryd.concurrency = 2
315 #celeryd.log.file = celeryd.log
316 celeryd.log.level = debug
317 celeryd.max.tasks.per.child = 1
318
305
319 ## tasks will never be sent to the queue, but executed locally instead.
306 ## tasks will never be sent to the queue, but executed locally instead.
320 celery.always.eager = false
307 celery.task_always_eager = false
321
308
322 ####################################
309 ####################################
323 ### BEAKER CACHE ####
310 ### BEAKER CACHE ####
@@ -649,7 +636,7 custom.conf = 1
649 ### LOGGING CONFIGURATION ####
636 ### LOGGING CONFIGURATION ####
650 ################################
637 ################################
651 [loggers]
638 [loggers]
652 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
639 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
653
640
654 [handlers]
641 [handlers]
655 keys = console, console_sql
642 keys = console, console_sql
@@ -688,6 +675,11 handlers =
688 qualname = ssh_wrapper
675 qualname = ssh_wrapper
689 propagate = 1
676 propagate = 1
690
677
678 [logger_celery]
679 level = DEBUG
680 handlers =
681 qualname = celery
682
691
683
692 ##############
684 ##############
693 ## HANDLERS ##
685 ## HANDLERS ##
@@ -271,28 +271,15 labs_settings_active = true
271 ### CELERY CONFIG ####
271 ### CELERY CONFIG ####
272 ####################################
272 ####################################
273 use_celery = false
273 use_celery = false
274 broker.host = localhost
275 broker.vhost = rabbitmqhost
276 broker.port = 5672
277 broker.user = rabbitmq
278 broker.password = qweqwe
279
280 celery.imports = rhodecode.lib.celerylib.tasks
281
274
282 celery.result.backend = amqp
275 # connection url to the message broker (default rabbitmq)
283 celery.result.dburi = amqp://
276 celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
284 celery.result.serialier = json
285
277
286 #celery.send.task.error.emails = true
278 # maximum tasks to execute before worker restart
287 #celery.amqp.task.result.expires = 18000
279 celery.max_tasks_per_child = 100
288
289 celeryd.concurrency = 2
290 #celeryd.log.file = celeryd.log
291 celeryd.log.level = debug
292 celeryd.max.tasks.per.child = 1
293
280
294 ## tasks will never be sent to the queue, but executed locally instead.
281 ## tasks will never be sent to the queue, but executed locally instead.
295 celery.always.eager = false
282 celery.task_always_eager = false
296
283
297 ####################################
284 ####################################
298 ### BEAKER CACHE ####
285 ### BEAKER CACHE ####
@@ -619,7 +606,7 custom.conf = 1
619 ### LOGGING CONFIGURATION ####
606 ### LOGGING CONFIGURATION ####
620 ################################
607 ################################
621 [loggers]
608 [loggers]
622 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
609 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
623
610
624 [handlers]
611 [handlers]
625 keys = console, console_sql
612 keys = console, console_sql
@@ -658,6 +645,11 handlers =
658 qualname = ssh_wrapper
645 qualname = ssh_wrapper
659 propagate = 1
646 propagate = 1
660
647
648 [logger_celery]
649 level = DEBUG
650 handlers =
651 qualname = celery
652
661
653
662 ##############
654 ##############
663 ## HANDLERS ##
655 ## HANDLERS ##
@@ -32,6 +32,7 from rhodecode.api.utils import (
32 from rhodecode.lib import audit_logger
32 from rhodecode.lib import audit_logger
33 from rhodecode.lib import repo_maintenance
33 from rhodecode.lib import repo_maintenance
34 from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi
34 from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi
35 from rhodecode.lib.celerylib.utils import get_task_id
35 from rhodecode.lib.utils2 import str2bool, time_to_datetime
36 from rhodecode.lib.utils2 import str2bool, time_to_datetime
36 from rhodecode.lib.ext_json import json
37 from rhodecode.lib.ext_json import json
37 from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError
38 from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError
@@ -712,10 +713,7 def create_repo(
712 }
713 }
713
714
714 task = RepoModel().create(form_data=data, cur_user=owner)
715 task = RepoModel().create(form_data=data, cur_user=owner)
715 from celery.result import BaseAsyncResult
716 task_id = get_task_id(task)
716 task_id = None
717 if isinstance(task, BaseAsyncResult):
718 task_id = task.task_id
719 # no commit, it's done in RepoModel, or async via celery
717 # no commit, it's done in RepoModel, or async via celery
720 return {
718 return {
721 'msg': "Created new repository `%s`" % (schema_data['repo_name'],),
719 'msg': "Created new repository `%s`" % (schema_data['repo_name'],),
@@ -1105,10 +1103,8 def fork_repo(request, apiuser, repoid,
1105
1103
1106 task = RepoModel().create_fork(data, cur_user=owner)
1104 task = RepoModel().create_fork(data, cur_user=owner)
1107 # no commit, it's done in RepoModel, or async via celery
1105 # no commit, it's done in RepoModel, or async via celery
1108 from celery.result import BaseAsyncResult
1106 task_id = get_task_id(task)
1109 task_id = None
1107
1110 if isinstance(task, BaseAsyncResult):
1111 task_id = task.task_id
1112 return {
1108 return {
1113 'msg': 'Created fork of `%s` as `%s`' % (
1109 'msg': 'Created fork of `%s` as `%s`' % (
1114 repo.repo_name, schema_data['repo_name']),
1110 repo.repo_name, schema_data['repo_name']),
@@ -28,6 +28,7 from pyramid.renderers import render
28 from pyramid.response import Response
28 from pyramid.response import Response
29
29
30 from rhodecode.apps._base import BaseAppView, DataGridAppView
30 from rhodecode.apps._base import BaseAppView, DataGridAppView
31 from rhodecode.lib.celerylib.utils import get_task_id
31
32
32 from rhodecode.lib.ext_json import json
33 from rhodecode.lib.ext_json import json
33 from rhodecode.lib.auth import (
34 from rhodecode.lib.auth import (
@@ -143,22 +144,19 class AdminReposView(BaseAppView, DataGr
143 c = self.load_default_context()
144 c = self.load_default_context()
144
145
145 form_result = {}
146 form_result = {}
147 self._load_form_data(c)
146 task_id = None
148 task_id = None
147 self._load_form_data(c)
148
149 try:
149 try:
150 # CanWriteToGroup validators checks permissions of this POST
150 # CanWriteToGroup validators checks permissions of this POST
151 form = RepoForm(
151 form = RepoForm(
152 self.request.translate, repo_groups=c.repo_groups_choices,
152 self.request.translate, repo_groups=c.repo_groups_choices,
153 landing_revs=c.landing_revs_choices)()
153 landing_revs=c.landing_revs_choices)()
154 form_results = form.to_python(dict(self.request.POST))
154 form_result = form.to_python(dict(self.request.POST))
155
155
156 # create is done sometimes async on celery, db transaction
156 # create is done sometimes async on celery, db transaction
157 # management is handled there.
157 # management is handled there.
158 task = RepoModel().create(form_result, self._rhodecode_user.user_id)
158 task = RepoModel().create(form_result, self._rhodecode_user.user_id)
159 from celery.result import BaseAsyncResult
159 task_id = get_task_id(task)
160 if isinstance(task, BaseAsyncResult):
161 task_id = task.task_id
162 except formencode.Invalid as errors:
160 except formencode.Invalid as errors:
163 data = render('rhodecode:templates/admin/repos/repo_add.mako',
161 data = render('rhodecode:templates/admin/repos/repo_add.mako',
164 self._get_template_context(c), self.request)
162 self._get_template_context(c), self.request)
@@ -46,11 +46,9 class RepoChecksView(BaseAppView):
46
46
47 repo_name = self.request.matchdict['repo_name']
47 repo_name = self.request.matchdict['repo_name']
48 db_repo = Repository.get_by_repo_name(repo_name)
48 db_repo = Repository.get_by_repo_name(repo_name)
49 if not db_repo:
50 raise HTTPNotFound()
51
49
52 # check if maybe repo is already created
50 # check if maybe repo is already created
53 if db_repo.repo_state in [Repository.STATE_CREATED]:
51 if db_repo and db_repo.repo_state in [Repository.STATE_CREATED]:
54 # re-check permissions before redirecting to prevent resource
52 # re-check permissions before redirecting to prevent resource
55 # discovery by checking the 302 code
53 # discovery by checking the 302 code
56 perm_set = ['repository.read', 'repository.write', 'repository.admin']
54 perm_set = ['repository.read', 'repository.write', 'repository.admin']
@@ -80,9 +78,10 class RepoChecksView(BaseAppView):
80
78
81 if task_id and task_id not in ['None']:
79 if task_id and task_id not in ['None']:
82 import rhodecode
80 import rhodecode
83 from celery.result import AsyncResult
81 from rhodecode.lib.celerylib.loader import celery_app
84 if rhodecode.CELERY_ENABLED:
82 if rhodecode.CELERY_ENABLED:
85 task = AsyncResult(task_id)
83 task = celery_app.AsyncResult(task_id)
84 task.get()
86 if task.failed():
85 if task.failed():
87 msg = self._log_creation_exception(task.result, repo_name)
86 msg = self._log_creation_exception(task.result, repo_name)
88 h.flash(msg, category='error')
87 h.flash(msg, category='error')
@@ -33,6 +33,7 from rhodecode.lib.auth import (
33 LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous,
33 LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous,
34 HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired)
34 HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired)
35 import rhodecode.lib.helpers as h
35 import rhodecode.lib.helpers as h
36 from rhodecode.lib.celerylib.utils import get_task_id
36 from rhodecode.model.db import coalesce, or_, Repository, RepoGroup
37 from rhodecode.model.db import coalesce, or_, Repository, RepoGroup
37 from rhodecode.model.repo import RepoModel
38 from rhodecode.model.repo import RepoModel
38 from rhodecode.model.forms import RepoForkForm
39 from rhodecode.model.forms import RepoForkForm
@@ -226,9 +227,8 class RepoForksView(RepoAppView, DataGri
226 # management is handled there.
227 # management is handled there.
227 task = RepoModel().create_fork(
228 task = RepoModel().create_fork(
228 form_result, c.rhodecode_user.user_id)
229 form_result, c.rhodecode_user.user_id)
229 from celery.result import BaseAsyncResult
230
230 if isinstance(task, BaseAsyncResult):
231 task_id = get_task_id(task)
231 task_id = task.task_id
232 except formencode.Invalid as errors:
232 except formencode.Invalid as errors:
233 c.rhodecode_db_repo = self.db_repo
233 c.rhodecode_db_repo = self.db_repo
234
234
@@ -23,14 +23,6 import os
23 import logging
23 import logging
24 import rhodecode
24 import rhodecode
25
25
26 # ------------------------------------------------------------------------------
27 # CELERY magic until refactor - issue #4163 - import order matters here:
28 #from rhodecode.lib import celerypylons # this must be first, celerypylons
29 # sets config settings upon import
30
31 import rhodecode.integrations # any modules using celery task
32 # decorators should be added afterwards:
33 # ------------------------------------------------------------------------------
34
26
35 from rhodecode.config import utils
27 from rhodecode.config import utils
36
28
@@ -54,14 +46,6 def load_pyramid_environment(global_conf
54 'secret': settings_merged.get('channelstream.secret')
46 'secret': settings_merged.get('channelstream.secret')
55 }
47 }
56
48
57
58 # TODO(marcink): celery
59 # # store some globals into rhodecode
60 # rhodecode.CELERY_ENABLED = str2bool(config['app_conf'].get('use_celery'))
61 # rhodecode.CELERY_EAGER = str2bool(
62 # config['app_conf'].get('celery.always.eager'))
63
64
65 # If this is a test run we prepare the test environment like
49 # If this is a test run we prepare the test environment like
66 # creating a test database, test search index and test repositories.
50 # creating a test database, test search index and test repositories.
67 # This has to be done before the database connection is initialized.
51 # This has to be done before the database connection is initialized.
@@ -189,7 +189,7
189 "python2.7-jupyter-core-4.3.0": {
189 "python2.7-jupyter-core-4.3.0": {
190 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
190 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
191 },
191 },
192 "python2.7-kombu-1.5.1": {
192 "python2.7-kombu-4.1.0": {
193 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
193 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
194 },
194 },
195 "python2.7-mistune-0.7.4": {
195 "python2.7-mistune-0.7.4": {
@@ -42,6 +42,7 from rhodecode.lib.vcs import VCSCommuni
42 from rhodecode.lib.exceptions import VCSServerUnavailable
42 from rhodecode.lib.exceptions import VCSServerUnavailable
43 from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled
43 from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled
44 from rhodecode.lib.middleware.https_fixup import HttpsFixup
44 from rhodecode.lib.middleware.https_fixup import HttpsFixup
45 from rhodecode.lib.celerylib.loader import configure_celery
45 from rhodecode.lib.plugins.utils import register_rhodecode_plugin
46 from rhodecode.lib.plugins.utils import register_rhodecode_plugin
46 from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict
47 from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict
47 from rhodecode.subscribers import (
48 from rhodecode.subscribers import (
@@ -87,9 +88,11 def make_pyramid_app(global_config, **se
87 pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config)
88 pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config)
88 pyramid_app.config = config
89 pyramid_app.config = config
89
90
91 config.configure_celery(global_config['__file__'])
90 # creating the app uses a connection - return it after we are done
92 # creating the app uses a connection - return it after we are done
91 meta.Session.remove()
93 meta.Session.remove()
92
94
95 log.info('Pyramid app %s created and configured.', pyramid_app)
93 return pyramid_app
96 return pyramid_app
94
97
95
98
@@ -196,6 +199,8 def includeme(config):
196 config.add_directive(
199 config.add_directive(
197 'register_rhodecode_plugin', register_rhodecode_plugin)
200 'register_rhodecode_plugin', register_rhodecode_plugin)
198
201
202 config.add_directive('configure_celery', configure_celery)
203
199 if asbool(settings.get('appenlight', 'false')):
204 if asbool(settings.get('appenlight', 'false')):
200 config.include('appenlight_client.ext.pyramid_tween')
205 config.include('appenlight_client.ext.pyramid_tween')
201
206
@@ -20,18 +20,16
20
20
21 from __future__ import unicode_literals
21 from __future__ import unicode_literals
22 import deform
22 import deform
23 import re
24 import logging
23 import logging
25 import requests
24 import requests
26 import colander
25 import colander
27 import textwrap
26 import textwrap
28 from celery.task import task
29 from mako.template import Template
27 from mako.template import Template
30
28
31 from rhodecode import events
29 from rhodecode import events
32 from rhodecode.translation import _
30 from rhodecode.translation import _
33 from rhodecode.lib import helpers as h
31 from rhodecode.lib import helpers as h
34 from rhodecode.lib.celerylib import run_task
32 from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
35 from rhodecode.lib.colander_utils import strip_whitespace
33 from rhodecode.lib.colander_utils import strip_whitespace
36 from rhodecode.integrations.types.base import IntegrationTypeBase
34 from rhodecode.integrations.types.base import IntegrationTypeBase
37
35
@@ -243,7 +241,7 class HipchatIntegrationType(Integration
243 )
241 )
244
242
245
243
246 @task(ignore_result=True)
244 @async_task(ignore_result=True, base=RequestContextTask)
247 def post_text_to_hipchat(settings, text):
245 def post_text_to_hipchat(settings, text):
248 log.debug('sending %s to hipchat %s' % (text, settings['server_url']))
246 log.debug('sending %s to hipchat %s' % (text, settings['server_url']))
249 resp = requests.post(settings['server_url'], json={
247 resp = requests.post(settings['server_url'], json={