Show More
@@ -0,0 +1,256 b'' | |||
|
1 | # -*- coding: utf-8 -*- | |
|
2 | ||
|
3 | # Copyright (C) 2010-2017 RhodeCode GmbH | |
|
4 | # | |
|
5 | # This program is free software: you can redistribute it and/or modify | |
|
6 | # it under the terms of the GNU Affero General Public License, version 3 | |
|
7 | # (only), as published by the Free Software Foundation. | |
|
8 | # | |
|
9 | # This program is distributed in the hope that it will be useful, | |
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
|
12 | # GNU General Public License for more details. | |
|
13 | # | |
|
14 | # You should have received a copy of the GNU Affero General Public License | |
|
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
|
16 | # | |
|
17 | # This program is dual-licensed. If you wish to learn more about the | |
|
18 | # RhodeCode Enterprise Edition, including its added features, Support services, | |
|
19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ | |
|
20 | """ | |
|
21 | Celery loader, run with:: | |
|
22 | ||
|
23 | celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini | |
|
24 | """ | |
|
25 | import os | |
|
26 | import logging | |
|
27 | ||
|
28 | from celery import Celery | |
|
29 | from celery import signals | |
|
30 | from celery import Task | |
|
31 | from kombu.serialization import register | |
|
32 | from pyramid.threadlocal import get_current_request | |
|
33 | ||
|
34 | import rhodecode | |
|
35 | ||
|
36 | from rhodecode.lib.auth import AuthUser | |
|
37 | from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars | |
|
38 | from rhodecode.lib.ext_json import json | |
|
39 | from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request | |
|
40 | from rhodecode.lib.utils2 import str2bool | |
|
41 | from rhodecode.model import meta | |
|
42 | ||
|
43 | ||
|
44 | register('json_ext', json.dumps, json.loads, | |
|
45 | content_type='application/x-json-ext', | |
|
46 | content_encoding='utf-8') | |
|
47 | ||
|
48 | log = logging.getLogger('celery.rhodecode.loader') | |
|
49 | ||
|
50 | ||
|
51 | def add_preload_arguments(parser): | |
|
52 | parser.add_argument( | |
|
53 | '--ini', default=None, | |
|
54 | help='Path to ini configuration file.' | |
|
55 | ) | |
|
56 | parser.add_argument( | |
|
57 | '--ini-var', default=None, | |
|
58 | help='Comma separated list of key=value to pass to ini.' | |
|
59 | ) | |
|
60 | ||
|
61 | ||
|
62 | def get_logger(obj): | |
|
63 | custom_log = logging.getLogger( | |
|
64 | 'rhodecode.task.{}'.format(obj.__class__.__name__)) | |
|
65 | ||
|
66 | if rhodecode.CELERY_ENABLED: | |
|
67 | try: | |
|
68 | custom_log = obj.get_logger() | |
|
69 | except Exception: | |
|
70 | pass | |
|
71 | ||
|
72 | return custom_log | |
|
73 | ||
|
74 | ||
|
75 | base_celery_config = { | |
|
76 | 'result_backend': 'rpc://', | |
|
77 | 'result_expires': 60 * 60 * 24, | |
|
78 | 'result_persistent': True, | |
|
79 | 'imports': [], | |
|
80 | 'worker_max_tasks_per_child': 100, | |
|
81 | 'accept_content': ['json_ext'], | |
|
82 | 'task_serializer': 'json_ext', | |
|
83 | 'result_serializer': 'json_ext', | |
|
84 | 'worker_hijack_root_logger': False, | |
|
85 | } | |
|
86 | # init main celery app | |
|
87 | celery_app = Celery() | |
|
88 | celery_app.user_options['preload'].add(add_preload_arguments) | |
|
89 | ini_file_glob = None | |
|
90 | ||
|
91 | ||
|
92 | @signals.setup_logging.connect | |
|
93 | def setup_logging_callback(**kwargs): | |
|
94 | setup_logging(ini_file_glob) | |
|
95 | ||
|
96 | ||
|
97 | @signals.user_preload_options.connect | |
|
98 | def on_preload_parsed(options, **kwargs): | |
|
99 | ini_location = options['ini'] | |
|
100 | ini_vars = options['ini_var'] | |
|
101 | celery_app.conf['INI_PYRAMID'] = options['ini'] | |
|
102 | ||
|
103 | if ini_location is None: | |
|
104 | print('You must provide the paste --ini argument') | |
|
105 | exit(-1) | |
|
106 | ||
|
107 | options = None | |
|
108 | if ini_vars is not None: | |
|
109 | options = parse_ini_vars(ini_vars) | |
|
110 | ||
|
111 | global ini_file_glob | |
|
112 | ini_file_glob = ini_location | |
|
113 | ||
|
114 | log.debug('Bootstrapping RhodeCode application...') | |
|
115 | env = bootstrap(ini_location, options=options) | |
|
116 | ||
|
117 | setup_celery_app( | |
|
118 | app=env['app'], root=env['root'], request=env['request'], | |
|
119 | registry=env['registry'], closer=env['closer'], | |
|
120 | ini_location=ini_location) | |
|
121 | ||
|
122 | # fix the global flag even if it's disabled via .ini file because this | |
|
123 | # is a worker code that doesn't need this to be disabled. | |
|
124 | rhodecode.CELERY_ENABLED = True | |
|
125 | ||
|
126 | ||
|
127 | @signals.task_success.connect | |
|
128 | def task_success_signal(result, **kwargs): | |
|
129 | meta.Session.commit() | |
|
130 | celery_app.conf['PYRAMID_CLOSER']() | |
|
131 | ||
|
132 | ||
|
133 | @signals.task_retry.connect | |
|
134 | def task_retry_signal( | |
|
135 | request, reason, einfo, **kwargs): | |
|
136 | meta.Session.remove() | |
|
137 | celery_app.conf['PYRAMID_CLOSER']() | |
|
138 | ||
|
139 | ||
|
140 | @signals.task_failure.connect | |
|
141 | def task_failure_signal( | |
|
142 | task_id, exception, args, kwargs, traceback, einfo, **kargs): | |
|
143 | meta.Session.remove() | |
|
144 | celery_app.conf['PYRAMID_CLOSER']() | |
|
145 | ||
|
146 | ||
|
147 | @signals.task_revoked.connect | |
|
148 | def task_revoked_signal( | |
|
149 | request, terminated, signum, expired, **kwargs): | |
|
150 | celery_app.conf['PYRAMID_CLOSER']() | |
|
151 | ||
|
152 | ||
|
153 | def setup_celery_app(app, root, request, registry, closer, ini_location): | |
|
154 | ini_dir = os.path.dirname(os.path.abspath(ini_location)) | |
|
155 | celery_config = base_celery_config | |
|
156 | celery_config.update({ | |
|
157 | # store celerybeat scheduler db where the .ini file is | |
|
158 | 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'), | |
|
159 | }) | |
|
160 | ini_settings = get_ini_config(ini_location) | |
|
161 | log.debug('Got custom celery conf: %s', ini_settings) | |
|
162 | ||
|
163 | celery_config.update(ini_settings) | |
|
164 | celery_app.config_from_object(celery_config) | |
|
165 | ||
|
166 | celery_app.conf.update({'PYRAMID_APP': app}) | |
|
167 | celery_app.conf.update({'PYRAMID_ROOT': root}) | |
|
168 | celery_app.conf.update({'PYRAMID_REQUEST': request}) | |
|
169 | celery_app.conf.update({'PYRAMID_REGISTRY': registry}) | |
|
170 | celery_app.conf.update({'PYRAMID_CLOSER': closer}) | |
|
171 | ||
|
172 | ||
|
173 | def configure_celery(config, ini_location): | |
|
174 | """ | |
|
175 | Helper that is called from our application creation logic. It gives | |
|
176 | connection info into running webapp and allows execution of tasks from | |
|
177 | RhodeCode itself | |
|
178 | """ | |
|
179 | # store some globals into rhodecode | |
|
180 | rhodecode.CELERY_ENABLED = str2bool( | |
|
181 | config.registry.settings.get('use_celery')) | |
|
182 | if rhodecode.CELERY_ENABLED: | |
|
183 | log.info('Configuring celery based on `%s` file', ini_location) | |
|
184 | setup_celery_app( | |
|
185 | app=None, root=None, request=None, registry=config.registry, | |
|
186 | closer=None, ini_location=ini_location) | |
|
187 | ||
|
188 | ||
|
189 | class RequestContextTask(Task): | |
|
190 | """ | |
|
191 | This is a celery task which will create a rhodecode app instance context | |
|
192 | for the task, patch pyramid with the original request | |
|
193 | that created the task and also add the user to the context. | |
|
194 | """ | |
|
195 | ||
|
196 | def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, | |
|
197 | link=None, link_error=None, shadow=None, **options): | |
|
198 | """ queue the job to run (we are in web request context here) """ | |
|
199 | ||
|
200 | req = get_current_request() | |
|
201 | ||
|
202 | # web case | |
|
203 | if hasattr(req, 'user'): | |
|
204 | ip_addr = req.user.ip_addr | |
|
205 | user_id = req.user.user_id | |
|
206 | ||
|
207 | # api case | |
|
208 | elif hasattr(req, 'rpc_user'): | |
|
209 | ip_addr = req.rpc_user.ip_addr | |
|
210 | user_id = req.rpc_user.user_id | |
|
211 | else: | |
|
212 | raise Exception( | |
|
213 | 'Unable to fetch required data from request: {}. \n' | |
|
214 | 'This task is required to be executed from context of ' | |
|
215 | 'request in a webapp'.format(repr(req))) | |
|
216 | ||
|
217 | if req: | |
|
218 | # we hook into kwargs since it is the only way to pass our data to | |
|
219 | # the celery worker | |
|
220 | options['headers'] = options.get('headers', {}) | |
|
221 | options['headers'].update({ | |
|
222 | 'rhodecode_proxy_data': { | |
|
223 | 'environ': { | |
|
224 | 'PATH_INFO': req.environ['PATH_INFO'], | |
|
225 | 'SCRIPT_NAME': req.environ['SCRIPT_NAME'], | |
|
226 | 'HTTP_HOST': req.environ.get('HTTP_HOST', | |
|
227 | req.environ['SERVER_NAME']), | |
|
228 | 'SERVER_NAME': req.environ['SERVER_NAME'], | |
|
229 | 'SERVER_PORT': req.environ['SERVER_PORT'], | |
|
230 | 'wsgi.url_scheme': req.environ['wsgi.url_scheme'], | |
|
231 | }, | |
|
232 | 'auth_user': { | |
|
233 | 'ip_addr': ip_addr, | |
|
234 | 'user_id': user_id | |
|
235 | }, | |
|
236 | } | |
|
237 | }) | |
|
238 | ||
|
239 | return super(RequestContextTask, self).apply_async( | |
|
240 | args, kwargs, task_id, producer, link, link_error, shadow, **options) | |
|
241 | ||
|
242 | def __call__(self, *args, **kwargs): | |
|
243 | """ rebuild the context and then run task on celery worker """ | |
|
244 | ||
|
245 | proxy_data = getattr(self.request, 'rhodecode_proxy_data', None) | |
|
246 | if not proxy_data: | |
|
247 | return super(RequestContextTask, self).__call__(*args, **kwargs) | |
|
248 | ||
|
249 | log.debug('using celery proxy data to run task: %r', proxy_data) | |
|
250 | # re-inject and register threadlocals for proper routing support | |
|
251 | request = prepare_request(proxy_data['environ']) | |
|
252 | request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'], | |
|
253 | ip_addr=proxy_data['auth_user']['ip_addr']) | |
|
254 | ||
|
255 | return super(RequestContextTask, self).__call__(*args, **kwargs) | |
|
256 |
@@ -0,0 +1,156 b'' | |||
|
1 | # -*- coding: utf-8 -*- | |
|
2 | ||
|
3 | # Copyright (C) 2010-2017 RhodeCode GmbH | |
|
4 | # | |
|
5 | # This program is free software: you can redistribute it and/or modify | |
|
6 | # it under the terms of the GNU Affero General Public License, version 3 | |
|
7 | # (only), as published by the Free Software Foundation. | |
|
8 | # | |
|
9 | # This program is distributed in the hope that it will be useful, | |
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
|
12 | # GNU General Public License for more details. | |
|
13 | # | |
|
14 | # You should have received a copy of the GNU Affero General Public License | |
|
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
|
16 | # | |
|
17 | # This program is dual-licensed. If you wish to learn more about the | |
|
18 | # RhodeCode Enterprise Edition, including its added features, Support services, | |
|
19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ | |
|
20 | ||
|
21 | import os | |
|
22 | import json | |
|
23 | import logging | |
|
24 | import datetime | |
|
25 | ||
|
26 | from functools import partial | |
|
27 | ||
|
28 | from pyramid.compat import configparser | |
|
29 | from celery.result import AsyncResult | |
|
30 | import celery.loaders.base | |
|
31 | import celery.schedules | |
|
32 | ||
|
33 | ||
|
34 | log = logging.getLogger(__name__) | |
|
35 | ||
|
36 | ||
|
37 | def get_task_id(task): | |
|
38 | task_id = None | |
|
39 | if isinstance(task, AsyncResult): | |
|
40 | task_id = task.task_id | |
|
41 | ||
|
42 | return task_id | |
|
43 | ||
|
44 | ||
|
45 | def crontab(value): | |
|
46 | return celery.schedules.crontab(**value) | |
|
47 | ||
|
48 | ||
|
49 | def timedelta(value): | |
|
50 | return datetime.timedelta(**value) | |
|
51 | ||
|
52 | ||
|
53 | def safe_json(get, section, key): | |
|
54 | value = '' | |
|
55 | try: | |
|
56 | value = get(key) | |
|
57 | json_value = json.loads(value) | |
|
58 | except ValueError: | |
|
59 | msg = 'The %s=%s is not valid json in section %s' % ( | |
|
60 | key, value, section | |
|
61 | ) | |
|
62 | raise ValueError(msg) | |
|
63 | ||
|
64 | return json_value | |
|
65 | ||
|
66 | ||
|
67 | def get_beat_config(parser, section): | |
|
68 | SCHEDULE_TYPE_MAP = { | |
|
69 | 'crontab': crontab, | |
|
70 | 'timedelta': timedelta, | |
|
71 | 'integer': int | |
|
72 | } | |
|
73 | get = partial(parser.get, section) | |
|
74 | has_option = partial(parser.has_option, section) | |
|
75 | ||
|
76 | schedule_type = get('type') | |
|
77 | schedule_value = safe_json(get, section, 'schedule') | |
|
78 | ||
|
79 | scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type) | |
|
80 | ||
|
81 | if scheduler_cls is None: | |
|
82 | raise ValueError( | |
|
83 | 'schedule type %s in section %s is invalid' % ( | |
|
84 | schedule_type, | |
|
85 | section | |
|
86 | ) | |
|
87 | ) | |
|
88 | ||
|
89 | schedule = scheduler_cls(schedule_value) | |
|
90 | ||
|
91 | config = { | |
|
92 | 'task': get('task'), | |
|
93 | 'schedule': schedule, | |
|
94 | } | |
|
95 | ||
|
96 | if has_option('args'): | |
|
97 | config['args'] = safe_json(get, section, 'args') | |
|
98 | ||
|
99 | if has_option('kwargs'): | |
|
100 | config['kwargs'] = safe_json(get, section, 'kwargs') | |
|
101 | ||
|
102 | return config | |
|
103 | ||
|
104 | ||
|
105 | def get_ini_config(ini_location): | |
|
106 | """ | |
|
107 | Converts basic ini configuration into celery 4.X options | |
|
108 | """ | |
|
109 | def key_converter(key_name): | |
|
110 | pref = 'celery.' | |
|
111 | if key_name.startswith(pref): | |
|
112 | return key_name[len(pref):].replace('.', '_').lower() | |
|
113 | ||
|
114 | def type_converter(parsed_key, value): | |
|
115 | # cast to int | |
|
116 | if value.isdigit(): | |
|
117 | return int(value) | |
|
118 | ||
|
119 | # cast to bool | |
|
120 | if value.lower() in ['true', 'false', 'True', 'False']: | |
|
121 | return value.lower() == 'true' | |
|
122 | return value | |
|
123 | ||
|
124 | parser = configparser.SafeConfigParser( | |
|
125 | defaults={'here': os.path.abspath(ini_location)}) | |
|
126 | parser.read(ini_location) | |
|
127 | ||
|
128 | ini_config = {} | |
|
129 | for k, v in parser.items('app:main'): | |
|
130 | pref = 'celery.' | |
|
131 | if k.startswith(pref): | |
|
132 | ini_config[key_converter(k)] = type_converter(key_converter(k), v) | |
|
133 | ||
|
134 | beat_config = {} | |
|
135 | for section in parser.sections(): | |
|
136 | if section.startswith('celerybeat:'): | |
|
137 | name = section.split(':', 1)[1] | |
|
138 | beat_config[name] = get_beat_config(parser, section) | |
|
139 | ||
|
140 | # final compose of settings | |
|
141 | celery_settings = {} | |
|
142 | ||
|
143 | if ini_config: | |
|
144 | celery_settings.update(ini_config) | |
|
145 | if beat_config: | |
|
146 | celery_settings.update({'beat_schedule': beat_config}) | |
|
147 | ||
|
148 | return celery_settings | |
|
149 | ||
|
150 | ||
|
151 | def parse_ini_vars(ini_vars): | |
|
152 | options = {} | |
|
153 | for pairs in ini_vars.split(','): | |
|
154 | key, value = pairs.split('=') | |
|
155 | options[key] = value | |
|
156 | return options |
@@ -296,28 +296,15 b' labs_settings_active = true' | |||
|
296 | 296 | ### CELERY CONFIG #### |
|
297 | 297 | #################################### |
|
298 | 298 | use_celery = false |
|
299 | broker.host = localhost | |
|
300 | broker.vhost = rabbitmqhost | |
|
301 | broker.port = 5672 | |
|
302 | broker.user = rabbitmq | |
|
303 | broker.password = qweqwe | |
|
304 | ||
|
305 | celery.imports = rhodecode.lib.celerylib.tasks | |
|
306 | 299 | |
|
307 | celery.result.backend = amqp | |
|
308 | celery.result.dburi = amqp:// | |
|
309 | celery.result.serialier = json | |
|
300 | # connection url to the message broker (default rabbitmq) | |
|
301 | celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost | |
|
310 | 302 | |
|
311 | #celery.send.task.error.emails = true | |
|
312 | #celery.amqp.task.result.expires = 18000 | |
|
313 | ||
|
314 | celeryd.concurrency = 2 | |
|
315 | #celeryd.log.file = celeryd.log | |
|
316 | celeryd.log.level = debug | |
|
317 | celeryd.max.tasks.per.child = 1 | |
|
303 | # maximum tasks to execute before worker restart | |
|
304 | celery.max_tasks_per_child = 100 | |
|
318 | 305 | |
|
319 | 306 | ## tasks will never be sent to the queue, but executed locally instead. |
|
320 |
celery.always |
|
|
307 | celery.task_always_eager = false | |
|
321 | 308 | |
|
322 | 309 | #################################### |
|
323 | 310 | ### BEAKER CACHE #### |
@@ -649,7 +636,7 b' custom.conf = 1' | |||
|
649 | 636 | ### LOGGING CONFIGURATION #### |
|
650 | 637 | ################################ |
|
651 | 638 | [loggers] |
|
652 | keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper | |
|
639 | keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery | |
|
653 | 640 | |
|
654 | 641 | [handlers] |
|
655 | 642 | keys = console, console_sql |
@@ -688,6 +675,11 b' handlers =' | |||
|
688 | 675 | qualname = ssh_wrapper |
|
689 | 676 | propagate = 1 |
|
690 | 677 | |
|
678 | [logger_celery] | |
|
679 | level = DEBUG | |
|
680 | handlers = | |
|
681 | qualname = celery | |
|
682 | ||
|
691 | 683 | |
|
692 | 684 | ############## |
|
693 | 685 | ## HANDLERS ## |
@@ -271,28 +271,15 b' labs_settings_active = true' | |||
|
271 | 271 | ### CELERY CONFIG #### |
|
272 | 272 | #################################### |
|
273 | 273 | use_celery = false |
|
274 | broker.host = localhost | |
|
275 | broker.vhost = rabbitmqhost | |
|
276 | broker.port = 5672 | |
|
277 | broker.user = rabbitmq | |
|
278 | broker.password = qweqwe | |
|
279 | ||
|
280 | celery.imports = rhodecode.lib.celerylib.tasks | |
|
281 | 274 | |
|
282 | celery.result.backend = amqp | |
|
283 | celery.result.dburi = amqp:// | |
|
284 | celery.result.serialier = json | |
|
275 | # connection url to the message broker (default rabbitmq) | |
|
276 | celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost | |
|
285 | 277 | |
|
286 | #celery.send.task.error.emails = true | |
|
287 | #celery.amqp.task.result.expires = 18000 | |
|
288 | ||
|
289 | celeryd.concurrency = 2 | |
|
290 | #celeryd.log.file = celeryd.log | |
|
291 | celeryd.log.level = debug | |
|
292 | celeryd.max.tasks.per.child = 1 | |
|
278 | # maximum tasks to execute before worker restart | |
|
279 | celery.max_tasks_per_child = 100 | |
|
293 | 280 | |
|
294 | 281 | ## tasks will never be sent to the queue, but executed locally instead. |
|
295 |
celery.always |
|
|
282 | celery.task_always_eager = false | |
|
296 | 283 | |
|
297 | 284 | #################################### |
|
298 | 285 | ### BEAKER CACHE #### |
@@ -619,7 +606,7 b' custom.conf = 1' | |||
|
619 | 606 | ### LOGGING CONFIGURATION #### |
|
620 | 607 | ################################ |
|
621 | 608 | [loggers] |
|
622 | keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper | |
|
609 | keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery | |
|
623 | 610 | |
|
624 | 611 | [handlers] |
|
625 | 612 | keys = console, console_sql |
@@ -658,6 +645,11 b' handlers =' | |||
|
658 | 645 | qualname = ssh_wrapper |
|
659 | 646 | propagate = 1 |
|
660 | 647 | |
|
648 | [logger_celery] | |
|
649 | level = DEBUG | |
|
650 | handlers = | |
|
651 | qualname = celery | |
|
652 | ||
|
661 | 653 | |
|
662 | 654 | ############## |
|
663 | 655 | ## HANDLERS ## |
@@ -32,6 +32,7 b' from rhodecode.api.utils import (' | |||
|
32 | 32 | from rhodecode.lib import audit_logger |
|
33 | 33 | from rhodecode.lib import repo_maintenance |
|
34 | 34 | from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi |
|
35 | from rhodecode.lib.celerylib.utils import get_task_id | |
|
35 | 36 | from rhodecode.lib.utils2 import str2bool, time_to_datetime |
|
36 | 37 | from rhodecode.lib.ext_json import json |
|
37 | 38 | from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError |
@@ -712,10 +713,7 b' def create_repo(' | |||
|
712 | 713 | } |
|
713 | 714 | |
|
714 | 715 | task = RepoModel().create(form_data=data, cur_user=owner) |
|
715 | from celery.result import BaseAsyncResult | |
|
716 | task_id = None | |
|
717 | if isinstance(task, BaseAsyncResult): | |
|
718 | task_id = task.task_id | |
|
716 | task_id = get_task_id(task) | |
|
719 | 717 | # no commit, it's done in RepoModel, or async via celery |
|
720 | 718 | return { |
|
721 | 719 | 'msg': "Created new repository `%s`" % (schema_data['repo_name'],), |
@@ -1105,10 +1103,8 b' def fork_repo(request, apiuser, repoid, ' | |||
|
1105 | 1103 | |
|
1106 | 1104 | task = RepoModel().create_fork(data, cur_user=owner) |
|
1107 | 1105 | # no commit, it's done in RepoModel, or async via celery |
|
1108 | from celery.result import BaseAsyncResult | |
|
1109 | task_id = None | |
|
1110 | if isinstance(task, BaseAsyncResult): | |
|
1111 | task_id = task.task_id | |
|
1106 | task_id = get_task_id(task) | |
|
1107 | ||
|
1112 | 1108 | return { |
|
1113 | 1109 | 'msg': 'Created fork of `%s` as `%s`' % ( |
|
1114 | 1110 | repo.repo_name, schema_data['repo_name']), |
@@ -28,6 +28,7 b' from pyramid.renderers import render' | |||
|
28 | 28 | from pyramid.response import Response |
|
29 | 29 | |
|
30 | 30 | from rhodecode.apps._base import BaseAppView, DataGridAppView |
|
31 | from rhodecode.lib.celerylib.utils import get_task_id | |
|
31 | 32 | |
|
32 | 33 | from rhodecode.lib.ext_json import json |
|
33 | 34 | from rhodecode.lib.auth import ( |
@@ -143,22 +144,19 b' class AdminReposView(BaseAppView, DataGr' | |||
|
143 | 144 | c = self.load_default_context() |
|
144 | 145 | |
|
145 | 146 | form_result = {} |
|
147 | self._load_form_data(c) | |
|
146 | 148 | task_id = None |
|
147 | self._load_form_data(c) | |
|
148 | ||
|
149 | 149 | try: |
|
150 | 150 | # CanWriteToGroup validators checks permissions of this POST |
|
151 | 151 | form = RepoForm( |
|
152 | 152 | self.request.translate, repo_groups=c.repo_groups_choices, |
|
153 | 153 | landing_revs=c.landing_revs_choices)() |
|
154 |
form_result |
|
|
154 | form_result = form.to_python(dict(self.request.POST)) | |
|
155 | 155 | |
|
156 | 156 | # create is done sometimes async on celery, db transaction |
|
157 | 157 | # management is handled there. |
|
158 | 158 | task = RepoModel().create(form_result, self._rhodecode_user.user_id) |
|
159 | from celery.result import BaseAsyncResult | |
|
160 | if isinstance(task, BaseAsyncResult): | |
|
161 | task_id = task.task_id | |
|
159 | task_id = get_task_id(task) | |
|
162 | 160 | except formencode.Invalid as errors: |
|
163 | 161 | data = render('rhodecode:templates/admin/repos/repo_add.mako', |
|
164 | 162 | self._get_template_context(c), self.request) |
@@ -46,11 +46,9 b' class RepoChecksView(BaseAppView):' | |||
|
46 | 46 | |
|
47 | 47 | repo_name = self.request.matchdict['repo_name'] |
|
48 | 48 | db_repo = Repository.get_by_repo_name(repo_name) |
|
49 | if not db_repo: | |
|
50 | raise HTTPNotFound() | |
|
51 | 49 | |
|
52 | 50 | # check if maybe repo is already created |
|
53 | if db_repo.repo_state in [Repository.STATE_CREATED]: | |
|
51 | if db_repo and db_repo.repo_state in [Repository.STATE_CREATED]: | |
|
54 | 52 | # re-check permissions before redirecting to prevent resource |
|
55 | 53 | # discovery by checking the 302 code |
|
56 | 54 | perm_set = ['repository.read', 'repository.write', 'repository.admin'] |
@@ -80,9 +78,10 b' class RepoChecksView(BaseAppView):' | |||
|
80 | 78 | |
|
81 | 79 | if task_id and task_id not in ['None']: |
|
82 | 80 | import rhodecode |
|
83 | from celery.result import AsyncResult | |
|
81 | from rhodecode.lib.celerylib.loader import celery_app | |
|
84 | 82 | if rhodecode.CELERY_ENABLED: |
|
85 | task = AsyncResult(task_id) | |
|
83 | task = celery_app.AsyncResult(task_id) | |
|
84 | task.get() | |
|
86 | 85 | if task.failed(): |
|
87 | 86 | msg = self._log_creation_exception(task.result, repo_name) |
|
88 | 87 | h.flash(msg, category='error') |
@@ -33,6 +33,7 b' from rhodecode.lib.auth import (' | |||
|
33 | 33 | LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous, |
|
34 | 34 | HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired) |
|
35 | 35 | import rhodecode.lib.helpers as h |
|
36 | from rhodecode.lib.celerylib.utils import get_task_id | |
|
36 | 37 | from rhodecode.model.db import coalesce, or_, Repository, RepoGroup |
|
37 | 38 | from rhodecode.model.repo import RepoModel |
|
38 | 39 | from rhodecode.model.forms import RepoForkForm |
@@ -226,9 +227,8 b' class RepoForksView(RepoAppView, DataGri' | |||
|
226 | 227 | # management is handled there. |
|
227 | 228 | task = RepoModel().create_fork( |
|
228 | 229 | form_result, c.rhodecode_user.user_id) |
|
229 | from celery.result import BaseAsyncResult | |
|
230 | if isinstance(task, BaseAsyncResult): | |
|
231 | task_id = task.task_id | |
|
230 | ||
|
231 | task_id = get_task_id(task) | |
|
232 | 232 | except formencode.Invalid as errors: |
|
233 | 233 | c.rhodecode_db_repo = self.db_repo |
|
234 | 234 |
@@ -23,14 +23,6 b' import os' | |||
|
23 | 23 | import logging |
|
24 | 24 | import rhodecode |
|
25 | 25 | |
|
26 | # ------------------------------------------------------------------------------ | |
|
27 | # CELERY magic until refactor - issue #4163 - import order matters here: | |
|
28 | #from rhodecode.lib import celerypylons # this must be first, celerypylons | |
|
29 | # sets config settings upon import | |
|
30 | ||
|
31 | import rhodecode.integrations # any modules using celery task | |
|
32 | # decorators should be added afterwards: | |
|
33 | # ------------------------------------------------------------------------------ | |
|
34 | 26 | |
|
35 | 27 | from rhodecode.config import utils |
|
36 | 28 | |
@@ -54,14 +46,6 b' def load_pyramid_environment(global_conf' | |||
|
54 | 46 | 'secret': settings_merged.get('channelstream.secret') |
|
55 | 47 | } |
|
56 | 48 | |
|
57 | ||
|
58 | # TODO(marcink): celery | |
|
59 | # # store some globals into rhodecode | |
|
60 | # rhodecode.CELERY_ENABLED = str2bool(config['app_conf'].get('use_celery')) | |
|
61 | # rhodecode.CELERY_EAGER = str2bool( | |
|
62 | # config['app_conf'].get('celery.always.eager')) | |
|
63 | ||
|
64 | ||
|
65 | 49 | # If this is a test run we prepare the test environment like |
|
66 | 50 | # creating a test database, test search index and test repositories. |
|
67 | 51 | # This has to be done before the database connection is initialized. |
@@ -189,7 +189,7 b'' | |||
|
189 | 189 | "python2.7-jupyter-core-4.3.0": { |
|
190 | 190 | "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause" |
|
191 | 191 | }, |
|
192 |
"python2.7-kombu- |
|
|
192 | "python2.7-kombu-4.1.0": { | |
|
193 | 193 | "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause" |
|
194 | 194 | }, |
|
195 | 195 | "python2.7-mistune-0.7.4": { |
@@ -42,6 +42,7 b' from rhodecode.lib.vcs import VCSCommuni' | |||
|
42 | 42 | from rhodecode.lib.exceptions import VCSServerUnavailable |
|
43 | 43 | from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled |
|
44 | 44 | from rhodecode.lib.middleware.https_fixup import HttpsFixup |
|
45 | from rhodecode.lib.celerylib.loader import configure_celery | |
|
45 | 46 | from rhodecode.lib.plugins.utils import register_rhodecode_plugin |
|
46 | 47 | from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict |
|
47 | 48 | from rhodecode.subscribers import ( |
@@ -87,9 +88,11 b' def make_pyramid_app(global_config, **se' | |||
|
87 | 88 | pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config) |
|
88 | 89 | pyramid_app.config = config |
|
89 | 90 | |
|
91 | config.configure_celery(global_config['__file__']) | |
|
90 | 92 | # creating the app uses a connection - return it after we are done |
|
91 | 93 | meta.Session.remove() |
|
92 | 94 | |
|
95 | log.info('Pyramid app %s created and configured.', pyramid_app) | |
|
93 | 96 | return pyramid_app |
|
94 | 97 | |
|
95 | 98 | |
@@ -196,6 +199,8 b' def includeme(config):' | |||
|
196 | 199 | config.add_directive( |
|
197 | 200 | 'register_rhodecode_plugin', register_rhodecode_plugin) |
|
198 | 201 | |
|
202 | config.add_directive('configure_celery', configure_celery) | |
|
203 | ||
|
199 | 204 | if asbool(settings.get('appenlight', 'false')): |
|
200 | 205 | config.include('appenlight_client.ext.pyramid_tween') |
|
201 | 206 |
@@ -20,18 +20,16 b'' | |||
|
20 | 20 | |
|
21 | 21 | from __future__ import unicode_literals |
|
22 | 22 | import deform |
|
23 | import re | |
|
24 | 23 | import logging |
|
25 | 24 | import requests |
|
26 | 25 | import colander |
|
27 | 26 | import textwrap |
|
28 | from celery.task import task | |
|
29 | 27 | from mako.template import Template |
|
30 | 28 | |
|
31 | 29 | from rhodecode import events |
|
32 | 30 | from rhodecode.translation import _ |
|
33 | 31 | from rhodecode.lib import helpers as h |
|
34 | from rhodecode.lib.celerylib import run_task | |
|
32 | from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask | |
|
35 | 33 | from rhodecode.lib.colander_utils import strip_whitespace |
|
36 | 34 | from rhodecode.integrations.types.base import IntegrationTypeBase |
|
37 | 35 | |
@@ -243,7 +241,7 b' class HipchatIntegrationType(Integration' | |||
|
243 | 241 | ) |
|
244 | 242 | |
|
245 | 243 | |
|
246 | @task(ignore_result=True) | |
|
244 | @async_task(ignore_result=True, base=RequestContextTask) | |
|
247 | 245 | def post_text_to_hipchat(settings, text): |
|
248 | 246 | log.debug('sending %s to hipchat %s' % (text, settings['server_url'])) |
|
249 | 247 | resp = requests.post(settings['server_url'], json={ |
@@ -27,13 +27,12 b' import logging' | |||
|
27 | 27 | import deform |
|
28 | 28 | import requests |
|
29 | 29 | import colander |
|
30 | from celery.task import task | |
|
31 | 30 | from mako.template import Template |
|
32 | 31 | |
|
33 | 32 | from rhodecode import events |
|
34 | 33 | from rhodecode.translation import _ |
|
35 | 34 | from rhodecode.lib import helpers as h |
|
36 | from rhodecode.lib.celerylib import run_task | |
|
35 | from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask | |
|
37 | 36 | from rhodecode.lib.colander_utils import strip_whitespace |
|
38 | 37 | from rhodecode.integrations.types.base import IntegrationTypeBase |
|
39 | 38 | |
@@ -296,7 +295,7 b' def html_to_slack_links(message):' | |||
|
296 | 295 | r'<\1|\2>', message) |
|
297 | 296 | |
|
298 | 297 | |
|
299 | @task(ignore_result=True) | |
|
298 | @async_task(ignore_result=True, base=RequestContextTask) | |
|
300 | 299 | def post_text_to_slack(settings, title, text, fields=None, overrides=None): |
|
301 | 300 | log.debug('sending %s (%s) to slack %s' % ( |
|
302 | 301 | title, text, settings['service'])) |
@@ -28,16 +28,17 b' import logging' | |||
|
28 | 28 | import requests |
|
29 | 29 | import requests.adapters |
|
30 | 30 | import colander |
|
31 | from celery.task import task | |
|
32 | 31 | from requests.packages.urllib3.util.retry import Retry |
|
33 | 32 | |
|
34 | 33 | import rhodecode |
|
35 | 34 | from rhodecode import events |
|
36 | 35 | from rhodecode.translation import _ |
|
37 | 36 | from rhodecode.integrations.types.base import IntegrationTypeBase |
|
37 | from rhodecode.lib.celerylib import async_task, RequestContextTask | |
|
38 | 38 | |
|
39 | 39 | log = logging.getLogger(__name__) |
|
40 | 40 | |
|
41 | ||
|
41 | 42 | # updating this required to update the `common_vars` passed in url calling func |
|
42 | 43 | WEBHOOK_URL_VARS = [ |
|
43 | 44 | 'repo_name', |
@@ -315,7 +316,7 b' class WebhookIntegrationType(Integration' | |||
|
315 | 316 | post_to_webhook(url_calls, self.settings) |
|
316 | 317 | |
|
317 | 318 | |
|
318 | @task(ignore_result=True) | |
|
319 | @async_task(ignore_result=True, base=RequestContextTask) | |
|
319 | 320 | def post_to_webhook(url_calls, settings): |
|
320 | 321 | max_retries = 3 |
|
321 | 322 | retries = Retry( |
@@ -17,36 +17,17 b'' | |||
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | """ | |
|
21 | celery libs for RhodeCode | |
|
22 | """ | |
|
23 | 20 | |
|
24 | ||
|
25 | import pylons | |
|
26 | 21 | import socket |
|
27 | 22 | import logging |
|
28 | 23 | |
|
29 | 24 | import rhodecode |
|
30 | ||
|
31 | from os.path import join as jn | |
|
32 | from pylons import config | |
|
33 | from celery.task import Task | |
|
34 | from pyramid.request import Request | |
|
35 | from pyramid.scripting import prepare | |
|
36 | from pyramid.threadlocal import get_current_request | |
|
37 | ||
|
38 | from decorator import decorator | |
|
25 | from zope.cachedescriptors.property import Lazy as LazyProperty | |
|
26 | from rhodecode.lib.celerylib.loader import ( | |
|
27 | celery_app, RequestContextTask, get_logger) | |
|
39 | 28 | |
|
40 | from zope.cachedescriptors.property import Lazy as LazyProperty | |
|
29 | async_task = celery_app.task | |
|
41 | 30 | |
|
42 | from rhodecode.config import utils | |
|
43 | from rhodecode.lib.utils2 import ( | |
|
44 | safe_str, md5_safe, aslist, get_routes_generator_for_server_url, | |
|
45 | get_server_url) | |
|
46 | from rhodecode.lib.pidlock import DaemonLock, LockHeld | |
|
47 | from rhodecode.lib.vcs import connect_vcs | |
|
48 | from rhodecode.model import meta | |
|
49 | from rhodecode.lib.auth import AuthUser | |
|
50 | 31 | |
|
51 | 32 | log = logging.getLogger(__name__) |
|
52 | 33 | |
@@ -60,95 +41,13 b' class ResultWrapper(object):' | |||
|
60 | 41 | return self.task |
|
61 | 42 | |
|
62 | 43 | |
|
63 | class RhodecodeCeleryTask(Task): | |
|
64 | """ | |
|
65 | This is a celery task which will create a rhodecode app instance context | |
|
66 | for the task, patch pyramid + pylons threadlocals with the original request | |
|
67 | that created the task and also add the user to the context. | |
|
68 | ||
|
69 | This class as a whole should be removed once the pylons port is complete | |
|
70 | and a pyramid only solution for celery is implemented as per issue #4139 | |
|
71 | """ | |
|
72 | ||
|
73 | def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, | |
|
74 | link=None, link_error=None, **options): | |
|
75 | """ queue the job to run (we are in web request context here) """ | |
|
76 | ||
|
77 | request = get_current_request() | |
|
78 | ||
|
79 | if hasattr(request, 'user'): | |
|
80 | ip_addr = request.user.ip_addr | |
|
81 | user_id = request.user.user_id | |
|
82 | elif hasattr(request, 'rpc_params'): | |
|
83 | # TODO(marcink) remove when migration is finished | |
|
84 | # api specific call on Pyramid. | |
|
85 | ip_addr = request.rpc_params['apiuser'].ip_addr | |
|
86 | user_id = request.rpc_params['apiuser'].user_id | |
|
87 | else: | |
|
88 | raise Exception('Unable to fetch data from request: {}'.format( | |
|
89 | request)) | |
|
90 | ||
|
91 | if request: | |
|
92 | # we hook into kwargs since it is the only way to pass our data to | |
|
93 | # the celery worker in celery 2.2 | |
|
94 | kwargs.update({ | |
|
95 | '_rhodecode_proxy_data': { | |
|
96 | 'environ': { | |
|
97 | 'PATH_INFO': request.environ['PATH_INFO'], | |
|
98 | 'SCRIPT_NAME': request.environ['SCRIPT_NAME'], | |
|
99 | 'HTTP_HOST': request.environ.get('HTTP_HOST', | |
|
100 | request.environ['SERVER_NAME']), | |
|
101 | 'SERVER_NAME': request.environ['SERVER_NAME'], | |
|
102 | 'SERVER_PORT': request.environ['SERVER_PORT'], | |
|
103 | 'wsgi.url_scheme': request.environ['wsgi.url_scheme'], | |
|
104 | }, | |
|
105 | 'auth_user': { | |
|
106 | 'ip_addr': ip_addr, | |
|
107 | 'user_id': user_id | |
|
108 | }, | |
|
109 | } | |
|
110 | }) | |
|
111 | return super(RhodecodeCeleryTask, self).apply_async( | |
|
112 | args, kwargs, task_id, producer, link, link_error, **options) | |
|
113 | ||
|
114 | def __call__(self, *args, **kwargs): | |
|
115 | """ rebuild the context and then run task on celery worker """ | |
|
116 | proxy_data = kwargs.pop('_rhodecode_proxy_data', {}) | |
|
117 | ||
|
118 | if not proxy_data: | |
|
119 | return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) | |
|
120 | ||
|
121 | log.debug('using celery proxy data to run task: %r', proxy_data) | |
|
122 | ||
|
123 | from rhodecode.config.routing import make_map | |
|
124 | ||
|
125 | request = Request.blank('/', environ=proxy_data['environ']) | |
|
126 | request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'], | |
|
127 | ip_addr=proxy_data['auth_user']['ip_addr']) | |
|
128 | ||
|
129 | pyramid_request = prepare(request) # set pyramid threadlocal request | |
|
130 | ||
|
131 | # pylons routing | |
|
132 | if not rhodecode.CONFIG.get('routes.map'): | |
|
133 | rhodecode.CONFIG['routes.map'] = make_map(config) | |
|
134 | pylons.url._push_object(get_routes_generator_for_server_url( | |
|
135 | get_server_url(request.environ) | |
|
136 | )) | |
|
137 | ||
|
138 | try: | |
|
139 | return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs) | |
|
140 | finally: | |
|
141 | pyramid_request['closer']() | |
|
142 | pylons.url._pop_object() | |
|
143 | ||
|
144 | ||
|
145 | 44 | def run_task(task, *args, **kwargs): |
|
146 | 45 | if rhodecode.CELERY_ENABLED: |
|
147 | 46 | celery_is_up = False |
|
148 | 47 | try: |
|
149 | 48 | t = task.apply_async(args=args, kwargs=kwargs) |
|
150 | log.info('running task %s:%s', t.task_id, task) | |
|
151 | 49 | celery_is_up = True |
|
50 | log.debug('executing task %s:%s in async mode', t.task_id, task) | |
|
152 | 51 | return t |
|
153 | 52 | |
|
154 | 53 | except socket.error as e: |
@@ -164,73 +63,10 b' def run_task(task, *args, **kwargs):' | |||
|
164 | 63 | "Fallback to sync execution.") |
|
165 | 64 | |
|
166 | 65 | # keep in mind there maybe a subtle race condition where something |
|
167 |
# depending on rhodecode.CELERY_ENABLED |
|
|
66 | # depending on rhodecode.CELERY_ENABLED | |
|
168 | 67 | # will see CELERY_ENABLED as True before this has a chance to set False |
|
169 | 68 | rhodecode.CELERY_ENABLED = celery_is_up |
|
170 | 69 | else: |
|
171 | log.debug('executing task %s in sync mode', task) | |
|
172 | return ResultWrapper(task(*args, **kwargs)) | |
|
173 | ||
|
174 | ||
|
175 | def __get_lockkey(func, *fargs, **fkwargs): | |
|
176 | params = list(fargs) | |
|
177 | params.extend(['%s-%s' % ar for ar in fkwargs.items()]) | |
|
178 | ||
|
179 | func_name = str(func.__name__) if hasattr(func, '__name__') else str(func) | |
|
180 | _lock_key = func_name + '-' + '-'.join(map(safe_str, params)) | |
|
181 | return 'task_%s.lock' % (md5_safe(_lock_key),) | |
|
182 | ||
|
183 | ||
|
184 | def locked_task(func): | |
|
185 | def __wrapper(func, *fargs, **fkwargs): | |
|
186 | lockkey = __get_lockkey(func, *fargs, **fkwargs) | |
|
187 | lockkey_path = config['app_conf']['cache_dir'] | |
|
188 | ||
|
189 | log.info('running task with lockkey %s' % lockkey) | |
|
190 | try: | |
|
191 | l = DaemonLock(file_=jn(lockkey_path, lockkey)) | |
|
192 | ret = func(*fargs, **fkwargs) | |
|
193 | l.release() | |
|
194 | return ret | |
|
195 | except LockHeld: | |
|
196 | log.info('LockHeld') | |
|
197 | return 'Task with key %s already running' % lockkey | |
|
198 | ||
|
199 | return decorator(__wrapper, func) | |
|
200 | ||
|
70 | log.debug('executing task %s:%s in sync mode', 'TASK', task) | |
|
201 | 71 | |
|
202 | def get_session(): | |
|
203 | if rhodecode.CELERY_ENABLED: | |
|
204 | utils.initialize_database(config) | |
|
205 | sa = meta.Session() | |
|
206 | return sa | |
|
207 | ||
|
208 | ||
|
209 | def dbsession(func): | |
|
210 | def __wrapper(func, *fargs, **fkwargs): | |
|
211 | try: | |
|
212 | ret = func(*fargs, **fkwargs) | |
|
213 | return ret | |
|
214 | finally: | |
|
215 | if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER: | |
|
216 | meta.Session.remove() | |
|
217 | ||
|
218 | return decorator(__wrapper, func) | |
|
219 | ||
|
220 | ||
|
221 | def vcsconnection(func): | |
|
222 | def __wrapper(func, *fargs, **fkwargs): | |
|
223 | if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER: | |
|
224 | settings = rhodecode.PYRAMID_SETTINGS | |
|
225 | backends = settings['vcs.backends'] | |
|
226 | for alias in rhodecode.BACKENDS.keys(): | |
|
227 | if alias not in backends: | |
|
228 | del rhodecode.BACKENDS[alias] | |
|
229 | utils.configure_vcs(settings) | |
|
230 | connect_vcs( | |
|
231 | settings['vcs.server'], | |
|
232 | utils.get_vcs_server_protocol(settings)) | |
|
233 | ret = func(*fargs, **fkwargs) | |
|
234 | return ret | |
|
235 | ||
|
236 | return decorator(__wrapper, func) | |
|
72 | return ResultWrapper(task(*args, **kwargs)) |
@@ -23,38 +23,18 b' RhodeCode task modules, containing all t' | |||
|
23 | 23 | by celery daemon |
|
24 | 24 | """ |
|
25 | 25 | |
|
26 | ||
|
27 | 26 | import os |
|
28 | import logging | |
|
29 | ||
|
30 | from celery.task import task | |
|
31 | 27 | |
|
32 | 28 | import rhodecode |
|
33 | 29 | from rhodecode.lib import audit_logger |
|
34 |
from rhodecode.lib.celerylib import |
|
|
35 | run_task, dbsession, __get_lockkey, LockHeld, DaemonLock, | |
|
36 | get_session, vcsconnection, RhodecodeCeleryTask) | |
|
30 | from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask | |
|
37 | 31 | from rhodecode.lib.hooks_base import log_create_repository |
|
38 | 32 | from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer |
|
39 | from rhodecode.lib.utils import add_cache | |
|
40 | 33 | from rhodecode.lib.utils2 import safe_int, str2bool |
|
41 | from rhodecode.model.db import Repository, User | |
|
34 | from rhodecode.model.db import Session, Repository, User | |
|
42 | 35 | |
|
43 | 36 | |
|
44 | def get_logger(cls): | |
|
45 | if rhodecode.CELERY_ENABLED: | |
|
46 | try: | |
|
47 | log = cls.get_logger() | |
|
48 | except Exception: | |
|
49 | log = logging.getLogger(__name__) | |
|
50 | else: | |
|
51 | log = logging.getLogger(__name__) | |
|
52 | ||
|
53 | return log | |
|
54 | ||
|
55 | ||
|
56 | @task(ignore_result=True, base=RhodecodeCeleryTask) | |
|
57 | @dbsession | |
|
37 | @async_task(ignore_result=True, base=RequestContextTask) | |
|
58 | 38 | def send_email(recipients, subject, body='', html_body='', email_config=None): |
|
59 | 39 | """ |
|
60 | 40 | Sends an email with defined parameters from the .ini files. |
@@ -101,18 +81,15 b' def send_email(recipients, subject, body' | |||
|
101 | 81 | return True |
|
102 | 82 | |
|
103 | 83 | |
|
104 |
@task(ignore_result=True, base=R |
|
|
105 | @dbsession | |
|
106 | @vcsconnection | |
|
84 | @async_task(ignore_result=True, base=RequestContextTask) | |
|
107 | 85 | def create_repo(form_data, cur_user): |
|
108 | 86 | from rhodecode.model.repo import RepoModel |
|
109 | 87 | from rhodecode.model.user import UserModel |
|
110 | 88 | from rhodecode.model.settings import SettingsModel |
|
111 | 89 | |
|
112 | 90 | log = get_logger(create_repo) |
|
113 | DBS = get_session() | |
|
114 | 91 | |
|
115 |
cur_user = UserModel( |
|
|
92 | cur_user = UserModel()._get_user(cur_user) | |
|
116 | 93 | owner = cur_user |
|
117 | 94 | |
|
118 | 95 | repo_name = form_data['repo_name'] |
@@ -138,7 +115,7 b' def create_repo(form_data, cur_user):' | |||
|
138 | 115 | 'enable_downloads', defs.get('repo_enable_downloads')) |
|
139 | 116 | |
|
140 | 117 | try: |
|
141 |
repo = RepoModel( |
|
|
118 | repo = RepoModel()._create_repo( | |
|
142 | 119 | repo_name=repo_name_full, |
|
143 | 120 | repo_type=repo_type, |
|
144 | 121 | description=description, |
@@ -155,13 +132,13 b' def create_repo(form_data, cur_user):' | |||
|
155 | 132 | enable_downloads=enable_downloads, |
|
156 | 133 | state=state |
|
157 | 134 | ) |
|
158 |
|
|
|
135 | Session().commit() | |
|
159 | 136 | |
|
160 | 137 | # now create this repo on Filesystem |
|
161 |
RepoModel( |
|
|
138 | RepoModel()._create_filesystem_repo( | |
|
162 | 139 | repo_name=repo_name, |
|
163 | 140 | repo_type=repo_type, |
|
164 |
repo_group=RepoModel( |
|
|
141 | repo_group=RepoModel()._get_repo_group(repo_group), | |
|
165 | 142 | clone_uri=clone_uri, |
|
166 | 143 | ) |
|
167 | 144 | repo = Repository.get_by_repo_name(repo_name_full) |
@@ -180,7 +157,7 b' def create_repo(form_data, cur_user):' | |||
|
180 | 157 | user=cur_user, |
|
181 | 158 | repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id)) |
|
182 | 159 | |
|
183 |
|
|
|
160 | Session().commit() | |
|
184 | 161 | except Exception: |
|
185 | 162 | log.warning('Exception occurred when creating repository, ' |
|
186 | 163 | 'doing cleanup...', exc_info=True) |
@@ -188,8 +165,8 b' def create_repo(form_data, cur_user):' | |||
|
188 | 165 | repo = Repository.get_by_repo_name(repo_name_full) |
|
189 | 166 | if repo: |
|
190 | 167 | Repository.delete(repo.repo_id) |
|
191 |
|
|
|
192 |
RepoModel( |
|
|
168 | Session().commit() | |
|
169 | RepoModel()._delete_filesystem_repo(repo) | |
|
193 | 170 | raise |
|
194 | 171 | |
|
195 | 172 | # it's an odd fix to make celery fail task when exception occurs |
@@ -199,23 +176,17 b' def create_repo(form_data, cur_user):' | |||
|
199 | 176 | return True |
|
200 | 177 | |
|
201 | 178 | |
|
202 |
@task(ignore_result=True, base=R |
|
|
203 | @dbsession | |
|
204 | @vcsconnection | |
|
179 | @async_task(ignore_result=True, base=RequestContextTask) | |
|
205 | 180 | def create_repo_fork(form_data, cur_user): |
|
206 | 181 | """ |
|
207 | 182 | Creates a fork of repository using internal VCS methods |
|
208 | ||
|
209 | :param form_data: | |
|
210 | :param cur_user: | |
|
211 | 183 | """ |
|
212 | 184 | from rhodecode.model.repo import RepoModel |
|
213 | 185 | from rhodecode.model.user import UserModel |
|
214 | 186 | |
|
215 | 187 | log = get_logger(create_repo_fork) |
|
216 | DBS = get_session() | |
|
217 | 188 | |
|
218 |
cur_user = UserModel( |
|
|
189 | cur_user = UserModel()._get_user(cur_user) | |
|
219 | 190 | owner = cur_user |
|
220 | 191 | |
|
221 | 192 | repo_name = form_data['repo_name'] # fork in this case |
@@ -230,8 +201,8 b' def create_repo_fork(form_data, cur_user' | |||
|
230 | 201 | fork_id = safe_int(form_data.get('fork_parent_id')) |
|
231 | 202 | |
|
232 | 203 | try: |
|
233 |
fork_of = RepoModel( |
|
|
234 |
RepoModel( |
|
|
204 | fork_of = RepoModel()._get_repo(fork_id) | |
|
205 | RepoModel()._create_repo( | |
|
235 | 206 | repo_name=repo_name_full, |
|
236 | 207 | repo_type=repo_type, |
|
237 | 208 | description=description, |
@@ -244,16 +215,16 b' def create_repo_fork(form_data, cur_user' | |||
|
244 | 215 | copy_fork_permissions=copy_fork_permissions |
|
245 | 216 | ) |
|
246 | 217 | |
|
247 |
|
|
|
218 | Session().commit() | |
|
248 | 219 | |
|
249 | 220 | base_path = Repository.base_path() |
|
250 | 221 | source_repo_path = os.path.join(base_path, fork_of.repo_name) |
|
251 | 222 | |
|
252 | 223 | # now create this repo on Filesystem |
|
253 |
RepoModel( |
|
|
224 | RepoModel()._create_filesystem_repo( | |
|
254 | 225 | repo_name=repo_name, |
|
255 | 226 | repo_type=repo_type, |
|
256 |
repo_group=RepoModel( |
|
|
227 | repo_group=RepoModel()._get_repo_group(repo_group), | |
|
257 | 228 | clone_uri=source_repo_path, |
|
258 | 229 | ) |
|
259 | 230 | repo = Repository.get_by_repo_name(repo_name_full) |
@@ -274,7 +245,7 b' def create_repo_fork(form_data, cur_user' | |||
|
274 | 245 | user=cur_user, |
|
275 | 246 | repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id)) |
|
276 | 247 | |
|
277 |
|
|
|
248 | Session().commit() | |
|
278 | 249 | except Exception as e: |
|
279 | 250 | log.warning('Exception %s occurred when forking repository, ' |
|
280 | 251 | 'doing cleanup...', e) |
@@ -282,8 +253,8 b' def create_repo_fork(form_data, cur_user' | |||
|
282 | 253 | repo = Repository.get_by_repo_name(repo_name_full) |
|
283 | 254 | if repo: |
|
284 | 255 | Repository.delete(repo.repo_id) |
|
285 |
|
|
|
286 |
RepoModel( |
|
|
256 | Session().commit() | |
|
257 | RepoModel()._delete_filesystem_repo(repo) | |
|
287 | 258 | raise |
|
288 | 259 | |
|
289 | 260 | # it's an odd fix to make celery fail task when exception occurs |
@@ -291,3 +262,14 b' def create_repo_fork(form_data, cur_user' | |||
|
291 | 262 | pass |
|
292 | 263 | |
|
293 | 264 | return True |
|
265 | ||
|
266 | ||
|
267 | @async_task(ignore_result=True) | |
|
268 | def sync_repo(*args, **kwargs): | |
|
269 | from rhodecode.model.scm import ScmModel | |
|
270 | log = get_logger(sync_repo) | |
|
271 | ||
|
272 | log.info('Pulling from %s', kwargs['repo_name']) | |
|
273 | ScmModel().pull_changes(kwargs['repo_name'], kwargs['username']) | |
|
274 | ||
|
275 |
@@ -22,6 +22,7 b' import os' | |||
|
22 | 22 | from pyramid.compat import configparser |
|
23 | 23 | from pyramid.paster import bootstrap as pyramid_bootstrap, setup_logging # noqa |
|
24 | 24 | from pyramid.request import Request |
|
25 | from pyramid.scripting import prepare | |
|
25 | 26 | |
|
26 | 27 | |
|
27 | 28 | def get_config(ini_path, **kwargs): |
@@ -47,3 +48,9 b' def bootstrap(config_uri, request=None, ' | |||
|
47 | 48 | request = request or Request.blank('/', base_url=base_url) |
|
48 | 49 | |
|
49 | 50 | return pyramid_bootstrap(config_uri, request=request, options=options) |
|
51 | ||
|
52 | ||
|
53 | def prepare_request(environ): | |
|
54 | request = Request.blank('/', environ=environ) | |
|
55 | prepare(request) # set pyramid threadlocal request | |
|
56 | return request |
@@ -38,7 +38,6 b' from os.path import join as jn' | |||
|
38 | 38 | |
|
39 | 39 | import paste |
|
40 | 40 | import pkg_resources |
|
41 | from paste.script.command import Command, BadCommand | |
|
42 | 41 | from webhelpers.text import collapse, remove_formatting, strip_tags |
|
43 | 42 | from mako import exceptions |
|
44 | 43 | from pyramid.threadlocal import get_current_registry |
@@ -767,85 +766,6 b' def create_test_repositories(test_path, ' | |||
|
767 | 766 | tar.extractall(jn(test_path, SVN_REPO)) |
|
768 | 767 | |
|
769 | 768 | |
|
770 | #============================================================================== | |
|
771 | # PASTER COMMANDS | |
|
772 | #============================================================================== | |
|
773 | class BasePasterCommand(Command): | |
|
774 | """ | |
|
775 | Abstract Base Class for paster commands. | |
|
776 | ||
|
777 | The celery commands are somewhat aggressive about loading | |
|
778 | celery.conf, and since our module sets the `CELERY_LOADER` | |
|
779 | environment variable to our loader, we have to bootstrap a bit and | |
|
780 | make sure we've had a chance to load the pylons config off of the | |
|
781 | command line, otherwise everything fails. | |
|
782 | """ | |
|
783 | min_args = 1 | |
|
784 | min_args_error = "Please provide a paster config file as an argument." | |
|
785 | takes_config_file = 1 | |
|
786 | requires_config_file = True | |
|
787 | ||
|
788 | def notify_msg(self, msg, log=False): | |
|
789 | """Make a notification to user, additionally if logger is passed | |
|
790 | it logs this action using given logger | |
|
791 | ||
|
792 | :param msg: message that will be printed to user | |
|
793 | :param log: logging instance, to use to additionally log this message | |
|
794 | ||
|
795 | """ | |
|
796 | if log and isinstance(log, logging): | |
|
797 | log(msg) | |
|
798 | ||
|
799 | def run(self, args): | |
|
800 | """ | |
|
801 | Overrides Command.run | |
|
802 | ||
|
803 | Checks for a config file argument and loads it. | |
|
804 | """ | |
|
805 | if len(args) < self.min_args: | |
|
806 | raise BadCommand( | |
|
807 | self.min_args_error % {'min_args': self.min_args, | |
|
808 | 'actual_args': len(args)}) | |
|
809 | ||
|
810 | # Decrement because we're going to lob off the first argument. | |
|
811 | # @@ This is hacky | |
|
812 | self.min_args -= 1 | |
|
813 | self.bootstrap_config(args[0]) | |
|
814 | self.update_parser() | |
|
815 | return super(BasePasterCommand, self).run(args[1:]) | |
|
816 | ||
|
817 | def update_parser(self): | |
|
818 | """ | |
|
819 | Abstract method. Allows for the class' parser to be updated | |
|
820 | before the superclass' `run` method is called. Necessary to | |
|
821 | allow options/arguments to be passed through to the underlying | |
|
822 | celery command. | |
|
823 | """ | |
|
824 | raise NotImplementedError("Abstract Method.") | |
|
825 | ||
|
826 | def bootstrap_config(self, conf): | |
|
827 | """ | |
|
828 | Loads the pylons configuration. | |
|
829 | """ | |
|
830 | from pylons import config as pylonsconfig | |
|
831 | ||
|
832 | self.path_to_ini_file = os.path.realpath(conf) | |
|
833 | conf = paste.deploy.appconfig('config:' + self.path_to_ini_file) | |
|
834 | pylonsconfig.init_app(conf.global_conf, conf.local_conf) | |
|
835 | ||
|
836 | def _init_session(self): | |
|
837 | """ | |
|
838 | Inits SqlAlchemy Session | |
|
839 | """ | |
|
840 | logging.config.fileConfig(self.path_to_ini_file) | |
|
841 | from pylons import config | |
|
842 | from rhodecode.config.utils import initialize_database | |
|
843 | ||
|
844 | # get to remove repos !! | |
|
845 | add_cache(config) | |
|
846 | initialize_database(config) | |
|
847 | ||
|
848 | ||
|
849 | 769 | def password_changed(auth_user, session): |
|
850 | 770 | # Never report password change in case of default user or anonymous user. |
|
851 | 771 | if auth_user.username == User.DEFAULT_USER or auth_user.user_id is None: |
@@ -23,7 +23,6 b'' | |||
|
23 | 23 | Some simple helper functions |
|
24 | 24 | """ |
|
25 | 25 | |
|
26 | ||
|
27 | 26 | import collections |
|
28 | 27 | import datetime |
|
29 | 28 | import dateutil.relativedelta |
@@ -42,7 +41,6 b' import sqlalchemy.engine.url' | |||
|
42 | 41 | import sqlalchemy.exc |
|
43 | 42 | import sqlalchemy.sql |
|
44 | 43 | import webob |
|
45 | import routes.util | |
|
46 | 44 | import pyramid.threadlocal |
|
47 | 45 | |
|
48 | 46 | import rhodecode |
@@ -941,31 +939,6 b' class Optional(object):' | |||
|
941 | 939 | return val |
|
942 | 940 | |
|
943 | 941 | |
|
944 | def get_routes_generator_for_server_url(server_url): | |
|
945 | parsed_url = urlobject.URLObject(server_url) | |
|
946 | netloc = safe_str(parsed_url.netloc) | |
|
947 | script_name = safe_str(parsed_url.path) | |
|
948 | ||
|
949 | if ':' in netloc: | |
|
950 | server_name, server_port = netloc.split(':') | |
|
951 | else: | |
|
952 | server_name = netloc | |
|
953 | server_port = (parsed_url.scheme == 'https' and '443' or '80') | |
|
954 | ||
|
955 | environ = { | |
|
956 | 'REQUEST_METHOD': 'GET', | |
|
957 | 'PATH_INFO': '/', | |
|
958 | 'SERVER_NAME': server_name, | |
|
959 | 'SERVER_PORT': server_port, | |
|
960 | 'SCRIPT_NAME': script_name, | |
|
961 | } | |
|
962 | if parsed_url.scheme == 'https': | |
|
963 | environ['HTTPS'] = 'on' | |
|
964 | environ['wsgi.url_scheme'] = 'https' | |
|
965 | ||
|
966 | return routes.util.URLGenerator(rhodecode.CONFIG['routes.map'], environ) | |
|
967 | ||
|
968 | ||
|
969 | 942 | def glob2re(pat): |
|
970 | 943 | """ |
|
971 | 944 | Translate a shell PATTERN to a regular expression. |
@@ -448,3 +448,9 b' class TestGetEnabledHooks(object):' | |||
|
448 | 448 | ui_settings = [] |
|
449 | 449 | result = utils.get_enabled_hook_classes(ui_settings) |
|
450 | 450 | assert result == [] |
|
451 | ||
|
452 | ||
|
453 | def test_obfuscate_url_pw(): | |
|
454 | from rhodecode.lib.utils2 import obfuscate_url_pw | |
|
455 | engine = u'/home/repos/malmö' | |
|
456 | assert obfuscate_url_pw(engine) No newline at end of file |
@@ -108,7 +108,6 b' def pytest_addoption(parser):' | |||
|
108 | 108 | |
|
109 | 109 | |
|
110 | 110 | def pytest_configure(config): |
|
111 | # Appy the kombu patch early on, needed for test discovery on Python 2.7.11 | |
|
112 | 111 | from rhodecode.config import patches |
|
113 | 112 | |
|
114 | 113 |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now