##// END OF EJS Templates
tests: fixed pyramid_beaker import
marcink -
r3765:8fd3d071 new-ui
parent child Browse files
Show More
@@ -1,465 +1,464 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2019 RhodeCode GmbH
3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import threading
21 import threading
22 import time
22 import time
23 import logging
23 import logging
24 import os.path
24 import os.path
25 import subprocess32
25 import subprocess32
26 import tempfile
26 import tempfile
27 import urllib2
27 import urllib2
28 from lxml.html import fromstring, tostring
28 from lxml.html import fromstring, tostring
29 from lxml.cssselect import CSSSelector
29 from lxml.cssselect import CSSSelector
30 from urlparse import urlparse, parse_qsl
30 from urlparse import urlparse, parse_qsl
31 from urllib import unquote_plus
31 from urllib import unquote_plus
32 import webob
32 import webob
33
33
34 from webtest.app import TestResponse, TestApp, string_types
34 from webtest.app import TestResponse, TestApp, string_types
35 from webtest.compat import print_stderr
35 from webtest.compat import print_stderr
36
36
37 import pytest
37 import pytest
38 import rc_testdata
38 import rc_testdata
39
39
40 from rhodecode.model.db import User, Repository
40 from rhodecode.model.db import User, Repository
41 from rhodecode.model.meta import Session
41 from rhodecode.model.meta import Session
42 from rhodecode.model.scm import ScmModel
42 from rhodecode.model.scm import ScmModel
43 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
43 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
44 from rhodecode.lib.vcs.backends.base import EmptyCommit
44 from rhodecode.lib.vcs.backends.base import EmptyCommit
45 from rhodecode.tests import login_user_session
45 from rhodecode.tests import login_user_session
46
46
47 log = logging.getLogger(__name__)
47 log = logging.getLogger(__name__)
48
48
49
49
50 class CustomTestResponse(TestResponse):
50 class CustomTestResponse(TestResponse):
51 def _save_output(self, out):
51 def _save_output(self, out):
52 f = tempfile.NamedTemporaryFile(
52 f = tempfile.NamedTemporaryFile(
53 delete=False, prefix='rc-test-', suffix='.html')
53 delete=False, prefix='rc-test-', suffix='.html')
54 f.write(out)
54 f.write(out)
55 return f.name
55 return f.name
56
56
57 def mustcontain(self, *strings, **kw):
57 def mustcontain(self, *strings, **kw):
58 """
58 """
59 Assert that the response contains all of the strings passed
59 Assert that the response contains all of the strings passed
60 in as arguments.
60 in as arguments.
61
61
62 Equivalent to::
62 Equivalent to::
63
63
64 assert string in res
64 assert string in res
65 """
65 """
66 if 'no' in kw:
66 if 'no' in kw:
67 no = kw['no']
67 no = kw['no']
68 del kw['no']
68 del kw['no']
69 if isinstance(no, string_types):
69 if isinstance(no, string_types):
70 no = [no]
70 no = [no]
71 else:
71 else:
72 no = []
72 no = []
73 if kw:
73 if kw:
74 raise TypeError(
74 raise TypeError(
75 "The only keyword argument allowed is 'no' got %s" % kw)
75 "The only keyword argument allowed is 'no' got %s" % kw)
76
76
77 f = self._save_output(str(self))
77 f = self._save_output(str(self))
78
78
79 for s in strings:
79 for s in strings:
80 if not s in self:
80 if not s in self:
81 print_stderr("Actual response (no %r):" % s)
81 print_stderr("Actual response (no %r):" % s)
82 print_stderr(str(self))
82 print_stderr(str(self))
83 raise IndexError(
83 raise IndexError(
84 "Body does not contain string %r, output saved as %s" % (
84 "Body does not contain string %r, output saved as %s" % (
85 s, f))
85 s, f))
86
86
87 for no_s in no:
87 for no_s in no:
88 if no_s in self:
88 if no_s in self:
89 print_stderr("Actual response (has %r)" % no_s)
89 print_stderr("Actual response (has %r)" % no_s)
90 print_stderr(str(self))
90 print_stderr(str(self))
91 raise IndexError(
91 raise IndexError(
92 "Body contains bad string %r, output saved as %s" % (
92 "Body contains bad string %r, output saved as %s" % (
93 no_s, f))
93 no_s, f))
94
94
95 def assert_response(self):
95 def assert_response(self):
96 return AssertResponse(self)
96 return AssertResponse(self)
97
97
98 def get_session_from_response(self):
98 def get_session_from_response(self):
99 """
99 """
100 This returns the session from a response object.
100 This returns the session from a response object.
101 """
101 """
102
102 from rhodecode.lib.rc_beaker import session_factory_from_settings
103 from pyramid_beaker import session_factory_from_settings
104 session = session_factory_from_settings(self.test_app._pyramid_settings)
103 session = session_factory_from_settings(self.test_app._pyramid_settings)
105 return session(self.request)
104 return session(self.request)
106
105
107
106
108 class TestRequest(webob.BaseRequest):
107 class TestRequest(webob.BaseRequest):
109
108
110 # for py.test
109 # for py.test
111 disabled = True
110 disabled = True
112 ResponseClass = CustomTestResponse
111 ResponseClass = CustomTestResponse
113
112
114 def add_response_callback(self, callback):
113 def add_response_callback(self, callback):
115 pass
114 pass
116
115
117
116
118 class CustomTestApp(TestApp):
117 class CustomTestApp(TestApp):
119 """
118 """
120 Custom app to make mustcontain more Useful, and extract special methods
119 Custom app to make mustcontain more Useful, and extract special methods
121 """
120 """
122 RequestClass = TestRequest
121 RequestClass = TestRequest
123 rc_login_data = {}
122 rc_login_data = {}
124 rc_current_session = None
123 rc_current_session = None
125
124
126 def login(self, username=None, password=None):
125 def login(self, username=None, password=None):
127 from rhodecode.lib import auth
126 from rhodecode.lib import auth
128
127
129 if username and password:
128 if username and password:
130 session = login_user_session(self, username, password)
129 session = login_user_session(self, username, password)
131 else:
130 else:
132 session = login_user_session(self)
131 session = login_user_session(self)
133
132
134 self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
133 self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
135 self.rc_current_session = session
134 self.rc_current_session = session
136 return session['rhodecode_user']
135 return session['rhodecode_user']
137
136
138 @property
137 @property
139 def csrf_token(self):
138 def csrf_token(self):
140 return self.rc_login_data['csrf_token']
139 return self.rc_login_data['csrf_token']
141
140
142 @property
141 @property
143 def _pyramid_registry(self):
142 def _pyramid_registry(self):
144 return self.app.config.registry
143 return self.app.config.registry
145
144
146 @property
145 @property
147 def _pyramid_settings(self):
146 def _pyramid_settings(self):
148 return self._pyramid_registry.settings
147 return self._pyramid_registry.settings
149
148
150
149
151 def set_anonymous_access(enabled):
150 def set_anonymous_access(enabled):
152 """(Dis)allows anonymous access depending on parameter `enabled`"""
151 """(Dis)allows anonymous access depending on parameter `enabled`"""
153 user = User.get_default_user()
152 user = User.get_default_user()
154 user.active = enabled
153 user.active = enabled
155 Session().add(user)
154 Session().add(user)
156 Session().commit()
155 Session().commit()
157 time.sleep(1.5) # must sleep for cache (1s to expire)
156 time.sleep(1.5) # must sleep for cache (1s to expire)
158 log.info('anonymous access is now: %s', enabled)
157 log.info('anonymous access is now: %s', enabled)
159 assert enabled == User.get_default_user().active, (
158 assert enabled == User.get_default_user().active, (
160 'Cannot set anonymous access')
159 'Cannot set anonymous access')
161
160
162
161
163 def check_xfail_backends(node, backend_alias):
162 def check_xfail_backends(node, backend_alias):
164 # Using "xfail_backends" here intentionally, since this marks work
163 # Using "xfail_backends" here intentionally, since this marks work
165 # which is "to be done" soon.
164 # which is "to be done" soon.
166 skip_marker = node.get_closest_marker('xfail_backends')
165 skip_marker = node.get_closest_marker('xfail_backends')
167 if skip_marker and backend_alias in skip_marker.args:
166 if skip_marker and backend_alias in skip_marker.args:
168 msg = "Support for backend %s to be developed." % (backend_alias, )
167 msg = "Support for backend %s to be developed." % (backend_alias, )
169 msg = skip_marker.kwargs.get('reason', msg)
168 msg = skip_marker.kwargs.get('reason', msg)
170 pytest.xfail(msg)
169 pytest.xfail(msg)
171
170
172
171
173 def check_skip_backends(node, backend_alias):
172 def check_skip_backends(node, backend_alias):
174 # Using "skip_backends" here intentionally, since this marks work which is
173 # Using "skip_backends" here intentionally, since this marks work which is
175 # not supported.
174 # not supported.
176 skip_marker = node.get_closest_marker('skip_backends')
175 skip_marker = node.get_closest_marker('skip_backends')
177 if skip_marker and backend_alias in skip_marker.args:
176 if skip_marker and backend_alias in skip_marker.args:
178 msg = "Feature not supported for backend %s." % (backend_alias, )
177 msg = "Feature not supported for backend %s." % (backend_alias, )
179 msg = skip_marker.kwargs.get('reason', msg)
178 msg = skip_marker.kwargs.get('reason', msg)
180 pytest.skip(msg)
179 pytest.skip(msg)
181
180
182
181
183 def extract_git_repo_from_dump(dump_name, repo_name):
182 def extract_git_repo_from_dump(dump_name, repo_name):
184 """Create git repo `repo_name` from dump `dump_name`."""
183 """Create git repo `repo_name` from dump `dump_name`."""
185 repos_path = ScmModel().repos_path
184 repos_path = ScmModel().repos_path
186 target_path = os.path.join(repos_path, repo_name)
185 target_path = os.path.join(repos_path, repo_name)
187 rc_testdata.extract_git_dump(dump_name, target_path)
186 rc_testdata.extract_git_dump(dump_name, target_path)
188 return target_path
187 return target_path
189
188
190
189
191 def extract_hg_repo_from_dump(dump_name, repo_name):
190 def extract_hg_repo_from_dump(dump_name, repo_name):
192 """Create hg repo `repo_name` from dump `dump_name`."""
191 """Create hg repo `repo_name` from dump `dump_name`."""
193 repos_path = ScmModel().repos_path
192 repos_path = ScmModel().repos_path
194 target_path = os.path.join(repos_path, repo_name)
193 target_path = os.path.join(repos_path, repo_name)
195 rc_testdata.extract_hg_dump(dump_name, target_path)
194 rc_testdata.extract_hg_dump(dump_name, target_path)
196 return target_path
195 return target_path
197
196
198
197
199 def extract_svn_repo_from_dump(dump_name, repo_name):
198 def extract_svn_repo_from_dump(dump_name, repo_name):
200 """Create a svn repo `repo_name` from dump `dump_name`."""
199 """Create a svn repo `repo_name` from dump `dump_name`."""
201 repos_path = ScmModel().repos_path
200 repos_path = ScmModel().repos_path
202 target_path = os.path.join(repos_path, repo_name)
201 target_path = os.path.join(repos_path, repo_name)
203 SubversionRepository(target_path, create=True)
202 SubversionRepository(target_path, create=True)
204 _load_svn_dump_into_repo(dump_name, target_path)
203 _load_svn_dump_into_repo(dump_name, target_path)
205 return target_path
204 return target_path
206
205
207
206
208 def assert_message_in_log(log_records, message, levelno, module):
207 def assert_message_in_log(log_records, message, levelno, module):
209 messages = [
208 messages = [
210 r.message for r in log_records
209 r.message for r in log_records
211 if r.module == module and r.levelno == levelno
210 if r.module == module and r.levelno == levelno
212 ]
211 ]
213 assert message in messages
212 assert message in messages
214
213
215
214
216 def _load_svn_dump_into_repo(dump_name, repo_path):
215 def _load_svn_dump_into_repo(dump_name, repo_path):
217 """
216 """
218 Utility to populate a svn repository with a named dump
217 Utility to populate a svn repository with a named dump
219
218
220 Currently the dumps are in rc_testdata. They might later on be
219 Currently the dumps are in rc_testdata. They might later on be
221 integrated with the main repository once they stabilize more.
220 integrated with the main repository once they stabilize more.
222 """
221 """
223 dump = rc_testdata.load_svn_dump(dump_name)
222 dump = rc_testdata.load_svn_dump(dump_name)
224 load_dump = subprocess32.Popen(
223 load_dump = subprocess32.Popen(
225 ['svnadmin', 'load', repo_path],
224 ['svnadmin', 'load', repo_path],
226 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
225 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
227 stderr=subprocess32.PIPE)
226 stderr=subprocess32.PIPE)
228 out, err = load_dump.communicate(dump)
227 out, err = load_dump.communicate(dump)
229 if load_dump.returncode != 0:
228 if load_dump.returncode != 0:
230 log.error("Output of load_dump command: %s", out)
229 log.error("Output of load_dump command: %s", out)
231 log.error("Error output of load_dump command: %s", err)
230 log.error("Error output of load_dump command: %s", err)
232 raise Exception(
231 raise Exception(
233 'Failed to load dump "%s" into repository at path "%s".'
232 'Failed to load dump "%s" into repository at path "%s".'
234 % (dump_name, repo_path))
233 % (dump_name, repo_path))
235
234
236
235
237 class AssertResponse(object):
236 class AssertResponse(object):
238 """
237 """
239 Utility that helps to assert things about a given HTML response.
238 Utility that helps to assert things about a given HTML response.
240 """
239 """
241
240
242 def __init__(self, response):
241 def __init__(self, response):
243 self.response = response
242 self.response = response
244
243
245 def get_imports(self):
244 def get_imports(self):
246 return fromstring, tostring, CSSSelector
245 return fromstring, tostring, CSSSelector
247
246
248 def one_element_exists(self, css_selector):
247 def one_element_exists(self, css_selector):
249 self.get_element(css_selector)
248 self.get_element(css_selector)
250
249
251 def no_element_exists(self, css_selector):
250 def no_element_exists(self, css_selector):
252 assert not self._get_elements(css_selector)
251 assert not self._get_elements(css_selector)
253
252
254 def element_equals_to(self, css_selector, expected_content):
253 def element_equals_to(self, css_selector, expected_content):
255 element = self.get_element(css_selector)
254 element = self.get_element(css_selector)
256 element_text = self._element_to_string(element)
255 element_text = self._element_to_string(element)
257 assert expected_content in element_text
256 assert expected_content in element_text
258
257
259 def element_contains(self, css_selector, expected_content):
258 def element_contains(self, css_selector, expected_content):
260 element = self.get_element(css_selector)
259 element = self.get_element(css_selector)
261 assert expected_content in element.text_content()
260 assert expected_content in element.text_content()
262
261
263 def element_value_contains(self, css_selector, expected_content):
262 def element_value_contains(self, css_selector, expected_content):
264 element = self.get_element(css_selector)
263 element = self.get_element(css_selector)
265 assert expected_content in element.value
264 assert expected_content in element.value
266
265
267 def contains_one_link(self, link_text, href):
266 def contains_one_link(self, link_text, href):
268 fromstring, tostring, CSSSelector = self.get_imports()
267 fromstring, tostring, CSSSelector = self.get_imports()
269 doc = fromstring(self.response.body)
268 doc = fromstring(self.response.body)
270 sel = CSSSelector('a[href]')
269 sel = CSSSelector('a[href]')
271 elements = [
270 elements = [
272 e for e in sel(doc) if e.text_content().strip() == link_text]
271 e for e in sel(doc) if e.text_content().strip() == link_text]
273 assert len(elements) == 1, "Did not find link or found multiple links"
272 assert len(elements) == 1, "Did not find link or found multiple links"
274 self._ensure_url_equal(elements[0].attrib.get('href'), href)
273 self._ensure_url_equal(elements[0].attrib.get('href'), href)
275
274
276 def contains_one_anchor(self, anchor_id):
275 def contains_one_anchor(self, anchor_id):
277 fromstring, tostring, CSSSelector = self.get_imports()
276 fromstring, tostring, CSSSelector = self.get_imports()
278 doc = fromstring(self.response.body)
277 doc = fromstring(self.response.body)
279 sel = CSSSelector('#' + anchor_id)
278 sel = CSSSelector('#' + anchor_id)
280 elements = sel(doc)
279 elements = sel(doc)
281 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
280 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
282
281
283 def _ensure_url_equal(self, found, expected):
282 def _ensure_url_equal(self, found, expected):
284 assert _Url(found) == _Url(expected)
283 assert _Url(found) == _Url(expected)
285
284
286 def get_element(self, css_selector):
285 def get_element(self, css_selector):
287 elements = self._get_elements(css_selector)
286 elements = self._get_elements(css_selector)
288 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
287 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
289 return elements[0]
288 return elements[0]
290
289
291 def get_elements(self, css_selector):
290 def get_elements(self, css_selector):
292 return self._get_elements(css_selector)
291 return self._get_elements(css_selector)
293
292
294 def _get_elements(self, css_selector):
293 def _get_elements(self, css_selector):
295 fromstring, tostring, CSSSelector = self.get_imports()
294 fromstring, tostring, CSSSelector = self.get_imports()
296 doc = fromstring(self.response.body)
295 doc = fromstring(self.response.body)
297 sel = CSSSelector(css_selector)
296 sel = CSSSelector(css_selector)
298 elements = sel(doc)
297 elements = sel(doc)
299 return elements
298 return elements
300
299
301 def _element_to_string(self, element):
300 def _element_to_string(self, element):
302 fromstring, tostring, CSSSelector = self.get_imports()
301 fromstring, tostring, CSSSelector = self.get_imports()
303 return tostring(element)
302 return tostring(element)
304
303
305
304
306 class _Url(object):
305 class _Url(object):
307 """
306 """
308 A url object that can be compared with other url orbjects
307 A url object that can be compared with other url orbjects
309 without regard to the vagaries of encoding, escaping, and ordering
308 without regard to the vagaries of encoding, escaping, and ordering
310 of parameters in query strings.
309 of parameters in query strings.
311
310
312 Inspired by
311 Inspired by
313 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
312 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
314 """
313 """
315
314
316 def __init__(self, url):
315 def __init__(self, url):
317 parts = urlparse(url)
316 parts = urlparse(url)
318 _query = frozenset(parse_qsl(parts.query))
317 _query = frozenset(parse_qsl(parts.query))
319 _path = unquote_plus(parts.path)
318 _path = unquote_plus(parts.path)
320 parts = parts._replace(query=_query, path=_path)
319 parts = parts._replace(query=_query, path=_path)
321 self.parts = parts
320 self.parts = parts
322
321
323 def __eq__(self, other):
322 def __eq__(self, other):
324 return self.parts == other.parts
323 return self.parts == other.parts
325
324
326 def __hash__(self):
325 def __hash__(self):
327 return hash(self.parts)
326 return hash(self.parts)
328
327
329
328
330 def run_test_concurrently(times, raise_catched_exc=True):
329 def run_test_concurrently(times, raise_catched_exc=True):
331 """
330 """
332 Add this decorator to small pieces of code that you want to test
331 Add this decorator to small pieces of code that you want to test
333 concurrently
332 concurrently
334
333
335 ex:
334 ex:
336
335
337 @test_concurrently(25)
336 @test_concurrently(25)
338 def my_test_function():
337 def my_test_function():
339 ...
338 ...
340 """
339 """
341 def test_concurrently_decorator(test_func):
340 def test_concurrently_decorator(test_func):
342 def wrapper(*args, **kwargs):
341 def wrapper(*args, **kwargs):
343 exceptions = []
342 exceptions = []
344
343
345 def call_test_func():
344 def call_test_func():
346 try:
345 try:
347 test_func(*args, **kwargs)
346 test_func(*args, **kwargs)
348 except Exception as e:
347 except Exception as e:
349 exceptions.append(e)
348 exceptions.append(e)
350 if raise_catched_exc:
349 if raise_catched_exc:
351 raise
350 raise
352 threads = []
351 threads = []
353 for i in range(times):
352 for i in range(times):
354 threads.append(threading.Thread(target=call_test_func))
353 threads.append(threading.Thread(target=call_test_func))
355 for t in threads:
354 for t in threads:
356 t.start()
355 t.start()
357 for t in threads:
356 for t in threads:
358 t.join()
357 t.join()
359 if exceptions:
358 if exceptions:
360 raise Exception(
359 raise Exception(
361 'test_concurrently intercepted %s exceptions: %s' % (
360 'test_concurrently intercepted %s exceptions: %s' % (
362 len(exceptions), exceptions))
361 len(exceptions), exceptions))
363 return wrapper
362 return wrapper
364 return test_concurrently_decorator
363 return test_concurrently_decorator
365
364
366
365
367 def wait_for_url(url, timeout=10):
366 def wait_for_url(url, timeout=10):
368 """
367 """
369 Wait until URL becomes reachable.
368 Wait until URL becomes reachable.
370
369
371 It polls the URL until the timeout is reached or it became reachable.
370 It polls the URL until the timeout is reached or it became reachable.
372 If will call to `py.test.fail` in case the URL is not reachable.
371 If will call to `py.test.fail` in case the URL is not reachable.
373 """
372 """
374 timeout = time.time() + timeout
373 timeout = time.time() + timeout
375 last = 0
374 last = 0
376 wait = 0.1
375 wait = 0.1
377
376
378 while timeout > last:
377 while timeout > last:
379 last = time.time()
378 last = time.time()
380 if is_url_reachable(url):
379 if is_url_reachable(url):
381 break
380 break
382 elif (last + wait) > time.time():
381 elif (last + wait) > time.time():
383 # Go to sleep because not enough time has passed since last check.
382 # Go to sleep because not enough time has passed since last check.
384 time.sleep(wait)
383 time.sleep(wait)
385 else:
384 else:
386 pytest.fail("Timeout while waiting for URL {}".format(url))
385 pytest.fail("Timeout while waiting for URL {}".format(url))
387
386
388
387
389 def is_url_reachable(url):
388 def is_url_reachable(url):
390 try:
389 try:
391 urllib2.urlopen(url)
390 urllib2.urlopen(url)
392 except urllib2.URLError:
391 except urllib2.URLError:
393 return False
392 return False
394 return True
393 return True
395
394
396
395
397 def repo_on_filesystem(repo_name):
396 def repo_on_filesystem(repo_name):
398 from rhodecode.lib import vcs
397 from rhodecode.lib import vcs
399 from rhodecode.tests import TESTS_TMP_PATH
398 from rhodecode.tests import TESTS_TMP_PATH
400 repo = vcs.get_vcs_instance(
399 repo = vcs.get_vcs_instance(
401 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
400 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
402 return repo is not None
401 return repo is not None
403
402
404
403
405 def commit_change(
404 def commit_change(
406 repo, filename, content, message, vcs_type, parent=None, newfile=False):
405 repo, filename, content, message, vcs_type, parent=None, newfile=False):
407 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
406 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
408
407
409 repo = Repository.get_by_repo_name(repo)
408 repo = Repository.get_by_repo_name(repo)
410 _commit = parent
409 _commit = parent
411 if not parent:
410 if not parent:
412 _commit = EmptyCommit(alias=vcs_type)
411 _commit = EmptyCommit(alias=vcs_type)
413
412
414 if newfile:
413 if newfile:
415 nodes = {
414 nodes = {
416 filename: {
415 filename: {
417 'content': content
416 'content': content
418 }
417 }
419 }
418 }
420 commit = ScmModel().create_nodes(
419 commit = ScmModel().create_nodes(
421 user=TEST_USER_ADMIN_LOGIN, repo=repo,
420 user=TEST_USER_ADMIN_LOGIN, repo=repo,
422 message=message,
421 message=message,
423 nodes=nodes,
422 nodes=nodes,
424 parent_commit=_commit,
423 parent_commit=_commit,
425 author=TEST_USER_ADMIN_LOGIN,
424 author=TEST_USER_ADMIN_LOGIN,
426 )
425 )
427 else:
426 else:
428 commit = ScmModel().commit_change(
427 commit = ScmModel().commit_change(
429 repo=repo.scm_instance(), repo_name=repo.repo_name,
428 repo=repo.scm_instance(), repo_name=repo.repo_name,
430 commit=parent, user=TEST_USER_ADMIN_LOGIN,
429 commit=parent, user=TEST_USER_ADMIN_LOGIN,
431 author=TEST_USER_ADMIN_LOGIN,
430 author=TEST_USER_ADMIN_LOGIN,
432 message=message,
431 message=message,
433 content=content,
432 content=content,
434 f_path=filename
433 f_path=filename
435 )
434 )
436 return commit
435 return commit
437
436
438
437
439 def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
438 def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
440 if not default:
439 if not default:
441 raise ValueError('Permission for default user must be given')
440 raise ValueError('Permission for default user must be given')
442 form_data = [(
441 form_data = [(
443 'csrf_token', csrf_token
442 'csrf_token', csrf_token
444 )]
443 )]
445 # add default
444 # add default
446 form_data.extend([
445 form_data.extend([
447 ('u_perm_1', default)
446 ('u_perm_1', default)
448 ])
447 ])
449
448
450 if grant:
449 if grant:
451 for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
450 for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
452 form_data.extend([
451 form_data.extend([
453 ('perm_new_member_perm_new{}'.format(cnt), perm),
452 ('perm_new_member_perm_new{}'.format(cnt), perm),
454 ('perm_new_member_id_new{}'.format(cnt), obj_id),
453 ('perm_new_member_id_new{}'.format(cnt), obj_id),
455 ('perm_new_member_name_new{}'.format(cnt), obj_name),
454 ('perm_new_member_name_new{}'.format(cnt), obj_name),
456 ('perm_new_member_type_new{}'.format(cnt), obj_type),
455 ('perm_new_member_type_new{}'.format(cnt), obj_type),
457
456
458 ])
457 ])
459 if revoke:
458 if revoke:
460 for obj_id, obj_type in revoke:
459 for obj_id, obj_type in revoke:
461 form_data.extend([
460 form_data.extend([
462 ('perm_del_member_id_{}'.format(obj_id), obj_id),
461 ('perm_del_member_id_{}'.format(obj_id), obj_id),
463 ('perm_del_member_type_{}'.format(obj_id), obj_type),
462 ('perm_del_member_type_{}'.format(obj_id), obj_type),
464 ])
463 ])
465 return form_data
464 return form_data
General Comments 0
You need to be logged in to leave comments. Login now