tests: fixed tests for archivals
super-admin
r5150:1fa672f5 default
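This change updates the archival tests to the new `get_archive_name` signature, which now takes the repository id as its first positional argument, and switches the affected string building to f-strings. A minimal sketch of the updated call shape, using only names that already appear in the diff below (this is an illustration of the test expectation, not an additional API):

    # Sketch of the expectation the archival tests below now build.
    # `backend` and `commit` are the fixtures used in this test module.
    from rhodecode.apps.repository.views.repo_files import get_archive_name, get_path_sha
    from rhodecode.lib.vcs.conf import settings

    def expected_archive_headers(backend, commit):
        for a_type, content_type, extension in settings.ARCHIVE_SPECS:
            path_sha = get_path_sha('/')
            # repo_id is now passed as the first positional argument
            filename = get_archive_name(
                backend.repo_id, backend.repo_name,
                commit_sha=commit.short_id, ext=extension, path_sha=path_sha)
            yield [('Content-Disposition', f'attachment; filename={filename}'),
                   ('Content-Type', content_type)]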
@@ -1,1125 +1,1125 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import os
21 21
22 22 import mock
23 23 import pytest
24 from collections import OrderedDict
24 25
25 26 from rhodecode.apps.repository.tests.test_repo_compare import ComparePage
26 27 from rhodecode.apps.repository.views.repo_files import RepoFilesView, get_archive_name, get_path_sha
27 28 from rhodecode.lib import helpers as h
28 from collections import OrderedDict
29 29 from rhodecode.lib.ext_json import json
30 30 from rhodecode.lib.str_utils import safe_str
31 31 from rhodecode.lib.vcs import nodes
32 from rhodecode.lib.vcs.conf import settings
33 from rhodecode.model.db import Session, Repository
32 34
33 from rhodecode.lib.vcs.conf import settings
34 35 from rhodecode.tests import assert_session_flash
35 36 from rhodecode.tests.fixture import Fixture
36 from rhodecode.model.db import Session
37 37
38 38 fixture = Fixture()
39 39
40 40
41 41 def get_node_history(backend_type):
42 42 return {
43 43 'hg': json.loads(fixture.load_resource('hg_node_history_response.json')),
44 44 'git': json.loads(fixture.load_resource('git_node_history_response.json')),
45 45 'svn': json.loads(fixture.load_resource('svn_node_history_response.json')),
46 46 }[backend_type]
47 47
48 48
49 49 def route_path(name, params=None, **kwargs):
50 50 import urllib.request
51 51 import urllib.parse
52 52 import urllib.error
53 53
54 54 base_url = {
55 55 'repo_summary': '/{repo_name}',
56 56 'repo_archivefile': '/{repo_name}/archive/{fname}',
57 57 'repo_files_diff': '/{repo_name}/diff/{f_path}',
58 58 'repo_files_diff_2way_redirect': '/{repo_name}/diff-2way/{f_path}',
59 59 'repo_files': '/{repo_name}/files/{commit_id}/{f_path}',
60 60 'repo_files:default_path': '/{repo_name}/files/{commit_id}/',
61 61 'repo_files:default_commit': '/{repo_name}/files',
62 62 'repo_files:rendered': '/{repo_name}/render/{commit_id}/{f_path}',
63 63 'repo_files:annotated': '/{repo_name}/annotate/{commit_id}/{f_path}',
64 64 'repo_files:annotated_previous': '/{repo_name}/annotate-previous/{commit_id}/{f_path}',
65 65 'repo_files_nodelist': '/{repo_name}/nodelist/{commit_id}/{f_path}',
66 66 'repo_file_raw': '/{repo_name}/raw/{commit_id}/{f_path}',
67 67 'repo_file_download': '/{repo_name}/download/{commit_id}/{f_path}',
68 68 'repo_file_history': '/{repo_name}/history/{commit_id}/{f_path}',
69 69 'repo_file_authors': '/{repo_name}/authors/{commit_id}/{f_path}',
70 70 'repo_files_remove_file': '/{repo_name}/remove_file/{commit_id}/{f_path}',
71 71 'repo_files_delete_file': '/{repo_name}/delete_file/{commit_id}/{f_path}',
72 72 'repo_files_edit_file': '/{repo_name}/edit_file/{commit_id}/{f_path}',
73 73 'repo_files_update_file': '/{repo_name}/update_file/{commit_id}/{f_path}',
74 74 'repo_files_add_file': '/{repo_name}/add_file/{commit_id}/{f_path}',
75 75 'repo_files_upload_file': '/{repo_name}/upload_file/{commit_id}/{f_path}',
76 76 'repo_files_create_file': '/{repo_name}/create_file/{commit_id}/{f_path}',
77 77 'repo_nodetree_full': '/{repo_name}/nodetree_full/{commit_id}/{f_path}',
78 78 'repo_nodetree_full:default_path': '/{repo_name}/nodetree_full/{commit_id}/',
79 79 }[name].format(**kwargs)
80 80
81 81 if params:
82 82 base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params))
83 83 return base_url
84 84
85 85
86 86 def assert_files_in_response(response, files, params):
87 87 template = (
88 88 'href="/%(repo_name)s/files/%(commit_id)s/%(name)s"')
89 89 _assert_items_in_response(response, files, template, params)
90 90
91 91
92 92 def assert_dirs_in_response(response, dirs, params):
93 93 template = (
94 94 'href="/%(repo_name)s/files/%(commit_id)s/%(name)s"')
95 95 _assert_items_in_response(response, dirs, template, params)
96 96
97 97
98 98 def _assert_items_in_response(response, items, template, params):
99 99 for item in items:
100 100 item_params = {'name': item}
101 101 item_params.update(params)
102 102 response.mustcontain(template % item_params)
103 103
104 104
105 105 def assert_timeago_in_response(response, items, params):
106 106 for item in items:
107 107 response.mustcontain(h.age_component(params['date']))
108 108
109 109
110 110 @pytest.mark.usefixtures("app")
111 111 class TestFilesViews(object):
112 112
113 113 def test_show_files(self, backend):
114 114 response = self.app.get(
115 115 route_path('repo_files',
116 116 repo_name=backend.repo_name,
117 117 commit_id='tip', f_path='/'))
118 118 commit = backend.repo.get_commit()
119 119
120 120 params = {
121 121 'repo_name': backend.repo_name,
122 122 'commit_id': commit.raw_id,
123 123 'date': commit.date
124 124 }
125 125 assert_dirs_in_response(response, ['docs', 'vcs'], params)
126 126 files = [
127 127 '.gitignore',
128 128 '.hgignore',
129 129 '.hgtags',
130 130 # TODO: missing in Git
131 131 # '.travis.yml',
132 132 'MANIFEST.in',
133 133 'README.rst',
134 134 # TODO: File is missing in svn repository
135 135 # 'run_test_and_report.sh',
136 136 'setup.cfg',
137 137 'setup.py',
138 138 'test_and_report.sh',
139 139 'tox.ini',
140 140 ]
141 141 assert_files_in_response(response, files, params)
142 142 assert_timeago_in_response(response, files, params)
143 143
144 144 def test_show_files_links_submodules_with_absolute_url(self, backend_hg):
145 145 repo = backend_hg['subrepos']
146 146 response = self.app.get(
147 147 route_path('repo_files',
148 148 repo_name=repo.repo_name,
149 149 commit_id='tip', f_path='/'))
150 150 assert_response = response.assert_response()
151 151 assert_response.contains_one_link(
152 152 'absolute-path @ 000000000000', 'http://example.com/absolute-path')
153 153
154 154 def test_show_files_links_submodules_with_absolute_url_subpaths(
155 155 self, backend_hg):
156 156 repo = backend_hg['subrepos']
157 157 response = self.app.get(
158 158 route_path('repo_files',
159 159 repo_name=repo.repo_name,
160 160 commit_id='tip', f_path='/'))
161 161 assert_response = response.assert_response()
162 162 assert_response.contains_one_link(
163 163 'subpaths-path @ 000000000000',
164 164 'http://sub-base.example.com/subpaths-path')
165 165
166 166 @pytest.mark.xfail_backends("svn", reason="Depends on branch support")
167 167 def test_files_menu(self, backend):
168 168 new_branch = "temp_branch_name"
169 169 commits = [
170 170 {'message': 'a'},
171 171 {'message': 'b', 'branch': new_branch}
172 172 ]
173 173 backend.create_repo(commits)
174 backend.repo.landing_rev = "branch:%s" % new_branch
174 backend.repo.landing_rev = f"branch:{new_branch}"
175 175 Session().commit()
176 176
177 177 # get response based on tip and not new commit
178 178 response = self.app.get(
179 179 route_path('repo_files',
180 180 repo_name=backend.repo_name,
181 181 commit_id='tip', f_path='/'))
182 182
183 183 # make sure Files menu url is not tip but new commit
184 184 landing_rev = backend.repo.landing_ref_name
185 185 files_url = route_path('repo_files:default_path',
186 186 repo_name=backend.repo_name,
187 187 commit_id=landing_rev, params={'at': landing_rev})
188 188
189 189 assert landing_rev != 'tip'
190 response.mustcontain(
191 '<li class="active"><a class="menulink" href="%s">' % files_url)
190 response.mustcontain(f'<li class="active"><a class="menulink" href="{files_url}">')
192 191
193 192 def test_show_files_commit(self, backend):
194 193 commit = backend.repo.get_commit(commit_idx=32)
195 194
196 195 response = self.app.get(
197 196 route_path('repo_files',
198 197 repo_name=backend.repo_name,
199 198 commit_id=commit.raw_id, f_path='/'))
200 199
201 200 dirs = ['docs', 'tests']
202 201 files = ['README.rst']
203 202 params = {
204 203 'repo_name': backend.repo_name,
205 204 'commit_id': commit.raw_id,
206 205 }
207 206 assert_dirs_in_response(response, dirs, params)
208 207 assert_files_in_response(response, files, params)
209 208
210 209 def test_show_files_different_branch(self, backend):
211 210 branches = dict(
212 211 hg=(150, ['git']),
213 212 # TODO: Git test repository does not contain other branches
214 213 git=(633, ['master']),
215 214 # TODO: Branch support in Subversion
216 215 svn=(150, [])
217 216 )
218 217 idx, branches = branches[backend.alias]
219 218 commit = backend.repo.get_commit(commit_idx=idx)
220 219 response = self.app.get(
221 220 route_path('repo_files',
222 221 repo_name=backend.repo_name,
223 222 commit_id=commit.raw_id, f_path='/'))
224 223
225 224 assert_response = response.assert_response()
226 225 for branch in branches:
227 226 assert_response.element_contains('.tags .branchtag', branch)
228 227
229 228 def test_show_files_paging(self, backend):
230 229 repo = backend.repo
231 230 indexes = [73, 92, 109, 1, 0]
232 231 idx_map = [(rev, repo.get_commit(commit_idx=rev).raw_id)
233 232 for rev in indexes]
234 233
235 234 for idx in idx_map:
236 235 response = self.app.get(
237 236 route_path('repo_files',
238 237 repo_name=backend.repo_name,
239 238 commit_id=idx[1], f_path='/'))
240 239
241 240 response.mustcontain("""r%s:%s""" % (idx[0], idx[1][:8]))
242 241
243 242 def test_file_source(self, backend):
244 243 commit = backend.repo.get_commit(commit_idx=167)
245 244 response = self.app.get(
246 245 route_path('repo_files',
247 246 repo_name=backend.repo_name,
248 247 commit_id=commit.raw_id, f_path='vcs/nodes.py'))
249 248
250 249 msgbox = """<div class="commit">%s</div>"""
251 250 response.mustcontain(msgbox % (commit.message, ))
252 251
253 252 assert_response = response.assert_response()
254 253 if commit.branch:
255 254 assert_response.element_contains(
256 255 '.tags.tags-main .branchtag', commit.branch)
257 256 if commit.tags:
258 257 for tag in commit.tags:
259 258 assert_response.element_contains('.tags.tags-main .tagtag', tag)
260 259
261 260 def test_file_source_annotated(self, backend):
262 261 response = self.app.get(
263 262 route_path('repo_files:annotated',
264 263 repo_name=backend.repo_name,
265 264 commit_id='tip', f_path='vcs/nodes.py'))
266 265 expected_commits = {
267 266 'hg': 'r356',
268 267 'git': 'r345',
269 268 'svn': 'r208',
270 269 }
271 270 response.mustcontain(expected_commits[backend.alias])
272 271
273 272 def test_file_source_authors(self, backend):
274 273 response = self.app.get(
275 274 route_path('repo_file_authors',
276 275 repo_name=backend.repo_name,
277 276 commit_id='tip', f_path='vcs/nodes.py'))
278 277 expected_authors = {
279 278 'hg': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
280 279 'git': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
281 280 'svn': ('marcin', 'lukasz'),
282 281 }
283 282
284 283 for author in expected_authors[backend.alias]:
285 284 response.mustcontain(author)
286 285
287 286 def test_file_source_authors_with_annotation(self, backend):
288 287 response = self.app.get(
289 288 route_path('repo_file_authors',
290 289 repo_name=backend.repo_name,
291 290 commit_id='tip', f_path='vcs/nodes.py',
292 291 params=dict(annotate=1)))
293 292 expected_authors = {
294 293 'hg': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
295 294 'git': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
296 295 'svn': ('marcin', 'lukasz'),
297 296 }
298 297
299 298 for author in expected_authors[backend.alias]:
300 299 response.mustcontain(author)
301 300
302 301 def test_file_source_history(self, backend, xhr_header):
303 302 response = self.app.get(
304 303 route_path('repo_file_history',
305 304 repo_name=backend.repo_name,
306 305 commit_id='tip', f_path='vcs/nodes.py'),
307 306 extra_environ=xhr_header)
308 307 assert get_node_history(backend.alias) == json.loads(response.body)
309 308
310 309 def test_file_source_history_svn(self, backend_svn, xhr_header):
311 310 simple_repo = backend_svn['svn-simple-layout']
312 311 response = self.app.get(
313 312 route_path('repo_file_history',
314 313 repo_name=simple_repo.repo_name,
315 314 commit_id='tip', f_path='trunk/example.py'),
316 315 extra_environ=xhr_header)
317 316
318 317 expected_data = json.loads(
319 318 fixture.load_resource('svn_node_history_branches.json'))
320 319
321 320 assert expected_data == response.json
322 321
323 322 def test_file_source_history_with_annotation(self, backend, xhr_header):
324 323 response = self.app.get(
325 324 route_path('repo_file_history',
326 325 repo_name=backend.repo_name,
327 326 commit_id='tip', f_path='vcs/nodes.py',
328 327 params=dict(annotate=1)),
329 328
330 329 extra_environ=xhr_header)
331 330 assert get_node_history(backend.alias) == json.loads(response.body)
332 331
333 332 def test_tree_search_top_level(self, backend, xhr_header):
334 333 commit = backend.repo.get_commit(commit_idx=173)
335 334 response = self.app.get(
336 335 route_path('repo_files_nodelist',
337 336 repo_name=backend.repo_name,
338 337 commit_id=commit.raw_id, f_path='/'),
339 338 extra_environ=xhr_header)
340 339 assert 'nodes' in response.json
341 340 assert {'name': 'docs', 'type': 'dir'} in response.json['nodes']
342 341
343 342 def test_tree_search_missing_xhr(self, backend):
344 343 self.app.get(
345 344 route_path('repo_files_nodelist',
346 345 repo_name=backend.repo_name,
347 346 commit_id='tip', f_path='/'),
348 347 status=404)
349 348
350 349 def test_tree_search_at_path(self, backend, xhr_header):
351 350 commit = backend.repo.get_commit(commit_idx=173)
352 351 response = self.app.get(
353 352 route_path('repo_files_nodelist',
354 353 repo_name=backend.repo_name,
355 354 commit_id=commit.raw_id, f_path='/docs'),
356 355 extra_environ=xhr_header)
357 356 assert 'nodes' in response.json
358 357 nodes = response.json['nodes']
359 358 assert {'name': 'docs/api', 'type': 'dir'} in nodes
360 359 assert {'name': 'docs/index.rst', 'type': 'file'} in nodes
361 360
362 361 def test_tree_search_at_path_2nd_level(self, backend, xhr_header):
363 362 commit = backend.repo.get_commit(commit_idx=173)
364 363 response = self.app.get(
365 364 route_path('repo_files_nodelist',
366 365 repo_name=backend.repo_name,
367 366 commit_id=commit.raw_id, f_path='/docs/api'),
368 367 extra_environ=xhr_header)
369 368 assert 'nodes' in response.json
370 369 nodes = response.json['nodes']
371 370 assert {'name': 'docs/api/index.rst', 'type': 'file'} in nodes
372 371
373 372 def test_tree_search_at_path_missing_xhr(self, backend):
374 373 self.app.get(
375 374 route_path('repo_files_nodelist',
376 375 repo_name=backend.repo_name,
377 376 commit_id='tip', f_path='/docs'),
378 377 status=404)
379 378
380 379 def test_nodetree(self, backend, xhr_header):
381 380 commit = backend.repo.get_commit(commit_idx=173)
382 381 response = self.app.get(
383 382 route_path('repo_nodetree_full',
384 383 repo_name=backend.repo_name,
385 384 commit_id=commit.raw_id, f_path='/'),
386 385 extra_environ=xhr_header)
387 386
388 387 assert_response = response.assert_response()
389 388
390 389 for attr in ['data-commit-id', 'data-date', 'data-author']:
391 390 elements = assert_response.get_elements('[{}]'.format(attr))
392 391 assert len(elements) > 1
393 392
394 393 for element in elements:
395 394 assert element.get(attr)
396 395
397 396 def test_nodetree_if_file(self, backend, xhr_header):
398 397 commit = backend.repo.get_commit(commit_idx=173)
399 398 response = self.app.get(
400 399 route_path('repo_nodetree_full',
401 400 repo_name=backend.repo_name,
402 401 commit_id=commit.raw_id, f_path='README.rst'),
403 402 extra_environ=xhr_header)
404 403 assert response.text == ''
405 404
406 405 def test_nodetree_wrong_path(self, backend, xhr_header):
407 406 commit = backend.repo.get_commit(commit_idx=173)
408 407 response = self.app.get(
409 408 route_path('repo_nodetree_full',
410 409 repo_name=backend.repo_name,
411 410 commit_id=commit.raw_id, f_path='/dont-exist'),
412 411 extra_environ=xhr_header)
413 412
414 413 err = 'error: There is no file nor ' \
415 414 'directory at the given path'
416 415 assert err in response.text
417 416
418 417 def test_nodetree_missing_xhr(self, backend):
419 418 self.app.get(
420 419 route_path('repo_nodetree_full',
421 420 repo_name=backend.repo_name,
422 421 commit_id='tip', f_path='/'),
423 422 status=404)
424 423
425 424
426 425 @pytest.mark.usefixtures("app", "autologin_user")
427 426 class TestRawFileHandling(object):
428 427
429 428 def test_download_file(self, backend):
430 429 commit = backend.repo.get_commit(commit_idx=173)
431 430 response = self.app.get(
432 431 route_path('repo_file_download',
433 432 repo_name=backend.repo_name,
434 433 commit_id=commit.raw_id, f_path='vcs/nodes.py'),)
435 434
436 435 assert response.content_disposition == 'attachment; filename="nodes.py"; filename*=UTF-8\'\'nodes.py'
437 436 assert response.content_type == "text/x-python"
438 437
439 438 def test_download_file_wrong_cs(self, backend):
440 439 raw_id = u'ERRORce30c96924232dffcd24178a07ffeb5dfc'
441 440
442 441 response = self.app.get(
443 442 route_path('repo_file_download',
444 443 repo_name=backend.repo_name,
445 444 commit_id=raw_id, f_path='vcs/nodes.svg'),
446 445 status=404)
447 446
448 447 msg = """No such commit exists for this repository"""
449 448 response.mustcontain(msg)
450 449
451 450 def test_download_file_wrong_f_path(self, backend):
452 451 commit = backend.repo.get_commit(commit_idx=173)
453 452 f_path = 'vcs/ERRORnodes.py'
454 453
455 454 response = self.app.get(
456 455 route_path('repo_file_download',
457 456 repo_name=backend.repo_name,
458 457 commit_id=commit.raw_id, f_path=f_path),
459 458 status=404)
460 459
461 460 msg = (
462 461 "There is no file nor directory at the given path: "
463 462 "`%s` at commit %s" % (f_path, commit.short_id))
464 463 response.mustcontain(msg)
465 464
466 465 def test_file_raw(self, backend):
467 466 commit = backend.repo.get_commit(commit_idx=173)
468 467 response = self.app.get(
469 468 route_path('repo_file_raw',
470 469 repo_name=backend.repo_name,
471 470 commit_id=commit.raw_id, f_path='vcs/nodes.py'),)
472 471
473 472 assert response.content_type == "text/plain"
474 473
475 474 def test_file_raw_binary(self, backend):
476 475 commit = backend.repo.get_commit()
477 476 response = self.app.get(
478 477 route_path('repo_file_raw',
479 478 repo_name=backend.repo_name,
480 479 commit_id=commit.raw_id,
481 480 f_path='docs/theme/ADC/static/breadcrumb_background.png'),)
482 481
483 482 assert response.content_disposition == 'inline'
484 483
485 484 def test_raw_file_wrong_cs(self, backend):
486 485 raw_id = u'ERRORcce30c96924232dffcd24178a07ffeb5dfc'
487 486
488 487 response = self.app.get(
489 488 route_path('repo_file_raw',
490 489 repo_name=backend.repo_name,
491 490 commit_id=raw_id, f_path='vcs/nodes.svg'),
492 491 status=404)
493 492
494 493 msg = """No such commit exists for this repository"""
495 494 response.mustcontain(msg)
496 495
497 496 def test_raw_wrong_f_path(self, backend):
498 497 commit = backend.repo.get_commit(commit_idx=173)
499 498 f_path = 'vcs/ERRORnodes.py'
500 499 response = self.app.get(
501 500 route_path('repo_file_raw',
502 501 repo_name=backend.repo_name,
503 502 commit_id=commit.raw_id, f_path=f_path),
504 503 status=404)
505 504
506 505 msg = (
507 506 "There is no file nor directory at the given path: "
508 507 "`%s` at commit %s" % (f_path, commit.short_id))
509 508 response.mustcontain(msg)
510 509
511 510 def test_raw_svg_should_not_be_rendered(self, backend):
512 511 backend.create_repo()
513 512 backend.ensure_file(b"xss.svg")
514 513 response = self.app.get(
515 514 route_path('repo_file_raw',
516 515 repo_name=backend.repo_name,
517 516 commit_id='tip', f_path='xss.svg'),)
519 518 # If the content type is image/svg+xml then it allows rendering of HTML
519 518 # and malicious SVG.
520 519 assert response.content_type == "text/plain"
521 520
522 521
523 522 @pytest.mark.usefixtures("app")
524 523 class TestRepositoryArchival(object):
525 524
526 525 def test_archival(self, backend):
527 526 backend.enable_downloads()
528 527 commit = backend.repo.get_commit(commit_idx=173)
528
529 529 for a_type, content_type, extension in settings.ARCHIVE_SPECS:
530 530 path_sha = get_path_sha('/')
531 filename = get_archive_name(backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha)
531 filename = get_archive_name(backend.repo_id, backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha)
532 532
533 533 fname = commit.raw_id + extension
534 534 response = self.app.get(
535 535 route_path('repo_archivefile',
536 536 repo_name=backend.repo_name,
537 537 fname=fname))
538 538
539 539 assert response.status == '200 OK'
540 540 headers = [
541 ('Content-Disposition', 'attachment; filename=%s' % filename),
542 ('Content-Type', '%s' % content_type),
541 ('Content-Disposition', f'attachment; filename={filename}'),
542 ('Content-Type', content_type),
543 543 ]
544 544
545 545 for header in headers:
546 assert header in response.headers.items()
546 assert header in list(response.headers.items())
547 547
548 548 def test_archival_no_hash(self, backend):
549 549 backend.enable_downloads()
550 550 commit = backend.repo.get_commit(commit_idx=173)
551 551 for a_type, content_type, extension in settings.ARCHIVE_SPECS:
552 552 path_sha = get_path_sha('/')
553 filename = get_archive_name(backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha, with_hash=False)
553 filename = get_archive_name(backend.repo_id, backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha, with_hash=False)
554 554
555 555 fname = commit.raw_id + extension
556 556 response = self.app.get(
557 557 route_path('repo_archivefile',
558 558 repo_name=backend.repo_name,
559 559 fname=fname, params={'with_hash': 0}))
560 560
561 561 assert response.status == '200 OK'
562 562 headers = [
563 ('Content-Disposition', 'attachment; filename=%s' % filename),
564 ('Content-Type', '%s' % content_type),
563 ('Content-Disposition', f'attachment; filename={filename}'),
564 ('Content-Type', content_type),
565 565 ]
566 566
567 567 for header in headers:
568 568 assert header in list(response.headers.items())
569 569
570 570 def test_archival_at_path(self, backend):
571 571 backend.enable_downloads()
572 572 commit = backend.repo.get_commit(commit_idx=190)
573 573 at_path = 'vcs'
574 574
575 575 for a_type, content_type, extension in settings.ARCHIVE_SPECS:
576 576 path_sha = get_path_sha(at_path)
577 filename = get_archive_name(backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha)
577 filename = get_archive_name(backend.repo_id, backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha)
578 578
579 579 fname = commit.raw_id + extension
580 580 response = self.app.get(
581 581 route_path('repo_archivefile',
582 582 repo_name=backend.repo_name,
583 583 fname=fname, params={'at_path': at_path}))
584 584
585 585 assert response.status == '200 OK'
586 586 headers = [
587 ('Content-Disposition', 'attachment; filename=%s' % filename),
588 ('Content-Type', '%s' % content_type),
587 ('Content-Disposition', f'attachment; filename={filename}'),
588 ('Content-Type', content_type),
589 589 ]
590 590
591 591 for header in headers:
592 592 assert header in list(response.headers.items())
593 593
594 594 @pytest.mark.parametrize('arch_ext',[
595 595 'tar', 'rar', 'x', '..ax', '.zipz', 'tar.gz.tar'])
596 596 def test_archival_wrong_ext(self, backend, arch_ext):
597 597 backend.enable_downloads()
598 598 commit = backend.repo.get_commit(commit_idx=173)
599 599
600 600 fname = commit.raw_id + '.' + arch_ext
601 601
602 602 response = self.app.get(
603 603 route_path('repo_archivefile',
604 604 repo_name=backend.repo_name,
605 605 fname=fname))
606 606 response.mustcontain(
607 607 'Unknown archive type for: `{}`'.format(fname))
608 608
609 609 @pytest.mark.parametrize('commit_id', [
610 610 '00x000000', 'tar', 'wrong', '@$@$42413232', '232dffcd'])
611 611 def test_archival_wrong_commit_id(self, backend, commit_id):
612 612 backend.enable_downloads()
613 fname = '%s.zip' % commit_id
613 fname = f'{commit_id}.zip'
614 614
615 615 response = self.app.get(
616 616 route_path('repo_archivefile',
617 617 repo_name=backend.repo_name,
618 618 fname=fname))
619 619 response.mustcontain('Unknown commit_id')
620 620
621 621
622 622 @pytest.mark.usefixtures("app")
623 623 class TestFilesDiff(object):
624 624
625 625 @pytest.mark.parametrize("diff", ['diff', 'download', 'raw'])
626 626 def test_file_full_diff(self, backend, diff):
627 627 commit1 = backend.repo.get_commit(commit_idx=-1)
628 628 commit2 = backend.repo.get_commit(commit_idx=-2)
629 629
630 630 response = self.app.get(
631 631 route_path('repo_files_diff',
632 632 repo_name=backend.repo_name,
633 633 f_path='README'),
634 634 params={
635 635 'diff1': commit2.raw_id,
636 636 'diff2': commit1.raw_id,
637 637 'fulldiff': '1',
638 638 'diff': diff,
639 639 })
640 640
641 641 if diff == 'diff':
642 642 # use redirect since this is OLD view redirecting to compare page
643 643 response = response.follow()
644 644
645 645 # It's a symlink to README.rst
646 646 response.mustcontain('README.rst')
647 647 response.mustcontain('No newline at end of file')
648 648
649 649 def test_file_binary_diff(self, backend):
650 650 commits = [
651 651 {'message': 'First commit'},
652 652 {'message': 'Commit with binary',
653 653 'added': [nodes.FileNode(b'file.bin', content='\0BINARY\0')]},
654 654 ]
655 655 repo = backend.create_repo(commits=commits)
656 656
657 657 response = self.app.get(
658 658 route_path('repo_files_diff',
659 659 repo_name=backend.repo_name,
660 660 f_path='file.bin'),
661 661 params={
662 662 'diff1': repo.get_commit(commit_idx=0).raw_id,
663 663 'diff2': repo.get_commit(commit_idx=1).raw_id,
664 664 'fulldiff': '1',
665 665 'diff': 'diff',
666 666 })
667 667 # use redirect since this is OLD view redirecting to compare page
668 668 response = response.follow()
669 669 response.mustcontain('Collapse 1 commit')
670 670 file_changes = (1, 0, 0)
671 671
672 672 compare_page = ComparePage(response)
673 673 compare_page.contains_change_summary(*file_changes)
674 674
675 675 if backend.alias == 'svn':
676 676 response.mustcontain('new file 10644')
677 677 # TODO(marcink): SVN doesn't yet detect binary changes
678 678 else:
679 679 response.mustcontain('new file 100644')
680 680 response.mustcontain('binary diff hidden')
681 681
682 682 def test_diff_2way(self, backend):
683 683 commit1 = backend.repo.get_commit(commit_idx=-1)
684 684 commit2 = backend.repo.get_commit(commit_idx=-2)
685 685 response = self.app.get(
686 686 route_path('repo_files_diff_2way_redirect',
687 687 repo_name=backend.repo_name,
688 688 f_path='README'),
689 689 params={
690 690 'diff1': commit2.raw_id,
691 691 'diff2': commit1.raw_id,
692 692 })
693 693 # use redirect since this is OLD view redirecting to compare page
694 694 response = response.follow()
695 695
696 696 # It's a symlink to README.rst
697 697 response.mustcontain('README.rst')
698 698 response.mustcontain('No newline at end of file')
699 699
700 700 def test_requires_one_commit_id(self, backend, autologin_user):
701 701 response = self.app.get(
702 702 route_path('repo_files_diff',
703 703 repo_name=backend.repo_name,
704 704 f_path='README.rst'),
705 705 status=400)
706 706 response.mustcontain(
707 707 'Need query parameter', 'diff1', 'diff2', 'to generate a diff.')
708 708
709 709 def test_returns_no_files_if_file_does_not_exist(self, vcsbackend):
710 710 repo = vcsbackend.repo
711 711 response = self.app.get(
712 712 route_path('repo_files_diff',
713 713 repo_name=repo.name,
714 714 f_path='does-not-exist-in-any-commit'),
715 715 params={
716 716 'diff1': repo[0].raw_id,
717 717 'diff2': repo[1].raw_id
718 718 })
719 719
720 720 response = response.follow()
721 721 response.mustcontain('No files')
722 722
723 723 def test_returns_redirect_if_file_not_changed(self, backend):
724 724 commit = backend.repo.get_commit(commit_idx=-1)
725 725 response = self.app.get(
726 726 route_path('repo_files_diff_2way_redirect',
727 727 repo_name=backend.repo_name,
728 728 f_path='README'),
729 729 params={
730 730 'diff1': commit.raw_id,
731 731 'diff2': commit.raw_id,
732 732 })
733 733
734 734 response = response.follow()
735 735 response.mustcontain('No files')
736 736 response.mustcontain('No commits in this compare')
737 737
738 738 def test_supports_diff_to_different_path_svn(self, backend_svn):
739 739 #TODO: check this case
740 740 return
741 741
742 742 repo = backend_svn['svn-simple-layout'].scm_instance()
743 743 commit_id_1 = '24'
744 744 commit_id_2 = '26'
745 745
746 746 response = self.app.get(
747 747 route_path('repo_files_diff',
748 748 repo_name=backend_svn.repo_name,
749 749 f_path='trunk/example.py'),
750 750 params={
751 751 'diff1': 'tags/v0.2/example.py@' + commit_id_1,
752 752 'diff2': commit_id_2,
753 753 })
754 754
755 755 response = response.follow()
756 756 response.mustcontain(
757 757 # diff contains this
758 758 "Will print out a useful message on invocation.")
759 759
760 760 # Note: Expecting that we indicate to the user what's being compared
761 761 response.mustcontain("trunk/example.py")
762 762 response.mustcontain("tags/v0.2/example.py")
763 763
764 764 def test_show_rev_redirects_to_svn_path(self, backend_svn):
765 765 #TODO: check this case
766 766 return
767 767
768 768 repo = backend_svn['svn-simple-layout'].scm_instance()
769 769 commit_id = repo[-1].raw_id
770 770
771 771 response = self.app.get(
772 772 route_path('repo_files_diff',
773 773 repo_name=backend_svn.repo_name,
774 774 f_path='trunk/example.py'),
775 775 params={
776 776 'diff1': 'branches/argparse/example.py@' + commit_id,
777 777 'diff2': commit_id,
778 778 },
779 779 status=302)
780 780 response = response.follow()
781 781 assert response.headers['Location'].endswith(
782 782 'svn-svn-simple-layout/files/26/branches/argparse/example.py')
783 783
784 784 def test_show_rev_and_annotate_redirects_to_svn_path(self, backend_svn):
785 785 #TODO: check this case
786 786 return
787 787
788 788 repo = backend_svn['svn-simple-layout'].scm_instance()
789 789 commit_id = repo[-1].raw_id
790 790 response = self.app.get(
791 791 route_path('repo_files_diff',
792 792 repo_name=backend_svn.repo_name,
793 793 f_path='trunk/example.py'),
794 794 params={
795 795 'diff1': 'branches/argparse/example.py@' + commit_id,
796 796 'diff2': commit_id,
797 797 'show_rev': 'Show at Revision',
798 798 'annotate': 'true',
799 799 },
800 800 status=302)
801 801 response = response.follow()
802 802 assert response.headers['Location'].endswith(
803 803 'svn-svn-simple-layout/annotate/26/branches/argparse/example.py')
804 804
805 805
806 806 @pytest.mark.usefixtures("app", "autologin_user")
807 807 class TestModifyFilesWithWebInterface(object):
808 808
809 809 def test_add_file_view(self, backend):
810 810 self.app.get(
811 811 route_path('repo_files_add_file',
812 812 repo_name=backend.repo_name,
813 813 commit_id='tip', f_path='/')
814 814 )
815 815
816 816 @pytest.mark.xfail_backends("svn", reason="Depends on online editing")
817 817 def test_add_file_into_repo_missing_content(self, backend, csrf_token):
818 818 backend.create_repo()
819 819 filename = 'init.py'
820 820 response = self.app.post(
821 821 route_path('repo_files_create_file',
822 822 repo_name=backend.repo_name,
823 823 commit_id='tip', f_path='/'),
824 824 params={
825 825 'content': "",
826 826 'filename': filename,
827 827 'csrf_token': csrf_token,
828 828 },
829 829 status=302)
830 830 expected_msg = 'Successfully committed new file `{}`'.format(os.path.join(filename))
831 831 assert_session_flash(response, expected_msg)
832 832
833 833 def test_add_file_into_repo_missing_filename(self, backend, csrf_token):
834 834 commit_id = backend.repo.get_commit().raw_id
835 835 response = self.app.post(
836 836 route_path('repo_files_create_file',
837 837 repo_name=backend.repo_name,
838 838 commit_id=commit_id, f_path='/'),
839 839 params={
840 840 'content': "foo",
841 841 'csrf_token': csrf_token,
842 842 },
843 843 status=302)
844 844
845 845 assert_session_flash(response, 'No filename specified')
846 846
847 847 def test_add_file_into_repo_errors_and_no_commits(
848 848 self, backend, csrf_token):
849 849 repo = backend.create_repo()
850 850 # Create a file with no filename; it will display an error, but
851 851 # the repo has no commits yet
852 852 response = self.app.post(
853 853 route_path('repo_files_create_file',
854 854 repo_name=repo.repo_name,
855 855 commit_id='tip', f_path='/'),
856 856 params={
857 857 'content': "foo",
858 858 'csrf_token': csrf_token,
859 859 },
860 860 status=302)
861 861
862 862 assert_session_flash(response, 'No filename specified')
863 863
864 864 # Not allowed, redirect to the summary
865 865 redirected = response.follow()
866 866 summary_url = h.route_path('repo_summary', repo_name=repo.repo_name)
867 867
868 868 # As there are no commits, the summary page is displayed with the error of
869 869 # creating a file with no filename
870 870
871 871 assert redirected.request.path == summary_url
872 872
873 873 @pytest.mark.parametrize("filename, clean_filename", [
874 874 ('/abs/foo', 'abs/foo'),
875 875 ('../rel/foo', 'rel/foo'),
876 876 ('file/../foo/foo', 'file/foo/foo'),
877 877 ])
878 878 def test_add_file_into_repo_bad_filenames(self, filename, clean_filename, backend, csrf_token):
879 879 repo = backend.create_repo()
880 880 commit_id = repo.get_commit().raw_id
881 881
882 882 response = self.app.post(
883 883 route_path('repo_files_create_file',
884 884 repo_name=repo.repo_name,
885 885 commit_id=commit_id, f_path='/'),
886 886 params={
887 887 'content': "foo",
888 888 'filename': filename,
889 889 'csrf_token': csrf_token,
890 890 },
891 891 status=302)
892 892
893 893 expected_msg = 'Successfully committed new file `{}`'.format(clean_filename)
894 894 assert_session_flash(response, expected_msg)
895 895
896 896 @pytest.mark.parametrize("cnt, filename, content", [
897 897 (1, 'foo.txt', "Content"),
898 898 (2, 'dir/foo.rst', "Content"),
899 899 (3, 'dir/foo-second.rst', "Content"),
900 900 (4, 'rel/dir/foo.bar', "Content"),
901 901 ])
902 902 def test_add_file_into_empty_repo(self, cnt, filename, content, backend, csrf_token):
903 903 repo = backend.create_repo()
904 904 commit_id = repo.get_commit().raw_id
905 905 response = self.app.post(
906 906 route_path('repo_files_create_file',
907 907 repo_name=repo.repo_name,
908 908 commit_id=commit_id, f_path='/'),
909 909 params={
910 910 'content': content,
911 911 'filename': filename,
912 912 'csrf_token': csrf_token,
913 913 },
914 914 status=302)
915 915
916 916 expected_msg = 'Successfully committed new file `{}`'.format(filename)
917 917 assert_session_flash(response, expected_msg)
918 918
919 919 def test_edit_file_view(self, backend):
920 920 response = self.app.get(
921 921 route_path('repo_files_edit_file',
922 922 repo_name=backend.repo_name,
923 923 commit_id=backend.default_head_id,
924 924 f_path='vcs/nodes.py'),
925 925 status=200)
926 926 response.mustcontain("Module holding everything related to vcs nodes.")
927 927
928 928 def test_edit_file_view_not_on_branch(self, backend):
929 929 repo = backend.create_repo()
930 930 backend.ensure_file(b"vcs/nodes.py")
931 931
932 932 response = self.app.get(
933 933 route_path('repo_files_edit_file',
934 934 repo_name=repo.repo_name,
935 935 commit_id='tip',
936 936 f_path='vcs/nodes.py'),
937 937 status=302)
938 938 assert_session_flash(
939 939 response, 'Cannot modify file. Given commit `tip` is not head of a branch.')
940 940
941 941 def test_edit_file_view_commit_changes(self, backend, csrf_token):
942 942 repo = backend.create_repo()
943 943 backend.ensure_file(b"vcs/nodes.py", content=b"print 'hello'")
944 944
945 945 response = self.app.post(
946 946 route_path('repo_files_update_file',
947 947 repo_name=repo.repo_name,
948 948 commit_id=backend.default_head_id,
949 949 f_path='vcs/nodes.py'),
950 950 params={
951 951 'content': "print 'hello world'",
952 952 'message': 'I committed',
953 953 'filename': "vcs/nodes.py",
954 954 'csrf_token': csrf_token,
955 955 },
956 956 status=302)
957 957 assert_session_flash(
958 958 response, 'Successfully committed changes to file `vcs/nodes.py`')
959 959 tip = repo.get_commit(commit_idx=-1)
960 960 assert tip.message == 'I committed'
961 961
962 962 def test_edit_file_view_commit_changes_default_message(self, backend,
963 963 csrf_token):
964 964 repo = backend.create_repo()
965 965 backend.ensure_file(b"vcs/nodes.py", content=b"print 'hello'")
966 966
967 967 commit_id = (
968 968 backend.default_branch_name or
969 969 backend.repo.scm_instance().commit_ids[-1])
970 970
971 971 response = self.app.post(
972 972 route_path('repo_files_update_file',
973 973 repo_name=repo.repo_name,
974 974 commit_id=commit_id,
975 975 f_path='vcs/nodes.py'),
976 976 params={
977 977 'content': "print 'hello world'",
978 978 'message': '',
979 979 'filename': "vcs/nodes.py",
980 980 'csrf_token': csrf_token,
981 981 },
982 982 status=302)
983 983 assert_session_flash(
984 984 response, 'Successfully committed changes to file `vcs/nodes.py`')
985 985 tip = repo.get_commit(commit_idx=-1)
986 986 assert tip.message == 'Edited file vcs/nodes.py via RhodeCode Enterprise'
987 987
988 988 def test_delete_file_view(self, backend):
989 989 self.app.get(
990 990 route_path('repo_files_remove_file',
991 991 repo_name=backend.repo_name,
992 992 commit_id=backend.default_head_id,
993 993 f_path='vcs/nodes.py'),
994 994 status=200)
995 995
996 996 def test_delete_file_view_not_on_branch(self, backend):
997 997 repo = backend.create_repo()
998 998 backend.ensure_file(b'vcs/nodes.py')
999 999
1000 1000 response = self.app.get(
1001 1001 route_path('repo_files_remove_file',
1002 1002 repo_name=repo.repo_name,
1003 1003 commit_id='tip',
1004 1004 f_path='vcs/nodes.py'),
1005 1005 status=302)
1006 1006 assert_session_flash(
1007 1007 response, 'Cannot modify file. Given commit `tip` is not head of a branch.')
1008 1008
1009 1009 def test_delete_file_view_commit_changes(self, backend, csrf_token):
1010 1010 repo = backend.create_repo()
1011 1011 backend.ensure_file(b"vcs/nodes.py")
1012 1012
1013 1013 response = self.app.post(
1014 1014 route_path('repo_files_delete_file',
1015 1015 repo_name=repo.repo_name,
1016 1016 commit_id=backend.default_head_id,
1017 1017 f_path='vcs/nodes.py'),
1018 1018 params={
1019 1019 'message': 'i committed',
1020 1020 'csrf_token': csrf_token,
1021 1021 },
1022 1022 status=302)
1023 1023 assert_session_flash(
1024 1024 response, 'Successfully deleted file `vcs/nodes.py`')
1025 1025
1026 1026
1027 1027 @pytest.mark.usefixtures("app")
1028 1028 class TestFilesViewOtherCases(object):
1029 1029
1030 1030 def test_access_empty_repo_redirect_to_summary_with_alert_write_perms(
1031 1031 self, backend_stub, autologin_regular_user, user_regular,
1032 1032 user_util):
1033 1033
1034 1034 repo = backend_stub.create_repo()
1035 1035 user_util.grant_user_permission_to_repo(
1036 1036 repo, user_regular, 'repository.write')
1037 1037 response = self.app.get(
1038 1038 route_path('repo_files',
1039 1039 repo_name=repo.repo_name,
1040 1040 commit_id='tip', f_path='/'))
1041 1041
1042 1042 repo_file_add_url = route_path(
1043 1043 'repo_files_add_file',
1044 1044 repo_name=repo.repo_name,
1045 1045 commit_id=0, f_path='')
1046 1046 add_new = f'<a class="alert-link" href="{repo_file_add_url}">add a new file</a>'
1047 1047
1048 1048 repo_file_upload_url = route_path(
1049 1049 'repo_files_upload_file',
1050 1050 repo_name=repo.repo_name,
1051 1051 commit_id=0, f_path='')
1052 1052 upload_new = f'<a class="alert-link" href="{repo_file_upload_url}">upload a new file</a>'
1053 1053
1054 1054 assert_session_flash(
1055 1055 response,
1056 1056 'There are no files yet. Click here to %s or %s.' % (add_new, upload_new)
1057 1057 )
1058 1058
1059 1059 def test_access_empty_repo_redirect_to_summary_with_alert_no_write_perms(
1060 1060 self, backend_stub, autologin_regular_user):
1061 1061 repo = backend_stub.create_repo()
1062 1062 # init session for anon user
1063 1063 route_path('repo_summary', repo_name=repo.repo_name)
1064 1064
1065 1065 repo_file_add_url = route_path(
1066 1066 'repo_files_add_file',
1067 1067 repo_name=repo.repo_name,
1068 1068 commit_id=0, f_path='')
1069 1069
1070 1070 response = self.app.get(
1071 1071 route_path('repo_files',
1072 1072 repo_name=repo.repo_name,
1073 1073 commit_id='tip', f_path='/'))
1074 1074
1075 1075 assert_session_flash(response, no_=repo_file_add_url)
1076 1076
1077 1077 @pytest.mark.parametrize('file_node', [
1078 1078 b'archive/file.zip',
1079 1079 b'diff/my-file.txt',
1080 1080 b'render.py',
1081 1081 b'render',
1082 1082 b'remove_file',
1083 1083 b'remove_file/to-delete.txt',
1084 1084 ])
1085 1085 def test_file_names_equal_to_routes_parts(self, backend, file_node):
1086 1086 backend.create_repo()
1087 1087 backend.ensure_file(file_node)
1088 1088
1089 1089 self.app.get(
1090 1090 route_path('repo_files',
1091 1091 repo_name=backend.repo_name,
1092 1092 commit_id='tip', f_path=safe_str(file_node)),
1093 1093 status=200)
1094 1094
1095 1095
1096 1096 class TestAdjustFilePathForSvn(object):
1097 1097 """
1098 1098 SVN specific adjustments of node history in RepoFilesView.
1099 1099 """
1100 1100
1101 1101 def test_returns_path_relative_to_matched_reference(self):
1102 1102 repo = self._repo(branches=['trunk'])
1103 1103 self.assert_file_adjustment('trunk/file', 'file', repo)
1104 1104
1105 1105 def test_does_not_modify_file_if_no_reference_matches(self):
1106 1106 repo = self._repo(branches=['trunk'])
1107 1107 self.assert_file_adjustment('notes/file', 'notes/file', repo)
1108 1108
1109 1109 def test_does_not_adjust_partial_directory_names(self):
1110 1110 repo = self._repo(branches=['trun'])
1111 1111 self.assert_file_adjustment('trunk/file', 'trunk/file', repo)
1112 1112
1113 1113 def test_is_robust_to_patterns_which_prefix_other_patterns(self):
1114 1114 repo = self._repo(branches=['trunk', 'trunk/new', 'trunk/old'])
1115 1115 self.assert_file_adjustment('trunk/new/file', 'file', repo)
1116 1116
1117 1117 def assert_file_adjustment(self, f_path, expected, repo):
1118 1118 result = RepoFilesView.adjust_file_path_for_svn(f_path, repo)
1119 1119 assert result == expected
1120 1120
1121 1121 def _repo(self, branches=None):
1122 1122 repo = mock.Mock()
1123 1123 repo.branches = OrderedDict((name, '0') for name in branches or [])
1124 1124 repo.tags = {}
1125 1125 return repo
@@ -1,197 +1,197 b''
1 1
2 2
3 3 # Copyright (C) 2012-2023 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import re
21 21
22 22 import pygments.filter
23 23 import pygments.filters
24 24 from pygments.token import Comment
25 25
26 26 HL_BEG_MARKER = '__RCSearchHLMarkBEG__'
27 27 HL_END_MARKER = '__RCSearchHLMarkEND__'
28 28 HL_MARKER_RE = '{}(.*?){}'.format(HL_BEG_MARKER, HL_END_MARKER)
29 29
30 30
31 31 class ElasticSearchHLFilter(pygments.filters.Filter):
32 32 _names = [HL_BEG_MARKER, HL_END_MARKER]
33 33
34 34 def __init__(self, **options):
35 35 pygments.filters.Filter.__init__(self, **options)
36 36
37 37 def filter(self, lexer, stream):
38 38 def tokenize(_value):
39 39 for token in re.split('({}|{})'.format(
40 40 self._names[0], self._names[1]), _value):
41 41 if token:
42 42 yield token
43 43
44 44 hl = False
45 45 for ttype, value in stream:
46 46
47 47 if self._names[0] in value or self._names[1] in value:
48 48 for item in tokenize(value):
49 49 if item == self._names[0]:
50 50 # skip marker, but start HL
51 51 hl = True
52 52 continue
53 53 elif item == self._names[1]:
54 54 hl = False
55 55 continue
56 56
57 57 if hl:
58 58 yield Comment.ElasticMatch, item
59 59 else:
60 60 yield ttype, item
61 61 else:
62 62 if hl:
63 63 yield Comment.ElasticMatch, value
64 64 else:
65 65 yield ttype, value
66 66
67 67
68 68 def extract_phrases(text_query):
69 69 """
70 70 Extracts phrases from a search term string, making sure phrases
71 71 contained in double quotes are kept together, and discarding empty
72 72 or whitespace-only values, e.g.
73 73
74 74 'some text "a phrase" more' => ['some', 'text', 'a phrase', 'more']
75 75
76 76 """
77 77
78 78 in_phrase = False
79 79 buf = ''
80 80 phrases = []
81 81 for char in text_query:
82 82 if in_phrase:
83 83 if char == '"': # end phrase
84 84 phrases.append(buf)
85 85 buf = ''
86 86 in_phrase = False
87 87 continue
88 88 else:
89 89 buf += char
90 90 continue
91 91 else:
92 92 if char == '"': # start phrase
93 93 in_phrase = True
94 94 phrases.append(buf)
95 95 buf = ''
96 96 continue
97 97 elif char == ' ':
98 98 phrases.append(buf)
99 99 buf = ''
100 100 continue
101 101 else:
102 102 buf += char
103 103
104 104 phrases.append(buf)
105 105 phrases = [phrase.strip() for phrase in phrases if phrase.strip()]
106 106 return phrases
107 107
108 108
109 109 def get_matching_phrase_offsets(text, phrases):
110 110 """
111 111 Returns a list of string offsets in `text` where the given `phrases` match
112 112
113 113 >>> get_matching_phrase_offsets('some text here', ['some', 'here'])
114 114 [(0, 4), (10, 14)]
115 115
116 116 """
117 117 phrases = phrases or []
118 118 offsets = []
119 119
120 120 for phrase in phrases:
121 121 for match in re.finditer(phrase, text):
122 122 offsets.append((match.start(), match.end()))
123 123
124 124 return offsets
125 125
126 126
127 127 def get_matching_markers_offsets(text, markers=None):
128 """
128 r"""
129 129 Returns a list of string offsets in `text` that are between matching markers
130 130
131 131 >>> get_matching_markers_offsets('$1some$2 text $1here$2 marked', ['\$1(.*?)\$2'])
132 132 [(0, 5), (16, 22)]
133 133
134 134 """
135 135 markers = markers or [HL_MARKER_RE]
136 136 offsets = []
137 137
138 138 if markers:
139 139 for mark in markers:
140 140 for match in re.finditer(mark, text):
141 141 offsets.append((match.start(), match.end()))
142 142
143 143 return offsets
144 144
145 145
146 146 def normalize_text_for_matching(x):
147 147 """
148 148 Replaces all non-alphanumeric characters with spaces and lower-cases the string,
149 149 useful for comparing two text strings without punctuation
150 150 """
151 151 return re.sub(r'\W', ' ', x.lower())
152 152
153 153
154 154 def get_matching_line_offsets(lines, terms=None, markers=None):
155 155 """ Return a set of `lines` indices (starting from 1) matching a
156 156 text search query, along with `context` lines above/below matching lines
157 157
158 158 :param lines: list of strings representing lines
159 159 :param terms: search term string to match in lines eg. 'some text'
160 160 :param markers: instead of terms, use highlight markers that
161 161 mark the beginning and end of a matched item, e.g. ['START(.*?)END']
162 162
163 163 eg.
164 164
165 165 text = '''
166 166 words words words
167 167 words words words
168 168 some text some
169 169 words words words
170 170 words words words
171 171 text here what
172 172 '''
173 173 get_matching_line_offsets(text, 'text', context=1)
174 174 6, {3: [(5, 9)], 6: [(0, 4)]}
175 175
176 176 """
177 177 matching_lines = {}
178 178 line_index = 0
179 179
180 180 if terms:
181 181 phrases = [normalize_text_for_matching(phrase)
182 182 for phrase in extract_phrases(terms)]
183 183
184 184 for line_index, line in enumerate(lines.splitlines(), start=1):
185 185 normalized_line = normalize_text_for_matching(line)
186 186 match_offsets = get_matching_phrase_offsets(normalized_line, phrases)
187 187 if match_offsets:
188 188 matching_lines[line_index] = match_offsets
189 189
190 190 else:
191 191 markers = markers or [HL_MARKER_RE]
192 192 for line_index, line in enumerate(lines.splitlines(), start=1):
193 193 match_offsets = get_matching_markers_offsets(line, markers=markers)
194 194 if match_offsets:
195 195 matching_lines[line_index] = match_offsets
196 196
197 197 return line_index, matching_lines
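The helpers in this file feed the full-text search highlighting; a short usage sketch based only on the docstring examples above. The import path and the lexer wiring are assumptions, not shown in this diff:

    # Hedged usage sketch for the highlight helpers above; values follow the
    # module's own docstring examples. The import path is an assumption.
    from rhodecode.lib.index.search_utils import (  # assumed location of this module
        extract_phrases, get_matching_line_offsets, ElasticSearchHLFilter)
    from pygments.lexers import PythonLexer

    phrases = extract_phrases('some text "a phrase" more')
    # -> ['some', 'text', 'a phrase', 'more']

    text = 'words words words\nsome text some\ntext here what'
    last_line, matches = get_matching_line_offsets(text, terms='text')
    # `matches` maps 1-based line indices to (start, end) offsets within the
    # normalized line, e.g. {2: [(5, 9)], 3: [(0, 4)]}

    # the pygments filter highlights anything wrapped in the HL markers
    lexer = PythonLexer()
    lexer.add_filter(ElasticSearchHLFilter())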
@@ -1,1730 +1,1735 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import collections
21 21 import datetime
22 22 import os
23 23 import re
24 24 import pprint
25 25 import shutil
26 26 import socket
27 27 import subprocess
28 28 import time
29 29 import uuid
30 30 import dateutil.tz
31 31 import logging
32 32 import functools
33 33
34 34 import mock
35 35 import pyramid.testing
36 36 import pytest
37 37 import colander
38 38 import requests
39 39 import pyramid.paster
40 40
41 41 import rhodecode
42 42 import rhodecode.lib
43 43 from rhodecode.model.changeset_status import ChangesetStatusModel
44 44 from rhodecode.model.comment import CommentsModel
45 45 from rhodecode.model.db import (
46 46 PullRequest, PullRequestReviewers, Repository, RhodeCodeSetting, ChangesetStatus,
47 47 RepoGroup, UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
48 48 from rhodecode.model.meta import Session
49 49 from rhodecode.model.pull_request import PullRequestModel
50 50 from rhodecode.model.repo import RepoModel
51 51 from rhodecode.model.repo_group import RepoGroupModel
52 52 from rhodecode.model.user import UserModel
53 53 from rhodecode.model.settings import VcsSettingsModel
54 54 from rhodecode.model.user_group import UserGroupModel
55 55 from rhodecode.model.integration import IntegrationModel
56 56 from rhodecode.integrations import integration_type_registry
57 57 from rhodecode.integrations.types.base import IntegrationTypeBase
58 58 from rhodecode.lib.utils import repo2db_mapper
59 59 from rhodecode.lib.str_utils import safe_bytes
60 60 from rhodecode.lib.hash_utils import sha1_safe
61 61 from rhodecode.lib.vcs.backends import get_backend
62 62 from rhodecode.lib.vcs.nodes import FileNode
63 63 from rhodecode.tests import (
64 64 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
65 65 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
66 66 TEST_USER_REGULAR_PASS)
67 67 from rhodecode.tests.utils import CustomTestApp, set_anonymous_access
68 68 from rhodecode.tests.fixture import Fixture
69 69 from rhodecode.config import utils as config_utils
70 70
71 71 log = logging.getLogger(__name__)
72 72
73 73
74 74 def cmp(a, b):
75 75 # backport cmp from python2 so we can still use it in the custom code in this module
76 76 return (a > b) - (a < b)
77 77
78 78
79 79 @pytest.fixture(scope='session', autouse=True)
80 80 def activate_example_rcextensions(request):
81 81 """
82 82 Patch in an example rcextensions module which verifies passed in kwargs.
83 83 """
84 84 from rhodecode.config import rcextensions
85 85
86 86 old_extensions = rhodecode.EXTENSIONS
87 87 rhodecode.EXTENSIONS = rcextensions
88 88 rhodecode.EXTENSIONS.calls = collections.defaultdict(list)
89 89
90 90 @request.addfinalizer
91 91 def cleanup():
92 92 rhodecode.EXTENSIONS = old_extensions
93 93
94 94
95 95 @pytest.fixture()
96 96 def capture_rcextensions():
97 97 """
98 98 Returns the recorded calls to entry points in rcextensions.
99 99 """
100 100 calls = rhodecode.EXTENSIONS.calls
101 101 calls.clear()
102 102 # Note: At this moment, it is still the empty dict, but that will
103 103 # be filled during the test run and since it is a reference this
104 104 # is enough to make it work.
105 105 return calls
106 106
107 107
108 108 @pytest.fixture(scope='session')
109 109 def http_environ_session():
110 110 """
111 111 Allow to use "http_environ" in session scope.
112 112 """
113 113 return plain_http_environ()
114 114
115 115
116 116 def plain_http_host_stub():
117 117 """
118 118 Value of HTTP_HOST in the test run.
119 119 """
120 120 return 'example.com:80'
121 121
122 122
123 123 @pytest.fixture()
124 124 def http_host_stub():
125 125 """
126 126 Value of HTTP_HOST in the test run.
127 127 """
128 128 return plain_http_host_stub()
129 129
130 130
131 131 def plain_http_host_only_stub():
132 132 """
133 133 Value of HTTP_HOST in the test run.
134 134 """
135 135 return plain_http_host_stub().split(':')[0]
136 136
137 137
138 138 @pytest.fixture()
139 139 def http_host_only_stub():
140 140 """
141 141 Value of HTTP_HOST in the test run.
142 142 """
143 143 return plain_http_host_only_stub()
144 144
145 145
146 146 def plain_http_environ():
147 147 """
148 148 HTTP extra environ keys.
149 149
150 150 Used by the test application as well as for setting up the pylons
151 151 environment. In the case of the fixture "app" it should be possible
152 152 to override this for a specific test case.
153 153 """
154 154 return {
155 155 'SERVER_NAME': plain_http_host_only_stub(),
156 156 'SERVER_PORT': plain_http_host_stub().split(':')[1],
157 157 'HTTP_HOST': plain_http_host_stub(),
158 158 'HTTP_USER_AGENT': 'rc-test-agent',
159 159 'REQUEST_METHOD': 'GET'
160 160 }
161 161
162 162
163 163 @pytest.fixture()
164 164 def http_environ():
165 165 """
166 166 HTTP extra environ keys.
167 167
168 168 Used by the test application as well as for setting up the pylons
169 169 environment. In the case of the fixture "app" it should be possible
170 170 to override this for a specific test case.
171 171 """
172 172 return plain_http_environ()
173 173
174 174
175 175 @pytest.fixture(scope='session')
176 176 def baseapp(ini_config, vcsserver, http_environ_session):
177 177 from rhodecode.lib.pyramid_utils import get_app_config
178 178 from rhodecode.config.middleware import make_pyramid_app
179 179
180 180 log.info("Using the RhodeCode configuration:{}".format(ini_config))
181 181 pyramid.paster.setup_logging(ini_config)
182 182
183 183 settings = get_app_config(ini_config)
184 184 app = make_pyramid_app({'__file__': ini_config}, **settings)
185 185
186 186 return app
187 187
188 188
189 189 @pytest.fixture(scope='function')
190 190 def app(request, config_stub, baseapp, http_environ):
191 191 app = CustomTestApp(
192 192 baseapp,
193 193 extra_environ=http_environ)
194 194 if request.cls:
195 195 request.cls.app = app
196 196 return app
197 197
198 198
199 199 @pytest.fixture(scope='session')
200 200 def app_settings(baseapp, ini_config):
201 201 """
202 202 Settings dictionary used to create the app.
203 203
204 204 Parses the ini file and passes the result through the sanitize and apply
205 205 defaults mechanism in `rhodecode.config.middleware`.
206 206 """
207 207 return baseapp.config.get_settings()
208 208
209 209
210 210 @pytest.fixture(scope='session')
211 211 def db_connection(ini_settings):
212 212 # Initialize the database connection.
213 213 config_utils.initialize_database(ini_settings)
214 214
215 215
216 216 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
217 217
218 218
219 219 def _autologin_user(app, *args):
220 220 session = login_user_session(app, *args)
221 221 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
222 222 return LoginData(csrf_token, session['rhodecode_user'])
223 223
224 224
225 225 @pytest.fixture()
226 226 def autologin_user(app):
227 227 """
228 228 Utility fixture which makes sure that the admin user is logged in
229 229 """
230 230 return _autologin_user(app)
231 231
232 232
233 233 @pytest.fixture()
234 234 def autologin_regular_user(app):
235 235 """
236 236 Utility fixture which makes sure that the regular user is logged in
237 237 """
238 238 return _autologin_user(
239 239 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
240 240
241 241
242 242 @pytest.fixture(scope='function')
243 243 def csrf_token(request, autologin_user):
244 244 return autologin_user.csrf_token
245 245
246 246
247 247 @pytest.fixture(scope='function')
248 248 def xhr_header(request):
249 249 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
250 250
251 251
252 252 @pytest.fixture()
253 253 def real_crypto_backend(monkeypatch):
254 254 """
255 255 Switch the production crypto backend on for this test.
256 256
257 257 During the test run the crypto backend is replaced with a faster
258 258 implementation based on the MD5 algorithm.
259 259 """
260 260 monkeypatch.setattr(rhodecode, 'is_test', False)
261 261
262 262
263 263 @pytest.fixture(scope='class')
264 264 def index_location(request, baseapp):
265 265 index_location = baseapp.config.get_settings()['search.location']
266 266 if request.cls:
267 267 request.cls.index_location = index_location
268 268 return index_location
269 269
270 270
271 271 @pytest.fixture(scope='session', autouse=True)
272 272 def tests_tmp_path(request):
273 273 """
274 274 Create temporary directory to be used during the test session.
275 275 """
276 276 if not os.path.exists(TESTS_TMP_PATH):
277 277 os.makedirs(TESTS_TMP_PATH)
278 278
279 279 if not request.config.getoption('--keep-tmp-path'):
280 280 @request.addfinalizer
281 281 def remove_tmp_path():
282 282 shutil.rmtree(TESTS_TMP_PATH)
283 283
284 284 return TESTS_TMP_PATH
285 285
286 286
287 287 @pytest.fixture()
288 288 def test_repo_group(request):
289 289 """
290 290 Create a temporary repository group and destroy it automatically
291 291 after use.
292 292 """
293 293 fixture = Fixture()
294 294 repogroupid = 'test_repo_group_%s' % str(time.time()).replace('.', '')
295 295 repo_group = fixture.create_repo_group(repogroupid)
296 296
297 297 def _cleanup():
298 298 fixture.destroy_repo_group(repogroupid)
299 299
300 300 request.addfinalizer(_cleanup)
301 301 return repo_group
302 302
303 303
304 304 @pytest.fixture()
305 305 def test_user_group(request):
306 306 """
307 307 Create a temporary user group and destroy it automatically
308 308 after use.
309 309 """
310 310 fixture = Fixture()
311 311 usergroupid = 'test_user_group_%s' % str(time.time()).replace('.', '')
312 312 user_group = fixture.create_user_group(usergroupid)
313 313
314 314 def _cleanup():
315 315 fixture.destroy_user_group(user_group)
316 316
317 317 request.addfinalizer(_cleanup)
318 318 return user_group
319 319
320 320
321 321 @pytest.fixture(scope='session')
322 322 def test_repo(request):
323 323 container = TestRepoContainer()
324 324 request.addfinalizer(container._cleanup)
325 325 return container
326 326
327 327
328 328 class TestRepoContainer(object):
329 329 """
330 330 Container for test repositories which are used read only.
331 331
332 332 Repositories will be created on demand and re-used during the lifetime
333 333 of this object.
334 334
335 335 Usage to get the svn test repository "minimal"::
336 336
337 337 test_repo = TestRepoContainer()
338 338 repo = test_repo('minimal', 'svn')
339 339
340 340 """
341 341
342 342 dump_extractors = {
343 343 'git': utils.extract_git_repo_from_dump,
344 344 'hg': utils.extract_hg_repo_from_dump,
345 345 'svn': utils.extract_svn_repo_from_dump,
346 346 }
347 347
348 348 def __init__(self):
349 349 self._cleanup_repos = []
350 350 self._fixture = Fixture()
351 351 self._repos = {}
352 352
353 353 def __call__(self, dump_name, backend_alias, config=None):
354 354 key = (dump_name, backend_alias)
355 355 if key not in self._repos:
356 356 repo = self._create_repo(dump_name, backend_alias, config)
357 357 self._repos[key] = repo.repo_id
358 358 return Repository.get(self._repos[key])
359 359
360 360 def _create_repo(self, dump_name, backend_alias, config):
361 361 repo_name = '%s-%s' % (backend_alias, dump_name)
362 362 backend = get_backend(backend_alias)
363 363 dump_extractor = self.dump_extractors[backend_alias]
364 364 repo_path = dump_extractor(dump_name, repo_name)
365 365
366 366 vcs_repo = backend(repo_path, config=config)
367 367 repo2db_mapper({repo_name: vcs_repo})
368 368
369 369 repo = RepoModel().get_by_repo_name(repo_name)
370 370 self._cleanup_repos.append(repo_name)
371 371 return repo
372 372
373 373 def _cleanup(self):
374 374 for repo_name in reversed(self._cleanup_repos):
375 375 self._fixture.destroy_repo(repo_name)
376 376
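# Usage sketch (illustrative): the session-scoped `test_repo` fixture returns
# an instance of this container, so a test can pull in a read-only, dump-based
# repository on demand; 'minimal'/'svn' mirrors the docstring example above,
# and the resulting name follows the '<alias>-<dump_name>' pattern used in
# _create_repo.
#
#   def test_reads_dump_repo(test_repo):
#       repo = test_repo('minimal', 'svn')
#       assert repo.repo_name == 'svn-minimal'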
377 377
378 378 def backend_base(request, backend_alias, baseapp, test_repo):
379 379 if backend_alias not in request.config.getoption('--backends'):
380 380 pytest.skip("Backend %s not selected." % (backend_alias, ))
381 381
382 382 utils.check_xfail_backends(request.node, backend_alias)
383 383 utils.check_skip_backends(request.node, backend_alias)
384 384
385 385 repo_name = 'vcs_test_%s' % (backend_alias, )
386 386 backend = Backend(
387 387 alias=backend_alias,
388 388 repo_name=repo_name,
389 389 test_name=request.node.name,
390 390 test_repo_container=test_repo)
391 391 request.addfinalizer(backend.cleanup)
392 392 return backend
393 393
394 394
395 395 @pytest.fixture()
396 396 def backend(request, backend_alias, baseapp, test_repo):
397 397 """
398 398 Parametrized fixture which represents a single backend implementation.
399 399
400 400 It respects the option `--backends` to focus the test run on specific
401 401 backend implementations.
402 402
403 403 It also supports `pytest.mark.xfail_backends` to mark tests as failing
404 404 for specific backends. This is intended as a utility for incremental
405 405 development of a new backend implementation.
406 406 """
407 407 return backend_base(request, backend_alias, baseapp, test_repo)
408 408
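# Usage sketch (illustrative): a test asking for `backend` runs once per
# enabled backend; the marker argument and the assertion below are examples of
# how a typical consumer looks, not fixed API beyond what the docstring states.
#
#   @pytest.mark.xfail_backends('svn')
#   def test_creates_repo(backend):
#       repo = backend.create_repo(number_of_commits=1)
#       assert repo.repo_name == backend.repo_name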
409 409
410 410 @pytest.fixture()
411 411 def backend_git(request, baseapp, test_repo):
412 412 return backend_base(request, 'git', baseapp, test_repo)
413 413
414 414
415 415 @pytest.fixture()
416 416 def backend_hg(request, baseapp, test_repo):
417 417 return backend_base(request, 'hg', baseapp, test_repo)
418 418
419 419
420 420 @pytest.fixture()
421 421 def backend_svn(request, baseapp, test_repo):
422 422 return backend_base(request, 'svn', baseapp, test_repo)
423 423
424 424
425 425 @pytest.fixture()
426 426 def backend_random(backend_git):
427 427 """
428 428 Use this to express that your tests need "a backend".
429 429
430 430 A few of our tests need a backend, so that we can run the code. This
431 431 fixture is intended to be used for such cases. It will pick one of the
432 432 backends and run the tests.
433 433
434 434 The fixture `backend` would run the test multiple times for each
435 435 available backend which is a pure waste of time if the test is
436 436 independent of the backend type.
437 437 """
438 438 # TODO: johbo: Change this to pick a random backend
439 439 return backend_git
440 440
441 441
442 442 @pytest.fixture()
443 443 def backend_stub(backend_git):
444 444 """
445 445 Use this to express that your tests need a backend stub
446 446
447 447 TODO: mikhail: Implement a real stub logic instead of returning
448 448 a git backend
449 449 """
450 450 return backend_git
451 451
452 452
453 453 @pytest.fixture()
454 454 def repo_stub(backend_stub):
455 455 """
456 456 Use this to express that your tests need a repository stub
457 457 """
458 458 return backend_stub.create_repo()
459 459
460 460
461 461 class Backend(object):
462 462 """
463 463 Represents the test configuration for one supported backend
464 464
465 465 Provides easy access to different test repositories based on
466 466 `__getitem__`. Such repositories will only be created once per test
467 467 session.
468 468 """
469 469
470 470 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
471 471 _master_repo = None
472 472 _master_repo_path = ''
473 473 _commit_ids = {}
474 474
475 475 def __init__(self, alias, repo_name, test_name, test_repo_container):
476 476 self.alias = alias
477 477 self.repo_name = repo_name
478 478 self._cleanup_repos = []
479 479 self._test_name = test_name
480 480 self._test_repo_container = test_repo_container
481 481 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
482 482 # Fixture will survive in the end.
483 483 self._fixture = Fixture()
484 484
485 485 def __getitem__(self, key):
486 486 return self._test_repo_container(key, self.alias)
487 487
488 488 def create_test_repo(self, key, config=None):
489 489 return self._test_repo_container(key, self.alias, config)
490 490
491 491 @property
492 def repo_id(self):
493 # expose the repo_id of the current repository
494 return self.repo.repo_id
495
496 @property
492 497 def repo(self):
493 498 """
494 499 Returns the "current" repository. This is the vcs_test repo or the
495 500 last repo which has been created with `create_repo`.
496 501 """
497 502 from rhodecode.model.db import Repository
498 503 return Repository.get_by_repo_name(self.repo_name)
499 504
500 505 @property
501 506 def default_branch_name(self):
502 507 VcsRepository = get_backend(self.alias)
503 508 return VcsRepository.DEFAULT_BRANCH_NAME
504 509
505 510 @property
506 511 def default_head_id(self):
507 512 """
508 513 Returns the default head id of the underlying backend.
509 514
510 515 This will be the default branch name in case the backend does have a
511 516 default branch. In the other cases it will point to a valid head
512 517 which can serve as the base to create a new commit on top of it.
513 518 """
514 519 vcsrepo = self.repo.scm_instance()
515 520 head_id = (
516 521 vcsrepo.DEFAULT_BRANCH_NAME or
517 522 vcsrepo.commit_ids[-1])
518 523 return head_id
519 524
520 525 @property
521 526 def commit_ids(self):
522 527 """
523 528 Returns the list of commits for the last created repository
524 529 """
525 530 return self._commit_ids
526 531
527 532 def create_master_repo(self, commits):
528 533 """
529 534 Create a repository and remember it as a template.
530 535
531 536 This allows to easily create derived repositories to construct
532 537 more complex scenarios for diff, compare and pull requests.
533 538
534 539 Returns a commit map which maps from commit message to raw_id.
535 540 """
536 541 self._master_repo = self.create_repo(commits=commits)
537 542 self._master_repo_path = self._master_repo.repo_full_path
538 543
539 544 return self._commit_ids
540 545
541 546 def create_repo(
542 547 self, commits=None, number_of_commits=0, heads=None,
543 548 name_suffix='', bare=False, **kwargs):
544 549 """
545 550 Create a repository and record it for later cleanup.
546 551
547 552 :param commits: Optional. A sequence of dict instances.
548 553 Will add a commit per entry to the new repository.
549 554 :param number_of_commits: Optional. If set to a number, this number of
550 555 commits will be added to the new repository.
551 556 :param heads: Optional. Can be set to a sequence of commit
552 557 names which shall be pulled in from the master repository.
553 558 :param name_suffix: adds special suffix to generated repo name
554 559 :param bare: set a repo as bare (no checkout)
555 560 """
556 561 self.repo_name = self._next_repo_name() + name_suffix
557 562 repo = self._fixture.create_repo(
558 563 self.repo_name, repo_type=self.alias, bare=bare, **kwargs)
559 564 self._cleanup_repos.append(repo.repo_name)
560 565
561 566 commits = commits or [
562 567 {'message': 'Commit %s of %s' % (x, self.repo_name)}
563 568 for x in range(number_of_commits)]
564 569 vcs_repo = repo.scm_instance()
565 570 vcs_repo.count()
566 571 self._add_commits_to_repo(vcs_repo, commits)
567 572 if heads:
568 573 self.pull_heads(repo, heads)
569 574
570 575 return repo
571 576
572 577 def pull_heads(self, repo, heads):
573 578 """
574 579 Make sure that repo contains all commits mentioned in `heads`
575 580 """
576 581 vcsrepo = repo.scm_instance()
577 582 vcsrepo.config.clear_section('hooks')
578 583 commit_ids = [self._commit_ids[h] for h in heads]
579 584 vcsrepo.pull(self._master_repo_path, commit_ids=commit_ids)
580 585
581 586 def create_fork(self):
582 587 repo_to_fork = self.repo_name
583 588 self.repo_name = self._next_repo_name()
584 589 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
585 590 self._cleanup_repos.append(self.repo_name)
586 591 return repo
587 592
588 593 def new_repo_name(self, suffix=''):
589 594 self.repo_name = self._next_repo_name() + suffix
590 595 self._cleanup_repos.append(self.repo_name)
591 596 return self.repo_name
592 597
593 598 def _next_repo_name(self):
594 599 return u"%s_%s" % (
595 600 self.invalid_repo_name.sub('_', self._test_name), len(self._cleanup_repos))
596 601
597 602 def ensure_file(self, filename, content='Test content\n'):
598 603 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
599 604 commits = [
600 605 {'added': [
601 606 FileNode(filename, content=content),
602 607 ]},
603 608 ]
604 609 self._add_commits_to_repo(self.repo.scm_instance(), commits)
605 610
606 611 def enable_downloads(self):
607 612 repo = self.repo
608 613 repo.enable_downloads = True
609 614 Session().add(repo)
610 615 Session().commit()
611 616
612 617 def cleanup(self):
613 618 for repo_name in reversed(self._cleanup_repos):
614 619 self._fixture.destroy_repo(repo_name)
615 620
616 621 def _add_commits_to_repo(self, repo, commits):
617 622 commit_ids = _add_commits_to_repo(repo, commits)
618 623 if not commit_ids:
619 624 return
620 625 self._commit_ids = commit_ids
621 626
622 627 # Creating refs for Git to allow fetching them from remote repository
623 628 if self.alias == 'git':
624 629 refs = {}
625 630 for message in self._commit_ids:
626 631 # TODO: mikhail: do more special chars replacements
627 632 ref_name = 'refs/test-refs/{}'.format(
628 633 message.replace(' ', ''))
629 634 refs[ref_name] = self._commit_ids[message]
630 635 self._create_refs(repo, refs)
631 636
632 637 def _create_refs(self, repo, refs):
633 638 for ref_name in refs:
634 639 repo.set_refs(ref_name, refs[ref_name])
635 640
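# Usage sketch (illustrative): the typical compare/pull-request setup, as also
# exercised by PRTestUtility further down, creates a master repository from a
# commit map and then derives repositories that pull selected heads from it.
#
#   commits = [{'message': 'c1'}, {'message': 'c2'}]
#   commit_ids = backend.create_master_repo(commits)
#   target = backend.create_repo(heads=['c1'])
#   source = backend.create_repo(heads=['c2'])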
636 641
637 642 class VcsBackend(object):
638 643 """
639 644 Represents the test configuration for one supported vcs backend.
640 645 """
641 646
642 647 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
643 648
644 649 def __init__(self, alias, repo_path, test_name, test_repo_container):
645 650 self.alias = alias
646 651 self._repo_path = repo_path
647 652 self._cleanup_repos = []
648 653 self._test_name = test_name
649 654 self._test_repo_container = test_repo_container
650 655
651 656 def __getitem__(self, key):
652 657 return self._test_repo_container(key, self.alias).scm_instance()
653 658
654 659 def __repr__(self):
655 660 return f'{self.__class__.__name__}(alias={self.alias}, repo={self._repo_path})'
656 661
657 662 @property
658 663 def repo(self):
659 664 """
660 665 Returns the "current" repository. This is the vcs_test repo or the last
661 666 repo which has been created.
662 667 """
663 668 Repository = get_backend(self.alias)
664 669 return Repository(self._repo_path)
665 670
666 671 @property
667 672 def backend(self):
668 673 """
669 674 Returns the backend implementation class.
670 675 """
671 676 return get_backend(self.alias)
672 677
673 678 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None,
674 679 bare=False):
675 680 repo_name = self._next_repo_name()
676 681 self._repo_path = get_new_dir(repo_name)
677 682 repo_class = get_backend(self.alias)
678 683 src_url = None
679 684 if _clone_repo:
680 685 src_url = _clone_repo.path
681 686 repo = repo_class(self._repo_path, create=True, src_url=src_url, bare=bare)
682 687 self._cleanup_repos.append(repo)
683 688
684 689 commits = commits or [
685 690 {'message': 'Commit %s of %s' % (x, repo_name)}
686 691 for x in range(number_of_commits)]
687 692 _add_commits_to_repo(repo, commits)
688 693 return repo
689 694
690 695 def clone_repo(self, repo):
691 696 return self.create_repo(_clone_repo=repo)
692 697
693 698 def cleanup(self):
694 699 for repo in self._cleanup_repos:
695 700 shutil.rmtree(repo.path)
696 701
697 702 def new_repo_path(self):
698 703 repo_name = self._next_repo_name()
699 704 self._repo_path = get_new_dir(repo_name)
700 705 return self._repo_path
701 706
702 707 def _next_repo_name(self):
703 708
704 709 return "{}_{}".format(
705 710 self.invalid_repo_name.sub('_', self._test_name),
706 711 len(self._cleanup_repos)
707 712 )
708 713
709 714 def add_file(self, repo, filename, content='Test content\n'):
710 715 imc = repo.in_memory_commit
711 716 imc.add(FileNode(safe_bytes(filename), content=safe_bytes(content)))
712 717 imc.commit(
713 718 message='Automatic commit from vcsbackend fixture',
714 719 author='Automatic <automatic@rhodecode.com>')
715 720
716 721 def ensure_file(self, filename, content='Test content\n'):
717 722 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
718 723 self.add_file(self.repo, filename, content)
719 724
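# Usage sketch (illustrative): `vcsbackend` works purely on the vcs layer,
# without database models; the file name and content below are arbitrary.
#
#   def test_vcs_level(vcsbackend):
#       repo = vcsbackend.create_repo(number_of_commits=2)
#       vcsbackend.add_file(repo, 'README', content='docs\n')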
720 725
721 726 def vcsbackend_base(request, backend_alias, tests_tmp_path, baseapp, test_repo) -> VcsBackend:
722 727 if backend_alias not in request.config.getoption('--backends'):
723 728 pytest.skip("Backend %s not selected." % (backend_alias, ))
724 729
725 730 utils.check_xfail_backends(request.node, backend_alias)
726 731 utils.check_skip_backends(request.node, backend_alias)
727 732
728 733 repo_name = f'vcs_test_{backend_alias}'
729 734 repo_path = os.path.join(tests_tmp_path, repo_name)
730 735 backend = VcsBackend(
731 736 alias=backend_alias,
732 737 repo_path=repo_path,
733 738 test_name=request.node.name,
734 739 test_repo_container=test_repo)
735 740 request.addfinalizer(backend.cleanup)
736 741 return backend
737 742
738 743
739 744 @pytest.fixture()
740 745 def vcsbackend(request, backend_alias, tests_tmp_path, baseapp, test_repo):
741 746 """
742 747 Parametrized fixture which represents a single vcs backend implementation.
743 748
744 749 See the fixture `backend` for more details. This one implements the same
745 750 concept, but on vcs level. So it does not provide model instances etc.
746 751
747 752 Parameters are generated dynamically, see :func:`pytest_generate_tests`
748 753 for how this works.
749 754 """
750 755 return vcsbackend_base(request, backend_alias, tests_tmp_path, baseapp, test_repo)
751 756
752 757
753 758 @pytest.fixture()
754 759 def vcsbackend_git(request, tests_tmp_path, baseapp, test_repo):
755 760 return vcsbackend_base(request, 'git', tests_tmp_path, baseapp, test_repo)
756 761
757 762
758 763 @pytest.fixture()
759 764 def vcsbackend_hg(request, tests_tmp_path, baseapp, test_repo):
760 765 return vcsbackend_base(request, 'hg', tests_tmp_path, baseapp, test_repo)
761 766
762 767
763 768 @pytest.fixture()
764 769 def vcsbackend_svn(request, tests_tmp_path, baseapp, test_repo):
765 770 return vcsbackend_base(request, 'svn', tests_tmp_path, baseapp, test_repo)
766 771
767 772
768 773 @pytest.fixture()
769 774 def vcsbackend_stub(vcsbackend_git):
770 775 """
771 776 Use this to express that your test just needs a stub of a vcsbackend.
772 777
773 778 The plan is to eventually implement an in-memory stub to speed tests up.
774 779 """
775 780 return vcsbackend_git
776 781
777 782
778 783 def _add_commits_to_repo(vcs_repo, commits):
779 784 commit_ids = {}
780 785 if not commits:
781 786 return commit_ids
782 787
783 788 imc = vcs_repo.in_memory_commit
784 789 commit = None
785 790
786 791 for idx, commit in enumerate(commits):
787 792 message = str(commit.get('message', 'Commit %s' % idx))
788 793
789 794 for node in commit.get('added', []):
790 795 imc.add(FileNode(safe_bytes(node.path), content=node.content))
791 796 for node in commit.get('changed', []):
792 797 imc.change(FileNode(safe_bytes(node.path), content=node.content))
793 798 for node in commit.get('removed', []):
794 799 imc.remove(FileNode(safe_bytes(node.path)))
795 800
796 801 parents = [
797 802 vcs_repo.get_commit(commit_id=commit_ids[p])
798 803 for p in commit.get('parents', [])]
799 804
800 805 operations = ('added', 'changed', 'removed')
801 806 if not any((commit.get(o) for o in operations)):
802 807 imc.add(FileNode(b'file_%b' % safe_bytes(str(idx)), content=safe_bytes(message)))
803 808
804 809 commit = imc.commit(
805 810 message=message,
806 811 author=str(commit.get('author', 'Automatic <automatic@rhodecode.com>')),
807 812 date=commit.get('date'),
808 813 branch=commit.get('branch'),
809 814 parents=parents)
810 815
811 816 commit_ids[commit.message] = commit.raw_id
812 817
813 818 return commit_ids
814 819
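# Shape of the `commits` argument accepted above (and by Backend.create_repo
# and VcsBackend.create_repo): a list of dicts with optional keys such as
# 'message', 'added', 'changed', 'removed', 'parents', 'author', 'date' and
# 'branch'. The file names and contents below are arbitrary examples.
#
#   commits = [
#       {'message': 'Add setup', 'added': [FileNode('setup.py', content='')]},
#       {'message': 'Tweak it', 'changed': [FileNode('setup.py', content='x')]},
#       {'message': 'Drop it', 'removed': [FileNode('setup.py')]},
#   ]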
815 820
816 821 @pytest.fixture()
817 822 def reposerver(request):
818 823 """
819 824 Allows serving a backend repository
820 825 """
821 826
822 827 repo_server = RepoServer()
823 828 request.addfinalizer(repo_server.cleanup)
824 829 return repo_server
825 830
826 831
827 832 class RepoServer(object):
828 833 """
829 834 Utility to serve a local repository for the duration of a test case.
830 835
831 836 Supports only Subversion so far.
832 837 """
833 838
834 839 url = None
835 840
836 841 def __init__(self):
837 842 self._cleanup_servers = []
838 843
839 844 def serve(self, vcsrepo):
840 845 if vcsrepo.alias != 'svn':
841 846 raise TypeError("Backend %s not supported" % vcsrepo.alias)
842 847
843 848 proc = subprocess.Popen(
844 849 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
845 850 '--root', vcsrepo.path])
846 851 self._cleanup_servers.append(proc)
847 852 self.url = 'svn://localhost'
848 853
849 854 def cleanup(self):
850 855 for proc in self._cleanup_servers:
851 856 proc.terminate()
852 857
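# Usage sketch (illustrative): combining `reposerver` with `vcsbackend_svn`
# to exercise code that needs a served, remote-style repository URL.
#
#   def test_against_served_repo(reposerver, vcsbackend_svn):
#       repo = vcsbackend_svn.create_repo(number_of_commits=1)
#       reposerver.serve(repo)
#       assert reposerver.url.startswith('svn://')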
853 858
854 859 @pytest.fixture()
855 860 def pr_util(backend, request, config_stub):
856 861 """
857 862 Utility for tests of models and for functional tests around pull requests.
858 863
859 864 It gives an instance of :class:`PRTestUtility` which provides various
860 865 utility methods around one pull request.
861 866
862 867 This fixture uses `backend` and inherits its parameterization.
863 868 """
864 869
865 870 util = PRTestUtility(backend)
866 871 request.addfinalizer(util.cleanup)
867 872
868 873 return util
869 874
870 875
871 876 class PRTestUtility(object):
872 877
873 878 pull_request = None
874 879 pull_request_id = None
875 880 mergeable_patcher = None
876 881 mergeable_mock = None
877 882 notification_patcher = None
878 883
879 884 def __init__(self, backend):
880 885 self.backend = backend
881 886
882 887 def create_pull_request(
883 888 self, commits=None, target_head=None, source_head=None,
884 889 revisions=None, approved=False, author=None, mergeable=False,
885 890 enable_notifications=True, name_suffix='', reviewers=None, observers=None,
886 891 title=u"Test", description=u"Description"):
887 892 self.set_mergeable(mergeable)
888 893 if not enable_notifications:
889 894 # mock notification side effect
890 895 self.notification_patcher = mock.patch(
891 896 'rhodecode.model.notification.NotificationModel.create')
892 897 self.notification_patcher.start()
893 898
894 899 if not self.pull_request:
895 900 if not commits:
896 901 commits = [
897 902 {'message': 'c1'},
898 903 {'message': 'c2'},
899 904 {'message': 'c3'},
900 905 ]
901 906 target_head = 'c1'
902 907 source_head = 'c2'
903 908 revisions = ['c2']
904 909
905 910 self.commit_ids = self.backend.create_master_repo(commits)
906 911 self.target_repository = self.backend.create_repo(
907 912 heads=[target_head], name_suffix=name_suffix)
908 913 self.source_repository = self.backend.create_repo(
909 914 heads=[source_head], name_suffix=name_suffix)
910 915 self.author = author or UserModel().get_by_username(
911 916 TEST_USER_ADMIN_LOGIN)
912 917
913 918 model = PullRequestModel()
914 919 self.create_parameters = {
915 920 'created_by': self.author,
916 921 'source_repo': self.source_repository.repo_name,
917 922 'source_ref': self._default_branch_reference(source_head),
918 923 'target_repo': self.target_repository.repo_name,
919 924 'target_ref': self._default_branch_reference(target_head),
920 925 'revisions': [self.commit_ids[r] for r in revisions],
921 926 'reviewers': reviewers or self._get_reviewers(),
922 927 'observers': observers or self._get_observers(),
923 928 'title': title,
924 929 'description': description,
925 930 }
926 931 self.pull_request = model.create(**self.create_parameters)
927 932 assert model.get_versions(self.pull_request) == []
928 933
929 934 self.pull_request_id = self.pull_request.pull_request_id
930 935
931 936 if approved:
932 937 self.approve()
933 938
934 939 Session().add(self.pull_request)
935 940 Session().commit()
936 941
937 942 return self.pull_request
938 943
939 944 def approve(self):
940 945 self.create_status_votes(
941 946 ChangesetStatus.STATUS_APPROVED,
942 947 *self.pull_request.reviewers)
943 948
944 949 def close(self):
945 950 PullRequestModel().close_pull_request(self.pull_request, self.author)
946 951
947 952 def _default_branch_reference(self, commit_message):
948 953 reference = '%s:%s:%s' % (
949 954 'branch',
950 955 self.backend.default_branch_name,
951 956 self.commit_ids[commit_message])
952 957 return reference
953 958
954 959 def _get_reviewers(self):
955 960 role = PullRequestReviewers.ROLE_REVIEWER
956 961 return [
957 962 (TEST_USER_REGULAR_LOGIN, ['default1'], False, role, []),
958 963 (TEST_USER_REGULAR2_LOGIN, ['default2'], False, role, []),
959 964 ]
960 965
961 966 def _get_observers(self):
962 967 return [
963 968
964 969 ]
965 970
966 971 def update_source_repository(self, head=None):
967 972 heads = [head or 'c3']
968 973 self.backend.pull_heads(self.source_repository, heads=heads)
969 974
970 975 def add_one_commit(self, head=None):
971 976 self.update_source_repository(head=head)
972 977 old_commit_ids = set(self.pull_request.revisions)
973 978 PullRequestModel().update_commits(self.pull_request, self.pull_request.author)
974 979 commit_ids = set(self.pull_request.revisions)
975 980 new_commit_ids = commit_ids - old_commit_ids
976 981 assert len(new_commit_ids) == 1
977 982 return new_commit_ids.pop()
978 983
979 984 def remove_one_commit(self):
980 985 assert len(self.pull_request.revisions) == 2
981 986 source_vcs = self.source_repository.scm_instance()
982 987 removed_commit_id = source_vcs.commit_ids[-1]
983 988
984 989 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
985 990 # remove the if once that's sorted out.
986 991 if self.backend.alias == "git":
987 992 kwargs = {'branch_name': self.backend.default_branch_name}
988 993 else:
989 994 kwargs = {}
990 995 source_vcs.strip(removed_commit_id, **kwargs)
991 996
992 997 PullRequestModel().update_commits(self.pull_request, self.pull_request.author)
993 998 assert len(self.pull_request.revisions) == 1
994 999 return removed_commit_id
995 1000
996 1001 def create_comment(self, linked_to=None):
997 1002 comment = CommentsModel().create(
998 1003 text=u"Test comment",
999 1004 repo=self.target_repository.repo_name,
1000 1005 user=self.author,
1001 1006 pull_request=self.pull_request)
1002 1007 assert comment.pull_request_version_id is None
1003 1008
1004 1009 if linked_to:
1005 1010 PullRequestModel()._link_comments_to_version(linked_to)
1006 1011
1007 1012 return comment
1008 1013
1009 1014 def create_inline_comment(
1010 1015 self, linked_to=None, line_no='n1', file_path='file_1'):
1011 1016 comment = CommentsModel().create(
1012 1017 text=u"Test comment",
1013 1018 repo=self.target_repository.repo_name,
1014 1019 user=self.author,
1015 1020 line_no=line_no,
1016 1021 f_path=file_path,
1017 1022 pull_request=self.pull_request)
1018 1023 assert comment.pull_request_version_id is None
1019 1024
1020 1025 if linked_to:
1021 1026 PullRequestModel()._link_comments_to_version(linked_to)
1022 1027
1023 1028 return comment
1024 1029
1025 1030 def create_version_of_pull_request(self):
1026 1031 pull_request = self.create_pull_request()
1027 1032 version = PullRequestModel()._create_version_from_snapshot(
1028 1033 pull_request)
1029 1034 return version
1030 1035
1031 1036 def create_status_votes(self, status, *reviewers):
1032 1037 for reviewer in reviewers:
1033 1038 ChangesetStatusModel().set_status(
1034 1039 repo=self.pull_request.target_repo,
1035 1040 status=status,
1036 1041 user=reviewer.user_id,
1037 1042 pull_request=self.pull_request)
1038 1043
1039 1044 def set_mergeable(self, value):
1040 1045 if not self.mergeable_patcher:
1041 1046 self.mergeable_patcher = mock.patch.object(
1042 1047 VcsSettingsModel, 'get_general_settings')
1043 1048 self.mergeable_mock = self.mergeable_patcher.start()
1044 1049 self.mergeable_mock.return_value = {
1045 1050 'rhodecode_pr_merge_enabled': value}
1046 1051
1047 1052 def cleanup(self):
1048 1053 # In case the source repository is already cleaned up, the pull
1049 1054 # request will already be deleted.
1050 1055 pull_request = PullRequest().get(self.pull_request_id)
1051 1056 if pull_request:
1052 1057 PullRequestModel().delete(pull_request, pull_request.author)
1053 1058 Session().commit()
1054 1059
1055 1060 if self.notification_patcher:
1056 1061 self.notification_patcher.stop()
1057 1062
1058 1063 if self.mergeable_patcher:
1059 1064 self.mergeable_patcher.stop()
1060 1065
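# Usage sketch (illustrative): a typical pull-request test drives the fixture
# roughly like this; the keyword values are examples, not required settings.
#
#   def test_pr_lifecycle(pr_util):
#       pull_request = pr_util.create_pull_request(mergeable=True)
#       pr_util.add_one_commit()
#       pr_util.create_comment()
#       pr_util.close()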
1061 1066
1062 1067 @pytest.fixture()
1063 1068 def user_admin(baseapp):
1064 1069 """
1065 1070 Provides the default admin test user as an instance of `db.User`.
1066 1071 """
1067 1072 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1068 1073 return user
1069 1074
1070 1075
1071 1076 @pytest.fixture()
1072 1077 def user_regular(baseapp):
1073 1078 """
1074 1079 Provides the default regular test user as an instance of `db.User`.
1075 1080 """
1076 1081 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1077 1082 return user
1078 1083
1079 1084
1080 1085 @pytest.fixture()
1081 1086 def user_util(request, db_connection):
1082 1087 """
1083 1088 Provides a wired instance of `UserUtility` with integrated cleanup.
1084 1089 """
1085 1090 utility = UserUtility(test_name=request.node.name)
1086 1091 request.addfinalizer(utility.cleanup)
1087 1092 return utility
1088 1093
1089 1094
1090 1095 # TODO: johbo: Split this up into utilities per domain or something similar
1091 1096 class UserUtility(object):
1092 1097
1093 1098 def __init__(self, test_name="test"):
1094 1099 self._test_name = self._sanitize_name(test_name)
1095 1100 self.fixture = Fixture()
1096 1101 self.repo_group_ids = []
1097 1102 self.repos_ids = []
1098 1103 self.user_ids = []
1099 1104 self.user_group_ids = []
1100 1105 self.user_repo_permission_ids = []
1101 1106 self.user_group_repo_permission_ids = []
1102 1107 self.user_repo_group_permission_ids = []
1103 1108 self.user_group_repo_group_permission_ids = []
1104 1109 self.user_user_group_permission_ids = []
1105 1110 self.user_group_user_group_permission_ids = []
1106 1111 self.user_permissions = []
1107 1112
1108 1113 def _sanitize_name(self, name):
1109 1114 for char in ['[', ']']:
1110 1115 name = name.replace(char, '_')
1111 1116 return name
1112 1117
1113 1118 def create_repo_group(
1114 1119 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1115 1120 group_name = "{prefix}_repogroup_{count}".format(
1116 1121 prefix=self._test_name,
1117 1122 count=len(self.repo_group_ids))
1118 1123 repo_group = self.fixture.create_repo_group(
1119 1124 group_name, cur_user=owner)
1120 1125 if auto_cleanup:
1121 1126 self.repo_group_ids.append(repo_group.group_id)
1122 1127 return repo_group
1123 1128
1124 1129 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None,
1125 1130 auto_cleanup=True, repo_type='hg', bare=False):
1126 1131 repo_name = "{prefix}_repository_{count}".format(
1127 1132 prefix=self._test_name,
1128 1133 count=len(self.repos_ids))
1129 1134
1130 1135 repository = self.fixture.create_repo(
1131 1136 repo_name, cur_user=owner, repo_group=parent, repo_type=repo_type, bare=bare)
1132 1137 if auto_cleanup:
1133 1138 self.repos_ids.append(repository.repo_id)
1134 1139 return repository
1135 1140
1136 1141 def create_user(self, auto_cleanup=True, **kwargs):
1137 1142 user_name = "{prefix}_user_{count}".format(
1138 1143 prefix=self._test_name,
1139 1144 count=len(self.user_ids))
1140 1145 user = self.fixture.create_user(user_name, **kwargs)
1141 1146 if auto_cleanup:
1142 1147 self.user_ids.append(user.user_id)
1143 1148 return user
1144 1149
1145 1150 def create_additional_user_email(self, user, email):
1146 1151 uem = self.fixture.create_additional_user_email(user=user, email=email)
1147 1152 return uem
1148 1153
1149 1154 def create_user_with_group(self):
1150 1155 user = self.create_user()
1151 1156 user_group = self.create_user_group(members=[user])
1152 1157 return user, user_group
1153 1158
1154 1159 def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
1155 1160 auto_cleanup=True, **kwargs):
1156 1161 group_name = "{prefix}_usergroup_{count}".format(
1157 1162 prefix=self._test_name,
1158 1163 count=len(self.user_group_ids))
1159 1164 user_group = self.fixture.create_user_group(
1160 1165 group_name, cur_user=owner, **kwargs)
1161 1166
1162 1167 if auto_cleanup:
1163 1168 self.user_group_ids.append(user_group.users_group_id)
1164 1169 if members:
1165 1170 for user in members:
1166 1171 UserGroupModel().add_user_to_group(user_group, user)
1167 1172 return user_group
1168 1173
1169 1174 def grant_user_permission(self, user_name, permission_name):
1170 1175 self.inherit_default_user_permissions(user_name, False)
1171 1176 self.user_permissions.append((user_name, permission_name))
1172 1177
1173 1178 def grant_user_permission_to_repo_group(
1174 1179 self, repo_group, user, permission_name):
1175 1180 permission = RepoGroupModel().grant_user_permission(
1176 1181 repo_group, user, permission_name)
1177 1182 self.user_repo_group_permission_ids.append(
1178 1183 (repo_group.group_id, user.user_id))
1179 1184 return permission
1180 1185
1181 1186 def grant_user_group_permission_to_repo_group(
1182 1187 self, repo_group, user_group, permission_name):
1183 1188 permission = RepoGroupModel().grant_user_group_permission(
1184 1189 repo_group, user_group, permission_name)
1185 1190 self.user_group_repo_group_permission_ids.append(
1186 1191 (repo_group.group_id, user_group.users_group_id))
1187 1192 return permission
1188 1193
1189 1194 def grant_user_permission_to_repo(
1190 1195 self, repo, user, permission_name):
1191 1196 permission = RepoModel().grant_user_permission(
1192 1197 repo, user, permission_name)
1193 1198 self.user_repo_permission_ids.append(
1194 1199 (repo.repo_id, user.user_id))
1195 1200 return permission
1196 1201
1197 1202 def grant_user_group_permission_to_repo(
1198 1203 self, repo, user_group, permission_name):
1199 1204 permission = RepoModel().grant_user_group_permission(
1200 1205 repo, user_group, permission_name)
1201 1206 self.user_group_repo_permission_ids.append(
1202 1207 (repo.repo_id, user_group.users_group_id))
1203 1208 return permission
1204 1209
1205 1210 def grant_user_permission_to_user_group(
1206 1211 self, target_user_group, user, permission_name):
1207 1212 permission = UserGroupModel().grant_user_permission(
1208 1213 target_user_group, user, permission_name)
1209 1214 self.user_user_group_permission_ids.append(
1210 1215 (target_user_group.users_group_id, user.user_id))
1211 1216 return permission
1212 1217
1213 1218 def grant_user_group_permission_to_user_group(
1214 1219 self, target_user_group, user_group, permission_name):
1215 1220 permission = UserGroupModel().grant_user_group_permission(
1216 1221 target_user_group, user_group, permission_name)
1217 1222 self.user_group_user_group_permission_ids.append(
1218 1223 (target_user_group.users_group_id, user_group.users_group_id))
1219 1224 return permission
1220 1225
1221 1226 def revoke_user_permission(self, user_name, permission_name):
1222 1227 self.inherit_default_user_permissions(user_name, True)
1223 1228 UserModel().revoke_perm(user_name, permission_name)
1224 1229
1225 1230 def inherit_default_user_permissions(self, user_name, value):
1226 1231 user = UserModel().get_by_username(user_name)
1227 1232 user.inherit_default_permissions = value
1228 1233 Session().add(user)
1229 1234 Session().commit()
1230 1235
1231 1236 def cleanup(self):
1232 1237 self._cleanup_permissions()
1233 1238 self._cleanup_repos()
1234 1239 self._cleanup_repo_groups()
1235 1240 self._cleanup_user_groups()
1236 1241 self._cleanup_users()
1237 1242
1238 1243 def _cleanup_permissions(self):
1239 1244 if self.user_permissions:
1240 1245 for user_name, permission_name in self.user_permissions:
1241 1246 self.revoke_user_permission(user_name, permission_name)
1242 1247
1243 1248 for permission in self.user_repo_permission_ids:
1244 1249 RepoModel().revoke_user_permission(*permission)
1245 1250
1246 1251 for permission in self.user_group_repo_permission_ids:
1247 1252 RepoModel().revoke_user_group_permission(*permission)
1248 1253
1249 1254 for permission in self.user_repo_group_permission_ids:
1250 1255 RepoGroupModel().revoke_user_permission(*permission)
1251 1256
1252 1257 for permission in self.user_group_repo_group_permission_ids:
1253 1258 RepoGroupModel().revoke_user_group_permission(*permission)
1254 1259
1255 1260 for permission in self.user_user_group_permission_ids:
1256 1261 UserGroupModel().revoke_user_permission(*permission)
1257 1262
1258 1263 for permission in self.user_group_user_group_permission_ids:
1259 1264 UserGroupModel().revoke_user_group_permission(*permission)
1260 1265
1261 1266 def _cleanup_repo_groups(self):
1262 1267 def _repo_group_compare(first_group_id, second_group_id):
1263 1268 """
1264 1269 Gives higher priority to the groups with the most complex paths
1265 1270 """
1266 1271 first_group = RepoGroup.get(first_group_id)
1267 1272 second_group = RepoGroup.get(second_group_id)
1268 1273 first_group_parts = (
1269 1274 len(first_group.group_name.split('/')) if first_group else 0)
1270 1275 second_group_parts = (
1271 1276 len(second_group.group_name.split('/')) if second_group else 0)
1272 1277 return second_group_parts - first_group_parts  # cmp() does not exist in Python 3
1273 1278
1274 1279 sorted_repo_group_ids = sorted(
1275 1280 self.repo_group_ids, key=functools.cmp_to_key(_repo_group_compare))
1276 1281 for repo_group_id in sorted_repo_group_ids:
1277 1282 self.fixture.destroy_repo_group(repo_group_id)
1278 1283
1279 1284 def _cleanup_repos(self):
1280 1285 sorted_repos_ids = sorted(self.repos_ids)
1281 1286 for repo_id in sorted_repos_ids:
1282 1287 self.fixture.destroy_repo(repo_id)
1283 1288
1284 1289 def _cleanup_user_groups(self):
1285 1290 def _user_group_compare(first_group_id, second_group_id):
1286 1291 """
1287 1292 Gives higher priority to the groups with the most complex paths
1288 1293 """
1289 1294 first_group = UserGroup.get(first_group_id)
1290 1295 second_group = UserGroup.get(second_group_id)
1291 1296 first_group_parts = (
1292 1297 len(first_group.users_group_name.split('/'))
1293 1298 if first_group else 0)
1294 1299 second_group_parts = (
1295 1300 len(second_group.users_group_name.split('/'))
1296 1301 if second_group else 0)
1297 1302 return second_group_parts - first_group_parts  # cmp() does not exist in Python 3
1298 1303
1299 1304 sorted_user_group_ids = sorted(
1300 1305 self.user_group_ids, key=functools.cmp_to_key(_user_group_compare))
1301 1306 for user_group_id in sorted_user_group_ids:
1302 1307 self.fixture.destroy_user_group(user_group_id)
1303 1308
1304 1309 def _cleanup_users(self):
1305 1310 for user_id in self.user_ids:
1306 1311 self.fixture.destroy_user(user_id)
1307 1312
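# Usage sketch (illustrative): building a small permission scenario with
# automatic cleanup; 'repository.write' is assumed to be one of the standard
# repository permission names.
#
#   def test_permissions(user_util):
#       user = user_util.create_user()
#       group = user_util.create_repo_group()
#       repo = user_util.create_repo(parent=group)
#       user_util.grant_user_permission_to_repo(repo, user, 'repository.write')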
1308 1313
1309 1314 @pytest.fixture(scope='session')
1310 1315 def testrun():
1311 1316 return {
1312 1317 'uuid': uuid.uuid4(),
1313 1318 'start': datetime.datetime.utcnow().isoformat(),
1314 1319 'timestamp': int(time.time()),
1315 1320 }
1316 1321
1317 1322
1318 1323 class AppenlightClient(object):
1319 1324
1320 1325 url_template = '{url}?protocol_version=0.5'
1321 1326
1322 1327 def __init__(
1323 1328 self, url, api_key, add_server=True, add_timestamp=True,
1324 1329 namespace=None, request=None, testrun=None):
1325 1330 self.url = self.url_template.format(url=url)
1326 1331 self.api_key = api_key
1327 1332 self.add_server = add_server
1328 1333 self.add_timestamp = add_timestamp
1329 1334 self.namespace = namespace
1330 1335 self.request = request
1331 1336 self.server = socket.getfqdn(socket.gethostname())
1332 1337 self.tags_before = {}
1333 1338 self.tags_after = {}
1334 1339 self.stats = []
1335 1340 self.testrun = testrun or {}
1336 1341
1337 1342 def tag_before(self, tag, value):
1338 1343 self.tags_before[tag] = value
1339 1344
1340 1345 def tag_after(self, tag, value):
1341 1346 self.tags_after[tag] = value
1342 1347
1343 1348 def collect(self, data):
1344 1349 if self.add_server:
1345 1350 data.setdefault('server', self.server)
1346 1351 if self.add_timestamp:
1347 1352 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1348 1353 if self.namespace:
1349 1354 data.setdefault('namespace', self.namespace)
1350 1355 if self.request:
1351 1356 data.setdefault('request', self.request)
1352 1357 self.stats.append(data)
1353 1358
1354 1359 def send_stats(self):
1355 1360 tags = [
1356 1361 ('testrun', self.request),
1357 1362 ('testrun.start', self.testrun['start']),
1358 1363 ('testrun.timestamp', self.testrun['timestamp']),
1359 1364 ('test', self.namespace),
1360 1365 ]
1361 1366 for key, value in self.tags_before.items():
1362 1367 tags.append((key + '.before', value))
1363 1368 try:
1364 1369 delta = self.tags_after[key] - value
1365 1370 tags.append((key + '.delta', delta))
1366 1371 except Exception:
1367 1372 pass
1368 1373 for key, value in self.tags_after.items():
1369 1374 tags.append((key + '.after', value))
1370 1375 self.collect({
1371 1376 'message': "Collected tags",
1372 1377 'tags': tags,
1373 1378 })
1374 1379
1375 1380 response = requests.post(
1376 1381 self.url,
1377 1382 headers={
1378 1383 'X-appenlight-api-key': self.api_key},
1379 1384 json=self.stats,
1380 1385 )
1381 1386
1382 1387 if response.status_code != 200:
1383 1388 pprint.pprint(self.stats)
1384 1389 print(response.headers)
1385 1390 print(response.text)
1386 1391 raise Exception('Sending to appenlight failed')
1387 1392
1388 1393
1389 1394 @pytest.fixture()
1390 1395 def gist_util(request, db_connection):
1391 1396 """
1392 1397 Provides a wired instance of `GistUtility` with integrated cleanup.
1393 1398 """
1394 1399 utility = GistUtility()
1395 1400 request.addfinalizer(utility.cleanup)
1396 1401 return utility
1397 1402
1398 1403
1399 1404 class GistUtility(object):
1400 1405 def __init__(self):
1401 1406 self.fixture = Fixture()
1402 1407 self.gist_ids = []
1403 1408
1404 1409 def create_gist(self, **kwargs):
1405 1410 gist = self.fixture.create_gist(**kwargs)
1406 1411 self.gist_ids.append(gist.gist_id)
1407 1412 return gist
1408 1413
1409 1414 def cleanup(self):
1410 1415 for id_ in self.gist_ids:
1411 1416 self.fixture.destroy_gists(str(id_))
1412 1417
1413 1418
1414 1419 @pytest.fixture()
1415 1420 def enabled_backends(request):
1416 1421 backends = request.config.option.backends
1417 1422 return backends[:]
1418 1423
1419 1424
1420 1425 @pytest.fixture()
1421 1426 def settings_util(request, db_connection):
1422 1427 """
1423 1428 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1424 1429 """
1425 1430 utility = SettingsUtility()
1426 1431 request.addfinalizer(utility.cleanup)
1427 1432 return utility
1428 1433
1429 1434
1430 1435 class SettingsUtility(object):
1431 1436 def __init__(self):
1432 1437 self.rhodecode_ui_ids = []
1433 1438 self.rhodecode_setting_ids = []
1434 1439 self.repo_rhodecode_ui_ids = []
1435 1440 self.repo_rhodecode_setting_ids = []
1436 1441
1437 1442 def create_repo_rhodecode_ui(
1438 1443 self, repo, section, value, key=None, active=True, cleanup=True):
1439 1444 key = key or sha1_safe(f'{section}{value}{repo.repo_id}')
1440 1445
1441 1446 setting = RepoRhodeCodeUi()
1442 1447 setting.repository_id = repo.repo_id
1443 1448 setting.ui_section = section
1444 1449 setting.ui_value = value
1445 1450 setting.ui_key = key
1446 1451 setting.ui_active = active
1447 1452 Session().add(setting)
1448 1453 Session().commit()
1449 1454
1450 1455 if cleanup:
1451 1456 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1452 1457 return setting
1453 1458
1454 1459 def create_rhodecode_ui(
1455 1460 self, section, value, key=None, active=True, cleanup=True):
1456 1461 key = key or sha1_safe(f'{section}{value}')
1457 1462
1458 1463 setting = RhodeCodeUi()
1459 1464 setting.ui_section = section
1460 1465 setting.ui_value = value
1461 1466 setting.ui_key = key
1462 1467 setting.ui_active = active
1463 1468 Session().add(setting)
1464 1469 Session().commit()
1465 1470
1466 1471 if cleanup:
1467 1472 self.rhodecode_ui_ids.append(setting.ui_id)
1468 1473 return setting
1469 1474
1470 1475 def create_repo_rhodecode_setting(
1471 1476 self, repo, name, value, type_, cleanup=True):
1472 1477 setting = RepoRhodeCodeSetting(
1473 1478 repo.repo_id, key=name, val=value, type=type_)
1474 1479 Session().add(setting)
1475 1480 Session().commit()
1476 1481
1477 1482 if cleanup:
1478 1483 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1479 1484 return setting
1480 1485
1481 1486 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1482 1487 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1483 1488 Session().add(setting)
1484 1489 Session().commit()
1485 1490
1486 1491 if cleanup:
1487 1492 self.rhodecode_setting_ids.append(setting.app_settings_id)
1488 1493
1489 1494 return setting
1490 1495
1491 1496 def cleanup(self):
1492 1497 for id_ in self.rhodecode_ui_ids:
1493 1498 setting = RhodeCodeUi.get(id_)
1494 1499 Session().delete(setting)
1495 1500
1496 1501 for id_ in self.rhodecode_setting_ids:
1497 1502 setting = RhodeCodeSetting.get(id_)
1498 1503 Session().delete(setting)
1499 1504
1500 1505 for id_ in self.repo_rhodecode_ui_ids:
1501 1506 setting = RepoRhodeCodeUi.get(id_)
1502 1507 Session().delete(setting)
1503 1508
1504 1509 for id_ in self.repo_rhodecode_setting_ids:
1505 1510 setting = RepoRhodeCodeSetting.get(id_)
1506 1511 Session().delete(setting)
1507 1512
1508 1513 Session().commit()
1509 1514
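# Usage sketch (illustrative): tests create throwaway settings through the
# `settings_util` fixture; the key and value below are made up, and 'unicode'
# is assumed to be one of the supported setting types.
#
#   def test_with_custom_setting(settings_util):
#       settings_util.create_rhodecode_setting(
#           'my_test_key', 'my_test_value', 'unicode', cleanup=True)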
1510 1515
1511 1516 @pytest.fixture()
1512 1517 def no_notifications(request):
1513 1518 notification_patcher = mock.patch(
1514 1519 'rhodecode.model.notification.NotificationModel.create')
1515 1520 notification_patcher.start()
1516 1521 request.addfinalizer(notification_patcher.stop)
1517 1522
1518 1523
1519 1524 @pytest.fixture(scope='session')
1520 1525 def repeat(request):
1521 1526 """
1522 1527 The number of repetitions is based on this fixture.
1523 1528
1524 1529 Slower calls may divide it by 10 or 100. It is chosen so that the
1525 1530 tests are not too slow in our default test suite.
1526 1531 """
1527 1532 return request.config.getoption('--repeat')
1528 1533
1529 1534
1530 1535 @pytest.fixture()
1531 1536 def rhodecode_fixtures():
1532 1537 return Fixture()
1533 1538
1534 1539
1535 1540 @pytest.fixture()
1536 1541 def context_stub():
1537 1542 """
1538 1543 Stub context object.
1539 1544 """
1540 1545 context = pyramid.testing.DummyResource()
1541 1546 return context
1542 1547
1543 1548
1544 1549 @pytest.fixture()
1545 1550 def request_stub():
1546 1551 """
1547 1552 Stub request object.
1548 1553 """
1549 1554 from rhodecode.lib.base import bootstrap_request
1550 1555 request = bootstrap_request(scheme='https')
1551 1556 return request
1552 1557
1553 1558
1554 1559 @pytest.fixture()
1555 1560 def config_stub(request, request_stub):
1556 1561 """
1557 1562 Set up pyramid.testing and return the Configurator.
1558 1563 """
1559 1564 from rhodecode.lib.base import bootstrap_config
1560 1565 config = bootstrap_config(request=request_stub)
1561 1566
1562 1567 @request.addfinalizer
1563 1568 def cleanup():
1564 1569 pyramid.testing.tearDown()
1565 1570
1566 1571 return config
1567 1572
1568 1573
1569 1574 @pytest.fixture()
1570 1575 def StubIntegrationType():
1571 1576 class _StubIntegrationType(IntegrationTypeBase):
1572 1577 """ Test integration type class """
1573 1578
1574 1579 key = 'test'
1575 1580 display_name = 'Test integration type'
1576 1581 description = 'A test integration type for testing'
1577 1582
1578 1583 @classmethod
1579 1584 def icon(cls):
1580 1585 return 'test_icon_html_image'
1581 1586
1582 1587 def __init__(self, settings):
1583 1588 super(_StubIntegrationType, self).__init__(settings)
1584 1589 self.sent_events = [] # for testing
1585 1590
1586 1591 def send_event(self, event):
1587 1592 self.sent_events.append(event)
1588 1593
1589 1594 def settings_schema(self):
1590 1595 class SettingsSchema(colander.Schema):
1591 1596 test_string_field = colander.SchemaNode(
1592 1597 colander.String(),
1593 1598 missing=colander.required,
1594 1599 title='test string field',
1595 1600 )
1596 1601 test_int_field = colander.SchemaNode(
1597 1602 colander.Int(),
1598 1603 title='some integer setting',
1599 1604 )
1600 1605 return SettingsSchema()
1601 1606
1602 1607
1603 1608 integration_type_registry.register_integration_type(_StubIntegrationType)
1604 1609 return _StubIntegrationType
1605 1610
1606 1611
1607 1612 @pytest.fixture()
1608 1613 def stub_integration_settings():
1609 1614 return {
1610 1615 'test_string_field': 'some data',
1611 1616 'test_int_field': 100,
1612 1617 }
1613 1618
1614 1619
1615 1620 @pytest.fixture()
1616 1621 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1617 1622 stub_integration_settings):
1618 1623 integration = IntegrationModel().create(
1619 1624 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1620 1625 name='test repo integration',
1621 1626 repo=repo_stub, repo_group=None, child_repos_only=None)
1622 1627
1623 1628 @request.addfinalizer
1624 1629 def cleanup():
1625 1630 IntegrationModel().delete(integration)
1626 1631
1627 1632 return integration
1628 1633
1629 1634
1630 1635 @pytest.fixture()
1631 1636 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1632 1637 stub_integration_settings):
1633 1638 integration = IntegrationModel().create(
1634 1639 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1635 1640 name='test repogroup integration',
1636 1641 repo=None, repo_group=test_repo_group, child_repos_only=True)
1637 1642
1638 1643 @request.addfinalizer
1639 1644 def cleanup():
1640 1645 IntegrationModel().delete(integration)
1641 1646
1642 1647 return integration
1643 1648
1644 1649
1645 1650 @pytest.fixture()
1646 1651 def repogroup_recursive_integration_stub(request, test_repo_group,
1647 1652 StubIntegrationType, stub_integration_settings):
1648 1653 integration = IntegrationModel().create(
1649 1654 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1650 1655 name='test recursive repogroup integration',
1651 1656 repo=None, repo_group=test_repo_group, child_repos_only=False)
1652 1657
1653 1658 @request.addfinalizer
1654 1659 def cleanup():
1655 1660 IntegrationModel().delete(integration)
1656 1661
1657 1662 return integration
1658 1663
1659 1664
1660 1665 @pytest.fixture()
1661 1666 def global_integration_stub(request, StubIntegrationType,
1662 1667 stub_integration_settings):
1663 1668 integration = IntegrationModel().create(
1664 1669 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1665 1670 name='test global integration',
1666 1671 repo=None, repo_group=None, child_repos_only=None)
1667 1672
1668 1673 @request.addfinalizer
1669 1674 def cleanup():
1670 1675 IntegrationModel().delete(integration)
1671 1676
1672 1677 return integration
1673 1678
1674 1679
1675 1680 @pytest.fixture()
1676 1681 def root_repos_integration_stub(request, StubIntegrationType,
1677 1682 stub_integration_settings):
1678 1683 integration = IntegrationModel().create(
1679 1684 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1680 1685 name='test global integration',
1681 1686 repo=None, repo_group=None, child_repos_only=True)
1682 1687
1683 1688 @request.addfinalizer
1684 1689 def cleanup():
1685 1690 IntegrationModel().delete(integration)
1686 1691
1687 1692 return integration
1688 1693
1689 1694
1690 1695 @pytest.fixture()
1691 1696 def local_dt_to_utc():
1692 1697 def _factory(dt):
1693 1698 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1694 1699 dateutil.tz.tzutc()).replace(tzinfo=None)
1695 1700 return _factory
1696 1701
1697 1702
1698 1703 @pytest.fixture()
1699 1704 def disable_anonymous_user(request, baseapp):
1700 1705 set_anonymous_access(False)
1701 1706
1702 1707 @request.addfinalizer
1703 1708 def cleanup():
1704 1709 set_anonymous_access(True)
1705 1710
1706 1711
1707 1712 @pytest.fixture(scope='module')
1708 1713 def rc_fixture(request):
1709 1714 return Fixture()
1710 1715
1711 1716
1712 1717 @pytest.fixture()
1713 1718 def repo_groups(request):
1714 1719 fixture = Fixture()
1715 1720
1716 1721 session = Session()
1717 1722 zombie_group = fixture.create_repo_group('zombie')
1718 1723 parent_group = fixture.create_repo_group('parent')
1719 1724 child_group = fixture.create_repo_group('parent/child')
1720 1725 groups_in_db = session.query(RepoGroup).all()
1721 1726 assert len(groups_in_db) == 3
1722 1727 assert child_group.group_parent_id == parent_group.group_id
1723 1728
1724 1729 @request.addfinalizer
1725 1730 def cleanup():
1726 1731 fixture.destroy_repo_group(zombie_group)
1727 1732 fixture.destroy_repo_group(child_group)
1728 1733 fixture.destroy_repo_group(parent_group)
1729 1734
1730 1735 return zombie_group, parent_group, child_group