__init__.py
260 lines
| 8.2 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
r1271 | # Copyright (C) 2010-2017 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import os | ||||
import time | ||||
import logging | ||||
import datetime | ||||
import hashlib | ||||
import tempfile | ||||
from os.path import join as jn | ||||
from tempfile import _RandomNameSequence | ||||
from paste.deploy import loadapp | ||||
from paste.script.appinstall import SetupCommand | ||||
import pylons | ||||
import pylons.test | ||||
from pylons import config, url | ||||
from pylons.i18n.translation import _get_translator | ||||
from pylons.util import ContextObj | ||||
from routes.util import URLGenerator | ||||
from nose.plugins.skip import SkipTest | ||||
import pytest | ||||
from rhodecode import is_windows | ||||
r40 | from rhodecode.config.routing import ADMIN_PREFIX | |||
r1 | from rhodecode.model.meta import Session | |||
from rhodecode.model.db import User | ||||
from rhodecode.lib import auth | ||||
from rhodecode.lib.helpers import flash, link_to | ||||
from rhodecode.lib.utils2 import safe_unicode, safe_str | ||||
r41 | from rhodecode.tests.utils import get_session_from_response | |||
r1 | ||||
# TODO: johbo: Solve time zone related issues and remove this tweak | ||||
os.environ['TZ'] = 'UTC' | ||||
if not is_windows: | ||||
time.tzset() | ||||
log = logging.getLogger(__name__) | ||||
__all__ = [ | ||||
'get_new_dir', 'TestController', 'SkipTest', | ||||
'url', 'link_to', 'ldap_lib_installed', 'clear_all_caches', | ||||
'assert_session_flash', 'login_user', | ||||
'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'SVN_REPO', | ||||
'NEW_HG_REPO', 'NEW_GIT_REPO', | ||||
'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS', | ||||
'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS', | ||||
'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN', | ||||
'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', | ||||
'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', | ||||
'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'SCM_TESTS', | ||||
] | ||||
# Invoke websetup with the current config file | ||||
# SetupCommand('setup-app').run([config_file]) | ||||
# SOME GLOBALS FOR TESTS | ||||
TEST_DIR = tempfile.gettempdir() | ||||
TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_%s' % _RandomNameSequence().next()) | ||||
TEST_USER_ADMIN_LOGIN = 'test_admin' | ||||
TEST_USER_ADMIN_PASS = 'test12' | ||||
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com' | ||||
TEST_USER_REGULAR_LOGIN = 'test_regular' | ||||
TEST_USER_REGULAR_PASS = 'test12' | ||||
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com' | ||||
TEST_USER_REGULAR2_LOGIN = 'test_regular2' | ||||
TEST_USER_REGULAR2_PASS = 'test12' | ||||
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com' | ||||
HG_REPO = 'vcs_test_hg' | ||||
GIT_REPO = 'vcs_test_git' | ||||
SVN_REPO = 'vcs_test_svn' | ||||
NEW_HG_REPO = 'vcs_test_hg_new' | ||||
NEW_GIT_REPO = 'vcs_test_git_new' | ||||
HG_FORK = 'vcs_test_hg_fork' | ||||
GIT_FORK = 'vcs_test_git_fork' | ||||
## VCS | ||||
SCM_TESTS = ['hg', 'git'] | ||||
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple()))) | ||||
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO) | ||||
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix) | ||||
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix) | ||||
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO) | ||||
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix) | ||||
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix) | ||||
TEST_REPO_PREFIX = 'vcs-test' | ||||
# skip ldap tests if LDAP lib is not installed | ||||
ldap_lib_installed = False | ||||
try: | ||||
import ldap | ||||
ldap_lib_installed = True | ||||
except ImportError: | ||||
# means that python-ldap is not installed | ||||
pass | ||||
def clear_all_caches(): | ||||
from beaker.cache import cache_managers | ||||
for _cache in cache_managers.values(): | ||||
_cache.clear() | ||||
def get_new_dir(title): | ||||
""" | ||||
Returns always new directory path. | ||||
""" | ||||
from rhodecode.tests.vcs.utils import get_normalized_path | ||||
name_parts = [TEST_REPO_PREFIX] | ||||
if title: | ||||
name_parts.append(title) | ||||
hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest() | ||||
name_parts.append(hex_str) | ||||
name = '-'.join(name_parts) | ||||
path = os.path.join(TEST_DIR, name) | ||||
return get_normalized_path(path) | ||||
@pytest.mark.usefixtures('app', 'index_location') | ||||
class TestController(object): | ||||
maxDiff = None | ||||
def log_user(self, username=TEST_USER_ADMIN_LOGIN, | ||||
password=TEST_USER_ADMIN_PASS): | ||||
self._logged_username = username | ||||
self._session = login_user_session(self.app, username, password) | ||||
self.csrf_token = auth.get_csrf_token(self._session) | ||||
return self._session['rhodecode_user'] | ||||
def logout_user(self): | ||||
logout_user_session(self.app, auth.get_csrf_token(self._session)) | ||||
self.csrf_token = None | ||||
self._logged_username = None | ||||
self._session = None | ||||
def _get_logged_user(self): | ||||
return User.get_by_username(self._logged_username) | ||||
# TODO: remove, use plain assert in tests | ||||
def assertEqual(self, a, b, msg=None): | ||||
if msg: | ||||
assert a == b, msg | ||||
else: | ||||
assert a == b | ||||
def login_user_session( | ||||
app, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS): | ||||
r40 | from rhodecode.tests.functional.test_login import login_url | |||
response = app.post( | ||||
login_url, | ||||
{'username': username, 'password': password}) | ||||
r1 | if 'invalid user name' in response.body: | |||
pytest.fail('could not login using %s %s' % (username, password)) | ||||
assert response.status == '302 Found' | ||||
response = response.follow() | ||||
r41 | assert response.status == '200 OK' | |||
r1 | ||||
r41 | session = get_session_from_response(response) | |||
assert 'rhodecode_user' in session | ||||
rc_user = session['rhodecode_user'] | ||||
assert rc_user.get('username') == username | ||||
assert rc_user.get('is_authenticated') | ||||
return session | ||||
r1 | ||||
def logout_user_session(app, csrf_token): | ||||
r40 | from rhodecode.tests.functional.test_login import logut_url | |||
app.post(logut_url, {'csrf_token': csrf_token}, status=302) | ||||
r1 | ||||
def login_user(app, username=TEST_USER_ADMIN_LOGIN, | ||||
password=TEST_USER_ADMIN_PASS): | ||||
return login_user_session(app, username, password)['rhodecode_user'] | ||||
def assert_session_flash(response=None, msg=None, category=None): | ||||
""" | ||||
Assert on a flash message in the current session. | ||||
:param msg: Required. The expected message. Will be evaluated if a | ||||
:class:`LazyString` is passed in. | ||||
:param response: Optional. For functional testing, pass in the response | ||||
object. Otherwise don't pass in any value. | ||||
:param category: Optional. If passed, the message category will be | ||||
checked as well. | ||||
""" | ||||
if msg is None: | ||||
raise ValueError("Parameter msg is required.") | ||||
messages = flash.pop_messages() | ||||
r1097 | msg = _eval_if_lazy(msg) | |||
assert messages, 'unable to find message `%s` in empty flash list' % msg | ||||
r1 | message = messages[0] | |||
message_text = _eval_if_lazy(message.message) | ||||
if msg not in message_text: | ||||
msg = u'msg `%s` not found in session flash: got `%s` instead' % ( | ||||
msg, message_text) | ||||
pytest.fail(safe_str(msg)) | ||||
if category: | ||||
assert category == message.category | ||||
def _eval_if_lazy(value): | ||||
return value.eval() if hasattr(value, 'eval') else value | ||||
def assert_not_in_session_flash(response, msg, category=None): | ||||
assert 'flash' in response.session, 'Response session has no flash key' | ||||
message_category, message_text = response.session['flash'][0] | ||||
if msg in message_text: | ||||
msg = u'msg `%s` found in session flash: got `%s` instead' % ( | ||||
msg, message_text) | ||||
pytest.fail(safe_str(msg)) | ||||
if category: | ||||
assert category == message_category | ||||
def assert_session_flash_is_empty(response): | ||||
if 'flash' in response.session: | ||||
msg = 'flash messages are present in session:%s' % \ | ||||
response.session['flash'][0] | ||||
pytest.fail(safe_str(msg)) | ||||