# Copyright (C) 2018-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import os import logging from pyramid.httpexceptions import HTTPFound from rhodecode.apps._base import BaseAppView from rhodecode.apps._base.navigation import navigation_list from rhodecode.lib import helpers as h from rhodecode.lib.auth import ( LoginRequired, HasPermissionAllDecorator, CSRFRequired) from rhodecode.lib.utils2 import time_to_utcdatetime, safe_int from rhodecode.lib import exc_tracking log = logging.getLogger(__name__) class ExceptionsTrackerView(BaseAppView): def load_default_context(self): c = self._get_local_tmpl_context() c.navlist = navigation_list(self.request) return c def count_all_exceptions(self): exc_store_path = exc_tracking.get_exc_store() count = 0 for fname in os.listdir(exc_store_path): parts = fname.split('_', 2) if not len(parts) == 3: continue count +=1 return count def get_all_exceptions(self, read_metadata=False, limit=None, type_filter=None): exc_store_path = exc_tracking.get_exc_store() exception_list = [] def key_sorter(val): try: return val.split('_')[-1] except Exception: return 0 for fname in reversed(sorted(os.listdir(exc_store_path), key=key_sorter)): parts = fname.split('_', 2) if not len(parts) == 3: continue exc_id, app_type, exc_timestamp = parts exc = {'exc_id': exc_id, 'app_type': app_type, 'exc_type': 'unknown', 'exc_utc_date': '', 'exc_timestamp': exc_timestamp} if read_metadata: full_path = os.path.join(exc_store_path, fname) if not os.path.isfile(full_path): continue try: # we can read our metadata with open(full_path, 'rb') as f: exc_metadata = exc_tracking.exc_unserialize(f.read()) exc.update(exc_metadata) except Exception: log.exception(f'Failed to read exc data from:{full_path}') pass # convert our timestamp to a date obj, for nicer representation exc['exc_utc_date'] = time_to_utcdatetime(exc['exc_timestamp']) type_present = exc.get('exc_type') if type_filter: if type_present and type_present == type_filter: exception_list.append(exc) else: exception_list.append(exc) if limit and len(exception_list) >= limit: break return exception_list @LoginRequired() @HasPermissionAllDecorator('hg.admin') def browse_exceptions(self): _ = self.request.translate c = self.load_default_context() c.active = 'exceptions_browse' c.limit = safe_int(self.request.GET.get('limit')) or 50 c.type_filter = self.request.GET.get('type_filter') c.next_limit = c.limit + 50 c.exception_list = self.get_all_exceptions( read_metadata=True, limit=c.limit, type_filter=c.type_filter) c.exception_list_count = self.count_all_exceptions() c.exception_store_dir = exc_tracking.get_exc_store() return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') def exception_show(self): _ = self.request.translate c = self.load_default_context() c.active = 'exceptions' c.exception_id = self.request.matchdict['exception_id'] c.traceback = exc_tracking.read_exception(c.exception_id, prefix=None) return self._get_template_context(c) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def exception_delete_all(self): _ = self.request.translate c = self.load_default_context() type_filter = self.request.POST.get('type_filter') c.active = 'exceptions' all_exc = self.get_all_exceptions(read_metadata=bool(type_filter), type_filter=type_filter) exc_count = 0 for exc in all_exc: if type_filter: if exc.get('exc_type') == type_filter: exc_tracking.delete_exception(exc['exc_id'], prefix=None) exc_count += 1 else: exc_tracking.delete_exception(exc['exc_id'], prefix=None) exc_count += 1 h.flash(_('Removed {} Exceptions').format(exc_count), category='success') raise HTTPFound(h.route_path('admin_settings_exception_tracker')) @LoginRequired() @HasPermissionAllDecorator('hg.admin') @CSRFRequired() def exception_delete(self): _ = self.request.translate c = self.load_default_context() c.active = 'exceptions' c.exception_id = self.request.matchdict['exception_id'] exc_tracking.delete_exception(c.exception_id, prefix=None) h.flash(_('Removed Exception {}').format(c.exception_id), category='success') raise HTTPFound(h.route_path('admin_settings_exception_tracker'))