##// END OF EJS Templates
celery: improve fetching of repo_check tasks....
marcink -
r2387:e641eb6b default
parent child Browse files
Show More
@@ -1,109 +1,113 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2011-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import logging
22 22
23 23 from pyramid.view import view_config
24 24 from pyramid.httpexceptions import HTTPFound, HTTPNotFound
25 25
26 26 from rhodecode.apps._base import BaseAppView
27 27 from rhodecode.lib import helpers as h
28 28 from rhodecode.lib.auth import (NotAnonymous, HasRepoPermissionAny)
29 29 from rhodecode.model.db import Repository
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33
34 34 class RepoChecksView(BaseAppView):
35 35 def load_default_context(self):
36 36 c = self._get_local_tmpl_context()
37 37
38 38 return c
39 39
40 40 @NotAnonymous()
41 41 @view_config(
42 42 route_name='repo_creating', request_method='GET',
43 43 renderer='rhodecode:templates/admin/repos/repo_creating.mako')
44 44 def repo_creating(self):
45 45 c = self.load_default_context()
46 46
47 47 repo_name = self.request.matchdict['repo_name']
48 48 db_repo = Repository.get_by_repo_name(repo_name)
49 49
50 50 # check if maybe repo is already created
51 51 if db_repo and db_repo.repo_state in [Repository.STATE_CREATED]:
52 52 # re-check permissions before redirecting to prevent resource
53 53 # discovery by checking the 302 code
54 54 perm_set = ['repository.read', 'repository.write', 'repository.admin']
55 55 has_perm = HasRepoPermissionAny(*perm_set)(
56 56 db_repo.repo_name, 'Repo Creating check')
57 57 if not has_perm:
58 58 raise HTTPNotFound()
59 59
60 60 raise HTTPFound(h.route_path(
61 61 'repo_summary', repo_name=db_repo.repo_name))
62 62
63 63 c.task_id = self.request.GET.get('task_id')
64 64 c.repo_name = repo_name
65 65
66 66 return self._get_template_context(c)
67 67
68 68 @NotAnonymous()
69 69 @view_config(
70 70 route_name='repo_creating_check', request_method='GET',
71 71 renderer='json_ext')
72 72 def repo_creating_check(self):
73 73 _ = self.request.translate
74 74 task_id = self.request.GET.get('task_id')
75 75 self.load_default_context()
76 76
77 77 repo_name = self.request.matchdict['repo_name']
78 78
79 79 if task_id and task_id not in ['None']:
80 80 import rhodecode
81 from rhodecode.lib.celerylib.loader import celery_app
81 from rhodecode.lib.celerylib.loader import celery_app, exceptions
82 82 if rhodecode.CELERY_ENABLED:
83 log.debug('celery: checking result for task:%s', task_id)
83 84 task = celery_app.AsyncResult(task_id)
84 task.get()
85 if task.failed():
85 try:
86 task.get(timeout=10)
87 except exceptions.TimeoutError:
88 task = None
89 if task and task.failed():
86 90 msg = self._log_creation_exception(task.result, repo_name)
87 91 h.flash(msg, category='error')
88 92 raise HTTPFound(h.route_path('home'), code=501)
89 93
90 94 db_repo = Repository.get_by_repo_name(repo_name)
91 95 if db_repo and db_repo.repo_state == Repository.STATE_CREATED:
92 96 if db_repo.clone_uri:
93 97 clone_uri = db_repo.clone_uri_hidden
94 98 h.flash(_('Created repository %s from %s')
95 99 % (db_repo.repo_name, clone_uri), category='success')
96 100 else:
97 101 repo_url = h.link_to(
98 102 db_repo.repo_name,
99 103 h.route_path('repo_summary', repo_name=db_repo.repo_name))
100 104 fork = db_repo.fork
101 105 if fork:
102 106 fork_name = fork.repo_name
103 107 h.flash(h.literal(_('Forked repository %s as %s')
104 108 % (fork_name, repo_url)), category='success')
105 109 else:
106 110 h.flash(h.literal(_('Created repository %s') % repo_url),
107 111 category='success')
108 112 return {'result': True}
109 113 return {'result': False}
@@ -1,256 +1,257 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini
24 24 """
25 25 import os
26 26 import logging
27 27
28 28 from celery import Celery
29 29 from celery import signals
30 30 from celery import Task
31 from celery import exceptions # noqa
31 32 from kombu.serialization import register
32 33 from pyramid.threadlocal import get_current_request
33 34
34 35 import rhodecode
35 36
36 37 from rhodecode.lib.auth import AuthUser
37 38 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
38 39 from rhodecode.lib.ext_json import json
39 40 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
40 41 from rhodecode.lib.utils2 import str2bool
41 42 from rhodecode.model import meta
42 43
43 44
44 45 register('json_ext', json.dumps, json.loads,
45 46 content_type='application/x-json-ext',
46 47 content_encoding='utf-8')
47 48
48 49 log = logging.getLogger('celery.rhodecode.loader')
49 50
50 51
51 52 def add_preload_arguments(parser):
52 53 parser.add_argument(
53 54 '--ini', default=None,
54 55 help='Path to ini configuration file.'
55 56 )
56 57 parser.add_argument(
57 58 '--ini-var', default=None,
58 59 help='Comma separated list of key=value to pass to ini.'
59 60 )
60 61
61 62
62 63 def get_logger(obj):
63 64 custom_log = logging.getLogger(
64 65 'rhodecode.task.{}'.format(obj.__class__.__name__))
65 66
66 67 if rhodecode.CELERY_ENABLED:
67 68 try:
68 69 custom_log = obj.get_logger()
69 70 except Exception:
70 71 pass
71 72
72 73 return custom_log
73 74
74 75
75 76 base_celery_config = {
76 77 'result_backend': 'rpc://',
77 78 'result_expires': 60 * 60 * 24,
78 79 'result_persistent': True,
79 80 'imports': [],
80 81 'worker_max_tasks_per_child': 100,
81 82 'accept_content': ['json_ext'],
82 83 'task_serializer': 'json_ext',
83 84 'result_serializer': 'json_ext',
84 85 'worker_hijack_root_logger': False,
85 86 }
86 87 # init main celery app
87 88 celery_app = Celery()
88 89 celery_app.user_options['preload'].add(add_preload_arguments)
89 90 ini_file_glob = None
90 91
91 92
92 93 @signals.setup_logging.connect
93 94 def setup_logging_callback(**kwargs):
94 95 setup_logging(ini_file_glob)
95 96
96 97
97 98 @signals.user_preload_options.connect
98 99 def on_preload_parsed(options, **kwargs):
99 100 ini_location = options['ini']
100 101 ini_vars = options['ini_var']
101 102 celery_app.conf['INI_PYRAMID'] = options['ini']
102 103
103 104 if ini_location is None:
104 105 print('You must provide the paste --ini argument')
105 106 exit(-1)
106 107
107 108 options = None
108 109 if ini_vars is not None:
109 110 options = parse_ini_vars(ini_vars)
110 111
111 112 global ini_file_glob
112 113 ini_file_glob = ini_location
113 114
114 115 log.debug('Bootstrapping RhodeCode application...')
115 116 env = bootstrap(ini_location, options=options)
116 117
117 118 setup_celery_app(
118 119 app=env['app'], root=env['root'], request=env['request'],
119 120 registry=env['registry'], closer=env['closer'],
120 121 ini_location=ini_location)
121 122
122 123 # fix the global flag even if it's disabled via .ini file because this
123 124 # is a worker code that doesn't need this to be disabled.
124 125 rhodecode.CELERY_ENABLED = True
125 126
126 127
127 128 @signals.task_success.connect
128 129 def task_success_signal(result, **kwargs):
129 130 meta.Session.commit()
130 131 celery_app.conf['PYRAMID_CLOSER']()
131 132
132 133
133 134 @signals.task_retry.connect
134 135 def task_retry_signal(
135 136 request, reason, einfo, **kwargs):
136 137 meta.Session.remove()
137 138 celery_app.conf['PYRAMID_CLOSER']()
138 139
139 140
140 141 @signals.task_failure.connect
141 142 def task_failure_signal(
142 143 task_id, exception, args, kwargs, traceback, einfo, **kargs):
143 144 meta.Session.remove()
144 145 celery_app.conf['PYRAMID_CLOSER']()
145 146
146 147
147 148 @signals.task_revoked.connect
148 149 def task_revoked_signal(
149 150 request, terminated, signum, expired, **kwargs):
150 151 celery_app.conf['PYRAMID_CLOSER']()
151 152
152 153
153 154 def setup_celery_app(app, root, request, registry, closer, ini_location):
154 155 ini_dir = os.path.dirname(os.path.abspath(ini_location))
155 156 celery_config = base_celery_config
156 157 celery_config.update({
157 158 # store celerybeat scheduler db where the .ini file is
158 159 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
159 160 })
160 161 ini_settings = get_ini_config(ini_location)
161 162 log.debug('Got custom celery conf: %s', ini_settings)
162 163
163 164 celery_config.update(ini_settings)
164 165 celery_app.config_from_object(celery_config)
165 166
166 167 celery_app.conf.update({'PYRAMID_APP': app})
167 168 celery_app.conf.update({'PYRAMID_ROOT': root})
168 169 celery_app.conf.update({'PYRAMID_REQUEST': request})
169 170 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
170 171 celery_app.conf.update({'PYRAMID_CLOSER': closer})
171 172
172 173
173 174 def configure_celery(config, ini_location):
174 175 """
175 176 Helper that is called from our application creation logic. It gives
176 177 connection info into running webapp and allows execution of tasks from
177 178 RhodeCode itself
178 179 """
179 180 # store some globals into rhodecode
180 181 rhodecode.CELERY_ENABLED = str2bool(
181 182 config.registry.settings.get('use_celery'))
182 183 if rhodecode.CELERY_ENABLED:
183 184 log.info('Configuring celery based on `%s` file', ini_location)
184 185 setup_celery_app(
185 186 app=None, root=None, request=None, registry=config.registry,
186 187 closer=None, ini_location=ini_location)
187 188
188 189
189 190 class RequestContextTask(Task):
190 191 """
191 192 This is a celery task which will create a rhodecode app instance context
192 193 for the task, patch pyramid with the original request
193 194 that created the task and also add the user to the context.
194 195 """
195 196
196 197 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
197 198 link=None, link_error=None, shadow=None, **options):
198 199 """ queue the job to run (we are in web request context here) """
199 200
200 201 req = get_current_request()
201 202
202 203 # web case
203 204 if hasattr(req, 'user'):
204 205 ip_addr = req.user.ip_addr
205 206 user_id = req.user.user_id
206 207
207 208 # api case
208 209 elif hasattr(req, 'rpc_user'):
209 210 ip_addr = req.rpc_user.ip_addr
210 211 user_id = req.rpc_user.user_id
211 212 else:
212 213 raise Exception(
213 214 'Unable to fetch required data from request: {}. \n'
214 215 'This task is required to be executed from context of '
215 216 'request in a webapp'.format(repr(req)))
216 217
217 218 if req:
218 219 # we hook into kwargs since it is the only way to pass our data to
219 220 # the celery worker
220 221 options['headers'] = options.get('headers', {})
221 222 options['headers'].update({
222 223 'rhodecode_proxy_data': {
223 224 'environ': {
224 225 'PATH_INFO': req.environ['PATH_INFO'],
225 226 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
226 227 'HTTP_HOST': req.environ.get('HTTP_HOST',
227 228 req.environ['SERVER_NAME']),
228 229 'SERVER_NAME': req.environ['SERVER_NAME'],
229 230 'SERVER_PORT': req.environ['SERVER_PORT'],
230 231 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
231 232 },
232 233 'auth_user': {
233 234 'ip_addr': ip_addr,
234 235 'user_id': user_id
235 236 },
236 237 }
237 238 })
238 239
239 240 return super(RequestContextTask, self).apply_async(
240 241 args, kwargs, task_id, producer, link, link_error, shadow, **options)
241 242
242 243 def __call__(self, *args, **kwargs):
243 244 """ rebuild the context and then run task on celery worker """
244 245
245 246 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
246 247 if not proxy_data:
247 248 return super(RequestContextTask, self).__call__(*args, **kwargs)
248 249
249 250 log.debug('using celery proxy data to run task: %r', proxy_data)
250 251 # re-inject and register threadlocals for proper routing support
251 252 request = prepare_request(proxy_data['environ'])
252 253 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
253 254 ip_addr=proxy_data['auth_user']['ip_addr'])
254 255
255 256 return super(RequestContextTask, self).__call__(*args, **kwargs)
256 257
@@ -1,77 +1,79 b''
1 1 ## -*- coding: utf-8 -*-
2 2 <%inherit file="/base/base.mako"/>
3 3
4 4 <%def name="title()">
5 5 ${_('{} Creating repository').format(c.repo_name)}
6 6 %if c.rhodecode_name:
7 7 &middot; ${h.branding(c.rhodecode_name)}
8 8 %endif
9 9 </%def>
10 10
11 11 <%def name="breadcrumbs_links()">
12 12 ${_('Creating repository')} ${c.repo_name}
13 13 </%def>
14 14
15 15 <%def name="menu_bar_nav()">
16 16 ${self.menu_items(active='repositories')}
17 17 </%def>
18 18 <%def name="main()">
19 19 <div class="box">
20 20 <!-- box / title -->
21 21 <div class="title">
22 22 ${self.breadcrumbs()}
23 23 </div>
24 24
25 25 <div id="progress-message">
26 26 ${_('Repository "%(repo_name)s" is being created, you will be redirected when this process is finished.' % {'repo_name':c.repo_name})}
27 27 </div>
28 28
29 29 <div id="progress">
30 30 <div class="progress progress-striped active">
31 31 <div class="progress-bar progress-bar" role="progressbar"
32 32 aria-valuenow="100" aria-valuemin="0" aria-valuemax="100">
33 33 </div>
34 34 </div>
35 35 </div>
36 36 </div>
37 37
38 38 <script>
39 39 (function worker() {
40 40 var skipCheck = false;
41 41 var url = "${h.route_path('repo_creating_check', repo_name=c.repo_name, _query=dict(task_id=c.task_id))}";
42 42 $.ajax({
43 43 url: url,
44 timeout: 60*1000, // sets timeout to 60 seconds
44 45 complete: function(resp) {
45 if (resp.status == 200) {
46 if (resp.status === 200) {
46 47 var jsonResponse = resp.responseJSON;
47 48
48 49 if (jsonResponse === undefined) {
49 50 setTimeout(function () {
50 51 // we might have a backend problem, try dashboard again
51 52 window.location = "${h.route_path('repo_summary', repo_name = c.repo_name)}";
52 53 }, 3000);
53 54 } else {
54 55 if (skipCheck || jsonResponse.result === true) {
55 56 // success, means go to dashboard
56 57 window.location = "${h.route_path('repo_summary', repo_name = c.repo_name)}";
57 58 } else {
58 59 // Schedule the next request when the current one's complete
59 60 setTimeout(worker, 1000);
60 61 }
61 62 }
62 63 }
63 64 else {
65 var message = _gettext('Fetching repository state failed. Error code: {0} {1}. Try <a href="{2}">refreshing</a> this page.').format(resp.status, resp.statusText, url);
64 66 var payload = {
65 67 message: {
66 message: _gettext('Fetching repository state failed. Error code: {0} {1}. Try refreshing this page.').format(resp.status, resp.statusText),
68 message: message,
67 69 level: 'error',
68 70 force: true
69 71 }
70 72 };
71 73 $.Topic('/notifications').publish(payload);
72 74 }
73 75 }
74 76 });
75 77 })();
76 78 </script>
77 79 </%def>
General Comments 0
You need to be logged in to leave comments. Login now