##// END OF EJS Templates
celery: celery 4.X support. Fixes #4169...
marcink -
r2359:246f5a4c default
parent child Browse files
Show More
@@ -0,0 +1,256 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
21 Celery loader, run with::
22
23 celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini
24 """
25 import os
26 import logging
27
28 from celery import Celery
29 from celery import signals
30 from celery import Task
31 from kombu.serialization import register
32 from pyramid.threadlocal import get_current_request
33
34 import rhodecode
35
36 from rhodecode.lib.auth import AuthUser
37 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
38 from rhodecode.lib.ext_json import json
39 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
40 from rhodecode.lib.utils2 import str2bool
41 from rhodecode.model import meta
42
43
44 register('json_ext', json.dumps, json.loads,
45 content_type='application/x-json-ext',
46 content_encoding='utf-8')
47
48 log = logging.getLogger('celery.rhodecode.loader')
49
50
51 def add_preload_arguments(parser):
52 parser.add_argument(
53 '--ini', default=None,
54 help='Path to ini configuration file.'
55 )
56 parser.add_argument(
57 '--ini-var', default=None,
58 help='Comma separated list of key=value to pass to ini.'
59 )
60
61
62 def get_logger(obj):
63 custom_log = logging.getLogger(
64 'rhodecode.task.{}'.format(obj.__class__.__name__))
65
66 if rhodecode.CELERY_ENABLED:
67 try:
68 custom_log = obj.get_logger()
69 except Exception:
70 pass
71
72 return custom_log
73
74
75 base_celery_config = {
76 'result_backend': 'rpc://',
77 'result_expires': 60 * 60 * 24,
78 'result_persistent': True,
79 'imports': [],
80 'worker_max_tasks_per_child': 100,
81 'accept_content': ['json_ext'],
82 'task_serializer': 'json_ext',
83 'result_serializer': 'json_ext',
84 'worker_hijack_root_logger': False,
85 }
86 # init main celery app
87 celery_app = Celery()
88 celery_app.user_options['preload'].add(add_preload_arguments)
89 ini_file_glob = None
90
91
92 @signals.setup_logging.connect
93 def setup_logging_callback(**kwargs):
94 setup_logging(ini_file_glob)
95
96
97 @signals.user_preload_options.connect
98 def on_preload_parsed(options, **kwargs):
99 ini_location = options['ini']
100 ini_vars = options['ini_var']
101 celery_app.conf['INI_PYRAMID'] = options['ini']
102
103 if ini_location is None:
104 print('You must provide the paste --ini argument')
105 exit(-1)
106
107 options = None
108 if ini_vars is not None:
109 options = parse_ini_vars(ini_vars)
110
111 global ini_file_glob
112 ini_file_glob = ini_location
113
114 log.debug('Bootstrapping RhodeCode application...')
115 env = bootstrap(ini_location, options=options)
116
117 setup_celery_app(
118 app=env['app'], root=env['root'], request=env['request'],
119 registry=env['registry'], closer=env['closer'],
120 ini_location=ini_location)
121
122 # fix the global flag even if it's disabled via .ini file because this
123 # is a worker code that doesn't need this to be disabled.
124 rhodecode.CELERY_ENABLED = True
125
126
127 @signals.task_success.connect
128 def task_success_signal(result, **kwargs):
129 meta.Session.commit()
130 celery_app.conf['PYRAMID_CLOSER']()
131
132
133 @signals.task_retry.connect
134 def task_retry_signal(
135 request, reason, einfo, **kwargs):
136 meta.Session.remove()
137 celery_app.conf['PYRAMID_CLOSER']()
138
139
140 @signals.task_failure.connect
141 def task_failure_signal(
142 task_id, exception, args, kwargs, traceback, einfo, **kargs):
143 meta.Session.remove()
144 celery_app.conf['PYRAMID_CLOSER']()
145
146
147 @signals.task_revoked.connect
148 def task_revoked_signal(
149 request, terminated, signum, expired, **kwargs):
150 celery_app.conf['PYRAMID_CLOSER']()
151
152
153 def setup_celery_app(app, root, request, registry, closer, ini_location):
154 ini_dir = os.path.dirname(os.path.abspath(ini_location))
155 celery_config = base_celery_config
156 celery_config.update({
157 # store celerybeat scheduler db where the .ini file is
158 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
159 })
160 ini_settings = get_ini_config(ini_location)
161 log.debug('Got custom celery conf: %s', ini_settings)
162
163 celery_config.update(ini_settings)
164 celery_app.config_from_object(celery_config)
165
166 celery_app.conf.update({'PYRAMID_APP': app})
167 celery_app.conf.update({'PYRAMID_ROOT': root})
168 celery_app.conf.update({'PYRAMID_REQUEST': request})
169 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
170 celery_app.conf.update({'PYRAMID_CLOSER': closer})
171
172
173 def configure_celery(config, ini_location):
174 """
175 Helper that is called from our application creation logic. It gives
176 connection info into running webapp and allows execution of tasks from
177 RhodeCode itself
178 """
179 # store some globals into rhodecode
180 rhodecode.CELERY_ENABLED = str2bool(
181 config.registry.settings.get('use_celery'))
182 if rhodecode.CELERY_ENABLED:
183 log.info('Configuring celery based on `%s` file', ini_location)
184 setup_celery_app(
185 app=None, root=None, request=None, registry=config.registry,
186 closer=None, ini_location=ini_location)
187
188
189 class RequestContextTask(Task):
190 """
191 This is a celery task which will create a rhodecode app instance context
192 for the task, patch pyramid with the original request
193 that created the task and also add the user to the context.
194 """
195
196 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
197 link=None, link_error=None, shadow=None, **options):
198 """ queue the job to run (we are in web request context here) """
199
200 req = get_current_request()
201
202 # web case
203 if hasattr(req, 'user'):
204 ip_addr = req.user.ip_addr
205 user_id = req.user.user_id
206
207 # api case
208 elif hasattr(req, 'rpc_user'):
209 ip_addr = req.rpc_user.ip_addr
210 user_id = req.rpc_user.user_id
211 else:
212 raise Exception(
213 'Unable to fetch required data from request: {}. \n'
214 'This task is required to be executed from context of '
215 'request in a webapp'.format(repr(req)))
216
217 if req:
218 # we hook into kwargs since it is the only way to pass our data to
219 # the celery worker
220 options['headers'] = options.get('headers', {})
221 options['headers'].update({
222 'rhodecode_proxy_data': {
223 'environ': {
224 'PATH_INFO': req.environ['PATH_INFO'],
225 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
226 'HTTP_HOST': req.environ.get('HTTP_HOST',
227 req.environ['SERVER_NAME']),
228 'SERVER_NAME': req.environ['SERVER_NAME'],
229 'SERVER_PORT': req.environ['SERVER_PORT'],
230 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
231 },
232 'auth_user': {
233 'ip_addr': ip_addr,
234 'user_id': user_id
235 },
236 }
237 })
238
239 return super(RequestContextTask, self).apply_async(
240 args, kwargs, task_id, producer, link, link_error, shadow, **options)
241
242 def __call__(self, *args, **kwargs):
243 """ rebuild the context and then run task on celery worker """
244
245 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
246 if not proxy_data:
247 return super(RequestContextTask, self).__call__(*args, **kwargs)
248
249 log.debug('using celery proxy data to run task: %r', proxy_data)
250 # re-inject and register threadlocals for proper routing support
251 request = prepare_request(proxy_data['environ'])
252 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
253 ip_addr=proxy_data['auth_user']['ip_addr'])
254
255 return super(RequestContextTask, self).__call__(*args, **kwargs)
256
@@ -0,0 +1,156 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import os
22 import json
23 import logging
24 import datetime
25
26 from functools import partial
27
28 from pyramid.compat import configparser
29 from celery.result import AsyncResult
30 import celery.loaders.base
31 import celery.schedules
32
33
34 log = logging.getLogger(__name__)
35
36
37 def get_task_id(task):
38 task_id = None
39 if isinstance(task, AsyncResult):
40 task_id = task.task_id
41
42 return task_id
43
44
45 def crontab(value):
46 return celery.schedules.crontab(**value)
47
48
49 def timedelta(value):
50 return datetime.timedelta(**value)
51
52
53 def safe_json(get, section, key):
54 value = ''
55 try:
56 value = get(key)
57 json_value = json.loads(value)
58 except ValueError:
59 msg = 'The %s=%s is not valid json in section %s' % (
60 key, value, section
61 )
62 raise ValueError(msg)
63
64 return json_value
65
66
67 def get_beat_config(parser, section):
68 SCHEDULE_TYPE_MAP = {
69 'crontab': crontab,
70 'timedelta': timedelta,
71 'integer': int
72 }
73 get = partial(parser.get, section)
74 has_option = partial(parser.has_option, section)
75
76 schedule_type = get('type')
77 schedule_value = safe_json(get, section, 'schedule')
78
79 scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type)
80
81 if scheduler_cls is None:
82 raise ValueError(
83 'schedule type %s in section %s is invalid' % (
84 schedule_type,
85 section
86 )
87 )
88
89 schedule = scheduler_cls(schedule_value)
90
91 config = {
92 'task': get('task'),
93 'schedule': schedule,
94 }
95
96 if has_option('args'):
97 config['args'] = safe_json(get, section, 'args')
98
99 if has_option('kwargs'):
100 config['kwargs'] = safe_json(get, section, 'kwargs')
101
102 return config
103
104
105 def get_ini_config(ini_location):
106 """
107 Converts basic ini configuration into celery 4.X options
108 """
109 def key_converter(key_name):
110 pref = 'celery.'
111 if key_name.startswith(pref):
112 return key_name[len(pref):].replace('.', '_').lower()
113
114 def type_converter(parsed_key, value):
115 # cast to int
116 if value.isdigit():
117 return int(value)
118
119 # cast to bool
120 if value.lower() in ['true', 'false', 'True', 'False']:
121 return value.lower() == 'true'
122 return value
123
124 parser = configparser.SafeConfigParser(
125 defaults={'here': os.path.abspath(ini_location)})
126 parser.read(ini_location)
127
128 ini_config = {}
129 for k, v in parser.items('app:main'):
130 pref = 'celery.'
131 if k.startswith(pref):
132 ini_config[key_converter(k)] = type_converter(key_converter(k), v)
133
134 beat_config = {}
135 for section in parser.sections():
136 if section.startswith('celerybeat:'):
137 name = section.split(':', 1)[1]
138 beat_config[name] = get_beat_config(parser, section)
139
140 # final compose of settings
141 celery_settings = {}
142
143 if ini_config:
144 celery_settings.update(ini_config)
145 if beat_config:
146 celery_settings.update({'beat_schedule': beat_config})
147
148 return celery_settings
149
150
151 def parse_ini_vars(ini_vars):
152 options = {}
153 for pairs in ini_vars.split(','):
154 key, value = pairs.split('=')
155 options[key] = value
156 return options
@@ -296,28 +296,15 b' labs_settings_active = true'
296 ### CELERY CONFIG ####
296 ### CELERY CONFIG ####
297 ####################################
297 ####################################
298 use_celery = false
298 use_celery = false
299 broker.host = localhost
300 broker.vhost = rabbitmqhost
301 broker.port = 5672
302 broker.user = rabbitmq
303 broker.password = qweqwe
304
305 celery.imports = rhodecode.lib.celerylib.tasks
306
299
307 celery.result.backend = amqp
300 # connection url to the message broker (default rabbitmq)
308 celery.result.dburi = amqp://
301 celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
309 celery.result.serialier = json
310
302
311 #celery.send.task.error.emails = true
303 # maximum tasks to execute before worker restart
312 #celery.amqp.task.result.expires = 18000
304 celery.max_tasks_per_child = 100
313
314 celeryd.concurrency = 2
315 #celeryd.log.file = celeryd.log
316 celeryd.log.level = debug
317 celeryd.max.tasks.per.child = 1
318
305
319 ## tasks will never be sent to the queue, but executed locally instead.
306 ## tasks will never be sent to the queue, but executed locally instead.
320 celery.always.eager = false
307 celery.task_always_eager = false
321
308
322 ####################################
309 ####################################
323 ### BEAKER CACHE ####
310 ### BEAKER CACHE ####
@@ -649,7 +636,7 b' custom.conf = 1'
649 ### LOGGING CONFIGURATION ####
636 ### LOGGING CONFIGURATION ####
650 ################################
637 ################################
651 [loggers]
638 [loggers]
652 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
639 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
653
640
654 [handlers]
641 [handlers]
655 keys = console, console_sql
642 keys = console, console_sql
@@ -688,6 +675,11 b' handlers ='
688 qualname = ssh_wrapper
675 qualname = ssh_wrapper
689 propagate = 1
676 propagate = 1
690
677
678 [logger_celery]
679 level = DEBUG
680 handlers =
681 qualname = celery
682
691
683
692 ##############
684 ##############
693 ## HANDLERS ##
685 ## HANDLERS ##
@@ -271,28 +271,15 b' labs_settings_active = true'
271 ### CELERY CONFIG ####
271 ### CELERY CONFIG ####
272 ####################################
272 ####################################
273 use_celery = false
273 use_celery = false
274 broker.host = localhost
275 broker.vhost = rabbitmqhost
276 broker.port = 5672
277 broker.user = rabbitmq
278 broker.password = qweqwe
279
280 celery.imports = rhodecode.lib.celerylib.tasks
281
274
282 celery.result.backend = amqp
275 # connection url to the message broker (default rabbitmq)
283 celery.result.dburi = amqp://
276 celery.broker_url = amqp://rabbitmq:qweqwe@localhost:5672/rabbitmqhost
284 celery.result.serialier = json
285
277
286 #celery.send.task.error.emails = true
278 # maximum tasks to execute before worker restart
287 #celery.amqp.task.result.expires = 18000
279 celery.max_tasks_per_child = 100
288
289 celeryd.concurrency = 2
290 #celeryd.log.file = celeryd.log
291 celeryd.log.level = debug
292 celeryd.max.tasks.per.child = 1
293
280
294 ## tasks will never be sent to the queue, but executed locally instead.
281 ## tasks will never be sent to the queue, but executed locally instead.
295 celery.always.eager = false
282 celery.task_always_eager = false
296
283
297 ####################################
284 ####################################
298 ### BEAKER CACHE ####
285 ### BEAKER CACHE ####
@@ -619,7 +606,7 b' custom.conf = 1'
619 ### LOGGING CONFIGURATION ####
606 ### LOGGING CONFIGURATION ####
620 ################################
607 ################################
621 [loggers]
608 [loggers]
622 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper
609 keys = root, sqlalchemy, beaker, rhodecode, ssh_wrapper, celery
623
610
624 [handlers]
611 [handlers]
625 keys = console, console_sql
612 keys = console, console_sql
@@ -658,6 +645,11 b' handlers ='
658 qualname = ssh_wrapper
645 qualname = ssh_wrapper
659 propagate = 1
646 propagate = 1
660
647
648 [logger_celery]
649 level = DEBUG
650 handlers =
651 qualname = celery
652
661
653
662 ##############
654 ##############
663 ## HANDLERS ##
655 ## HANDLERS ##
@@ -32,6 +32,7 b' from rhodecode.api.utils import ('
32 from rhodecode.lib import audit_logger
32 from rhodecode.lib import audit_logger
33 from rhodecode.lib import repo_maintenance
33 from rhodecode.lib import repo_maintenance
34 from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi
34 from rhodecode.lib.auth import HasPermissionAnyApi, HasUserGroupPermissionAnyApi
35 from rhodecode.lib.celerylib.utils import get_task_id
35 from rhodecode.lib.utils2 import str2bool, time_to_datetime
36 from rhodecode.lib.utils2 import str2bool, time_to_datetime
36 from rhodecode.lib.ext_json import json
37 from rhodecode.lib.ext_json import json
37 from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError
38 from rhodecode.lib.exceptions import StatusChangeOnClosedPullRequestError
@@ -712,10 +713,7 b' def create_repo('
712 }
713 }
713
714
714 task = RepoModel().create(form_data=data, cur_user=owner)
715 task = RepoModel().create(form_data=data, cur_user=owner)
715 from celery.result import BaseAsyncResult
716 task_id = get_task_id(task)
716 task_id = None
717 if isinstance(task, BaseAsyncResult):
718 task_id = task.task_id
719 # no commit, it's done in RepoModel, or async via celery
717 # no commit, it's done in RepoModel, or async via celery
720 return {
718 return {
721 'msg': "Created new repository `%s`" % (schema_data['repo_name'],),
719 'msg': "Created new repository `%s`" % (schema_data['repo_name'],),
@@ -1105,10 +1103,8 b' def fork_repo(request, apiuser, repoid, '
1105
1103
1106 task = RepoModel().create_fork(data, cur_user=owner)
1104 task = RepoModel().create_fork(data, cur_user=owner)
1107 # no commit, it's done in RepoModel, or async via celery
1105 # no commit, it's done in RepoModel, or async via celery
1108 from celery.result import BaseAsyncResult
1106 task_id = get_task_id(task)
1109 task_id = None
1107
1110 if isinstance(task, BaseAsyncResult):
1111 task_id = task.task_id
1112 return {
1108 return {
1113 'msg': 'Created fork of `%s` as `%s`' % (
1109 'msg': 'Created fork of `%s` as `%s`' % (
1114 repo.repo_name, schema_data['repo_name']),
1110 repo.repo_name, schema_data['repo_name']),
@@ -28,6 +28,7 b' from pyramid.renderers import render'
28 from pyramid.response import Response
28 from pyramid.response import Response
29
29
30 from rhodecode.apps._base import BaseAppView, DataGridAppView
30 from rhodecode.apps._base import BaseAppView, DataGridAppView
31 from rhodecode.lib.celerylib.utils import get_task_id
31
32
32 from rhodecode.lib.ext_json import json
33 from rhodecode.lib.ext_json import json
33 from rhodecode.lib.auth import (
34 from rhodecode.lib.auth import (
@@ -143,22 +144,19 b' class AdminReposView(BaseAppView, DataGr'
143 c = self.load_default_context()
144 c = self.load_default_context()
144
145
145 form_result = {}
146 form_result = {}
147 self._load_form_data(c)
146 task_id = None
148 task_id = None
147 self._load_form_data(c)
148
149 try:
149 try:
150 # CanWriteToGroup validators checks permissions of this POST
150 # CanWriteToGroup validators checks permissions of this POST
151 form = RepoForm(
151 form = RepoForm(
152 self.request.translate, repo_groups=c.repo_groups_choices,
152 self.request.translate, repo_groups=c.repo_groups_choices,
153 landing_revs=c.landing_revs_choices)()
153 landing_revs=c.landing_revs_choices)()
154 form_results = form.to_python(dict(self.request.POST))
154 form_result = form.to_python(dict(self.request.POST))
155
155
156 # create is done sometimes async on celery, db transaction
156 # create is done sometimes async on celery, db transaction
157 # management is handled there.
157 # management is handled there.
158 task = RepoModel().create(form_result, self._rhodecode_user.user_id)
158 task = RepoModel().create(form_result, self._rhodecode_user.user_id)
159 from celery.result import BaseAsyncResult
159 task_id = get_task_id(task)
160 if isinstance(task, BaseAsyncResult):
161 task_id = task.task_id
162 except formencode.Invalid as errors:
160 except formencode.Invalid as errors:
163 data = render('rhodecode:templates/admin/repos/repo_add.mako',
161 data = render('rhodecode:templates/admin/repos/repo_add.mako',
164 self._get_template_context(c), self.request)
162 self._get_template_context(c), self.request)
@@ -46,11 +46,9 b' class RepoChecksView(BaseAppView):'
46
46
47 repo_name = self.request.matchdict['repo_name']
47 repo_name = self.request.matchdict['repo_name']
48 db_repo = Repository.get_by_repo_name(repo_name)
48 db_repo = Repository.get_by_repo_name(repo_name)
49 if not db_repo:
50 raise HTTPNotFound()
51
49
52 # check if maybe repo is already created
50 # check if maybe repo is already created
53 if db_repo.repo_state in [Repository.STATE_CREATED]:
51 if db_repo and db_repo.repo_state in [Repository.STATE_CREATED]:
54 # re-check permissions before redirecting to prevent resource
52 # re-check permissions before redirecting to prevent resource
55 # discovery by checking the 302 code
53 # discovery by checking the 302 code
56 perm_set = ['repository.read', 'repository.write', 'repository.admin']
54 perm_set = ['repository.read', 'repository.write', 'repository.admin']
@@ -80,9 +78,10 b' class RepoChecksView(BaseAppView):'
80
78
81 if task_id and task_id not in ['None']:
79 if task_id and task_id not in ['None']:
82 import rhodecode
80 import rhodecode
83 from celery.result import AsyncResult
81 from rhodecode.lib.celerylib.loader import celery_app
84 if rhodecode.CELERY_ENABLED:
82 if rhodecode.CELERY_ENABLED:
85 task = AsyncResult(task_id)
83 task = celery_app.AsyncResult(task_id)
84 task.get()
86 if task.failed():
85 if task.failed():
87 msg = self._log_creation_exception(task.result, repo_name)
86 msg = self._log_creation_exception(task.result, repo_name)
88 h.flash(msg, category='error')
87 h.flash(msg, category='error')
@@ -33,6 +33,7 b' from rhodecode.lib.auth import ('
33 LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous,
33 LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous,
34 HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired)
34 HasRepoPermissionAny, HasPermissionAnyDecorator, CSRFRequired)
35 import rhodecode.lib.helpers as h
35 import rhodecode.lib.helpers as h
36 from rhodecode.lib.celerylib.utils import get_task_id
36 from rhodecode.model.db import coalesce, or_, Repository, RepoGroup
37 from rhodecode.model.db import coalesce, or_, Repository, RepoGroup
37 from rhodecode.model.repo import RepoModel
38 from rhodecode.model.repo import RepoModel
38 from rhodecode.model.forms import RepoForkForm
39 from rhodecode.model.forms import RepoForkForm
@@ -226,9 +227,8 b' class RepoForksView(RepoAppView, DataGri'
226 # management is handled there.
227 # management is handled there.
227 task = RepoModel().create_fork(
228 task = RepoModel().create_fork(
228 form_result, c.rhodecode_user.user_id)
229 form_result, c.rhodecode_user.user_id)
229 from celery.result import BaseAsyncResult
230
230 if isinstance(task, BaseAsyncResult):
231 task_id = get_task_id(task)
231 task_id = task.task_id
232 except formencode.Invalid as errors:
232 except formencode.Invalid as errors:
233 c.rhodecode_db_repo = self.db_repo
233 c.rhodecode_db_repo = self.db_repo
234
234
@@ -23,14 +23,6 b' import os'
23 import logging
23 import logging
24 import rhodecode
24 import rhodecode
25
25
26 # ------------------------------------------------------------------------------
27 # CELERY magic until refactor - issue #4163 - import order matters here:
28 #from rhodecode.lib import celerypylons # this must be first, celerypylons
29 # sets config settings upon import
30
31 import rhodecode.integrations # any modules using celery task
32 # decorators should be added afterwards:
33 # ------------------------------------------------------------------------------
34
26
35 from rhodecode.config import utils
27 from rhodecode.config import utils
36
28
@@ -54,14 +46,6 b' def load_pyramid_environment(global_conf'
54 'secret': settings_merged.get('channelstream.secret')
46 'secret': settings_merged.get('channelstream.secret')
55 }
47 }
56
48
57
58 # TODO(marcink): celery
59 # # store some globals into rhodecode
60 # rhodecode.CELERY_ENABLED = str2bool(config['app_conf'].get('use_celery'))
61 # rhodecode.CELERY_EAGER = str2bool(
62 # config['app_conf'].get('celery.always.eager'))
63
64
65 # If this is a test run we prepare the test environment like
49 # If this is a test run we prepare the test environment like
66 # creating a test database, test search index and test repositories.
50 # creating a test database, test search index and test repositories.
67 # This has to be done before the database connection is initialized.
51 # This has to be done before the database connection is initialized.
@@ -189,7 +189,7 b''
189 "python2.7-jupyter-core-4.3.0": {
189 "python2.7-jupyter-core-4.3.0": {
190 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
190 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
191 },
191 },
192 "python2.7-kombu-1.5.1": {
192 "python2.7-kombu-4.1.0": {
193 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
193 "BSD 4-clause \"Original\" or \"Old\" License": "http://spdx.org/licenses/BSD-4-Clause"
194 },
194 },
195 "python2.7-mistune-0.7.4": {
195 "python2.7-mistune-0.7.4": {
@@ -42,6 +42,7 b' from rhodecode.lib.vcs import VCSCommuni'
42 from rhodecode.lib.exceptions import VCSServerUnavailable
42 from rhodecode.lib.exceptions import VCSServerUnavailable
43 from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled
43 from rhodecode.lib.middleware.appenlight import wrap_in_appenlight_if_enabled
44 from rhodecode.lib.middleware.https_fixup import HttpsFixup
44 from rhodecode.lib.middleware.https_fixup import HttpsFixup
45 from rhodecode.lib.celerylib.loader import configure_celery
45 from rhodecode.lib.plugins.utils import register_rhodecode_plugin
46 from rhodecode.lib.plugins.utils import register_rhodecode_plugin
46 from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict
47 from rhodecode.lib.utils2 import aslist as rhodecode_aslist, AttributeDict
47 from rhodecode.subscribers import (
48 from rhodecode.subscribers import (
@@ -87,9 +88,11 b' def make_pyramid_app(global_config, **se'
87 pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config)
88 pyramid_app = wrap_app_in_wsgi_middlewares(pyramid_app, config)
88 pyramid_app.config = config
89 pyramid_app.config = config
89
90
91 config.configure_celery(global_config['__file__'])
90 # creating the app uses a connection - return it after we are done
92 # creating the app uses a connection - return it after we are done
91 meta.Session.remove()
93 meta.Session.remove()
92
94
95 log.info('Pyramid app %s created and configured.', pyramid_app)
93 return pyramid_app
96 return pyramid_app
94
97
95
98
@@ -196,6 +199,8 b' def includeme(config):'
196 config.add_directive(
199 config.add_directive(
197 'register_rhodecode_plugin', register_rhodecode_plugin)
200 'register_rhodecode_plugin', register_rhodecode_plugin)
198
201
202 config.add_directive('configure_celery', configure_celery)
203
199 if asbool(settings.get('appenlight', 'false')):
204 if asbool(settings.get('appenlight', 'false')):
200 config.include('appenlight_client.ext.pyramid_tween')
205 config.include('appenlight_client.ext.pyramid_tween')
201
206
@@ -20,18 +20,16 b''
20
20
21 from __future__ import unicode_literals
21 from __future__ import unicode_literals
22 import deform
22 import deform
23 import re
24 import logging
23 import logging
25 import requests
24 import requests
26 import colander
25 import colander
27 import textwrap
26 import textwrap
28 from celery.task import task
29 from mako.template import Template
27 from mako.template import Template
30
28
31 from rhodecode import events
29 from rhodecode import events
32 from rhodecode.translation import _
30 from rhodecode.translation import _
33 from rhodecode.lib import helpers as h
31 from rhodecode.lib import helpers as h
34 from rhodecode.lib.celerylib import run_task
32 from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
35 from rhodecode.lib.colander_utils import strip_whitespace
33 from rhodecode.lib.colander_utils import strip_whitespace
36 from rhodecode.integrations.types.base import IntegrationTypeBase
34 from rhodecode.integrations.types.base import IntegrationTypeBase
37
35
@@ -243,7 +241,7 b' class HipchatIntegrationType(Integration'
243 )
241 )
244
242
245
243
246 @task(ignore_result=True)
244 @async_task(ignore_result=True, base=RequestContextTask)
247 def post_text_to_hipchat(settings, text):
245 def post_text_to_hipchat(settings, text):
248 log.debug('sending %s to hipchat %s' % (text, settings['server_url']))
246 log.debug('sending %s to hipchat %s' % (text, settings['server_url']))
249 resp = requests.post(settings['server_url'], json={
247 resp = requests.post(settings['server_url'], json={
@@ -27,13 +27,12 b' import logging'
27 import deform
27 import deform
28 import requests
28 import requests
29 import colander
29 import colander
30 from celery.task import task
31 from mako.template import Template
30 from mako.template import Template
32
31
33 from rhodecode import events
32 from rhodecode import events
34 from rhodecode.translation import _
33 from rhodecode.translation import _
35 from rhodecode.lib import helpers as h
34 from rhodecode.lib import helpers as h
36 from rhodecode.lib.celerylib import run_task
35 from rhodecode.lib.celerylib import run_task, async_task, RequestContextTask
37 from rhodecode.lib.colander_utils import strip_whitespace
36 from rhodecode.lib.colander_utils import strip_whitespace
38 from rhodecode.integrations.types.base import IntegrationTypeBase
37 from rhodecode.integrations.types.base import IntegrationTypeBase
39
38
@@ -296,7 +295,7 b' def html_to_slack_links(message):'
296 r'<\1|\2>', message)
295 r'<\1|\2>', message)
297
296
298
297
299 @task(ignore_result=True)
298 @async_task(ignore_result=True, base=RequestContextTask)
300 def post_text_to_slack(settings, title, text, fields=None, overrides=None):
299 def post_text_to_slack(settings, title, text, fields=None, overrides=None):
301 log.debug('sending %s (%s) to slack %s' % (
300 log.debug('sending %s (%s) to slack %s' % (
302 title, text, settings['service']))
301 title, text, settings['service']))
@@ -28,16 +28,17 b' import logging'
28 import requests
28 import requests
29 import requests.adapters
29 import requests.adapters
30 import colander
30 import colander
31 from celery.task import task
32 from requests.packages.urllib3.util.retry import Retry
31 from requests.packages.urllib3.util.retry import Retry
33
32
34 import rhodecode
33 import rhodecode
35 from rhodecode import events
34 from rhodecode import events
36 from rhodecode.translation import _
35 from rhodecode.translation import _
37 from rhodecode.integrations.types.base import IntegrationTypeBase
36 from rhodecode.integrations.types.base import IntegrationTypeBase
37 from rhodecode.lib.celerylib import async_task, RequestContextTask
38
38
39 log = logging.getLogger(__name__)
39 log = logging.getLogger(__name__)
40
40
41
41 # updating this required to update the `common_vars` passed in url calling func
42 # updating this required to update the `common_vars` passed in url calling func
42 WEBHOOK_URL_VARS = [
43 WEBHOOK_URL_VARS = [
43 'repo_name',
44 'repo_name',
@@ -315,7 +316,7 b' class WebhookIntegrationType(Integration'
315 post_to_webhook(url_calls, self.settings)
316 post_to_webhook(url_calls, self.settings)
316
317
317
318
318 @task(ignore_result=True)
319 @async_task(ignore_result=True, base=RequestContextTask)
319 def post_to_webhook(url_calls, settings):
320 def post_to_webhook(url_calls, settings):
320 max_retries = 3
321 max_retries = 3
321 retries = Retry(
322 retries = Retry(
@@ -17,36 +17,17 b''
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
21 celery libs for RhodeCode
22 """
23
20
24
25 import pylons
26 import socket
21 import socket
27 import logging
22 import logging
28
23
29 import rhodecode
24 import rhodecode
30
25 from zope.cachedescriptors.property import Lazy as LazyProperty
31 from os.path import join as jn
26 from rhodecode.lib.celerylib.loader import (
32 from pylons import config
27 celery_app, RequestContextTask, get_logger)
33 from celery.task import Task
34 from pyramid.request import Request
35 from pyramid.scripting import prepare
36 from pyramid.threadlocal import get_current_request
37
38 from decorator import decorator
39
28
40 from zope.cachedescriptors.property import Lazy as LazyProperty
29 async_task = celery_app.task
41
30
42 from rhodecode.config import utils
43 from rhodecode.lib.utils2 import (
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
45 get_server_url)
46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
47 from rhodecode.lib.vcs import connect_vcs
48 from rhodecode.model import meta
49 from rhodecode.lib.auth import AuthUser
50
31
51 log = logging.getLogger(__name__)
32 log = logging.getLogger(__name__)
52
33
@@ -60,95 +41,13 b' class ResultWrapper(object):'
60 return self.task
41 return self.task
61
42
62
43
63 class RhodecodeCeleryTask(Task):
64 """
65 This is a celery task which will create a rhodecode app instance context
66 for the task, patch pyramid + pylons threadlocals with the original request
67 that created the task and also add the user to the context.
68
69 This class as a whole should be removed once the pylons port is complete
70 and a pyramid only solution for celery is implemented as per issue #4139
71 """
72
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
74 link=None, link_error=None, **options):
75 """ queue the job to run (we are in web request context here) """
76
77 request = get_current_request()
78
79 if hasattr(request, 'user'):
80 ip_addr = request.user.ip_addr
81 user_id = request.user.user_id
82 elif hasattr(request, 'rpc_params'):
83 # TODO(marcink) remove when migration is finished
84 # api specific call on Pyramid.
85 ip_addr = request.rpc_params['apiuser'].ip_addr
86 user_id = request.rpc_params['apiuser'].user_id
87 else:
88 raise Exception('Unable to fetch data from request: {}'.format(
89 request))
90
91 if request:
92 # we hook into kwargs since it is the only way to pass our data to
93 # the celery worker in celery 2.2
94 kwargs.update({
95 '_rhodecode_proxy_data': {
96 'environ': {
97 'PATH_INFO': request.environ['PATH_INFO'],
98 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
99 'HTTP_HOST': request.environ.get('HTTP_HOST',
100 request.environ['SERVER_NAME']),
101 'SERVER_NAME': request.environ['SERVER_NAME'],
102 'SERVER_PORT': request.environ['SERVER_PORT'],
103 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
104 },
105 'auth_user': {
106 'ip_addr': ip_addr,
107 'user_id': user_id
108 },
109 }
110 })
111 return super(RhodecodeCeleryTask, self).apply_async(
112 args, kwargs, task_id, producer, link, link_error, **options)
113
114 def __call__(self, *args, **kwargs):
115 """ rebuild the context and then run task on celery worker """
116 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
117
118 if not proxy_data:
119 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
120
121 log.debug('using celery proxy data to run task: %r', proxy_data)
122
123 from rhodecode.config.routing import make_map
124
125 request = Request.blank('/', environ=proxy_data['environ'])
126 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
127 ip_addr=proxy_data['auth_user']['ip_addr'])
128
129 pyramid_request = prepare(request) # set pyramid threadlocal request
130
131 # pylons routing
132 if not rhodecode.CONFIG.get('routes.map'):
133 rhodecode.CONFIG['routes.map'] = make_map(config)
134 pylons.url._push_object(get_routes_generator_for_server_url(
135 get_server_url(request.environ)
136 ))
137
138 try:
139 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
140 finally:
141 pyramid_request['closer']()
142 pylons.url._pop_object()
143
144
145 def run_task(task, *args, **kwargs):
44 def run_task(task, *args, **kwargs):
146 if rhodecode.CELERY_ENABLED:
45 if rhodecode.CELERY_ENABLED:
147 celery_is_up = False
46 celery_is_up = False
148 try:
47 try:
149 t = task.apply_async(args=args, kwargs=kwargs)
48 t = task.apply_async(args=args, kwargs=kwargs)
150 log.info('running task %s:%s', t.task_id, task)
151 celery_is_up = True
49 celery_is_up = True
50 log.debug('executing task %s:%s in async mode', t.task_id, task)
152 return t
51 return t
153
52
154 except socket.error as e:
53 except socket.error as e:
@@ -164,73 +63,10 b' def run_task(task, *args, **kwargs):'
164 "Fallback to sync execution.")
63 "Fallback to sync execution.")
165
64
166 # keep in mind there maybe a subtle race condition where something
65 # keep in mind there maybe a subtle race condition where something
167 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
66 # depending on rhodecode.CELERY_ENABLED
168 # will see CELERY_ENABLED as True before this has a chance to set False
67 # will see CELERY_ENABLED as True before this has a chance to set False
169 rhodecode.CELERY_ENABLED = celery_is_up
68 rhodecode.CELERY_ENABLED = celery_is_up
170 else:
69 else:
171 log.debug('executing task %s in sync mode', task)
70 log.debug('executing task %s:%s in sync mode', 'TASK', task)
172 return ResultWrapper(task(*args, **kwargs))
173
174
175 def __get_lockkey(func, *fargs, **fkwargs):
176 params = list(fargs)
177 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
178
179 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
180 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
181 return 'task_%s.lock' % (md5_safe(_lock_key),)
182
183
184 def locked_task(func):
185 def __wrapper(func, *fargs, **fkwargs):
186 lockkey = __get_lockkey(func, *fargs, **fkwargs)
187 lockkey_path = config['app_conf']['cache_dir']
188
189 log.info('running task with lockkey %s' % lockkey)
190 try:
191 l = DaemonLock(file_=jn(lockkey_path, lockkey))
192 ret = func(*fargs, **fkwargs)
193 l.release()
194 return ret
195 except LockHeld:
196 log.info('LockHeld')
197 return 'Task with key %s already running' % lockkey
198
199 return decorator(__wrapper, func)
200
201
71
202 def get_session():
72 return ResultWrapper(task(*args, **kwargs))
203 if rhodecode.CELERY_ENABLED:
204 utils.initialize_database(config)
205 sa = meta.Session()
206 return sa
207
208
209 def dbsession(func):
210 def __wrapper(func, *fargs, **fkwargs):
211 try:
212 ret = func(*fargs, **fkwargs)
213 return ret
214 finally:
215 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
216 meta.Session.remove()
217
218 return decorator(__wrapper, func)
219
220
221 def vcsconnection(func):
222 def __wrapper(func, *fargs, **fkwargs):
223 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
224 settings = rhodecode.PYRAMID_SETTINGS
225 backends = settings['vcs.backends']
226 for alias in rhodecode.BACKENDS.keys():
227 if alias not in backends:
228 del rhodecode.BACKENDS[alias]
229 utils.configure_vcs(settings)
230 connect_vcs(
231 settings['vcs.server'],
232 utils.get_vcs_server_protocol(settings))
233 ret = func(*fargs, **fkwargs)
234 return ret
235
236 return decorator(__wrapper, func)
@@ -23,38 +23,18 b' RhodeCode task modules, containing all t'
23 by celery daemon
23 by celery daemon
24 """
24 """
25
25
26
27 import os
26 import os
28 import logging
29
30 from celery.task import task
31
27
32 import rhodecode
28 import rhodecode
33 from rhodecode.lib import audit_logger
29 from rhodecode.lib import audit_logger
34 from rhodecode.lib.celerylib import (
30 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask
35 run_task, dbsession, __get_lockkey, LockHeld, DaemonLock,
36 get_session, vcsconnection, RhodecodeCeleryTask)
37 from rhodecode.lib.hooks_base import log_create_repository
31 from rhodecode.lib.hooks_base import log_create_repository
38 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
32 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
39 from rhodecode.lib.utils import add_cache
40 from rhodecode.lib.utils2 import safe_int, str2bool
33 from rhodecode.lib.utils2 import safe_int, str2bool
41 from rhodecode.model.db import Repository, User
34 from rhodecode.model.db import Session, Repository, User
42
35
43
36
44 def get_logger(cls):
37 @async_task(ignore_result=True, base=RequestContextTask)
45 if rhodecode.CELERY_ENABLED:
46 try:
47 log = cls.get_logger()
48 except Exception:
49 log = logging.getLogger(__name__)
50 else:
51 log = logging.getLogger(__name__)
52
53 return log
54
55
56 @task(ignore_result=True, base=RhodecodeCeleryTask)
57 @dbsession
58 def send_email(recipients, subject, body='', html_body='', email_config=None):
38 def send_email(recipients, subject, body='', html_body='', email_config=None):
59 """
39 """
60 Sends an email with defined parameters from the .ini files.
40 Sends an email with defined parameters from the .ini files.
@@ -101,18 +81,15 b' def send_email(recipients, subject, body'
101 return True
81 return True
102
82
103
83
104 @task(ignore_result=True, base=RhodecodeCeleryTask)
84 @async_task(ignore_result=True, base=RequestContextTask)
105 @dbsession
106 @vcsconnection
107 def create_repo(form_data, cur_user):
85 def create_repo(form_data, cur_user):
108 from rhodecode.model.repo import RepoModel
86 from rhodecode.model.repo import RepoModel
109 from rhodecode.model.user import UserModel
87 from rhodecode.model.user import UserModel
110 from rhodecode.model.settings import SettingsModel
88 from rhodecode.model.settings import SettingsModel
111
89
112 log = get_logger(create_repo)
90 log = get_logger(create_repo)
113 DBS = get_session()
114
91
115 cur_user = UserModel(DBS)._get_user(cur_user)
92 cur_user = UserModel()._get_user(cur_user)
116 owner = cur_user
93 owner = cur_user
117
94
118 repo_name = form_data['repo_name']
95 repo_name = form_data['repo_name']
@@ -138,7 +115,7 b' def create_repo(form_data, cur_user):'
138 'enable_downloads', defs.get('repo_enable_downloads'))
115 'enable_downloads', defs.get('repo_enable_downloads'))
139
116
140 try:
117 try:
141 repo = RepoModel(DBS)._create_repo(
118 repo = RepoModel()._create_repo(
142 repo_name=repo_name_full,
119 repo_name=repo_name_full,
143 repo_type=repo_type,
120 repo_type=repo_type,
144 description=description,
121 description=description,
@@ -155,13 +132,13 b' def create_repo(form_data, cur_user):'
155 enable_downloads=enable_downloads,
132 enable_downloads=enable_downloads,
156 state=state
133 state=state
157 )
134 )
158 DBS.commit()
135 Session().commit()
159
136
160 # now create this repo on Filesystem
137 # now create this repo on Filesystem
161 RepoModel(DBS)._create_filesystem_repo(
138 RepoModel()._create_filesystem_repo(
162 repo_name=repo_name,
139 repo_name=repo_name,
163 repo_type=repo_type,
140 repo_type=repo_type,
164 repo_group=RepoModel(DBS)._get_repo_group(repo_group),
141 repo_group=RepoModel()._get_repo_group(repo_group),
165 clone_uri=clone_uri,
142 clone_uri=clone_uri,
166 )
143 )
167 repo = Repository.get_by_repo_name(repo_name_full)
144 repo = Repository.get_by_repo_name(repo_name_full)
@@ -180,7 +157,7 b' def create_repo(form_data, cur_user):'
180 user=cur_user,
157 user=cur_user,
181 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
158 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
182
159
183 DBS.commit()
160 Session().commit()
184 except Exception:
161 except Exception:
185 log.warning('Exception occurred when creating repository, '
162 log.warning('Exception occurred when creating repository, '
186 'doing cleanup...', exc_info=True)
163 'doing cleanup...', exc_info=True)
@@ -188,8 +165,8 b' def create_repo(form_data, cur_user):'
188 repo = Repository.get_by_repo_name(repo_name_full)
165 repo = Repository.get_by_repo_name(repo_name_full)
189 if repo:
166 if repo:
190 Repository.delete(repo.repo_id)
167 Repository.delete(repo.repo_id)
191 DBS.commit()
168 Session().commit()
192 RepoModel(DBS)._delete_filesystem_repo(repo)
169 RepoModel()._delete_filesystem_repo(repo)
193 raise
170 raise
194
171
195 # it's an odd fix to make celery fail task when exception occurs
172 # it's an odd fix to make celery fail task when exception occurs
@@ -199,23 +176,17 b' def create_repo(form_data, cur_user):'
199 return True
176 return True
200
177
201
178
202 @task(ignore_result=True, base=RhodecodeCeleryTask)
179 @async_task(ignore_result=True, base=RequestContextTask)
203 @dbsession
204 @vcsconnection
205 def create_repo_fork(form_data, cur_user):
180 def create_repo_fork(form_data, cur_user):
206 """
181 """
207 Creates a fork of repository using internal VCS methods
182 Creates a fork of repository using internal VCS methods
208
209 :param form_data:
210 :param cur_user:
211 """
183 """
212 from rhodecode.model.repo import RepoModel
184 from rhodecode.model.repo import RepoModel
213 from rhodecode.model.user import UserModel
185 from rhodecode.model.user import UserModel
214
186
215 log = get_logger(create_repo_fork)
187 log = get_logger(create_repo_fork)
216 DBS = get_session()
217
188
218 cur_user = UserModel(DBS)._get_user(cur_user)
189 cur_user = UserModel()._get_user(cur_user)
219 owner = cur_user
190 owner = cur_user
220
191
221 repo_name = form_data['repo_name'] # fork in this case
192 repo_name = form_data['repo_name'] # fork in this case
@@ -230,8 +201,8 b' def create_repo_fork(form_data, cur_user'
230 fork_id = safe_int(form_data.get('fork_parent_id'))
201 fork_id = safe_int(form_data.get('fork_parent_id'))
231
202
232 try:
203 try:
233 fork_of = RepoModel(DBS)._get_repo(fork_id)
204 fork_of = RepoModel()._get_repo(fork_id)
234 RepoModel(DBS)._create_repo(
205 RepoModel()._create_repo(
235 repo_name=repo_name_full,
206 repo_name=repo_name_full,
236 repo_type=repo_type,
207 repo_type=repo_type,
237 description=description,
208 description=description,
@@ -244,16 +215,16 b' def create_repo_fork(form_data, cur_user'
244 copy_fork_permissions=copy_fork_permissions
215 copy_fork_permissions=copy_fork_permissions
245 )
216 )
246
217
247 DBS.commit()
218 Session().commit()
248
219
249 base_path = Repository.base_path()
220 base_path = Repository.base_path()
250 source_repo_path = os.path.join(base_path, fork_of.repo_name)
221 source_repo_path = os.path.join(base_path, fork_of.repo_name)
251
222
252 # now create this repo on Filesystem
223 # now create this repo on Filesystem
253 RepoModel(DBS)._create_filesystem_repo(
224 RepoModel()._create_filesystem_repo(
254 repo_name=repo_name,
225 repo_name=repo_name,
255 repo_type=repo_type,
226 repo_type=repo_type,
256 repo_group=RepoModel(DBS)._get_repo_group(repo_group),
227 repo_group=RepoModel()._get_repo_group(repo_group),
257 clone_uri=source_repo_path,
228 clone_uri=source_repo_path,
258 )
229 )
259 repo = Repository.get_by_repo_name(repo_name_full)
230 repo = Repository.get_by_repo_name(repo_name_full)
@@ -274,7 +245,7 b' def create_repo_fork(form_data, cur_user'
274 user=cur_user,
245 user=cur_user,
275 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
246 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
276
247
277 DBS.commit()
248 Session().commit()
278 except Exception as e:
249 except Exception as e:
279 log.warning('Exception %s occurred when forking repository, '
250 log.warning('Exception %s occurred when forking repository, '
280 'doing cleanup...', e)
251 'doing cleanup...', e)
@@ -282,8 +253,8 b' def create_repo_fork(form_data, cur_user'
282 repo = Repository.get_by_repo_name(repo_name_full)
253 repo = Repository.get_by_repo_name(repo_name_full)
283 if repo:
254 if repo:
284 Repository.delete(repo.repo_id)
255 Repository.delete(repo.repo_id)
285 DBS.commit()
256 Session().commit()
286 RepoModel(DBS)._delete_filesystem_repo(repo)
257 RepoModel()._delete_filesystem_repo(repo)
287 raise
258 raise
288
259
289 # it's an odd fix to make celery fail task when exception occurs
260 # it's an odd fix to make celery fail task when exception occurs
@@ -291,3 +262,14 b' def create_repo_fork(form_data, cur_user'
291 pass
262 pass
292
263
293 return True
264 return True
265
266
267 @async_task(ignore_result=True)
268 def sync_repo(*args, **kwargs):
269 from rhodecode.model.scm import ScmModel
270 log = get_logger(sync_repo)
271
272 log.info('Pulling from %s', kwargs['repo_name'])
273 ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
274
275
@@ -22,6 +22,7 b' import os'
22 from pyramid.compat import configparser
22 from pyramid.compat import configparser
23 from pyramid.paster import bootstrap as pyramid_bootstrap, setup_logging # noqa
23 from pyramid.paster import bootstrap as pyramid_bootstrap, setup_logging # noqa
24 from pyramid.request import Request
24 from pyramid.request import Request
25 from pyramid.scripting import prepare
25
26
26
27
27 def get_config(ini_path, **kwargs):
28 def get_config(ini_path, **kwargs):
@@ -47,3 +48,9 b' def bootstrap(config_uri, request=None, '
47 request = request or Request.blank('/', base_url=base_url)
48 request = request or Request.blank('/', base_url=base_url)
48
49
49 return pyramid_bootstrap(config_uri, request=request, options=options)
50 return pyramid_bootstrap(config_uri, request=request, options=options)
51
52
53 def prepare_request(environ):
54 request = Request.blank('/', environ=environ)
55 prepare(request) # set pyramid threadlocal request
56 return request
@@ -38,7 +38,6 b' from os.path import join as jn'
38
38
39 import paste
39 import paste
40 import pkg_resources
40 import pkg_resources
41 from paste.script.command import Command, BadCommand
42 from webhelpers.text import collapse, remove_formatting, strip_tags
41 from webhelpers.text import collapse, remove_formatting, strip_tags
43 from mako import exceptions
42 from mako import exceptions
44 from pyramid.threadlocal import get_current_registry
43 from pyramid.threadlocal import get_current_registry
@@ -767,85 +766,6 b' def create_test_repositories(test_path, '
767 tar.extractall(jn(test_path, SVN_REPO))
766 tar.extractall(jn(test_path, SVN_REPO))
768
767
769
768
770 #==============================================================================
771 # PASTER COMMANDS
772 #==============================================================================
773 class BasePasterCommand(Command):
774 """
775 Abstract Base Class for paster commands.
776
777 The celery commands are somewhat aggressive about loading
778 celery.conf, and since our module sets the `CELERY_LOADER`
779 environment variable to our loader, we have to bootstrap a bit and
780 make sure we've had a chance to load the pylons config off of the
781 command line, otherwise everything fails.
782 """
783 min_args = 1
784 min_args_error = "Please provide a paster config file as an argument."
785 takes_config_file = 1
786 requires_config_file = True
787
788 def notify_msg(self, msg, log=False):
789 """Make a notification to user, additionally if logger is passed
790 it logs this action using given logger
791
792 :param msg: message that will be printed to user
793 :param log: logging instance, to use to additionally log this message
794
795 """
796 if log and isinstance(log, logging):
797 log(msg)
798
799 def run(self, args):
800 """
801 Overrides Command.run
802
803 Checks for a config file argument and loads it.
804 """
805 if len(args) < self.min_args:
806 raise BadCommand(
807 self.min_args_error % {'min_args': self.min_args,
808 'actual_args': len(args)})
809
810 # Decrement because we're going to lob off the first argument.
811 # @@ This is hacky
812 self.min_args -= 1
813 self.bootstrap_config(args[0])
814 self.update_parser()
815 return super(BasePasterCommand, self).run(args[1:])
816
817 def update_parser(self):
818 """
819 Abstract method. Allows for the class' parser to be updated
820 before the superclass' `run` method is called. Necessary to
821 allow options/arguments to be passed through to the underlying
822 celery command.
823 """
824 raise NotImplementedError("Abstract Method.")
825
826 def bootstrap_config(self, conf):
827 """
828 Loads the pylons configuration.
829 """
830 from pylons import config as pylonsconfig
831
832 self.path_to_ini_file = os.path.realpath(conf)
833 conf = paste.deploy.appconfig('config:' + self.path_to_ini_file)
834 pylonsconfig.init_app(conf.global_conf, conf.local_conf)
835
836 def _init_session(self):
837 """
838 Inits SqlAlchemy Session
839 """
840 logging.config.fileConfig(self.path_to_ini_file)
841 from pylons import config
842 from rhodecode.config.utils import initialize_database
843
844 # get to remove repos !!
845 add_cache(config)
846 initialize_database(config)
847
848
849 def password_changed(auth_user, session):
769 def password_changed(auth_user, session):
850 # Never report password change in case of default user or anonymous user.
770 # Never report password change in case of default user or anonymous user.
851 if auth_user.username == User.DEFAULT_USER or auth_user.user_id is None:
771 if auth_user.username == User.DEFAULT_USER or auth_user.user_id is None:
@@ -23,7 +23,6 b''
23 Some simple helper functions
23 Some simple helper functions
24 """
24 """
25
25
26
27 import collections
26 import collections
28 import datetime
27 import datetime
29 import dateutil.relativedelta
28 import dateutil.relativedelta
@@ -42,7 +41,6 b' import sqlalchemy.engine.url'
42 import sqlalchemy.exc
41 import sqlalchemy.exc
43 import sqlalchemy.sql
42 import sqlalchemy.sql
44 import webob
43 import webob
45 import routes.util
46 import pyramid.threadlocal
44 import pyramid.threadlocal
47
45
48 import rhodecode
46 import rhodecode
@@ -941,31 +939,6 b' class Optional(object):'
941 return val
939 return val
942
940
943
941
944 def get_routes_generator_for_server_url(server_url):
945 parsed_url = urlobject.URLObject(server_url)
946 netloc = safe_str(parsed_url.netloc)
947 script_name = safe_str(parsed_url.path)
948
949 if ':' in netloc:
950 server_name, server_port = netloc.split(':')
951 else:
952 server_name = netloc
953 server_port = (parsed_url.scheme == 'https' and '443' or '80')
954
955 environ = {
956 'REQUEST_METHOD': 'GET',
957 'PATH_INFO': '/',
958 'SERVER_NAME': server_name,
959 'SERVER_PORT': server_port,
960 'SCRIPT_NAME': script_name,
961 }
962 if parsed_url.scheme == 'https':
963 environ['HTTPS'] = 'on'
964 environ['wsgi.url_scheme'] = 'https'
965
966 return routes.util.URLGenerator(rhodecode.CONFIG['routes.map'], environ)
967
968
969 def glob2re(pat):
942 def glob2re(pat):
970 """
943 """
971 Translate a shell PATTERN to a regular expression.
944 Translate a shell PATTERN to a regular expression.
@@ -448,3 +448,9 b' class TestGetEnabledHooks(object):'
448 ui_settings = []
448 ui_settings = []
449 result = utils.get_enabled_hook_classes(ui_settings)
449 result = utils.get_enabled_hook_classes(ui_settings)
450 assert result == []
450 assert result == []
451
452
453 def test_obfuscate_url_pw():
454 from rhodecode.lib.utils2 import obfuscate_url_pw
455 engine = u'/home/repos/malmö'
456 assert obfuscate_url_pw(engine) No newline at end of file
@@ -108,7 +108,6 b' def pytest_addoption(parser):'
108
108
109
109
110 def pytest_configure(config):
110 def pytest_configure(config):
111 # Appy the kombu patch early on, needed for test discovery on Python 2.7.11
112 from rhodecode.config import patches
111 from rhodecode.config import patches
113
112
114
113
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now