# Copyright (C) 2011-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ """ Set of diffing helpers, previously part of vcs """ import dataclasses import os import re import bz2 import gzip import time import difflib import logging import pickle from itertools import tee from rhodecode.lib.vcs.exceptions import VCSError from rhodecode.lib.vcs.nodes import FileNode, SubModuleNode from rhodecode.lib.vcs.backends import base from rhodecode.lib.str_utils import safe_str log = logging.getLogger(__name__) # define max context, a file with more than this numbers of lines is unusable # in browser anyway MAX_CONTEXT = 20 * 1024 DEFAULT_CONTEXT = 3 def get_diff_context(request): return MAX_CONTEXT if request.GET.get('fullcontext', '') == '1' else DEFAULT_CONTEXT def get_diff_whitespace_flag(request): return request.GET.get('ignorews', '') == '1' @dataclasses.dataclass class OPS: ADD: str = 'A' MOD: str = 'M' DEL: str = 'D' @dataclasses.dataclass class DiffLineNumber: old: int | None new: int | None def __iter__(self): yield self.old yield self.new def get_gitdiff(filenode_old, filenode_new, ignore_whitespace=True, context=3): """ Returns git style diff between given ``filenode_old`` and ``filenode_new``. :param ignore_whitespace: ignore whitespaces in diff """ # make sure we pass in default context context = context or 3 # protect against IntOverflow when passing HUGE context if context > MAX_CONTEXT: context = MAX_CONTEXT submodules = [o for o in [filenode_new, filenode_old] if isinstance(o, SubModuleNode)] if submodules: return '' for filenode in (filenode_old, filenode_new): if not isinstance(filenode, FileNode): raise VCSError(f"Given object should be FileNode object, not {filenode.__class__}") repo = filenode_new.commit.repository old_commit = filenode_old.commit or repo.EMPTY_COMMIT new_commit = filenode_new.commit vcs_gitdiff = repo.get_diff( old_commit, new_commit, filenode_new.path, ignore_whitespace, context, path1=filenode_old.path) return vcs_gitdiff NEW_FILENODE = 1 DEL_FILENODE = 2 MOD_FILENODE = 3 RENAMED_FILENODE = 4 COPIED_FILENODE = 5 CHMOD_FILENODE = 6 BIN_FILENODE = 7 class LimitedDiffContainer(object): def __init__(self, diff_limit: int, cur_diff_size, diff): self.diff = diff self.diff_limit = diff_limit self.cur_diff_size = cur_diff_size def __getitem__(self, key): return self.diff.__getitem__(key) def __iter__(self): yield from self.diff class Action(object): """ Contains constants for the action value of the lines in a parsed diff. """ ADD = 'add' DELETE = 'del' UNMODIFIED = 'unmod' CONTEXT = 'context' OLD_NO_NL = 'old-no-nl' NEW_NO_NL = 'new-no-nl' class DiffProcessor(object): """ Give it a unified or git diff, and it returns a list of the files that were mentioned in the diff together with a dict of meta information that can be used to render it in an HTML template. .. note:: Unicode handling The original diffs are a byte sequence and can contain filenames in mixed encodings. This class generally returns `unicode` objects since the result is intended for presentation to the user. """ _chunk_re = re.compile(br'^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*)') _newline_marker = re.compile(br'^\\ No newline at end of file') # used for inline highlighter word split _token_re = re.compile(br'()(>|<|&|\W+?)') # collapse ranges of commits over given number _collapse_commits_over = 5 def __init__(self, diff: base.Diff, diff_format='gitdiff', diff_limit: int = 0, file_limit: int = 0, show_full_diff=True): """ :param diff: A `Diff` object representing a diff from a vcs backend :param diff_format: format of diff passed, `udiff` or `gitdiff` :param diff_limit: define the size of diff that is considered "big" based on that parameter cut off will be triggered, set to None to show full diff """ self._diff = diff self._format = diff_format self.adds = 0 self.removes = 0 # calculate diff size self.diff_limit = diff_limit self.file_limit = file_limit self.show_full_diff = show_full_diff self.cur_diff_size = 0 self.parsed = False self.parsed_diff = [] log.debug('Initialized DiffProcessor with %s mode', diff_format) self.differ = self._highlight_line_udiff self._parser = self._new_parse_gitdiff if diff_format == 'gitdiff': self.differ = self._highlight_line_difflib self._parser = self._parse_gitdiff raise DeprecationWarning('gitdiff usage is deprecated') def _copy_iterator(self): """ make a fresh copy of generator, we should not iterate thru an original as it's needed for repeating operations on this instance of DiffProcessor """ self.__udiff, iterator_copy = tee(self.__udiff) return iterator_copy def _escaper(self, diff_string): """ Escaper for diff escapes special chars and checks the diff limit :param string: """ self.cur_diff_size += len(diff_string) if not self.show_full_diff and (self.cur_diff_size > self.diff_limit): raise DiffLimitExceeded('Diff Limit Exceeded') return diff_string \ .replace(b'&', b'&')\ .replace(b'<', b'<')\ .replace(b'>', b'>') def _line_counter(self, diff_line): """ Checks each line and bumps total adds/removes for this diff :param diff_line: """ if diff_line.startswith(b'+') and not diff_line.startswith(b'+++'): self.adds += 1 elif diff_line.startswith(b'-') and not diff_line.startswith(b'---'): self.removes += 1 return diff_line def _highlight_line_difflib(self, line, next_): """ Highlight inline changes in both lines. """ if line['action'] == Action.DELETE: old, new = line, next_ else: old, new = next_, line oldwords = self._token_re.split(old['line']) newwords = self._token_re.split(new['line']) sequence = difflib.SequenceMatcher(None, oldwords, newwords) oldfragments, newfragments = [], [] for tag, i1, i2, j1, j2 in sequence.get_opcodes(): oldfrag = ''.join(oldwords[i1:i2]) newfrag = ''.join(newwords[j1:j2]) if tag != 'equal': if oldfrag: oldfrag = f'{oldfrag}' if newfrag: newfrag = f'{newfrag}' oldfragments.append(oldfrag) newfragments.append(newfrag) old['line'] = "".join(oldfragments) new['line'] = "".join(newfragments) def _highlight_line_udiff(self, line, next_): """ Highlight inline changes in both lines. """ start = 0 limit = min(len(line['line']), len(next_['line'])) while start < limit and line['line'][start] == next_['line'][start]: start += 1 end = -1 limit -= start while -end <= limit and line['line'][end] == next_['line'][end]: end -= 1 end += 1 if start or end: def do(l): last = end + len(l['line']) if l['action'] == Action.ADD: tag = 'ins' else: tag = 'del' l['line'] = f"{l['line'][:start]}<{tag}>{l['line'][start:last]}{l['line'][last:]}" do(line) do(next_) def _clean_line(self, line, command: str): if command in ['+', '-', ' ']: # only modify the line if it's actually a diff thing line = line[1:] return line def _parse_gitdiff(self, inline_diff=True): _files = [] def diff_container(arg): return arg for chunk in self._diff.chunks(): head = chunk.header diff = map(self._escaper, self.diff_splitter(chunk.diff)) raw_diff = chunk.raw limited_diff = False exceeds_limit = False op = None stats = { 'added': 0, 'deleted': 0, 'binary': False, 'ops': {}, } if head['deleted_file_mode']: op = OPS.DEL stats['binary'] = True stats['ops'][DEL_FILENODE] = 'deleted file' elif head['new_file_mode']: op = OPS.ADD stats['binary'] = True stats['ops'][NEW_FILENODE] = f"new file {safe_str(head['new_file_mode'])}" else: # modify operation, can be: copy, rename or chmod # CHMOD if head['new_mode'] and head['old_mode']: op = OPS.MOD stats['binary'] = True stats['ops'][CHMOD_FILENODE] = f"modified file chmod {safe_str(head['old_mode'])} => {safe_str(head['new_mode'])}" # RENAME if head['rename_from'] != head['rename_to']: op = OPS.MOD stats['binary'] = True stats['ops'][RENAMED_FILENODE] = f"file renamed from {safe_str(head['rename_from'])} to {safe_str(head['rename_to'])}" # COPY if head.get('copy_from') and head.get('copy_to'): op = OPS.MOD stats['binary'] = True stats['ops'][COPIED_FILENODE] = f"file copied from {safe_str(head['copy_from'])} to {safe_str(head['copy_to'])}" # If our new parsed headers didn't match anything fallback to # old style detection if op is None: if not head['a_file'] and head['b_file']: op = OPS.ADD stats['binary'] = True stats['ops'][NEW_FILENODE] = 'new file' elif head['a_file'] and not head['b_file']: op = OPS.DEL stats['binary'] = True stats['ops'][DEL_FILENODE] = 'deleted file' # it's not ADD not DELETE if op is None: op = OPS.MOD stats['binary'] = True stats['ops'][MOD_FILENODE] = 'modified file' # a real non-binary diff if head['a_file'] or head['b_file']: try: raw_diff, chunks, _stats = self._parse_lines(diff) stats['binary'] = False stats['added'] = _stats[0] stats['deleted'] = _stats[1] # explicit mark that it's a modified file if op == OPS.MOD: stats['ops'][MOD_FILENODE] = 'modified file' exceeds_limit = len(raw_diff) > self.file_limit # changed from _escaper function so we validate size of # each file instead of the whole diff # diff will hide big files but still show small ones # from my tests, big files are fairly safe to be parsed # but the browser is the bottleneck if not self.show_full_diff and exceeds_limit: raise DiffLimitExceeded('File Limit Exceeded') except DiffLimitExceeded: def diff_container(_diff): return LimitedDiffContainer(self.diff_limit, self.cur_diff_size, _diff) exceeds_limit = len(raw_diff) > self.file_limit limited_diff = True chunks = [] else: # GIT format binary patch, or possibly empty diff if head['bin_patch']: # we have operation already extracted, but we mark simply # it's a diff we won't show for binary files stats['ops'][BIN_FILENODE] = 'binary diff hidden' chunks = [] if chunks and not self.show_full_diff and op == OPS.DEL: # if not full diff mode show deleted file contents # TODO: anderson: if the view is not too big, there is no way # to see the content of the file chunks = [] frag = [{ 'old_lineno': '', 'new_lineno': '', 'action': Action.CONTEXT, 'line': msg, } for _op, msg in list(stats['ops'].items()) if _op not in [MOD_FILENODE]] chunks.insert(0, frag) _files.append({ 'filename': safe_str(head['b_path']), 'old_revision': head['a_blob_id'], 'new_revision': head['b_blob_id'], 'chunks': chunks, 'raw_diff': safe_str(raw_diff), 'operation': op, 'stats': stats, 'exceeds_limit': exceeds_limit, 'is_limited_diff': limited_diff, }) def operation_sorter(info): return {OPS.ADD: 0, OPS.MOD: 1, OPS.DEL: 2}.get(info['operation']) if not inline_diff: return diff_container(sorted(_files, key=operation_sorter)) # highlight inline changes for diff_data in _files: for chunk in diff_data['chunks']: lineiter = iter(chunk) try: while 1: line = next(lineiter) if line['action'] not in ( Action.UNMODIFIED, Action.CONTEXT): nextline = next(lineiter) if nextline['action'] in ['unmod', 'context'] or \ nextline['action'] == line['action']: continue self.differ(line, nextline) except StopIteration: pass return diff_container(sorted(_files, key=operation_sorter)) def _check_large_diff(self): if self.diff_limit: log.debug('Checking if diff exceeds current diff_limit of %s', self.diff_limit) if not self.show_full_diff and (self.cur_diff_size > self.diff_limit): raise DiffLimitExceeded(f'Diff Limit `{self.diff_limit}` Exceeded') # FIXME: NEWDIFFS: dan: this replaces _parse_gitdiff def _new_parse_gitdiff(self, inline_diff=True): _files = [] # this can be overridden later to a LimitedDiffContainer type def diff_container(arg): return arg for chunk in self._diff.chunks(): head = chunk.header_as_str log.debug('parsing diff chunk %r', chunk) raw_diff = chunk.raw limited_diff = False exceeds_limit = False op = None stats = { 'added': 0, 'deleted': 0, 'binary': False, 'old_mode': '', 'new_mode': '', 'ops': {}, } if head['old_mode']: stats['old_mode'] = head['old_mode'] if head['new_mode']: stats['new_mode'] = head['new_mode'] if head['b_mode']: stats['new_mode'] = head['b_mode'] # delete file if head['deleted_file_mode']: op = OPS.DEL stats['binary'] = True stats['ops'][DEL_FILENODE] = 'deleted file' # new file elif head['new_file_mode']: op = OPS.ADD stats['binary'] = True stats['old_mode'] = '' stats['new_mode'] = head['new_file_mode'] stats['ops'][NEW_FILENODE] = f"new file {head['new_file_mode']}" # modify operation, can be: copy, rename or chmod else: # CHMOD if head['new_mode'] and head['old_mode']: op = OPS.MOD stats['binary'] = True stats['ops'][CHMOD_FILENODE] = f"modified file chmod {head['old_mode']} => {head['new_mode']}" # RENAME if head['rename_from'] != head['rename_to']: op = OPS.MOD stats['binary'] = True stats['renamed'] = (head['rename_from'], head['rename_to']) stats['ops'][RENAMED_FILENODE] = f"file renamed from {head['rename_from']} to {head['rename_to']}" # COPY if head.get('copy_from') and head.get('copy_to'): op = OPS.MOD stats['binary'] = True stats['copied'] = (head['copy_from'], head['copy_to']) stats['ops'][COPIED_FILENODE] = f"file copied from {head['copy_from']} to {head['copy_to']}" # If our new parsed headers didn't match anything fallback to # old style detection if op is None: if not head['a_file'] and head['b_file']: op = OPS.ADD stats['binary'] = True stats['new_file'] = True stats['ops'][NEW_FILENODE] = 'new file' elif head['a_file'] and not head['b_file']: op = OPS.DEL stats['binary'] = True stats['ops'][DEL_FILENODE] = 'deleted file' # it's not ADD not DELETE if op is None: op = OPS.MOD stats['binary'] = True stats['ops'][MOD_FILENODE] = 'modified file' # a real non-binary diff if head['a_file'] or head['b_file']: # simulate splitlines, so we keep the line end part diff = self.diff_splitter(chunk.diff) # append each file to the diff size raw_chunk_size = len(raw_diff) exceeds_limit = raw_chunk_size > self.file_limit self.cur_diff_size += raw_chunk_size try: # Check each file instead of the whole diff. # Diff will hide big files but still show small ones. # From the tests big files are fairly safe to be parsed # but the browser is the bottleneck. if not self.show_full_diff and exceeds_limit: log.debug('File `%s` exceeds current file_limit of %s', head['b_path'], self.file_limit) raise DiffLimitExceeded(f'File Limit {self.file_limit} Exceeded') self._check_large_diff() raw_diff, chunks, _stats = self._new_parse_lines(diff) stats['binary'] = False stats['added'] = _stats[0] stats['deleted'] = _stats[1] # explicit mark that it's a modified file if op == OPS.MOD: stats['ops'][MOD_FILENODE] = 'modified file' except DiffLimitExceeded: def limited_diff_container(_diff): return LimitedDiffContainer(self.diff_limit, self.cur_diff_size, _diff) # re-definition of our container wrapper diff_container = limited_diff_container limited_diff = True chunks = [] else: # GIT format binary patch, or possibly empty diff if head['bin_patch']: # we have operation already extracted, but we mark simply # it's a diff we won't show for binary files stats['ops'][BIN_FILENODE] = 'binary diff hidden' chunks = [] # Hide content of deleted node by setting empty chunks if chunks and not self.show_full_diff and op == OPS.DEL: # if not full diff mode show deleted file contents # TODO: anderson: if the view is not too big, there is no way # to see the content of the file chunks = [] frag = [ {'old_lineno': '', 'new_lineno': '', 'action': Action.CONTEXT, 'line': msg, } for _op, msg in list(stats['ops'].items()) if _op not in [MOD_FILENODE]] chunks.insert(0, frag) original_filename = safe_str(head['a_path']) _files.append({ 'original_filename': original_filename, 'filename': safe_str(head['b_path']), 'old_revision': head['a_blob_id'], 'new_revision': head['b_blob_id'], 'chunks': chunks, 'raw_diff': safe_str(raw_diff), 'operation': op, 'stats': stats, 'exceeds_limit': exceeds_limit, 'is_limited_diff': limited_diff, }) def sorter(info): return {OPS.ADD: 0, OPS.MOD: 1, OPS.DEL: 2}.get(info['operation']) return diff_container(sorted(_files, key=sorter)) # FIXME: NEWDIFFS: dan: this gets replaced by _new_parse_lines def _parse_lines(self, diff_iter): """ Parse the diff an return data for the template. """ stats = [0, 0] chunks = [] raw_diff = [] try: line = next(diff_iter) while line: raw_diff.append(line) lines = [] chunks.append(lines) match = self._chunk_re.match(line) if not match: break gr = match.groups() (old_line, old_end, new_line, new_end) = [int(x or 1) for x in gr[:-1]] old_line -= 1 new_line -= 1 context = len(gr) == 5 old_end += old_line new_end += new_line if context: # skip context only if it's first line if int(gr[0]) > 1: lines.append({ 'old_lineno': '...', 'new_lineno': '...', 'action': Action.CONTEXT, 'line': line, }) line = next(diff_iter) while old_line < old_end or new_line < new_end: command = b' ' if line: command = line[0] affects_old = affects_new = False # ignore those if we don't expect them if command in b'#@': continue elif command == b'+': affects_new = True action = Action.ADD stats[0] += 1 elif command == b'-': affects_old = True action = Action.DELETE stats[1] += 1 else: affects_old = affects_new = True action = Action.UNMODIFIED if not self._newline_marker.match(line): old_line += affects_old new_line += affects_new lines.append({ 'old_lineno': affects_old and old_line or b'', 'new_lineno': affects_new and new_line or b'', 'action': action, 'line': self._clean_line(line, command) }) raw_diff.append(line) line = next(diff_iter) if self._newline_marker.match(line): # we need to append to lines, since this is not # counted in the line specs of diff lines.append({ 'old_lineno': '...', 'new_lineno': '...', 'action': Action.CONTEXT, 'line': self._clean_line(line, command) }) except StopIteration: pass return ''.join(raw_diff), chunks, stats # FIXME: NEWDIFFS: dan: this replaces _parse_lines def _new_parse_lines(self, diff_iter): """ Parse the diff an return data for the template. """ stats = [0, 0] chunks = [] raw_diff = [] try: line = next(diff_iter) assert isinstance(line, bytes) while line: raw_diff.append(line) # match header e.g @@ -0,0 +1 @@\n' match = self._chunk_re.match(line) if not match: break gr = match.groups() (old_line, old_end, new_line, new_end) = [int(x or 1) for x in gr[:-1]] lines = [] hunk = { 'section_header': gr[-1], 'source_start': old_line, 'source_length': old_end, 'target_start': new_line, 'target_length': new_end, 'lines': lines, } chunks.append(hunk) old_line -= 1 new_line -= 1 len(gr) == 5 old_end += old_line new_end += new_line line = next(diff_iter) while old_line < old_end or new_line < new_end: command = ' ' if line: # This is bytes, so we need to convert it to a str command: str = chr(line[0]) affects_old = affects_new = False # ignore those if we don't expect them if command in '#@': continue elif command == '+': affects_new = True action = Action.ADD stats[0] += 1 elif command == '-': affects_old = True action = Action.DELETE stats[1] += 1 else: affects_old = affects_new = True action = Action.UNMODIFIED if not self._newline_marker.match(line): old_line += affects_old new_line += affects_new lines.append({ 'old_lineno': affects_old and old_line or None, 'new_lineno': affects_new and new_line or None, 'action': action, 'line': self._clean_line(line, command) }) raw_diff.append(line) line = next(diff_iter) if self._newline_marker.match(line): # we need to append to lines, since this is not # counted in the line specs of diff if affects_old: action = Action.OLD_NO_NL elif affects_new: action = Action.NEW_NO_NL else: raise Exception('invalid context for no newline') lines.append({ 'old_lineno': None, 'new_lineno': None, 'action': action, 'line': self._clean_line(line, command) }) except StopIteration: pass return b''.join(raw_diff), chunks, stats def _safe_id(self, idstring): r"""Make a string safe for including in an id attribute. The HTML spec says that id attributes 'must begin with a letter ([A-Za-z]) and may be followed by any number of letters, digits ([0-9]), hyphens ("-"), underscores ("_"), colons (":"), and periods (".")'. These regexps are slightly over-zealous, in that they remove colons and periods unnecessarily. Whitespace is transformed into underscores, and then anything which is not a hyphen or a character that matches \w (alphanumerics and underscore) is removed. """ # Transform all whitespace to underscore idstring = re.sub(r'\s', "_", f'{idstring}') # Remove everything that is not a hyphen or a member of \w idstring = re.sub(r'(?!-)\W', "", idstring).lower() return idstring @classmethod def diff_splitter(cls, diff_string: bytes): """ Diff split that emulates .splitlines() but works only on \n """ if not diff_string: return elif diff_string == b'\n': yield b'\n' else: has_newline = diff_string.endswith(b'\n') elements = diff_string.split(b'\n') if has_newline: # skip last element as it's empty string from newlines elements = elements[:-1] len_elements = len(elements) for cnt, line in enumerate(elements, start=1): last_line = cnt == len_elements if last_line and not has_newline: yield line else: yield line + b'\n' def prepare(self, inline_diff=True): """ Prepare the passed udiff for HTML rendering. :return: A list of dicts with diff information. """ parsed = self._parser(inline_diff=inline_diff) self.parsed = True self.parsed_diff = parsed return parsed def as_raw(self, diff_lines=None): """ Returns raw diff as a byte string """ return self._diff.raw.tobytes() def stat(self): """ Returns tuple of added, and removed lines for this instance """ return self.adds, self.removes def get_context_of_line( self, path, diff_line: DiffLineNumber = None, context_before: int = 3, context_after: int = 3): """ Returns the context lines for the specified diff line. """ assert self.parsed, "DiffProcessor is not initialized." if None not in diff_line: raise ValueError(f"Cannot specify both line numbers in diff_line: {diff_line}") file_diff = self._get_file_diff(path) chunk, idx = self._find_chunk_line_index(file_diff, diff_line) first_line_to_include = max(idx - context_before, 0) first_line_after_context = idx + context_after + 1 context_lines = chunk['lines'][first_line_to_include:first_line_after_context] line_contents = [ _context_line(line) for line in context_lines if _is_diff_content(line) ] # TODO: johbo: Interim fixup, the diff chunks drop the final newline. # Once they are fixed, we can drop this line here. if line_contents: line_contents[-1] = ( line_contents[-1][0], line_contents[-1][1].rstrip(b'\n') + b'\n') return line_contents def find_context(self, path, context, offset=0): """ Finds the given `context` inside of the diff. Use the parameter `offset` to specify which offset the target line has inside of the given `context`. This way the correct diff line will be returned. :param offset: Shall be used to specify the offset of the main line within the given `context`. """ if offset < 0 or offset >= len(context): raise ValueError( "Only positive values up to the length of the context " "minus one are allowed.") matches = [] file_diff = self._get_file_diff(path) for chunk in file_diff['chunks']: if not isinstance(chunk, dict): continue context_iter = iter(context) for line_idx, line in enumerate(chunk['lines']): try: if _context_line(line) == next(context_iter): continue except StopIteration: matches.append((line_idx, chunk)) context_iter = iter(context) # Increment position and triger StopIteration # if we had a match at the end line_idx += 1 try: next(context_iter) except StopIteration: matches.append((line_idx, chunk)) effective_offset = len(context) - offset found_at_diff_lines = [ _line_to_diff_line_number(chunk['lines'][idx - effective_offset]) for idx, chunk in matches] return found_at_diff_lines def _get_file_diff(self, path): for file_diff in self.parsed_diff: if file_diff['filename'] == path: break else: raise FileNotInDiffException(f"File {path} not in diff") return file_diff def _find_chunk_line_index(self, file_diff, diff_line): for chunk in file_diff['chunks']: if not isinstance(chunk, dict): continue for line_idx, line in enumerate(chunk['lines']): if diff_line.old and line['old_lineno'] == diff_line.old: return chunk, line_idx if diff_line.new and line['new_lineno'] == diff_line.new: return chunk, line_idx raise LineNotInDiffException(f"The line {diff_line} is not part of the diff.") def _is_diff_content(line): return line['action'] in ( Action.UNMODIFIED, Action.ADD, Action.DELETE) def _context_line(line): return line['action'], line['line'] def _line_to_diff_line_number(line): new_line_no = line['new_lineno'] or None old_line_no = line['old_lineno'] or None return DiffLineNumber(old=old_line_no, new=new_line_no) class FileNotInDiffException(Exception): """ Raised when the context for a missing file is requested. If you request the context for a line in a file which is not part of the given diff, then this exception is raised. """ class LineNotInDiffException(Exception): """ Raised when the context for a missing line is requested. If you request the context for a line in a file and this line is not part of the given diff, then this exception is raised. """ class DiffLimitExceeded(Exception): pass # NOTE(marcink): if diffs.mako change, probably this # needs a bump to next version CURRENT_DIFF_VERSION = 'v5' def _cleanup_cache_file(cached_diff_file): # cleanup file to not store it "damaged" try: os.remove(cached_diff_file) except Exception: log.exception('Failed to cleanup path %s', cached_diff_file) def _get_compression_mode(cached_diff_file): mode = 'bz2' if 'mode:plain' in cached_diff_file: mode = 'plain' elif 'mode:gzip' in cached_diff_file: mode = 'gzip' return mode def cache_diff(cached_diff_file, diff, commits): compression_mode = _get_compression_mode(cached_diff_file) struct = { 'version': CURRENT_DIFF_VERSION, 'diff': diff, 'commits': commits } start = time.time() try: if compression_mode == 'plain': with open(cached_diff_file, 'wb') as f: pickle.dump(struct, f) elif compression_mode == 'gzip': with gzip.GzipFile(cached_diff_file, 'wb') as f: pickle.dump(struct, f) else: with bz2.BZ2File(cached_diff_file, 'wb') as f: pickle.dump(struct, f) except Exception: log.warning('Failed to save cache', exc_info=True) _cleanup_cache_file(cached_diff_file) log.debug('Saved diff cache under %s in %.4fs', cached_diff_file, time.time() - start) def load_cached_diff(cached_diff_file): compression_mode = _get_compression_mode(cached_diff_file) default_struct = { 'version': CURRENT_DIFF_VERSION, 'diff': None, 'commits': None } has_cache = os.path.isfile(cached_diff_file) if not has_cache: log.debug('Reading diff cache file failed %s', cached_diff_file) return default_struct data = None start = time.time() try: if compression_mode == 'plain': with open(cached_diff_file, 'rb') as f: data = pickle.load(f) elif compression_mode == 'gzip': with gzip.GzipFile(cached_diff_file, 'rb') as f: data = pickle.load(f) else: with bz2.BZ2File(cached_diff_file, 'rb') as f: data = pickle.load(f) except Exception: log.warning('Failed to read diff cache file', exc_info=True) if not data: data = default_struct if not isinstance(data, dict): # old version of data ? data = default_struct # check version if data.get('version') != CURRENT_DIFF_VERSION: # purge cache _cleanup_cache_file(cached_diff_file) return default_struct log.debug('Loaded diff cache from %s in %.4fs', cached_diff_file, time.time() - start) return data def generate_diff_cache_key(*args): """ Helper to generate a cache key using arguments """ def arg_mapper(input_param): input_param = safe_str(input_param) # we cannot allow '/' in arguments since it would allow # subdirectory usage input_param.replace('/', '_') return input_param or None # prevent empty string arguments return '_'.join([ '{}' for _i in range(len(args))]).format(*list(map(arg_mapper, args))) def diff_cache_exist(cache_storage, *args): """ Based on all generated arguments check and return a cache path """ args = list(args) + ['mode:gzip'] cache_key = generate_diff_cache_key(*args) cache_file_path = os.path.join(cache_storage, cache_key) # prevent path traversal attacks using some param that have e.g '../../' if not os.path.abspath(cache_file_path).startswith(cache_storage): raise ValueError(f'Final path must be within {cache_storage}') return cache_file_path