# Copyright (C) 2010-2020 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import os import csv import datetime import pytest from rhodecode.tests import * from rhodecode.tests.fixture import FIXTURES from rhodecode.model.db import UserLog from rhodecode.model.meta import Session from rhodecode.lib.utils2 import safe_unicode def route_path(name, params=None, **kwargs): import urllib.request, urllib.parse, urllib.error from rhodecode.apps._base import ADMIN_PREFIX base_url = { 'admin_home': ADMIN_PREFIX, 'admin_audit_logs': ADMIN_PREFIX + '/audit_logs', }[name].format(**kwargs) if params: base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params)) return base_url @pytest.mark.usefixtures('app') class TestAdminController(object): @pytest.fixture(scope='class', autouse=True) def prepare(self, request, baseapp): UserLog.query().delete() Session().commit() def strptime(val): fmt = '%Y-%m-%d %H:%M:%S' if '.' not in val: return datetime.datetime.strptime(val, fmt) nofrag, frag = val.split(".") date = datetime.datetime.strptime(nofrag, fmt) frag = frag[:6] # truncate to microseconds frag += (6 - len(frag)) * '0' # add 0s return date.replace(microsecond=int(frag)) with open(os.path.join(FIXTURES, 'journal_dump.csv')) as f: for row in csv.DictReader(f): ul = UserLog() for k, v in row.items(): v = safe_unicode(v) if k == 'action_date': v = strptime(v) if k in ['user_id', 'repository_id']: # nullable due to FK problems v = None setattr(ul, k, v) Session().add(ul) Session().commit() @request.addfinalizer def cleanup(): UserLog.query().delete() Session().commit() def test_index(self, autologin_user): response = self.app.get(route_path('admin_audit_logs')) response.mustcontain('Admin audit logs') def test_filter_all_entries(self, autologin_user): response = self.app.get(route_path('admin_audit_logs')) all_count = UserLog.query().count() response.mustcontain('%s entries' % all_count) def test_filter_journal_filter_exact_match_on_repository(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='repository:rhodecode'))) response.mustcontain('3 entries') def test_filter_journal_filter_exact_match_on_repository_CamelCase(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='repository:RhodeCode'))) response.mustcontain('3 entries') def test_filter_journal_filter_wildcard_on_repository(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='repository:*test*'))) response.mustcontain('862 entries') def test_filter_journal_filter_prefix_on_repository(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='repository:test*'))) response.mustcontain('257 entries') def test_filter_journal_filter_prefix_on_repository_CamelCase(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='repository:Test*'))) response.mustcontain('257 entries') def test_filter_journal_filter_prefix_on_repository_and_user(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='repository:test* AND username:demo'))) response.mustcontain('130 entries') def test_filter_journal_filter_prefix_on_repository_or_target_repo(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='repository:test* OR repository:rhodecode'))) response.mustcontain('260 entries') # 257 + 3 def test_filter_journal_filter_exact_match_on_username(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='username:demo'))) response.mustcontain('1087 entries') def test_filter_journal_filter_exact_match_on_username_camelCase(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='username:DemO'))) response.mustcontain('1087 entries') def test_filter_journal_filter_wildcard_on_username(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='username:*test*'))) entries_count = UserLog.query().filter(UserLog.username.ilike('%test%')).count() response.mustcontain('{} entries'.format(entries_count)) def test_filter_journal_filter_prefix_on_username(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='username:demo*'))) response.mustcontain('1101 entries') def test_filter_journal_filter_prefix_on_user_or_other_user(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='username:demo OR username:volcan'))) response.mustcontain('1095 entries') # 1087 + 8 def test_filter_journal_filter_wildcard_on_action(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='action:*pull_request*'))) response.mustcontain('187 entries') def test_filter_journal_filter_on_date(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='date:20121010'))) response.mustcontain('47 entries') def test_filter_journal_filter_on_date_2(self, autologin_user): response = self.app.get(route_path('admin_audit_logs', params=dict(filter='date:20121020'))) response.mustcontain('17 entries')