@@ -0,0 +1,256 b''
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2010-2017 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+"""
+Celery loader, run with::
+
+    celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini
+"""
+import os
+import logging
+
+from celery import Celery
+from celery import signals
+from celery import Task
+from kombu.serialization import register
+from pyramid.threadlocal import get_current_request
+
+import rhodecode
+
+from rhodecode.lib.auth import AuthUser
+from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
+from rhodecode.lib.ext_json import json
+from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
+from rhodecode.lib.utils2 import str2bool
+from rhodecode.model import meta
+
+
+register('json_ext', json.dumps, json.loads,
+         content_type='application/x-json-ext',
+         content_encoding='utf-8')
+
+log = logging.getLogger('celery.rhodecode.loader')
+
+
+def add_preload_arguments(parser):
+    parser.add_argument(
+        '--ini', default=None,
+        help='Path to ini configuration file.'
+    )
+    parser.add_argument(
+        '--ini-var', default=None,
+        help='Comma separated list of key=value to pass to ini.'
+    )
+
+
+def get_logger(obj):
+    custom_log = logging.getLogger(
+        'rhodecode.task.{}'.format(obj.__class__.__name__))
+
+    if rhodecode.CELERY_ENABLED:
+        try:
+            custom_log = obj.get_logger()
+        except Exception:
+            pass
+
+    return custom_log
+
+
+base_celery_config = {
+    'result_backend': 'rpc://',
+    'result_expires': 60 * 60 * 24,
+    'result_persistent': True,
+    'imports': [],
+    'worker_max_tasks_per_child': 100,
+    'accept_content': ['json_ext'],
+    'task_serializer': 'json_ext',
+    'result_serializer': 'json_ext',
+    'worker_hijack_root_logger': False,
+}
+# init main celery app
+celery_app = Celery()
+celery_app.user_options['preload'].add(add_preload_arguments)
+ini_file_glob = None
+
+
+@signals.setup_logging.connect
+def setup_logging_callback(**kwargs):
+    setup_logging(ini_file_glob)
+
+
+@signals.user_preload_options.connect
+def on_preload_parsed(options, **kwargs):
+    ini_location = options['ini']
+    ini_vars = options['ini_var']
+    celery_app.conf['INI_PYRAMID'] = options['ini']
+
+    if ini_location is None:
+        print('You must provide the paste --ini argument')
+        exit(-1)
+
+    options = None
+    if ini_vars is not None:
+        options = parse_ini_vars(ini_vars)
+
+    global ini_file_glob
+    ini_file_glob = ini_location
+
+    log.debug('Bootstrapping RhodeCode application...')
+    env = bootstrap(ini_location, options=options)
+
+    setup_celery_app(
+        app=env['app'], root=env['root'], request=env['request'],
+        registry=env['registry'], closer=env['closer'],
+        ini_location=ini_location)
+
+    # fix the global flag even if it's disabled via .ini file because this
+    # is a worker code that doesn't need this to be disabled.
+    rhodecode.CELERY_ENABLED = True
+
+
+@signals.task_success.connect
+def task_success_signal(result, **kwargs):
+    meta.Session.commit()
+    celery_app.conf['PYRAMID_CLOSER']()
+
+
+@signals.task_retry.connect
+def task_retry_signal(
+        request, reason, einfo, **kwargs):
+    meta.Session.remove()
+    celery_app.conf['PYRAMID_CLOSER']()
+
+
+@signals.task_failure.connect
+def task_failure_signal(
+        task_id, exception, args, kwargs, traceback, einfo, **kargs):
+    meta.Session.remove()
+    celery_app.conf['PYRAMID_CLOSER']()
+
+
+@signals.task_revoked.connect
+def task_revoked_signal(
+        request, terminated, signum, expired, **kwargs):
+    celery_app.conf['PYRAMID_CLOSER']()
+
+
+def setup_celery_app(app, root, request, registry, closer, ini_location):
+    ini_dir = os.path.dirname(os.path.abspath(ini_location))
+    celery_config = base_celery_config
+    celery_config.update({
+        # store celerybeat scheduler db where the .ini file is
+        'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
+    })
+    ini_settings = get_ini_config(ini_location)
+    log.debug('Got custom celery conf: %s', ini_settings)
+
+    celery_config.update(ini_settings)
+    celery_app.config_from_object(celery_config)
+
+    celery_app.conf.update({'PYRAMID_APP': app})
+    celery_app.conf.update({'PYRAMID_ROOT': root})
+    celery_app.conf.update({'PYRAMID_REQUEST': request})
+    celery_app.conf.update({'PYRAMID_REGISTRY': registry})
+    celery_app.conf.update({'PYRAMID_CLOSER': closer})
+
+
+def configure_celery(config, ini_location):
+    """
+    Helper that is called from our application creation logic. It gives
+    connection info into running webapp and allows execution of tasks from
+    RhodeCode itself
+    """
+    # store some globals into rhodecode
+    rhodecode.CELERY_ENABLED = str2bool(
+        config.registry.settings.get('use_celery'))
+    if rhodecode.CELERY_ENABLED:
+        log.info('Configuring celery based on `%s` file', ini_location)
+        setup_celery_app(
+            app=None, root=None, request=None, registry=config.registry,
+            closer=None, ini_location=ini_location)
+
+
+class RequestContextTask(Task):
+    """
+    This is a celery task which will create a rhodecode app instance context
+    for the task, patch pyramid with the original request
+    that created the task and also add the user to the context.
+    """
+
+    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
+                    link=None, link_error=None, shadow=None, **options):
+        """ queue the job to run (we are in web request context here) """
+
+        req = get_current_request()
+
+        # web case
+        if hasattr(req, 'user'):
+            ip_addr = req.user.ip_addr
+            user_id = req.user.user_id
+
+        # api case
+        elif hasattr(req, 'rpc_user'):
+            ip_addr = req.rpc_user.ip_addr
+            user_id = req.rpc_user.user_id
+        else:
+            raise Exception(
+                'Unable to fetch required data from request: {}. \n'
+                'This task is required to be executed from context of '
+                'request in a webapp'.format(repr(req)))
+
+        if req:
+            # we hook into kwargs since it is the only way to pass our data to
+            # the celery worker
+            options['headers'] = options.get('headers', {})
+            options['headers'].update({
+                'rhodecode_proxy_data': {
+                    'environ': {
+                        'PATH_INFO': req.environ['PATH_INFO'],
+                        'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
+                        'HTTP_HOST': req.environ.get('HTTP_HOST',
+                                                     req.environ['SERVER_NAME']),
+                        'SERVER_NAME': req.environ['SERVER_NAME'],
+                        'SERVER_PORT': req.environ['SERVER_PORT'],
+                        'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
+                    },
+                    'auth_user': {
+                        'ip_addr': ip_addr,
+                        'user_id': user_id
+                    },
+                }
+            })
+
+        return super(RequestContextTask, self).apply_async(
+            args, kwargs, task_id, producer, link, link_error, shadow, **options)
+
+    def __call__(self, *args, **kwargs):
+        """ rebuild the context and then run task on celery worker """
+
+        proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
+        if not proxy_data:
+            return super(RequestContextTask, self).__call__(*args, **kwargs)
+
+        log.debug('using celery proxy data to run task: %r', proxy_data)
+        # re-inject and register threadlocals for proper routing support
+        request = prepare_request(proxy_data['environ'])
+        request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
+                                ip_addr=proxy_data['auth_user']['ip_addr'])
+
+        return super(RequestContextTask, self).__call__(*args, **kwargs)
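Note: the loader above only creates the Celery app and the request-aware base class; task modules register themselves against `celery_app` separately. A minimal sketch of such a task module follows; the `ping_task` name and body are hypothetical, only `celery_app`, `RequestContextTask` and `get_logger` come from the file above.

# illustrative sketch only -- the task name and module are hypothetical
from rhodecode.lib.celerylib.loader import celery_app, RequestContextTask, get_logger


@celery_app.task(base=RequestContextTask, bind=True)
def ping_task(self, message):
    # by the time the body runs on a worker, RequestContextTask.__call__ has
    # rebuilt the pyramid request and user from the 'rhodecode_proxy_data' header
    log = get_logger(self)
    log.debug('running ping task with message: %s', message)
    return 'pong: {}'.format(message)

# queued from web/API code; apply_async() injects the proxy data header:
# ping_task.apply_async(args=('hello',))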
@@ -0,0 +1,156 b''
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2010-2017 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import os
+import json
+import logging
+import datetime
+
+from functools import partial
+
+from pyramid.compat import configparser
+from celery.result import AsyncResult
+import celery.loaders.base
+import celery.schedules
+
+
+log = logging.getLogger(__name__)
+
+
+def get_task_id(task):
+    task_id = None
+    if isinstance(task, AsyncResult):
+        task_id = task.task_id
+
+    return task_id
+
+
+def crontab(value):
+    return celery.schedules.crontab(**value)
+
+
+def timedelta(value):
+    return datetime.timedelta(**value)
+
+
+def safe_json(get, section, key):
+    value = ''
+    try:
+        value = get(key)
+        json_value = json.loads(value)
+    except ValueError:
+        msg = 'The %s=%s is not valid json in section %s' % (
+            key, value, section
+        )
+        raise ValueError(msg)
+
+    return json_value
+
+
+def get_beat_config(parser, section):
+    SCHEDULE_TYPE_MAP = {
+        'crontab': crontab,
+        'timedelta': timedelta,
+        'integer': int
+    }
+    get = partial(parser.get, section)
+    has_option = partial(parser.has_option, section)
+
+    schedule_type = get('type')
+    schedule_value = safe_json(get, section, 'schedule')
+
+    scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type)
+
+    if scheduler_cls is None:
+        raise ValueError(
+            'schedule type %s in section %s is invalid' % (
+                schedule_type,
+                section
+            )
+        )
+
+    schedule = scheduler_cls(schedule_value)
+
+    config = {
+        'task': get('task'),
+        'schedule': schedule,
+    }
+
+    if has_option('args'):
+        config['args'] = safe_json(get, section, 'args')
+
+    if has_option('kwargs'):
+        config['kwargs'] = safe_json(get, section, 'kwargs')
+
+    return config
+
+
+def get_ini_config(ini_location):
+    """
+    Converts basic ini configuration into celery 4.X options
+    """
+    def key_converter(key_name):
+        pref = 'celery.'
+        if key_name.startswith(pref):
+            return key_name[len(pref):].replace('.', '_').lower()
+
+    def type_converter(parsed_key, value):
+        # cast to int
+        if value.isdigit():
+            return int(value)
+
+        # cast to bool
+        if value.lower() in ['true', 'false', 'True', 'False']:
+            return value.lower() == 'true'
+        return value
+
+    parser = configparser.SafeConfigParser(
+        defaults={'here': os.path.abspath(ini_location)})
+    parser.read(ini_location)
+
+    ini_config = {}
+    for k, v in parser.items('app:main'):
+        pref = 'celery.'
+        if k.startswith(pref):
+            ini_config[key_converter(k)] = type_converter(key_converter(k), v)
+
+    beat_config = {}
+    for section in parser.sections():
+        if section.startswith('celerybeat:'):
+            name = section.split(':', 1)[1]
+            beat_config[name] = get_beat_config(parser, section)
+
+    # final compose of settings
+    celery_settings = {}
+
+    if ini_config:
+        celery_settings.update(ini_config)
+    if beat_config:
+        celery_settings.update({'beat_schedule': beat_config})
+
+    return celery_settings
+
+
+def parse_ini_vars(ini_vars):
+    options = {}
+    for pairs in ini_vars.split(','):
+        key, value = pairs.split('=')
+        options[key] = value
+    return options
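For context, a sketch of how a `[celerybeat:*]` ini section would be consumed by `get_beat_config()` above. The section and task names are illustrative only, and running it assumes a Python 3 configparser plus an importable rhodecode package.

# hedged example: section/task names are made up, not part of this changeset
import configparser  # Python 3; on Python 2 use pyramid.compat.configparser with readfp()

from rhodecode.lib.celerylib.utils import get_beat_config

INI_SNIPPET = """
[celerybeat:example_check]
type = crontab
schedule = {"minute": 0, "hour": 3}
task = rhodecode.lib.celerylib.tasks.example_check
"""

parser = configparser.ConfigParser()
parser.read_string(INI_SNIPPET)

# expected shape: {'task': '...example_check', 'schedule': <crontab: 0 3 ...>}
print(get_beat_config(parser, 'celerybeat:example_check'))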
@@ -296,28 +296,15 b' labs_settings_active = true'
 ### CELERY CONFIG ####
 ####################################
 use_celery = false
-broker.host = localhost
-broker.vhost = rabbitmqhost
-broker.port = 5672
-broker.user = rabbitmq
-broker.password = qweqwe
-
-celery.imports = rhodecode.lib.celerylib.tasks
 
-celery.result.backend = amqp
-celery.result.dburi = amqp://
-celery.result.serialier = json
+# connection url to the message broker (default rabbitmq)
+celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
 
-#celery.send.task.error.emails = true
-#celery.amqp.task.result.expires = 18000
-
-celeryd.concurrency = 2
-#celeryd.log.file = celeryd.log
-celeryd.log.level = debug
-celeryd.max.tasks.per.child = 1
+# maximum tasks to execute before worker restart
+celery.max_tasks_per_child = 100
 
 ## tasks will never be sent to the queue, but executed locally instead.
-celery.always.eager = false
+celery.task_always_eager = false
 
 ####################################
 ### BEAKER CACHE ####
@@ -649,7 +636,7 b' custom.conf = 1'
 ### LOGGING CONFIGURATION ####
 ################################
 [loggers]
-keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
+keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
 
 [handlers]
 keys = console, console_sql
@@ -688,6 +675,11 b' handlers ='
 qualname = ssh_wrapper
 propagate = 1
 
+[logger_celery]
+level = DEBUG
+handlers =
+qualname = celery
+
 
 ##############
 ## HANDLERS ##
@@ -271,28 +271,15 b' labs_settings_active = true'
 ### CELERY CONFIG ####
 ####################################
 use_celery = false
-broker.host = localhost
-broker.vhost = rabbitmqhost
-broker.port = 5672
-broker.user = rabbitmq
-broker.password = qweqwe
-
-celery.imports = rhodecode.lib.celerylib.tasks
 
-celery.result.backend = amqp
-celery.result.dburi = amqp://
-celery.result.serialier = json
+# connection url to the message broker (default rabbitmq)
+celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
 
-#celery.send.task.error.emails = true
-#celery.amqp.task.result.expires = 18000
-
-celeryd.concurrency = 2
-#celeryd.log.file = celeryd.log
-celeryd.log.level = debug
-celeryd.max.tasks.per.child = 1
+# maximum tasks to execute before worker restart
+celery.max_tasks_per_child = 100
 
 ## tasks will never be sent to the queue, but executed locally instead.
-celery.always.eager = false
+celery.task_always_eager = false
 
 ####################################
 ### BEAKER CACHE ####
@@ -619,7 +606,7 b' custom.conf = 1'
 ### LOGGING CONFIGURATION ####
 ################################
 [loggers]
-keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
+keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
 
 [handlers]
 keys = console, console_sql
@@ -658,6 +645,11 b' handlers ='
 qualname = ssh_wrapper
 propagate = 1
 
+[logger_celery]
+level = DEBUG
+handlers =
+qualname = celery
+
 
 ##############
 ## HANDLERS ##
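The two ini hunks above replace the old `broker.*`/`celeryd.*` options with plain `celery.*` keys that `get_ini_config()` can translate one-to-one into Celery 4 setting names. A simplified restatement of that conversion (not the actual function), using the dev values from the hunks:

# simplified sketch of the key/type conversion done in get_ini_config()
def _convert(key, value):
    key = key[len('celery.'):].replace('.', '_').lower()
    if value.isdigit():
        return key, int(value)
    if value.lower() in ('true', 'false'):
        return key, value.lower() == 'true'
    return key, value

ini_items = {
    'celery.broker_url': 'amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost',
    'celery.max_tasks_per_child': '100',
    'celery.task_always_eager': 'false',
}

print(dict(_convert(k, v) for k, v in ini_items.items()))
# -> {'broker_url': 'amqp://...', 'max_tasks_per_child': 100, 'task_always_eager': False}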
@@ -32,6 +32,7 b' from rhodecode.api.utils import ('
 from rhodecode.lib import audit_logger
 from rhodecode.lib import repo_maintenance
 from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi
+from rhodecode.lib.celerylib.utils import get_task_id
 from rhodecode.lib.utils2 import str2bool, time_to_datetime
 from rhodecode.lib.ext_json import json
 from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError
@@ -712,10 +713,7 b' def create_repo('
     }
 
     task = RepoModel().create(form_data=data, cur_user=owner)
-    from celery.result import BaseAsyncResult
-    task_id = None
-    if isinstance(task, BaseAsyncResult):
-        task_id = task.task_id
+    task_id = get_task_id(task)
     # no commit, it's done in RepoModel, or async via celery
     return {
         'msg': "Created new repository `%s`" % (schema_data['repo_name'],),
@@ -1105,10 +1103,8 b' def fork_repo(request, apiuser, repoid, '
 
     task = RepoModel().create_fork(data, cur_user=owner)
    # no commit, it's done in RepoModel, or async via celery
-    from celery.result import BaseAsyncResult
-    task_id = None
-    if isinstance(task, BaseAsyncResult):
-        task_id = task.task_id
+    task_id = get_task_id(task)
+
     return {
         'msg': 'Created fork of `%s` as `%s`' % (
             repo.repo_name, schema_data['repo_name']),
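The repeated `BaseAsyncResult` isinstance checks are now centralised in `get_task_id()`: with celery disabled the model call runs inline and returns a plain value, so the helper yields None and the API response simply has no task id to expose. A hedged illustration:

from rhodecode.lib.celerylib.utils import get_task_id

# use_celery = false: RepoModel().create(...) returns its normal value
assert get_task_id('plain-return-value') is None
# use_celery = true: it returns an AsyncResult, whose .task_id is handed
# back to the API caller for later polling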
@@ -28,6 +28,7 b' from pyramid.renderers import render'
 from pyramid.response import Response
 
 from rhodecode.apps._base import BaseAppView, DataGridAppView
+from rhodecode.lib.celerylib.utils import get_task_id
 
 from rhodecode.lib.ext_json import json
 from rhodecode.lib.auth import (
@@ -143,22 +144,19 b' class AdminReposView(BaseAppView, DataGr'
         c = self.load_default_context()
 
         form_result = {}
+        self._load_form_data(c)
         task_id = None
-        self._load_form_data(c)
-
         try:
             # CanWriteToGroup validators checks permissions of this POST
             form = RepoForm(
                 self.request.translate, repo_groups=c.repo_groups_choices,
                 landing_revs=c.landing_revs_choices)()
-            form_result
+            form_result = form.to_python(dict(self.request.POST))
 
             # create is done sometimes async on celery, db transaction
             # management is handled there.
             task = RepoModel().create(form_result, self._rhodecode_user.user_id)
-            from celery.result import BaseAsyncResult
-            if isinstance(task, BaseAsyncResult):
-                task_id = task.task_id
+            task_id = get_task_id(task)
         except formencode.Invalid as errors:
             data = render('rhodecode:templates/admin/repos/repo_add.mako',
                           self._get_template_context(c), self.request)
@@ -46,11 +46,9 b' class RepoChecksView(BaseAppView):'
 
         repo_name = self.request.matchdict['repo_name']
         db_repo = Repository.get_by_repo_name(repo_name)
-        if not db_repo:
-            raise HTTPNotFound()
 
         # check if maybe repo is already created
-        if db_repo.repo_state in [Repository.STATE_CREATED]:
+        if db_repo and db_repo.repo_state in [Repository.STATE_CREATED]:
             # re-check permissions before redirecting to prevent resource
             # discovery by checking the 302 code
             perm_set = ['repository.read', 'repository.write', 'repository.admin']
@@ -80,9 +78,10 b' class RepoChecksView(BaseAppView):'
 
         if task_id and task_id not in ['None']:
             import rhodecode
-            from celery.result import AsyncResult
+            from rhodecode.lib.celerylib.loader import celery_app
             if rhodecode.CELERY_ENABLED:
-                task = AsyncResult(task_id)
+                task = celery_app.AsyncResult(task_id)
+                task.get()
                 if task.failed():
                     msg = self._log_creation_exception(task.result, repo_name)
                     h.flash(msg, category='error')
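Binding the result to `celery_app` (instead of the bare `AsyncResult` import) ensures the rpc:// result backend configured by the loader is actually used, and the added `task.get()` waits for the creation task to finish before `failed()` is inspected. A small sketch of the same polling pattern with an explicit timeout; the helper name and timeout value are illustrative, not part of the change:

from rhodecode.lib.celerylib.loader import celery_app

def wait_for_task(task_id, timeout=60):
    # propagate=False keeps a failure as state instead of re-raising here,
    # so it can be reported via .failed() / .result as the view does
    task = celery_app.AsyncResult(task_id)
    task.get(timeout=timeout, propagate=False)
    return task.failed(), task.result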
@@ -33,6 +33,7 b' from rhodecode.lib.auth import ('
     LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous,
     HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired)
 import rhodecode.lib.helpers as h
+from rhodecode.lib.celerylib.utils import get_task_id
 from rhodecode.model.db import coalesce, or_, Repository, RepoGroup
 from rhodecode.model.repo import RepoModel
 from rhodecode.model.forms import RepoForkForm
@@ -226,9 +227,8 b' class RepoForksView(RepoAppView, DataGri'
             # management is handled there.
             task = RepoModel().create_fork(
                 form_result, c.rhodecode_user.user_id)
-            from celery.result import BaseAsyncResult
-            if isinstance(task, BaseAsyncResult):
-                task_id = task.task_id
+
+            task_id = get_task_id(task)
         except formencode.Invalid as errors:
             c.rhodecode_db_repo = self.db_repo
 
@@ -23,14 +23,6 b' import os'
 import logging
 import rhodecode
 
-# ------------------------------------------------------------------------------
-# CELERY magic until refactor - issue #4163 - import order matters here:
-#from rhodecode.lib import celerypylons # this must be first, celerypylons
-# sets config settings upon import
-
-import rhodecode.integrations # any modules using celery task
-# decorators should be added afterwards:
-# ------------------------------------------------------------------------------
 
 from rhodecode.config import utils
 
@@ -54,14 +46,6 b' def load_pyramid_environment(global_conf'
         'secret': settings_merged.get('channelstream.secret')
     }
 
-
-    # TODO(marcink): celery
-    # # store some globals into rhodecode
-    # rhodecode.CELERY_ENABLED = str2bool(config['app_conf'].get('use_celery'))
-    # rhodecode.CELERY_EAGER = str2bool(
-    #     config['app_conf'].get('celery.always.eager'))
-
-
     # If this is a test run we prepare the test environment like
     # creating a test database, test search index and test repositories.
     # This has to be done before the database connection is initialized.
@@ -189,7 +189,7 b''
     "python2.7-jupyter-core-4.3.0": {
         "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
     },
-    "python2.7-kombu-
+    "python2.7-kombu-4.1.0": {
         "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
     },
     "python2.7-mistune-0.7.4": {
@@ -42,6 +42,7 b' from rhodecode.lib.vcs import VCSCommuni'
 from rhodecode.lib.exceptions import VCSServerUnavailable
 from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled
 from rhodecode.lib.middleware.https_fixup import HttpsFixup
+from rhodecode.lib.celerylib.loader import configure_celery
 from rhodecode.lib.plugins.utils import register_rhodecode_plugin
 from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict
 from rhodecode.subscribers import (
@@ -87,9 +88,11 b' def make_pyramid_app(global_config, **se'
     pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config)
     pyramid_app.config = config
 
+    config.configure_celery(global_config['__file__'])
     # creating the app uses a connection - return it after we are done
     meta.Session.remove()
 
+    log.info('Pyramid app %s created and configured.', pyramid_app)
     return pyramid_app
 
 
@@ -196,6 +199,8 b' def includeme(config):'
     config.add_directive(
         'register_rhodecode_plugin', register_rhodecode_plugin)
 
+    config.add_directive('configure_celery', configure_celery)
+
     if asbool(settings.get('appenlight', 'false')):
         config.include('appenlight_client.ext.pyramid_tween')
 
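`config.add_directive('configure_celery', configure_celery)` is what makes the `config.configure_celery(global_config['__file__'])` call above possible: Pyramid binds the directive to the Configurator and passes it as the first argument. A generic, standalone sketch of that mechanism (the names here are not RhodeCode code):

from pyramid.config import Configurator

def configure_demo(config, ini_location):
    # stands in for configure_celery(); just records the ini path
    config.registry.settings['demo.ini_location'] = ini_location

config = Configurator(settings={})
config.add_directive('configure_demo', configure_demo)
config.configure_demo('/path/to/dev.ini')  # Configurator is bound as the first arg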
@@ -20,18 +20,16 b''
 
 from __future__ import unicode_literals
 import deform
-import re
 import logging
 import requests
 import colander
 import textwrap
-from celery.task import task
 from mako.template import Template
 
 from rhodecode import events
 from rhodecode.translation import _
 from rhodecode.lib import helpers as h