# RhodeCode VCSServer provides access to different vcs backends via network. # Copyright (C) 2014-2023 RhodeCode GmbH # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA import io import os import time import sys import datetime import msgpack import logging import traceback import tempfile import glob log = logging.getLogger(__name__) # NOTE: Any changes should be synced with exc_tracking at rhodecode.lib.exc_tracking global_prefix = 'vcsserver' exc_store_dir_name = 'rc_exception_store_v1' def exc_serialize(exc_id, tb, exc_type, extra_data=None): data = { "version": "v1", "exc_id": exc_id, "exc_utc_date": datetime.datetime.utcnow().isoformat(), "exc_timestamp": repr(time.time()), "exc_message": tb, "exc_type": exc_type, } if extra_data: data.update(extra_data) return msgpack.packb(data), data def exc_unserialize(tb): return msgpack.unpackb(tb) _exc_store = None def get_exc_store(): """ Get and create exception store if it's not existing """ global _exc_store if _exc_store is not None: # quick global cache return _exc_store import vcsserver as app exc_store_dir = ( app.CONFIG.get("exception_tracker.store_path", "") or tempfile.gettempdir() ) _exc_store_path = os.path.join(exc_store_dir, exc_store_dir_name) _exc_store_path = os.path.abspath(_exc_store_path) if not os.path.isdir(_exc_store_path): os.makedirs(_exc_store_path) log.debug("Initializing exceptions store at %s", _exc_store_path) _exc_store = _exc_store_path return _exc_store_path def get_detailed_tb(exc_info): try: from pip._vendor.rich import ( traceback as rich_tb, scope as rich_scope, console as rich_console, ) except ImportError: try: from rich import ( traceback as rich_tb, scope as rich_scope, console as rich_console, ) except ImportError: return None console = rich_console.Console(width=160, file=io.StringIO()) exc = rich_tb.Traceback.extract(*exc_info, show_locals=True) tb_rich = rich_tb.Traceback( trace=exc, width=160, extra_lines=3, theme=None, word_wrap=False, show_locals=False, max_frames=100, ) # last_stack = exc.stacks[-1] # last_frame = last_stack.frames[-1] # if last_frame and last_frame.locals: # console.print( # rich_scope.render_scope( # last_frame.locals, # title=f'{last_frame.filename}:{last_frame.lineno}')) console.print(tb_rich) formatted_locals = console.file.getvalue() return formatted_locals def get_request_metadata(request=None) -> dict: request_metadata = {} if not request: from pyramid.threadlocal import get_current_request request = get_current_request() # NOTE(marcink): store request information into exc_data if request: request_metadata["client_address"] = getattr(request, "client_addr", "") request_metadata["user_agent"] = getattr(request, "user_agent", "") request_metadata["method"] = getattr(request, "method", "") request_metadata["url"] = getattr(request, "url", "") return request_metadata def format_exc(exc_info, use_detailed_tb=True): exc_type, exc_value, exc_traceback = exc_info tb = "++ TRACEBACK ++\n\n" tb += "".join(traceback.format_exception(exc_type, exc_value, exc_traceback, None)) detailed_tb = getattr(exc_value, "_org_exc_tb", None) if detailed_tb: remote_tb = detailed_tb if isinstance(detailed_tb, str): remote_tb = [detailed_tb] tb += ( "\n+++ BEG SOURCE EXCEPTION +++\n\n" "{}\n" "+++ END SOURCE EXCEPTION +++\n" "".format("\n".join(remote_tb)) ) # Avoid that remote_tb also appears in the frame del remote_tb if use_detailed_tb: locals_tb = get_detailed_tb(exc_info) if locals_tb: tb += f"\n+++ DETAILS +++\n\n{locals_tb}\n" "" return tb def _store_exception(exc_id, exc_info, prefix, request_path=''): """ Low level function to store exception in the exception tracker """ extra_data = {} extra_data.update(get_request_metadata()) exc_type, exc_value, exc_traceback = exc_info tb = format_exc(exc_info) exc_type_name = exc_type.__name__ exc_data, org_data = exc_serialize(exc_id, tb, exc_type_name, extra_data=extra_data) exc_pref_id = f"{exc_id}_{prefix}_{org_data['exc_timestamp']}" exc_store_path = get_exc_store() if not os.path.isdir(exc_store_path): os.makedirs(exc_store_path) stored_exc_path = os.path.join(exc_store_path, exc_pref_id) with open(stored_exc_path, "wb") as f: f.write(exc_data) log.debug("Stored generated exception %s as: %s", exc_id, stored_exc_path) if request_path: log.error( 'error occurred handling this request.\n' 'Path: `%s`, %s', request_path, tb) def store_exception(exc_id, exc_info, prefix=global_prefix, request_path=''): """ Example usage:: exc_info = sys.exc_info() store_exception(id(exc_info), exc_info) """ try: exc_type = exc_info[0] exc_type_name = exc_type.__name__ _store_exception( exc_id=exc_id, exc_info=exc_info, prefix=prefix, request_path=request_path, ) return exc_id, exc_type_name except Exception: log.exception("Failed to store exception `%s` information", exc_id) # there's no way this can fail, it will crash server badly if it does. pass def _find_exc_file(exc_id, prefix=global_prefix): exc_store_path = get_exc_store() if prefix: exc_id = f"{exc_id}_{prefix}" else: # search without a prefix exc_id = f"{exc_id}" found_exc_id = None matches = glob.glob(os.path.join(exc_store_path, exc_id) + "*") if matches: found_exc_id = matches[0] return found_exc_id def _read_exception(exc_id, prefix): exc_id_file_path = _find_exc_file(exc_id=exc_id, prefix=prefix) if exc_id_file_path: with open(exc_id_file_path, "rb") as f: return exc_unserialize(f.read()) else: log.debug("Exception File `%s` not found", exc_id_file_path) return None def read_exception(exc_id, prefix=global_prefix): try: return _read_exception(exc_id=exc_id, prefix=prefix) except Exception: log.exception("Failed to read exception `%s` information", exc_id) # there's no way this can fail, it will crash server badly if it does. return None def delete_exception(exc_id, prefix=global_prefix): try: exc_id_file_path = _find_exc_file(exc_id, prefix=prefix) if exc_id_file_path: os.remove(exc_id_file_path) except Exception: log.exception("Failed to remove exception `%s` information", exc_id) # there's no way this can fail, it will crash server badly if it does. pass def generate_id(): return id(object())