Show More
@@ -0,0 +1,256 b'' | |||||
|
1 | # -*- coding: utf-8 -*- | |||
|
2 | ||||
|
3 | # Copyright (C) 2010-2017 RhodeCode GmbH | |||
|
4 | # | |||
|
5 | # This program is free software: you can redistribute it and/or modify | |||
|
6 | # it under the terms of the GNU Affero General Public License, version 3 | |||
|
7 | # (only), as published by the Free Software Foundation. | |||
|
8 | # | |||
|
9 | # This program is distributed in the hope that it will be useful, | |||
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
|
12 | # GNU General Public License for more details. | |||
|
13 | # | |||
|
14 | # You should have received a copy of the GNU Affero General Public License | |||
|
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |||
|
16 | # | |||
|
17 | # This program is dual-licensed. If you wish to learn more about the | |||
|
18 | # RhodeCode Enterprise Edition, including its added features, Support services, | |||
|
19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ | |||
|
20 | """ | |||
|
21 | Celery loader, run with:: | |||
|
22 | ||||
|
23 | celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini | |||
|
24 | """ | |||
|
25 | import os | |||
|
26 | import logging | |||
|
27 | ||||
|
28 | from celery import Celery | |||
|
29 | from celery import signals | |||
|
30 | from celery import Task | |||
|
31 | from kombu.serialization import register | |||
|
32 | from pyramid.threadlocal import get_current_request | |||
|
33 | ||||
|
34 | import rhodecode | |||
|
35 | ||||
|
36 | from rhodecode.lib.auth import AuthUser | |||
|
37 | from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars | |||
|
38 | from rhodecode.lib.ext_json import json | |||
|
39 | from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request | |||
|
40 | from rhodecode.lib.utils2 import str2bool | |||
|
41 | from rhodecode.model import meta | |||
|
42 | ||||
|
43 | ||||
|
44 | register('json_ext', json.dumps, json.loads, | |||
|
45 | content_type='application/x-json-ext', | |||
|
46 | content_encoding='utf-8') | |||
|
47 | ||||
|
48 | log = logging.getLogger('celery.rhodecode.loader') | |||
|
49 | ||||
|
50 | ||||
|
51 | def add_preload_arguments(parser): | |||
|
52 | parser.add_argument( | |||
|
53 | '--ini', default=None, | |||
|
54 | help='Path to ini configuration file.' | |||
|
55 | ) | |||
|
56 | parser.add_argument( | |||
|
57 | '--ini-var', default=None, | |||
|
58 | help='Comma separated list of key=value to pass to ini.' | |||
|
59 | ) | |||
|
60 | ||||
|
61 | ||||
|
62 | def get_logger(obj): | |||
|
63 | custom_log = logging.getLogger( | |||
|
64 | 'rhodecode.task.{}'.format(obj.__class__.__name__)) | |||
|
65 | ||||
|
66 | if rhodecode.CELERY_ENABLED: | |||
|
67 | try: | |||
|
68 | custom_log = obj.get_logger() | |||
|
69 | except Exception: | |||
|
70 | pass | |||
|
71 | ||||
|
72 | return custom_log | |||
|
73 | ||||
|
74 | ||||
|
75 | base_celery_config = { | |||
|
76 | 'result_backend': 'rpc://', | |||
|
77 | 'result_expires': 60 * 60 * 24, | |||
|
78 | 'result_persistent': True, | |||
|
79 | 'imports': [], | |||
|
80 | 'worker_max_tasks_per_child': 100, | |||
|
81 | 'accept_content': ['json_ext'], | |||
|
82 | 'task_serializer': 'json_ext', | |||
|
83 | 'result_serializer': 'json_ext', | |||
|
84 | 'worker_hijack_root_logger': False, | |||
|
85 | } | |||
|
86 | # init main celery app | |||
|
87 | celery_app = Celery() | |||
|
88 | celery_app.user_options['preload'].add(add_preload_arguments) | |||
|
89 | ini_file_glob = None | |||
|
90 | ||||
|
91 | ||||
|
92 | @signals.setup_logging.connect | |||
|
93 | def setup_logging_callback(**kwargs): | |||
|
94 | setup_logging(ini_file_glob) | |||
|
95 | ||||
|
96 | ||||
|
97 | @signals.user_preload_options.connect | |||
|
98 | def on_preload_parsed(options, **kwargs): | |||
|
99 | ini_location = options['ini'] | |||
|
100 | ini_vars = options['ini_var'] | |||
|
101 | celery_app.conf['INI_PYRAMID'] = options['ini'] | |||
|
102 | ||||
|
103 | if ini_location is None: | |||
|
104 | print('You must provide the paste --ini argument') | |||
|
105 | exit(-1) | |||
|
106 | ||||
|
107 | options = None | |||
|
108 | if ini_vars is not None: | |||
|
109 | options = parse_ini_vars(ini_vars) | |||
|
110 | ||||
|
111 | global ini_file_glob | |||
|
112 | ini_file_glob = ini_location | |||
|
113 | ||||
|
114 | log.debug('Bootstrapping RhodeCode application...') | |||
|
115 | env = bootstrap(ini_location, options=options) | |||
|
116 | ||||
|
117 | setup_celery_app( | |||
|
118 | app=env['app'], root=env['root'], request=env['request'], | |||
|
119 | registry=env['registry'], closer=env['closer'], | |||
|
120 | ini_location=ini_location) | |||
|
121 | ||||
|
122 | # fix the global flag even if it's disabled via .ini file because this | |||
|
123 | # is a worker code that doesn't need this to be disabled. | |||
|
124 | rhodecode.CELERY_ENABLED = True | |||
|
125 | ||||
|
126 | ||||
|
127 | @signals.task_success.connect | |||
|
128 | def task_success_signal(result, **kwargs): | |||
|
129 | meta.Session.commit() | |||
|
130 | celery_app.conf['PYRAMID_CLOSER']() | |||
|
131 | ||||
|
132 | ||||
|
133 | @signals.task_retry.connect | |||
|
134 | def task_retry_signal( | |||
|
135 | request, reason, einfo, **kwargs): | |||
|
136 | meta.Session.remove() | |||
|
137 | celery_app.conf['PYRAMID_CLOSER']() | |||
|
138 | ||||
|
139 | ||||
|
140 | @signals.task_failure.connect | |||
|
141 | def task_failure_signal( | |||
|
142 | task_id, exception, args, kwargs, traceback, einfo, **kargs): | |||
|
143 | meta.Session.remove() | |||
|
144 | celery_app.conf['PYRAMID_CLOSER']() | |||
|
145 | ||||
|
146 | ||||
|
147 | @signals.task_revoked.connect | |||
|
148 | def task_revoked_signal( | |||
|
149 | request, terminated, signum, expired, **kwargs): | |||
|
150 | celery_app.conf['PYRAMID_CLOSER']() | |||
|
151 | ||||
|
152 | ||||
|
153 | def setup_celery_app(app, root, request, registry, closer, ini_location): | |||
|
154 | ini_dir = os.path.dirname(os.path.abspath(ini_location)) | |||
|
155 | celery_config = base_celery_config | |||
|
156 | celery_config.update({ | |||
|
157 | # store celerybeat scheduler db where the .ini file is | |||
|
158 | 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'), | |||
|
159 | }) | |||
|
160 | ini_settings = get_ini_config(ini_location) | |||
|
161 | log.debug('Got custom celery conf: %s', ini_settings) | |||
|
162 | ||||
|
163 | celery_config.update(ini_settings) | |||
|
164 | celery_app.config_from_object(celery_config) | |||
|
165 | ||||
|
166 | celery_app.conf.update({'PYRAMID_APP': app}) | |||
|
167 | celery_app.conf.update({'PYRAMID_ROOT': root}) | |||
|
168 | celery_app.conf.update({'PYRAMID_REQUEST': request}) | |||
|
169 | celery_app.conf.update({'PYRAMID_REGISTRY': registry}) | |||
|
170 | celery_app.conf.update({'PYRAMID_CLOSER': closer}) | |||
|
171 | ||||
|
172 | ||||
|
173 | def configure_celery(config, ini_location): | |||
|
174 | """ | |||
|
175 | Helper that is called from our application creation logic. It gives | |||
|
176 | connection info into running webapp and allows execution of tasks from | |||
|
177 | RhodeCode itself | |||
|
178 | """ | |||
|
179 | # store some globals into rhodecode | |||
|
180 | rhodecode.CELERY_ENABLED = str2bool( | |||
|
181 | config.registry.settings.get('use_celery')) | |||
|
182 | if rhodecode.CELERY_ENABLED: | |||
|
183 | log.info('Configuring celery based on `%s` file', ini_location) | |||
|
184 | setup_celery_app( | |||
|
185 | app=None, root=None, request=None, registry=config.registry, | |||
|
186 | closer=None, ini_location=ini_location) | |||
|
187 | ||||
|
188 | ||||
|
189 | class RequestContextTask(Task): | |||
|
190 | """ | |||
|
191 | This is a celery task which will create a rhodecode app instance context | |||
|
192 | for the task, patch pyramid with the original request | |||
|
193 | that created the task and also add the user to the context. | |||
|
194 | """ | |||
|
195 | ||||
|
196 | def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, | |||
|
197 | link=None, link_error=None, shadow=None, **options): | |||
|
198 | """ queue the job to run (we are in web request context here) """ | |||
|
199 | ||||
|
200 | req = get_current_request() | |||
|
201 | ||||
|
202 | # web case | |||
|
203 | if hasattr(req, 'user'): | |||
|
204 | ip_addr = req.user.ip_addr | |||
|
205 | user_id = req.user.user_id | |||
|
206 | ||||
|
207 | # api case | |||
|
208 | elif hasattr(req, 'rpc_user'): | |||
|
209 | ip_addr = req.rpc_user.ip_addr | |||
|
210 | user_id = req.rpc_user.user_id | |||
|
211 | else: | |||
|
212 | raise Exception( | |||
|
213 | 'Unable to fetch required data from request: {}. \n' | |||
|
214 | 'This task is required to be executed from context of ' | |||
|
215 | 'request in a webapp'.format(repr(req))) | |||
|
216 | ||||
|
217 | if req: | |||
|
218 | # we hook into kwargs since it is the only way to pass our data to | |||
|
219 | # the celery worker | |||
|
220 | options['headers'] = options.get('headers', {}) | |||
|
221 | options['headers'].update({ | |||
|
222 | 'rhodecode_proxy_data': { | |||
|
223 | 'environ': { | |||
|
224 | 'PATH_INFO': req.environ['PATH_INFO'], | |||
|
225 | 'SCRIPT_NAME': req.environ['SCRIPT_NAME'], | |||
|
226 | 'HTTP_HOST': req.environ.get('HTTP_HOST', | |||
|
227 | req.environ['SERVER_NAME']), | |||
|
228 | 'SERVER_NAME': req.environ['SERVER_NAME'], | |||
|
229 | 'SERVER_PORT': req.environ['SERVER_PORT'], | |||
|
230 | 'wsgi.url_scheme': req.environ['wsgi.url_scheme'], | |||
|
231 | }, | |||
|
232 | 'auth_user': { | |||
|
233 | 'ip_addr': ip_addr, | |||
|
234 | 'user_id': user_id | |||
|
235 | }, | |||
|
236 | } | |||
|
237 | }) | |||
|
238 | ||||
|
239 | return super(RequestContextTask, self).apply_async( | |||
|
240 | args, kwargs, task_id, producer, link, link_error, shadow, **options) | |||
|
241 | ||||
|
242 | def __call__(self, *args, **kwargs): | |||
|
243 | """ rebuild the context and then run task on celery worker """ | |||
|
244 | ||||
|
245 | proxy_data = getattr(self.request, 'rhodecode_proxy_data', None) | |||
|
246 | if not proxy_data: | |||
|
247 | return super(RequestContextTask, self).__call__(*args, **kwargs) | |||
|
248 | ||||
|
249 | log.debug('using celery proxy data to run task: %r', proxy_data) | |||
|
250 | # re-inject and register threadlocals for proper routing support | |||
|
251 | request = prepare_request(proxy_data['environ']) | |||
|
252 | request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'], | |||
|
253 | ip_addr=proxy_data['auth_user']['ip_addr']) | |||
|
254 | ||||
|
255 | return super(RequestContextTask, self).__call__(*args, **kwargs) | |||
|
256 |
@@ -0,0 +1,156 b'' | |||||
|
1 | # -*- coding: utf-8 -*- | |||
|
2 | ||||
|
3 | # Copyright (C) 2010-2017 RhodeCode GmbH | |||
|
4 | # | |||
|
5 | # This program is free software: you can redistribute it and/or modify | |||
|
6 | # it under the terms of the GNU Affero General Public License, version 3 | |||
|
7 | # (only), as published by the Free Software Foundation. | |||
|
8 | # | |||
|
9 | # This program is distributed in the hope that it will be useful, | |||
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
|
12 | # GNU General Public License for more details. | |||
|
13 | # | |||
|
14 | # You should have received a copy of the GNU Affero General Public License | |||
|
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |||
|
16 | # | |||
|
17 | # This program is dual-licensed. If you wish to learn more about the | |||
|
18 | # RhodeCode Enterprise Edition, including its added features, Support services, | |||
|
19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ | |||
|
20 | ||||
|
21 | import os | |||
|
22 | import json | |||
|
23 | import logging | |||
|
24 | import datetime | |||
|
25 | ||||
|
26 | from functools import partial | |||
|
27 | ||||
|
28 | from pyramid.compat import configparser | |||
|
29 | from celery.result import AsyncResult | |||
|
30 | import celery.loaders.base | |||
|
31 | import celery.schedules | |||
|
32 | ||||
|
33 | ||||
|
34 | log = logging.getLogger(__name__) | |||
|
35 | ||||
|
36 | ||||
|
37 | def get_task_id(task): | |||
|
38 | task_id = None | |||
|
39 | if isinstance(task, AsyncResult): | |||
|
40 | task_id = task.task_id | |||
|
41 | ||||
|
42 | return task_id | |||
|
43 | ||||
|
44 | ||||
|
45 | def crontab(value): | |||
|
46 | return celery.schedules.crontab(**value) | |||
|
47 | ||||
|
48 | ||||
|
49 | def timedelta(value): | |||
|
50 | return datetime.timedelta(**value) | |||
|
51 | ||||
|
52 | ||||
|
53 | def safe_json(get, section, key): | |||
|
54 | value = '' | |||
|
55 | try: | |||
|
56 | value = get(key) | |||
|
57 | json_value = json.loads(value) | |||
|
58 | except ValueError: | |||
|
59 | msg = 'The %s=%s is not valid json in section %s' % ( | |||
|
60 | key, value, section | |||
|
61 | ) | |||
|
62 | raise ValueError(msg) | |||
|
63 | ||||
|
64 | return json_value | |||
|
65 | ||||
|
66 | ||||
|
67 | def get_beat_config(parser, section): | |||
|
68 | SCHEDULE_TYPE_MAP = { | |||
|
69 | 'crontab': crontab, | |||
|
70 | 'timedelta': timedelta, | |||
|
71 | 'integer': int | |||
|
72 | } | |||
|
73 | get = partial(parser.get, section) | |||
|
74 | has_option = partial(parser.has_option, section) | |||
|
75 | ||||
|
76 | schedule_type = get('type') | |||
|
77 | schedule_value = safe_json(get, section, 'schedule') | |||
|
78 | ||||
|
79 | scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type) | |||
|
80 | ||||
|
81 | if scheduler_cls is None: | |||
|
82 | raise ValueError( | |||
|
83 | 'schedule type %s in section %s is invalid' % ( | |||
|
84 | schedule_type, | |||
|
85 | section | |||
|
86 | ) | |||
|
87 | ) | |||
|
88 | ||||
|
89 | schedule = scheduler_cls(schedule_value) | |||
|
90 | ||||
|
91 | config = { | |||
|
92 | 'task': get('task'), | |||
|
93 | 'schedule': schedule, | |||
|
94 | } | |||
|
95 | ||||
|
96 | if has_option('args'): | |||
|
97 | config['args'] = safe_json(get, section, 'args') | |||
|
98 | ||||
|
99 | if has_option('kwargs'): | |||
|
100 | config['kwargs'] = safe_json(get, section, 'kwargs') | |||
|
101 | ||||
|
102 | return config | |||
|
103 | ||||
|
104 | ||||
|
105 | def get_ini_config(ini_location): | |||
|
106 | """ | |||
|
107 | Converts basic ini configuration into celery 4.X options | |||
|
108 | """ | |||
|
109 | def key_converter(key_name): | |||
|
110 | pref = 'celery.' | |||
|
111 | if key_name.startswith(pref): | |||
|
112 | return key_name[len(pref):].replace('.', '_').lower() | |||
|
113 | ||||
|
114 | def type_converter(parsed_key, value): | |||
|
115 | # cast to int | |||
|
116 | if value.isdigit(): | |||
|
117 | return int(value) | |||
|
118 | ||||
|
119 | # cast to bool | |||
|
120 | if value.lower() in ['true', 'false', 'True', 'False']: | |||
|
121 | return value.lower() == 'true' | |||
|
122 | return value | |||
|
123 | ||||
|
124 | parser = configparser.SafeConfigParser( | |||
|
125 | defaults={'here': os.path.abspath(ini_location)}) | |||
|
126 | parser.read(ini_location) | |||
|
127 | ||||
|
128 | ini_config = {} | |||
|
129 | for k, v in parser.items('app:main'): | |||
|
130 | pref = 'celery.' | |||
|
131 | if k.startswith(pref): | |||
|
132 | ini_config[key_converter(k)] = type_converter(key_converter(k), v) | |||
|
133 | ||||
|
134 | beat_config = {} | |||
|
135 | for section in parser.sections(): | |||
|
136 | if section.startswith('celerybeat:'): | |||
|
137 | name = section.split(':', 1)[1] | |||
|
138 | beat_config[name] = get_beat_config(parser, section) | |||
|
139 | ||||
|
140 | # final compose of settings | |||
|
141 | celery_settings = {} | |||
|
142 | ||||
|
143 | if ini_config: | |||
|
144 | celery_settings.update(ini_config) | |||
|
145 | if beat_config: | |||
|
146 | celery_settings.update({'beat_schedule': beat_config}) | |||
|
147 | ||||
|
148 | return celery_settings | |||
|
149 | ||||
|
150 | ||||
|
151 | def parse_ini_vars(ini_vars): | |||
|
152 | options = {} | |||
|
153 | for pairs in ini_vars.split(','): | |||
|
154 | key, value = pairs.split('=') | |||
|
155 | options[key] = value | |||
|
156 | return options |
@@ -296,28 +296,15 b' labs_settings_active = true' | |||||
296 | ### CELERY CONFIG #### |
|
296 | ### CELERY CONFIG #### | |
297 | #################################### |
|
297 | #################################### | |
298 | use_celery = false |
|
298 | use_celery = false | |
299 | broker.host = localhost |
|
|||
300 | broker.vhost = rabbitmqhost |
|
|||
301 | broker.port = 5672 |
|
|||
302 | broker.user = rabbitmq |
|
|||
303 | broker.password = qweqwe |
|
|||
304 |
|
||||
305 | celery.imports = rhodecode.lib.celerylib.tasks |
|
|||
306 |
|
299 | |||
307 | celery.result.backend = amqp |
|
300 | # connection url to the message broker (default rabbitmq) | |
308 | celery.result.dburi = amqp:// |
|
301 | celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost | |
309 | celery.result.serialier = json |
|
|||
310 |
|
302 | |||
311 | #celery.send.task.error.emails = true |
|
303 | # maximum tasks to execute before worker restart | |
312 | #celery.amqp.task.result.expires = 18000 |
|
304 | celery.max_tasks_per_child = 100 | |
313 |
|
||||
314 | celeryd.concurrency = 2 |
|
|||
315 | #celeryd.log.file = celeryd.log |
|
|||
316 | celeryd.log.level = debug |
|
|||
317 | celeryd.max.tasks.per.child = 1 |
|
|||
318 |
|
305 | |||
319 | ## tasks will never be sent to the queue, but executed locally instead. |
|
306 | ## tasks will never be sent to the queue, but executed locally instead. | |
320 |
celery.always |
|
307 | celery.task_always_eager = false | |
321 |
|
308 | |||
322 | #################################### |
|
309 | #################################### | |
323 | ### BEAKER CACHE #### |
|
310 | ### BEAKER CACHE #### | |
@@ -649,7 +636,7 b' custom.conf = 1' | |||||
649 | ### LOGGING CONFIGURATION #### |
|
636 | ### LOGGING CONFIGURATION #### | |
650 | ################################ |
|
637 | ################################ | |
651 | [loggers] |
|
638 | [loggers] | |
652 | keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper |
|
639 | keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery | |
653 |
|
640 | |||
654 | [handlers] |
|
641 | [handlers] | |
655 | keys = console, console_sql |
|
642 | keys = console, console_sql | |
@@ -688,6 +675,11 b' handlers =' | |||||
688 | qualname = ssh_wrapper |
|
675 | qualname = ssh_wrapper | |
689 | propagate = 1 |
|
676 | propagate = 1 | |
690 |
|
677 | |||
|
678 | [logger_celery] | |||
|
679 | level = DEBUG | |||
|
680 | handlers = | |||
|
681 | qualname = celery | |||
|
682 | ||||
691 |
|
683 | |||
692 | ############## |
|
684 | ############## | |
693 | ## HANDLERS ## |
|
685 | ## HANDLERS ## |
@@ -271,28 +271,15 b' labs_settings_active = true' | |||||
271 | ### CELERY CONFIG #### |
|
271 | ### CELERY CONFIG #### | |
272 | #################################### |
|
272 | #################################### | |
273 | use_celery = false |
|
273 | use_celery = false | |
274 | broker.host = localhost |
|
|||
275 | broker.vhost = rabbitmqhost |
|
|||
276 | broker.port = 5672 |
|
|||
277 | broker.user = rabbitmq |
|
|||
278 | broker.password = qweqwe |
|
|||
279 |
|
||||
280 | celery.imports = rhodecode.lib.celerylib.tasks |
|
|||
281 |
|
274 | |||
282 | celery.result.backend = amqp |
|
275 | # connection url to the message broker (default rabbitmq) | |
283 | celery.result.dburi = amqp:// |
|
276 | celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost | |
284 | celery.result.serialier = json |
|
|||
285 |
|
277 | |||
286 | #celery.send.task.error.emails = true |
|
278 | # maximum tasks to execute before worker restart | |
287 | #celery.amqp.task.result.expires = 18000 |
|
279 | celery.max_tasks_per_child = 100 | |
288 |
|
||||
289 | celeryd.concurrency = 2 |
|
|||
290 | #celeryd.log.file = celeryd.log |
|
|||
291 | celeryd.log.level = debug |
|
|||
292 | celeryd.max.tasks.per.child = 1 |
|
|||
293 |
|
280 | |||
294 | ## tasks will never be sent to the queue, but executed locally instead. |
|
281 | ## tasks will never be sent to the queue, but executed locally instead. | |
295 |
celery.always |
|
282 | celery.task_always_eager = false | |
296 |
|
283 | |||
297 | #################################### |
|
284 | #################################### | |
298 | ### BEAKER CACHE #### |
|
285 | ### BEAKER CACHE #### | |
@@ -619,7 +606,7 b' custom.conf = 1' | |||||
619 | ### LOGGING CONFIGURATION #### |
|
606 | ### LOGGING CONFIGURATION #### | |
620 | ################################ |
|
607 | ################################ | |
621 | [loggers] |
|
608 | [loggers] | |
622 | keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper |
|
609 | keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery | |
623 |
|
610 | |||
624 | [handlers] |
|
611 | [handlers] | |
625 | keys = console, console_sql |
|
612 | keys = console, console_sql | |
@@ -658,6 +645,11 b' handlers =' | |||||
658 | qualname = ssh_wrapper |
|
645 | qualname = ssh_wrapper | |
659 | propagate = 1 |
|
646 | propagate = 1 | |
660 |
|
647 | |||
|
648 | [logger_celery] | |||
|
649 | level = DEBUG | |||
|
650 | handlers = | |||
|
651 | qualname = celery | |||
|
652 | ||||
661 |
|
653 | |||
662 | ############## |
|
654 | ############## | |
663 | ## HANDLERS ## |
|
655 | ## HANDLERS ## |
@@ -32,6 +32,7 b' from rhodecode.api.utils import (' | |||||
32 | from rhodecode.lib import audit_logger |
|
32 | from rhodecode.lib import audit_logger | |
33 | from rhodecode.lib import repo_maintenance |
|
33 | from rhodecode.lib import repo_maintenance | |
34 | from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi |
|
34 | from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi | |
|
35 | from rhodecode.lib.celerylib.utils import get_task_id | |||
35 | from rhodecode.lib.utils2 import str2bool, time_to_datetime |
|
36 | from rhodecode.lib.utils2 import str2bool, time_to_datetime | |
36 | from rhodecode.lib.ext_json import json |
|
37 | from rhodecode.lib.ext_json import json | |
37 | from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError |
|
38 | from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError | |
@@ -712,10 +713,7 b' def create_repo(' | |||||
712 | } |
|
713 | } | |
713 |
|
714 | |||
714 | task = RepoModel().create(form_data=data, cur_user=owner) |
|
715 | task = RepoModel().create(form_data=data, cur_user=owner) | |
715 | from celery.result import BaseAsyncResult |
|
716 | task_id = get_task_id(task) | |
716 | task_id = None |
|
|||
717 | if isinstance(task, BaseAsyncResult): |
|
|||
718 | task_id = task.task_id |
|
|||
719 | # no commit, it's done in RepoModel, or async via celery |
|
717 | # no commit, it's done in RepoModel, or async via celery | |
720 | return { |
|
718 | return { | |
721 | 'msg': "Created new repository `%s`" % (schema_data['repo_name'],), |
|
719 | 'msg': "Created new repository `%s`" % (schema_data['repo_name'],), | |
@@ -1105,10 +1103,8 b' def fork_repo(request, apiuser, repoid, ' | |||||
1105 |
|
1103 | |||
1106 | task = RepoModel().create_fork(data, cur_user=owner) |
|
1104 | task = RepoModel().create_fork(data, cur_user=owner) | |
1107 | # no commit, it's done in RepoModel, or async via celery |
|
1105 | # no commit, it's done in RepoModel, or async via celery | |
1108 | from celery.result import BaseAsyncResult |
|
1106 | task_id = get_task_id(task) | |
1109 | task_id = None |
|
1107 | ||
1110 | if isinstance(task, BaseAsyncResult): |
|
|||
1111 | task_id = task.task_id |
|
|||
1112 | return { |
|
1108 | return { | |
1113 | 'msg': 'Created fork of `%s` as `%s`' % ( |
|
1109 | 'msg': 'Created fork of `%s` as `%s`' % ( | |
1114 | repo.repo_name, schema_data['repo_name']), |
|
1110 | repo.repo_name, schema_data['repo_name']), |
@@ -28,6 +28,7 b' from pyramid.renderers import render' | |||||
28 | from pyramid.response import Response |
|
28 | from pyramid.response import Response | |
29 |
|
29 | |||
30 | from rhodecode.apps._base import BaseAppView, DataGridAppView |
|
30 | from rhodecode.apps._base import BaseAppView, DataGridAppView | |
|
31 | from rhodecode.lib.celerylib.utils import get_task_id | |||
31 |
|
32 | |||
32 | from rhodecode.lib.ext_json import json |
|
33 | from rhodecode.lib.ext_json import json | |
33 | from rhodecode.lib.auth import ( |
|
34 | from rhodecode.lib.auth import ( | |
@@ -143,22 +144,19 b' class AdminReposView(BaseAppView, DataGr' | |||||
143 | c = self.load_default_context() |
|
144 | c = self.load_default_context() | |
144 |
|
145 | |||
145 | form_result = {} |
|
146 | form_result = {} | |
|
147 | self._load_form_data(c) | |||
146 | task_id = None |
|
148 | task_id = None | |
147 | self._load_form_data(c) |
|
|||
148 |
|
||||
149 | try: |
|
149 | try: | |
150 | # CanWriteToGroup validators checks permissions of this POST |
|
150 | # CanWriteToGroup validators checks permissions of this POST | |
151 | form = RepoForm( |
|
151 | form = RepoForm( | |
152 | self.request.translate, repo_groups=c.repo_groups_choices, |
|
152 | self.request.translate, repo_groups=c.repo_groups_choices, | |
153 | landing_revs=c.landing_revs_choices)() |
|
153 | landing_revs=c.landing_revs_choices)() | |
154 |
form_result |
|
154 | form_result = form.to_python(dict(self.request.POST)) | |
155 |
|
155 | |||
156 | # create is done sometimes async on celery, db transaction |
|
156 | # create is done sometimes async on celery, db transaction | |
157 | # management is handled there. |
|
157 | # management is handled there. | |
158 | task = RepoModel().create(form_result, self._rhodecode_user.user_id) |
|
158 | task = RepoModel().create(form_result, self._rhodecode_user.user_id) | |
159 | from celery.result import BaseAsyncResult |
|
159 | task_id = get_task_id(task) | |
160 | if isinstance(task, BaseAsyncResult): |
|
|||
161 | task_id = task.task_id |
|
|||
162 | except formencode.Invalid as errors: |
|
160 | except formencode.Invalid as errors: | |
163 | data = render('rhodecode:templates/admin/repos/repo_add.mako', |
|
161 | data = render('rhodecode:templates/admin/repos/repo_add.mako', | |
164 | self._get_template_context(c), self.request) |
|
162 | self._get_template_context(c), self.request) |
@@ -46,11 +46,9 b' class RepoChecksView(BaseAppView):' | |||||
46 |
|
46 | |||
47 | repo_name = self.request.matchdict['repo_name'] |
|
47 | repo_name = self.request.matchdict['repo_name'] | |
48 | db_repo = Repository.get_by_repo_name(repo_name) |
|
48 | db_repo = Repository.get_by_repo_name(repo_name) | |
49 | if not db_repo: |
|
|||
50 | raise HTTPNotFound() |
|
|||
51 |
|
49 | |||
52 | # check if maybe repo is already created |
|
50 | # check if maybe repo is already created | |
53 | if db_repo.repo_state in [Repository.STATE_CREATED]: |
|
51 | if db_repo and db_repo.repo_state in [Repository.STATE_CREATED]: | |
54 | # re-check permissions before redirecting to prevent resource |
|
52 | # re-check permissions before redirecting to prevent resource | |
55 | # discovery by checking the 302 code |
|
53 | # discovery by checking the 302 code | |
56 | perm_set = ['repository.read', 'repository.write', 'repository.admin'] |
|
54 | perm_set = ['repository.read', 'repository.write', 'repository.admin'] | |
@@ -80,9 +78,10 b' class RepoChecksView(BaseAppView):' | |||||
80 |
|
78 | |||
81 | if task_id and task_id not in ['None']: |
|
79 | if task_id and task_id not in ['None']: | |
82 | import rhodecode |
|
80 | import rhodecode | |
83 | from celery.result import AsyncResult |
|
81 | from rhodecode.lib.celerylib.loader import celery_app | |
84 | if rhodecode.CELERY_ENABLED: |
|
82 | if rhodecode.CELERY_ENABLED: | |
85 | task = AsyncResult(task_id) |
|
83 | task = celery_app.AsyncResult(task_id) | |
|
84 | task.get() | |||
86 | if task.failed(): |
|
85 | if task.failed(): | |
87 | msg = self._log_creation_exception(task.result, repo_name) |
|
86 | msg = self._log_creation_exception(task.result, repo_name) | |
88 | h.flash(msg, category='error') |
|
87 | h.flash(msg, category='error') |
@@ -33,6 +33,7 b' from rhodecode.lib.auth import (' | |||||
33 | LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous, |
|
33 | LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous, | |
34 | HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired) |
|
34 | HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired) | |
35 | import rhodecode.lib.helpers as h |
|
35 | import rhodecode.lib.helpers as h | |
|
36 | from rhodecode.lib.celerylib.utils import get_task_id | |||
36 | from rhodecode.model.db import coalesce, or_, Repository, RepoGroup |
|
37 | from rhodecode.model.db import coalesce, or_, Repository, RepoGroup | |
37 | from rhodecode.model.repo import RepoModel |
|
38 | from rhodecode.model.repo import RepoModel | |
38 | from rhodecode.model.forms import RepoForkForm |
|
39 | from rhodecode.model.forms import RepoForkForm | |
@@ -226,9 +227,8 b' class RepoForksView(RepoAppView, DataGri' | |||||
226 | # management is handled there. |
|
227 | # management is handled there. | |
227 | task = RepoModel().create_fork( |
|
228 | task = RepoModel().create_fork( | |
228 | form_result, c.rhodecode_user.user_id) |
|
229 | form_result, c.rhodecode_user.user_id) | |
229 | from celery.result import BaseAsyncResult |
|
230 | ||
230 | if isinstance(task, BaseAsyncResult): |
|
231 | task_id = get_task_id(task) | |
231 | task_id = task.task_id |
|
|||
232 | except formencode.Invalid as errors: |
|
232 | except formencode.Invalid as errors: | |
233 | c.rhodecode_db_repo = self.db_repo |
|
233 | c.rhodecode_db_repo = self.db_repo | |
234 |
|
234 |
@@ -23,14 +23,6 b' import os' | |||||
23 | import logging |
|
23 | import logging | |
24 | import rhodecode |
|
24 | import rhodecode | |
25 |
|
25 | |||
26 | # ------------------------------------------------------------------------------ |
|
|||
27 | # CELERY magic until refactor - issue #4163 - import order matters here: |
|
|||
28 | #from rhodecode.lib import celerypylons # this must be first, celerypylons |
|
|||
29 | # sets config settings upon import |
|
|||
30 |
|
||||
31 | import rhodecode.integrations # any modules using celery task |
|
|||
32 | # decorators should be added afterwards: |
|
|||
33 | # ------------------------------------------------------------------------------ |
|
|||
34 |
|
26 | |||
35 | from rhodecode.config import utils |
|
27 | from rhodecode.config import utils | |
36 |
|
28 | |||
@@ -54,14 +46,6 b' def load_pyramid_environment(global_conf' | |||||
54 | 'secret': settings_merged.get('channelstream.secret') |
|
46 | 'secret': settings_merged.get('channelstream.secret') | |
55 | } |
|
47 | } | |
56 |
|
48 | |||
57 |
|
||||
58 | # TODO(marcink): celery |
|
|||
59 | # # store some globals into rhodecode |
|
|||
60 | # rhodecode.CELERY_ENABLED = str2bool(config['app_conf'].get('use_celery')) |
|
|||
61 | # rhodecode.CELERY_EAGER = str2bool( |
|
|||
62 | # config['app_conf'].get('celery.always.eager')) |
|
|||
63 |
|
||||
64 |
|
||||
65 | # If this is a test run we prepare the test environment like |
|
49 | # If this is a test run we prepare the test environment like | |
66 | # creating a test database, test search index and test repositories. |
|
50 | # creating a test database, test search index and test repositories. | |
67 | # This has to be done before the database connection is initialized. |
|
51 | # This has to be done before the database connection is initialized. |
@@ -189,7 +189,7 b'' | |||||
189 | "python2.7-jupyter-core-4.3.0": { |
|
189 | "python2.7-jupyter-core-4.3.0": { | |
190 | "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause" |
|
190 | "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause" | |
191 | }, |
|
191 | }, | |
192 |
"python2.7-kombu- |
|
192 | "python2.7-kombu-4.1.0": { | |
193 | "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause" |
|
193 | "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause" | |
194 | }, |
|
194 | }, | |
195 | "python2.7-mistune-0.7.4": { |
|
195 | "python2.7-mistune-0.7.4": { |
@@ -42,6 +42,7 b' from rhodecode.lib.vcs import VCSCommuni' | |||||
42 | from rhodecode.lib.exceptions import VCSServerUnavailable |
|
42 | from rhodecode.lib.exceptions import VCSServerUnavailable | |
43 | from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled |
|
43 | from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled | |
44 | from rhodecode.lib.middleware.https_fixup import HttpsFixup |
|
44 | from rhodecode.lib.middleware.https_fixup import HttpsFixup | |
|
45 | from rhodecode.lib.celerylib.loader import configure_celery | |||
45 | from rhodecode.lib.plugins.utils import register_rhodecode_plugin |
|
46 | from rhodecode.lib.plugins.utils import register_rhodecode_plugin | |
46 | from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict |
|
47 | from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict | |
47 | from rhodecode.subscribers import ( |
|
48 | from rhodecode.subscribers import ( | |
@@ -87,9 +88,11 b' def make_pyramid_app(global_config, **se' | |||||
87 | pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config) |
|
88 | pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config) | |
88 | pyramid_app.config = config |
|
89 | pyramid_app.config = config | |
89 |
|
90 | |||
|
91 | config.configure_celery(global_config['__file__']) | |||
90 | # creating the app uses a connection - return it after we are done |
|
92 | # creating the app uses a connection - return it after we are done | |
91 | meta.Session.remove() |
|
93 | meta.Session.remove() | |
92 |
|
94 | |||
|
95 | log.info('Pyramid app %s created and configured.', pyramid_app) | |||
93 | return pyramid_app |
|
96 | return pyramid_app | |
94 |
|
97 | |||
95 |
|
98 | |||
@@ -196,6 +199,8 b' def includeme(config):' | |||||
196 | config.add_directive( |
|
199 | config.add_directive( | |
197 | 'register_rhodecode_plugin', register_rhodecode_plugin) |
|
200 | 'register_rhodecode_plugin', register_rhodecode_plugin) | |
198 |
|
201 | |||
|
202 | config.add_directive('configure_celery', configure_celery) | |||
|
203 | ||||
199 | if asbool(settings.get('appenlight', 'false')): |
|
204 | if asbool(settings.get('appenlight', 'false')): | |
200 | config.include('appenlight_client.ext.pyramid_tween') |
|
205 | config.include('appenlight_client.ext.pyramid_tween') | |
201 |
|
206 |
@@ -20,18 +20,16 b'' | |||||
20 |
|
20 | |||
21 | from __future__ import unicode_literals |
|
21 | from __future__ import unicode_literals | |
22 | import deform |
|
22 | import deform | |
23 | import re |
|
|||
24 | import logging |
|
23 | import logging | |
25 | import requests |
|
24 | import requests | |
26 | import colander |
|
25 | import colander | |
27 | import textwrap |
|
26 | import textwrap | |
28 | from celery.task import task |
|
|||
29 | from mako.template import Template |
|
27 | from mako.template import Template | |
30 |
|
28 | |||
31 | from rhodecode import events |
|
29 | from rhodecode import events | |
32 | from rhodecode.translation import _ |
|
30 | from rhodecode.translation import _ | |
33 | from rhodecode.lib import helpers as h |
|
31 | from rhodecode.lib import helpers as h | |
34 | from rhodecode.lib.celerylib import run_task |
|
32 | from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask | |
35 | from rhodecode.lib.colander_utils import strip_whitespace |
|
33 | from rhodecode.lib.colander_utils import strip_whitespace | |
36 | from rhodecode.integrations.types.base import IntegrationTypeBase |
|
34 | from rhodecode.integrations.types.base import IntegrationTypeBase | |
37 |
|
35 | |||
@@ -243,7 +241,7 b' class HipchatIntegrationType(Integration' | |||||
243 | ) |
|
241 | ) | |
244 |
|
242 | |||
245 |
|
243 | |||
246 | @task(ignore_result=True) |
|
244 | @async_task(ignore_result=True, base=RequestContextTask) | |
247 | def post_text_to_hipchat(settings, text): |
|
245 | def post_text_to_hipchat(settings, text): | |
248 | log.debug('sending %s to hipchat %s' % (text, settings['server_url'])) |
|
246 | log.debug('sending %s to hipchat %s' % (text, settings['server_url'])) | |
249 | resp = requests.post(settings['server_url'], json={ |
|
247 | resp = requests.post(settings['server_url'], json={ |
@@ -27,13 +27,12 b' import logging' | |||||
27 | import deform |
|
27 | import deform | |
28 | import requests |
|
28 | import requests | |
29 | import colander |
|
29 | import colander | |
30 | from celery.task import task |
|
|||
31 | from mako.template import Template |
|
30 | from mako.template import Template | |
32 |
|
31 | |||
33 | from rhodecode import events |
|
32 | from rhodecode import events | |
34 | from rhodecode.translation import _ |
|
33 | from rhodecode.translation import _ | |
35 | from rhodecode.lib import helpers as h |
|
34 | from rhodecode.lib import helpers as h | |
36 | from rhodecode.lib.celerylib import run_task |
|
35 | from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask | |
37 | from rhodecode.lib.colander_utils import strip_whitespace |
|
36 | from rhodecode.lib.colander_utils import strip_whitespace | |
38 | from rhodecode.integrations.types.base import IntegrationTypeBase |
|
37 | from rhodecode.integrations.types.base import IntegrationTypeBase | |
39 |
|
38 | |||
@@ -296,7 +295,7 b' def html_to_slack_links(message):' | |||||
296 | r'<\1|\2>', message) |
|
295 | r'<\1|\2>', message) | |
297 |
|
296 | |||
298 |
|
297 | |||
299 | @task(ignore_result=True) |
|
298 | @async_task(ignore_result=True, base=RequestContextTask) | |
300 | def post_text_to_slack(settings, title, text, fields=None, overrides=None): |
|
299 | def post_text_to_slack(settings, title, text, fields=None, overrides=None): | |
301 | log.debug('sending %s (%s) to slack %s' % ( |
|
300 | log.debug('sending %s (%s) to slack %s' % ( | |
302 | title, text, settings['service'])) |
|
301 | title, text, settings['service'])) |
@@ -28,16 +28,17 b' import logging' | |||||
28 | import requests |
|
28 | import requests | |
29 | import requests.adapters |
|
29 | import requests.adapters | |
30 | import colander |
|
30 | import colander | |
31 | from celery.task import task |
|
|||
32 | from requests.packages.urllib3.util.retry import Retry |
|
31 | from requests.packages.urllib3.util.retry import Retry | |
33 |
|
32 | |||
34 | import rhodecode |
|
33 | import rhodecode | |
35 | from rhodecode import events |
|
34 | from rhodecode import events | |
36 | from rhodecode.translation import _ |
|
35 | from rhodecode.translation import _ | |
37 | from rhodecode.integrations.types.base import IntegrationTypeBase |
|
36 | from rhodecode.integrations.types.base import IntegrationTypeBase | |
|
37 | from rhodecode.lib.celerylib import async_task, RequestContextTask | |||
38 |
|
38 | |||
39 | log = logging.getLogger(__name__) |
|
39 | log = logging.getLogger(__name__) | |
40 |
|
40 | |||
|
41 | ||||
41 | # updating this required to update the `common_vars` passed in url calling func |
|
42 | # updating this required to update the `common_vars` passed in url calling func | |
42 | WEBHOOK_URL_VARS = [ |
|
43 | WEBHOOK_URL_VARS = [ | |
43 | 'repo_name', |
|
44 | 'repo_name', | |
@@ -315,7 +316,7 b' class WebhookIntegrationType(Integration' | |||||
315 | post_to_webhook(url_calls, self.settings) |
|
316 | post_to_webhook(url_calls, self.settings) | |
316 |
|
317 | |||
317 |
|
318 | |||
318 | @task(ignore_result=True) |
|
319 | @async_task(ignore_result=True, base=RequestContextTask) | |
319 | def post_to_webhook(url_calls, settings): |
|
320 | def post_to_webhook(url_calls, settings): | |
320 | max_retries = 3 |
|
321 | max_retries = 3 | |
321 | retries = Retry( |
|
322 | retries = Retry( |
@@ -17,36 +17,17 b'' | |||||
17 | # This program is dual-licensed. If you wish to learn more about the |
|
17 | # This program is dual-licensed. If you wish to learn more about the | |
18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
18 | # RhodeCode Enterprise Edition, including its added features, Support services, | |
19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ | |
20 | """ |
|
|||
21 | celery libs for RhodeCode |
|
|||
22 | """ |
|
|||
23 |
|
20 | |||
24 |
|
||||
25 | import pylons |
|
|||
26 | import socket |
|
21 | import socket | |
27 | import logging |
|
22 | import logging | |
28 |
|
23 | |||
29 | import rhodecode |
|
24 | import rhodecode | |
30 |
|
25 | from zope.cachedescriptors.property import Lazy as LazyProperty | ||
31 | from os.path import join as jn |
|
26 | from rhodecode.lib.celerylib.loader import ( | |
32 | from pylons import config |
|
27 | celery_app, RequestContextTask, get_logger) | |
33 | from celery.task import Task |
|
|||
34 | from pyramid.request import Request |
|
|||
35 | from pyramid.scripting import prepare |
|
|||
36 | from pyramid.threadlocal import get_current_request |
|
|||
37 |
|
||||
38 | from decorator import decorator |
|
|||
39 |
|
28 | |||
40 | from zope.cachedescriptors.property import Lazy as LazyProperty |
|
29 | async_task = celery_app.task | |
41 |
|
30 | |||
42 | from rhodecode.config import utils |
|
|||
43 | from rhodecode.lib.utils2 import ( |
|
|||
44 | safe_str, md5_safe, aslist, get_routes_generator_for_server_url, |
|
|||
45 | get_server_url) |
|
|||
46 | from rhodecode.lib.pidlock import DaemonLock, LockHeld |
|
|||
47 | from rhodecode.lib.vcs import connect_vcs |
|
|||
48 | from rhodecode.model import meta |
|
|||
49 | from rhodecode.lib.auth import AuthUser |
|
|||
50 |
|
31 | |||
51 | log = logging.getLogger(__name__) |
|
32 | log = logging.getLogger(__name__) | |
52 |
|
33 | |||
@@ -60,95 +41,13 b' class ResultWrapper(object):' | |||||
60 | return self.task |
|
41 | return self.task | |
61 |
|
42 | |||
62 |
|
43 | |||
63 | class RhodecodeCeleryTask(Task): |
|
|||
64 | """ |
|
|||
65 | This is a celery task which will create a rhodecode app instance context |
|
|||
66 | for the task, patch pyramid + pylons threadlocals with the original request |
|
|||
67 | that created the task and also add the user to the context. |
|
|||
68 |
|
||||
69 | This class as a whole should be removed once the pylons port is complete |
|
|||
70 | and a pyramid only solution for celery is implemented as per issue #4139 |
|
|||
71 | """ |
|
|||
72 |
|
||||
73 | def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, |
|
|||
74 | link=None, link_error=None, **options): |
|
|||
75 | """ queue the job to run (we are in web request context here) """ |
|
|||
76 |
|
||||
77 | request = get_current_request() |
|
|||
78 |
|
||||
79 | if hasattr(request, 'user'): |
|
|||
80 | ip_addr = request.user.ip_addr |
|
|||
81 | user_id = request.user.user_id |
|
|||
82 | elif hasattr(request, 'rpc_params'): |
|
|||
83 | # TODO(marcink) remove when migration is finished |
|
|||
84 | # api specific call on Pyramid. |
|
|||
85 | ip_addr = request.rpc_params['apiuser'].ip_addr |
|
|||
86 | user_id = request.rpc_params['apiuser'].user_id |
|
|||
87 | else: |
|
|||
88 | raise Exception('Unable to fetch data from request: {}'.format( |
|
|||
89 | request)) |
|
|||
90 |
|
||||
91 | if request: |
|
|||
92 | # we hook into kwargs since it is the only way to pass our data to |
|
|||
93 | # the celery worker in celery 2.2 |
|
|||
94 | kwargs.update({ |
|
|||
95 | '_rhodecode_proxy_data': { |
|
|||
96 | 'environ': { |
|
|||
97 | 'PATH_INFO': request.environ['PATH_INFO'], |
|
|||
98 | 'SCRIPT_NAME': request.environ['SCRIPT_NAME'], |
|
|||
99 | 'HTTP_HOST': request.environ.get('HTTP_HOST', |
|
|||
100 | request.environ['SERVER_NAME']), |
|
|||
101 | 'SERVER_NAME': request.environ['SERVER_NAME'], |
|
|||
102 | 'SERVER_PORT': request.environ['SERVER_PORT'], |
|
|||
103 | 'wsgi.url_scheme': request.environ['wsgi.url_scheme'], |
|
|||
104 | }, |
|
|||
105 | 'auth_user': { |
|
|||
106 | 'ip_addr': ip_addr, |
|
|||
107 | 'user_id': user_id |
|
|||
108 | }, |
|
|||
109 | } |
|
|||
110 | }) |
|
|||
111 | return super(RhodecodeCeleryTask, self).apply_async( |
|
|||
112 | args, kwargs, task_id, producer, link, link_error, **options) |
|
|||
113 |
|
||||
114 | def __call__(self, *args, **kwargs): |
|
|||
115 | """ rebuild the context and then run task on celery worker """ |
|
|||
116 | proxy_data = kwargs.pop('_rhodecode_proxy_data', {}) |
|
|||
117 |
|
||||
118 | if not proxy_data: |
|
|||
119 | return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) |
|
|||
120 |
|
||||
121 | log.debug('using celery proxy data to run task: %r', proxy_data) |
|
|||
122 |
|
||||
123 | from rhodecode.config.routing import make_map |
|
|||
124 |
|
||||
125 | request = Request.blank('/', environ=proxy_data['environ']) |
|
|||
126 | request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'], |
|
|||
127 | ip_addr=proxy_data['auth_user']['ip_addr']) |
|
|||
128 |
|
||||
129 | pyramid_request = prepare(request) # set pyramid threadlocal request |
|
|||
130 |
|
||||
131 | # pylons routing |
|
|||
132 | if not rhodecode.CONFIG.get('routes.map'): |
|
|||
133 | rhodecode.CONFIG['routes.map'] = make_map(config) |
|
|||
134 | pylons.url._push_object(get_routes_generator_for_server_url( |
|
|||
135 | get_server_url(request.environ) |
|
|||
136 | )) |
|
|||
137 |
|
||||
138 | try: |
|
|||
139 | return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) |
|
|||
140 | finally: |
|
|||
141 | pyramid_request['closer']() |
|
|||
142 | pylons.url._pop_object() |
|
|||
143 |
|
||||
144 |
|
||||
145 | def run_task(task, *args, **kwargs): |
|
44 | def run_task(task, *args, **kwargs): | |
146 | if rhodecode.CELERY_ENABLED: |
|
45 | if rhodecode.CELERY_ENABLED: | |
147 | celery_is_up = False |
|
46 | celery_is_up = False | |
148 | try: |
|
47 | try: | |
149 | t = task.apply_async(args=args, kwargs=kwargs) |
|
48 | t = task.apply_async(args=args, kwargs=kwargs) | |
150 | log.info('running task %s:%s', t.task_id, task) |
|
|||
151 | celery_is_up = True |
|
49 | celery_is_up = True | |
|
50 | log.debug('executing task %s:%s in async mode', t.task_id, task) | |||
152 | return t |
|
51 | return t | |
153 |
|
52 | |||
154 | except socket.error as e: |
|
53 | except socket.error as e: | |
@@ -164,73 +63,10 b' def run_task(task, *args, **kwargs):' | |||||
164 | "Fallback to sync execution.") |
|
63 | "Fallback to sync execution.") | |
165 |
|
64 | |||
166 | # keep in mind there maybe a subtle race condition where something |
|
65 | # keep in mind there maybe a subtle race condition where something | |
167 |
# depending on rhodecode.CELERY_ENABLED |
|
66 | # depending on rhodecode.CELERY_ENABLED | |
168 | # will see CELERY_ENABLED as True before this has a chance to set False |
|
67 | # will see CELERY_ENABLED as True before this has a chance to set False | |
169 | rhodecode.CELERY_ENABLED = celery_is_up |
|
68 | rhodecode.CELERY_ENABLED = celery_is_up | |
170 | else: |
|
69 | else: | |
171 | log.debug('executing task %s in sync mode', task) |
|
70 | log.debug('executing task %s:%s in sync mode', 'TASK', task) | |
172 | return ResultWrapper(task(*args, **kwargs)) |
|
|||
173 |
|
||||
174 |
|
||||
175 | def __get_lockkey(func, *fargs, **fkwargs): |
|
|||
176 | params = list(fargs) |
|
|||
177 | params.extend(['%s-%s' % ar for ar in fkwargs.items()]) |
|
|||
178 |
|
||||
179 | func_name = str(func.__name__) if hasattr(func, '__name__') else str(func) |
|
|||
180 | _lock_key = func_name + '-' + '-'.join(map(safe_str, params)) |
|
|||
181 | return 'task_%s.lock' % (md5_safe(_lock_key),) |
|
|||
182 |
|
||||
183 |
|
||||
184 | def locked_task(func): |
|
|||
185 | def __wrapper(func, *fargs, **fkwargs): |
|
|||
186 | lockkey = __get_lockkey(func, *fargs, **fkwargs) |
|
|||
187 | lockkey_path = config['app_conf']['cache_dir'] |
|
|||
188 |
|
||||
189 | log.info('running task with lockkey %s' % lockkey) |
|
|||
190 | try: |
|
|||
191 | l = DaemonLock(file_=jn(lockkey_path, lockkey)) |
|
|||
192 | ret = func(*fargs, **fkwargs) |
|
|||
193 | l.release() |
|
|||
194 | return ret |
|
|||
195 | except LockHeld: |
|
|||
196 | log.info('LockHeld') |
|
|||
197 | return 'Task with key %s already running' % lockkey |
|
|||
198 |
|
||||
199 | return decorator(__wrapper, func) |
|
|||
200 |
|
||||
201 |
|
71 | |||
202 | def get_session(): |
|
72 | return ResultWrapper(task(*args, **kwargs)) | |
203 | if rhodecode.CELERY_ENABLED: |
|
|||
204 | utils.initialize_database(config) |
|
|||
205 | sa = meta.Session() |
|
|||
206 | return sa |
|
|||
207 |
|
||||
208 |
|
||||
209 | def dbsession(func): |
|
|||
210 | def __wrapper(func, *fargs, **fkwargs): |
|
|||
211 | try: |
|
|||
212 | ret = func(*fargs, **fkwargs) |
|
|||
213 | return ret |
|
|||
214 | finally: |
|
|||
215 | if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER: |
|
|||
216 | meta.Session.remove() |
|
|||
217 |
|
||||
218 | return decorator(__wrapper, func) |
|
|||
219 |
|
||||
220 |
|
||||
221 | def vcsconnection(func): |
|
|||
222 | def __wrapper(func, *fargs, **fkwargs): |
|
|||
223 | if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER: |
|
|||
224 | settings = rhodecode.PYRAMID_SETTINGS |
|
|||
225 | backends = settings['vcs.backends'] |
|
|||
226 | for alias in rhodecode.BACKENDS.keys(): |
|
|||
227 | if alias not in backends: |
|
|||
228 | del rhodecode.BACKENDS[alias] |
|
|||
229 | utils.configure_vcs(settings) |
|
|||
230 | connect_vcs( |
|
|||
231 | settings['vcs.server'], |
|
|||
232 | utils.get_vcs_server_protocol(settings)) |
|
|||
233 | ret = func(*fargs, **fkwargs) |
|
|||
234 | return ret |
|
|||
235 |
|
||||
236 | return decorator(__wrapper, func) |
|
@@ -23,38 +23,18 b' RhodeCode task modules, containing all t' | |||||
23 | by celery daemon |
|
23 | by celery daemon | |
24 | """ |
|
24 | """ | |
25 |
|
25 | |||
26 |
|
||||
27 | import os |
|
26 | import os | |
28 | import logging |
|
|||
29 |
|
||||
30 | from celery.task import task |
|
|||
31 |
|
27 | |||
32 | import rhodecode |
|
28 | import rhodecode | |
33 | from rhodecode.lib import audit_logger |
|
29 | from rhodecode.lib import audit_logger | |
34 |
from rhodecode.lib.celerylib import |
|
30 | from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask | |
35 | run_task, dbsession, __get_lockkey, LockHeld, DaemonLock, |
|
|||
36 | get_session, vcsconnection, RhodecodeCeleryTask) |
|
|||
37 | from rhodecode.lib.hooks_base import log_create_repository |
|
31 | from rhodecode.lib.hooks_base import log_create_repository | |
38 | from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer |
|
32 | from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer | |
39 | from rhodecode.lib.utils import add_cache |
|
|||
40 | from rhodecode.lib.utils2 import safe_int, str2bool |
|
33 | from rhodecode.lib.utils2 import safe_int, str2bool | |
41 | from rhodecode.model.db import Repository, User |
|
34 | from rhodecode.model.db import Session, Repository, User | |
42 |
|
35 | |||
43 |
|
36 | |||
44 | def get_logger(cls): |
|
37 | @async_task(ignore_result=True, base=RequestContextTask) | |
45 | if rhodecode.CELERY_ENABLED: |
|
|||
46 | try: |
|
|||
47 | log = cls.get_logger() |
|
|||
48 | except Exception: |
|
|||
49 | log = logging.getLogger(__name__) |
|
|||
50 | else: |
|
|||
51 | log = logging.getLogger(__name__) |
|
|||
52 |
|
||||
53 | return log |
|
|||
54 |
|
||||
55 |
|
||||
56 | @task(ignore_result=True, base=RhodecodeCeleryTask) |
|
|||
57 | @dbsession |
|
|||
58 | def send_email(recipients, subject, body='', html_body='', email_config=None): |
|
38 | def send_email(recipients, subject, body='', html_body='', email_config=None): | |
59 | """ |
|
39 | """ | |
60 | Sends an email with defined parameters from the .ini files. |
|
40 | Sends an email with defined parameters from the .ini files. | |
@@ -101,18 +81,15 b' def send_email(recipients, subject, body' | |||||
101 | return True |
|
81 | return True | |
102 |
|
82 | |||
103 |
|
83 | |||
104 |
@task(ignore_result=True, base=R |
|
84 | @async_task(ignore_result=True, base=RequestContextTask) | |
105 | @dbsession |
|
|||
106 | @vcsconnection |
|
|||
107 | def create_repo(form_data, cur_user): |
|
85 | def create_repo(form_data, cur_user): | |
108 | from rhodecode.model.repo import RepoModel |
|
86 | from rhodecode.model.repo import RepoModel | |
109 | from rhodecode.model.user import UserModel |
|
87 | from rhodecode.model.user import UserModel | |
110 | from rhodecode.model.settings import SettingsModel |
|
88 | from rhodecode.model.settings import SettingsModel | |
111 |
|
89 | |||
112 | log = get_logger(create_repo) |
|
90 | log = get_logger(create_repo) | |
113 | DBS = get_session() |
|
|||
114 |
|
91 | |||
115 |
cur_user = UserModel( |
|
92 | cur_user = UserModel()._get_user(cur_user) | |
116 | owner = cur_user |
|
93 | owner = cur_user | |
117 |
|
94 | |||
118 | repo_name = form_data['repo_name'] |
|
95 | repo_name = form_data['repo_name'] | |
@@ -138,7 +115,7 b' def create_repo(form_data, cur_user):' | |||||
138 | 'enable_downloads', defs.get('repo_enable_downloads')) |
|
115 | 'enable_downloads', defs.get('repo_enable_downloads')) | |
139 |
|
116 | |||
140 | try: |
|
117 | try: | |
141 |
repo = RepoModel( |
|
118 | repo = RepoModel()._create_repo( | |
142 | repo_name=repo_name_full, |
|
119 | repo_name=repo_name_full, | |
143 | repo_type=repo_type, |
|
120 | repo_type=repo_type, | |
144 | description=description, |
|
121 | description=description, | |
@@ -155,13 +132,13 b' def create_repo(form_data, cur_user):' | |||||
155 | enable_downloads=enable_downloads, |
|
132 | enable_downloads=enable_downloads, | |
156 | state=state |
|
133 | state=state | |
157 | ) |
|
134 | ) | |
158 |
|
|
135 | Session().commit() | |
159 |
|
136 | |||
160 | # now create this repo on Filesystem |
|
137 | # now create this repo on Filesystem | |
161 |
RepoModel( |
|
138 | RepoModel()._create_filesystem_repo( | |
162 | repo_name=repo_name, |
|
139 | repo_name=repo_name, | |
163 | repo_type=repo_type, |
|
140 | repo_type=repo_type, | |
164 |
repo_group=RepoModel( |
|
141 | repo_group=RepoModel()._get_repo_group(repo_group), | |
165 | clone_uri=clone_uri, |
|
142 | clone_uri=clone_uri, | |
166 | ) |
|
143 | ) | |
167 | repo = Repository.get_by_repo_name(repo_name_full) |
|
144 | repo = Repository.get_by_repo_name(repo_name_full) | |
@@ -180,7 +157,7 b' def create_repo(form_data, cur_user):' | |||||
180 | user=cur_user, |
|
157 | user=cur_user, | |
181 | repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id)) |
|
158 | repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id)) | |
182 |
|
159 | |||
183 |
|
|
160 | Session().commit() | |
184 | except Exception: |
|
161 | except Exception: | |
185 | log.warning('Exception occurred when creating repository, ' |
|
162 | log.warning('Exception occurred when creating repository, ' | |
186 | 'doing cleanup...', exc_info=True) |
|
163 | 'doing cleanup...', exc_info=True) | |
@@ -188,8 +165,8 b' def create_repo(form_data, cur_user):' | |||||
188 | repo = Repository.get_by_repo_name(repo_name_full) |
|
165 | repo = Repository.get_by_repo_name(repo_name_full) | |
189 | if repo: |
|
166 | if repo: | |
190 | Repository.delete(repo.repo_id) |
|
167 | Repository.delete(repo.repo_id) | |
191 |
|
|
168 | Session().commit() | |
192 |
RepoModel( |
|
169 | RepoModel()._delete_filesystem_repo(repo) | |
193 | raise |
|
170 | raise | |
194 |
|
171 | |||
195 | # it's an odd fix to make celery fail task when exception occurs |
|
172 | # it's an odd fix to make celery fail task when exception occurs | |
@@ -199,23 +176,17 b' def create_repo(form_data, cur_user):' | |||||
199 | return True |
|
176 | return True | |
200 |
|
177 | |||
201 |
|
178 | |||
202 |
@task(ignore_result=True, base=R |
|
179 | @async_task(ignore_result=True, base=RequestContextTask) | |
203 | @dbsession |
|
|||
204 | @vcsconnection |
|
|||
205 | def create_repo_fork(form_data, cur_user): |
|
180 | def create_repo_fork(form_data, cur_user): | |
206 | """ |
|
181 | """ | |
207 | Creates a fork of repository using internal VCS methods |
|
182 | Creates a fork of repository using internal VCS methods | |
208 |
|
||||
209 | :param form_data: |
|
|||
210 | :param cur_user: |
|
|||
211 | """ |
|
183 | """ | |
212 | from rhodecode.model.repo import RepoModel |
|
184 | from rhodecode.model.repo import RepoModel | |
213 | from rhodecode.model.user import UserModel |
|
185 | from rhodecode.model.user import UserModel | |
214 |
|
186 | |||
215 | log = get_logger(create_repo_fork) |
|
187 | log = get_logger(create_repo_fork) | |
216 | DBS = get_session() |
|
|||
217 |
|
188 | |||
218 |
cur_user = UserModel( |
|
189 | cur_user = UserModel()._get_user(cur_user) | |
219 | owner = cur_user |
|
190 | owner = cur_user | |
220 |
|
191 | |||
221 | repo_name = form_data['repo_name'] # fork in this case |
|
192 | repo_name = form_data['repo_name'] # fork in this case | |
@@ -230,8 +201,8 b' def create_repo_fork(form_data, cur_user' | |||||
230 | fork_id = safe_int(form_data.get('fork_parent_id')) |
|
201 | fork_id = safe_int(form_data.get('fork_parent_id')) | |
231 |
|
202 | |||
232 | try: |
|
203 | try: | |
233 |
fork_of = RepoModel( |
|
204 | fork_of = RepoModel()._get_repo(fork_id) | |
234 |
RepoModel( |
|
205 | RepoModel()._create_repo( | |
235 | repo_name=repo_name_full, |
|
206 | repo_name=repo_name_full, | |
236 | repo_type=repo_type, |
|
207 | repo_type=repo_type, | |
237 | description=description, |
|
208 | description=description, | |
@@ -244,16 +215,16 b' def create_repo_fork(form_data, cur_user' | |||||
244 | copy_fork_permissions=copy_fork_permissions |
|
215 | copy_fork_permissions=copy_fork_permissions | |
245 | ) |
|
216 | ) | |
246 |
|
217 | |||
247 |
|
|
218 | Session().commit() | |
248 |
|
219 | |||
249 | base_path = Repository.base_path() |
|
220 | base_path = Repository.base_path() | |
250 | source_repo_path = os.path.join(base_path, fork_of.repo_name) |
|
221 | source_repo_path = os.path.join(base_path, fork_of.repo_name) | |
251 |
|
222 | |||
252 | # now create this repo on Filesystem |
|
223 | # now create this repo on Filesystem | |
253 |
RepoModel( |
|
224 | RepoModel()._create_filesystem_repo( | |
254 | repo_name=repo_name, |
|
225 | repo_name=repo_name, | |
255 | repo_type=repo_type, |
|
226 | repo_type=repo_type, | |
256 |
repo_group=RepoModel( |
|
227 | repo_group=RepoModel()._get_repo_group(repo_group), | |
257 | clone_uri=source_repo_path, |
|
228 | clone_uri=source_repo_path, | |
258 | ) |
|
229 | ) | |
259 | repo = Repository.get_by_repo_name(repo_name_full) |
|
230 | repo = Repository.get_by_repo_name(repo_name_full) | |
@@ -274,7 +245,7 b' def create_repo_fork(form_data, cur_user' | |||||
274 | user=cur_user, |
|
245 | user=cur_user, | |
275 | repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id)) |
|
246 | repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id)) | |
276 |
|
247 | |||
277 |
|
|
248 | Session().commit() | |
278 | except Exception as e: |
|
249 | except Exception as e: | |
279 | log.warning('Exception %s occurred when forking repository, ' |
|
250 | log.warning('Exception %s occurred when forking repository, ' | |
280 | 'doing cleanup...', e) |
|
251 | 'doing cleanup...', e) | |
@@ -282,8 +253,8 b' def create_repo_fork(form_data, cur_user' | |||||
282 | repo = Repository.get_by_repo_name(repo_name_full) |
|
253 | repo = Repository.get_by_repo_name(repo_name_full) | |
283 | if repo: |
|
254 | if repo: | |
284 | Repository.delete(repo.repo_id) |
|
255 | Repository.delete(repo.repo_id) | |
285 |
|
|
256 | Session().commit() | |
286 |
RepoModel( |
|
257 | RepoModel()._delete_filesystem_repo(repo) | |
287 | raise |
|
258 | raise | |
288 |
|
259 | |||
289 | # it's an odd fix to make celery fail task when exception occurs |
|
260 | # it's an odd fix to make celery fail task when exception occurs | |
@@ -291,3 +262,14 b' def create_repo_fork(form_data, cur_user' | |||||
291 | pass |
|
262 | pass | |
292 |
|
263 | |||
293 | return True |
|
264 | return True | |
|
265 | ||||
|
266 | ||||
|
267 | @async_task(ignore_result=True) | |||
|
268 | def sync_repo(*args, **kwargs): | |||
|
269 | from rhodecode.model.scm import ScmModel | |||
|
270 | log = get_logger(sync_repo) | |||
|
271 | ||||
|
272 | log.info('Pulling from %s', kwargs['repo_name']) | |||
|
273 | ScmModel().pull_changes(kwargs['repo_name'], kwargs['username']) | |||
|
274 | ||||
|
275 |
@@ -22,6 +22,7 b' import os' | |||||
22 | from pyramid.compat import configparser |
|
22 | from pyramid.compat import configparser | |
23 | from pyramid.paster import bootstrap as pyramid_bootstrap, setup_logging # noqa |
|
23 | from pyramid.paster import bootstrap as pyramid_bootstrap, setup_logging # noqa | |
24 | from pyramid.request import Request |
|
24 | from pyramid.request import Request | |
|
25 | from pyramid.scripting import prepare | |||
25 |
|
26 | |||
26 |
|
27 | |||
27 | def get_config(ini_path, **kwargs): |
|
28 | def get_config(ini_path, **kwargs): | |
@@ -47,3 +48,9 b' def bootstrap(config_uri, request=None, ' | |||||
47 | request = request or Request.blank('/', base_url=base_url) |
|
48 | request = request or Request.blank('/', base_url=base_url) | |
48 |
|
49 | |||
49 | return pyramid_bootstrap(config_uri, request=request, options=options) |
|
50 | return pyramid_bootstrap(config_uri, request=request, options=options) | |
|
51 | ||||
|
52 | ||||
|
53 | def prepare_request(environ): | |||
|
54 | request = Request.blank('/', environ=environ) | |||
|
55 | prepare(request) # set pyramid threadlocal request | |||
|
56 | return request |
@@ -38,7 +38,6 b' from os.path import join as jn' | |||||
38 |
|
38 | |||
39 | import paste |
|
39 | import paste | |
40 | import pkg_resources |
|
40 | import pkg_resources | |
41 | from paste.script.command import Command, BadCommand |
|
|||
42 | from webhelpers.text import collapse, remove_formatting, strip_tags |
|
41 | from webhelpers.text import collapse, remove_formatting, strip_tags | |
43 | from mako import exceptions |
|
42 | from mako import exceptions | |
44 | from pyramid.threadlocal import get_current_registry |
|
43 | from pyramid.threadlocal import get_current_registry | |
@@ -767,85 +766,6 b' def create_test_repositories(test_path, ' | |||||
767 | tar.extractall(jn(test_path, SVN_REPO)) |
|
766 | tar.extractall(jn(test_path, SVN_REPO)) | |
768 |
|
767 | |||
769 |
|
768 | |||
770 | #============================================================================== |
|
|||
771 | # PASTER COMMANDS |
|
|||
772 | #============================================================================== |
|
|||
773 | class BasePasterCommand(Command): |
|
|||
774 | """ |
|
|||
775 | Abstract Base Class for paster commands. |
|
|||
776 |
|
||||
777 | The celery commands are somewhat aggressive about loading |
|
|||
778 | celery.conf, and since our module sets the `CELERY_LOADER` |
|
|||
779 | environment variable to our loader, we have to bootstrap a bit and |
|
|||
780 | make sure we've had a chance to load the pylons config off of the |
|
|||
781 | command line, otherwise everything fails. |
|
|||
782 | """ |
|
|||
783 | min_args = 1 |
|
|||
784 | min_args_error = "Please provide a paster config file as an argument." |
|
|||
785 | takes_config_file = 1 |
|
|||
786 | requires_config_file = True |
|
|||
787 |
|
||||
788 | def notify_msg(self, msg, log=False): |
|
|||
789 | """Make a notification to user, additionally if logger is passed |
|
|||
790 | it logs this action using given logger |
|
|||
791 |
|
||||
792 | :param msg: message that will be printed to user |
|
|||
793 | :param log: logging instance, to use to additionally log this message |
|
|||
794 |
|
||||
795 | """ |
|
|||
796 | if log and isinstance(log, logging): |
|
|||
797 | log(msg) |
|
|||
798 |
|
||||
799 | def run(self, args): |
|
|||
800 | """ |
|
|||
801 | Overrides Command.run |
|
|||
802 |
|
||||
803 | Checks for a config file argument and loads it. |
|
|||
804 | """ |
|
|||
805 | if len(args) < self.min_args: |
|
|||
806 | raise BadCommand( |
|
|||
807 | self.min_args_error % {'min_args': self.min_args, |
|
|||
808 | 'actual_args': len(args)}) |
|
|||
809 |
|
||||
810 | # Decrement because we're going to lob off the first argument. |
|
|||
811 | # @@ This is hacky |
|
|||
812 | self.min_args -= 1 |
|
|||
813 | self.bootstrap_config(args[0]) |
|
|||
814 | self.update_parser() |
|
|||
815 | return super(BasePasterCommand, self).run(args[1:]) |
|
|||
816 |
|
||||
817 | def update_parser(self): |
|
|||
818 | """ |
|
|||
819 | Abstract method. Allows for the class' parser to be updated |
|
|||
820 | before the superclass' `run` method is called. Necessary to |
|
|||
821 | allow options/arguments to be passed through to the underlying |
|
|||
822 | celery command. |
|
|||
823 | """ |
|
|||
824 | raise NotImplementedError("Abstract Method.") |
|
|||
825 |
|
||||
826 | def bootstrap_config(self, conf): |
|
|||
827 | """ |
|
|||
828 | Loads the pylons configuration. |
|
|||
829 | """ |
|
|||
830 | from pylons import config as pylonsconfig |
|
|||
831 |
|
||||
832 | self.path_to_ini_file = os.path.realpath(conf) |
|
|||
833 | conf = paste.deploy.appconfig('config:' + self.path_to_ini_file) |
|
|||
834 | pylonsconfig.init_app(conf.global_conf, conf.local_conf) |
|
|||
835 |
|
||||
836 | def _init_session(self): |
|
|||
837 | """ |
|
|||
838 | Inits SqlAlchemy Session |
|
|||
839 | """ |
|
|||
840 | logging.config.fileConfig(self.path_to_ini_file) |
|
|||
841 | from pylons import config |
|
|||
842 | from rhodecode.config.utils import initialize_database |
|
|||
843 |
|
||||
844 | # get to remove repos !! |
|
|||
845 | add_cache(config) |
|
|||
846 | initialize_database(config) |
|
|||
847 |
|
||||
848 |
|
||||
849 | def password_changed(auth_user, session): |
|
769 | def password_changed(auth_user, session): | |
850 | # Never report password change in case of default user or anonymous user. |
|
770 | # Never report password change in case of default user or anonymous user. | |
851 | if auth_user.username == User.DEFAULT_USER or auth_user.user_id is None: |
|
771 | if auth_user.username == User.DEFAULT_USER or auth_user.user_id is None: |
@@ -23,7 +23,6 b'' | |||||
23 | Some simple helper functions |
|
23 | Some simple helper functions | |
24 | """ |
|
24 | """ | |
25 |
|
25 | |||
26 |
|
||||
27 | import collections |
|
26 | import collections | |
28 | import datetime |
|
27 | import datetime | |
29 | import dateutil.relativedelta |
|
28 | import dateutil.relativedelta | |
@@ -42,7 +41,6 b' import sqlalchemy.engine.url' | |||||
42 | import sqlalchemy.exc |
|
41 | import sqlalchemy.exc | |
43 | import sqlalchemy.sql |
|
42 | import sqlalchemy.sql | |
44 | import webob |
|
43 | import webob | |
45 | import routes.util |
|
|||
46 | import pyramid.threadlocal |
|
44 | import pyramid.threadlocal | |
47 |
|
45 | |||
48 | import rhodecode |
|
46 | import rhodecode | |
@@ -941,31 +939,6 b' class Optional(object):' | |||||
941 | return val |
|
939 | return val | |
942 |
|
940 | |||
943 |
|
941 | |||
944 | def get_routes_generator_for_server_url(server_url): |
|
|||
945 | parsed_url = urlobject.URLObject(server_url) |
|
|||
946 | netloc = safe_str(parsed_url.netloc) |
|
|||
947 | script_name = safe_str(parsed_url.path) |
|
|||
948 |
|
||||
949 | if ':' in netloc: |
|
|||
950 | server_name, server_port = netloc.split(':') |
|
|||
951 | else: |
|
|||
952 | server_name = netloc |
|
|||
953 | server_port = (parsed_url.scheme == 'https' and '443' or '80') |
|
|||
954 |
|
||||
955 | environ = { |
|
|||
956 | 'REQUEST_METHOD': 'GET', |
|
|||
957 | 'PATH_INFO': '/', |
|
|||
958 | 'SERVER_NAME': server_name, |
|
|||
959 | 'SERVER_PORT': server_port, |
|
|||
960 | 'SCRIPT_NAME': script_name, |
|
|||
961 | } |
|
|||
962 | if parsed_url.scheme == 'https': |
|
|||
963 | environ['HTTPS'] = 'on' |
|
|||
964 | environ['wsgi.url_scheme'] = 'https' |
|
|||
965 |
|
||||
966 | return routes.util.URLGenerator(rhodecode.CONFIG['routes.map'], environ) |
|
|||
967 |
|
||||
968 |
|
||||
969 | def glob2re(pat): |
|
942 | def glob2re(pat): | |
970 | """ |
|
943 | """ | |
971 | Translate a shell PATTERN to a regular expression. |
|
944 | Translate a shell PATTERN to a regular expression. |
@@ -448,3 +448,9 b' class TestGetEnabledHooks(object):' | |||||
448 | ui_settings = [] |
|
448 | ui_settings = [] | |
449 | result = utils.get_enabled_hook_classes(ui_settings) |
|
449 | result = utils.get_enabled_hook_classes(ui_settings) | |
450 | assert result == [] |
|
450 | assert result == [] | |
|
451 | ||||
|
452 | ||||
|
453 | def test_obfuscate_url_pw(): | |||
|
454 | from rhodecode.lib.utils2 import obfuscate_url_pw | |||
|
455 | engine = u'/home/repos/malmö' | |||
|
456 | assert obfuscate_url_pw(engine) No newline at end of file |
@@ -108,7 +108,6 b' def pytest_addoption(parser):' | |||||
108 |
|
108 | |||
109 |
|
109 | |||
110 | def pytest_configure(config): |
|
110 | def pytest_configure(config): | |
111 | # Appy the kombu patch early on, needed for test discovery on Python 2.7.11 |
|
|||
112 | from rhodecode.config import patches |
|
111 | from rhodecode.config import patches | |
113 |
|
112 | |||
114 |
|
113 |
1 | NO CONTENT: file was removed |
|
NO CONTENT: file was removed |
1 | NO CONTENT: file was removed |
|
NO CONTENT: file was removed |
1 | NO CONTENT: file was removed |
|
NO CONTENT: file was removed |
1 | NO CONTENT: file was removed |
|
NO CONTENT: file was removed |
1 | NO CONTENT: file was removed |
|
NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now