##// END OF EJS Templates
celery: added test view to test celery exception handling
super-admin -
r5435:cd13bacd default
parent child Browse files
Show More
@@ -1,62 +1,71 b''
1 1 # Copyright (C) 2016-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 from rhodecode.apps._base import ADMIN_PREFIX
20 20
21 21
22 22 def admin_routes(config):
23 23 from rhodecode.apps.ops.views import OpsView
24 24
25 25 config.add_route(
26 26 name='ops_ping',
27 27 pattern='/ping')
28 28 config.add_view(
29 29 OpsView,
30 30 attr='ops_ping',
31 31 route_name='ops_ping', request_method='GET',
32 32 renderer='json_ext')
33 33
34 34 config.add_route(
35 35 name='ops_error_test',
36 36 pattern='/error')
37 37 config.add_view(
38 38 OpsView,
39 39 attr='ops_error_test',
40 40 route_name='ops_error_test', request_method='GET',
41 41 renderer='json_ext')
42 42
43 43 config.add_route(
44 name='ops_celery_error_test',
45 pattern='/error-celery')
46 config.add_view(
47 OpsView,
48 attr='ops_celery_error_test',
49 route_name='ops_celery_error_test', request_method='GET',
50 renderer='json_ext')
51
52 config.add_route(
44 53 name='ops_redirect_test',
45 54 pattern='/redirect')
46 55 config.add_view(
47 56 OpsView,
48 57 attr='ops_redirect_test',
49 58 route_name='ops_redirect_test', request_method='GET',
50 59 renderer='json_ext')
51 60
52 61 config.add_route(
53 62 name='ops_healthcheck',
54 63 pattern='/status')
55 64 config.add_view(
56 65 OpsView,
57 66 attr='ops_healthcheck',
58 67 route_name='ops_healthcheck', request_method='GET',
59 68 renderer='json_ext')
60 69
61 70 def includeme(config):
62 71 config.include(admin_routes, route_prefix=ADMIN_PREFIX + '/ops')
@@ -1,94 +1,108 b''
1 1 # Copyright (C) 2016-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import time
20 20 import logging
21 21
22 22
23 23 from pyramid.httpexceptions import HTTPFound
24 24
25 25 from rhodecode.apps._base import BaseAppView
26 26 from rhodecode.lib import helpers as h
27 27 from rhodecode.lib.auth import LoginRequired
28 28 from collections import OrderedDict
29 29 from rhodecode.model.db import UserApiKeys
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33
34 34 class OpsView(BaseAppView):
35 35
36 36 def load_default_context(self):
37 37 c = self._get_local_tmpl_context()
38 38 c.user = c.auth_user.get_instance()
39 39
40 40 return c
41 41
42 42 def ops_ping(self):
43 43 data = OrderedDict()
44 44 data['instance'] = self.request.registry.settings.get('instance_id')
45 45
46 46 if getattr(self.request, 'user'):
47 47 caller_name = 'anonymous'
48 48 if self.request.user.user_id:
49 49 caller_name = self.request.user.username
50 50
51 51 data['caller_ip'] = self.request.user.ip_addr
52 52 data['caller_name'] = caller_name
53 53
54 54 return {'ok': data}
55 55
56 56 def ops_error_test(self):
57 57 """
58 58 Test exception handling and emails on errors
59 59 """
60 60
61 61 class TestException(Exception):
62 62 pass
63 63 # add timeout so we add some sort of rate limiter
64 64 time.sleep(2)
65 65 msg = ('RhodeCode Enterprise test exception. '
66 66 'Client:{}. Generation time: {}.'.format(self.request.user, time.time()))
67 67 raise TestException(msg)
68 68
69 def ops_celery_error_test(self):
70 """
71 Test exception handling and emails on errors
72 """
73 from rhodecode.lib.celerylib import tasks, run_task
74
75 # add timeout so we add some sort of rate limiter
76 time.sleep(2)
77
78 msg = ('RhodeCode Enterprise test exception. '
79 'Client:{}. Generation time: {}.'.format(self.request.user, time.time()))
80 celery_task = run_task(tasks.test_celery_exception, msg)
81 return {'task': str(celery_task)}
82
69 83 def ops_redirect_test(self):
70 84 """
71 85 Test redirect handling
72 86 """
73 87 redirect_to = self.request.GET.get('to') or h.route_path('home')
74 88 raise HTTPFound(redirect_to)
75 89
76 90 @LoginRequired(auth_token_access=[UserApiKeys.ROLE_HTTP])
77 91 def ops_healthcheck(self):
78 92 from rhodecode.lib.system_info import load_system_info
79 93
80 94 vcsserver_info = load_system_info('vcs_server')
81 95 if vcsserver_info:
82 96 vcsserver_info = vcsserver_info['human_value']
83 97
84 98 db_info = load_system_info('database_info')
85 99 if db_info:
86 100 db_info = db_info['human_value']
87 101
88 102 health_spec = {
89 103 'caller_ip': self.request.user.ip_addr,
90 104 'vcsserver': vcsserver_info,
91 105 'db': db_info,
92 106 }
93 107
94 108 return {'healthcheck': health_spec}
@@ -1,449 +1,454 b''
1 1 # Copyright (C) 2012-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 """
20 20 RhodeCode task modules, containing all task that suppose to be run
21 21 by celery daemon
22 22 """
23 23
24 24 import os
25 25 import time
26 26
27 27 from pyramid_mailer.mailer import Mailer
28 28 from pyramid_mailer.message import Message
29 29 from email.utils import formatdate
30 30
31 31 import rhodecode
32 32 from rhodecode.lib import audit_logger
33 33 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask, run_task
34 34 from rhodecode.lib import hooks_base
35 35 from rhodecode.lib.utils import adopt_for_celery
36 36 from rhodecode.lib.utils2 import safe_int, str2bool, aslist
37 37 from rhodecode.lib.statsd_client import StatsdClient
38 38 from rhodecode.model.db import (
39 39 true, null, Session, IntegrityError, Repository, RepoGroup, User)
40 40 from rhodecode.model.permission import PermissionModel
41 41
42 42
43 43 @async_task(ignore_result=True, base=RequestContextTask)
44 44 def send_email(recipients, subject, body='', html_body='', email_config=None,
45 45 extra_headers=None):
46 46 """
47 47 Sends an email with defined parameters from the .ini files.
48 48
49 49 :param recipients: list of recipients, it this is empty the defined email
50 50 address from field 'email_to' is used instead
51 51 :param subject: subject of the mail
52 52 :param body: body of the mail
53 53 :param html_body: html version of body
54 54 :param email_config: specify custom configuration for mailer
55 55 :param extra_headers: specify custom headers
56 56 """
57 57 log = get_logger(send_email)
58 58
59 59 email_config = email_config or rhodecode.CONFIG
60 60
61 61 mail_server = email_config.get('smtp_server') or None
62 62 if mail_server is None:
63 63 log.error("SMTP server information missing. Sending email failed. "
64 64 "Make sure that `smtp_server` variable is configured "
65 65 "inside the .ini file")
66 66 return False
67 67
68 68 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
69 69
70 70 if recipients:
71 71 if isinstance(recipients, str):
72 72 recipients = recipients.split(',')
73 73 else:
74 74 # if recipients are not defined we send to email_config + all admins
75 75 admins = []
76 76 for u in User.query().filter(User.admin == true()).all():
77 77 if u.email:
78 78 admins.append(u.email)
79 79 recipients = []
80 80 config_email = email_config.get('email_to')
81 81 if config_email:
82 82 recipients += [config_email]
83 83 recipients += admins
84 84
85 85 # translate our LEGACY config into the one that pyramid_mailer supports
86 86 email_conf = dict(
87 87 host=mail_server,
88 88 port=email_config.get('smtp_port', 25),
89 89 username=email_config.get('smtp_username'),
90 90 password=email_config.get('smtp_password'),
91 91
92 92 tls=str2bool(email_config.get('smtp_use_tls')),
93 93 ssl=str2bool(email_config.get('smtp_use_ssl')),
94 94
95 95 # SSL key file
96 96 # keyfile='',
97 97
98 98 # SSL certificate file
99 99 # certfile='',
100 100
101 101 # Location of maildir
102 102 # queue_path='',
103 103
104 104 default_sender=email_config.get('app_email_from', 'RhodeCode-noreply@rhodecode.com'),
105 105
106 106 debug=str2bool(email_config.get('smtp_debug')),
107 107 # /usr/sbin/sendmail Sendmail executable
108 108 # sendmail_app='',
109 109
110 110 # {sendmail_app} -t -i -f {sender} Template for sendmail execution
111 111 # sendmail_template='',
112 112 )
113 113
114 114 if extra_headers is None:
115 115 extra_headers = {}
116 116
117 117 extra_headers.setdefault('Date', formatdate(time.time()))
118 118
119 119 if 'thread_ids' in extra_headers:
120 120 thread_ids = extra_headers.pop('thread_ids')
121 121 extra_headers['References'] = ' '.join('<{}>'.format(t) for t in thread_ids)
122 122
123 123 try:
124 124 mailer = Mailer(**email_conf)
125 125
126 126 message = Message(subject=subject,
127 127 sender=email_conf['default_sender'],
128 128 recipients=recipients,
129 129 body=body, html=html_body,
130 130 extra_headers=extra_headers)
131 131 mailer.send_immediately(message)
132 132 statsd = StatsdClient.statsd
133 133 if statsd:
134 134 statsd.incr('rhodecode_email_sent_total')
135 135
136 136 except Exception:
137 137 log.exception('Mail sending failed')
138 138 return False
139 139 return True
140 140
141 141
142 142 @async_task(ignore_result=True, base=RequestContextTask)
143 143 def create_repo(form_data, cur_user):
144 144 from rhodecode.model.repo import RepoModel
145 145 from rhodecode.model.user import UserModel
146 146 from rhodecode.model.scm import ScmModel
147 147 from rhodecode.model.settings import SettingsModel
148 148
149 149 log = get_logger(create_repo)
150 150
151 151 cur_user = UserModel()._get_user(cur_user)
152 152 owner = cur_user
153 153
154 154 repo_name = form_data['repo_name']
155 155 repo_name_full = form_data['repo_name_full']
156 156 repo_type = form_data['repo_type']
157 157 description = form_data['repo_description']
158 158 private = form_data['repo_private']
159 159 clone_uri = form_data.get('clone_uri')
160 160 repo_group = safe_int(form_data['repo_group'])
161 161 copy_fork_permissions = form_data.get('copy_permissions')
162 162 copy_group_permissions = form_data.get('repo_copy_permissions')
163 163 fork_of = form_data.get('fork_parent_id')
164 164 state = form_data.get('repo_state', Repository.STATE_PENDING)
165 165
166 166 # repo creation defaults, private and repo_type are filled in form
167 167 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
168 168 enable_statistics = form_data.get(
169 169 'enable_statistics', defs.get('repo_enable_statistics'))
170 170 enable_locking = form_data.get(
171 171 'enable_locking', defs.get('repo_enable_locking'))
172 172 enable_downloads = form_data.get(
173 173 'enable_downloads', defs.get('repo_enable_downloads'))
174 174
175 175 # set landing rev based on default branches for SCM
176 176 landing_ref, _label = ScmModel.backend_landing_ref(repo_type)
177 177
178 178 try:
179 179 RepoModel()._create_repo(
180 180 repo_name=repo_name_full,
181 181 repo_type=repo_type,
182 182 description=description,
183 183 owner=owner,
184 184 private=private,
185 185 clone_uri=clone_uri,
186 186 repo_group=repo_group,
187 187 landing_rev=landing_ref,
188 188 fork_of=fork_of,
189 189 copy_fork_permissions=copy_fork_permissions,
190 190 copy_group_permissions=copy_group_permissions,
191 191 enable_statistics=enable_statistics,
192 192 enable_locking=enable_locking,
193 193 enable_downloads=enable_downloads,
194 194 state=state
195 195 )
196 196
197 197 Session().commit()
198 198
199 199 # now create this repo on Filesystem
200 200 RepoModel()._create_filesystem_repo(
201 201 repo_name=repo_name,
202 202 repo_type=repo_type,
203 203 repo_group=RepoModel()._get_repo_group(repo_group),
204 204 clone_uri=clone_uri,
205 205 )
206 206 repo = Repository.get_by_repo_name(repo_name_full)
207 207 hooks_base.create_repository(created_by=owner.username, **repo.get_dict())
208 208
209 209 # update repo commit caches initially
210 210 repo.update_commit_cache()
211 211
212 212 # set new created state
213 213 repo.set_state(Repository.STATE_CREATED)
214 214 repo_id = repo.repo_id
215 215 repo_data = repo.get_api_data()
216 216
217 217 audit_logger.store(
218 218 'repo.create', action_data={'data': repo_data},
219 219 user=cur_user,
220 220 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
221 221
222 222 Session().commit()
223 223
224 224 PermissionModel().trigger_permission_flush()
225 225
226 226 except Exception as e:
227 227 log.warning('Exception occurred when creating repository, '
228 228 'doing cleanup...', exc_info=True)
229 229 if isinstance(e, IntegrityError):
230 230 Session().rollback()
231 231
232 232 # rollback things manually !
233 233 repo = Repository.get_by_repo_name(repo_name_full)
234 234 if repo:
235 235 Repository.delete(repo.repo_id)
236 236 Session().commit()
237 237 RepoModel()._delete_filesystem_repo(repo)
238 238 log.info('Cleanup of repo %s finished', repo_name_full)
239 239 raise
240 240
241 241 return True
242 242
243 243
244 244 @async_task(ignore_result=True, base=RequestContextTask)
245 245 def create_repo_fork(form_data, cur_user):
246 246 """
247 247 Creates a fork of repository using internal VCS methods
248 248 """
249 249 from rhodecode.model.repo import RepoModel
250 250 from rhodecode.model.user import UserModel
251 251
252 252 log = get_logger(create_repo_fork)
253 253
254 254 cur_user = UserModel()._get_user(cur_user)
255 255 owner = cur_user
256 256
257 257 repo_name = form_data['repo_name'] # fork in this case
258 258 repo_name_full = form_data['repo_name_full']
259 259 repo_type = form_data['repo_type']
260 260 description = form_data['description']
261 261 private = form_data['private']
262 262 clone_uri = form_data.get('clone_uri')
263 263 repo_group = safe_int(form_data['repo_group'])
264 264 landing_ref = form_data['landing_rev']
265 265 copy_fork_permissions = form_data.get('copy_permissions')
266 266 fork_id = safe_int(form_data.get('fork_parent_id'))
267 267
268 268 try:
269 269 fork_of = RepoModel()._get_repo(fork_id)
270 270 RepoModel()._create_repo(
271 271 repo_name=repo_name_full,
272 272 repo_type=repo_type,
273 273 description=description,
274 274 owner=owner,
275 275 private=private,
276 276 clone_uri=clone_uri,
277 277 repo_group=repo_group,
278 278 landing_rev=landing_ref,
279 279 fork_of=fork_of,
280 280 copy_fork_permissions=copy_fork_permissions
281 281 )
282 282
283 283 Session().commit()
284 284
285 285 base_path = Repository.base_path()
286 286 source_repo_path = os.path.join(base_path, fork_of.repo_name)
287 287
288 288 # now create this repo on Filesystem
289 289 RepoModel()._create_filesystem_repo(
290 290 repo_name=repo_name,
291 291 repo_type=repo_type,
292 292 repo_group=RepoModel()._get_repo_group(repo_group),
293 293 clone_uri=source_repo_path,
294 294 )
295 295 repo = Repository.get_by_repo_name(repo_name_full)
296 296 hooks_base.create_repository(created_by=owner.username, **repo.get_dict())
297 297
298 298 # update repo commit caches initially
299 299 config = repo._config
300 300 config.set('extensions', 'largefiles', '')
301 301 repo.update_commit_cache(config=config)
302 302
303 303 # set new created state
304 304 repo.set_state(Repository.STATE_CREATED)
305 305
306 306 repo_id = repo.repo_id
307 307 repo_data = repo.get_api_data()
308 308 audit_logger.store(
309 309 'repo.fork', action_data={'data': repo_data},
310 310 user=cur_user,
311 311 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
312 312
313 313 Session().commit()
314 314 except Exception as e:
315 315 log.warning('Exception occurred when forking repository, '
316 316 'doing cleanup...', exc_info=True)
317 317 if isinstance(e, IntegrityError):
318 318 Session().rollback()
319 319
320 320 # rollback things manually !
321 321 repo = Repository.get_by_repo_name(repo_name_full)
322 322 if repo:
323 323 Repository.delete(repo.repo_id)
324 324 Session().commit()
325 325 RepoModel()._delete_filesystem_repo(repo)
326 326 log.info('Cleanup of repo %s finished', repo_name_full)
327 327 raise
328 328
329 329 return True
330 330
331 331
332 332 @async_task(ignore_result=True, base=RequestContextTask)
333 333 def repo_maintenance(repoid):
334 334 from rhodecode.lib import repo_maintenance as repo_maintenance_lib
335 335 log = get_logger(repo_maintenance)
336 336 repo = Repository.get_by_id_or_repo_name(repoid)
337 337 if repo:
338 338 maintenance = repo_maintenance_lib.RepoMaintenance()
339 339 tasks = maintenance.get_tasks_for_repo(repo)
340 340 log.debug('Executing %s tasks on repo `%s`', tasks, repoid)
341 341 executed_types = maintenance.execute(repo)
342 342 log.debug('Got execution results %s', executed_types)
343 343 else:
344 344 log.debug('Repo `%s` not found or without a clone_url', repoid)
345 345
346 346
347 347 @async_task(ignore_result=True, base=RequestContextTask)
348 348 def check_for_update(send_email_notification=True, email_recipients=None):
349 349 from rhodecode.model.update import UpdateModel
350 350 from rhodecode.model.notification import EmailNotificationModel
351 351
352 352 log = get_logger(check_for_update)
353 353 update_url = UpdateModel().get_update_url()
354 354 cur_ver = rhodecode.__version__
355 355
356 356 try:
357 357 data = UpdateModel().get_update_data(update_url)
358 358
359 359 current_ver = UpdateModel().get_stored_version(fallback=cur_ver)
360 360 latest_ver = data['versions'][0]['version']
361 361 UpdateModel().store_version(latest_ver)
362 362
363 363 if send_email_notification:
364 364 log.debug('Send email notification is enabled. '
365 365 'Current RhodeCode version: %s, latest known: %s', current_ver, latest_ver)
366 366 if UpdateModel().is_outdated(current_ver, latest_ver):
367 367
368 368 email_kwargs = {
369 369 'current_ver': current_ver,
370 370 'latest_ver': latest_ver,
371 371 }
372 372
373 373 (subject, email_body, email_body_plaintext) = EmailNotificationModel().render_email(
374 374 EmailNotificationModel.TYPE_UPDATE_AVAILABLE, **email_kwargs)
375 375
376 376 email_recipients = aslist(email_recipients, sep=',') or \
377 377 [user.email for user in User.get_all_super_admins()]
378 378 run_task(send_email, email_recipients, subject,
379 379 email_body_plaintext, email_body)
380 380
381 381 except Exception:
382 382 log.exception('Failed to check for update')
383 383 raise
384 384
385 385
386 386 def sync_last_update_for_objects(*args, **kwargs):
387 387 skip_repos = kwargs.get('skip_repos')
388 388 if not skip_repos:
389 389 repos = Repository.query() \
390 390 .order_by(Repository.group_id.asc())
391 391
392 392 for repo in repos:
393 393 repo.update_commit_cache()
394 394
395 395 skip_groups = kwargs.get('skip_groups')
396 396 if not skip_groups:
397 397 repo_groups = RepoGroup.query() \
398 398 .filter(RepoGroup.group_parent_id == null())
399 399
400 400 for root_gr in repo_groups:
401 401 for repo_gr in reversed(root_gr.recursive_groups()):
402 402 repo_gr.update_commit_cache()
403 403
404 404
405 405 @async_task(ignore_result=True, base=RequestContextTask)
406 def test_celery_exception(msg):
407 raise Exception(f'Test exception: {msg}')
408
409
410 @async_task(ignore_result=True, base=RequestContextTask)
406 411 def sync_last_update(*args, **kwargs):
407 412 sync_last_update_for_objects(*args, **kwargs)
408 413
409 414
410 415 @async_task(ignore_result=False)
411 416 def beat_check(*args, **kwargs):
412 417 log = get_logger(beat_check)
413 418 log.info('%r: Got args: %r and kwargs %r', beat_check, args, kwargs)
414 419 return time.time()
415 420
416 421
417 422 @async_task
418 423 @adopt_for_celery
419 424 def repo_size(extras):
420 425 from rhodecode.lib.hooks_base import repo_size
421 426 return repo_size(extras)
422 427
423 428
424 429 @async_task
425 430 @adopt_for_celery
426 431 def pre_pull(extras):
427 432 from rhodecode.lib.hooks_base import pre_pull
428 433 return pre_pull(extras)
429 434
430 435
431 436 @async_task
432 437 @adopt_for_celery
433 438 def post_pull(extras):
434 439 from rhodecode.lib.hooks_base import post_pull
435 440 return post_pull(extras)
436 441
437 442
438 443 @async_task
439 444 @adopt_for_celery
440 445 def pre_push(extras):
441 446 from rhodecode.lib.hooks_base import pre_push
442 447 return pre_push(extras)
443 448
444 449
445 450 @async_task
446 451 @adopt_for_celery
447 452 def post_push(extras):
448 453 from rhodecode.lib.hooks_base import post_push
449 454 return post_push(extras)
General Comments 0
You need to be logged in to leave comments. Login now