##// END OF EJS Templates
created basic TestClass for tests that does...
marcink -
r3829:5067d6e8 beta
parent child Browse files
Show More
@@ -1,193 +1,202 b''
1 """Pylons application test package
1 """Pylons application test package
2
2
3 This package assumes the Pylons environment is already loaded, such as
3 This package assumes the Pylons environment is already loaded, such as
4 when this script is imported from the `nosetests --with-pylons=test.ini`
4 when this script is imported from the `nosetests --with-pylons=test.ini`
5 command.
5 command.
6
6
7 This module initializes the application via ``websetup`` (`paster
7 This module initializes the application via ``websetup`` (`paster
8 setup-app`) and provides the base testing objects.
8 setup-app`) and provides the base testing objects.
9
9
10 nosetests -x - fail on first error
10 nosetests -x - fail on first error
11 nosetests rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
11 nosetests rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
12 nosetests --pdb --pdb-failures
12 nosetests --pdb --pdb-failures
13 nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
13 nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
14
14
15 optional FLAGS:
15 optional FLAGS:
16 RC_WHOOSH_TEST_DISABLE=1 - skip whoosh index building and tests
16 RC_WHOOSH_TEST_DISABLE=1 - skip whoosh index building and tests
17 RC_NO_TMP_PATH=1 - disable new temp path for tests, used mostly for test_vcs_operations
17 RC_NO_TMP_PATH=1 - disable new temp path for tests, used mostly for test_vcs_operations
18
18
19 """
19 """
20 import os
20 import os
21 import time
21 import time
22 import logging
22 import logging
23 import datetime
23 import datetime
24 import hashlib
24 import hashlib
25 import tempfile
25 import tempfile
26 from os.path import join as jn
26 from os.path import join as jn
27
27
28 from unittest import TestCase
28 from unittest import TestCase
29 from tempfile import _RandomNameSequence
29 from tempfile import _RandomNameSequence
30
30
31 from paste.deploy import loadapp
31 from paste.deploy import loadapp
32 from paste.script.appinstall import SetupCommand
32 from paste.script.appinstall import SetupCommand
33
33
34 import pylons
34 import pylons
35 import pylons.test
35 import pylons.test
36 from pylons import config, url
36 from pylons import config, url
37 from pylons.i18n.translation import _get_translator
37 from pylons.i18n.translation import _get_translator
38 from pylons.util import ContextObj
38
39
39 from routes.util import URLGenerator
40 from routes.util import URLGenerator
40 from webtest import TestApp
41 from webtest import TestApp
41 from nose.plugins.skip import SkipTest
42 from nose.plugins.skip import SkipTest
42
43
43 from rhodecode import is_windows
44 from rhodecode import is_windows
44 from rhodecode.model.meta import Session
45 from rhodecode.model.meta import Session
45 from rhodecode.model.db import User
46 from rhodecode.model.db import User
46 from rhodecode.tests.nose_parametrized import parameterized
47 from rhodecode.tests.nose_parametrized import parameterized
47 from rhodecode.lib.utils2 import safe_unicode, safe_str
48 from rhodecode.lib.utils2 import safe_unicode, safe_str
48
49
49
50
50 os.environ['TZ'] = 'UTC'
51 os.environ['TZ'] = 'UTC'
51 if not is_windows:
52 if not is_windows:
52 time.tzset()
53 time.tzset()
53
54
54 log = logging.getLogger(__name__)
55 log = logging.getLogger(__name__)
55
56
56 __all__ = [
57 __all__ = [
57 'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
58 'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
58 'SkipTest', 'ldap_lib_installed',
59 'SkipTest', 'ldap_lib_installed', 'BaseTestCase', 'init_stack',
59 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
60 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
60 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
61 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
61 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
62 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
62 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
63 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
63 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
64 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
64 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
65 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
65 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
66 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
66 'GIT_REMOTE_REPO', 'SCM_TESTS',
67 'GIT_REMOTE_REPO', 'SCM_TESTS',
67 ]
68 ]
68
69
69 # Invoke websetup with the current config file
70 # Invoke websetup with the current config file
70 # SetupCommand('setup-app').run([config_file])
71 # SetupCommand('setup-app').run([config_file])
71
72
72 environ = {}
73 environ = {}
73
74
74 #SOME GLOBALS FOR TESTS
75 #SOME GLOBALS FOR TESTS
75
76
76 TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
77 TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
77 TEST_USER_ADMIN_LOGIN = 'test_admin'
78 TEST_USER_ADMIN_LOGIN = 'test_admin'
78 TEST_USER_ADMIN_PASS = 'test12'
79 TEST_USER_ADMIN_PASS = 'test12'
79 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
80 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
80
81
81 TEST_USER_REGULAR_LOGIN = 'test_regular'
82 TEST_USER_REGULAR_LOGIN = 'test_regular'
82 TEST_USER_REGULAR_PASS = 'test12'
83 TEST_USER_REGULAR_PASS = 'test12'
83 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
84 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
84
85
85 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
86 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
86 TEST_USER_REGULAR2_PASS = 'test12'
87 TEST_USER_REGULAR2_PASS = 'test12'
87 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
88 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
88
89
89 HG_REPO = 'vcs_test_hg'
90 HG_REPO = 'vcs_test_hg'
90 GIT_REPO = 'vcs_test_git'
91 GIT_REPO = 'vcs_test_git'
91
92
92 NEW_HG_REPO = 'vcs_test_hg_new'
93 NEW_HG_REPO = 'vcs_test_hg_new'
93 NEW_GIT_REPO = 'vcs_test_git_new'
94 NEW_GIT_REPO = 'vcs_test_git_new'
94
95
95 HG_FORK = 'vcs_test_hg_fork'
96 HG_FORK = 'vcs_test_hg_fork'
96 GIT_FORK = 'vcs_test_git_fork'
97 GIT_FORK = 'vcs_test_git_fork'
97
98
98 ## VCS
99 ## VCS
99 SCM_TESTS = ['hg', 'git']
100 SCM_TESTS = ['hg', 'git']
100 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
101 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
101
102
102 GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
103 GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
103
104
104 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
105 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
105 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
106 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
106 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
107 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
107
108
108
109
109 HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
110 HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
110
111
111 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
112 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
112 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
113 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
113 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
114 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
114
115
115 TEST_DIR = tempfile.gettempdir()
116 TEST_DIR = tempfile.gettempdir()
116 TEST_REPO_PREFIX = 'vcs-test'
117 TEST_REPO_PREFIX = 'vcs-test'
117
118
118 # cached repos if any !
119 # cached repos if any !
119 # comment out to get some other repos from bb or github
120 # comment out to get some other repos from bb or github
120 GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
121 GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
121 HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
122 HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
122
123
123 #skip ldap tests if LDAP lib is not installed
124 #skip ldap tests if LDAP lib is not installed
124 ldap_lib_installed = False
125 ldap_lib_installed = False
125 try:
126 try:
126 import ldap
127 import ldap
127 ldap_lib_installed = True
128 ldap_lib_installed = True
128 except ImportError:
129 except ImportError:
129 # means that python-ldap is not installed
130 # means that python-ldap is not installed
130 pass
131 pass
131
132
132
133
133 def get_new_dir(title):
134 def get_new_dir(title):
134 """
135 """
135 Returns always new directory path.
136 Returns always new directory path.
136 """
137 """
137 from rhodecode.tests.vcs.utils import get_normalized_path
138 from rhodecode.tests.vcs.utils import get_normalized_path
138 name = TEST_REPO_PREFIX
139 name = TEST_REPO_PREFIX
139 if title:
140 if title:
140 name = '-'.join((name, title))
141 name = '-'.join((name, title))
141 hex = hashlib.sha1(str(time.time())).hexdigest()
142 hex = hashlib.sha1(str(time.time())).hexdigest()
142 name = '-'.join((name, hex))
143 name = '-'.join((name, hex))
143 path = os.path.join(TEST_DIR, name)
144 path = os.path.join(TEST_DIR, name)
144 return get_normalized_path(path)
145 return get_normalized_path(path)
145
146
146
147
147 class TestController(TestCase):
148 def init_stack(config=None):
149 if not config:
150 config = pylons.test.pylonsapp.config
151 url._push_object(URLGenerator(config['routes.map'], environ))
152 pylons.app_globals._push_object(config['pylons.app_globals'])
153 pylons.config._push_object(config)
154 pylons.tmpl_context._push_object(ContextObj())
155 # Initialize a translator for tests that utilize i18n
156 translator = _get_translator(pylons.config.get('lang'))
157 pylons.translator._push_object(translator)
158
159
160 class BaseTestCase(TestCase):
161 def __init__(self, *args, **kwargs):
162 self.wsgiapp = pylons.test.pylonsapp
163 init_stack(self.wsgiapp.config)
164 TestCase.__init__(self, *args, **kwargs)
165
166
167 class TestController(BaseTestCase):
148
168
149 def __init__(self, *args, **kwargs):
169 def __init__(self, *args, **kwargs):
150 wsgiapp = pylons.test.pylonsapp
170 BaseTestCase.__init__(self, *args, **kwargs)
151 config = wsgiapp.config
171 self.app = TestApp(self.wsgiapp)
152
153 self.app = TestApp(wsgiapp)
154 url._push_object(URLGenerator(config['routes.map'], environ))
155 pylons.app_globals._push_object(config['pylons.app_globals'])
156 pylons.config._push_object(config)
157
158 # Initialize a translator for tests that utilize i18n
159 translator = _get_translator(pylons.config.get('lang'))
160 pylons.translator._push_object(translator)
161
162 self.index_location = config['app_conf']['index_dir']
172 self.index_location = config['app_conf']['index_dir']
163 TestCase.__init__(self, *args, **kwargs)
164
173
165 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
174 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
166 password=TEST_USER_ADMIN_PASS):
175 password=TEST_USER_ADMIN_PASS):
167 self._logged_username = username
176 self._logged_username = username
168 response = self.app.post(url(controller='login', action='index'),
177 response = self.app.post(url(controller='login', action='index'),
169 {'username': username,
178 {'username': username,
170 'password': password})
179 'password': password})
171
180
172 if 'invalid user name' in response.body:
181 if 'invalid user name' in response.body:
173 self.fail('could not login using %s %s' % (username, password))
182 self.fail('could not login using %s %s' % (username, password))
174
183
175 self.assertEqual(response.status, '302 Found')
184 self.assertEqual(response.status, '302 Found')
176 ses = response.session['rhodecode_user']
185 ses = response.session['rhodecode_user']
177 self.assertEqual(ses.get('username'), username)
186 self.assertEqual(ses.get('username'), username)
178 response = response.follow()
187 response = response.follow()
179 self.assertEqual(ses.get('is_authenticated'), True)
188 self.assertEqual(ses.get('is_authenticated'), True)
180
189
181 return response.session['rhodecode_user']
190 return response.session['rhodecode_user']
182
191
183 def _get_logged_user(self):
192 def _get_logged_user(self):
184 return User.get_by_username(self._logged_username)
193 return User.get_by_username(self._logged_username)
185
194
186 def checkSessionFlash(self, response, msg):
195 def checkSessionFlash(self, response, msg):
187 self.assertTrue('flash' in response.session,
196 self.assertTrue('flash' in response.session,
188 msg='Response session:%r have no flash'
197 msg='Response session:%r have no flash'
189 % response.session)
198 % response.session)
190 if not msg in response.session['flash'][0][1]:
199 if not msg in response.session['flash'][0][1]:
191 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
200 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
192 msg, response.session['flash'][0][1])
201 msg, response.session['flash'][0][1])
193 self.fail(safe_str(msg))
202 self.fail(safe_str(msg))
@@ -1,104 +1,101 b''
1 import os
2 import unittest
3 import functools
4 from rhodecode.tests import *
1 from rhodecode.tests import *
5 from rhodecode.tests.fixture import Fixture
2 from rhodecode.tests.fixture import Fixture
6
3
7 from rhodecode.model.repos_group import ReposGroupModel
4 from rhodecode.model.repos_group import ReposGroupModel
8 from rhodecode.model.repo import RepoModel
5 from rhodecode.model.repo import RepoModel
9 from rhodecode.model.db import RepoGroup, Repository, User
6 from rhodecode.model.db import RepoGroup, Repository, User
10 from rhodecode.model.user import UserModel
7 from rhodecode.model.user import UserModel
11
8
12 from rhodecode.lib.auth import AuthUser
9 from rhodecode.lib.auth import AuthUser
13 from rhodecode.model.meta import Session
10 from rhodecode.model.meta import Session
14
11
15
12
16 fixture = Fixture()
13 fixture = Fixture()
17
14
18
15
19 def _destroy_project_tree(test_u1_id):
16 def _destroy_project_tree(test_u1_id):
20 Session.remove()
17 Session.remove()
21 repos_group = RepoGroup.get_by_group_name(group_name='g0')
18 repos_group = RepoGroup.get_by_group_name(group_name='g0')
22 for el in reversed(repos_group.recursive_groups_and_repos()):
19 for el in reversed(repos_group.recursive_groups_and_repos()):
23 if isinstance(el, Repository):
20 if isinstance(el, Repository):
24 RepoModel().delete(el)
21 RepoModel().delete(el)
25 elif isinstance(el, RepoGroup):
22 elif isinstance(el, RepoGroup):
26 ReposGroupModel().delete(el, force_delete=True)
23 ReposGroupModel().delete(el, force_delete=True)
27
24
28 u = User.get(test_u1_id)
25 u = User.get(test_u1_id)
29 Session().delete(u)
26 Session().delete(u)
30 Session().commit()
27 Session().commit()
31
28
32
29
33 def _create_project_tree():
30 def _create_project_tree():
34 """
31 """
35 Creates a tree of groups and repositories to test permissions
32 Creates a tree of groups and repositories to test permissions
36
33
37 structure
34 structure
38 [g0] - group `g0` with 3 subgroups
35 [g0] - group `g0` with 3 subgroups
39 |
36 |
40 |__[g0_1] group g0_1 with 2 groups 0 repos
37 |__[g0_1] group g0_1 with 2 groups 0 repos
41 | |
38 | |
42 | |__[g0_1_1] group g0_1_1 with 1 group 2 repos
39 | |__[g0_1_1] group g0_1_1 with 1 group 2 repos
43 | | |__<g0/g0_1/g0_1_1/g0_1_1_r1>
40 | | |__<g0/g0_1/g0_1_1/g0_1_1_r1>
44 | | |__<g0/g0_1/g0_1_1/g0_1_1_r2>
41 | | |__<g0/g0_1/g0_1_1/g0_1_1_r2>
45 | |__<g0/g0_1/g0_1_r1>
42 | |__<g0/g0_1/g0_1_r1>
46 |
43 |
47 |__[g0_2] 2 repos
44 |__[g0_2] 2 repos
48 | |
45 | |
49 | |__<g0/g0_2/g0_2_r1>
46 | |__<g0/g0_2/g0_2_r1>
50 | |__<g0/g0_2/g0_2_r2>
47 | |__<g0/g0_2/g0_2_r2>
51 |
48 |
52 |__[g0_3] 1 repo
49 |__[g0_3] 1 repo
53 |
50 |
54 |_<g0/g0_3/g0_3_r1>
51 |_<g0/g0_3/g0_3_r1>
55 |_<g0/g0_3/g0_3_r2_private>
52 |_<g0/g0_3/g0_3_r2_private>
56
53
57 """
54 """
58 test_u1 = UserModel().create_or_update(
55 test_u1 = UserModel().create_or_update(
59 username=u'test_u1', password=u'qweqwe',
56 username=u'test_u1', password=u'qweqwe',
60 email=u'test_u1@rhodecode.org', firstname=u'test_u1', lastname=u'test_u1'
57 email=u'test_u1@rhodecode.org', firstname=u'test_u1', lastname=u'test_u1'
61 )
58 )
62 g0 = fixture.create_group('g0')
59 g0 = fixture.create_group('g0')
63 g0_1 = fixture.create_group('g0_1', group_parent_id=g0)
60 g0_1 = fixture.create_group('g0_1', group_parent_id=g0)
64 g0_1_1 = fixture.create_group('g0_1_1', group_parent_id=g0_1)
61 g0_1_1 = fixture.create_group('g0_1_1', group_parent_id=g0_1)
65 g0_1_1_r1 = fixture.create_repo('g0/g0_1/g0_1_1/g0_1_1_r1', repos_group=g0_1_1)
62 g0_1_1_r1 = fixture.create_repo('g0/g0_1/g0_1_1/g0_1_1_r1', repos_group=g0_1_1)
66 g0_1_1_r2 = fixture.create_repo('g0/g0_1/g0_1_1/g0_1_1_r2', repos_group=g0_1_1)
63 g0_1_1_r2 = fixture.create_repo('g0/g0_1/g0_1_1/g0_1_1_r2', repos_group=g0_1_1)
67 g0_1_r1 = fixture.create_repo('g0/g0_1/g0_1_r1', repos_group=g0_1)
64 g0_1_r1 = fixture.create_repo('g0/g0_1/g0_1_r1', repos_group=g0_1)
68 g0_2 = fixture.create_group('g0_2', group_parent_id=g0)
65 g0_2 = fixture.create_group('g0_2', group_parent_id=g0)
69 g0_2_r1 = fixture.create_repo('g0/g0_2/g0_2_r1', repos_group=g0_2)
66 g0_2_r1 = fixture.create_repo('g0/g0_2/g0_2_r1', repos_group=g0_2)
70 g0_2_r2 = fixture.create_repo('g0/g0_2/g0_2_r2', repos_group=g0_2)
67 g0_2_r2 = fixture.create_repo('g0/g0_2/g0_2_r2', repos_group=g0_2)
71 g0_3 = fixture.create_group('g0_3', group_parent_id=g0)
68 g0_3 = fixture.create_group('g0_3', group_parent_id=g0)
72 g0_3_r1 = fixture.create_repo('g0/g0_3/g0_3_r1', repos_group=g0_3)
69 g0_3_r1 = fixture.create_repo('g0/g0_3/g0_3_r1', repos_group=g0_3)
73 g0_3_r2_private = fixture.create_repo('g0/g0_3/g0_3_r1_private',
70 g0_3_r2_private = fixture.create_repo('g0/g0_3/g0_3_r1_private',
74 repos_group=g0_3, repo_private=True)
71 repos_group=g0_3, repo_private=True)
75 return test_u1
72 return test_u1
76
73
77
74
78 def expected_count(group_name, objects=False):
75 def expected_count(group_name, objects=False):
79 repos_group = RepoGroup.get_by_group_name(group_name=group_name)
76 repos_group = RepoGroup.get_by_group_name(group_name=group_name)
80 objs = repos_group.recursive_groups_and_repos()
77 objs = repos_group.recursive_groups_and_repos()
81 if objects:
78 if objects:
82 return objs
79 return objs
83 return len(objs)
80 return len(objs)
84
81
85
82
86 def _check_expected_count(items, repo_items, expected):
83 def _check_expected_count(items, repo_items, expected):
87 should_be = len(items + repo_items)
84 should_be = len(items + repo_items)
88 there_are = len(expected)
85 there_are = len(expected)
89 assert should_be == there_are, ('%s != %s' % ((items + repo_items), expected))
86 assert should_be == there_are, ('%s != %s' % ((items + repo_items), expected))
90
87
91
88
92 def check_tree_perms(obj_name, repo_perm, prefix, expected_perm):
89 def check_tree_perms(obj_name, repo_perm, prefix, expected_perm):
93 assert repo_perm == expected_perm, ('obj:`%s` got perm:`%s` should:`%s`'
90 assert repo_perm == expected_perm, ('obj:`%s` got perm:`%s` should:`%s`'
94 % (obj_name, repo_perm, expected_perm))
91 % (obj_name, repo_perm, expected_perm))
95
92
96
93
97 def _get_perms(filter_='', recursive=True, key=None, test_u1_id=None):
94 def _get_perms(filter_='', recursive=True, key=None, test_u1_id=None):
98 test_u1 = AuthUser(user_id=test_u1_id)
95 test_u1 = AuthUser(user_id=test_u1_id)
99 for k, v in test_u1.permissions[key].items():
96 for k, v in test_u1.permissions[key].items():
100 if recursive and k.startswith(filter_):
97 if recursive and k.startswith(filter_):
101 yield k, v
98 yield k, v
102 elif not recursive:
99 elif not recursive:
103 if k == filter_:
100 if k == filter_:
104 yield k, v
101 yield k, v
@@ -1,251 +1,250 b''
1 from __future__ import with_statement
1 from __future__ import with_statement
2 import os
2 import os
3 import unittest
4 from rhodecode.tests import *
3 from rhodecode.tests import *
5 from rhodecode.lib.diffs import DiffProcessor, NEW_FILENODE, DEL_FILENODE, \
4 from rhodecode.lib.diffs import DiffProcessor, NEW_FILENODE, DEL_FILENODE, \
6 MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE
5 MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE
7
6
8 dn = os.path.dirname
7 dn = os.path.dirname
9 FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'fixtures')
8 FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'fixtures')
10
9
11 DIFF_FIXTURES = {
10 DIFF_FIXTURES = {
12 'hg_diff_add_single_binary_file.diff': [
11 'hg_diff_add_single_binary_file.diff': [
13 ('US Warszawa.jpg', 'A',
12 ('US Warszawa.jpg', 'A',
14 {'added': 0,
13 {'added': 0,
15 'deleted': 0,
14 'deleted': 0,
16 'binary': True,
15 'binary': True,
17 'ops': {NEW_FILENODE: 'new file 100755',
16 'ops': {NEW_FILENODE: 'new file 100755',
18 BIN_FILENODE: 'binary diff not shown'}}),
17 BIN_FILENODE: 'binary diff not shown'}}),
19 ],
18 ],
20 'hg_diff_mod_single_binary_file.diff': [
19 'hg_diff_mod_single_binary_file.diff': [
21 ('US Warszawa.jpg', 'M',
20 ('US Warszawa.jpg', 'M',
22 {'added': 0,
21 {'added': 0,
23 'deleted': 0,
22 'deleted': 0,
24 'binary': True,
23 'binary': True,
25 'ops': {MOD_FILENODE: 'modified file',
24 'ops': {MOD_FILENODE: 'modified file',
26 BIN_FILENODE: 'binary diff not shown'}}),
25 BIN_FILENODE: 'binary diff not shown'}}),
27 ],
26 ],
28
27
29 'hg_diff_mod_single_file_and_rename_and_chmod.diff': [
28 'hg_diff_mod_single_file_and_rename_and_chmod.diff': [
30 ('README', 'M',
29 ('README', 'M',
31 {'added': 3,
30 {'added': 3,
32 'deleted': 0,
31 'deleted': 0,
33 'binary': False,
32 'binary': False,
34 'ops': {MOD_FILENODE: 'modified file',
33 'ops': {MOD_FILENODE: 'modified file',
35 RENAMED_FILENODE: 'file renamed from README.rst to README',
34 RENAMED_FILENODE: 'file renamed from README.rst to README',
36 CHMOD_FILENODE: 'modified file chmod 100755 => 100644'}}),
35 CHMOD_FILENODE: 'modified file chmod 100755 => 100644'}}),
37 ],
36 ],
38 'hg_diff_rename_and_chmod_file.diff': [
37 'hg_diff_rename_and_chmod_file.diff': [
39 ('README', 'M',
38 ('README', 'M',
40 {'added': 3,
39 {'added': 3,
41 'deleted': 0,
40 'deleted': 0,
42 'binary': False,
41 'binary': False,
43 'ops': {MOD_FILENODE: 'modified file',
42 'ops': {MOD_FILENODE: 'modified file',
44 BIN_FILENODE: 'binary diff not shown'}}),
43 BIN_FILENODE: 'binary diff not shown'}}),
45 ],
44 ],
46 'hg_diff_del_single_binary_file.diff': [
45 'hg_diff_del_single_binary_file.diff': [
47 ('US Warszawa.jpg', 'D',
46 ('US Warszawa.jpg', 'D',
48 {'added': 0,
47 {'added': 0,
49 'deleted': 0,
48 'deleted': 0,
50 'binary': True,
49 'binary': True,
51 'ops': {DEL_FILENODE: 'deleted file',
50 'ops': {DEL_FILENODE: 'deleted file',
52 BIN_FILENODE: 'binary diff not shown'}}),
51 BIN_FILENODE: 'binary diff not shown'}}),
53 ],
52 ],
54 'hg_diff_chmod_and_mod_single_binary_file.diff': [
53 'hg_diff_chmod_and_mod_single_binary_file.diff': [
55 ('gravatar.png', 'M',
54 ('gravatar.png', 'M',
56 {'added': 0,
55 {'added': 0,
57 'deleted': 0,
56 'deleted': 0,
58 'binary': True,
57 'binary': True,
59 'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755',
58 'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755',
60 BIN_FILENODE: 'binary diff not shown'}}),
59 BIN_FILENODE: 'binary diff not shown'}}),
61 ],
60 ],
62 'hg_diff_chmod.diff': [
61 'hg_diff_chmod.diff': [
63 ('file', 'M',
62 ('file', 'M',
64 {'added': 0,
63 {'added': 0,
65 'deleted': 0,
64 'deleted': 0,
66 'binary': True,
65 'binary': True,
67 'ops': {CHMOD_FILENODE: 'modified file chmod 100755 => 100644'}}),
66 'ops': {CHMOD_FILENODE: 'modified file chmod 100755 => 100644'}}),
68 ],
67 ],
69 'hg_diff_rename_file.diff': [
68 'hg_diff_rename_file.diff': [
70 ('file_renamed', 'M',
69 ('file_renamed', 'M',
71 {'added': 0,
70 {'added': 0,
72 'deleted': 0,
71 'deleted': 0,
73 'binary': True,
72 'binary': True,
74 'ops': {RENAMED_FILENODE: 'file renamed from file to file_renamed'}}),
73 'ops': {RENAMED_FILENODE: 'file renamed from file to file_renamed'}}),
75 ],
74 ],
76 'hg_diff_rename_and_chmod_file.diff': [
75 'hg_diff_rename_and_chmod_file.diff': [
77 ('README', 'M',
76 ('README', 'M',
78 {'added': 0,
77 {'added': 0,
79 'deleted': 0,
78 'deleted': 0,
80 'binary': True,
79 'binary': True,
81 'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755',
80 'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755',
82 RENAMED_FILENODE: 'file renamed from README.rst to README'}}),
81 RENAMED_FILENODE: 'file renamed from README.rst to README'}}),
83 ],
82 ],
84 'hg_diff_binary_and_normal.diff': [
83 'hg_diff_binary_and_normal.diff': [
85 ('img/baseline-10px.png', 'A',
84 ('img/baseline-10px.png', 'A',
86 {'added': 0,
85 {'added': 0,
87 'deleted': 0,
86 'deleted': 0,
88 'binary': True,
87 'binary': True,
89 'ops': {NEW_FILENODE: 'new file 100644',
88 'ops': {NEW_FILENODE: 'new file 100644',
90 BIN_FILENODE: 'binary diff not shown'}}),
89 BIN_FILENODE: 'binary diff not shown'}}),
91 ('js/jquery/hashgrid.js', 'A',
90 ('js/jquery/hashgrid.js', 'A',
92 {'added': 340,
91 {'added': 340,
93 'deleted': 0,
92 'deleted': 0,
94 'binary': False,
93 'binary': False,
95 'ops': {NEW_FILENODE: 'new file 100755'}}),
94 'ops': {NEW_FILENODE: 'new file 100755'}}),
96 ('index.html', 'M',
95 ('index.html', 'M',
97 {'added': 3,
96 {'added': 3,
98 'deleted': 2,
97 'deleted': 2,
99 'binary': False,
98 'binary': False,
100 'ops': {MOD_FILENODE: 'modified file'}}),
99 'ops': {MOD_FILENODE: 'modified file'}}),
101 ('less/docs.less', 'M',
100 ('less/docs.less', 'M',
102 {'added': 34,
101 {'added': 34,
103 'deleted': 0,
102 'deleted': 0,
104 'binary': False,
103 'binary': False,
105 'ops': {MOD_FILENODE: 'modified file'}}),
104 'ops': {MOD_FILENODE: 'modified file'}}),
106 ('less/scaffolding.less', 'M',
105 ('less/scaffolding.less', 'M',
107 {'added': 1,
106 {'added': 1,
108 'deleted': 3,
107 'deleted': 3,
109 'binary': False,
108 'binary': False,
110 'ops': {MOD_FILENODE: 'modified file'}}),
109 'ops': {MOD_FILENODE: 'modified file'}}),
111 ('readme.markdown', 'M',
110 ('readme.markdown', 'M',
112 {'added': 1,
111 {'added': 1,
113 'deleted': 10,
112 'deleted': 10,
114 'binary': False,
113 'binary': False,
115 'ops': {MOD_FILENODE: 'modified file'}}),
114 'ops': {MOD_FILENODE: 'modified file'}}),
116 ('img/baseline-20px.png', 'D',
115 ('img/baseline-20px.png', 'D',
117 {'added': 0,
116 {'added': 0,
118 'deleted': 0,
117 'deleted': 0,
119 'binary': True,
118 'binary': True,
120 'ops': {DEL_FILENODE: 'deleted file',
119 'ops': {DEL_FILENODE: 'deleted file',
121 BIN_FILENODE: 'binary diff not shown'}}),
120 BIN_FILENODE: 'binary diff not shown'}}),
122 ('js/global.js', 'D',
121 ('js/global.js', 'D',
123 {'added': 0,
122 {'added': 0,
124 'deleted': 75,
123 'deleted': 75,
125 'binary': False,
124 'binary': False,
126 'ops': {DEL_FILENODE: 'deleted file'}})
125 'ops': {DEL_FILENODE: 'deleted file'}})
127 ],
126 ],
128 'git_diff_chmod.diff': [
127 'git_diff_chmod.diff': [
129 ('work-horus.xls', 'M',
128 ('work-horus.xls', 'M',
130 {'added': 0,
129 {'added': 0,
131 'deleted': 0,
130 'deleted': 0,
132 'binary': True,
131 'binary': True,
133 'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755'}})
132 'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755'}})
134 ],
133 ],
135 'git_diff_rename_file.diff': [
134 'git_diff_rename_file.diff': [
136 ('file.xls', 'M',
135 ('file.xls', 'M',
137 {'added': 0,
136 {'added': 0,
138 'deleted': 0,
137 'deleted': 0,
139 'binary': True,
138 'binary': True,
140 'ops': {RENAMED_FILENODE: 'file renamed from work-horus.xls to file.xls'}})
139 'ops': {RENAMED_FILENODE: 'file renamed from work-horus.xls to file.xls'}})
141 ],
140 ],
142 'git_diff_mod_single_binary_file.diff': [
141 'git_diff_mod_single_binary_file.diff': [
143 ('US Warszawa.jpg', 'M',
142 ('US Warszawa.jpg', 'M',
144 {'added': 0,
143 {'added': 0,
145 'deleted': 0,
144 'deleted': 0,
146 'binary': True,
145 'binary': True,
147 'ops': {MOD_FILENODE: 'modified file',
146 'ops': {MOD_FILENODE: 'modified file',
148 BIN_FILENODE: 'binary diff not shown'}})
147 BIN_FILENODE: 'binary diff not shown'}})
149 ],
148 ],
150 'git_diff_binary_and_normal.diff': [
149 'git_diff_binary_and_normal.diff': [
151 ('img/baseline-10px.png', 'A',
150 ('img/baseline-10px.png', 'A',
152 {'added': 0,
151 {'added': 0,
153 'deleted': 0,
152 'deleted': 0,
154 'binary': True,
153 'binary': True,
155 'ops': {NEW_FILENODE: 'new file 100644',
154 'ops': {NEW_FILENODE: 'new file 100644',
156 BIN_FILENODE: 'binary diff not shown'}}),
155 BIN_FILENODE: 'binary diff not shown'}}),
157 ('js/jquery/hashgrid.js', 'A',
156 ('js/jquery/hashgrid.js', 'A',
158 {'added': 340,
157 {'added': 340,
159 'deleted': 0,
158 'deleted': 0,
160 'binary': False,
159 'binary': False,
161 'ops': {NEW_FILENODE: 'new file 100755'}}),
160 'ops': {NEW_FILENODE: 'new file 100755'}}),
162 ('index.html', 'M',
161 ('index.html', 'M',
163 {'added': 3,
162 {'added': 3,
164 'deleted': 2,
163 'deleted': 2,
165 'binary': False,
164 'binary': False,
166 'ops': {MOD_FILENODE: 'modified file'}}),
165 'ops': {MOD_FILENODE: 'modified file'}}),
167 ('less/docs.less', 'M',
166 ('less/docs.less', 'M',
168 {'added': 34,
167 {'added': 34,
169 'deleted': 0,
168 'deleted': 0,
170 'binary': False,
169 'binary': False,
171 'ops': {MOD_FILENODE: 'modified file'}}),
170 'ops': {MOD_FILENODE: 'modified file'}}),
172 ('less/scaffolding.less', 'M',
171 ('less/scaffolding.less', 'M',
173 {'added': 1,
172 {'added': 1,
174 'deleted': 3,
173 'deleted': 3,
175 'binary': False,
174 'binary': False,
176 'ops': {MOD_FILENODE: 'modified file'}}),
175 'ops': {MOD_FILENODE: 'modified file'}}),
177 ('readme.markdown', 'M',
176 ('readme.markdown', 'M',
178 {'added': 1,
177 {'added': 1,
179 'deleted': 10,
178 'deleted': 10,
180 'binary': False,
179 'binary': False,
181 'ops': {MOD_FILENODE: 'modified file'}}),
180 'ops': {MOD_FILENODE: 'modified file'}}),
182 ('img/baseline-20px.png', 'D',
181 ('img/baseline-20px.png', 'D',
183 {'added': 0,
182 {'added': 0,
184 'deleted': 0,
183 'deleted': 0,
185 'binary': True,
184 'binary': True,
186 'ops': {DEL_FILENODE: 'deleted file',
185 'ops': {DEL_FILENODE: 'deleted file',
187 BIN_FILENODE: 'binary diff not shown'}}),
186 BIN_FILENODE: 'binary diff not shown'}}),
188 ('js/global.js', 'D',
187 ('js/global.js', 'D',
189 {'added': 0,
188 {'added': 0,
190 'deleted': 75,
189 'deleted': 75,
191 'binary': False,
190 'binary': False,
192 'ops': {DEL_FILENODE: 'deleted file'}}),
191 'ops': {DEL_FILENODE: 'deleted file'}}),
193 ],
192 ],
194 'diff_with_diff_data.diff': [
193 'diff_with_diff_data.diff': [
195 ('vcs/backends/base.py', 'M',
194 ('vcs/backends/base.py', 'M',
196 {'added': 18,
195 {'added': 18,
197 'deleted': 2,
196 'deleted': 2,
198 'binary': False,
197 'binary': False,
199 'ops': {MOD_FILENODE: 'modified file'}}),
198 'ops': {MOD_FILENODE: 'modified file'}}),
200 ('vcs/backends/git/repository.py', 'M',
199 ('vcs/backends/git/repository.py', 'M',
201 {'added': 46,
200 {'added': 46,
202 'deleted': 15,
201 'deleted': 15,
203 'binary': False,
202 'binary': False,
204 'ops': {MOD_FILENODE: 'modified file'}}),
203 'ops': {MOD_FILENODE: 'modified file'}}),
205 ('vcs/backends/hg.py', 'M',
204 ('vcs/backends/hg.py', 'M',
206 {'added': 22,
205 {'added': 22,
207 'deleted': 3,
206 'deleted': 3,
208 'binary': False,
207 'binary': False,
209 'ops': {MOD_FILENODE: 'modified file'}}),
208 'ops': {MOD_FILENODE: 'modified file'}}),
210 ('vcs/tests/test_git.py', 'M',
209 ('vcs/tests/test_git.py', 'M',
211 {'added': 5,
210 {'added': 5,
212 'deleted': 5,
211 'deleted': 5,
213 'binary': False,
212 'binary': False,
214 'ops': {MOD_FILENODE: 'modified file'}}),
213 'ops': {MOD_FILENODE: 'modified file'}}),
215 ('vcs/tests/test_repository.py', 'M',
214 ('vcs/tests/test_repository.py', 'M',
216 {'added': 174,
215 {'added': 174,
217 'deleted': 2,
216 'deleted': 2,
218 'binary': False,
217 'binary': False,
219 'ops': {MOD_FILENODE: 'modified file'}}),
218 'ops': {MOD_FILENODE: 'modified file'}}),
220 ],
219 ],
221 # 'large_diff.diff': [
220 # 'large_diff.diff': [
222 # ('.hgignore', 'A', {'deleted': 0, 'binary': False, 'added': 3, 'ops': {1: 'new file 100644'}}),
221 # ('.hgignore', 'A', {'deleted': 0, 'binary': False, 'added': 3, 'ops': {1: 'new file 100644'}}),
223 # ('MANIFEST.in', 'A', {'deleted': 0, 'binary': False, 'added': 3, 'ops': {1: 'new file 100644'}}),
222 # ('MANIFEST.in', 'A', {'deleted': 0, 'binary': False, 'added': 3, 'ops': {1: 'new file 100644'}}),
224 # ('README.txt', 'A', {'deleted': 0, 'binary': False, 'added': 19, 'ops': {1: 'new file 100644'}}),
223 # ('README.txt', 'A', {'deleted': 0, 'binary': False, 'added': 19, 'ops': {1: 'new file 100644'}}),
225 # ('development.ini', 'A', {'deleted': 0, 'binary': False, 'added': 116, 'ops': {1: 'new file 100644'}}),
224 # ('development.ini', 'A', {'deleted': 0, 'binary': False, 'added': 116, 'ops': {1: 'new file 100644'}}),
226 # ('docs/index.txt', 'A', {'deleted': 0, 'binary': False, 'added': 19, 'ops': {1: 'new file 100644'}}),
225 # ('docs/index.txt', 'A', {'deleted': 0, 'binary': False, 'added': 19, 'ops': {1: 'new file 100644'}}),
227 # ('ez_setup.py', 'A', {'deleted': 0, 'binary': False, 'added': 276, 'ops': {1: 'new file 100644'}}),
226 # ('ez_setup.py', 'A', {'deleted': 0, 'binary': False, 'added': 276, 'ops': {1: 'new file 100644'}}),
228 # ('hgapp.py', 'A', {'deleted': 0, 'binary': False, 'added': 26, 'ops': {1: 'new file 100644'}}),
227 # ('hgapp.py', 'A', {'deleted': 0, 'binary': False, 'added': 26, 'ops': {1: 'new file 100644'}}),
229 # ('hgwebdir.config', 'A', {'deleted': 0, 'binary': False, 'added': 21, 'ops': {1: 'new file 100644'}}),
228 # ('hgwebdir.config', 'A', {'deleted': 0, 'binary': False, 'added': 21, 'ops': {1: 'new file 100644'}}),
230 # ('pylons_app.egg-info/PKG-INFO', 'A', {'deleted': 0, 'binary': False, 'added': 10, 'ops': {1: 'new file 100644'}}),
229 # ('pylons_app.egg-info/PKG-INFO', 'A', {'deleted': 0, 'binary': False, 'added': 10, 'ops': {1: 'new file 100644'}}),
231 # ('pylons_app.egg-info/SOURCES.txt', 'A', {'deleted': 0, 'binary': False, 'added': 33, 'ops': {1: 'new file 100644'}}),
230 # ('pylons_app.egg-info/SOURCES.txt', 'A', {'deleted': 0, 'binary': False, 'added': 33, 'ops': {1: 'new file 100644'}}),
232 # ('pylons_app.egg-info/dependency_links.txt', 'A', {'deleted': 0, 'binary': False, 'added': 1, 'ops': {1: 'new file 100644'}}),
231 # ('pylons_app.egg-info/dependency_links.txt', 'A', {'deleted': 0, 'binary': False, 'added': 1, 'ops': {1: 'new file 100644'}}),
233 # #TODO:
232 # #TODO:
234 # ],
233 # ],
235
234
236 }
235 }
237
236
238
237
239 class DiffLibTest(unittest.TestCase):
238 class DiffLibTest(BaseTestCase):
240
239
241 @parameterized.expand([(x,) for x in DIFF_FIXTURES])
240 @parameterized.expand([(x,) for x in DIFF_FIXTURES])
242 def test_diff(self, diff_fixture):
241 def test_diff(self, diff_fixture):
243
242
244 with open(os.path.join(FIXTURES, diff_fixture)) as f:
243 with open(os.path.join(FIXTURES, diff_fixture)) as f:
245 diff = f.read()
244 diff = f.read()
246
245
247 diff_proc = DiffProcessor(diff)
246 diff_proc = DiffProcessor(diff)
248 diff_proc_d = diff_proc.prepare()
247 diff_proc_d = diff_proc.prepare()
249 data = [(x['filename'], x['operation'], x['stats']) for x in diff_proc_d]
248 data = [(x['filename'], x['operation'], x['stats']) for x in diff_proc_d]
250 expected_data = DIFF_FIXTURES[diff_fixture]
249 expected_data = DIFF_FIXTURES[diff_fixture]
251 self.assertListEqual(expected_data, data)
250 self.assertListEqual(expected_data, data)
@@ -1,187 +1,185 b''
1 import os
2 import unittest
3 from rhodecode.tests import *
1 from rhodecode.tests import *
4
2
5 from rhodecode.model.db import User, Notification, UserNotification
3 from rhodecode.model.db import User, Notification, UserNotification
6 from rhodecode.model.user import UserModel
4 from rhodecode.model.user import UserModel
7
5
8 from rhodecode.model.meta import Session
6 from rhodecode.model.meta import Session
9 from rhodecode.model.notification import NotificationModel
7 from rhodecode.model.notification import NotificationModel
10
8
11
9
12 class TestNotifications(unittest.TestCase):
10 class TestNotifications(BaseTestCase):
13
11
14 def __init__(self, methodName='runTest'):
12 def __init__(self, methodName='runTest'):
15 Session.remove()
13 Session.remove()
16 self.u1 = UserModel().create_or_update(username=u'u1',
14 self.u1 = UserModel().create_or_update(username=u'u1',
17 password=u'qweqwe',
15 password=u'qweqwe',
18 email=u'u1@rhodecode.org',
16 email=u'u1@rhodecode.org',
19 firstname=u'u1', lastname=u'u1')
17 firstname=u'u1', lastname=u'u1')
20 Session().commit()
18 Session().commit()
21 self.u1 = self.u1.user_id
19 self.u1 = self.u1.user_id
22
20
23 self.u2 = UserModel().create_or_update(username=u'u2',
21 self.u2 = UserModel().create_or_update(username=u'u2',
24 password=u'qweqwe',
22 password=u'qweqwe',
25 email=u'u2@rhodecode.org',
23 email=u'u2@rhodecode.org',
26 firstname=u'u2', lastname=u'u3')
24 firstname=u'u2', lastname=u'u3')
27 Session().commit()
25 Session().commit()
28 self.u2 = self.u2.user_id
26 self.u2 = self.u2.user_id
29
27
30 self.u3 = UserModel().create_or_update(username=u'u3',
28 self.u3 = UserModel().create_or_update(username=u'u3',
31 password=u'qweqwe',
29 password=u'qweqwe',
32 email=u'u3@rhodecode.org',
30 email=u'u3@rhodecode.org',
33 firstname=u'u3', lastname=u'u3')
31 firstname=u'u3', lastname=u'u3')
34 Session().commit()
32 Session().commit()
35 self.u3 = self.u3.user_id
33 self.u3 = self.u3.user_id
36
34
37 super(TestNotifications, self).__init__(methodName=methodName)
35 super(TestNotifications, self).__init__(methodName=methodName)
38
36
39 def _clean_notifications(self):
37 def _clean_notifications(self):
40 for n in Notification.query().all():
38 for n in Notification.query().all():
41 Session().delete(n)
39 Session().delete(n)
42
40
43 Session().commit()
41 Session().commit()
44 self.assertEqual(Notification.query().all(), [])
42 self.assertEqual(Notification.query().all(), [])
45
43
46 def tearDown(self):
44 def tearDown(self):
47 self._clean_notifications()
45 self._clean_notifications()
48
46
49 def test_create_notification(self):
47 def test_create_notification(self):
50 self.assertEqual([], Notification.query().all())
48 self.assertEqual([], Notification.query().all())
51 self.assertEqual([], UserNotification.query().all())
49 self.assertEqual([], UserNotification.query().all())
52
50
53 usrs = [self.u1, self.u2]
51 usrs = [self.u1, self.u2]
54 notification = NotificationModel().create(created_by=self.u1,
52 notification = NotificationModel().create(created_by=self.u1,
55 subject=u'subj', body=u'hi there',
53 subject=u'subj', body=u'hi there',
56 recipients=usrs)
54 recipients=usrs)
57 Session().commit()
55 Session().commit()
58 u1 = User.get(self.u1)
56 u1 = User.get(self.u1)
59 u2 = User.get(self.u2)
57 u2 = User.get(self.u2)
60 u3 = User.get(self.u3)
58 u3 = User.get(self.u3)
61 notifications = Notification.query().all()
59 notifications = Notification.query().all()
62 self.assertEqual(len(notifications), 1)
60 self.assertEqual(len(notifications), 1)
63
61
64 self.assertEqual(notifications[0].recipients, [u1, u2])
62 self.assertEqual(notifications[0].recipients, [u1, u2])
65 self.assertEqual(notification.notification_id,
63 self.assertEqual(notification.notification_id,
66 notifications[0].notification_id)
64 notifications[0].notification_id)
67
65
68 unotification = UserNotification.query()\
66 unotification = UserNotification.query()\
69 .filter(UserNotification.notification == notification).all()
67 .filter(UserNotification.notification == notification).all()
70
68
71 self.assertEqual(len(unotification), len(usrs))
69 self.assertEqual(len(unotification), len(usrs))
72 self.assertEqual(set([x.user.user_id for x in unotification]),
70 self.assertEqual(set([x.user.user_id for x in unotification]),
73 set(usrs))
71 set(usrs))
74
72
75 def test_user_notifications(self):
73 def test_user_notifications(self):
76 self.assertEqual([], Notification.query().all())
74 self.assertEqual([], Notification.query().all())
77 self.assertEqual([], UserNotification.query().all())
75 self.assertEqual([], UserNotification.query().all())
78
76
79 notification1 = NotificationModel().create(created_by=self.u1,
77 notification1 = NotificationModel().create(created_by=self.u1,
80 subject=u'subj', body=u'hi there1',
78 subject=u'subj', body=u'hi there1',
81 recipients=[self.u3])
79 recipients=[self.u3])
82 Session().commit()
80 Session().commit()
83 notification2 = NotificationModel().create(created_by=self.u1,
81 notification2 = NotificationModel().create(created_by=self.u1,
84 subject=u'subj', body=u'hi there2',
82 subject=u'subj', body=u'hi there2',
85 recipients=[self.u3])
83 recipients=[self.u3])
86 Session().commit()
84 Session().commit()
87 u3 = Session().query(User).get(self.u3)
85 u3 = Session().query(User).get(self.u3)
88
86
89 self.assertEqual(sorted([x.notification for x in u3.notifications]),
87 self.assertEqual(sorted([x.notification for x in u3.notifications]),
90 sorted([notification2, notification1]))
88 sorted([notification2, notification1]))
91
89
92 def test_delete_notifications(self):
90 def test_delete_notifications(self):
93 self.assertEqual([], Notification.query().all())
91 self.assertEqual([], Notification.query().all())
94 self.assertEqual([], UserNotification.query().all())
92 self.assertEqual([], UserNotification.query().all())
95
93
96 notification = NotificationModel().create(created_by=self.u1,
94 notification = NotificationModel().create(created_by=self.u1,
97 subject=u'title', body=u'hi there3',
95 subject=u'title', body=u'hi there3',
98 recipients=[self.u3, self.u1, self.u2])
96 recipients=[self.u3, self.u1, self.u2])
99 Session().commit()
97 Session().commit()
100 notifications = Notification.query().all()
98 notifications = Notification.query().all()
101 self.assertTrue(notification in notifications)
99 self.assertTrue(notification in notifications)
102
100
103 Notification.delete(notification.notification_id)
101 Notification.delete(notification.notification_id)
104 Session().commit()
102 Session().commit()
105
103
106 notifications = Notification.query().all()
104 notifications = Notification.query().all()
107 self.assertFalse(notification in notifications)
105 self.assertFalse(notification in notifications)
108
106
109 un = UserNotification.query().filter(UserNotification.notification
107 un = UserNotification.query().filter(UserNotification.notification
110 == notification).all()
108 == notification).all()
111 self.assertEqual(un, [])
109 self.assertEqual(un, [])
112
110
113 def test_delete_association(self):
111 def test_delete_association(self):
114
112
115 self.assertEqual([], Notification.query().all())
113 self.assertEqual([], Notification.query().all())
116 self.assertEqual([], UserNotification.query().all())
114 self.assertEqual([], UserNotification.query().all())
117
115
118 notification = NotificationModel().create(created_by=self.u1,
116 notification = NotificationModel().create(created_by=self.u1,
119 subject=u'title', body=u'hi there3',
117 subject=u'title', body=u'hi there3',
120 recipients=[self.u3, self.u1, self.u2])
118 recipients=[self.u3, self.u1, self.u2])
121 Session().commit()
119 Session().commit()
122
120
123 unotification = UserNotification.query()\
121 unotification = UserNotification.query()\
124 .filter(UserNotification.notification ==
122 .filter(UserNotification.notification ==
125 notification)\
123 notification)\
126 .filter(UserNotification.user_id == self.u3)\
124 .filter(UserNotification.user_id == self.u3)\
127 .scalar()
125 .scalar()
128
126
129 self.assertEqual(unotification.user_id, self.u3)
127 self.assertEqual(unotification.user_id, self.u3)
130
128
131 NotificationModel().delete(self.u3,
129 NotificationModel().delete(self.u3,
132 notification.notification_id)
130 notification.notification_id)
133 Session().commit()
131 Session().commit()
134
132
135 u3notification = UserNotification.query()\
133 u3notification = UserNotification.query()\
136 .filter(UserNotification.notification ==
134 .filter(UserNotification.notification ==
137 notification)\
135 notification)\
138 .filter(UserNotification.user_id == self.u3)\
136 .filter(UserNotification.user_id == self.u3)\
139 .scalar()
137 .scalar()
140
138
141 self.assertEqual(u3notification, None)
139 self.assertEqual(u3notification, None)
142
140
143 # notification object is still there
141 # notification object is still there
144 self.assertEqual(Notification.query().all(), [notification])
142 self.assertEqual(Notification.query().all(), [notification])
145
143
146 #u1 and u2 still have assignments
144 #u1 and u2 still have assignments
147 u1notification = UserNotification.query()\
145 u1notification = UserNotification.query()\
148 .filter(UserNotification.notification ==
146 .filter(UserNotification.notification ==
149 notification)\
147 notification)\
150 .filter(UserNotification.user_id == self.u1)\
148 .filter(UserNotification.user_id == self.u1)\
151 .scalar()
149 .scalar()
152 self.assertNotEqual(u1notification, None)
150 self.assertNotEqual(u1notification, None)
153 u2notification = UserNotification.query()\
151 u2notification = UserNotification.query()\
154 .filter(UserNotification.notification ==
152 .filter(UserNotification.notification ==
155 notification)\
153 notification)\
156 .filter(UserNotification.user_id == self.u2)\
154 .filter(UserNotification.user_id == self.u2)\
157 .scalar()
155 .scalar()
158 self.assertNotEqual(u2notification, None)
156 self.assertNotEqual(u2notification, None)
159
157
160 def test_notification_counter(self):
158 def test_notification_counter(self):
161 self._clean_notifications()
159 self._clean_notifications()
162 self.assertEqual([], Notification.query().all())
160 self.assertEqual([], Notification.query().all())
163 self.assertEqual([], UserNotification.query().all())
161 self.assertEqual([], UserNotification.query().all())
164
162
165 NotificationModel().create(created_by=self.u1,
163 NotificationModel().create(created_by=self.u1,
166 subject=u'title', body=u'hi there_delete',
164 subject=u'title', body=u'hi there_delete',
167 recipients=[self.u3, self.u1])
165 recipients=[self.u3, self.u1])
168 Session().commit()
166 Session().commit()
169
167
170 self.assertEqual(NotificationModel()
168 self.assertEqual(NotificationModel()
171 .get_unread_cnt_for_user(self.u1), 1)
169 .get_unread_cnt_for_user(self.u1), 1)
172 self.assertEqual(NotificationModel()
170 self.assertEqual(NotificationModel()
173 .get_unread_cnt_for_user(self.u2), 0)
171 .get_unread_cnt_for_user(self.u2), 0)
174 self.assertEqual(NotificationModel()
172 self.assertEqual(NotificationModel()
175 .get_unread_cnt_for_user(self.u3), 1)
173 .get_unread_cnt_for_user(self.u3), 1)
176
174
177 notification = NotificationModel().create(created_by=self.u1,
175 notification = NotificationModel().create(created_by=self.u1,
178 subject=u'title', body=u'hi there3',
176 subject=u'title', body=u'hi there3',
179 recipients=[self.u3, self.u1, self.u2])
177 recipients=[self.u3, self.u1, self.u2])
180 Session().commit()
178 Session().commit()
181
179
182 self.assertEqual(NotificationModel()
180 self.assertEqual(NotificationModel()
183 .get_unread_cnt_for_user(self.u1), 2)
181 .get_unread_cnt_for_user(self.u1), 2)
184 self.assertEqual(NotificationModel()
182 self.assertEqual(NotificationModel()
185 .get_unread_cnt_for_user(self.u2), 1)
183 .get_unread_cnt_for_user(self.u2), 1)
186 self.assertEqual(NotificationModel()
184 self.assertEqual(NotificationModel()
187 .get_unread_cnt_for_user(self.u3), 2)
185 .get_unread_cnt_for_user(self.u3), 2)
@@ -1,529 +1,527 b''
1 import os
2 import unittest
3 from rhodecode.tests import *
1 from rhodecode.tests import *
4 from rhodecode.tests.fixture import Fixture
2 from rhodecode.tests.fixture import Fixture
5 from rhodecode.model.repos_group import ReposGroupModel
3 from rhodecode.model.repos_group import ReposGroupModel
6 from rhodecode.model.repo import RepoModel
4 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User, UserGroupRepoGroupToPerm,\
5 from rhodecode.model.db import RepoGroup, User, UserGroupRepoGroupToPerm,\
8 Permission, UserToPerm
6 Permission, UserToPerm
9 from rhodecode.model.user import UserModel
7 from rhodecode.model.user import UserModel
10
8
11 from rhodecode.model.meta import Session
9 from rhodecode.model.meta import Session
12 from rhodecode.model.users_group import UserGroupModel
10 from rhodecode.model.users_group import UserGroupModel
13 from rhodecode.lib.auth import AuthUser
11 from rhodecode.lib.auth import AuthUser
14 from rhodecode.model.permission import PermissionModel
12 from rhodecode.model.permission import PermissionModel
15
13
16
14
17 fixture = Fixture()
15 fixture = Fixture()
18
16
19
17
20 class TestPermissions(unittest.TestCase):
18 class TestPermissions(BaseTestCase):
21 def __init__(self, methodName='runTest'):
19 def __init__(self, methodName='runTest'):
22 super(TestPermissions, self).__init__(methodName=methodName)
20 super(TestPermissions, self).__init__(methodName=methodName)
23
21
24 def setUp(self):
22 def setUp(self):
25 self.u1 = UserModel().create_or_update(
23 self.u1 = UserModel().create_or_update(
26 username=u'u1', password=u'qweqwe',
24 username=u'u1', password=u'qweqwe',
27 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
25 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
28 )
26 )
29 self.u2 = UserModel().create_or_update(
27 self.u2 = UserModel().create_or_update(
30 username=u'u2', password=u'qweqwe',
28 username=u'u2', password=u'qweqwe',
31 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
29 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
32 )
30 )
33 self.u3 = UserModel().create_or_update(
31 self.u3 = UserModel().create_or_update(
34 username=u'u3', password=u'qweqwe',
32 username=u'u3', password=u'qweqwe',
35 email=u'u3@rhodecode.org', firstname=u'u3', lastname=u'u3'
33 email=u'u3@rhodecode.org', firstname=u'u3', lastname=u'u3'
36 )
34 )
37 self.anon = User.get_default_user()
35 self.anon = User.get_default_user()
38 self.a1 = UserModel().create_or_update(
36 self.a1 = UserModel().create_or_update(
39 username=u'a1', password=u'qweqwe',
37 username=u'a1', password=u'qweqwe',
40 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
38 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
41 )
39 )
42 Session().commit()
40 Session().commit()
43
41
44 def tearDown(self):
42 def tearDown(self):
45 if hasattr(self, 'test_repo'):
43 if hasattr(self, 'test_repo'):
46 RepoModel().delete(repo=self.test_repo)
44 RepoModel().delete(repo=self.test_repo)
47
45
48 UserModel().delete(self.u1)
46 UserModel().delete(self.u1)
49 UserModel().delete(self.u2)
47 UserModel().delete(self.u2)
50 UserModel().delete(self.u3)
48 UserModel().delete(self.u3)
51 UserModel().delete(self.a1)
49 UserModel().delete(self.a1)
52 if hasattr(self, 'g1'):
50 if hasattr(self, 'g1'):
53 ReposGroupModel().delete(self.g1.group_id)
51 ReposGroupModel().delete(self.g1.group_id)
54 if hasattr(self, 'g2'):
52 if hasattr(self, 'g2'):
55 ReposGroupModel().delete(self.g2.group_id)
53 ReposGroupModel().delete(self.g2.group_id)
56
54
57 if hasattr(self, 'ug1'):
55 if hasattr(self, 'ug1'):
58 UserGroupModel().delete(self.ug1, force=True)
56 UserGroupModel().delete(self.ug1, force=True)
59
57
60 Session().commit()
58 Session().commit()
61
59
62 def test_default_perms_set(self):
60 def test_default_perms_set(self):
63 u1_auth = AuthUser(user_id=self.u1.user_id)
61 u1_auth = AuthUser(user_id=self.u1.user_id)
64 perms = {
62 perms = {
65 'repositories_groups': {},
63 'repositories_groups': {},
66 'global': set([u'hg.create.repository', u'repository.read',
64 'global': set([u'hg.create.repository', u'repository.read',
67 u'hg.register.manual_activate']),
65 u'hg.register.manual_activate']),
68 'repositories': {u'vcs_test_hg': u'repository.read'}
66 'repositories': {u'vcs_test_hg': u'repository.read'}
69 }
67 }
70 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
68 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
71 perms['repositories'][HG_REPO])
69 perms['repositories'][HG_REPO])
72 new_perm = 'repository.write'
70 new_perm = 'repository.write'
73 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
71 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
74 perm=new_perm)
72 perm=new_perm)
75 Session().commit()
73 Session().commit()
76
74
77 u1_auth = AuthUser(user_id=self.u1.user_id)
75 u1_auth = AuthUser(user_id=self.u1.user_id)
78 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
76 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
79 new_perm)
77 new_perm)
80
78
81 def test_default_admin_perms_set(self):
79 def test_default_admin_perms_set(self):
82 a1_auth = AuthUser(user_id=self.a1.user_id)
80 a1_auth = AuthUser(user_id=self.a1.user_id)
83 perms = {
81 perms = {
84 'repositories_groups': {},
82 'repositories_groups': {},
85 'global': set([u'hg.admin']),
83 'global': set([u'hg.admin']),
86 'repositories': {u'vcs_test_hg': u'repository.admin'}
84 'repositories': {u'vcs_test_hg': u'repository.admin'}
87 }
85 }
88 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
86 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
89 perms['repositories'][HG_REPO])
87 perms['repositories'][HG_REPO])
90 new_perm = 'repository.write'
88 new_perm = 'repository.write'
91 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
89 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
92 perm=new_perm)
90 perm=new_perm)
93 Session().commit()
91 Session().commit()
94 # cannot really downgrade admins permissions !? they still get's set as
92 # cannot really downgrade admins permissions !? they still get's set as
95 # admin !
93 # admin !
96 u1_auth = AuthUser(user_id=self.a1.user_id)
94 u1_auth = AuthUser(user_id=self.a1.user_id)
97 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
95 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
98 perms['repositories'][HG_REPO])
96 perms['repositories'][HG_REPO])
99
97
100 def test_default_group_perms(self):
98 def test_default_group_perms(self):
101 self.g1 = fixture.create_group('test1', skip_if_exists=True)
99 self.g1 = fixture.create_group('test1', skip_if_exists=True)
102 self.g2 = fixture.create_group('test2', skip_if_exists=True)
100 self.g2 = fixture.create_group('test2', skip_if_exists=True)
103 u1_auth = AuthUser(user_id=self.u1.user_id)
101 u1_auth = AuthUser(user_id=self.u1.user_id)
104 perms = {
102 perms = {
105 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
103 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
106 'global': set(Permission.DEFAULT_USER_PERMISSIONS),
104 'global': set(Permission.DEFAULT_USER_PERMISSIONS),
107 'repositories': {u'vcs_test_hg': u'repository.read'}
105 'repositories': {u'vcs_test_hg': u'repository.read'}
108 }
106 }
109 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
107 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
110 perms['repositories'][HG_REPO])
108 perms['repositories'][HG_REPO])
111 self.assertEqual(u1_auth.permissions['repositories_groups'],
109 self.assertEqual(u1_auth.permissions['repositories_groups'],
112 perms['repositories_groups'])
110 perms['repositories_groups'])
113 self.assertEqual(u1_auth.permissions['global'],
111 self.assertEqual(u1_auth.permissions['global'],
114 perms['global'])
112 perms['global'])
115
113
116 def test_default_admin_group_perms(self):
114 def test_default_admin_group_perms(self):
117 self.g1 = fixture.create_group('test1', skip_if_exists=True)
115 self.g1 = fixture.create_group('test1', skip_if_exists=True)
118 self.g2 = fixture.create_group('test2', skip_if_exists=True)
116 self.g2 = fixture.create_group('test2', skip_if_exists=True)
119 a1_auth = AuthUser(user_id=self.a1.user_id)
117 a1_auth = AuthUser(user_id=self.a1.user_id)
120 perms = {
118 perms = {
121 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
119 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
122 'global': set(['hg.admin']),
120 'global': set(['hg.admin']),
123 'repositories': {u'vcs_test_hg': 'repository.admin'}
121 'repositories': {u'vcs_test_hg': 'repository.admin'}
124 }
122 }
125
123
126 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
124 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
127 perms['repositories'][HG_REPO])
125 perms['repositories'][HG_REPO])
128 self.assertEqual(a1_auth.permissions['repositories_groups'],
126 self.assertEqual(a1_auth.permissions['repositories_groups'],
129 perms['repositories_groups'])
127 perms['repositories_groups'])
130
128
131 def test_propagated_permission_from_users_group_by_explicit_perms_exist(self):
129 def test_propagated_permission_from_users_group_by_explicit_perms_exist(self):
132 # make group
130 # make group
133 self.ug1 = fixture.create_user_group('G1')
131 self.ug1 = fixture.create_user_group('G1')
134 UserGroupModel().add_user_to_group(self.ug1, self.u1)
132 UserGroupModel().add_user_to_group(self.ug1, self.u1)
135
133
136 # set permission to lower
134 # set permission to lower
137 new_perm = 'repository.none'
135 new_perm = 'repository.none'
138 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
136 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
139 Session().commit()
137 Session().commit()
140 u1_auth = AuthUser(user_id=self.u1.user_id)
138 u1_auth = AuthUser(user_id=self.u1.user_id)
141 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
139 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
142 new_perm)
140 new_perm)
143
141
144 # grant perm for group this should not override permission from user
142 # grant perm for group this should not override permission from user
145 # since it has explicitly set
143 # since it has explicitly set
146 new_perm_gr = 'repository.write'
144 new_perm_gr = 'repository.write'
147 RepoModel().grant_users_group_permission(repo=HG_REPO,
145 RepoModel().grant_users_group_permission(repo=HG_REPO,
148 group_name=self.ug1,
146 group_name=self.ug1,
149 perm=new_perm_gr)
147 perm=new_perm_gr)
150 # check perms
148 # check perms
151 u1_auth = AuthUser(user_id=self.u1.user_id)
149 u1_auth = AuthUser(user_id=self.u1.user_id)
152 perms = {
150 perms = {
153 'repositories_groups': {},
151 'repositories_groups': {},
154 'global': set([u'hg.create.repository', u'repository.read',
152 'global': set([u'hg.create.repository', u'repository.read',
155 u'hg.register.manual_activate']),
153 u'hg.register.manual_activate']),
156 'repositories': {u'vcs_test_hg': u'repository.read'}
154 'repositories': {u'vcs_test_hg': u'repository.read'}
157 }
155 }
158 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
156 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
159 new_perm)
157 new_perm)
160 self.assertEqual(u1_auth.permissions['repositories_groups'],
158 self.assertEqual(u1_auth.permissions['repositories_groups'],
161 perms['repositories_groups'])
159 perms['repositories_groups'])
162
160
163 def test_propagated_permission_from_users_group(self):
161 def test_propagated_permission_from_users_group(self):
164 # make group
162 # make group
165 self.ug1 = fixture.create_user_group('G1')
163 self.ug1 = fixture.create_user_group('G1')
166 UserGroupModel().add_user_to_group(self.ug1, self.u3)
164 UserGroupModel().add_user_to_group(self.ug1, self.u3)
167
165
168 # grant perm for group this should override default permission from user
166 # grant perm for group this should override default permission from user
169 new_perm_gr = 'repository.write'
167 new_perm_gr = 'repository.write'
170 RepoModel().grant_users_group_permission(repo=HG_REPO,
168 RepoModel().grant_users_group_permission(repo=HG_REPO,
171 group_name=self.ug1,
169 group_name=self.ug1,
172 perm=new_perm_gr)
170 perm=new_perm_gr)
173 # check perms
171 # check perms
174 u3_auth = AuthUser(user_id=self.u3.user_id)
172 u3_auth = AuthUser(user_id=self.u3.user_id)
175 perms = {
173 perms = {
176 'repositories_groups': {},
174 'repositories_groups': {},
177 'global': set([u'hg.create.repository', u'repository.read',
175 'global': set([u'hg.create.repository', u'repository.read',
178 u'hg.register.manual_activate']),
176 u'hg.register.manual_activate']),
179 'repositories': {u'vcs_test_hg': u'repository.read'}
177 'repositories': {u'vcs_test_hg': u'repository.read'}
180 }
178 }
181 self.assertEqual(u3_auth.permissions['repositories'][HG_REPO],
179 self.assertEqual(u3_auth.permissions['repositories'][HG_REPO],
182 new_perm_gr)
180 new_perm_gr)
183 self.assertEqual(u3_auth.permissions['repositories_groups'],
181 self.assertEqual(u3_auth.permissions['repositories_groups'],
184 perms['repositories_groups'])
182 perms['repositories_groups'])
185
183
186 def test_propagated_permission_from_users_group_lower_weight(self):
184 def test_propagated_permission_from_users_group_lower_weight(self):
187 # make group
185 # make group
188 self.ug1 = fixture.create_user_group('G1')
186 self.ug1 = fixture.create_user_group('G1')
189 # add user to group
187 # add user to group
190 UserGroupModel().add_user_to_group(self.ug1, self.u1)
188 UserGroupModel().add_user_to_group(self.ug1, self.u1)
191
189
192 # set permission to lower
190 # set permission to lower
193 new_perm_h = 'repository.write'
191 new_perm_h = 'repository.write'
194 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
192 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
195 perm=new_perm_h)
193 perm=new_perm_h)
196 Session().commit()
194 Session().commit()
197 u1_auth = AuthUser(user_id=self.u1.user_id)
195 u1_auth = AuthUser(user_id=self.u1.user_id)
198 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
196 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
199 new_perm_h)
197 new_perm_h)
200
198
201 # grant perm for group this should NOT override permission from user
199 # grant perm for group this should NOT override permission from user
202 # since it's lower than granted
200 # since it's lower than granted
203 new_perm_l = 'repository.read'
201 new_perm_l = 'repository.read'
204 RepoModel().grant_users_group_permission(repo=HG_REPO,
202 RepoModel().grant_users_group_permission(repo=HG_REPO,
205 group_name=self.ug1,
203 group_name=self.ug1,
206 perm=new_perm_l)
204 perm=new_perm_l)
207 # check perms
205 # check perms
208 u1_auth = AuthUser(user_id=self.u1.user_id)
206 u1_auth = AuthUser(user_id=self.u1.user_id)
209 perms = {
207 perms = {
210 'repositories_groups': {},
208 'repositories_groups': {},
211 'global': set([u'hg.create.repository', u'repository.read',
209 'global': set([u'hg.create.repository', u'repository.read',
212 u'hg.register.manual_activate']),
210 u'hg.register.manual_activate']),
213 'repositories': {u'vcs_test_hg': u'repository.write'}
211 'repositories': {u'vcs_test_hg': u'repository.write'}
214 }
212 }
215 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
213 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
216 new_perm_h)
214 new_perm_h)
217 self.assertEqual(u1_auth.permissions['repositories_groups'],
215 self.assertEqual(u1_auth.permissions['repositories_groups'],
218 perms['repositories_groups'])
216 perms['repositories_groups'])
219
217
220 def test_repo_in_group_permissions(self):
218 def test_repo_in_group_permissions(self):
221 self.g1 = fixture.create_group('group1', skip_if_exists=True)
219 self.g1 = fixture.create_group('group1', skip_if_exists=True)
222 self.g2 = fixture.create_group('group2', skip_if_exists=True)
220 self.g2 = fixture.create_group('group2', skip_if_exists=True)
223 # both perms should be read !
221 # both perms should be read !
224 u1_auth = AuthUser(user_id=self.u1.user_id)
222 u1_auth = AuthUser(user_id=self.u1.user_id)
225 self.assertEqual(u1_auth.permissions['repositories_groups'],
223 self.assertEqual(u1_auth.permissions['repositories_groups'],
226 {u'group1': u'group.read', u'group2': u'group.read'})
224 {u'group1': u'group.read', u'group2': u'group.read'})
227
225
228 a1_auth = AuthUser(user_id=self.anon.user_id)
226 a1_auth = AuthUser(user_id=self.anon.user_id)
229 self.assertEqual(a1_auth.permissions['repositories_groups'],
227 self.assertEqual(a1_auth.permissions['repositories_groups'],
230 {u'group1': u'group.read', u'group2': u'group.read'})
228 {u'group1': u'group.read', u'group2': u'group.read'})
231
229
232 #Change perms to none for both groups
230 #Change perms to none for both groups
233 ReposGroupModel().grant_user_permission(repos_group=self.g1,
231 ReposGroupModel().grant_user_permission(repos_group=self.g1,
234 user=self.anon,
232 user=self.anon,
235 perm='group.none')
233 perm='group.none')
236 ReposGroupModel().grant_user_permission(repos_group=self.g2,
234 ReposGroupModel().grant_user_permission(repos_group=self.g2,
237 user=self.anon,
235 user=self.anon,
238 perm='group.none')
236 perm='group.none')
239
237
240 u1_auth = AuthUser(user_id=self.u1.user_id)
238 u1_auth = AuthUser(user_id=self.u1.user_id)
241 self.assertEqual(u1_auth.permissions['repositories_groups'],
239 self.assertEqual(u1_auth.permissions['repositories_groups'],
242 {u'group1': u'group.none', u'group2': u'group.none'})
240 {u'group1': u'group.none', u'group2': u'group.none'})
243
241
244 a1_auth = AuthUser(user_id=self.anon.user_id)
242 a1_auth = AuthUser(user_id=self.anon.user_id)
245 self.assertEqual(a1_auth.permissions['repositories_groups'],
243 self.assertEqual(a1_auth.permissions['repositories_groups'],
246 {u'group1': u'group.none', u'group2': u'group.none'})
244 {u'group1': u'group.none', u'group2': u'group.none'})
247
245
248 # add repo to group
246 # add repo to group
249 name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
247 name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
250 self.test_repo = fixture.create_repo(name=name,
248 self.test_repo = fixture.create_repo(name=name,
251 repo_type='hg',
249 repo_type='hg',
252 repos_group=self.g1,
250 repos_group=self.g1,
253 cur_user=self.u1,)
251 cur_user=self.u1,)
254
252
255 u1_auth = AuthUser(user_id=self.u1.user_id)
253 u1_auth = AuthUser(user_id=self.u1.user_id)
256 self.assertEqual(u1_auth.permissions['repositories_groups'],
254 self.assertEqual(u1_auth.permissions['repositories_groups'],
257 {u'group1': u'group.none', u'group2': u'group.none'})
255 {u'group1': u'group.none', u'group2': u'group.none'})
258
256
259 a1_auth = AuthUser(user_id=self.anon.user_id)
257 a1_auth = AuthUser(user_id=self.anon.user_id)
260 self.assertEqual(a1_auth.permissions['repositories_groups'],
258 self.assertEqual(a1_auth.permissions['repositories_groups'],
261 {u'group1': u'group.none', u'group2': u'group.none'})
259 {u'group1': u'group.none', u'group2': u'group.none'})
262
260
263 #grant permission for u2 !
261 #grant permission for u2 !
264 ReposGroupModel().grant_user_permission(repos_group=self.g1,
262 ReposGroupModel().grant_user_permission(repos_group=self.g1,
265 user=self.u2,
263 user=self.u2,
266 perm='group.read')
264 perm='group.read')
267 ReposGroupModel().grant_user_permission(repos_group=self.g2,
265 ReposGroupModel().grant_user_permission(repos_group=self.g2,
268 user=self.u2,
266 user=self.u2,
269 perm='group.read')
267 perm='group.read')
270 Session().commit()
268 Session().commit()
271 self.assertNotEqual(self.u1, self.u2)
269 self.assertNotEqual(self.u1, self.u2)
272 #u1 and anon should have not change perms while u2 should !
270 #u1 and anon should have not change perms while u2 should !
273 u1_auth = AuthUser(user_id=self.u1.user_id)
271 u1_auth = AuthUser(user_id=self.u1.user_id)
274 self.assertEqual(u1_auth.permissions['repositories_groups'],
272 self.assertEqual(u1_auth.permissions['repositories_groups'],
275 {u'group1': u'group.none', u'group2': u'group.none'})
273 {u'group1': u'group.none', u'group2': u'group.none'})
276
274
277 u2_auth = AuthUser(user_id=self.u2.user_id)
275 u2_auth = AuthUser(user_id=self.u2.user_id)
278 self.assertEqual(u2_auth.permissions['repositories_groups'],
276 self.assertEqual(u2_auth.permissions['repositories_groups'],
279 {u'group1': u'group.read', u'group2': u'group.read'})
277 {u'group1': u'group.read', u'group2': u'group.read'})
280
278
281 a1_auth = AuthUser(user_id=self.anon.user_id)
279 a1_auth = AuthUser(user_id=self.anon.user_id)
282 self.assertEqual(a1_auth.permissions['repositories_groups'],
280 self.assertEqual(a1_auth.permissions['repositories_groups'],
283 {u'group1': u'group.none', u'group2': u'group.none'})
281 {u'group1': u'group.none', u'group2': u'group.none'})
284
282
285 def test_repo_group_user_as_user_group_member(self):
283 def test_repo_group_user_as_user_group_member(self):
286 # create Group1
284 # create Group1
287 self.g1 = fixture.create_group('group1', skip_if_exists=True)
285 self.g1 = fixture.create_group('group1', skip_if_exists=True)
288 a1_auth = AuthUser(user_id=self.anon.user_id)
286 a1_auth = AuthUser(user_id=self.anon.user_id)
289
287
290 self.assertEqual(a1_auth.permissions['repositories_groups'],
288 self.assertEqual(a1_auth.permissions['repositories_groups'],
291 {u'group1': u'group.read'})
289 {u'group1': u'group.read'})
292
290
293 # set default permission to none
291 # set default permission to none
294 ReposGroupModel().grant_user_permission(repos_group=self.g1,
292 ReposGroupModel().grant_user_permission(repos_group=self.g1,
295 user=self.anon,
293 user=self.anon,
296 perm='group.none')
294 perm='group.none')
297 # make group
295 # make group
298 self.ug1 = fixture.create_user_group('G1')
296 self.ug1 = fixture.create_user_group('G1')
299 # add user to group
297 # add user to group
300 UserGroupModel().add_user_to_group(self.ug1, self.u1)
298 UserGroupModel().add_user_to_group(self.ug1, self.u1)
301 Session().commit()
299 Session().commit()
302
300
303 # check if user is in the group
301 # check if user is in the group
304 membrs = [x.user_id for x in UserGroupModel().get(self.ug1.users_group_id).members]
302 membrs = [x.user_id for x in UserGroupModel().get(self.ug1.users_group_id).members]
305 self.assertEqual(membrs, [self.u1.user_id])
303 self.assertEqual(membrs, [self.u1.user_id])
306 # add some user to that group
304 # add some user to that group
307
305
308 # check his permissions
306 # check his permissions
309 a1_auth = AuthUser(user_id=self.anon.user_id)
307 a1_auth = AuthUser(user_id=self.anon.user_id)
310 self.assertEqual(a1_auth.permissions['repositories_groups'],
308 self.assertEqual(a1_auth.permissions['repositories_groups'],
311 {u'group1': u'group.none'})
309 {u'group1': u'group.none'})
312
310
313 u1_auth = AuthUser(user_id=self.u1.user_id)
311 u1_auth = AuthUser(user_id=self.u1.user_id)
314 self.assertEqual(u1_auth.permissions['repositories_groups'],
312 self.assertEqual(u1_auth.permissions['repositories_groups'],
315 {u'group1': u'group.none'})
313 {u'group1': u'group.none'})
316
314
317 # grant ug1 read permissions for
315 # grant ug1 read permissions for
318 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
316 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
319 group_name=self.ug1,
317 group_name=self.ug1,
320 perm='group.read')
318 perm='group.read')
321 Session().commit()
319 Session().commit()
322 # check if the
320 # check if the
323 obj = Session().query(UserGroupRepoGroupToPerm)\
321 obj = Session().query(UserGroupRepoGroupToPerm)\
324 .filter(UserGroupRepoGroupToPerm.group == self.g1)\
322 .filter(UserGroupRepoGroupToPerm.group == self.g1)\
325 .filter(UserGroupRepoGroupToPerm.users_group == self.ug1)\
323 .filter(UserGroupRepoGroupToPerm.users_group == self.ug1)\
326 .scalar()
324 .scalar()
327 self.assertEqual(obj.permission.permission_name, 'group.read')
325 self.assertEqual(obj.permission.permission_name, 'group.read')
328
326
329 a1_auth = AuthUser(user_id=self.anon.user_id)
327 a1_auth = AuthUser(user_id=self.anon.user_id)
330
328
331 self.assertEqual(a1_auth.permissions['repositories_groups'],
329 self.assertEqual(a1_auth.permissions['repositories_groups'],
332 {u'group1': u'group.none'})
330 {u'group1': u'group.none'})
333
331
334 u1_auth = AuthUser(user_id=self.u1.user_id)
332 u1_auth = AuthUser(user_id=self.u1.user_id)
335 self.assertEqual(u1_auth.permissions['repositories_groups'],
333 self.assertEqual(u1_auth.permissions['repositories_groups'],
336 {u'group1': u'group.read'})
334 {u'group1': u'group.read'})
337
335
338 def test_inherited_permissions_from_default_on_user_enabled(self):
336 def test_inherited_permissions_from_default_on_user_enabled(self):
339 user_model = UserModel()
337 user_model = UserModel()
340 # enable fork and create on default user
338 # enable fork and create on default user
341 usr = 'default'
339 usr = 'default'
342 user_model.revoke_perm(usr, 'hg.create.none')
340 user_model.revoke_perm(usr, 'hg.create.none')
343 user_model.grant_perm(usr, 'hg.create.repository')
341 user_model.grant_perm(usr, 'hg.create.repository')
344 user_model.revoke_perm(usr, 'hg.fork.none')
342 user_model.revoke_perm(usr, 'hg.fork.none')
345 user_model.grant_perm(usr, 'hg.fork.repository')
343 user_model.grant_perm(usr, 'hg.fork.repository')
346 # make sure inherit flag is turned on
344 # make sure inherit flag is turned on
347 self.u1.inherit_default_permissions = True
345 self.u1.inherit_default_permissions = True
348 Session().commit()
346 Session().commit()
349 u1_auth = AuthUser(user_id=self.u1.user_id)
347 u1_auth = AuthUser(user_id=self.u1.user_id)
350 # this user will have inherited permissions from default user
348 # this user will have inherited permissions from default user
351 self.assertEqual(u1_auth.permissions['global'],
349 self.assertEqual(u1_auth.permissions['global'],
352 set(['hg.create.repository', 'hg.fork.repository',
350 set(['hg.create.repository', 'hg.fork.repository',
353 'hg.register.manual_activate',
351 'hg.register.manual_activate',
354 'hg.extern_activate.auto',
352 'hg.extern_activate.auto',
355 'repository.read', 'group.read',
353 'repository.read', 'group.read',
356 'usergroup.read']))
354 'usergroup.read']))
357
355
358 def test_inherited_permissions_from_default_on_user_disabled(self):
356 def test_inherited_permissions_from_default_on_user_disabled(self):
359 user_model = UserModel()
357 user_model = UserModel()
360 # disable fork and create on default user
358 # disable fork and create on default user
361 usr = 'default'
359 usr = 'default'
362 user_model.revoke_perm(usr, 'hg.create.repository')
360 user_model.revoke_perm(usr, 'hg.create.repository')
363 user_model.grant_perm(usr, 'hg.create.none')
361 user_model.grant_perm(usr, 'hg.create.none')
364 user_model.revoke_perm(usr, 'hg.fork.repository')
362 user_model.revoke_perm(usr, 'hg.fork.repository')
365 user_model.grant_perm(usr, 'hg.fork.none')
363 user_model.grant_perm(usr, 'hg.fork.none')
366 # make sure inherit flag is turned on
364 # make sure inherit flag is turned on
367 self.u1.inherit_default_permissions = True
365 self.u1.inherit_default_permissions = True
368 Session().commit()
366 Session().commit()
369 u1_auth = AuthUser(user_id=self.u1.user_id)
367 u1_auth = AuthUser(user_id=self.u1.user_id)
370 # this user will have inherited permissions from default user
368 # this user will have inherited permissions from default user
371 self.assertEqual(u1_auth.permissions['global'],
369 self.assertEqual(u1_auth.permissions['global'],
372 set(['hg.create.none', 'hg.fork.none',
370 set(['hg.create.none', 'hg.fork.none',
373 'hg.register.manual_activate',
371 'hg.register.manual_activate',
374 'hg.extern_activate.auto',
372 'hg.extern_activate.auto',
375 'repository.read', 'group.read',
373 'repository.read', 'group.read',
376 'usergroup.read']))
374 'usergroup.read']))
377
375
378 def test_non_inherited_permissions_from_default_on_user_enabled(self):
376 def test_non_inherited_permissions_from_default_on_user_enabled(self):
379 user_model = UserModel()
377 user_model = UserModel()
380 # enable fork and create on default user
378 # enable fork and create on default user
381 usr = 'default'
379 usr = 'default'
382 user_model.revoke_perm(usr, 'hg.create.none')
380 user_model.revoke_perm(usr, 'hg.create.none')
383 user_model.grant_perm(usr, 'hg.create.repository')
381 user_model.grant_perm(usr, 'hg.create.repository')
384 user_model.revoke_perm(usr, 'hg.fork.none')
382 user_model.revoke_perm(usr, 'hg.fork.none')
385 user_model.grant_perm(usr, 'hg.fork.repository')
383 user_model.grant_perm(usr, 'hg.fork.repository')
386
384
387 #disable global perms on specific user
385 #disable global perms on specific user
388 user_model.revoke_perm(self.u1, 'hg.create.repository')
386 user_model.revoke_perm(self.u1, 'hg.create.repository')
389 user_model.grant_perm(self.u1, 'hg.create.none')
387 user_model.grant_perm(self.u1, 'hg.create.none')
390 user_model.revoke_perm(self.u1, 'hg.fork.repository')
388 user_model.revoke_perm(self.u1, 'hg.fork.repository')
391 user_model.grant_perm(self.u1, 'hg.fork.none')
389 user_model.grant_perm(self.u1, 'hg.fork.none')
392
390
393 # make sure inherit flag is turned off
391 # make sure inherit flag is turned off
394 self.u1.inherit_default_permissions = False
392 self.u1.inherit_default_permissions = False
395 Session().commit()
393 Session().commit()
396 u1_auth = AuthUser(user_id=self.u1.user_id)
394 u1_auth = AuthUser(user_id=self.u1.user_id)
397 # this user will have non inherited permissions from he's
395 # this user will have non inherited permissions from he's
398 # explicitly set permissions
396 # explicitly set permissions
399 self.assertEqual(u1_auth.permissions['global'],
397 self.assertEqual(u1_auth.permissions['global'],
400 set(['hg.create.none', 'hg.fork.none',
398 set(['hg.create.none', 'hg.fork.none',
401 'hg.register.manual_activate',
399 'hg.register.manual_activate',
402 'hg.extern_activate.auto',
400 'hg.extern_activate.auto',
403 'repository.read', 'group.read',
401 'repository.read', 'group.read',
404 'usergroup.read']))
402 'usergroup.read']))
405
403
406 def test_non_inherited_permissions_from_default_on_user_disabled(self):
404 def test_non_inherited_permissions_from_default_on_user_disabled(self):
407 user_model = UserModel()
405 user_model = UserModel()
408 # disable fork and create on default user
406 # disable fork and create on default user
409 usr = 'default'
407 usr = 'default'
410 user_model.revoke_perm(usr, 'hg.create.repository')
408 user_model.revoke_perm(usr, 'hg.create.repository')
411 user_model.grant_perm(usr, 'hg.create.none')
409 user_model.grant_perm(usr, 'hg.create.none')
412 user_model.revoke_perm(usr, 'hg.fork.repository')
410 user_model.revoke_perm(usr, 'hg.fork.repository')
413 user_model.grant_perm(usr, 'hg.fork.none')
411 user_model.grant_perm(usr, 'hg.fork.none')
414
412
415 #enable global perms on specific user
413 #enable global perms on specific user
416 user_model.revoke_perm(self.u1, 'hg.create.none')
414 user_model.revoke_perm(self.u1, 'hg.create.none')
417 user_model.grant_perm(self.u1, 'hg.create.repository')
415 user_model.grant_perm(self.u1, 'hg.create.repository')
418 user_model.revoke_perm(self.u1, 'hg.fork.none')
416 user_model.revoke_perm(self.u1, 'hg.fork.none')
419 user_model.grant_perm(self.u1, 'hg.fork.repository')
417 user_model.grant_perm(self.u1, 'hg.fork.repository')
420
418
421 # make sure inherit flag is turned off
419 # make sure inherit flag is turned off
422 self.u1.inherit_default_permissions = False
420 self.u1.inherit_default_permissions = False
423 Session().commit()
421 Session().commit()
424 u1_auth = AuthUser(user_id=self.u1.user_id)
422 u1_auth = AuthUser(user_id=self.u1.user_id)
425 # this user will have non inherited permissions from he's
423 # this user will have non inherited permissions from he's
426 # explicitly set permissions
424 # explicitly set permissions
427 self.assertEqual(u1_auth.permissions['global'],
425 self.assertEqual(u1_auth.permissions['global'],
428 set(['hg.create.repository', 'hg.fork.repository',
426 set(['hg.create.repository', 'hg.fork.repository',
429 'hg.register.manual_activate',
427 'hg.register.manual_activate',
430 'hg.extern_activate.auto',
428 'hg.extern_activate.auto',
431 'repository.read', 'group.read',
429 'repository.read', 'group.read',
432 'usergroup.read']))
430 'usergroup.read']))
433
431
434 def test_owner_permissions_doesnot_get_overwritten_by_group(self):
432 def test_owner_permissions_doesnot_get_overwritten_by_group(self):
435 #create repo as USER,
433 #create repo as USER,
436 self.test_repo = fixture.create_repo(name='myownrepo',
434 self.test_repo = fixture.create_repo(name='myownrepo',
437 repo_type='hg',
435 repo_type='hg',
438 cur_user=self.u1)
436 cur_user=self.u1)
439
437
440 #he has permissions of admin as owner
438 #he has permissions of admin as owner
441 u1_auth = AuthUser(user_id=self.u1.user_id)
439 u1_auth = AuthUser(user_id=self.u1.user_id)
442 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
440 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
443 'repository.admin')
441 'repository.admin')
444 #set his permission as user group, he should still be admin
442 #set his permission as user group, he should still be admin
445 self.ug1 = fixture.create_user_group('G1')
443 self.ug1 = fixture.create_user_group('G1')
446 UserGroupModel().add_user_to_group(self.ug1, self.u1)
444 UserGroupModel().add_user_to_group(self.ug1, self.u1)
447 RepoModel().grant_users_group_permission(self.test_repo,
445 RepoModel().grant_users_group_permission(self.test_repo,
448 group_name=self.ug1,
446 group_name=self.ug1,
449 perm='repository.none')
447 perm='repository.none')
450
448
451 Session().commit()
449 Session().commit()
452 u1_auth = AuthUser(user_id=self.u1.user_id)
450 u1_auth = AuthUser(user_id=self.u1.user_id)
453 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
451 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
454 'repository.admin')
452 'repository.admin')
455
453
456 def test_owner_permissions_doesnot_get_overwritten_by_others(self):
454 def test_owner_permissions_doesnot_get_overwritten_by_others(self):
457 #create repo as USER,
455 #create repo as USER,
458 self.test_repo = fixture.create_repo(name='myownrepo',
456 self.test_repo = fixture.create_repo(name='myownrepo',
459 repo_type='hg',
457 repo_type='hg',
460 cur_user=self.u1)
458 cur_user=self.u1)
461
459
462 #he has permissions of admin as owner
460 #he has permissions of admin as owner
463 u1_auth = AuthUser(user_id=self.u1.user_id)
461 u1_auth = AuthUser(user_id=self.u1.user_id)
464 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
462 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
465 'repository.admin')
463 'repository.admin')
466 #set his permission as user, he should still be admin
464 #set his permission as user, he should still be admin
467 RepoModel().grant_user_permission(self.test_repo, user=self.u1,
465 RepoModel().grant_user_permission(self.test_repo, user=self.u1,
468 perm='repository.none')
466 perm='repository.none')
469 Session().commit()
467 Session().commit()
470 u1_auth = AuthUser(user_id=self.u1.user_id)
468 u1_auth = AuthUser(user_id=self.u1.user_id)
471 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
469 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
472 'repository.admin')
470 'repository.admin')
473
471
474 def _test_def_perm_equal(self, user, change_factor=0):
472 def _test_def_perm_equal(self, user, change_factor=0):
475 perms = UserToPerm.query()\
473 perms = UserToPerm.query()\
476 .filter(UserToPerm.user == user)\
474 .filter(UserToPerm.user == user)\
477 .all()
475 .all()
478 self.assertEqual(len(perms),
476 self.assertEqual(len(perms),
479 len(Permission.DEFAULT_USER_PERMISSIONS,)+change_factor,
477 len(Permission.DEFAULT_USER_PERMISSIONS,)+change_factor,
480 msg=perms)
478 msg=perms)
481
479
482 def test_set_default_permissions(self):
480 def test_set_default_permissions(self):
483 PermissionModel().create_default_permissions(user=self.u1)
481 PermissionModel().create_default_permissions(user=self.u1)
484 self._test_def_perm_equal(user=self.u1)
482 self._test_def_perm_equal(user=self.u1)
485
483
486 def test_set_default_permissions_after_one_is_missing(self):
484 def test_set_default_permissions_after_one_is_missing(self):
487 PermissionModel().create_default_permissions(user=self.u1)
485 PermissionModel().create_default_permissions(user=self.u1)
488 self._test_def_perm_equal(user=self.u1)
486 self._test_def_perm_equal(user=self.u1)
489 #now we delete one, it should be re-created after another call
487 #now we delete one, it should be re-created after another call
490 perms = UserToPerm.query()\
488 perms = UserToPerm.query()\
491 .filter(UserToPerm.user == self.u1)\
489 .filter(UserToPerm.user == self.u1)\
492 .all()
490 .all()
493 Session().delete(perms[0])
491 Session().delete(perms[0])
494 Session().commit()
492 Session().commit()
495
493
496 self._test_def_perm_equal(user=self.u1, change_factor=-1)
494 self._test_def_perm_equal(user=self.u1, change_factor=-1)
497
495
498 #create missing one !
496 #create missing one !
499 PermissionModel().create_default_permissions(user=self.u1)
497 PermissionModel().create_default_permissions(user=self.u1)
500 self._test_def_perm_equal(user=self.u1)
498 self._test_def_perm_equal(user=self.u1)
501
499
502 @parameterized.expand([
500 @parameterized.expand([
503 ('repository.read', 'repository.none'),
501 ('repository.read', 'repository.none'),
504 ('group.read', 'group.none'),
502 ('group.read', 'group.none'),
505 ('usergroup.read', 'usergroup.none'),
503 ('usergroup.read', 'usergroup.none'),
506 ('hg.create.repository', 'hg.create.none'),
504 ('hg.create.repository', 'hg.create.none'),
507 ('hg.fork.repository', 'hg.fork.none'),
505 ('hg.fork.repository', 'hg.fork.none'),
508 ('hg.register.manual_activate', 'hg.register.auto_activate',)
506 ('hg.register.manual_activate', 'hg.register.auto_activate',)
509 ])
507 ])
510 def test_set_default_permissions_after_modification(self, perm, modify_to):
508 def test_set_default_permissions_after_modification(self, perm, modify_to):
511 PermissionModel().create_default_permissions(user=self.u1)
509 PermissionModel().create_default_permissions(user=self.u1)
512 self._test_def_perm_equal(user=self.u1)
510 self._test_def_perm_equal(user=self.u1)
513
511
514 old = Permission.get_by_key(perm)
512 old = Permission.get_by_key(perm)
515 new = Permission.get_by_key(modify_to)
513 new = Permission.get_by_key(modify_to)
516 self.assertNotEqual(old, None)
514 self.assertNotEqual(old, None)
517 self.assertNotEqual(new, None)
515 self.assertNotEqual(new, None)
518
516
519 #now modify permissions
517 #now modify permissions
520 p = UserToPerm.query()\
518 p = UserToPerm.query()\
521 .filter(UserToPerm.user == self.u1)\
519 .filter(UserToPerm.user == self.u1)\
522 .filter(UserToPerm.permission == old)\
520 .filter(UserToPerm.permission == old)\
523 .one()
521 .one()
524 p.permission = new
522 p.permission = new
525 Session().add(p)
523 Session().add(p)
526 Session().commit()
524 Session().commit()
527
525
528 PermissionModel().create_default_permissions(user=self.u1)
526 PermissionModel().create_default_permissions(user=self.u1)
529 self._test_def_perm_equal(user=self.u1)
527 self._test_def_perm_equal(user=self.u1)
@@ -1,79 +1,77 b''
1 import os
2 import unittest
3 from rhodecode.tests import *
1 from rhodecode.tests import *
4
2
5 from rhodecode.model.meta import Session
3 from rhodecode.model.meta import Session
6 from rhodecode.tests.fixture import Fixture
4 from rhodecode.tests.fixture import Fixture
7 from rhodecode.model.repo import RepoModel
5 from rhodecode.model.repo import RepoModel
8 from rhodecode.model.db import Repository
6 from rhodecode.model.db import Repository
9 from rhodecode.lib.exceptions import AttachedForksError
7 from rhodecode.lib.exceptions import AttachedForksError
10
8
11 fixture = Fixture()
9 fixture = Fixture()
12
10
13
11
14 class TestRepos(unittest.TestCase):
12 class TestRepos(BaseTestCase):
15
13
16 def setUp(self):
14 def setUp(self):
17 pass
15 pass
18
16
19 def tearDown(self):
17 def tearDown(self):
20 Session.remove()
18 Session.remove()
21
19
22 def test_remove_repo(self):
20 def test_remove_repo(self):
23 repo = fixture.create_repo(name='test-repo-1')
21 repo = fixture.create_repo(name='test-repo-1')
24 Session().commit()
22 Session().commit()
25
23
26 RepoModel().delete(repo=repo)
24 RepoModel().delete(repo=repo)
27 Session().commit()
25 Session().commit()
28
26
29 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-1'))
27 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-1'))
30
28
31 def test_remove_repo_repo_raises_exc_when_attached_forks(self):
29 def test_remove_repo_repo_raises_exc_when_attached_forks(self):
32 repo = fixture.create_repo(name='test-repo-1')
30 repo = fixture.create_repo(name='test-repo-1')
33 Session().commit()
31 Session().commit()
34
32
35 fixture.create_fork(repo.repo_name, 'test-repo-fork-1')
33 fixture.create_fork(repo.repo_name, 'test-repo-fork-1')
36 Session().commit()
34 Session().commit()
37
35
38 self.assertRaises(AttachedForksError, lambda: RepoModel().delete(repo=repo))
36 self.assertRaises(AttachedForksError, lambda: RepoModel().delete(repo=repo))
39
37
40 def test_remove_repo_delete_forks(self):
38 def test_remove_repo_delete_forks(self):
41 repo = fixture.create_repo(name='test-repo-1')
39 repo = fixture.create_repo(name='test-repo-1')
42 Session().commit()
40 Session().commit()
43
41
44 fork = fixture.create_fork(repo.repo_name, 'test-repo-fork-1')
42 fork = fixture.create_fork(repo.repo_name, 'test-repo-fork-1')
45 Session().commit()
43 Session().commit()
46
44
47 #fork of fork
45 #fork of fork
48 fixture.create_fork(fork.repo_name, 'test-repo-fork-fork-1')
46 fixture.create_fork(fork.repo_name, 'test-repo-fork-fork-1')
49 Session().commit()
47 Session().commit()
50
48
51 RepoModel().delete(repo=repo, forks='delete')
49 RepoModel().delete(repo=repo, forks='delete')
52 Session().commit()
50 Session().commit()
53
51
54 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-1'))
52 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-1'))
55 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-1'))
53 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-1'))
56 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-fork-1'))
54 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-fork-1'))
57
55
58 def test_remove_repo_detach_forks(self):
56 def test_remove_repo_detach_forks(self):
59 repo = fixture.create_repo(name='test-repo-1')
57 repo = fixture.create_repo(name='test-repo-1')
60 Session().commit()
58 Session().commit()
61
59
62 fork = fixture.create_fork(repo.repo_name, 'test-repo-fork-1')
60 fork = fixture.create_fork(repo.repo_name, 'test-repo-fork-1')
63 Session().commit()
61 Session().commit()
64
62
65 #fork of fork
63 #fork of fork
66 fixture.create_fork(fork.repo_name, 'test-repo-fork-fork-1')
64 fixture.create_fork(fork.repo_name, 'test-repo-fork-fork-1')
67 Session().commit()
65 Session().commit()
68
66
69 RepoModel().delete(repo=repo, forks='detach')
67 RepoModel().delete(repo=repo, forks='detach')
70 Session().commit()
68 Session().commit()
71
69
72 try:
70 try:
73 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-1'))
71 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-1'))
74 self.assertNotEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-1'))
72 self.assertNotEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-1'))
75 self.assertNotEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-fork-1'))
73 self.assertNotEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-fork-1'))
76 finally:
74 finally:
77 RepoModel().delete(repo='test-repo-fork-fork-1')
75 RepoModel().delete(repo='test-repo-fork-fork-1')
78 RepoModel().delete(repo='test-repo-fork-1')
76 RepoModel().delete(repo='test-repo-fork-1')
79 Session().commit()
77 Session().commit()
@@ -1,200 +1,199 b''
1 import os
1 import os
2 import unittest
3 from sqlalchemy.exc import IntegrityError
2 from sqlalchemy.exc import IntegrityError
4
3
5 from rhodecode.tests import *
4 from rhodecode.tests import *
6 from rhodecode.tests.fixture import Fixture
5 from rhodecode.tests.fixture import Fixture
7
6
8 from rhodecode.model.repos_group import ReposGroupModel
7 from rhodecode.model.repos_group import ReposGroupModel
9 from rhodecode.model.repo import RepoModel
8 from rhodecode.model.repo import RepoModel
10 from rhodecode.model.db import RepoGroup
9 from rhodecode.model.db import RepoGroup
11 from rhodecode.model.meta import Session
10 from rhodecode.model.meta import Session
12
11
13
12
14 fixture = Fixture()
13 fixture = Fixture()
15
14
16
15
17 def _update_group(id_, group_name, desc='desc', parent_id=None):
16 def _update_group(id_, group_name, desc='desc', parent_id=None):
18 form_data = fixture._get_group_create_params(group_name=group_name,
17 form_data = fixture._get_group_create_params(group_name=group_name,
19 group_desc=desc,
18 group_desc=desc,
20 group_parent_id=parent_id)
19 group_parent_id=parent_id)
21 gr = ReposGroupModel().update(id_, form_data)
20 gr = ReposGroupModel().update(id_, form_data)
22 return gr
21 return gr
23
22
24
23
25 def _update_repo(name, **kwargs):
24 def _update_repo(name, **kwargs):
26 form_data = fixture._get_repo_create_params(**kwargs)
25 form_data = fixture._get_repo_create_params(**kwargs)
27 if not 'repo_name' in kwargs:
26 if not 'repo_name' in kwargs:
28 form_data['repo_name'] = name
27 form_data['repo_name'] = name
29 if not 'perms_new' in kwargs:
28 if not 'perms_new' in kwargs:
30 form_data['perms_new'] = []
29 form_data['perms_new'] = []
31 if not 'perms_updates' in kwargs:
30 if not 'perms_updates' in kwargs:
32 form_data['perms_updates'] = []
31 form_data['perms_updates'] = []
33 r = RepoModel().update(name, **form_data)
32 r = RepoModel().update(name, **form_data)
34 return r
33 return r
35
34
36
35
37 class TestReposGroups(unittest.TestCase):
36 class TestReposGroups(BaseTestCase):
38
37
39 def setUp(self):
38 def setUp(self):
40 self.g1 = fixture.create_group('test1', skip_if_exists=True)
39 self.g1 = fixture.create_group('test1', skip_if_exists=True)
41 self.g2 = fixture.create_group('test2', skip_if_exists=True)
40 self.g2 = fixture.create_group('test2', skip_if_exists=True)
42 self.g3 = fixture.create_group('test3', skip_if_exists=True)
41 self.g3 = fixture.create_group('test3', skip_if_exists=True)
43
42
44 def tearDown(self):
43 def tearDown(self):
45 Session.remove()
44 Session.remove()
46
45
47 def __check_path(self, *path):
46 def __check_path(self, *path):
48 """
47 """
49 Checks the path for existance !
48 Checks the path for existance !
50 """
49 """
51 path = [TESTS_TMP_PATH] + list(path)
50 path = [TESTS_TMP_PATH] + list(path)
52 path = os.path.join(*path)
51 path = os.path.join(*path)
53 return os.path.isdir(path)
52 return os.path.isdir(path)
54
53
55 def _check_folders(self):
54 def _check_folders(self):
56 print os.listdir(TESTS_TMP_PATH)
55 print os.listdir(TESTS_TMP_PATH)
57
56
58 def __delete_group(self, id_):
57 def __delete_group(self, id_):
59 ReposGroupModel().delete(id_)
58 ReposGroupModel().delete(id_)
60
59
61 def test_create_group(self):
60 def test_create_group(self):
62 g = fixture.create_group('newGroup')
61 g = fixture.create_group('newGroup')
63 Session().commit()
62 Session().commit()
64 self.assertEqual(g.full_path, 'newGroup')
63 self.assertEqual(g.full_path, 'newGroup')
65
64
66 self.assertTrue(self.__check_path('newGroup'))
65 self.assertTrue(self.__check_path('newGroup'))
67
66
68 def test_create_same_name_group(self):
67 def test_create_same_name_group(self):
69 self.assertRaises(IntegrityError, lambda: fixture.create_group('newGroup'))
68 self.assertRaises(IntegrityError, lambda: fixture.create_group('newGroup'))
70 Session().rollback()
69 Session().rollback()
71
70
72 def test_same_subgroup(self):
71 def test_same_subgroup(self):
73 sg1 = fixture.create_group('sub1', group_parent_id=self.g1.group_id)
72 sg1 = fixture.create_group('sub1', group_parent_id=self.g1.group_id)
74 self.assertEqual(sg1.parent_group, self.g1)
73 self.assertEqual(sg1.parent_group, self.g1)
75 self.assertEqual(sg1.full_path, 'test1/sub1')
74 self.assertEqual(sg1.full_path, 'test1/sub1')
76 self.assertTrue(self.__check_path('test1', 'sub1'))
75 self.assertTrue(self.__check_path('test1', 'sub1'))
77
76
78 ssg1 = fixture.create_group('subsub1', group_parent_id=sg1.group_id)
77 ssg1 = fixture.create_group('subsub1', group_parent_id=sg1.group_id)
79 self.assertEqual(ssg1.parent_group, sg1)
78 self.assertEqual(ssg1.parent_group, sg1)
80 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
79 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
81 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
80 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
82
81
83 def test_remove_group(self):
82 def test_remove_group(self):
84 sg1 = fixture.create_group('deleteme')
83 sg1 = fixture.create_group('deleteme')
85 self.__delete_group(sg1.group_id)
84 self.__delete_group(sg1.group_id)
86
85
87 self.assertEqual(RepoGroup.get(sg1.group_id), None)
86 self.assertEqual(RepoGroup.get(sg1.group_id), None)
88 self.assertFalse(self.__check_path('deteteme'))
87 self.assertFalse(self.__check_path('deteteme'))
89
88
90 sg1 = fixture.create_group('deleteme', group_parent_id=self.g1.group_id)
89 sg1 = fixture.create_group('deleteme', group_parent_id=self.g1.group_id)
91 self.__delete_group(sg1.group_id)
90 self.__delete_group(sg1.group_id)
92
91
93 self.assertEqual(RepoGroup.get(sg1.group_id), None)
92 self.assertEqual(RepoGroup.get(sg1.group_id), None)
94 self.assertFalse(self.__check_path('test1', 'deteteme'))
93 self.assertFalse(self.__check_path('test1', 'deteteme'))
95
94
96 def test_rename_single_group(self):
95 def test_rename_single_group(self):
97 sg1 = fixture.create_group('initial')
96 sg1 = fixture.create_group('initial')
98
97
99 new_sg1 = _update_group(sg1.group_id, 'after')
98 new_sg1 = _update_group(sg1.group_id, 'after')
100 self.assertTrue(self.__check_path('after'))
99 self.assertTrue(self.__check_path('after'))
101 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
100 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
102
101
103 def test_update_group_parent(self):
102 def test_update_group_parent(self):
104
103
105 sg1 = fixture.create_group('initial', group_parent_id=self.g1.group_id)
104 sg1 = fixture.create_group('initial', group_parent_id=self.g1.group_id)
106
105
107 new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
106 new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
108 self.assertTrue(self.__check_path('test1', 'after'))
107 self.assertTrue(self.__check_path('test1', 'after'))
109 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
108 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
110
109
111 new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
110 new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
112 self.assertTrue(self.__check_path('test3', 'after'))
111 self.assertTrue(self.__check_path('test3', 'after'))
113 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
112 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
114
113
115 new_sg1 = _update_group(sg1.group_id, 'hello')
114 new_sg1 = _update_group(sg1.group_id, 'hello')
116 self.assertTrue(self.__check_path('hello'))
115 self.assertTrue(self.__check_path('hello'))
117
116
118 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
117 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
119
118
120 def test_subgrouping_with_repo(self):
119 def test_subgrouping_with_repo(self):
121
120
122 g1 = fixture.create_group('g1')
121 g1 = fixture.create_group('g1')
123 g2 = fixture.create_group('g2')
122 g2 = fixture.create_group('g2')
124 # create new repo
123 # create new repo
125 r = fixture.create_repo('john')
124 r = fixture.create_repo('john')
126
125
127 self.assertEqual(r.repo_name, 'john')
126 self.assertEqual(r.repo_name, 'john')
128 # put repo into group
127 # put repo into group
129 r = _update_repo('john', repo_group=g1.group_id)
128 r = _update_repo('john', repo_group=g1.group_id)
130 Session().commit()
129 Session().commit()
131 self.assertEqual(r.repo_name, 'g1/john')
130 self.assertEqual(r.repo_name, 'g1/john')
132
131
133 _update_group(g1.group_id, 'g1', parent_id=g2.group_id)
132 _update_group(g1.group_id, 'g1', parent_id=g2.group_id)
134 self.assertTrue(self.__check_path('g2', 'g1'))
133 self.assertTrue(self.__check_path('g2', 'g1'))
135
134
136 # test repo
135 # test repo
137 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
136 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
138 r.just_name]))
137 r.just_name]))
139
138
140 def test_move_to_root(self):
139 def test_move_to_root(self):
141 g1 = fixture.create_group('t11')
140 g1 = fixture.create_group('t11')
142 g2 = fixture.create_group('t22', group_parent_id=g1.group_id)
141 g2 = fixture.create_group('t22', group_parent_id=g1.group_id)
143
142
144 self.assertEqual(g2.full_path, 't11/t22')
143 self.assertEqual(g2.full_path, 't11/t22')
145 self.assertTrue(self.__check_path('t11', 't22'))
144 self.assertTrue(self.__check_path('t11', 't22'))
146
145
147 g2 = _update_group(g2.group_id, 'g22', parent_id=None)
146 g2 = _update_group(g2.group_id, 'g22', parent_id=None)
148 Session().commit()
147 Session().commit()
149
148
150 self.assertEqual(g2.group_name, 'g22')
149 self.assertEqual(g2.group_name, 'g22')
151 # we moved out group from t1 to '' so it's full path should be 'g2'
150 # we moved out group from t1 to '' so it's full path should be 'g2'
152 self.assertEqual(g2.full_path, 'g22')
151 self.assertEqual(g2.full_path, 'g22')
153 self.assertFalse(self.__check_path('t11', 't22'))
152 self.assertFalse(self.__check_path('t11', 't22'))
154 self.assertTrue(self.__check_path('g22'))
153 self.assertTrue(self.__check_path('g22'))
155
154
156 def test_rename_top_level_group_in_nested_setup(self):
155 def test_rename_top_level_group_in_nested_setup(self):
157 g1 = fixture.create_group('L1')
156 g1 = fixture.create_group('L1')
158 g2 = fixture.create_group('L2', group_parent_id=g1.group_id)
157 g2 = fixture.create_group('L2', group_parent_id=g1.group_id)
159 g3 = fixture.create_group('L3', group_parent_id=g2.group_id)
158 g3 = fixture.create_group('L3', group_parent_id=g2.group_id)
160
159
161 r = fixture.create_repo('L1/L2/L3/L3_REPO', repo_group=g3.group_id)
160 r = fixture.create_repo('L1/L2/L3/L3_REPO', repo_group=g3.group_id)
162
161
163 ##rename L1 all groups should be now changed
162 ##rename L1 all groups should be now changed
164 _update_group(g1.group_id, 'L1_NEW')
163 _update_group(g1.group_id, 'L1_NEW')
165 Session().commit()
164 Session().commit()
166 self.assertEqual(g1.full_path, 'L1_NEW')
165 self.assertEqual(g1.full_path, 'L1_NEW')
167 self.assertEqual(g2.full_path, 'L1_NEW/L2')
166 self.assertEqual(g2.full_path, 'L1_NEW/L2')
168 self.assertEqual(g3.full_path, 'L1_NEW/L2/L3')
167 self.assertEqual(g3.full_path, 'L1_NEW/L2/L3')
169 self.assertEqual(r.repo_name, 'L1_NEW/L2/L3/L3_REPO')
168 self.assertEqual(r.repo_name, 'L1_NEW/L2/L3/L3_REPO')
170
169
171 def test_change_parent_of_top_level_group_in_nested_setup(self):
170 def test_change_parent_of_top_level_group_in_nested_setup(self):
172 g1 = fixture.create_group('R1')
171 g1 = fixture.create_group('R1')
173 g2 = fixture.create_group('R2', group_parent_id=g1.group_id)
172 g2 = fixture.create_group('R2', group_parent_id=g1.group_id)
174 g3 = fixture.create_group('R3', group_parent_id=g2.group_id)
173 g3 = fixture.create_group('R3', group_parent_id=g2.group_id)
175 g4 = fixture.create_group('R1_NEW')
174 g4 = fixture.create_group('R1_NEW')
176
175
177 r = fixture.create_repo('R1/R2/R3/R3_REPO', repo_group=g3.group_id)
176 r = fixture.create_repo('R1/R2/R3/R3_REPO', repo_group=g3.group_id)
178 ##rename L1 all groups should be now changed
177 ##rename L1 all groups should be now changed
179 _update_group(g1.group_id, 'R1', parent_id=g4.group_id)
178 _update_group(g1.group_id, 'R1', parent_id=g4.group_id)
180 Session().commit()
179 Session().commit()
181 self.assertEqual(g1.full_path, 'R1_NEW/R1')
180 self.assertEqual(g1.full_path, 'R1_NEW/R1')
182 self.assertEqual(g2.full_path, 'R1_NEW/R1/R2')
181 self.assertEqual(g2.full_path, 'R1_NEW/R1/R2')
183 self.assertEqual(g3.full_path, 'R1_NEW/R1/R2/R3')
182 self.assertEqual(g3.full_path, 'R1_NEW/R1/R2/R3')
184 self.assertEqual(r.repo_name, 'R1_NEW/R1/R2/R3/R3_REPO')
183 self.assertEqual(r.repo_name, 'R1_NEW/R1/R2/R3/R3_REPO')
185
184
186 def test_change_parent_of_top_level_group_in_nested_setup_with_rename(self):
185 def test_change_parent_of_top_level_group_in_nested_setup_with_rename(self):
187 g1 = fixture.create_group('X1')
186 g1 = fixture.create_group('X1')
188 g2 = fixture.create_group('X2', group_parent_id=g1.group_id)
187 g2 = fixture.create_group('X2', group_parent_id=g1.group_id)
189 g3 = fixture.create_group('X3', group_parent_id=g2.group_id)
188 g3 = fixture.create_group('X3', group_parent_id=g2.group_id)
190 g4 = fixture.create_group('X1_NEW')
189 g4 = fixture.create_group('X1_NEW')
191
190
192 r = fixture.create_repo('X1/X2/X3/X3_REPO', repo_group=g3.group_id)
191 r = fixture.create_repo('X1/X2/X3/X3_REPO', repo_group=g3.group_id)
193
192
194 ##rename L1 all groups should be now changed
193 ##rename L1 all groups should be now changed
195 _update_group(g1.group_id, 'X1_PRIM', parent_id=g4.group_id)
194 _update_group(g1.group_id, 'X1_PRIM', parent_id=g4.group_id)
196 Session().commit()
195 Session().commit()
197 self.assertEqual(g1.full_path, 'X1_NEW/X1_PRIM')
196 self.assertEqual(g1.full_path, 'X1_NEW/X1_PRIM')
198 self.assertEqual(g2.full_path, 'X1_NEW/X1_PRIM/X2')
197 self.assertEqual(g2.full_path, 'X1_NEW/X1_PRIM/X2')
199 self.assertEqual(g3.full_path, 'X1_NEW/X1_PRIM/X2/X3')
198 self.assertEqual(g3.full_path, 'X1_NEW/X1_PRIM/X2/X3')
200 self.assertEqual(r.repo_name, 'X1_NEW/X1_PRIM/X2/X3/X3_REPO')
199 self.assertEqual(r.repo_name, 'X1_NEW/X1_PRIM/X2/X3/X3_REPO')
@@ -1,165 +1,162 b''
1 import os
2 import unittest
3 import functools
1 import functools
4 from rhodecode.tests import *
2 from rhodecode.tests import *
5
3
6 from rhodecode.model.repos_group import ReposGroupModel
4 from rhodecode.model.repos_group import ReposGroupModel
7 from rhodecode.model.db import RepoGroup, Repository, User
5 from rhodecode.model.db import RepoGroup, Repository, User
8
6
9 from rhodecode.model.meta import Session
7 from rhodecode.model.meta import Session
10 from nose.tools import with_setup
8 from nose.tools import with_setup
11 from rhodecode.tests.models.common import _create_project_tree, check_tree_perms, \
9 from rhodecode.tests.models.common import _create_project_tree, check_tree_perms, \
12 _get_perms, _check_expected_count, expected_count, _destroy_project_tree
10 _get_perms, _check_expected_count, expected_count, _destroy_project_tree
13 from rhodecode.model.repo import RepoModel
14
11
15
12
16 test_u1_id = None
13 test_u1_id = None
17 _get_repo_perms = None
14 _get_repo_perms = None
18 _get_group_perms = None
15 _get_group_perms = None
19
16
20
17
21 def permissions_setup_func(group_name='g0', perm='group.read', recursive=True):
18 def permissions_setup_func(group_name='g0', perm='group.read', recursive=True):
22 """
19 """
23 Resets all permissions to perm attribute
20 Resets all permissions to perm attribute
24 """
21 """
25 repos_group = RepoGroup.get_by_group_name(group_name=group_name)
22 repos_group = RepoGroup.get_by_group_name(group_name=group_name)
26 if not repos_group:
23 if not repos_group:
27 raise Exception('Cannot get group %s' % group_name)
24 raise Exception('Cannot get group %s' % group_name)
28 perms_updates = [[test_u1_id, perm, 'user']]
25 perms_updates = [[test_u1_id, perm, 'user']]
29 ReposGroupModel()._update_permissions(repos_group,
26 ReposGroupModel()._update_permissions(repos_group,
30 perms_updates=perms_updates,
27 perms_updates=perms_updates,
31 recursive=recursive,
28 recursive=recursive,
32 check_perms=False)
29 check_perms=False)
33 Session().commit()
30 Session().commit()
34
31
35
32
36 def setup_module():
33 def setup_module():
37 global test_u1_id, _get_repo_perms, _get_group_perms
34 global test_u1_id, _get_repo_perms, _get_group_perms
38 test_u1 = _create_project_tree()
35 test_u1 = _create_project_tree()
39 Session().commit()
36 Session().commit()
40 test_u1_id = test_u1.user_id
37 test_u1_id = test_u1.user_id
41 _get_repo_perms = functools.partial(_get_perms, key='repositories',
38 _get_repo_perms = functools.partial(_get_perms, key='repositories',
42 test_u1_id=test_u1_id)
39 test_u1_id=test_u1_id)
43 _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
40 _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
44 test_u1_id=test_u1_id)
41 test_u1_id=test_u1_id)
45
42
46
43
47 def teardown_module():
44 def teardown_module():
48 _destroy_project_tree(test_u1_id)
45 _destroy_project_tree(test_u1_id)
49
46
50
47
51 @with_setup(permissions_setup_func)
48 @with_setup(permissions_setup_func)
52 def test_user_permissions_on_group_without_recursive_mode():
49 def test_user_permissions_on_group_without_recursive_mode():
53 # set permission to g0 non-recursive mode
50 # set permission to g0 non-recursive mode
54 recursive = False
51 recursive = False
55 group = 'g0'
52 group = 'g0'
56 permissions_setup_func(group, 'group.write', recursive=recursive)
53 permissions_setup_func(group, 'group.write', recursive=recursive)
57
54
58 items = [x for x in _get_repo_perms(group, recursive)]
55 items = [x for x in _get_repo_perms(group, recursive)]
59 expected = 0
56 expected = 0
60 assert len(items) == expected, ' %s != %s' % (len(items), expected)
57 assert len(items) == expected, ' %s != %s' % (len(items), expected)
61 for name, perm in items:
58 for name, perm in items:
62 yield check_tree_perms, name, perm, group, 'repository.read'
59 yield check_tree_perms, name, perm, group, 'repository.read'
63
60
64 items = [x for x in _get_group_perms(group, recursive)]
61 items = [x for x in _get_group_perms(group, recursive)]
65 expected = 1
62 expected = 1
66 assert len(items) == expected, ' %s != %s' % (len(items), expected)
63 assert len(items) == expected, ' %s != %s' % (len(items), expected)
67 for name, perm in items:
64 for name, perm in items:
68 yield check_tree_perms, name, perm, group, 'group.write'
65 yield check_tree_perms, name, perm, group, 'group.write'
69
66
70
67
71 @with_setup(permissions_setup_func)
68 @with_setup(permissions_setup_func)
72 def test_user_permissions_on_group_without_recursive_mode_subgroup():
69 def test_user_permissions_on_group_without_recursive_mode_subgroup():
73 # set permission to g0 non-recursive mode
70 # set permission to g0 non-recursive mode
74 recursive = False
71 recursive = False
75 group = 'g0/g0_1'
72 group = 'g0/g0_1'
76 permissions_setup_func(group, 'group.write', recursive=recursive)
73 permissions_setup_func(group, 'group.write', recursive=recursive)
77
74
78 items = [x for x in _get_repo_perms(group, recursive)]
75 items = [x for x in _get_repo_perms(group, recursive)]
79 expected = 0
76 expected = 0
80 assert len(items) == expected, ' %s != %s' % (len(items), expected)
77 assert len(items) == expected, ' %s != %s' % (len(items), expected)
81 for name, perm in items:
78 for name, perm in items:
82 yield check_tree_perms, name, perm, group, 'repository.read'
79 yield check_tree_perms, name, perm, group, 'repository.read'
83
80
84 items = [x for x in _get_group_perms(group, recursive)]
81 items = [x for x in _get_group_perms(group, recursive)]
85 expected = 1
82 expected = 1
86 assert len(items) == expected, ' %s != %s' % (len(items), expected)
83 assert len(items) == expected, ' %s != %s' % (len(items), expected)
87 for name, perm in items:
84 for name, perm in items:
88 yield check_tree_perms, name, perm, group, 'group.write'
85 yield check_tree_perms, name, perm, group, 'group.write'
89
86
90
87
91 @with_setup(permissions_setup_func)
88 @with_setup(permissions_setup_func)
92 def test_user_permissions_on_group_with_recursive_mode():
89 def test_user_permissions_on_group_with_recursive_mode():
93
90
94 # set permission to g0 recursive mode, all children including
91 # set permission to g0 recursive mode, all children including
95 # other repos and groups should have this permission now set !
92 # other repos and groups should have this permission now set !
96 recursive = True
93 recursive = True
97 group = 'g0'
94 group = 'g0'
98 permissions_setup_func(group, 'group.write', recursive=recursive)
95 permissions_setup_func(group, 'group.write', recursive=recursive)
99
96
100 repo_items = [x for x in _get_repo_perms(group, recursive)]
97 repo_items = [x for x in _get_repo_perms(group, recursive)]
101 items = [x for x in _get_group_perms(group, recursive)]
98 items = [x for x in _get_group_perms(group, recursive)]
102 _check_expected_count(items, repo_items, expected_count(group, True))
99 _check_expected_count(items, repo_items, expected_count(group, True))
103
100
104 for name, perm in repo_items:
101 for name, perm in repo_items:
105 if name == 'g0/g0_3/g0_3_r1_private':
102 if name == 'g0/g0_3/g0_3_r1_private':
106 yield check_tree_perms, name, perm, group, 'repository.none'
103 yield check_tree_perms, name, perm, group, 'repository.none'
107 else:
104 else:
108 yield check_tree_perms, name, perm, group, 'repository.write'
105 yield check_tree_perms, name, perm, group, 'repository.write'
109
106
110 for name, perm in items:
107 for name, perm in items:
111 yield check_tree_perms, name, perm, group, 'group.write'
108 yield check_tree_perms, name, perm, group, 'group.write'
112
109
113
110
114 @with_setup(permissions_setup_func)
111 @with_setup(permissions_setup_func)
115 def test_user_permissions_on_group_with_recursive_mode_inner_group():
112 def test_user_permissions_on_group_with_recursive_mode_inner_group():
116 ## set permission to g0_3 group to none
113 ## set permission to g0_3 group to none
117 recursive = True
114 recursive = True
118 group = 'g0/g0_3'
115 group = 'g0/g0_3'
119 permissions_setup_func(group, 'group.none', recursive=recursive)
116 permissions_setup_func(group, 'group.none', recursive=recursive)
120
117
121 repo_items = [x for x in _get_repo_perms(group, recursive)]
118 repo_items = [x for x in _get_repo_perms(group, recursive)]
122 items = [x for x in _get_group_perms(group, recursive)]
119 items = [x for x in _get_group_perms(group, recursive)]
123 _check_expected_count(items, repo_items, expected_count(group, True))
120 _check_expected_count(items, repo_items, expected_count(group, True))
124
121
125 for name, perm in repo_items:
122 for name, perm in repo_items:
126 yield check_tree_perms, name, perm, group, 'repository.none'
123 yield check_tree_perms, name, perm, group, 'repository.none'
127
124
128 for name, perm in items:
125 for name, perm in items:
129 yield check_tree_perms, name, perm, group, 'group.none'
126 yield check_tree_perms, name, perm, group, 'group.none'
130
127
131
128
132 @with_setup(permissions_setup_func)
129 @with_setup(permissions_setup_func)
133 def test_user_permissions_on_group_with_recursive_mode_deepest():
130 def test_user_permissions_on_group_with_recursive_mode_deepest():
134 ## set permission to g0_3 group to none
131 ## set permission to g0_3 group to none
135 recursive = True
132 recursive = True
136 group = 'g0/g0_1/g0_1_1'
133 group = 'g0/g0_1/g0_1_1'
137 permissions_setup_func(group, 'group.write', recursive=recursive)
134 permissions_setup_func(group, 'group.write', recursive=recursive)
138
135
139 repo_items = [x for x in _get_repo_perms(group, recursive)]
136 repo_items = [x for x in _get_repo_perms(group, recursive)]
140 items = [x for x in _get_group_perms(group, recursive)]
137 items = [x for x in _get_group_perms(group, recursive)]
141 _check_expected_count(items, repo_items, expected_count(group, True))
138 _check_expected_count(items, repo_items, expected_count(group, True))
142
139
143 for name, perm in repo_items:
140 for name, perm in repo_items:
144 yield check_tree_perms, name, perm, group, 'repository.write'
141 yield check_tree_perms, name, perm, group, 'repository.write'
145
142
146 for name, perm in items:
143 for name, perm in items:
147 yield check_tree_perms, name, perm, group, 'group.write'
144 yield check_tree_perms, name, perm, group, 'group.write'
148
145
149
146
150 @with_setup(permissions_setup_func)
147 @with_setup(permissions_setup_func)
151 def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
148 def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
152 ## set permission to g0_3 group to none
149 ## set permission to g0_3 group to none
153 recursive = True
150 recursive = True
154 group = 'g0/g0_2'
151 group = 'g0/g0_2'
155 permissions_setup_func(group, 'group.admin', recursive=recursive)
152 permissions_setup_func(group, 'group.admin', recursive=recursive)
156
153
157 repo_items = [x for x in _get_repo_perms(group, recursive)]
154 repo_items = [x for x in _get_repo_perms(group, recursive)]
158 items = [x for x in _get_group_perms(group, recursive)]
155 items = [x for x in _get_group_perms(group, recursive)]
159 _check_expected_count(items, repo_items, expected_count(group, True))
156 _check_expected_count(items, repo_items, expected_count(group, True))
160
157
161 for name, perm in repo_items:
158 for name, perm in repo_items:
162 yield check_tree_perms, name, perm, group, 'repository.admin'
159 yield check_tree_perms, name, perm, group, 'repository.admin'
163
160
164 for name, perm in items:
161 for name, perm in items:
165 yield check_tree_perms, name, perm, group, 'group.admin'
162 yield check_tree_perms, name, perm, group, 'group.admin'
@@ -1,131 +1,130 b''
1 import unittest
2 from rhodecode.tests import *
1 from rhodecode.tests import *
3
2
4 from rhodecode.model.db import User, UserGroup, UserGroupMember, UserEmailMap,\
3 from rhodecode.model.db import User, UserGroup, UserGroupMember, UserEmailMap,\
5 Permission
4 Permission
6 from rhodecode.model.user import UserModel
5 from rhodecode.model.user import UserModel
7
6
8 from rhodecode.model.meta import Session
7 from rhodecode.model.meta import Session
9 from rhodecode.model.users_group import UserGroupModel
8 from rhodecode.model.users_group import UserGroupModel
10 from rhodecode.tests.fixture import Fixture
9 from rhodecode.tests.fixture import Fixture
11
10
12 fixture = Fixture()
11 fixture = Fixture()
13
12
14
13
15 class TestUser(unittest.TestCase):
14 class TestUser(BaseTestCase):
16 def __init__(self, methodName='runTest'):
15 def __init__(self, methodName='runTest'):
17 Session.remove()
16 Session.remove()
18 super(TestUser, self).__init__(methodName=methodName)
17 super(TestUser, self).__init__(methodName=methodName)
19
18
20 def tearDown(self):
19 def tearDown(self):
21 Session.remove()
20 Session.remove()
22
21
23 def test_create_and_remove(self):
22 def test_create_and_remove(self):
24 usr = UserModel().create_or_update(username=u'test_user',
23 usr = UserModel().create_or_update(username=u'test_user',
25 password=u'qweqwe',
24 password=u'qweqwe',
26 email=u'u232@rhodecode.org',
25 email=u'u232@rhodecode.org',
27 firstname=u'u1', lastname=u'u1')
26 firstname=u'u1', lastname=u'u1')
28 Session().commit()
27 Session().commit()
29 self.assertEqual(User.get_by_username(u'test_user'), usr)
28 self.assertEqual(User.get_by_username(u'test_user'), usr)
30
29
31 # make user group
30 # make user group
32 users_group = fixture.create_user_group('some_example_group')
31 users_group = fixture.create_user_group('some_example_group')
33 Session().commit()
32 Session().commit()
34
33
35 UserGroupModel().add_user_to_group(users_group, usr)
34 UserGroupModel().add_user_to_group(users_group, usr)
36 Session().commit()
35 Session().commit()
37
36
38 self.assertEqual(UserGroup.get(users_group.users_group_id), users_group)
37 self.assertEqual(UserGroup.get(users_group.users_group_id), users_group)
39 self.assertEqual(UserGroupMember.query().count(), 1)
38 self.assertEqual(UserGroupMember.query().count(), 1)
40 UserModel().delete(usr.user_id)
39 UserModel().delete(usr.user_id)
41 Session().commit()
40 Session().commit()
42
41
43 self.assertEqual(UserGroupMember.query().all(), [])
42 self.assertEqual(UserGroupMember.query().all(), [])
44
43
45 def test_additonal_email_as_main(self):
44 def test_additonal_email_as_main(self):
46 usr = UserModel().create_or_update(username=u'test_user',
45 usr = UserModel().create_or_update(username=u'test_user',
47 password=u'qweqwe',
46 password=u'qweqwe',
48 email=u'main_email@rhodecode.org',
47 email=u'main_email@rhodecode.org',
49 firstname=u'u1', lastname=u'u1')
48 firstname=u'u1', lastname=u'u1')
50 Session().commit()
49 Session().commit()
51
50
52 def do():
51 def do():
53 m = UserEmailMap()
52 m = UserEmailMap()
54 m.email = u'main_email@rhodecode.org'
53 m.email = u'main_email@rhodecode.org'
55 m.user = usr
54 m.user = usr
56 Session().add(m)
55 Session().add(m)
57 Session().commit()
56 Session().commit()
58 self.assertRaises(AttributeError, do)
57 self.assertRaises(AttributeError, do)
59
58
60 UserModel().delete(usr.user_id)
59 UserModel().delete(usr.user_id)
61 Session().commit()
60 Session().commit()
62
61
63 def test_extra_email_map(self):
62 def test_extra_email_map(self):
64 usr = UserModel().create_or_update(username=u'test_user',
63 usr = UserModel().create_or_update(username=u'test_user',
65 password=u'qweqwe',
64 password=u'qweqwe',
66 email=u'main_email@rhodecode.org',
65 email=u'main_email@rhodecode.org',
67 firstname=u'u1', lastname=u'u1')
66 firstname=u'u1', lastname=u'u1')
68 Session().commit()
67 Session().commit()
69
68
70 m = UserEmailMap()
69 m = UserEmailMap()
71 m.email = u'main_email2@rhodecode.org'
70 m.email = u'main_email2@rhodecode.org'
72 m.user = usr
71 m.user = usr
73 Session().add(m)
72 Session().add(m)
74 Session().commit()
73 Session().commit()
75
74
76 u = User.get_by_email(email='main_email@rhodecode.org')
75 u = User.get_by_email(email='main_email@rhodecode.org')
77 self.assertEqual(usr.user_id, u.user_id)
76 self.assertEqual(usr.user_id, u.user_id)
78 self.assertEqual(usr.username, u.username)
77 self.assertEqual(usr.username, u.username)
79
78
80 u = User.get_by_email(email='main_email2@rhodecode.org')
79 u = User.get_by_email(email='main_email2@rhodecode.org')
81 self.assertEqual(usr.user_id, u.user_id)
80 self.assertEqual(usr.user_id, u.user_id)
82 self.assertEqual(usr.username, u.username)
81 self.assertEqual(usr.username, u.username)
83 u = User.get_by_email(email='main_email3@rhodecode.org')
82 u = User.get_by_email(email='main_email3@rhodecode.org')
84 self.assertEqual(None, u)
83 self.assertEqual(None, u)
85
84
86 UserModel().delete(usr.user_id)
85 UserModel().delete(usr.user_id)
87 Session().commit()
86 Session().commit()
88
87
89
88
90 class TestUsers(unittest.TestCase):
89 class TestUsers(BaseTestCase):
91
90
92 def __init__(self, methodName='runTest'):
91 def __init__(self, methodName='runTest'):
93 super(TestUsers, self).__init__(methodName=methodName)
92 super(TestUsers, self).__init__(methodName=methodName)
94
93
95 def setUp(self):
94 def setUp(self):
96 self.u1 = UserModel().create_or_update(username=u'u1',
95 self.u1 = UserModel().create_or_update(username=u'u1',
97 password=u'qweqwe',
96 password=u'qweqwe',
98 email=u'u1@rhodecode.org',
97 email=u'u1@rhodecode.org',
99 firstname=u'u1', lastname=u'u1')
98 firstname=u'u1', lastname=u'u1')
100
99
101 def tearDown(self):
100 def tearDown(self):
102 perm = Permission.query().all()
101 perm = Permission.query().all()
103 for p in perm:
102 for p in perm:
104 UserModel().revoke_perm(self.u1, p)
103 UserModel().revoke_perm(self.u1, p)
105
104
106 UserModel().delete(self.u1)
105 UserModel().delete(self.u1)
107 Session().commit()
106 Session().commit()
108 Session.remove()
107 Session.remove()
109
108
110 def test_add_perm(self):
109 def test_add_perm(self):
111 perm = Permission.query().all()[0]
110 perm = Permission.query().all()[0]
112 UserModel().grant_perm(self.u1, perm)
111 UserModel().grant_perm(self.u1, perm)
113 Session().commit()
112 Session().commit()
114 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
113 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
115
114
116 def test_has_perm(self):
115 def test_has_perm(self):
117 perm = Permission.query().all()
116 perm = Permission.query().all()
118 for p in perm:
117 for p in perm:
119 has_p = UserModel().has_perm(self.u1, p)
118 has_p = UserModel().has_perm(self.u1, p)
120 self.assertEqual(False, has_p)
119 self.assertEqual(False, has_p)
121
120
122 def test_revoke_perm(self):
121 def test_revoke_perm(self):
123 perm = Permission.query().all()[0]
122 perm = Permission.query().all()[0]
124 UserModel().grant_perm(self.u1, perm)
123 UserModel().grant_perm(self.u1, perm)
125 Session().commit()
124 Session().commit()
126 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
125 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
127
126
128 #revoke
127 #revoke
129 UserModel().revoke_perm(self.u1, perm)
128 UserModel().revoke_perm(self.u1, perm)
130 Session().commit()
129 Session().commit()
131 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
130 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
@@ -1,172 +1,170 b''
1 import os
2 import unittest
3 import functools
1 import functools
4 from rhodecode.tests import *
2 from rhodecode.tests import *
5
3
6 from rhodecode.model.repos_group import ReposGroupModel
4 from rhodecode.model.repos_group import ReposGroupModel
7 from rhodecode.model.db import RepoGroup
5 from rhodecode.model.db import RepoGroup
8
6
9 from rhodecode.model.meta import Session
7 from rhodecode.model.meta import Session
10 from nose.tools import with_setup
8 from nose.tools import with_setup
11 from rhodecode.tests.models.common import _create_project_tree, check_tree_perms, \
9 from rhodecode.tests.models.common import _create_project_tree, check_tree_perms, \
12 _get_perms, _check_expected_count, expected_count, _destroy_project_tree
10 _get_perms, _check_expected_count, expected_count, _destroy_project_tree
13 from rhodecode.model.users_group import UserGroupModel
11 from rhodecode.model.users_group import UserGroupModel
14 from rhodecode.tests.fixture import Fixture
12 from rhodecode.tests.fixture import Fixture
15
13
16 fixture = Fixture()
14 fixture = Fixture()
17
15
18 test_u2_id = None
16 test_u2_id = None
19 test_u2_gr_id = None
17 test_u2_gr_id = None
20 _get_repo_perms = None
18 _get_repo_perms = None
21 _get_group_perms = None
19 _get_group_perms = None
22
20
23
21
24 def permissions_setup_func(group_name='g0', perm='group.read', recursive=True):
22 def permissions_setup_func(group_name='g0', perm='group.read', recursive=True):
25 """
23 """
26 Resets all permissions to perm attribute
24 Resets all permissions to perm attribute
27 """
25 """
28 repos_group = RepoGroup.get_by_group_name(group_name=group_name)
26 repos_group = RepoGroup.get_by_group_name(group_name=group_name)
29 if not repos_group:
27 if not repos_group:
30 raise Exception('Cannot get group %s' % group_name)
28 raise Exception('Cannot get group %s' % group_name)
31 perms_updates = [[test_u2_gr_id, perm, 'users_group']]
29 perms_updates = [[test_u2_gr_id, perm, 'users_group']]
32 ReposGroupModel()._update_permissions(repos_group,
30 ReposGroupModel()._update_permissions(repos_group,
33 perms_updates=perms_updates,
31 perms_updates=perms_updates,
34 recursive=recursive,
32 recursive=recursive,
35 check_perms=False)
33 check_perms=False)
36 Session().commit()
34 Session().commit()
37
35
38
36
39 def setup_module():
37 def setup_module():
40 global test_u2_id, test_u2_gr_id, _get_repo_perms, _get_group_perms
38 global test_u2_id, test_u2_gr_id, _get_repo_perms, _get_group_perms
41 test_u2 = _create_project_tree()
39 test_u2 = _create_project_tree()
42 Session().commit()
40 Session().commit()
43 test_u2_id = test_u2.user_id
41 test_u2_id = test_u2.user_id
44
42
45 gr1 = fixture.create_user_group('perms_group_1')
43 gr1 = fixture.create_user_group('perms_group_1')
46 Session().commit()
44 Session().commit()
47 test_u2_gr_id = gr1.users_group_id
45 test_u2_gr_id = gr1.users_group_id
48 UserGroupModel().add_user_to_group(gr1, user=test_u2_id)
46 UserGroupModel().add_user_to_group(gr1, user=test_u2_id)
49 Session().commit()
47 Session().commit()
50
48
51 _get_repo_perms = functools.partial(_get_perms, key='repositories',
49 _get_repo_perms = functools.partial(_get_perms, key='repositories',
52 test_u1_id=test_u2_id)
50 test_u1_id=test_u2_id)
53 _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
51 _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
54 test_u1_id=test_u2_id)
52 test_u1_id=test_u2_id)
55
53
56
54
57 def teardown_module():
55 def teardown_module():
58 _destroy_project_tree(test_u2_id)
56 _destroy_project_tree(test_u2_id)
59
57
60
58
61 @with_setup(permissions_setup_func)
59 @with_setup(permissions_setup_func)
62 def test_user_permissions_on_group_without_recursive_mode():
60 def test_user_permissions_on_group_without_recursive_mode():
63 # set permission to g0 non-recursive mode
61 # set permission to g0 non-recursive mode
64 recursive = False
62 recursive = False
65 group = 'g0'
63 group = 'g0'
66 permissions_setup_func(group, 'group.write', recursive=recursive)
64 permissions_setup_func(group, 'group.write', recursive=recursive)
67
65
68 items = [x for x in _get_repo_perms(group, recursive)]
66 items = [x for x in _get_repo_perms(group, recursive)]
69 expected = 0
67 expected = 0
70 assert len(items) == expected, ' %s != %s' % (len(items), expected)
68 assert len(items) == expected, ' %s != %s' % (len(items), expected)
71 for name, perm in items:
69 for name, perm in items:
72 yield check_tree_perms, name, perm, group, 'repository.read'
70 yield check_tree_perms, name, perm, group, 'repository.read'
73
71
74 items = [x for x in _get_group_perms(group, recursive)]
72 items = [x for x in _get_group_perms(group, recursive)]
75 expected = 1
73 expected = 1
76 assert len(items) == expected, ' %s != %s' % (len(items), expected)
74 assert len(items) == expected, ' %s != %s' % (len(items), expected)
77 for name, perm in items:
75 for name, perm in items:
78 yield check_tree_perms, name, perm, group, 'group.write'
76 yield check_tree_perms, name, perm, group, 'group.write'
79
77
80
78
81 @with_setup(permissions_setup_func)
79 @with_setup(permissions_setup_func)
82 def test_user_permissions_on_group_without_recursive_mode_subgroup():
80 def test_user_permissions_on_group_without_recursive_mode_subgroup():
83 # set permission to g0 non-recursive mode
81 # set permission to g0 non-recursive mode
84 recursive = False
82 recursive = False
85 group = 'g0/g0_1'
83 group = 'g0/g0_1'
86 permissions_setup_func(group, 'group.write', recursive=recursive)
84 permissions_setup_func(group, 'group.write', recursive=recursive)
87
85
88 items = [x for x in _get_repo_perms(group, recursive)]
86 items = [x for x in _get_repo_perms(group, recursive)]
89 expected = 0
87 expected = 0
90 assert len(items) == expected, ' %s != %s' % (len(items), expected)
88 assert len(items) == expected, ' %s != %s' % (len(items), expected)
91 for name, perm in items:
89 for name, perm in items:
92 yield check_tree_perms, name, perm, group, 'repository.read'
90 yield check_tree_perms, name, perm, group, 'repository.read'
93
91
94 items = [x for x in _get_group_perms(group, recursive)]
92 items = [x for x in _get_group_perms(group, recursive)]
95 expected = 1
93 expected = 1
96 assert len(items) == expected, ' %s != %s' % (len(items), expected)
94 assert len(items) == expected, ' %s != %s' % (len(items), expected)
97 for name, perm in items:
95 for name, perm in items:
98 yield check_tree_perms, name, perm, group, 'group.write'
96 yield check_tree_perms, name, perm, group, 'group.write'
99
97
100
98
101 @with_setup(permissions_setup_func)
99 @with_setup(permissions_setup_func)
102 def test_user_permissions_on_group_with_recursive_mode():
100 def test_user_permissions_on_group_with_recursive_mode():
103
101
104 # set permission to g0 recursive mode, all children including
102 # set permission to g0 recursive mode, all children including
105 # other repos and groups should have this permission now set !
103 # other repos and groups should have this permission now set !
106 recursive = True
104 recursive = True
107 group = 'g0'
105 group = 'g0'
108 permissions_setup_func(group, 'group.write', recursive=recursive)
106 permissions_setup_func(group, 'group.write', recursive=recursive)
109
107
110 repo_items = [x for x in _get_repo_perms(group, recursive)]
108 repo_items = [x for x in _get_repo_perms(group, recursive)]
111 items = [x for x in _get_group_perms(group, recursive)]
109 items = [x for x in _get_group_perms(group, recursive)]
112 _check_expected_count(items, repo_items, expected_count(group, True))
110 _check_expected_count(items, repo_items, expected_count(group, True))
113
111
114 for name, perm in repo_items:
112 for name, perm in repo_items:
115 yield check_tree_perms, name, perm, group, 'repository.write'
113 yield check_tree_perms, name, perm, group, 'repository.write'
116
114
117 for name, perm in items:
115 for name, perm in items:
118 yield check_tree_perms, name, perm, group, 'group.write'
116 yield check_tree_perms, name, perm, group, 'group.write'
119
117
120
118
121 @with_setup(permissions_setup_func)
119 @with_setup(permissions_setup_func)
122 def test_user_permissions_on_group_with_recursive_mode_inner_group():
120 def test_user_permissions_on_group_with_recursive_mode_inner_group():
123 ## set permission to g0_3 group to none
121 ## set permission to g0_3 group to none
124 recursive = True
122 recursive = True
125 group = 'g0/g0_3'
123 group = 'g0/g0_3'
126 permissions_setup_func(group, 'group.none', recursive=recursive)
124 permissions_setup_func(group, 'group.none', recursive=recursive)
127
125
128 repo_items = [x for x in _get_repo_perms(group, recursive)]
126 repo_items = [x for x in _get_repo_perms(group, recursive)]
129 items = [x for x in _get_group_perms(group, recursive)]
127 items = [x for x in _get_group_perms(group, recursive)]
130 _check_expected_count(items, repo_items, expected_count(group, True))
128 _check_expected_count(items, repo_items, expected_count(group, True))
131
129
132 for name, perm in repo_items:
130 for name, perm in repo_items:
133 yield check_tree_perms, name, perm, group, 'repository.none'
131 yield check_tree_perms, name, perm, group, 'repository.none'
134
132
135 for name, perm in items:
133 for name, perm in items:
136 yield check_tree_perms, name, perm, group, 'group.none'
134 yield check_tree_perms, name, perm, group, 'group.none'
137
135
138
136
139 @with_setup(permissions_setup_func)
137 @with_setup(permissions_setup_func)
140 def test_user_permissions_on_group_with_recursive_mode_deepest():
138 def test_user_permissions_on_group_with_recursive_mode_deepest():
141 ## set permission to g0_3 group to none
139 ## set permission to g0_3 group to none
142 recursive = True
140 recursive = True
143 group = 'g0/g0_1/g0_1_1'
141 group = 'g0/g0_1/g0_1_1'
144 permissions_setup_func(group, 'group.write', recursive=recursive)
142 permissions_setup_func(group, 'group.write', recursive=recursive)
145
143
146 repo_items = [x for x in _get_repo_perms(group, recursive)]
144 repo_items = [x for x in _get_repo_perms(group, recursive)]
147 items = [x for x in _get_group_perms(group, recursive)]
145 items = [x for x in _get_group_perms(group, recursive)]
148 _check_expected_count(items, repo_items, expected_count(group, True))
146 _check_expected_count(items, repo_items, expected_count(group, True))
149
147
150 for name, perm in repo_items:
148 for name, perm in repo_items:
151 yield check_tree_perms, name, perm, group, 'repository.write'
149 yield check_tree_perms, name, perm, group, 'repository.write'
152
150
153 for name, perm in items:
151 for name, perm in items:
154 yield check_tree_perms, name, perm, group, 'group.write'
152 yield check_tree_perms, name, perm, group, 'group.write'
155
153
156
154
157 @with_setup(permissions_setup_func)
155 @with_setup(permissions_setup_func)
158 def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
156 def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
159 ## set permission to g0_3 group to none
157 ## set permission to g0_3 group to none
160 recursive = True
158 recursive = True
161 group = 'g0/g0_2'
159 group = 'g0/g0_2'
162 permissions_setup_func(group, 'group.admin', recursive=recursive)
160 permissions_setup_func(group, 'group.admin', recursive=recursive)
163
161
164 repo_items = [x for x in _get_repo_perms(group, recursive)]
162 repo_items = [x for x in _get_repo_perms(group, recursive)]
165 items = [x for x in _get_group_perms(group, recursive)]
163 items = [x for x in _get_group_perms(group, recursive)]
166 _check_expected_count(items, repo_items, expected_count(group, True))
164 _check_expected_count(items, repo_items, expected_count(group, True))
167
165
168 for name, perm in repo_items:
166 for name, perm in repo_items:
169 yield check_tree_perms, name, perm, group, 'repository.admin'
167 yield check_tree_perms, name, perm, group, 'repository.admin'
170
168
171 for name, perm in items:
169 for name, perm in items:
172 yield check_tree_perms, name, perm, group, 'group.admin'
170 yield check_tree_perms, name, perm, group, 'group.admin'
@@ -1,295 +1,294 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.tests.test_libs
3 rhodecode.tests.test_libs
4 ~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6
6
7 Package for testing various lib/helper functions in rhodecode
7 Package for testing various lib/helper functions in rhodecode
8
8
9 :created_on: Jun 9, 2011
9 :created_on: Jun 9, 2011
10 :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
10 :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
11 :license: GPLv3, see COPYING for more details.
11 :license: GPLv3, see COPYING for more details.
12 """
12 """
13 # This program is free software: you can redistribute it and/or modify
13 # This program is free software: you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation, either version 3 of the License, or
15 # the Free Software Foundation, either version 3 of the License, or
16 # (at your option) any later version.
16 # (at your option) any later version.
17 #
17 #
18 # This program is distributed in the hope that it will be useful,
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # GNU General Public License for more details.
21 # GNU General Public License for more details.
22 #
22 #
23 # You should have received a copy of the GNU General Public License
23 # You should have received a copy of the GNU General Public License
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 from __future__ import with_statement
25 from __future__ import with_statement
26 import unittest
27 import datetime
26 import datetime
28 import hashlib
27 import hashlib
29 import mock
28 import mock
30 from rhodecode.tests import *
29 from rhodecode.tests import *
31
30
32 proto = 'http'
31 proto = 'http'
33 TEST_URLS = [
32 TEST_URLS = [
34 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
33 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
35 '%s://127.0.0.1' % proto),
34 '%s://127.0.0.1' % proto),
36 ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
35 ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
37 '%s://127.0.0.1' % proto),
36 '%s://127.0.0.1' % proto),
38 ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
37 ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
39 '%s://127.0.0.1' % proto),
38 '%s://127.0.0.1' % proto),
40 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
39 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
41 '%s://127.0.0.1:8080' % proto),
40 '%s://127.0.0.1:8080' % proto),
42 ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
41 ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
43 '%s://domain.org' % proto),
42 '%s://domain.org' % proto),
44 ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
43 ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
45 '8080'],
44 '8080'],
46 '%s://domain.org:8080' % proto),
45 '%s://domain.org:8080' % proto),
47 ]
46 ]
48
47
49 proto = 'https'
48 proto = 'https'
50 TEST_URLS += [
49 TEST_URLS += [
51 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
50 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
52 '%s://127.0.0.1' % proto),
51 '%s://127.0.0.1' % proto),
53 ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
52 ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
54 '%s://127.0.0.1' % proto),
53 '%s://127.0.0.1' % proto),
55 ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
54 ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
56 '%s://127.0.0.1' % proto),
55 '%s://127.0.0.1' % proto),
57 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
56 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
58 '%s://127.0.0.1:8080' % proto),
57 '%s://127.0.0.1:8080' % proto),
59 ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
58 ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
60 '%s://domain.org' % proto),
59 '%s://domain.org' % proto),
61 ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
60 ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
62 '8080'],
61 '8080'],
63 '%s://domain.org:8080' % proto),
62 '%s://domain.org:8080' % proto),
64 ]
63 ]
65
64
66
65
67 class TestLibs(unittest.TestCase):
66 class TestLibs(BaseTestCase):
68
67
69 @parameterized.expand(TEST_URLS)
68 @parameterized.expand(TEST_URLS)
70 def test_uri_filter(self, test_url, expected, expected_creds):
69 def test_uri_filter(self, test_url, expected, expected_creds):
71 from rhodecode.lib.utils2 import uri_filter
70 from rhodecode.lib.utils2 import uri_filter
72 self.assertEqual(uri_filter(test_url), expected)
71 self.assertEqual(uri_filter(test_url), expected)
73
72
74 @parameterized.expand(TEST_URLS)
73 @parameterized.expand(TEST_URLS)
75 def test_credentials_filter(self, test_url, expected, expected_creds):
74 def test_credentials_filter(self, test_url, expected, expected_creds):
76 from rhodecode.lib.utils2 import credentials_filter
75 from rhodecode.lib.utils2 import credentials_filter
77 self.assertEqual(credentials_filter(test_url), expected_creds)
76 self.assertEqual(credentials_filter(test_url), expected_creds)
78
77
79 @parameterized.expand([('t', True),
78 @parameterized.expand([('t', True),
80 ('true', True),
79 ('true', True),
81 ('y', True),
80 ('y', True),
82 ('yes', True),
81 ('yes', True),
83 ('on', True),
82 ('on', True),
84 ('1', True),
83 ('1', True),
85 ('Y', True),
84 ('Y', True),
86 ('yeS', True),
85 ('yeS', True),
87 ('Y', True),
86 ('Y', True),
88 ('TRUE', True),
87 ('TRUE', True),
89 ('T', True),
88 ('T', True),
90 ('False', False),
89 ('False', False),
91 ('F', False),
90 ('F', False),
92 ('FALSE', False),
91 ('FALSE', False),
93 ('0', False),
92 ('0', False),
94 ('-1', False),
93 ('-1', False),
95 ('', False)
94 ('', False)
96 ])
95 ])
97 def test_str2bool(self, str_bool, expected):
96 def test_str2bool(self, str_bool, expected):
98 from rhodecode.lib.utils2 import str2bool
97 from rhodecode.lib.utils2 import str2bool
99 self.assertEqual(str2bool(str_bool), expected)
98 self.assertEqual(str2bool(str_bool), expected)
100
99
101 def test_mention_extractor(self):
100 def test_mention_extractor(self):
102 from rhodecode.lib.utils2 import extract_mentioned_users
101 from rhodecode.lib.utils2 import extract_mentioned_users
103 sample = (
102 sample = (
104 "@first hi there @marcink here's my email marcin@email.com "
103 "@first hi there @marcink here's my email marcin@email.com "
105 "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
104 "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
106 "@MARCIN @maRCiN @2one_more22 @john please see this http://org.pl "
105 "@MARCIN @maRCiN @2one_more22 @john please see this http://org.pl "
107 "@marian.user just do it @marco-polo and next extract @marco_polo "
106 "@marian.user just do it @marco-polo and next extract @marco_polo "
108 "user.dot hej ! not-needed maril@domain.org"
107 "user.dot hej ! not-needed maril@domain.org"
109 )
108 )
110
109
111 s = sorted([
110 s = sorted([
112 'first', 'marcink', 'lukaszb', 'one_more22', 'MARCIN', 'maRCiN', 'john',
111 'first', 'marcink', 'lukaszb', 'one_more22', 'MARCIN', 'maRCiN', 'john',
113 'marian.user', 'marco-polo', 'marco_polo'
112 'marian.user', 'marco-polo', 'marco_polo'
114 ], key=lambda k: k.lower())
113 ], key=lambda k: k.lower())
115 self.assertEqual(s, extract_mentioned_users(sample))
114 self.assertEqual(s, extract_mentioned_users(sample))
116
115
117 @parameterized.expand([
116 @parameterized.expand([
118 (dict(), u'just now'),
117 (dict(), u'just now'),
119 (dict(seconds= -1), u'1 second ago'),
118 (dict(seconds= -1), u'1 second ago'),
120 (dict(seconds= -60 * 2), u'2 minutes ago'),
119 (dict(seconds= -60 * 2), u'2 minutes ago'),
121 (dict(hours= -1), u'1 hour ago'),
120 (dict(hours= -1), u'1 hour ago'),
122 (dict(hours= -24), u'1 day ago'),
121 (dict(hours= -24), u'1 day ago'),
123 (dict(hours= -24 * 5), u'5 days ago'),
122 (dict(hours= -24 * 5), u'5 days ago'),
124 (dict(months= -1), u'1 month ago'),
123 (dict(months= -1), u'1 month ago'),
125 (dict(months= -1, days= -2), u'1 month and 2 days ago'),
124 (dict(months= -1, days= -2), u'1 month and 2 days ago'),
126 (dict(years= -1, months= -1), u'1 year and 1 month ago'),
125 (dict(years= -1, months= -1), u'1 year and 1 month ago'),
127 ])
126 ])
128 def test_age(self, age_args, expected):
127 def test_age(self, age_args, expected):
129 from rhodecode.lib.utils2 import age
128 from rhodecode.lib.utils2 import age
130 from dateutil import relativedelta
129 from dateutil import relativedelta
131 n = datetime.datetime(year=2012, month=5, day=17)
130 n = datetime.datetime(year=2012, month=5, day=17)
132 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
131 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
133 self.assertEqual(age(n + delt(**age_args), now=n), expected)
132 self.assertEqual(age(n + delt(**age_args), now=n), expected)
134
133
135 @parameterized.expand([
134 @parameterized.expand([
136
135
137 (dict(), u'just now'),
136 (dict(), u'just now'),
138 (dict(seconds=1), u'in 1 second'),
137 (dict(seconds=1), u'in 1 second'),
139 (dict(seconds=60 * 2), u'in 2 minutes'),
138 (dict(seconds=60 * 2), u'in 2 minutes'),
140 (dict(hours=1), u'in 1 hour'),
139 (dict(hours=1), u'in 1 hour'),
141 (dict(hours=24), u'in 1 day'),
140 (dict(hours=24), u'in 1 day'),
142 (dict(hours=24 * 5), u'in 5 days'),
141 (dict(hours=24 * 5), u'in 5 days'),
143 (dict(months=1), u'in 1 month'),
142 (dict(months=1), u'in 1 month'),
144 (dict(months=1, days=1), u'in 1 month and 1 day'),
143 (dict(months=1, days=1), u'in 1 month and 1 day'),
145 (dict(years=1, months=1), u'in 1 year and 1 month')
144 (dict(years=1, months=1), u'in 1 year and 1 month')
146 ])
145 ])
147 def test_age_in_future(self, age_args, expected):
146 def test_age_in_future(self, age_args, expected):
148 from rhodecode.lib.utils2 import age
147 from rhodecode.lib.utils2 import age
149 from dateutil import relativedelta
148 from dateutil import relativedelta
150 n = datetime.datetime(year=2012, month=5, day=17)
149 n = datetime.datetime(year=2012, month=5, day=17)
151 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
150 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
152 self.assertEqual(age(n + delt(**age_args), now=n), expected)
151 self.assertEqual(age(n + delt(**age_args), now=n), expected)
153
152
154 def test_tag_exctrator(self):
153 def test_tag_exctrator(self):
155 sample = (
154 sample = (
156 "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
155 "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
157 "[requires] [stale] [see<>=>] [see => http://url.com]"
156 "[requires] [stale] [see<>=>] [see => http://url.com]"
158 "[requires => url] [lang => python] [just a tag]"
157 "[requires => url] [lang => python] [just a tag]"
159 "[,d] [ => ULR ] [obsolete] [desc]]"
158 "[,d] [ => ULR ] [obsolete] [desc]]"
160 )
159 )
161 from rhodecode.lib.helpers import desc_stylize
160 from rhodecode.lib.helpers import desc_stylize
162 res = desc_stylize(sample)
161 res = desc_stylize(sample)
163 self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
162 self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
164 self.assertTrue('<div class="metatag" tag="obsolete">obsolete</div>' in res)
163 self.assertTrue('<div class="metatag" tag="obsolete">obsolete</div>' in res)
165 self.assertTrue('<div class="metatag" tag="stale">stale</div>' in res)
164 self.assertTrue('<div class="metatag" tag="stale">stale</div>' in res)
166 self.assertTrue('<div class="metatag" tag="lang">python</div>' in res)
165 self.assertTrue('<div class="metatag" tag="lang">python</div>' in res)
167 self.assertTrue('<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res)
166 self.assertTrue('<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res)
168 self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
167 self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
169
168
170 def test_alternative_gravatar(self):
169 def test_alternative_gravatar(self):
171 from rhodecode.lib.helpers import gravatar_url
170 from rhodecode.lib.helpers import gravatar_url
172 _md5 = lambda s: hashlib.md5(s).hexdigest()
171 _md5 = lambda s: hashlib.md5(s).hexdigest()
173
172
174 def fake_conf(**kwargs):
173 def fake_conf(**kwargs):
175 from pylons import config
174 from pylons import config
176 config['app_conf'] = {}
175 config['app_conf'] = {}
177 config['app_conf']['use_gravatar'] = True
176 config['app_conf']['use_gravatar'] = True
178 config['app_conf'].update(kwargs)
177 config['app_conf'].update(kwargs)
179 return config
178 return config
180
179
181 class fake_url():
180 class fake_url():
182 @classmethod
181 @classmethod
183 def current(cls, *args, **kwargs):
182 def current(cls, *args, **kwargs):
184 return 'https://server.com'
183 return 'https://server.com'
185
184
186 with mock.patch('pylons.url', fake_url):
185 with mock.patch('pylons.url', fake_url):
187 fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
186 fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
188 with mock.patch('pylons.config', fake):
187 with mock.patch('pylons.config', fake):
189 from pylons import url
188 from pylons import url
190 assert url.current() == 'https://server.com'
189 assert url.current() == 'https://server.com'
191 grav = gravatar_url(email_address='test@foo.com', size=24)
190 grav = gravatar_url(email_address='test@foo.com', size=24)
192 assert grav == 'http://test.com/test@foo.com'
191 assert grav == 'http://test.com/test@foo.com'
193
192
194 fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
193 fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
195 with mock.patch('pylons.config', fake):
194 with mock.patch('pylons.config', fake):
196 grav = gravatar_url(email_address='test@foo.com', size=24)
195 grav = gravatar_url(email_address='test@foo.com', size=24)
197 assert grav == 'http://test.com/test@foo.com'
196 assert grav == 'http://test.com/test@foo.com'
198
197
199 fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}')
198 fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}')
200 with mock.patch('pylons.config', fake):
199 with mock.patch('pylons.config', fake):
201 em = 'test@foo.com'
200 em = 'test@foo.com'
202 grav = gravatar_url(email_address=em, size=24)
201 grav = gravatar_url(email_address=em, size=24)
203 assert grav == 'http://test.com/%s' % (_md5(em))
202 assert grav == 'http://test.com/%s' % (_md5(em))
204
203
205 fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}/{size}')
204 fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}/{size}')
206 with mock.patch('pylons.config', fake):
205 with mock.patch('pylons.config', fake):
207 em = 'test@foo.com'
206 em = 'test@foo.com'
208 grav = gravatar_url(email_address=em, size=24)
207 grav = gravatar_url(email_address=em, size=24)
209 assert grav == 'http://test.com/%s/%s' % (_md5(em), 24)
208 assert grav == 'http://test.com/%s/%s' % (_md5(em), 24)
210
209
211 fake = fake_conf(alternative_gravatar_url='{scheme}://{netloc}/{md5email}/{size}')
210 fake = fake_conf(alternative_gravatar_url='{scheme}://{netloc}/{md5email}/{size}')
212 with mock.patch('pylons.config', fake):
211 with mock.patch('pylons.config', fake):
213 em = 'test@foo.com'
212 em = 'test@foo.com'
214 grav = gravatar_url(email_address=em, size=24)
213 grav = gravatar_url(email_address=em, size=24)
215 assert grav == 'https://server.com/%s/%s' % (_md5(em), 24)
214 assert grav == 'https://server.com/%s/%s' % (_md5(em), 24)
216
215
217 def _quick_url(self, text, tmpl="""<a class="revision-link" href="%s">%s</a>""", url_=None):
216 def _quick_url(self, text, tmpl="""<a class="revision-link" href="%s">%s</a>""", url_=None):
218 """
217 """
219 Changes `some text url[foo]` => `some text <a href="/">foo</a>
218 Changes `some text url[foo]` => `some text <a href="/">foo</a>
220
219
221 :param text:
220 :param text:
222 """
221 """
223 import re
222 import re
224 # quickly change expected url[] into a link
223 # quickly change expected url[] into a link
225 URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
224 URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
226
225
227 def url_func(match_obj):
226 def url_func(match_obj):
228 _url = match_obj.groups()[0]
227 _url = match_obj.groups()[0]
229 return tmpl % (url_ or '/some-url', _url)
228 return tmpl % (url_ or '/some-url', _url)
230 return URL_PAT.sub(url_func, text)
229 return URL_PAT.sub(url_func, text)
231
230
232 @parameterized.expand([
231 @parameterized.expand([
233 ("",
232 ("",
234 ""),
233 ""),
235 ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
234 ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
236 "git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68"),
235 "git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68"),
237 ("from rev 000000000000",
236 ("from rev 000000000000",
238 "from rev url[000000000000]"),
237 "from rev url[000000000000]"),
239 ("from rev 000000000000123123 also rev 000000000000",
238 ("from rev 000000000000123123 also rev 000000000000",
240 "from rev url[000000000000123123] also rev url[000000000000]"),
239 "from rev url[000000000000123123] also rev url[000000000000]"),
241 ("this should-000 00",
240 ("this should-000 00",
242 "this should-000 00"),
241 "this should-000 00"),
243 ("longtextffffffffff rev 123123123123",
242 ("longtextffffffffff rev 123123123123",
244 "longtextffffffffff rev url[123123123123]"),
243 "longtextffffffffff rev url[123123123123]"),
245 ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
244 ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
246 "rev ffffffffffffffffffffffffffffffffffffffffffffffffff"),
245 "rev ffffffffffffffffffffffffffffffffffffffffffffffffff"),
247 ("ffffffffffff some text traalaa",
246 ("ffffffffffff some text traalaa",
248 "url[ffffffffffff] some text traalaa"),
247 "url[ffffffffffff] some text traalaa"),
249 ("""Multi line
248 ("""Multi line
250 123123123123
249 123123123123
251 some text 123123123123
250 some text 123123123123
252 sometimes !
251 sometimes !
253 """,
252 """,
254 """Multi line
253 """Multi line
255 url[123123123123]
254 url[123123123123]
256 some text url[123123123123]
255 some text url[123123123123]
257 sometimes !
256 sometimes !
258 """)
257 """)
259 ])
258 ])
260 def test_urlify_changesets(self, sample, expected):
259 def test_urlify_changesets(self, sample, expected):
261 def fake_url(self, *args, **kwargs):
260 def fake_url(self, *args, **kwargs):
262 return '/some-url'
261 return '/some-url'
263
262
264 expected = self._quick_url(expected)
263 expected = self._quick_url(expected)
265
264
266 with mock.patch('pylons.url', fake_url):
265 with mock.patch('pylons.url', fake_url):
267 from rhodecode.lib.helpers import urlify_changesets
266 from rhodecode.lib.helpers import urlify_changesets
268 self.assertEqual(urlify_changesets(sample, 'repo_name'), expected)
267 self.assertEqual(urlify_changesets(sample, 'repo_name'), expected)
269
268
270 @parameterized.expand([
269 @parameterized.expand([
271 ("",
270 ("",
272 "",
271 "",
273 ""),
272 ""),
274 ("https://svn.apache.org/repos",
273 ("https://svn.apache.org/repos",
275 "url[https://svn.apache.org/repos]",
274 "url[https://svn.apache.org/repos]",
276 "https://svn.apache.org/repos"),
275 "https://svn.apache.org/repos"),
277 ("http://svn.apache.org/repos",
276 ("http://svn.apache.org/repos",
278 "url[http://svn.apache.org/repos]",
277 "url[http://svn.apache.org/repos]",
279 "http://svn.apache.org/repos"),
278 "http://svn.apache.org/repos"),
280 ("from rev a also rev http://google.com",
279 ("from rev a also rev http://google.com",
281 "from rev a also rev url[http://google.com]",
280 "from rev a also rev url[http://google.com]",
282 "http://google.com"),
281 "http://google.com"),
283 ("""Multi line
282 ("""Multi line
284 https://foo.bar.com
283 https://foo.bar.com
285 some text lalala""",
284 some text lalala""",
286 """Multi line
285 """Multi line
287 url[https://foo.bar.com]
286 url[https://foo.bar.com]
288 some text lalala""",
287 some text lalala""",
289 "https://foo.bar.com")
288 "https://foo.bar.com")
290 ])
289 ])
291 def test_urlify_test(self, sample, expected, url_):
290 def test_urlify_test(self, sample, expected, url_):
292 from rhodecode.lib.helpers import urlify_text
291 from rhodecode.lib.helpers import urlify_text
293 expected = self._quick_url(expected,
292 expected = self._quick_url(expected,
294 tmpl="""<a href="%s">%s</a>""", url_=url_)
293 tmpl="""<a href="%s">%s</a>""", url_=url_)
295 self.assertEqual(urlify_text(sample), expected)
294 self.assertEqual(urlify_text(sample), expected)
@@ -1,254 +1,253 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 import unittest
3 import formencode
2 import formencode
4
3
5 from rhodecode.tests import *
4 from rhodecode.tests import *
6
5
7 from rhodecode.model import validators as v
6 from rhodecode.model import validators as v
8 from rhodecode.model.users_group import UserGroupModel
7 from rhodecode.model.users_group import UserGroupModel
9
8
10 from rhodecode.model.meta import Session
9 from rhodecode.model.meta import Session
11 from rhodecode.model.repos_group import ReposGroupModel
10 from rhodecode.model.repos_group import ReposGroupModel
12 from rhodecode.model.db import ChangesetStatus, Repository
11 from rhodecode.model.db import ChangesetStatus, Repository
13 from rhodecode.model.changeset_status import ChangesetStatusModel
12 from rhodecode.model.changeset_status import ChangesetStatusModel
14 from rhodecode.tests.fixture import Fixture
13 from rhodecode.tests.fixture import Fixture
15
14
16 fixture = Fixture()
15 fixture = Fixture()
17
16
18
17
19 class TestReposGroups(unittest.TestCase):
18 class TestReposGroups(BaseTestCase):
20
19
21 def setUp(self):
20 def setUp(self):
22 pass
21 pass
23
22
24 def tearDown(self):
23 def tearDown(self):
25 Session.remove()
24 Session.remove()
26
25
27 def test_Message_extractor(self):
26 def test_Message_extractor(self):
28 validator = v.ValidUsername()
27 validator = v.ValidUsername()
29 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
28 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
30
29
31 class StateObj(object):
30 class StateObj(object):
32 pass
31 pass
33
32
34 self.assertRaises(formencode.Invalid,
33 self.assertRaises(formencode.Invalid,
35 validator.to_python, 'default', StateObj)
34 validator.to_python, 'default', StateObj)
36
35
37 def test_ValidUsername(self):
36 def test_ValidUsername(self):
38 validator = v.ValidUsername()
37 validator = v.ValidUsername()
39
38
40 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
39 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
41 self.assertRaises(formencode.Invalid, validator.to_python, 'new_user')
40 self.assertRaises(formencode.Invalid, validator.to_python, 'new_user')
42 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
41 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
43 self.assertRaises(formencode.Invalid, validator.to_python,
42 self.assertRaises(formencode.Invalid, validator.to_python,
44 TEST_USER_ADMIN_LOGIN)
43 TEST_USER_ADMIN_LOGIN)
45 self.assertEqual('test', validator.to_python('test'))
44 self.assertEqual('test', validator.to_python('test'))
46
45
47 validator = v.ValidUsername(edit=True, old_data={'user_id': 1})
46 validator = v.ValidUsername(edit=True, old_data={'user_id': 1})
48
47
49 def test_ValidRepoUser(self):
48 def test_ValidRepoUser(self):
50 validator = v.ValidRepoUser()
49 validator = v.ValidRepoUser()
51 self.assertRaises(formencode.Invalid, validator.to_python, 'nouser')
50 self.assertRaises(formencode.Invalid, validator.to_python, 'nouser')
52 self.assertEqual(TEST_USER_ADMIN_LOGIN,
51 self.assertEqual(TEST_USER_ADMIN_LOGIN,
53 validator.to_python(TEST_USER_ADMIN_LOGIN))
52 validator.to_python(TEST_USER_ADMIN_LOGIN))
54
53
55 def test_ValidUserGroup(self):
54 def test_ValidUserGroup(self):
56 validator = v.ValidUserGroup()
55 validator = v.ValidUserGroup()
57 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
56 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
58 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
57 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
59
58
60 gr = fixture.create_user_group('test')
59 gr = fixture.create_user_group('test')
61 gr2 = fixture.create_user_group('tes2')
60 gr2 = fixture.create_user_group('tes2')
62 Session().commit()
61 Session().commit()
63 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
62 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
64 assert gr.users_group_id != None
63 assert gr.users_group_id != None
65 validator = v.ValidUserGroup(edit=True,
64 validator = v.ValidUserGroup(edit=True,
66 old_data={'users_group_id':
65 old_data={'users_group_id':
67 gr2.users_group_id})
66 gr2.users_group_id})
68
67
69 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
68 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
70 self.assertRaises(formencode.Invalid, validator.to_python, 'TesT')
69 self.assertRaises(formencode.Invalid, validator.to_python, 'TesT')
71 self.assertRaises(formencode.Invalid, validator.to_python, 'TEST')
70 self.assertRaises(formencode.Invalid, validator.to_python, 'TEST')
72 UserGroupModel().delete(gr)
71 UserGroupModel().delete(gr)
73 UserGroupModel().delete(gr2)
72 UserGroupModel().delete(gr2)
74 Session().commit()
73 Session().commit()
75
74
76 def test_ValidReposGroup(self):
75 def test_ValidReposGroup(self):
77 validator = v.ValidReposGroup()
76 validator = v.ValidReposGroup()
78 model = ReposGroupModel()
77 model = ReposGroupModel()
79 self.assertRaises(formencode.Invalid, validator.to_python,
78 self.assertRaises(formencode.Invalid, validator.to_python,
80 {'group_name': HG_REPO, })
79 {'group_name': HG_REPO, })
81 gr = model.create(group_name='test_gr', group_description='desc',
80 gr = model.create(group_name='test_gr', group_description='desc',
82 parent=None,
81 parent=None,
83 just_db=True,
82 just_db=True,
84 owner=TEST_USER_ADMIN_LOGIN)
83 owner=TEST_USER_ADMIN_LOGIN)
85 self.assertRaises(formencode.Invalid,
84 self.assertRaises(formencode.Invalid,
86 validator.to_python, {'group_name': gr.group_name, })
85 validator.to_python, {'group_name': gr.group_name, })
87
86
88 validator = v.ValidReposGroup(edit=True,
87 validator = v.ValidReposGroup(edit=True,
89 old_data={'group_id': gr.group_id})
88 old_data={'group_id': gr.group_id})
90 self.assertRaises(formencode.Invalid,
89 self.assertRaises(formencode.Invalid,
91 validator.to_python, {
90 validator.to_python, {
92 'group_name': gr.group_name + 'n',
91 'group_name': gr.group_name + 'n',
93 'group_parent_id': gr.group_id
92 'group_parent_id': gr.group_id
94 })
93 })
95 model.delete(gr)
94 model.delete(gr)
96
95
97 def test_ValidPassword(self):
96 def test_ValidPassword(self):
98 validator = v.ValidPassword()
97 validator = v.ValidPassword()
99 self.assertEqual('lol', validator.to_python('lol'))
98 self.assertEqual('lol', validator.to_python('lol'))
100 self.assertEqual(None, validator.to_python(None))
99 self.assertEqual(None, validator.to_python(None))
101 self.assertRaises(formencode.Invalid, validator.to_python, 'Δ…Δ‡ΕΌΕΊ')
100 self.assertRaises(formencode.Invalid, validator.to_python, 'Δ…Δ‡ΕΌΕΊ')
102
101
103 def test_ValidPasswordsMatch(self):
102 def test_ValidPasswordsMatch(self):
104 validator = v.ValidPasswordsMatch()
103 validator = v.ValidPasswordsMatch()
105 self.assertRaises(formencode.Invalid,
104 self.assertRaises(formencode.Invalid,
106 validator.to_python, {'password': 'pass',
105 validator.to_python, {'password': 'pass',
107 'password_confirmation': 'pass2'})
106 'password_confirmation': 'pass2'})
108
107
109 self.assertRaises(formencode.Invalid,
108 self.assertRaises(formencode.Invalid,
110 validator.to_python, {'new_password': 'pass',
109 validator.to_python, {'new_password': 'pass',
111 'password_confirmation': 'pass2'})
110 'password_confirmation': 'pass2'})
112
111
113 self.assertEqual({'new_password': 'pass',
112 self.assertEqual({'new_password': 'pass',
114 'password_confirmation': 'pass'},
113 'password_confirmation': 'pass'},
115 validator.to_python({'new_password': 'pass',
114 validator.to_python({'new_password': 'pass',
116 'password_confirmation': 'pass'}))
115 'password_confirmation': 'pass'}))
117
116
118 self.assertEqual({'password': 'pass',
117 self.assertEqual({'password': 'pass',
119 'password_confirmation': 'pass'},
118 'password_confirmation': 'pass'},
120 validator.to_python({'password': 'pass',
119 validator.to_python({'password': 'pass',
121 'password_confirmation': 'pass'}))
120 'password_confirmation': 'pass'}))
122
121
123 def test_ValidAuth(self):
122 def test_ValidAuth(self):
124 validator = v.ValidAuth()
123 validator = v.ValidAuth()
125 valid_creds = {
124 valid_creds = {
126 'username': TEST_USER_REGULAR2_LOGIN,
125 'username': TEST_USER_REGULAR2_LOGIN,
127 'password': TEST_USER_REGULAR2_PASS,
126 'password': TEST_USER_REGULAR2_PASS,
128 }
127 }
129 invalid_creds = {
128 invalid_creds = {
130 'username': 'err',
129 'username': 'err',
131 'password': 'err',
130 'password': 'err',
132 }
131 }
133 self.assertEqual(valid_creds, validator.to_python(valid_creds))
132 self.assertEqual(valid_creds, validator.to_python(valid_creds))
134 self.assertRaises(formencode.Invalid,
133 self.assertRaises(formencode.Invalid,
135 validator.to_python, invalid_creds)
134 validator.to_python, invalid_creds)
136
135
137 def test_ValidAuthToken(self):
136 def test_ValidAuthToken(self):
138 validator = v.ValidAuthToken()
137 validator = v.ValidAuthToken()
139 # this is untestable without a threadlocal
138 # this is untestable without a threadlocal
140 # self.assertRaises(formencode.Invalid,
139 # self.assertRaises(formencode.Invalid,
141 # validator.to_python, 'BadToken')
140 # validator.to_python, 'BadToken')
142 validator
141 validator
143
142
144 def test_ValidRepoName(self):
143 def test_ValidRepoName(self):
145 validator = v.ValidRepoName()
144 validator = v.ValidRepoName()
146
145
147 self.assertRaises(formencode.Invalid,
146 self.assertRaises(formencode.Invalid,
148 validator.to_python, {'repo_name': ''})
147 validator.to_python, {'repo_name': ''})
149
148
150 self.assertRaises(formencode.Invalid,
149 self.assertRaises(formencode.Invalid,
151 validator.to_python, {'repo_name': HG_REPO})
150 validator.to_python, {'repo_name': HG_REPO})
152
151
153 gr = ReposGroupModel().create(group_name='group_test',
152 gr = ReposGroupModel().create(group_name='group_test',
154 group_description='desc',
153 group_description='desc',
155 parent=None,
154 parent=None,
156 owner=TEST_USER_ADMIN_LOGIN)
155 owner=TEST_USER_ADMIN_LOGIN)
157 self.assertRaises(formencode.Invalid,
156 self.assertRaises(formencode.Invalid,
158 validator.to_python, {'repo_name': gr.group_name})
157 validator.to_python, {'repo_name': gr.group_name})
159
158
160 #TODO: write an error case for that ie. create a repo withinh a group
159 #TODO: write an error case for that ie. create a repo withinh a group
161 # self.assertRaises(formencode.Invalid,
160 # self.assertRaises(formencode.Invalid,
162 # validator.to_python, {'repo_name': 'some',
161 # validator.to_python, {'repo_name': 'some',
163 # 'repo_group': gr.group_id})
162 # 'repo_group': gr.group_id})
164
163
165 def test_ValidForkName(self):
164 def test_ValidForkName(self):
166 # this uses ValidRepoName validator
165 # this uses ValidRepoName validator
167 assert True
166 assert True
168
167
169 @parameterized.expand([
168 @parameterized.expand([
170 ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'),
169 ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'),
171 ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
170 ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
172 ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
171 ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
173 ('/]re po', 're-po')])
172 ('/]re po', 're-po')])
174 def test_SlugifyName(self, name, expected):
173 def test_SlugifyName(self, name, expected):
175 validator = v.SlugifyName()
174 validator = v.SlugifyName()
176 self.assertEqual(expected, validator.to_python(name))
175 self.assertEqual(expected, validator.to_python(name))
177
176
178 def test_ValidCloneUri(self):
177 def test_ValidCloneUri(self):
179 #TODO: write this one
178 #TODO: write this one
180 pass
179 pass
181
180
182 def test_ValidForkType(self):
181 def test_ValidForkType(self):
183 validator = v.ValidForkType(old_data={'repo_type': 'hg'})
182 validator = v.ValidForkType(old_data={'repo_type': 'hg'})
184 self.assertEqual('hg', validator.to_python('hg'))
183 self.assertEqual('hg', validator.to_python('hg'))
185 self.assertRaises(formencode.Invalid, validator.to_python, 'git')
184 self.assertRaises(formencode.Invalid, validator.to_python, 'git')
186
185
187 def test_ValidPerms(self):
186 def test_ValidPerms(self):
188 #TODO: write this one
187 #TODO: write this one
189 pass
188 pass
190
189
191 def test_ValidSettings(self):
190 def test_ValidSettings(self):
192 validator = v.ValidSettings()
191 validator = v.ValidSettings()
193 self.assertEqual({'pass': 'pass'},
192 self.assertEqual({'pass': 'pass'},
194 validator.to_python(value={'user': 'test',
193 validator.to_python(value={'user': 'test',
195 'pass': 'pass'}))
194 'pass': 'pass'}))
196
195
197 self.assertEqual({'user2': 'test', 'pass': 'pass'},
196 self.assertEqual({'user2': 'test', 'pass': 'pass'},
198 validator.to_python(value={'user2': 'test',
197 validator.to_python(value={'user2': 'test',
199 'pass': 'pass'}))
198 'pass': 'pass'}))
200
199
201 def test_ValidPath(self):
200 def test_ValidPath(self):
202 validator = v.ValidPath()
201 validator = v.ValidPath()
203 self.assertEqual(TESTS_TMP_PATH,
202 self.assertEqual(TESTS_TMP_PATH,
204 validator.to_python(TESTS_TMP_PATH))
203 validator.to_python(TESTS_TMP_PATH))
205 self.assertRaises(formencode.Invalid, validator.to_python,
204 self.assertRaises(formencode.Invalid, validator.to_python,
206 '/no_such_dir')
205 '/no_such_dir')
207
206
208 def test_UniqSystemEmail(self):
207 def test_UniqSystemEmail(self):
209 validator = v.UniqSystemEmail(old_data={})
208 validator = v.UniqSystemEmail(old_data={})
210
209
211 self.assertEqual('mail@python.org',
210 self.assertEqual('mail@python.org',
212 validator.to_python('MaiL@Python.org'))
211 validator.to_python('MaiL@Python.org'))
213
212
214 email = TEST_USER_REGULAR2_EMAIL
213 email = TEST_USER_REGULAR2_EMAIL
215 self.assertRaises(formencode.Invalid, validator.to_python, email)
214 self.assertRaises(formencode.Invalid, validator.to_python, email)
216
215
217 def test_ValidSystemEmail(self):
216 def test_ValidSystemEmail(self):
218 validator = v.ValidSystemEmail()
217 validator = v.ValidSystemEmail()
219 email = TEST_USER_REGULAR2_EMAIL
218 email = TEST_USER_REGULAR2_EMAIL
220
219
221 self.assertEqual(email, validator.to_python(email))
220 self.assertEqual(email, validator.to_python(email))
222 self.assertRaises(formencode.Invalid, validator.to_python, 'err')
221 self.assertRaises(formencode.Invalid, validator.to_python, 'err')
223
222
224 def test_LdapLibValidator(self):
223 def test_LdapLibValidator(self):
225 if ldap_lib_installed:
224 if ldap_lib_installed:
226 validator = v.LdapLibValidator()
225 validator = v.LdapLibValidator()
227 self.assertEqual("DN", validator.to_python('DN'))
226 self.assertEqual("DN", validator.to_python('DN'))
228 else:
227 else:
229 validator = v.LdapLibValidator()
228 validator = v.LdapLibValidator()
230 self.assertRaises(v.LdapImportError, validator.to_python, 'err')
229 self.assertRaises(v.LdapImportError, validator.to_python, 'err')
231
230
232 def test_AttrLoginValidator(self):
231 def test_AttrLoginValidator(self):
233 validator = v.AttrLoginValidator()
232 validator = v.AttrLoginValidator()
234 self.assertEqual('DN_attr', validator.to_python('DN_attr'))
233 self.assertEqual('DN_attr', validator.to_python('DN_attr'))
235
234
236 def test_NotReviewedRevisions(self):
235 def test_NotReviewedRevisions(self):
237 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
236 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
238 validator = v.NotReviewedRevisions(repo_id)
237 validator = v.NotReviewedRevisions(repo_id)
239 rev = '0' * 40
238 rev = '0' * 40
240 # add status for a rev, that should throw an error because it is already
239 # add status for a rev, that should throw an error because it is already
241 # reviewed
240 # reviewed
242 new_status = ChangesetStatus()
241 new_status = ChangesetStatus()
243 new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN)
242 new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN)
244 new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO)
243 new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO)
245 new_status.status = ChangesetStatus.STATUS_APPROVED
244 new_status.status = ChangesetStatus.STATUS_APPROVED
246 new_status.comment = None
245 new_status.comment = None
247 new_status.revision = rev
246 new_status.revision = rev
248 Session().add(new_status)
247 Session().add(new_status)
249 Session().commit()
248 Session().commit()
250 try:
249 try:
251 self.assertRaises(formencode.Invalid, validator.to_python, [rev])
250 self.assertRaises(formencode.Invalid, validator.to_python, [rev])
252 finally:
251 finally:
253 Session().delete(new_status)
252 Session().delete(new_status)
254 Session().commit()
253 Session().commit()
@@ -1,507 +1,507 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.tests.test_scm_operations
3 rhodecode.tests.test_scm_operations
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 Test suite for making push/pull operations.
6 Test suite for making push/pull operations.
7 Run using after doing paster serve test.ini::
7 Run using after doing paster serve test.ini::
8 RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py
8 RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py
9
9
10 You must have git > 1.8.1 for tests to work fine
10 You must have git > 1.8.1 for tests to work fine
11
11
12 :created_on: Dec 30, 2010
12 :created_on: Dec 30, 2010
13 :author: marcink
13 :author: marcink
14 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
14 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
15 :license: GPLv3, see COPYING for more details.
15 :license: GPLv3, see COPYING for more details.
16 """
16 """
17 # This program is free software: you can redistribute it and/or modify
17 # This program is free software: you can redistribute it and/or modify
18 # it under the terms of the GNU General Public License as published by
18 # it under the terms of the GNU General Public License as published by
19 # the Free Software Foundation, either version 3 of the License, or
19 # the Free Software Foundation, either version 3 of the License, or
20 # (at your option) any later version.
20 # (at your option) any later version.
21 #
21 #
22 # This program is distributed in the hope that it will be useful,
22 # This program is distributed in the hope that it will be useful,
23 # but WITHOUT ANY WARRANTY; without even the implied warranty of
23 # but WITHOUT ANY WARRANTY; without even the implied warranty of
24 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 # GNU General Public License for more details.
25 # GNU General Public License for more details.
26 #
26 #
27 # You should have received a copy of the GNU General Public License
27 # You should have received a copy of the GNU General Public License
28 # along with this program. If not, see <http://www.gnu.org/licenses/>.
28 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29
29
30 import os
30 import os
31 import tempfile
31 import tempfile
32 import unittest
32 import unittest
33 import time
33 import time
34 from os.path import join as jn
34 from os.path import join as jn
35 from os.path import dirname as dn
35 from os.path import dirname as dn
36
36
37 from tempfile import _RandomNameSequence
37 from tempfile import _RandomNameSequence
38 from subprocess import Popen, PIPE
38 from subprocess import Popen, PIPE
39
39
40 from rhodecode.tests import *
40 from rhodecode.tests import *
41 from rhodecode.model.db import User, Repository, UserLog, UserIpMap,\
41 from rhodecode.model.db import User, Repository, UserLog, UserIpMap,\
42 CacheInvalidation
42 CacheInvalidation
43 from rhodecode.model.meta import Session
43 from rhodecode.model.meta import Session
44 from rhodecode.model.repo import RepoModel
44 from rhodecode.model.repo import RepoModel
45 from rhodecode.model.user import UserModel
45 from rhodecode.model.user import UserModel
46
46
47 DEBUG = True
47 DEBUG = True
48 HOST = '127.0.0.1:5000' # test host
48 HOST = '127.0.0.1:5000' # test host
49
49
50
50
51 class Command(object):
51 class Command(object):
52
52
53 def __init__(self, cwd):
53 def __init__(self, cwd):
54 self.cwd = cwd
54 self.cwd = cwd
55
55
56 def execute(self, cmd, *args):
56 def execute(self, cmd, *args):
57 """
57 """
58 Runs command on the system with given ``args``.
58 Runs command on the system with given ``args``.
59 """
59 """
60
60
61 command = cmd + ' ' + ' '.join(args)
61 command = cmd + ' ' + ' '.join(args)
62 if DEBUG:
62 if DEBUG:
63 print '*** CMD %s ***' % command
63 print '*** CMD %s ***' % command
64 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
64 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
65 stdout, stderr = p.communicate()
65 stdout, stderr = p.communicate()
66 if DEBUG:
66 if DEBUG:
67 print stdout, stderr
67 print stdout, stderr
68 return stdout, stderr
68 return stdout, stderr
69
69
70
70
71 def _get_tmp_dir():
71 def _get_tmp_dir():
72 return tempfile.mkdtemp(prefix='rc_integration_test')
72 return tempfile.mkdtemp(prefix='rc_integration_test')
73
73
74
74
75 def _construct_url(repo, dest=None, **kwargs):
75 def _construct_url(repo, dest=None, **kwargs):
76 if dest is None:
76 if dest is None:
77 #make temp clone
77 #make temp clone
78 dest = _get_tmp_dir()
78 dest = _get_tmp_dir()
79 params = {
79 params = {
80 'user': TEST_USER_ADMIN_LOGIN,
80 'user': TEST_USER_ADMIN_LOGIN,
81 'passwd': TEST_USER_ADMIN_PASS,
81 'passwd': TEST_USER_ADMIN_PASS,
82 'host': HOST,
82 'host': HOST,
83 'cloned_repo': repo,
83 'cloned_repo': repo,
84 'dest': dest
84 'dest': dest
85 }
85 }
86 params.update(**kwargs)
86 params.update(**kwargs)
87 if params['user'] and params['passwd']:
87 if params['user'] and params['passwd']:
88 _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
88 _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
89 else:
89 else:
90 _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
90 _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
91 return _url
91 return _url
92
92
93
93
94 def _add_files_and_push(vcs, DEST, **kwargs):
94 def _add_files_and_push(vcs, DEST, **kwargs):
95 """
95 """
96 Generate some files, add it to DEST repo and push back
96 Generate some files, add it to DEST repo and push back
97 vcs is git or hg and defines what VCS we want to make those files for
97 vcs is git or hg and defines what VCS we want to make those files for
98
98
99 :param vcs:
99 :param vcs:
100 :param DEST:
100 :param DEST:
101 """
101 """
102 # commit some stuff into this repo
102 # commit some stuff into this repo
103 cwd = path = jn(DEST)
103 cwd = path = jn(DEST)
104 #added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
104 #added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
105 added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
105 added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
106 Command(cwd).execute('touch %s' % added_file)
106 Command(cwd).execute('touch %s' % added_file)
107 Command(cwd).execute('%s add %s' % (vcs, added_file))
107 Command(cwd).execute('%s add %s' % (vcs, added_file))
108
108
109 for i in xrange(kwargs.get('files_no', 3)):
109 for i in xrange(kwargs.get('files_no', 3)):
110 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
110 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
111 Command(cwd).execute(cmd)
111 Command(cwd).execute(cmd)
112 author_str = 'Marcin KuΕΊminski <me@email.com>'
112 author_str = 'Marcin KuΕΊminski <me@email.com>'
113 if vcs == 'hg':
113 if vcs == 'hg':
114 cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
114 cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
115 i, author_str, added_file
115 i, author_str, added_file
116 )
116 )
117 elif vcs == 'git':
117 elif vcs == 'git':
118 cmd = """git commit -m 'commited new %s' --author '%s' %s """ % (
118 cmd = """git commit -m 'commited new %s' --author '%s' %s """ % (
119 i, author_str, added_file
119 i, author_str, added_file
120 )
120 )
121 Command(cwd).execute(cmd)
121 Command(cwd).execute(cmd)
122 # PUSH it back
122 # PUSH it back
123 if vcs == 'hg':
123 if vcs == 'hg':
124 _REPO = HG_REPO
124 _REPO = HG_REPO
125 elif vcs == 'git':
125 elif vcs == 'git':
126 _REPO = GIT_REPO
126 _REPO = GIT_REPO
127
127
128 kwargs['dest'] = ''
128 kwargs['dest'] = ''
129 clone_url = _construct_url(_REPO, **kwargs)
129 clone_url = _construct_url(_REPO, **kwargs)
130 if 'clone_url' in kwargs:
130 if 'clone_url' in kwargs:
131 clone_url = kwargs['clone_url']
131 clone_url = kwargs['clone_url']
132 if vcs == 'hg':
132 if vcs == 'hg':
133 stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
133 stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
134 elif vcs == 'git':
134 elif vcs == 'git':
135 stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master")
135 stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master")
136
136
137 return stdout, stderr
137 return stdout, stderr
138
138
139
139
140 def set_anonymous_access(enable=True):
140 def set_anonymous_access(enable=True):
141 user = User.get_by_username(User.DEFAULT_USER)
141 user = User.get_by_username(User.DEFAULT_USER)
142 user.active = enable
142 user.active = enable
143 Session().add(user)
143 Session().add(user)
144 Session().commit()
144 Session().commit()
145 print '\tanonymous access is now:', enable
145 print '\tanonymous access is now:', enable
146 if enable != User.get_by_username(User.DEFAULT_USER).active:
146 if enable != User.get_by_username(User.DEFAULT_USER).active:
147 raise Exception('Cannot set anonymous access')
147 raise Exception('Cannot set anonymous access')
148
148
149
149
150 #==============================================================================
150 #==============================================================================
151 # TESTS
151 # TESTS
152 #==============================================================================
152 #==============================================================================
153
153
154 class TestVCSOperations(unittest.TestCase):
154 class TestVCSOperations(BaseTestCase):
155
155
156 @classmethod
156 @classmethod
157 def setup_class(cls):
157 def setup_class(cls):
158 #DISABLE ANONYMOUS ACCESS
158 #DISABLE ANONYMOUS ACCESS
159 set_anonymous_access(False)
159 set_anonymous_access(False)
160
160
161 def setUp(self):
161 def setUp(self):
162 r = Repository.get_by_repo_name(GIT_REPO)
162 r = Repository.get_by_repo_name(GIT_REPO)
163 Repository.unlock(r)
163 Repository.unlock(r)
164 r.enable_locking = False
164 r.enable_locking = False
165 Session().add(r)
165 Session().add(r)
166 Session().commit()
166 Session().commit()
167
167
168 r = Repository.get_by_repo_name(HG_REPO)
168 r = Repository.get_by_repo_name(HG_REPO)
169 Repository.unlock(r)
169 Repository.unlock(r)
170 r.enable_locking = False
170 r.enable_locking = False
171 Session().add(r)
171 Session().add(r)
172 Session().commit()
172 Session().commit()
173
173
174 def test_clone_hg_repo_by_admin(self):
174 def test_clone_hg_repo_by_admin(self):
175 clone_url = _construct_url(HG_REPO)
175 clone_url = _construct_url(HG_REPO)
176 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
176 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
177
177
178 assert 'requesting all changes' in stdout
178 assert 'requesting all changes' in stdout
179 assert 'adding changesets' in stdout
179 assert 'adding changesets' in stdout
180 assert 'adding manifests' in stdout
180 assert 'adding manifests' in stdout
181 assert 'adding file changes' in stdout
181 assert 'adding file changes' in stdout
182
182
183 assert stderr == ''
183 assert stderr == ''
184
184
185 def test_clone_git_repo_by_admin(self):
185 def test_clone_git_repo_by_admin(self):
186 clone_url = _construct_url(GIT_REPO)
186 clone_url = _construct_url(GIT_REPO)
187 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
187 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
188
188
189 assert 'Cloning into' in stdout
189 assert 'Cloning into' in stdout
190 assert stderr == ''
190 assert stderr == ''
191
191
192 def test_clone_wrong_credentials_hg(self):
192 def test_clone_wrong_credentials_hg(self):
193 clone_url = _construct_url(HG_REPO, passwd='bad!')
193 clone_url = _construct_url(HG_REPO, passwd='bad!')
194 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
194 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
195 assert 'abort: authorization failed' in stderr
195 assert 'abort: authorization failed' in stderr
196
196
197 def test_clone_wrong_credentials_git(self):
197 def test_clone_wrong_credentials_git(self):
198 clone_url = _construct_url(GIT_REPO, passwd='bad!')
198 clone_url = _construct_url(GIT_REPO, passwd='bad!')
199 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
199 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
200 assert 'fatal: Authentication failed' in stderr
200 assert 'fatal: Authentication failed' in stderr
201
201
202 def test_clone_git_dir_as_hg(self):
202 def test_clone_git_dir_as_hg(self):
203 clone_url = _construct_url(GIT_REPO)
203 clone_url = _construct_url(GIT_REPO)
204 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
204 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
205 assert 'HTTP Error 404: Not Found' in stderr
205 assert 'HTTP Error 404: Not Found' in stderr
206
206
207 def test_clone_hg_repo_as_git(self):
207 def test_clone_hg_repo_as_git(self):
208 clone_url = _construct_url(HG_REPO)
208 clone_url = _construct_url(HG_REPO)
209 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
209 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
210 assert 'not found:' in stderr
210 assert 'not found:' in stderr
211
211
212 def test_clone_non_existing_path_hg(self):
212 def test_clone_non_existing_path_hg(self):
213 clone_url = _construct_url('trololo')
213 clone_url = _construct_url('trololo')
214 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
214 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
215 assert 'HTTP Error 404: Not Found' in stderr
215 assert 'HTTP Error 404: Not Found' in stderr
216
216
217 def test_clone_non_existing_path_git(self):
217 def test_clone_non_existing_path_git(self):
218 clone_url = _construct_url('trololo')
218 clone_url = _construct_url('trololo')
219 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
219 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
220 assert 'not found:' in stderr
220 assert 'not found:' in stderr
221
221
222 def test_push_new_file_hg(self):
222 def test_push_new_file_hg(self):
223 DEST = _get_tmp_dir()
223 DEST = _get_tmp_dir()
224 clone_url = _construct_url(HG_REPO, dest=DEST)
224 clone_url = _construct_url(HG_REPO, dest=DEST)
225 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
225 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
226
226
227 stdout, stderr = _add_files_and_push('hg', DEST)
227 stdout, stderr = _add_files_and_push('hg', DEST)
228
228
229 assert 'pushing to' in stdout
229 assert 'pushing to' in stdout
230 assert 'Repository size' in stdout
230 assert 'Repository size' in stdout
231 assert 'Last revision is now' in stdout
231 assert 'Last revision is now' in stdout
232
232
233 def test_push_new_file_git(self):
233 def test_push_new_file_git(self):
234 DEST = _get_tmp_dir()
234 DEST = _get_tmp_dir()
235 clone_url = _construct_url(GIT_REPO, dest=DEST)
235 clone_url = _construct_url(GIT_REPO, dest=DEST)
236 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
236 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
237
237
238 # commit some stuff into this repo
238 # commit some stuff into this repo
239 stdout, stderr = _add_files_and_push('git', DEST)
239 stdout, stderr = _add_files_and_push('git', DEST)
240
240
241 #WTF git stderr ?!
241 #WTF git stderr ?!
242 assert 'master -> master' in stderr
242 assert 'master -> master' in stderr
243
243
244 def test_push_invalidates_cache_hg(self):
244 def test_push_invalidates_cache_hg(self):
245 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
245 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
246 ==HG_REPO).one()
246 ==HG_REPO).one()
247 key.cache_active = True
247 key.cache_active = True
248 Session().add(key)
248 Session().add(key)
249 Session().commit()
249 Session().commit()
250
250
251 DEST = _get_tmp_dir()
251 DEST = _get_tmp_dir()
252 clone_url = _construct_url(HG_REPO, dest=DEST)
252 clone_url = _construct_url(HG_REPO, dest=DEST)
253 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
253 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
254
254
255 stdout, stderr = _add_files_and_push('hg', DEST, files_no=1)
255 stdout, stderr = _add_files_and_push('hg', DEST, files_no=1)
256 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
256 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
257 ==HG_REPO).one()
257 ==HG_REPO).one()
258 self.assertEqual(key.cache_active, False)
258 self.assertEqual(key.cache_active, False)
259
259
260 def test_push_invalidates_cache_git(self):
260 def test_push_invalidates_cache_git(self):
261 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
261 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
262 ==GIT_REPO).one()
262 ==GIT_REPO).one()
263 key.cache_active = True
263 key.cache_active = True
264 Session().add(key)
264 Session().add(key)
265 Session().commit()
265 Session().commit()
266
266
267 DEST = _get_tmp_dir()
267 DEST = _get_tmp_dir()
268 clone_url = _construct_url(GIT_REPO, dest=DEST)
268 clone_url = _construct_url(GIT_REPO, dest=DEST)
269 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
269 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
270
270
271 # commit some stuff into this repo
271 # commit some stuff into this repo
272 stdout, stderr = _add_files_and_push('git', DEST, files_no=1)
272 stdout, stderr = _add_files_and_push('git', DEST, files_no=1)
273
273
274 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
274 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
275 ==GIT_REPO).one()
275 ==GIT_REPO).one()
276 self.assertEqual(key.cache_active, False)
276 self.assertEqual(key.cache_active, False)
277
277
278 def test_push_wrong_credentials_hg(self):
278 def test_push_wrong_credentials_hg(self):
279 DEST = _get_tmp_dir()
279 DEST = _get_tmp_dir()
280 clone_url = _construct_url(HG_REPO, dest=DEST)
280 clone_url = _construct_url(HG_REPO, dest=DEST)
281 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
281 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
282
282
283 stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
283 stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
284 passwd='name')
284 passwd='name')
285
285
286 assert 'abort: authorization failed' in stderr
286 assert 'abort: authorization failed' in stderr
287
287
288 def test_push_wrong_credentials_git(self):
288 def test_push_wrong_credentials_git(self):
289 DEST = _get_tmp_dir()
289 DEST = _get_tmp_dir()
290 clone_url = _construct_url(GIT_REPO, dest=DEST)
290 clone_url = _construct_url(GIT_REPO, dest=DEST)
291 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
291 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
292
292
293 stdout, stderr = _add_files_and_push('git', DEST, user='bad',
293 stdout, stderr = _add_files_and_push('git', DEST, user='bad',
294 passwd='name')
294 passwd='name')
295
295
296 assert 'fatal: Authentication failed' in stderr
296 assert 'fatal: Authentication failed' in stderr
297
297
298 def test_push_back_to_wrong_url_hg(self):
298 def test_push_back_to_wrong_url_hg(self):
299 DEST = _get_tmp_dir()
299 DEST = _get_tmp_dir()
300 clone_url = _construct_url(HG_REPO, dest=DEST)
300 clone_url = _construct_url(HG_REPO, dest=DEST)
301 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
301 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
302
302
303 stdout, stderr = _add_files_and_push('hg', DEST,
303 stdout, stderr = _add_files_and_push('hg', DEST,
304 clone_url='http://127.0.0.1:5000/tmp',)
304 clone_url='http://127.0.0.1:5000/tmp',)
305
305
306 assert 'HTTP Error 404: Not Found' in stderr
306 assert 'HTTP Error 404: Not Found' in stderr
307
307
308 def test_push_back_to_wrong_url_git(self):
308 def test_push_back_to_wrong_url_git(self):
309 DEST = _get_tmp_dir()
309 DEST = _get_tmp_dir()
310 clone_url = _construct_url(GIT_REPO, dest=DEST)
310 clone_url = _construct_url(GIT_REPO, dest=DEST)
311 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
311 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
312
312
313 stdout, stderr = _add_files_and_push('git', DEST,
313 stdout, stderr = _add_files_and_push('git', DEST,
314 clone_url='http://127.0.0.1:5000/tmp',)
314 clone_url='http://127.0.0.1:5000/tmp',)
315
315
316 assert 'not found:' in stderr
316 assert 'not found:' in stderr
317
317
318 def test_clone_and_create_lock_hg(self):
318 def test_clone_and_create_lock_hg(self):
319 # enable locking
319 # enable locking
320 r = Repository.get_by_repo_name(HG_REPO)
320 r = Repository.get_by_repo_name(HG_REPO)
321 r.enable_locking = True
321 r.enable_locking = True
322 Session().add(r)
322 Session().add(r)
323 Session().commit()
323 Session().commit()
324 # clone
324 # clone
325 clone_url = _construct_url(HG_REPO)
325 clone_url = _construct_url(HG_REPO)
326 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
326 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
327
327
328 #check if lock was made
328 #check if lock was made
329 r = Repository.get_by_repo_name(HG_REPO)
329 r = Repository.get_by_repo_name(HG_REPO)
330 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
330 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
331
331
332 def test_clone_and_create_lock_git(self):
332 def test_clone_and_create_lock_git(self):
333 # enable locking
333 # enable locking
334 r = Repository.get_by_repo_name(GIT_REPO)
334 r = Repository.get_by_repo_name(GIT_REPO)
335 r.enable_locking = True
335 r.enable_locking = True
336 Session().add(r)
336 Session().add(r)
337 Session().commit()
337 Session().commit()
338 # clone
338 # clone
339 clone_url = _construct_url(GIT_REPO)
339 clone_url = _construct_url(GIT_REPO)
340 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
340 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
341
341
342 #check if lock was made
342 #check if lock was made
343 r = Repository.get_by_repo_name(GIT_REPO)
343 r = Repository.get_by_repo_name(GIT_REPO)
344 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
344 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
345
345
346 def test_clone_after_repo_was_locked_hg(self):
346 def test_clone_after_repo_was_locked_hg(self):
347 #lock repo
347 #lock repo
348 r = Repository.get_by_repo_name(HG_REPO)
348 r = Repository.get_by_repo_name(HG_REPO)
349 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
349 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
350 #pull fails since repo is locked
350 #pull fails since repo is locked
351 clone_url = _construct_url(HG_REPO)
351 clone_url = _construct_url(HG_REPO)
352 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
352 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
353 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
353 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
354 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
354 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
355 assert msg in stderr
355 assert msg in stderr
356
356
357 def test_clone_after_repo_was_locked_git(self):
357 def test_clone_after_repo_was_locked_git(self):
358 #lock repo
358 #lock repo
359 r = Repository.get_by_repo_name(GIT_REPO)
359 r = Repository.get_by_repo_name(GIT_REPO)
360 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
360 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
361 #pull fails since repo is locked
361 #pull fails since repo is locked
362 clone_url = _construct_url(GIT_REPO)
362 clone_url = _construct_url(GIT_REPO)
363 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
363 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
364 msg = ("""The requested URL returned error: 423""")
364 msg = ("""The requested URL returned error: 423""")
365 assert msg in stderr
365 assert msg in stderr
366
366
367 def test_push_on_locked_repo_by_other_user_hg(self):
367 def test_push_on_locked_repo_by_other_user_hg(self):
368 #clone some temp
368 #clone some temp
369 DEST = _get_tmp_dir()
369 DEST = _get_tmp_dir()
370 clone_url = _construct_url(HG_REPO, dest=DEST)
370 clone_url = _construct_url(HG_REPO, dest=DEST)
371 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
371 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
372
372
373 #lock repo
373 #lock repo
374 r = Repository.get_by_repo_name(HG_REPO)
374 r = Repository.get_by_repo_name(HG_REPO)
375 # let this user actually push !
375 # let this user actually push !
376 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
376 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
377 perm='repository.write')
377 perm='repository.write')
378 Session().commit()
378 Session().commit()
379 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
379 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
380
380
381 #push fails repo is locked by other user !
381 #push fails repo is locked by other user !
382 stdout, stderr = _add_files_and_push('hg', DEST,
382 stdout, stderr = _add_files_and_push('hg', DEST,
383 user=TEST_USER_REGULAR_LOGIN,
383 user=TEST_USER_REGULAR_LOGIN,
384 passwd=TEST_USER_REGULAR_PASS)
384 passwd=TEST_USER_REGULAR_PASS)
385 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
385 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
386 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
386 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
387 assert msg in stderr
387 assert msg in stderr
388
388
389 #TODO: fix me ! somehow during tests hooks don't get called on GIT
389 #TODO: fix me ! somehow during tests hooks don't get called on GIT
390 # def test_push_on_locked_repo_by_other_user_git(self):
390 # def test_push_on_locked_repo_by_other_user_git(self):
391 # #clone some temp
391 # #clone some temp
392 # DEST = _get_tmp_dir()
392 # DEST = _get_tmp_dir()
393 # clone_url = _construct_url(GIT_REPO, dest=DEST)
393 # clone_url = _construct_url(GIT_REPO, dest=DEST)
394 # stdout, stderr = Command('/tmp').execute('git clone', clone_url)
394 # stdout, stderr = Command('/tmp').execute('git clone', clone_url)
395 #
395 #
396 # #lock repo
396 # #lock repo
397 # r = Repository.get_by_repo_name(GIT_REPO)
397 # r = Repository.get_by_repo_name(GIT_REPO)
398 # # let this user actually push !
398 # # let this user actually push !
399 # RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
399 # RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
400 # perm='repository.write')
400 # perm='repository.write')
401 # Session().commit()
401 # Session().commit()
402 # Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
402 # Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
403 #
403 #
404 # #push fails repo is locked by other user !
404 # #push fails repo is locked by other user !
405 # stdout, stderr = _add_files_and_push('git', DEST,
405 # stdout, stderr = _add_files_and_push('git', DEST,
406 # user=TEST_USER_REGULAR_LOGIN,
406 # user=TEST_USER_REGULAR_LOGIN,
407 # passwd=TEST_USER_REGULAR_PASS)
407 # passwd=TEST_USER_REGULAR_PASS)
408 # msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
408 # msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
409 # % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
409 # % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
410 # #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
410 # #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
411 # # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
411 # # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
412 # msg = "405 Method Not Allowed"
412 # msg = "405 Method Not Allowed"
413 # assert msg in stderr
413 # assert msg in stderr
414
414
415 def test_push_unlocks_repository_hg(self):
415 def test_push_unlocks_repository_hg(self):
416 # enable locking
416 # enable locking
417 r = Repository.get_by_repo_name(HG_REPO)
417 r = Repository.get_by_repo_name(HG_REPO)
418 r.enable_locking = True
418 r.enable_locking = True
419 Session().add(r)
419 Session().add(r)
420 Session().commit()
420 Session().commit()
421 #clone some temp
421 #clone some temp
422 DEST = _get_tmp_dir()
422 DEST = _get_tmp_dir()
423 clone_url = _construct_url(HG_REPO, dest=DEST)
423 clone_url = _construct_url(HG_REPO, dest=DEST)
424 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
424 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
425
425
426 #check for lock repo after clone
426 #check for lock repo after clone
427 r = Repository.get_by_repo_name(HG_REPO)
427 r = Repository.get_by_repo_name(HG_REPO)
428 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
428 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
429
429
430 #push is ok and repo is now unlocked
430 #push is ok and repo is now unlocked
431 stdout, stderr = _add_files_and_push('hg', DEST)
431 stdout, stderr = _add_files_and_push('hg', DEST)
432 assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
432 assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
433 #we need to cleanup the Session Here !
433 #we need to cleanup the Session Here !
434 Session.remove()
434 Session.remove()
435 r = Repository.get_by_repo_name(HG_REPO)
435 r = Repository.get_by_repo_name(HG_REPO)
436 assert r.locked == [None, None]
436 assert r.locked == [None, None]
437
437
438 #TODO: fix me ! somehow during tests hooks don't get called on GIT
438 #TODO: fix me ! somehow during tests hooks don't get called on GIT
439 # def test_push_unlocks_repository_git(self):
439 # def test_push_unlocks_repository_git(self):
440 # # enable locking
440 # # enable locking
441 # r = Repository.get_by_repo_name(GIT_REPO)
441 # r = Repository.get_by_repo_name(GIT_REPO)
442 # r.enable_locking = True
442 # r.enable_locking = True
443 # Session().add(r)
443 # Session().add(r)
444 # Session().commit()
444 # Session().commit()
445 # #clone some temp
445 # #clone some temp
446 # DEST = _get_tmp_dir()
446 # DEST = _get_tmp_dir()
447 # clone_url = _construct_url(GIT_REPO, dest=DEST)
447 # clone_url = _construct_url(GIT_REPO, dest=DEST)
448 # stdout, stderr = Command('/tmp').execute('git clone', clone_url)
448 # stdout, stderr = Command('/tmp').execute('git clone', clone_url)
449 #
449 #
450 # #check for lock repo after clone
450 # #check for lock repo after clone
451 # r = Repository.get_by_repo_name(GIT_REPO)
451 # r = Repository.get_by_repo_name(GIT_REPO)
452 # assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
452 # assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
453 #
453 #
454 # #push is ok and repo is now unlocked
454 # #push is ok and repo is now unlocked
455 # stdout, stderr = _add_files_and_push('git', DEST)
455 # stdout, stderr = _add_files_and_push('git', DEST)
456 # #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
456 # #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
457 # #we need to cleanup the Session Here !
457 # #we need to cleanup the Session Here !
458 # Session.remove()
458 # Session.remove()
459 # r = Repository.get_by_repo_name(GIT_REPO)
459 # r = Repository.get_by_repo_name(GIT_REPO)
460 # assert r.locked == [None, None]
460 # assert r.locked == [None, None]
461
461
462 def test_ip_restriction_hg(self):
462 def test_ip_restriction_hg(self):
463 user_model = UserModel()
463 user_model = UserModel()
464 try:
464 try:
465 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
465 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
466 Session().commit()
466 Session().commit()
467 clone_url = _construct_url(HG_REPO)
467 clone_url = _construct_url(HG_REPO)
468 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
468 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
469 assert 'abort: HTTP Error 403: Forbidden' in stderr
469 assert 'abort: HTTP Error 403: Forbidden' in stderr
470 finally:
470 finally:
471 #release IP restrictions
471 #release IP restrictions
472 for ip in UserIpMap.getAll():
472 for ip in UserIpMap.getAll():
473 UserIpMap.delete(ip.ip_id)
473 UserIpMap.delete(ip.ip_id)
474 Session().commit()
474 Session().commit()
475
475
476 time.sleep(2)
476 time.sleep(2)
477 clone_url = _construct_url(HG_REPO)
477 clone_url = _construct_url(HG_REPO)
478 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
478 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
479
479
480 assert 'requesting all changes' in stdout
480 assert 'requesting all changes' in stdout
481 assert 'adding changesets' in stdout
481 assert 'adding changesets' in stdout
482 assert 'adding manifests' in stdout
482 assert 'adding manifests' in stdout
483 assert 'adding file changes' in stdout
483 assert 'adding file changes' in stdout
484
484
485 assert stderr == ''
485 assert stderr == ''
486
486
487 def test_ip_restriction_git(self):
487 def test_ip_restriction_git(self):
488 user_model = UserModel()
488 user_model = UserModel()
489 try:
489 try:
490 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
490 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
491 Session().commit()
491 Session().commit()
492 clone_url = _construct_url(GIT_REPO)
492 clone_url = _construct_url(GIT_REPO)
493 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
493 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
494 msg = ("""The requested URL returned error: 403""")
494 msg = ("""The requested URL returned error: 403""")
495 assert msg in stderr
495 assert msg in stderr
496 finally:
496 finally:
497 #release IP restrictions
497 #release IP restrictions
498 for ip in UserIpMap.getAll():
498 for ip in UserIpMap.getAll():
499 UserIpMap.delete(ip.ip_id)
499 UserIpMap.delete(ip.ip_id)
500 Session().commit()
500 Session().commit()
501
501
502 time.sleep(2)
502 time.sleep(2)
503 clone_url = _construct_url(GIT_REPO)
503 clone_url = _construct_url(GIT_REPO)
504 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
504 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
505
505
506 assert 'Cloning into' in stdout
506 assert 'Cloning into' in stdout
507 assert stderr == ''
507 assert stderr == ''
General Comments 0
You need to be logged in to leave comments. Login now