##// END OF EJS Templates
archival: added tests at subpath generation of archivals
super-admin -
r5112:50a51f17 default
parent child Browse files
Show More
@@ -1,1100 +1,1125 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import os
21 21
22 22 import mock
23 23 import pytest
24 24
25 25 from rhodecode.apps.repository.tests.test_repo_compare import ComparePage
26 26 from rhodecode.apps.repository.views.repo_files import RepoFilesView, get_archive_name, get_path_sha
27 27 from rhodecode.lib import helpers as h
28 28 from collections import OrderedDict
29 29 from rhodecode.lib.ext_json import json
30 30 from rhodecode.lib.str_utils import safe_str
31 31 from rhodecode.lib.vcs import nodes
32 32
33 33 from rhodecode.lib.vcs.conf import settings
34 34 from rhodecode.tests import assert_session_flash
35 35 from rhodecode.tests.fixture import Fixture
36 36 from rhodecode.model.db import Session
37 37
38 38 fixture = Fixture()
39 39
40 40
41 41 def get_node_history(backend_type):
42 42 return {
43 43 'hg': json.loads(fixture.load_resource('hg_node_history_response.json')),
44 44 'git': json.loads(fixture.load_resource('git_node_history_response.json')),
45 45 'svn': json.loads(fixture.load_resource('svn_node_history_response.json')),
46 46 }[backend_type]
47 47
48 48
49 49 def route_path(name, params=None, **kwargs):
50 50 import urllib.request
51 51 import urllib.parse
52 52 import urllib.error
53 53
54 54 base_url = {
55 55 'repo_summary': '/{repo_name}',
56 56 'repo_archivefile': '/{repo_name}/archive/{fname}',
57 57 'repo_files_diff': '/{repo_name}/diff/{f_path}',
58 58 'repo_files_diff_2way_redirect': '/{repo_name}/diff-2way/{f_path}',
59 59 'repo_files': '/{repo_name}/files/{commit_id}/{f_path}',
60 60 'repo_files:default_path': '/{repo_name}/files/{commit_id}/',
61 61 'repo_files:default_commit': '/{repo_name}/files',
62 62 'repo_files:rendered': '/{repo_name}/render/{commit_id}/{f_path}',
63 63 'repo_files:annotated': '/{repo_name}/annotate/{commit_id}/{f_path}',
64 64 'repo_files:annotated_previous': '/{repo_name}/annotate-previous/{commit_id}/{f_path}',
65 65 'repo_files_nodelist': '/{repo_name}/nodelist/{commit_id}/{f_path}',
66 66 'repo_file_raw': '/{repo_name}/raw/{commit_id}/{f_path}',
67 67 'repo_file_download': '/{repo_name}/download/{commit_id}/{f_path}',
68 68 'repo_file_history': '/{repo_name}/history/{commit_id}/{f_path}',
69 69 'repo_file_authors': '/{repo_name}/authors/{commit_id}/{f_path}',
70 70 'repo_files_remove_file': '/{repo_name}/remove_file/{commit_id}/{f_path}',
71 71 'repo_files_delete_file': '/{repo_name}/delete_file/{commit_id}/{f_path}',
72 72 'repo_files_edit_file': '/{repo_name}/edit_file/{commit_id}/{f_path}',
73 73 'repo_files_update_file': '/{repo_name}/update_file/{commit_id}/{f_path}',
74 74 'repo_files_add_file': '/{repo_name}/add_file/{commit_id}/{f_path}',
75 'repo_files_upload_file': '/{repo_name}/upload_file/{commit_id}/{f_path}',
75 76 'repo_files_create_file': '/{repo_name}/create_file/{commit_id}/{f_path}',
76 77 'repo_nodetree_full': '/{repo_name}/nodetree_full/{commit_id}/{f_path}',
77 78 'repo_nodetree_full:default_path': '/{repo_name}/nodetree_full/{commit_id}/',
78 79 }[name].format(**kwargs)
79 80
80 81 if params:
81 82 base_url = '{}?{}'.format(base_url, urllib.parse.urlencode(params))
82 83 return base_url
83 84
84 85
85 86 def assert_files_in_response(response, files, params):
86 87 template = (
87 88 'href="/%(repo_name)s/files/%(commit_id)s/%(name)s"')
88 89 _assert_items_in_response(response, files, template, params)
89 90
90 91
91 92 def assert_dirs_in_response(response, dirs, params):
92 93 template = (
93 94 'href="/%(repo_name)s/files/%(commit_id)s/%(name)s"')
94 95 _assert_items_in_response(response, dirs, template, params)
95 96
96 97
97 98 def _assert_items_in_response(response, items, template, params):
98 99 for item in items:
99 100 item_params = {'name': item}
100 101 item_params.update(params)
101 102 response.mustcontain(template % item_params)
102 103
103 104
104 105 def assert_timeago_in_response(response, items, params):
105 106 for item in items:
106 107 response.mustcontain(h.age_component(params['date']))
107 108
108 109
109 110 @pytest.mark.usefixtures("app")
110 111 class TestFilesViews(object):
111 112
112 113 def test_show_files(self, backend):
113 114 response = self.app.get(
114 115 route_path('repo_files',
115 116 repo_name=backend.repo_name,
116 117 commit_id='tip', f_path='/'))
117 118 commit = backend.repo.get_commit()
118 119
119 120 params = {
120 121 'repo_name': backend.repo_name,
121 122 'commit_id': commit.raw_id,
122 123 'date': commit.date
123 124 }
124 125 assert_dirs_in_response(response, ['docs', 'vcs'], params)
125 126 files = [
126 127 '.gitignore',
127 128 '.hgignore',
128 129 '.hgtags',
129 130 # TODO: missing in Git
130 131 # '.travis.yml',
131 132 'MANIFEST.in',
132 133 'README.rst',
133 134 # TODO: File is missing in svn repository
134 135 # 'run_test_and_report.sh',
135 136 'setup.cfg',
136 137 'setup.py',
137 138 'test_and_report.sh',
138 139 'tox.ini',
139 140 ]
140 141 assert_files_in_response(response, files, params)
141 142 assert_timeago_in_response(response, files, params)
142 143
143 144 def test_show_files_links_submodules_with_absolute_url(self, backend_hg):
144 145 repo = backend_hg['subrepos']
145 146 response = self.app.get(
146 147 route_path('repo_files',
147 148 repo_name=repo.repo_name,
148 149 commit_id='tip', f_path='/'))
149 150 assert_response = response.assert_response()
150 151 assert_response.contains_one_link(
151 152 'absolute-path @ 000000000000', 'http://example.com/absolute-path')
152 153
153 154 def test_show_files_links_submodules_with_absolute_url_subpaths(
154 155 self, backend_hg):
155 156 repo = backend_hg['subrepos']
156 157 response = self.app.get(
157 158 route_path('repo_files',
158 159 repo_name=repo.repo_name,
159 160 commit_id='tip', f_path='/'))
160 161 assert_response = response.assert_response()
161 162 assert_response.contains_one_link(
162 163 'subpaths-path @ 000000000000',
163 164 'http://sub-base.example.com/subpaths-path')
164 165
165 166 @pytest.mark.xfail_backends("svn", reason="Depends on branch support")
166 167 def test_files_menu(self, backend):
167 168 new_branch = "temp_branch_name"
168 169 commits = [
169 170 {'message': 'a'},
170 171 {'message': 'b', 'branch': new_branch}
171 172 ]
172 173 backend.create_repo(commits)
173 174 backend.repo.landing_rev = "branch:%s" % new_branch
174 175 Session().commit()
175 176
176 177 # get response based on tip and not new commit
177 178 response = self.app.get(
178 179 route_path('repo_files',
179 180 repo_name=backend.repo_name,
180 181 commit_id='tip', f_path='/'))
181 182
182 183 # make sure Files menu url is not tip but new commit
183 184 landing_rev = backend.repo.landing_ref_name
184 185 files_url = route_path('repo_files:default_path',
185 186 repo_name=backend.repo_name,
186 187 commit_id=landing_rev, params={'at': landing_rev})
187 188
188 189 assert landing_rev != 'tip'
189 190 response.mustcontain(
190 191 '<li class="active"><a class="menulink" href="%s">' % files_url)
191 192
192 193 def test_show_files_commit(self, backend):
193 194 commit = backend.repo.get_commit(commit_idx=32)
194 195
195 196 response = self.app.get(
196 197 route_path('repo_files',
197 198 repo_name=backend.repo_name,
198 199 commit_id=commit.raw_id, f_path='/'))
199 200
200 201 dirs = ['docs', 'tests']
201 202 files = ['README.rst']
202 203 params = {
203 204 'repo_name': backend.repo_name,
204 205 'commit_id': commit.raw_id,
205 206 }
206 207 assert_dirs_in_response(response, dirs, params)
207 208 assert_files_in_response(response, files, params)
208 209
209 210 def test_show_files_different_branch(self, backend):
210 211 branches = dict(
211 212 hg=(150, ['git']),
212 213 # TODO: Git test repository does not contain other branches
213 214 git=(633, ['master']),
214 215 # TODO: Branch support in Subversion
215 216 svn=(150, [])
216 217 )
217 218 idx, branches = branches[backend.alias]
218 219 commit = backend.repo.get_commit(commit_idx=idx)
219 220 response = self.app.get(
220 221 route_path('repo_files',
221 222 repo_name=backend.repo_name,
222 223 commit_id=commit.raw_id, f_path='/'))
223 224
224 225 assert_response = response.assert_response()
225 226 for branch in branches:
226 227 assert_response.element_contains('.tags .branchtag', branch)
227 228
228 229 def test_show_files_paging(self, backend):
229 230 repo = backend.repo
230 231 indexes = [73, 92, 109, 1, 0]
231 232 idx_map = [(rev, repo.get_commit(commit_idx=rev).raw_id)
232 233 for rev in indexes]
233 234
234 235 for idx in idx_map:
235 236 response = self.app.get(
236 237 route_path('repo_files',
237 238 repo_name=backend.repo_name,
238 239 commit_id=idx[1], f_path='/'))
239 240
240 241 response.mustcontain("""r%s:%s""" % (idx[0], idx[1][:8]))
241 242
242 243 def test_file_source(self, backend):
243 244 commit = backend.repo.get_commit(commit_idx=167)
244 245 response = self.app.get(
245 246 route_path('repo_files',
246 247 repo_name=backend.repo_name,
247 248 commit_id=commit.raw_id, f_path='vcs/nodes.py'))
248 249
249 250 msgbox = """<div class="commit">%s</div>"""
250 251 response.mustcontain(msgbox % (commit.message, ))
251 252
252 253 assert_response = response.assert_response()
253 254 if commit.branch:
254 255 assert_response.element_contains(
255 256 '.tags.tags-main .branchtag', commit.branch)
256 257 if commit.tags:
257 258 for tag in commit.tags:
258 259 assert_response.element_contains('.tags.tags-main .tagtag', tag)
259 260
260 261 def test_file_source_annotated(self, backend):
261 262 response = self.app.get(
262 263 route_path('repo_files:annotated',
263 264 repo_name=backend.repo_name,
264 265 commit_id='tip', f_path='vcs/nodes.py'))
265 266 expected_commits = {
266 267 'hg': 'r356',
267 268 'git': 'r345',
268 269 'svn': 'r208',
269 270 }
270 271 response.mustcontain(expected_commits[backend.alias])
271 272
272 273 def test_file_source_authors(self, backend):
273 274 response = self.app.get(
274 275 route_path('repo_file_authors',
275 276 repo_name=backend.repo_name,
276 277 commit_id='tip', f_path='vcs/nodes.py'))
277 278 expected_authors = {
278 279 'hg': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
279 280 'git': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
280 281 'svn': ('marcin', 'lukasz'),
281 282 }
282 283
283 284 for author in expected_authors[backend.alias]:
284 285 response.mustcontain(author)
285 286
286 287 def test_file_source_authors_with_annotation(self, backend):
287 288 response = self.app.get(
288 289 route_path('repo_file_authors',
289 290 repo_name=backend.repo_name,
290 291 commit_id='tip', f_path='vcs/nodes.py',
291 292 params=dict(annotate=1)))
292 293 expected_authors = {
293 294 'hg': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
294 295 'git': ('Marcin Kuzminski', 'Lukasz Balcerzak'),
295 296 'svn': ('marcin', 'lukasz'),
296 297 }
297 298
298 299 for author in expected_authors[backend.alias]:
299 300 response.mustcontain(author)
300 301
301 302 def test_file_source_history(self, backend, xhr_header):
302 303 response = self.app.get(
303 304 route_path('repo_file_history',
304 305 repo_name=backend.repo_name,
305 306 commit_id='tip', f_path='vcs/nodes.py'),
306 307 extra_environ=xhr_header)
307 308 assert get_node_history(backend.alias) == json.loads(response.body)
308 309
309 310 def test_file_source_history_svn(self, backend_svn, xhr_header):
310 311 simple_repo = backend_svn['svn-simple-layout']
311 312 response = self.app.get(
312 313 route_path('repo_file_history',
313 314 repo_name=simple_repo.repo_name,
314 315 commit_id='tip', f_path='trunk/example.py'),
315 316 extra_environ=xhr_header)
316 317
317 318 expected_data = json.loads(
318 319 fixture.load_resource('svn_node_history_branches.json'))
319 320
320 321 assert expected_data == response.json
321 322
322 323 def test_file_source_history_with_annotation(self, backend, xhr_header):
323 324 response = self.app.get(
324 325 route_path('repo_file_history',
325 326 repo_name=backend.repo_name,
326 327 commit_id='tip', f_path='vcs/nodes.py',
327 328 params=dict(annotate=1)),
328 329
329 330 extra_environ=xhr_header)
330 331 assert get_node_history(backend.alias) == json.loads(response.body)
331 332
332 333 def test_tree_search_top_level(self, backend, xhr_header):
333 334 commit = backend.repo.get_commit(commit_idx=173)
334 335 response = self.app.get(
335 336 route_path('repo_files_nodelist',
336 337 repo_name=backend.repo_name,
337 338 commit_id=commit.raw_id, f_path='/'),
338 339 extra_environ=xhr_header)
339 340 assert 'nodes' in response.json
340 341 assert {'name': 'docs', 'type': 'dir'} in response.json['nodes']
341 342
342 343 def test_tree_search_missing_xhr(self, backend):
343 344 self.app.get(
344 345 route_path('repo_files_nodelist',
345 346 repo_name=backend.repo_name,
346 347 commit_id='tip', f_path='/'),
347 348 status=404)
348 349
349 350 def test_tree_search_at_path(self, backend, xhr_header):
350 351 commit = backend.repo.get_commit(commit_idx=173)
351 352 response = self.app.get(
352 353 route_path('repo_files_nodelist',
353 354 repo_name=backend.repo_name,
354 355 commit_id=commit.raw_id, f_path='/docs'),
355 356 extra_environ=xhr_header)
356 357 assert 'nodes' in response.json
357 358 nodes = response.json['nodes']
358 359 assert {'name': 'docs/api', 'type': 'dir'} in nodes
359 360 assert {'name': 'docs/index.rst', 'type': 'file'} in nodes
360 361
361 362 def test_tree_search_at_path_2nd_level(self, backend, xhr_header):
362 363 commit = backend.repo.get_commit(commit_idx=173)
363 364 response = self.app.get(
364 365 route_path('repo_files_nodelist',
365 366 repo_name=backend.repo_name,
366 367 commit_id=commit.raw_id, f_path='/docs/api'),
367 368 extra_environ=xhr_header)
368 369 assert 'nodes' in response.json
369 370 nodes = response.json['nodes']
370 371 assert {'name': 'docs/api/index.rst', 'type': 'file'} in nodes
371 372
372 373 def test_tree_search_at_path_missing_xhr(self, backend):
373 374 self.app.get(
374 375 route_path('repo_files_nodelist',
375 376 repo_name=backend.repo_name,
376 377 commit_id='tip', f_path='/docs'),
377 378 status=404)
378 379
379 380 def test_nodetree(self, backend, xhr_header):
380 381 commit = backend.repo.get_commit(commit_idx=173)
381 382 response = self.app.get(
382 383 route_path('repo_nodetree_full',
383 384 repo_name=backend.repo_name,
384 385 commit_id=commit.raw_id, f_path='/'),
385 386 extra_environ=xhr_header)
386 387
387 388 assert_response = response.assert_response()
388 389
389 390 for attr in ['data-commit-id', 'data-date', 'data-author']:
390 391 elements = assert_response.get_elements('[{}]'.format(attr))
391 392 assert len(elements) > 1
392 393
393 394 for element in elements:
394 395 assert element.get(attr)
395 396
396 397 def test_nodetree_if_file(self, backend, xhr_header):
397 398 commit = backend.repo.get_commit(commit_idx=173)
398 399 response = self.app.get(
399 400 route_path('repo_nodetree_full',
400 401 repo_name=backend.repo_name,
401 402 commit_id=commit.raw_id, f_path='README.rst'),
402 403 extra_environ=xhr_header)
403 404 assert response.text == ''
404 405
405 406 def test_nodetree_wrong_path(self, backend, xhr_header):
406 407 commit = backend.repo.get_commit(commit_idx=173)
407 408 response = self.app.get(
408 409 route_path('repo_nodetree_full',
409 410 repo_name=backend.repo_name,
410 411 commit_id=commit.raw_id, f_path='/dont-exist'),
411 412 extra_environ=xhr_header)
412 413
413 414 err = 'error: There is no file nor ' \
414 415 'directory at the given path'
415 416 assert err in response.text
416 417
417 418 def test_nodetree_missing_xhr(self, backend):
418 419 self.app.get(
419 420 route_path('repo_nodetree_full',
420 421 repo_name=backend.repo_name,
421 422 commit_id='tip', f_path='/'),
422 423 status=404)
423 424
424 425
425 426 @pytest.mark.usefixtures("app", "autologin_user")
426 427 class TestRawFileHandling(object):
427 428
428 429 def test_download_file(self, backend):
429 430 commit = backend.repo.get_commit(commit_idx=173)
430 431 response = self.app.get(
431 432 route_path('repo_file_download',
432 433 repo_name=backend.repo_name,
433 434 commit_id=commit.raw_id, f_path='vcs/nodes.py'),)
434 435
435 436 assert response.content_disposition == 'attachment; filename="nodes.py"; filename*=UTF-8\'\'nodes.py'
436 437 assert response.content_type == "text/x-python"
437 438
438 439 def test_download_file_wrong_cs(self, backend):
439 440 raw_id = u'ERRORce30c96924232dffcd24178a07ffeb5dfc'
440 441
441 442 response = self.app.get(
442 443 route_path('repo_file_download',
443 444 repo_name=backend.repo_name,
444 445 commit_id=raw_id, f_path='vcs/nodes.svg'),
445 446 status=404)
446 447
447 448 msg = """No such commit exists for this repository"""
448 449 response.mustcontain(msg)
449 450
450 451 def test_download_file_wrong_f_path(self, backend):
451 452 commit = backend.repo.get_commit(commit_idx=173)
452 453 f_path = 'vcs/ERRORnodes.py'
453 454
454 455 response = self.app.get(
455 456 route_path('repo_file_download',
456 457 repo_name=backend.repo_name,
457 458 commit_id=commit.raw_id, f_path=f_path),
458 459 status=404)
459 460
460 461 msg = (
461 462 "There is no file nor directory at the given path: "
462 463 "`%s` at commit %s" % (f_path, commit.short_id))
463 464 response.mustcontain(msg)
464 465
465 466 def test_file_raw(self, backend):
466 467 commit = backend.repo.get_commit(commit_idx=173)
467 468 response = self.app.get(
468 469 route_path('repo_file_raw',
469 470 repo_name=backend.repo_name,
470 471 commit_id=commit.raw_id, f_path='vcs/nodes.py'),)
471 472
472 473 assert response.content_type == "text/plain"
473 474
474 475 def test_file_raw_binary(self, backend):
475 476 commit = backend.repo.get_commit()
476 477 response = self.app.get(
477 478 route_path('repo_file_raw',
478 479 repo_name=backend.repo_name,
479 480 commit_id=commit.raw_id,
480 481 f_path='docs/theme/ADC/static/breadcrumb_background.png'),)
481 482
482 483 assert response.content_disposition == 'inline'
483 484
484 485 def test_raw_file_wrong_cs(self, backend):
485 486 raw_id = u'ERRORcce30c96924232dffcd24178a07ffeb5dfc'
486 487
487 488 response = self.app.get(
488 489 route_path('repo_file_raw',
489 490 repo_name=backend.repo_name,
490 491 commit_id=raw_id, f_path='vcs/nodes.svg'),
491 492 status=404)
492 493
493 494 msg = """No such commit exists for this repository"""
494 495 response.mustcontain(msg)
495 496
496 497 def test_raw_wrong_f_path(self, backend):
497 498 commit = backend.repo.get_commit(commit_idx=173)
498 499 f_path = 'vcs/ERRORnodes.py'
499 500 response = self.app.get(
500 501 route_path('repo_file_raw',
501 502 repo_name=backend.repo_name,
502 503 commit_id=commit.raw_id, f_path=f_path),
503 504 status=404)
504 505
505 506 msg = (
506 507 "There is no file nor directory at the given path: "
507 508 "`%s` at commit %s" % (f_path, commit.short_id))
508 509 response.mustcontain(msg)
509 510
510 511 def test_raw_svg_should_not_be_rendered(self, backend):
511 512 backend.create_repo()
512 513 backend.ensure_file(b"xss.svg")
513 514 response = self.app.get(
514 515 route_path('repo_file_raw',
515 516 repo_name=backend.repo_name,
516 517 commit_id='tip', f_path='xss.svg'),)
517 518 # If the content type is image/svg+xml then it allows to render HTML
518 519 # and malicious SVG.
519 520 assert response.content_type == "text/plain"
520 521
521 522
522 523 @pytest.mark.usefixtures("app")
523 524 class TestRepositoryArchival(object):
524 525
525 526 def test_archival(self, backend):
526 527 backend.enable_downloads()
527 528 commit = backend.repo.get_commit(commit_idx=173)
528 529 for a_type, content_type, extension in settings.ARCHIVE_SPECS:
529 530 path_sha = get_path_sha('/')
530 531 filename = get_archive_name(backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha)
531 532
532 533 fname = commit.raw_id + extension
533 534 response = self.app.get(
534 535 route_path('repo_archivefile',
535 536 repo_name=backend.repo_name,
536 537 fname=fname))
537 538
538 539 assert response.status == '200 OK'
539 540 headers = [
540 541 ('Content-Disposition', 'attachment; filename=%s' % filename),
541 542 ('Content-Type', '%s' % content_type),
542 543 ]
543 544
544 545 for header in headers:
545 546 assert header in response.headers.items()
546 547
547 548 def test_archival_no_hash(self, backend):
548 549 backend.enable_downloads()
549 550 commit = backend.repo.get_commit(commit_idx=173)
550 551 for a_type, content_type, extension in settings.ARCHIVE_SPECS:
551 552 path_sha = get_path_sha('/')
552 553 filename = get_archive_name(backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha, with_hash=False)
553 554
554 555 fname = commit.raw_id + extension
555 556 response = self.app.get(
556 557 route_path('repo_archivefile',
557 558 repo_name=backend.repo_name,
558 559 fname=fname, params={'with_hash': 0}))
559 560
560 561 assert response.status == '200 OK'
561 562 headers = [
562 563 ('Content-Disposition', 'attachment; filename=%s' % filename),
563 564 ('Content-Type', '%s' % content_type),
564 565 ]
565 566
566 567 for header in headers:
567 assert header in response.headers.items()
568 assert header in list(response.headers.items())
569
570 def test_archival_at_path(self, backend):
571 backend.enable_downloads()
572 commit = backend.repo.get_commit(commit_idx=190)
573 at_path = 'vcs'
574
575 for a_type, content_type, extension in settings.ARCHIVE_SPECS:
576 path_sha = get_path_sha(at_path)
577 filename = get_archive_name(backend.repo_name, commit_sha=commit.short_id, ext=extension, path_sha=path_sha)
578
579 fname = commit.raw_id + extension
580 response = self.app.get(
581 route_path('repo_archivefile',
582 repo_name=backend.repo_name,
583 fname=fname, params={'at_path': at_path}))
584
585 assert response.status == '200 OK'
586 headers = [
587 ('Content-Disposition', 'attachment; filename=%s' % filename),
588 ('Content-Type', '%s' % content_type),
589 ]
590
591 for header in headers:
592 assert header in list(response.headers.items())
568 593
569 594 @pytest.mark.parametrize('arch_ext',[
570 595 'tar', 'rar', 'x', '..ax', '.zipz', 'tar.gz.tar'])
571 596 def test_archival_wrong_ext(self, backend, arch_ext):
572 597 backend.enable_downloads()
573 598 commit = backend.repo.get_commit(commit_idx=173)
574 599
575 600 fname = commit.raw_id + '.' + arch_ext
576 601
577 602 response = self.app.get(
578 603 route_path('repo_archivefile',
579 604 repo_name=backend.repo_name,
580 605 fname=fname))
581 606 response.mustcontain(
582 607 'Unknown archive type for: `{}`'.format(fname))
583 608
584 609 @pytest.mark.parametrize('commit_id', [
585 610 '00x000000', 'tar', 'wrong', '@$@$42413232', '232dffcd'])
586 611 def test_archival_wrong_commit_id(self, backend, commit_id):
587 612 backend.enable_downloads()
588 613 fname = '%s.zip' % commit_id
589 614
590 615 response = self.app.get(
591 616 route_path('repo_archivefile',
592 617 repo_name=backend.repo_name,
593 618 fname=fname))
594 619 response.mustcontain('Unknown commit_id')
595 620
596 621
597 622 @pytest.mark.usefixtures("app")
598 623 class TestFilesDiff(object):
599 624
600 625 @pytest.mark.parametrize("diff", ['diff', 'download', 'raw'])
601 626 def test_file_full_diff(self, backend, diff):
602 627 commit1 = backend.repo.get_commit(commit_idx=-1)
603 628 commit2 = backend.repo.get_commit(commit_idx=-2)
604 629
605 630 response = self.app.get(
606 631 route_path('repo_files_diff',
607 632 repo_name=backend.repo_name,
608 633 f_path='README'),
609 634 params={
610 635 'diff1': commit2.raw_id,
611 636 'diff2': commit1.raw_id,
612 637 'fulldiff': '1',
613 638 'diff': diff,
614 639 })
615 640
616 641 if diff == 'diff':
617 642 # use redirect since this is OLD view redirecting to compare page
618 643 response = response.follow()
619 644
620 645 # It's a symlink to README.rst
621 646 response.mustcontain('README.rst')
622 647 response.mustcontain('No newline at end of file')
623 648
624 649 def test_file_binary_diff(self, backend):
625 650 commits = [
626 651 {'message': 'First commit'},
627 652 {'message': 'Commit with binary',
628 653 'added': [nodes.FileNode(b'file.bin', content='\0BINARY\0')]},
629 654 ]
630 655 repo = backend.create_repo(commits=commits)
631 656
632 657 response = self.app.get(
633 658 route_path('repo_files_diff',
634 659 repo_name=backend.repo_name,
635 660 f_path='file.bin'),
636 661 params={
637 662 'diff1': repo.get_commit(commit_idx=0).raw_id,
638 663 'diff2': repo.get_commit(commit_idx=1).raw_id,
639 664 'fulldiff': '1',
640 665 'diff': 'diff',
641 666 })
642 667 # use redirect since this is OLD view redirecting to compare page
643 668 response = response.follow()
644 669 response.mustcontain('Collapse 1 commit')
645 670 file_changes = (1, 0, 0)
646 671
647 672 compare_page = ComparePage(response)
648 673 compare_page.contains_change_summary(*file_changes)
649 674
650 675 if backend.alias == 'svn':
651 676 response.mustcontain('new file 10644')
652 677 # TODO(marcink): SVN doesn't yet detect binary changes
653 678 else:
654 679 response.mustcontain('new file 100644')
655 680 response.mustcontain('binary diff hidden')
656 681
657 682 def test_diff_2way(self, backend):
658 683 commit1 = backend.repo.get_commit(commit_idx=-1)
659 684 commit2 = backend.repo.get_commit(commit_idx=-2)
660 685 response = self.app.get(
661 686 route_path('repo_files_diff_2way_redirect',
662 687 repo_name=backend.repo_name,
663 688 f_path='README'),
664 689 params={
665 690 'diff1': commit2.raw_id,
666 691 'diff2': commit1.raw_id,
667 692 })
668 693 # use redirect since this is OLD view redirecting to compare page
669 694 response = response.follow()
670 695
671 696 # It's a symlink to README.rst
672 697 response.mustcontain('README.rst')
673 698 response.mustcontain('No newline at end of file')
674 699
675 700 def test_requires_one_commit_id(self, backend, autologin_user):
676 701 response = self.app.get(
677 702 route_path('repo_files_diff',
678 703 repo_name=backend.repo_name,
679 704 f_path='README.rst'),
680 705 status=400)
681 706 response.mustcontain(
682 707 'Need query parameter', 'diff1', 'diff2', 'to generate a diff.')
683 708
684 709 def test_returns_no_files_if_file_does_not_exist(self, vcsbackend):
685 710 repo = vcsbackend.repo
686 711 response = self.app.get(
687 712 route_path('repo_files_diff',
688 713 repo_name=repo.name,
689 714 f_path='does-not-exist-in-any-commit'),
690 715 params={
691 716 'diff1': repo[0].raw_id,
692 717 'diff2': repo[1].raw_id
693 718 })
694 719
695 720 response = response.follow()
696 721 response.mustcontain('No files')
697 722
698 723 def test_returns_redirect_if_file_not_changed(self, backend):
699 724 commit = backend.repo.get_commit(commit_idx=-1)
700 725 response = self.app.get(
701 726 route_path('repo_files_diff_2way_redirect',
702 727 repo_name=backend.repo_name,
703 728 f_path='README'),
704 729 params={
705 730 'diff1': commit.raw_id,
706 731 'diff2': commit.raw_id,
707 732 })
708 733
709 734 response = response.follow()
710 735 response.mustcontain('No files')
711 736 response.mustcontain('No commits in this compare')
712 737
713 738 def test_supports_diff_to_different_path_svn(self, backend_svn):
714 739 #TODO: check this case
715 740 return
716 741
717 742 repo = backend_svn['svn-simple-layout'].scm_instance()
718 743 commit_id_1 = '24'
719 744 commit_id_2 = '26'
720 745
721 746 response = self.app.get(
722 747 route_path('repo_files_diff',
723 748 repo_name=backend_svn.repo_name,
724 749 f_path='trunk/example.py'),
725 750 params={
726 751 'diff1': 'tags/v0.2/example.py@' + commit_id_1,
727 752 'diff2': commit_id_2,
728 753 })
729 754
730 755 response = response.follow()
731 756 response.mustcontain(
732 757 # diff contains this
733 758 "Will print out a useful message on invocation.")
734 759
735 760 # Note: Expecting that we indicate the user what's being compared
736 761 response.mustcontain("trunk/example.py")
737 762 response.mustcontain("tags/v0.2/example.py")
738 763
739 764 def test_show_rev_redirects_to_svn_path(self, backend_svn):
740 765 #TODO: check this case
741 766 return
742 767
743 768 repo = backend_svn['svn-simple-layout'].scm_instance()
744 769 commit_id = repo[-1].raw_id
745 770
746 771 response = self.app.get(
747 772 route_path('repo_files_diff',
748 773 repo_name=backend_svn.repo_name,
749 774 f_path='trunk/example.py'),
750 775 params={
751 776 'diff1': 'branches/argparse/example.py@' + commit_id,
752 777 'diff2': commit_id,
753 778 },
754 779 status=302)
755 780 response = response.follow()
756 781 assert response.headers['Location'].endswith(
757 782 'svn-svn-simple-layout/files/26/branches/argparse/example.py')
758 783
759 784 def test_show_rev_and_annotate_redirects_to_svn_path(self, backend_svn):
760 785 #TODO: check this case
761 786 return
762 787
763 788 repo = backend_svn['svn-simple-layout'].scm_instance()
764 789 commit_id = repo[-1].raw_id
765 790 response = self.app.get(
766 791 route_path('repo_files_diff',
767 792 repo_name=backend_svn.repo_name,
768 793 f_path='trunk/example.py'),
769 794 params={
770 795 'diff1': 'branches/argparse/example.py@' + commit_id,
771 796 'diff2': commit_id,
772 797 'show_rev': 'Show at Revision',
773 798 'annotate': 'true',
774 799 },
775 800 status=302)
776 801 response = response.follow()
777 802 assert response.headers['Location'].endswith(
778 803 'svn-svn-simple-layout/annotate/26/branches/argparse/example.py')
779 804
780 805
781 806 @pytest.mark.usefixtures("app", "autologin_user")
782 807 class TestModifyFilesWithWebInterface(object):
783 808
784 809 def test_add_file_view(self, backend):
785 810 self.app.get(
786 811 route_path('repo_files_add_file',
787 812 repo_name=backend.repo_name,
788 813 commit_id='tip', f_path='/')
789 814 )
790 815
791 816 @pytest.mark.xfail_backends("svn", reason="Depends on online editing")
792 817 def test_add_file_into_repo_missing_content(self, backend, csrf_token):
793 818 backend.create_repo()
794 819 filename = 'init.py'
795 820 response = self.app.post(
796 821 route_path('repo_files_create_file',
797 822 repo_name=backend.repo_name,
798 823 commit_id='tip', f_path='/'),
799 824 params={
800 825 'content': "",
801 826 'filename': filename,
802 827 'csrf_token': csrf_token,
803 828 },
804 829 status=302)
805 830 expected_msg = 'Successfully committed new file `{}`'.format(os.path.join(filename))
806 831 assert_session_flash(response, expected_msg)
807 832
808 833 def test_add_file_into_repo_missing_filename(self, backend, csrf_token):
809 834 commit_id = backend.repo.get_commit().raw_id
810 835 response = self.app.post(
811 836 route_path('repo_files_create_file',
812 837 repo_name=backend.repo_name,
813 838 commit_id=commit_id, f_path='/'),
814 839 params={
815 840 'content': "foo",
816 841 'csrf_token': csrf_token,
817 842 },
818 843 status=302)
819 844
820 845 assert_session_flash(response, 'No filename specified')
821 846
822 847 def test_add_file_into_repo_errors_and_no_commits(
823 848 self, backend, csrf_token):
824 849 repo = backend.create_repo()
825 850 # Create a file with no filename, it will display an error but
826 851 # the repo has no commits yet
827 852 response = self.app.post(
828 853 route_path('repo_files_create_file',
829 854 repo_name=repo.repo_name,
830 855 commit_id='tip', f_path='/'),
831 856 params={
832 857 'content': "foo",
833 858 'csrf_token': csrf_token,
834 859 },
835 860 status=302)
836 861
837 862 assert_session_flash(response, 'No filename specified')
838 863
839 864 # Not allowed, redirect to the summary
840 865 redirected = response.follow()
841 866 summary_url = h.route_path('repo_summary', repo_name=repo.repo_name)
842 867
843 868 # As there are no commits, displays the summary page with the error of
844 869 # creating a file with no filename
845 870
846 871 assert redirected.request.path == summary_url
847 872
848 873 @pytest.mark.parametrize("filename, clean_filename", [
849 874 ('/abs/foo', 'abs/foo'),
850 875 ('../rel/foo', 'rel/foo'),
851 876 ('file/../foo/foo', 'file/foo/foo'),
852 877 ])
853 878 def test_add_file_into_repo_bad_filenames(self, filename, clean_filename, backend, csrf_token):
854 879 repo = backend.create_repo()
855 880 commit_id = repo.get_commit().raw_id
856 881
857 882 response = self.app.post(
858 883 route_path('repo_files_create_file',
859 884 repo_name=repo.repo_name,
860 885 commit_id=commit_id, f_path='/'),
861 886 params={
862 887 'content': "foo",
863 888 'filename': filename,
864 889 'csrf_token': csrf_token,
865 890 },
866 891 status=302)
867 892
868 893 expected_msg = 'Successfully committed new file `{}`'.format(clean_filename)
869 894 assert_session_flash(response, expected_msg)
870 895
871 896 @pytest.mark.parametrize("cnt, filename, content", [
872 897 (1, 'foo.txt', "Content"),
873 898 (2, 'dir/foo.rst', "Content"),
874 899 (3, 'dir/foo-second.rst', "Content"),
875 900 (4, 'rel/dir/foo.bar', "Content"),
876 901 ])
877 902 def test_add_file_into_empty_repo(self, cnt, filename, content, backend, csrf_token):
878 903 repo = backend.create_repo()
879 904 commit_id = repo.get_commit().raw_id
880 905 response = self.app.post(
881 906 route_path('repo_files_create_file',
882 907 repo_name=repo.repo_name,
883 908 commit_id=commit_id, f_path='/'),
884 909 params={
885 910 'content': content,
886 911 'filename': filename,
887 912 'csrf_token': csrf_token,
888 913 },
889 914 status=302)
890 915
891 916 expected_msg = 'Successfully committed new file `{}`'.format(filename)
892 917 assert_session_flash(response, expected_msg)
893 918
894 919 def test_edit_file_view(self, backend):
895 920 response = self.app.get(
896 921 route_path('repo_files_edit_file',
897 922 repo_name=backend.repo_name,
898 923 commit_id=backend.default_head_id,
899 924 f_path='vcs/nodes.py'),
900 925 status=200)
901 926 response.mustcontain("Module holding everything related to vcs nodes.")
902 927
903 928 def test_edit_file_view_not_on_branch(self, backend):
904 929 repo = backend.create_repo()
905 930 backend.ensure_file(b"vcs/nodes.py")
906 931
907 932 response = self.app.get(
908 933 route_path('repo_files_edit_file',
909 934 repo_name=repo.repo_name,
910 935 commit_id='tip',
911 936 f_path='vcs/nodes.py'),
912 937 status=302)
913 938 assert_session_flash(
914 939 response, 'Cannot modify file. Given commit `tip` is not head of a branch.')
915 940
916 941 def test_edit_file_view_commit_changes(self, backend, csrf_token):
917 942 repo = backend.create_repo()
918 943 backend.ensure_file(b"vcs/nodes.py", content=b"print 'hello'")
919 944
920 945 response = self.app.post(
921 946 route_path('repo_files_update_file',
922 947 repo_name=repo.repo_name,
923 948 commit_id=backend.default_head_id,
924 949 f_path='vcs/nodes.py'),
925 950 params={
926 951 'content': "print 'hello world'",
927 952 'message': 'I committed',
928 953 'filename': "vcs/nodes.py",
929 954 'csrf_token': csrf_token,
930 955 },
931 956 status=302)
932 957 assert_session_flash(
933 958 response, 'Successfully committed changes to file `vcs/nodes.py`')
934 959 tip = repo.get_commit(commit_idx=-1)
935 960 assert tip.message == 'I committed'
936 961
937 962 def test_edit_file_view_commit_changes_default_message(self, backend,
938 963 csrf_token):
939 964 repo = backend.create_repo()
940 965 backend.ensure_file(b"vcs/nodes.py", content=b"print 'hello'")
941 966
942 967 commit_id = (
943 968 backend.default_branch_name or
944 969 backend.repo.scm_instance().commit_ids[-1])
945 970
946 971 response = self.app.post(
947 972 route_path('repo_files_update_file',
948 973 repo_name=repo.repo_name,
949 974 commit_id=commit_id,
950 975 f_path='vcs/nodes.py'),
951 976 params={
952 977 'content': "print 'hello world'",
953 978 'message': '',
954 979 'filename': "vcs/nodes.py",
955 980 'csrf_token': csrf_token,
956 981 },
957 982 status=302)
958 983 assert_session_flash(
959 984 response, 'Successfully committed changes to file `vcs/nodes.py`')
960 985 tip = repo.get_commit(commit_idx=-1)
961 986 assert tip.message == 'Edited file vcs/nodes.py via RhodeCode Enterprise'
962 987
963 988 def test_delete_file_view(self, backend):
964 989 self.app.get(
965 990 route_path('repo_files_remove_file',
966 991 repo_name=backend.repo_name,
967 992 commit_id=backend.default_head_id,
968 993 f_path='vcs/nodes.py'),
969 994 status=200)
970 995
971 996 def test_delete_file_view_not_on_branch(self, backend):
972 997 repo = backend.create_repo()
973 998 backend.ensure_file(b'vcs/nodes.py')
974 999
975 1000 response = self.app.get(
976 1001 route_path('repo_files_remove_file',
977 1002 repo_name=repo.repo_name,
978 1003 commit_id='tip',
979 1004 f_path='vcs/nodes.py'),
980 1005 status=302)
981 1006 assert_session_flash(
982 1007 response, 'Cannot modify file. Given commit `tip` is not head of a branch.')
983 1008
984 1009 def test_delete_file_view_commit_changes(self, backend, csrf_token):
985 1010 repo = backend.create_repo()
986 1011 backend.ensure_file(b"vcs/nodes.py")
987 1012
988 1013 response = self.app.post(
989 1014 route_path('repo_files_delete_file',
990 1015 repo_name=repo.repo_name,
991 1016 commit_id=backend.default_head_id,
992 1017 f_path='vcs/nodes.py'),
993 1018 params={
994 1019 'message': 'i committed',
995 1020 'csrf_token': csrf_token,
996 1021 },
997 1022 status=302)
998 1023 assert_session_flash(
999 1024 response, 'Successfully deleted file `vcs/nodes.py`')
1000 1025
1001 1026
1002 1027 @pytest.mark.usefixtures("app")
1003 1028 class TestFilesViewOtherCases(object):
1004 1029
1005 1030 def test_access_empty_repo_redirect_to_summary_with_alert_write_perms(
1006 1031 self, backend_stub, autologin_regular_user, user_regular,
1007 1032 user_util):
1008 1033
1009 1034 repo = backend_stub.create_repo()
1010 1035 user_util.grant_user_permission_to_repo(
1011 1036 repo, user_regular, 'repository.write')
1012 1037 response = self.app.get(
1013 1038 route_path('repo_files',
1014 1039 repo_name=repo.repo_name,
1015 1040 commit_id='tip', f_path='/'))
1016 1041
1017 1042 repo_file_add_url = route_path(
1018 1043 'repo_files_add_file',
1019 1044 repo_name=repo.repo_name,
1020 1045 commit_id=0, f_path='')
1021 1046 add_new = f'<a class="alert-link" href="{repo_file_add_url}">add a new file</a>'
1022 1047
1023 1048 repo_file_upload_url = route_path(
1024 1049 'repo_files_upload_file',
1025 1050 repo_name=repo.repo_name,
1026 1051 commit_id=0, f_path='')
1027 1052 upload_new = f'<a class="alert-link" href="{repo_file_upload_url}">upload a new file</a>'
1028 1053
1029 1054 assert_session_flash(
1030 1055 response,
1031 1056 'There are no files yet. Click here to %s or %s.' % (add_new, upload_new)
1032 1057 )
1033 1058
1034 1059 def test_access_empty_repo_redirect_to_summary_with_alert_no_write_perms(
1035 1060 self, backend_stub, autologin_regular_user):
1036 1061 repo = backend_stub.create_repo()
1037 1062 # init session for anon user
1038 1063 route_path('repo_summary', repo_name=repo.repo_name)
1039 1064
1040 1065 repo_file_add_url = route_path(
1041 1066 'repo_files_add_file',
1042 1067 repo_name=repo.repo_name,
1043 1068 commit_id=0, f_path='')
1044 1069
1045 1070 response = self.app.get(
1046 1071 route_path('repo_files',
1047 1072 repo_name=repo.repo_name,
1048 1073 commit_id='tip', f_path='/'))
1049 1074
1050 1075 assert_session_flash(response, no_=repo_file_add_url)
1051 1076
1052 1077 @pytest.mark.parametrize('file_node', [
1053 1078 b'archive/file.zip',
1054 1079 b'diff/my-file.txt',
1055 1080 b'render.py',
1056 1081 b'render',
1057 1082 b'remove_file',
1058 1083 b'remove_file/to-delete.txt',
1059 1084 ])
1060 1085 def test_file_names_equal_to_routes_parts(self, backend, file_node):
1061 1086 backend.create_repo()
1062 1087 backend.ensure_file(file_node)
1063 1088
1064 1089 self.app.get(
1065 1090 route_path('repo_files',
1066 1091 repo_name=backend.repo_name,
1067 1092 commit_id='tip', f_path=safe_str(file_node)),
1068 1093 status=200)
1069 1094
1070 1095
1071 1096 class TestAdjustFilePathForSvn(object):
1072 1097 """
1073 1098 SVN specific adjustments of node history in RepoFilesView.
1074 1099 """
1075 1100
1076 1101 def test_returns_path_relative_to_matched_reference(self):
1077 1102 repo = self._repo(branches=['trunk'])
1078 1103 self.assert_file_adjustment('trunk/file', 'file', repo)
1079 1104
1080 1105 def test_does_not_modify_file_if_no_reference_matches(self):
1081 1106 repo = self._repo(branches=['trunk'])
1082 1107 self.assert_file_adjustment('notes/file', 'notes/file', repo)
1083 1108
1084 1109 def test_does_not_adjust_partial_directory_names(self):
1085 1110 repo = self._repo(branches=['trun'])
1086 1111 self.assert_file_adjustment('trunk/file', 'trunk/file', repo)
1087 1112
1088 1113 def test_is_robust_to_patterns_which_prefix_other_patterns(self):
1089 1114 repo = self._repo(branches=['trunk', 'trunk/new', 'trunk/old'])
1090 1115 self.assert_file_adjustment('trunk/new/file', 'file', repo)
1091 1116
1092 1117 def assert_file_adjustment(self, f_path, expected, repo):
1093 1118 result = RepoFilesView.adjust_file_path_for_svn(f_path, repo)
1094 1119 assert result == expected
1095 1120
1096 1121 def _repo(self, branches=None):
1097 1122 repo = mock.Mock()
1098 1123 repo.branches = OrderedDict((name, '0') for name in branches or [])
1099 1124 repo.tags = {}
1100 1125 return repo
@@ -1,1584 +1,1584 b''
1 1 # Copyright (C) 2011-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import itertools
20 20 import logging
21 21 import os
22 22 import collections
23 23 import urllib.request
24 24 import urllib.parse
25 25 import urllib.error
26 26 import pathlib
27 27
28 28 from pyramid.httpexceptions import HTTPNotFound, HTTPBadRequest, HTTPFound
29 29
30 30 from pyramid.renderers import render
31 31 from pyramid.response import Response
32 32
33 33 import rhodecode
34 34 from rhodecode.apps._base import RepoAppView
35 35
36 36
37 37 from rhodecode.lib import diffs, helpers as h, rc_cache
38 38 from rhodecode.lib import audit_logger
39 39 from rhodecode.lib.hash_utils import sha1_safe
40 40 from rhodecode.lib.rc_cache.archive_cache import get_archival_cache_store, get_archival_config, ReentrantLock
41 41 from rhodecode.lib.str_utils import safe_bytes
42 42 from rhodecode.lib.view_utils import parse_path_ref
43 43 from rhodecode.lib.exceptions import NonRelativePathError
44 44 from rhodecode.lib.codeblocks import (
45 45 filenode_as_lines_tokens, filenode_as_annotated_lines_tokens)
46 46 from rhodecode.lib.utils2 import convert_line_endings, detect_mode
47 47 from rhodecode.lib.type_utils import str2bool
48 48 from rhodecode.lib.str_utils import safe_str, safe_int
49 49 from rhodecode.lib.auth import (
50 50 LoginRequired, HasRepoPermissionAnyDecorator, CSRFRequired)
51 51 from rhodecode.lib.vcs import path as vcspath
52 52 from rhodecode.lib.vcs.backends.base import EmptyCommit
53 53 from rhodecode.lib.vcs.conf import settings
54 54 from rhodecode.lib.vcs.nodes import FileNode
55 55 from rhodecode.lib.vcs.exceptions import (
56 56 RepositoryError, CommitDoesNotExistError, EmptyRepositoryError,
57 57 ImproperArchiveTypeError, VCSError, NodeAlreadyExistsError,
58 58 NodeDoesNotExistError, CommitError, NodeError)
59 59
60 60 from rhodecode.model.scm import ScmModel
61 61 from rhodecode.model.db import Repository
62 62
63 63 log = logging.getLogger(__name__)
64 64
65 65
66 66 def get_archive_name(db_repo_name, commit_sha, ext, subrepos=False, path_sha='', with_hash=True):
67 67 # original backward compat name of archive
68 68 clean_name = safe_str(db_repo_name.replace('/', '_'))
69 69
70 70 # e.g vcsserver-sub-1-abcfdef-archive-all.zip
71 71 # vcsserver-sub-0-abcfdef-COMMIT_SHA-PATH_SHA.zip
72 72
73 73 sub_repo = 'sub-1' if subrepos else 'sub-0'
74 74 commit = commit_sha if with_hash else 'archive'
75 75 path_marker = (path_sha if with_hash else '') or 'all'
76 76 archive_name = f'{clean_name}-{sub_repo}-{commit}-{path_marker}{ext}'
77 77
78 78 return archive_name
79 79
80 80
81 81 def get_path_sha(at_path):
82 82 return safe_str(sha1_safe(at_path)[:8])
83 83
84 84
85 85 def _get_archive_spec(fname):
86 86 log.debug('Detecting archive spec for: `%s`', fname)
87 87
88 88 fileformat = None
89 89 ext = None
90 90 content_type = None
91 91 for a_type, content_type, extension in settings.ARCHIVE_SPECS:
92 92
93 93 if fname.endswith(extension):
94 94 fileformat = a_type
95 95 log.debug('archive is of type: %s', fileformat)
96 96 ext = extension
97 97 break
98 98
99 99 if not fileformat:
100 100 raise ValueError()
101 101
102 102 # left over part of whole fname is the commit
103 103 commit_id = fname[:-len(ext)]
104 104
105 105 return commit_id, ext, fileformat, content_type
106 106
107 107
108 108 class RepoFilesView(RepoAppView):
109 109
110 110 @staticmethod
111 111 def adjust_file_path_for_svn(f_path, repo):
112 112 """
113 113 Computes the relative path of `f_path`.
114 114
115 115 This is mainly based on prefix matching of the recognized tags and
116 116 branches in the underlying repository.
117 117 """
118 118 tags_and_branches = itertools.chain(
119 119 repo.branches.keys(),
120 120 repo.tags.keys())
121 121 tags_and_branches = sorted(tags_and_branches, key=len, reverse=True)
122 122
123 123 for name in tags_and_branches:
124 124 if f_path.startswith(f'{name}/'):
125 125 f_path = vcspath.relpath(f_path, name)
126 126 break
127 127 return f_path
128 128
129 129 def load_default_context(self):
130 130 c = self._get_local_tmpl_context(include_app_defaults=True)
131 131 c.rhodecode_repo = self.rhodecode_vcs_repo
132 132 c.enable_downloads = self.db_repo.enable_downloads
133 133 return c
134 134
135 135 def _ensure_not_locked(self, commit_id='tip'):
136 136 _ = self.request.translate
137 137
138 138 repo = self.db_repo
139 139 if repo.enable_locking and repo.locked[0]:
140 140 h.flash(_('This repository has been locked by %s on %s')
141 141 % (h.person_by_id(repo.locked[0]),
142 142 h.format_date(h.time_to_datetime(repo.locked[1]))),
143 143 'warning')
144 144 files_url = h.route_path(
145 145 'repo_files:default_path',
146 146 repo_name=self.db_repo_name, commit_id=commit_id)
147 147 raise HTTPFound(files_url)
148 148
149 149 def forbid_non_head(self, is_head, f_path, commit_id='tip', json_mode=False):
150 150 _ = self.request.translate
151 151
152 152 if not is_head:
153 153 message = _('Cannot modify file. '
154 154 'Given commit `{}` is not head of a branch.').format(commit_id)
155 155 h.flash(message, category='warning')
156 156
157 157 if json_mode:
158 158 return message
159 159
160 160 files_url = h.route_path(
161 161 'repo_files', repo_name=self.db_repo_name, commit_id=commit_id,
162 162 f_path=f_path)
163 163 raise HTTPFound(files_url)
164 164
165 165 def check_branch_permission(self, branch_name, commit_id='tip', json_mode=False):
166 166 _ = self.request.translate
167 167
168 168 rule, branch_perm = self._rhodecode_user.get_rule_and_branch_permission(
169 169 self.db_repo_name, branch_name)
170 170 if branch_perm and branch_perm not in ['branch.push', 'branch.push_force']:
171 171 message = _('Branch `{}` changes forbidden by rule {}.').format(
172 172 h.escape(branch_name), h.escape(rule))
173 173 h.flash(message, 'warning')
174 174
175 175 if json_mode:
176 176 return message
177 177
178 178 files_url = h.route_path(
179 179 'repo_files:default_path', repo_name=self.db_repo_name, commit_id=commit_id)
180 180
181 181 raise HTTPFound(files_url)
182 182
183 183 def _get_commit_and_path(self):
184 184 default_commit_id = self.db_repo.landing_ref_name
185 185 default_f_path = '/'
186 186
187 187 commit_id = self.request.matchdict.get(
188 188 'commit_id', default_commit_id)
189 189 f_path = self._get_f_path(self.request.matchdict, default_f_path)
190 190 return commit_id, f_path
191 191
192 192 def _get_default_encoding(self, c):
193 193 enc_list = getattr(c, 'default_encodings', [])
194 194 return enc_list[0] if enc_list else 'UTF-8'
195 195
196 196 def _get_commit_or_redirect(self, commit_id, redirect_after=True):
197 197 """
198 198 This is a safe way to get commit. If an error occurs it redirects to
199 199 tip with proper message
200 200
201 201 :param commit_id: id of commit to fetch
202 202 :param redirect_after: toggle redirection
203 203 """
204 204 _ = self.request.translate
205 205
206 206 try:
207 207 return self.rhodecode_vcs_repo.get_commit(commit_id)
208 208 except EmptyRepositoryError:
209 209 if not redirect_after:
210 210 return None
211 211
212 212 add_new = upload_new = ""
213 213 if h.HasRepoPermissionAny(
214 214 'repository.write', 'repository.admin')(self.db_repo_name):
215 215 _url = h.route_path(
216 216 'repo_files_add_file',
217 217 repo_name=self.db_repo_name, commit_id=0, f_path='')
218 218 add_new = h.link_to(
219 219 _('add a new file'), _url, class_="alert-link")
220 220
221 221 _url_upld = h.route_path(
222 222 'repo_files_upload_file',
223 223 repo_name=self.db_repo_name, commit_id=0, f_path='')
224 224 upload_new = h.link_to(
225 225 _('upload a new file'), _url_upld, class_="alert-link")
226 226
227 227 h.flash(h.literal(
228 228 _('There are no files yet. Click here to %s or %s.') % (add_new, upload_new)), category='warning')
229 229 raise HTTPFound(
230 230 h.route_path('repo_summary', repo_name=self.db_repo_name))
231 231
232 232 except (CommitDoesNotExistError, LookupError) as e:
233 233 msg = _('No such commit exists for this repository. Commit: {}').format(commit_id)
234 234 h.flash(msg, category='error')
235 235 raise HTTPNotFound()
236 236 except RepositoryError as e:
237 237 h.flash(h.escape(safe_str(e)), category='error')
238 238 raise HTTPNotFound()
239 239
240 240 def _get_filenode_or_redirect(self, commit_obj, path, pre_load=None):
241 241 """
242 242 Returns file_node, if error occurs or given path is directory,
243 243 it'll redirect to top level path
244 244 """
245 245 _ = self.request.translate
246 246
247 247 try:
248 248 file_node = commit_obj.get_node(path, pre_load=pre_load)
249 249 if file_node.is_dir():
250 250 raise RepositoryError('The given path is a directory')
251 251 except CommitDoesNotExistError:
252 252 log.exception('No such commit exists for this repository')
253 253 h.flash(_('No such commit exists for this repository'), category='error')
254 254 raise HTTPNotFound()
255 255 except RepositoryError as e:
256 256 log.warning('Repository error while fetching filenode `%s`. Err:%s', path, e)
257 257 h.flash(h.escape(safe_str(e)), category='error')
258 258 raise HTTPNotFound()
259 259
260 260 return file_node
261 261
262 262 def _is_valid_head(self, commit_id, repo, landing_ref):
263 263 branch_name = sha_commit_id = ''
264 264 is_head = False
265 265 log.debug('Checking if commit_id `%s` is a head for %s.', commit_id, repo)
266 266
267 267 for _branch_name, branch_commit_id in repo.branches.items():
268 268 # simple case we pass in branch name, it's a HEAD
269 269 if commit_id == _branch_name:
270 270 is_head = True
271 271 branch_name = _branch_name
272 272 sha_commit_id = branch_commit_id
273 273 break
274 274 # case when we pass in full sha commit_id, which is a head
275 275 elif commit_id == branch_commit_id:
276 276 is_head = True
277 277 branch_name = _branch_name
278 278 sha_commit_id = branch_commit_id
279 279 break
280 280
281 281 if h.is_svn(repo) and not repo.is_empty():
282 282 # Note: Subversion only has one head.
283 283 if commit_id == repo.get_commit(commit_idx=-1).raw_id:
284 284 is_head = True
285 285 return branch_name, sha_commit_id, is_head
286 286
287 287 # checked branches, means we only need to try to get the branch/commit_sha
288 288 if repo.is_empty():
289 289 is_head = True
290 290 branch_name = landing_ref
291 291 sha_commit_id = EmptyCommit().raw_id
292 292 else:
293 293 commit = repo.get_commit(commit_id=commit_id)
294 294 if commit:
295 295 branch_name = commit.branch
296 296 sha_commit_id = commit.raw_id
297 297
298 298 return branch_name, sha_commit_id, is_head
299 299
300 300 def _get_tree_at_commit(self, c, commit_id, f_path, full_load=False, at_rev=None):
301 301
302 302 repo_id = self.db_repo.repo_id
303 303 force_recache = self.get_recache_flag()
304 304
305 305 cache_seconds = safe_int(
306 306 rhodecode.CONFIG.get('rc_cache.cache_repo.expiration_time'))
307 307 cache_on = not force_recache and cache_seconds > 0
308 308 log.debug(
309 309 'Computing FILE TREE for repo_id %s commit_id `%s` and path `%s`'
310 310 'with caching: %s[TTL: %ss]' % (
311 311 repo_id, commit_id, f_path, cache_on, cache_seconds or 0))
312 312
313 313 cache_namespace_uid = f'repo.{rc_cache.FILE_TREE_CACHE_VER}.{repo_id}'
314 314 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
315 315
316 316 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=cache_on)
317 317 def compute_file_tree(_name_hash, _repo_id, _commit_id, _f_path, _full_load, _at_rev):
318 log.debug('Generating cached file tree at ver:%s for repo_id: %s, %s, %s',
318 log.debug('Generating cached file tree at for repo_id: %s, %s, %s',
319 319 _repo_id, _commit_id, _f_path)
320 320
321 321 c.full_load = _full_load
322 322 return render(
323 323 'rhodecode:templates/files/files_browser_tree.mako',
324 324 self._get_template_context(c), self.request, _at_rev)
325 325
326 326 return compute_file_tree(
327 327 self.db_repo.repo_name_hash, self.db_repo.repo_id, commit_id, f_path, full_load, at_rev)
328 328
329 329 def create_pure_path(self, *parts):
330 330 # Split paths and sanitize them, removing any ../ etc
331 331 sanitized_path = [
332 332 x for x in pathlib.PurePath(*parts).parts
333 333 if x not in ['.', '..']]
334 334
335 335 pure_path = pathlib.PurePath(*sanitized_path)
336 336 return pure_path
337 337
338 338 def _is_lf_enabled(self, target_repo):
339 339 lf_enabled = False
340 340
341 341 lf_key_for_vcs_map = {
342 342 'hg': 'extensions_largefiles',
343 343 'git': 'vcs_git_lfs_enabled'
344 344 }
345 345
346 346 lf_key_for_vcs = lf_key_for_vcs_map.get(target_repo.repo_type)
347 347
348 348 if lf_key_for_vcs:
349 349 lf_enabled = self._get_repo_setting(target_repo, lf_key_for_vcs)
350 350
351 351 return lf_enabled
352 352
353 353 @LoginRequired()
354 354 @HasRepoPermissionAnyDecorator(
355 355 'repository.read', 'repository.write', 'repository.admin')
356 356 def repo_archivefile(self):
357 357 # archive cache config
358 358 from rhodecode import CONFIG
359 359 _ = self.request.translate
360 360 self.load_default_context()
361 361 default_at_path = '/'
362 362 fname = self.request.matchdict['fname']
363 363 subrepos = self.request.GET.get('subrepos') == 'true'
364 364 with_hash = str2bool(self.request.GET.get('with_hash', '1'))
365 365 at_path = self.request.GET.get('at_path') or default_at_path
366 366
367 367 if not self.db_repo.enable_downloads:
368 368 return Response(_('Downloads disabled'))
369 369
370 370 try:
371 371 commit_id, ext, fileformat, content_type = \
372 372 _get_archive_spec(fname)
373 373 except ValueError:
374 374 return Response(_('Unknown archive type for: `{}`').format(
375 375 h.escape(fname)))
376 376
377 377 try:
378 378 commit = self.rhodecode_vcs_repo.get_commit(commit_id)
379 379 except CommitDoesNotExistError:
380 380 return Response(_('Unknown commit_id {}').format(
381 381 h.escape(commit_id)))
382 382 except EmptyRepositoryError:
383 383 return Response(_('Empty repository'))
384 384
385 385 # we used a ref, or a shorter version, lets redirect client ot use explicit hash
386 386 if commit_id != commit.raw_id:
387 387 fname=f'{commit.raw_id}{ext}'
388 388 raise HTTPFound(self.request.current_route_path(fname=fname))
389 389
390 390 try:
391 391 at_path = commit.get_node(at_path).path or default_at_path
392 392 except Exception:
393 393 return Response(_('No node at path {} for this repository').format(h.escape(at_path)))
394 394
395 395 path_sha = get_path_sha(at_path)
396 396
397 397 # used for cache etc, consistent unique archive name
398 398 archive_name_key = get_archive_name(
399 399 self.db_repo_name, commit_sha=commit.short_id, ext=ext, subrepos=subrepos,
400 400 path_sha=path_sha, with_hash=True)
401 401
402 402 if not with_hash:
403 403 path_sha = ''
404 404
405 405 # what end client gets served
406 406 response_archive_name = get_archive_name(
407 407 self.db_repo_name, commit_sha=commit.short_id, ext=ext, subrepos=subrepos,
408 408 path_sha=path_sha, with_hash=with_hash)
409 409
410 410 # remove extension from our archive directory name
411 411 archive_dir_name = response_archive_name[:-len(ext)]
412 412
413 413 archive_cache_disable = self.request.GET.get('no_cache')
414 414
415 415 d_cache = get_archival_cache_store(config=CONFIG)
416 416 # NOTE: we get the config to pass to a call to lazy-init the SAME type of cache on vcsserver
417 417 d_cache_conf = get_archival_config(config=CONFIG)
418 418
419 419 reentrant_lock_key = archive_name_key + '.lock'
420 420 with ReentrantLock(d_cache, reentrant_lock_key):
421 421 # This is also a cache key
422 422 use_cached_archive = False
423 423 if archive_name_key in d_cache and not archive_cache_disable:
424 424 reader, tag = d_cache.get(archive_name_key, read=True, tag=True, retry=True)
425 425 use_cached_archive = True
426 426 log.debug('Found cached archive as key=%s tag=%s, serving archive from cache reader=%s',
427 427 archive_name_key, tag, reader.name)
428 428 else:
429 429 reader = None
430 430 log.debug('Archive with key=%s is not yet cached, creating one now...', archive_name_key)
431 431
432 432 # generate new archive, as previous was not found in the cache
433 433 if not reader:
434 434 # first remove expired items, before generating a new one :)
435 435 # we di this manually because automatic eviction is disabled
436 436 d_cache.cull(retry=True)
437 437
438 438 try:
439 439 commit.archive_repo(archive_name_key, archive_dir_name=archive_dir_name,
440 440 kind=fileformat, subrepos=subrepos,
441 441 archive_at_path=at_path, cache_config=d_cache_conf)
442 442 except ImproperArchiveTypeError:
443 443 return _('Unknown archive type')
444 444
445 445 reader, tag = d_cache.get(archive_name_key, read=True, tag=True, retry=True)
446 446
447 447 if not reader:
448 448 raise ValueError('archive cache reader is empty, failed to fetch file from distributed archive cache')
449 449
450 450 def archive_iterator(_reader):
451 451 while 1:
452 452 data = _reader.read(1024)
453 453 if not data:
454 454 break
455 455 yield data
456 456
457 457 response = Response(app_iter=archive_iterator(reader))
458 458 response.content_disposition = f'attachment; filename={response_archive_name}'
459 459 response.content_type = str(content_type)
460 460
461 461 try:
462 462 return response
463 463 finally:
464 464 # store download action
465 465 audit_logger.store_web(
466 466 'repo.archive.download', action_data={
467 467 'user_agent': self.request.user_agent,
468 468 'archive_name': archive_name_key,
469 469 'archive_spec': fname,
470 470 'archive_cached': use_cached_archive},
471 471 user=self._rhodecode_user,
472 472 repo=self.db_repo,
473 473 commit=True
474 474 )
475 475
476 476 def _get_file_node(self, commit_id, f_path):
477 477 if commit_id not in ['', None, 'None', '0' * 12, '0' * 40]:
478 478 commit = self.rhodecode_vcs_repo.get_commit(commit_id=commit_id)
479 479 try:
480 480 node = commit.get_node(f_path)
481 481 if node.is_dir():
482 482 raise NodeError(f'{node} path is a {type(node)} not a file')
483 483 except NodeDoesNotExistError:
484 484 commit = EmptyCommit(
485 485 commit_id=commit_id,
486 486 idx=commit.idx,
487 487 repo=commit.repository,
488 488 alias=commit.repository.alias,
489 489 message=commit.message,
490 490 author=commit.author,
491 491 date=commit.date)
492 492 node = FileNode(safe_bytes(f_path), b'', commit=commit)
493 493 else:
494 494 commit = EmptyCommit(
495 495 repo=self.rhodecode_vcs_repo,
496 496 alias=self.rhodecode_vcs_repo.alias)
497 497 node = FileNode(safe_bytes(f_path), b'', commit=commit)
498 498 return node
499 499
500 500 @LoginRequired()
501 501 @HasRepoPermissionAnyDecorator(
502 502 'repository.read', 'repository.write', 'repository.admin')
503 503 def repo_files_diff(self):
504 504 c = self.load_default_context()
505 505 f_path = self._get_f_path(self.request.matchdict)
506 506 diff1 = self.request.GET.get('diff1', '')
507 507 diff2 = self.request.GET.get('diff2', '')
508 508
509 509 path1, diff1 = parse_path_ref(diff1, default_path=f_path)
510 510
511 511 ignore_whitespace = str2bool(self.request.GET.get('ignorews'))
512 512 line_context = self.request.GET.get('context', 3)
513 513
514 514 if not any((diff1, diff2)):
515 515 h.flash(
516 516 'Need query parameter "diff1" or "diff2" to generate a diff.',
517 517 category='error')
518 518 raise HTTPBadRequest()
519 519
520 520 c.action = self.request.GET.get('diff')
521 521 if c.action not in ['download', 'raw']:
522 522 compare_url = h.route_path(
523 523 'repo_compare',
524 524 repo_name=self.db_repo_name,
525 525 source_ref_type='rev',
526 526 source_ref=diff1,
527 527 target_repo=self.db_repo_name,
528 528 target_ref_type='rev',
529 529 target_ref=diff2,
530 530 _query=dict(f_path=f_path))
531 531 # redirect to new view if we render diff
532 532 raise HTTPFound(compare_url)
533 533
534 534 try:
535 535 node1 = self._get_file_node(diff1, path1)
536 536 node2 = self._get_file_node(diff2, f_path)
537 537 except (RepositoryError, NodeError):
538 538 log.exception("Exception while trying to get node from repository")
539 539 raise HTTPFound(
540 540 h.route_path('repo_files', repo_name=self.db_repo_name,
541 541 commit_id='tip', f_path=f_path))
542 542
543 543 if all(isinstance(node.commit, EmptyCommit)
544 544 for node in (node1, node2)):
545 545 raise HTTPNotFound()
546 546
547 547 c.commit_1 = node1.commit
548 548 c.commit_2 = node2.commit
549 549
550 550 if c.action == 'download':
551 551 _diff = diffs.get_gitdiff(node1, node2,
552 552 ignore_whitespace=ignore_whitespace,
553 553 context=line_context)
554 554 # NOTE: this was using diff_format='gitdiff'
555 555 diff = diffs.DiffProcessor(_diff, diff_format='newdiff')
556 556
557 557 response = Response(self.path_filter.get_raw_patch(diff))
558 558 response.content_type = 'text/plain'
559 559 response.content_disposition = (
560 560 f'attachment; filename={f_path}_{diff1}_vs_{diff2}.diff'
561 561 )
562 562 charset = self._get_default_encoding(c)
563 563 if charset:
564 564 response.charset = charset
565 565 return response
566 566
567 567 elif c.action == 'raw':
568 568 _diff = diffs.get_gitdiff(node1, node2,
569 569 ignore_whitespace=ignore_whitespace,
570 570 context=line_context)
571 571 # NOTE: this was using diff_format='gitdiff'
572 572 diff = diffs.DiffProcessor(_diff, diff_format='newdiff')
573 573
574 574 response = Response(self.path_filter.get_raw_patch(diff))
575 575 response.content_type = 'text/plain'
576 576 charset = self._get_default_encoding(c)
577 577 if charset:
578 578 response.charset = charset
579 579 return response
580 580
581 581 # in case we ever end up here
582 582 raise HTTPNotFound()
583 583
584 584 @LoginRequired()
585 585 @HasRepoPermissionAnyDecorator(
586 586 'repository.read', 'repository.write', 'repository.admin')
587 587 def repo_files_diff_2way_redirect(self):
588 588 """
589 589 Kept only to make OLD links work
590 590 """
591 591 f_path = self._get_f_path_unchecked(self.request.matchdict)
592 592 diff1 = self.request.GET.get('diff1', '')
593 593 diff2 = self.request.GET.get('diff2', '')
594 594
595 595 if not any((diff1, diff2)):
596 596 h.flash(
597 597 'Need query parameter "diff1" or "diff2" to generate a diff.',
598 598 category='error')
599 599 raise HTTPBadRequest()
600 600
601 601 compare_url = h.route_path(
602 602 'repo_compare',
603 603 repo_name=self.db_repo_name,
604 604 source_ref_type='rev',
605 605 source_ref=diff1,
606 606 target_ref_type='rev',
607 607 target_ref=diff2,
608 608 _query=dict(f_path=f_path, diffmode='sideside',
609 609 target_repo=self.db_repo_name,))
610 610 raise HTTPFound(compare_url)
611 611
612 612 @LoginRequired()
613 613 def repo_files_default_commit_redirect(self):
614 614 """
615 615 Special page that redirects to the landing page of files based on the default
616 616 commit for repository
617 617 """
618 618 c = self.load_default_context()
619 619 ref_name = c.rhodecode_db_repo.landing_ref_name
620 620 landing_url = h.repo_files_by_ref_url(
621 621 c.rhodecode_db_repo.repo_name,
622 622 c.rhodecode_db_repo.repo_type,
623 623 f_path='',
624 624 ref_name=ref_name,
625 625 commit_id='tip',
626 626 query=dict(at=ref_name)
627 627 )
628 628
629 629 raise HTTPFound(landing_url)
630 630
631 631 @LoginRequired()
632 632 @HasRepoPermissionAnyDecorator(
633 633 'repository.read', 'repository.write', 'repository.admin')
634 634 def repo_files(self):
635 635 c = self.load_default_context()
636 636
637 637 view_name = getattr(self.request.matched_route, 'name', None)
638 638
639 639 c.annotate = view_name == 'repo_files:annotated'
640 640 # default is false, but .rst/.md files later are auto rendered, we can
641 641 # overwrite auto rendering by setting this GET flag
642 642 c.renderer = view_name == 'repo_files:rendered' or not self.request.GET.get('no-render', False)
643 643
644 644 commit_id, f_path = self._get_commit_and_path()
645 645
646 646 c.commit = self._get_commit_or_redirect(commit_id)
647 647 c.branch = self.request.GET.get('branch', None)
648 648 c.f_path = f_path
649 649 at_rev = self.request.GET.get('at')
650 650
651 651 # prev link
652 652 try:
653 653 prev_commit = c.commit.prev(c.branch)
654 654 c.prev_commit = prev_commit
655 655 c.url_prev = h.route_path(
656 656 'repo_files', repo_name=self.db_repo_name,
657 657 commit_id=prev_commit.raw_id, f_path=f_path)
658 658 if c.branch:
659 659 c.url_prev += '?branch=%s' % c.branch
660 660 except (CommitDoesNotExistError, VCSError):
661 661 c.url_prev = '#'
662 662 c.prev_commit = EmptyCommit()
663 663
664 664 # next link
665 665 try:
666 666 next_commit = c.commit.next(c.branch)
667 667 c.next_commit = next_commit
668 668 c.url_next = h.route_path(
669 669 'repo_files', repo_name=self.db_repo_name,
670 670 commit_id=next_commit.raw_id, f_path=f_path)
671 671 if c.branch:
672 672 c.url_next += '?branch=%s' % c.branch
673 673 except (CommitDoesNotExistError, VCSError):
674 674 c.url_next = '#'
675 675 c.next_commit = EmptyCommit()
676 676
677 677 # files or dirs
678 678 try:
679 679 c.file = c.commit.get_node(f_path, pre_load=['is_binary', 'size', 'data'])
680 680
681 681 c.file_author = True
682 682 c.file_tree = ''
683 683
684 684 # load file content
685 685 if c.file.is_file():
686 686 c.lf_node = {}
687 687
688 688 has_lf_enabled = self._is_lf_enabled(self.db_repo)
689 689 if has_lf_enabled:
690 690 c.lf_node = c.file.get_largefile_node()
691 691
692 692 c.file_source_page = 'true'
693 693 c.file_last_commit = c.file.last_commit
694 694
695 695 c.file_size_too_big = c.file.size > c.visual.cut_off_limit_file
696 696
697 697 if not (c.file_size_too_big or c.file.is_binary):
698 698 if c.annotate: # annotation has precedence over renderer
699 699 c.annotated_lines = filenode_as_annotated_lines_tokens(
700 700 c.file
701 701 )
702 702 else:
703 703 c.renderer = (
704 704 c.renderer and h.renderer_from_filename(c.file.path)
705 705 )
706 706 if not c.renderer:
707 707 c.lines = filenode_as_lines_tokens(c.file)
708 708
709 709 _branch_name, _sha_commit_id, is_head = \
710 710 self._is_valid_head(commit_id, self.rhodecode_vcs_repo,
711 711 landing_ref=self.db_repo.landing_ref_name)
712 712 c.on_branch_head = is_head
713 713
714 714 branch = c.commit.branch if (
715 715 c.commit.branch and '/' not in c.commit.branch) else None
716 716 c.branch_or_raw_id = branch or c.commit.raw_id
717 717 c.branch_name = c.commit.branch or h.short_id(c.commit.raw_id)
718 718
719 719 author = c.file_last_commit.author
720 720 c.authors = [[
721 721 h.email(author),
722 722 h.person(author, 'username_or_name_or_email'),
723 723 1
724 724 ]]
725 725
726 726 else: # load tree content at path
727 727 c.file_source_page = 'false'
728 728 c.authors = []
729 729 # this loads a simple tree without metadata to speed things up
730 730 # later via ajax we call repo_nodetree_full and fetch whole
731 731 c.file_tree = self._get_tree_at_commit(c, c.commit.raw_id, f_path, at_rev=at_rev)
732 732
733 733 c.readme_data, c.readme_file = \
734 734 self._get_readme_data(self.db_repo, c.visual.default_renderer,
735 735 c.commit.raw_id, f_path)
736 736
737 737 except RepositoryError as e:
738 738 h.flash(h.escape(safe_str(e)), category='error')
739 739 raise HTTPNotFound()
740 740
741 741 if self.request.environ.get('HTTP_X_PJAX'):
742 742 html = render('rhodecode:templates/files/files_pjax.mako',
743 743 self._get_template_context(c), self.request)
744 744 else:
745 745 html = render('rhodecode:templates/files/files.mako',
746 746 self._get_template_context(c), self.request)
747 747 return Response(html)
748 748
749 749 @HasRepoPermissionAnyDecorator(
750 750 'repository.read', 'repository.write', 'repository.admin')
751 751 def repo_files_annotated_previous(self):
752 752 self.load_default_context()
753 753
754 754 commit_id, f_path = self._get_commit_and_path()
755 755 commit = self._get_commit_or_redirect(commit_id)
756 756 prev_commit_id = commit.raw_id
757 757 line_anchor = self.request.GET.get('line_anchor')
758 758 is_file = False
759 759 try:
760 760 _file = commit.get_node(f_path)
761 761 is_file = _file.is_file()
762 762 except (NodeDoesNotExistError, CommitDoesNotExistError, VCSError):
763 763 pass
764 764
765 765 if is_file:
766 766 history = commit.get_path_history(f_path)
767 767 prev_commit_id = history[1].raw_id \
768 768 if len(history) > 1 else prev_commit_id
769 769 prev_url = h.route_path(
770 770 'repo_files:annotated', repo_name=self.db_repo_name,
771 771 commit_id=prev_commit_id, f_path=f_path,
772 772 _anchor=f'L{line_anchor}')
773 773
774 774 raise HTTPFound(prev_url)
775 775
776 776 @LoginRequired()
777 777 @HasRepoPermissionAnyDecorator(
778 778 'repository.read', 'repository.write', 'repository.admin')
779 779 def repo_nodetree_full(self):
780 780 """
781 781 Returns rendered html of file tree that contains commit date,
782 782 author, commit_id for the specified combination of
783 783 repo, commit_id and file path
784 784 """
785 785 c = self.load_default_context()
786 786
787 787 commit_id, f_path = self._get_commit_and_path()
788 788 commit = self._get_commit_or_redirect(commit_id)
789 789 try:
790 790 dir_node = commit.get_node(f_path)
791 791 except RepositoryError as e:
792 792 return Response(f'error: {h.escape(safe_str(e))}')
793 793
794 794 if dir_node.is_file():
795 795 return Response('')
796 796
797 797 c.file = dir_node
798 798 c.commit = commit
799 799 at_rev = self.request.GET.get('at')
800 800
801 801 html = self._get_tree_at_commit(
802 802 c, commit.raw_id, dir_node.path, full_load=True, at_rev=at_rev)
803 803
804 804 return Response(html)
805 805
806 806 def _get_attachement_headers(self, f_path):
807 807 f_name = safe_str(f_path.split(Repository.NAME_SEP)[-1])
808 808 safe_path = f_name.replace('"', '\\"')
809 809 encoded_path = urllib.parse.quote(f_name)
810 810
811 811 return "attachment; " \
812 812 "filename=\"{}\"; " \
813 813 "filename*=UTF-8\'\'{}".format(safe_path, encoded_path)
814 814
815 815 @LoginRequired()
816 816 @HasRepoPermissionAnyDecorator(
817 817 'repository.read', 'repository.write', 'repository.admin')
818 818 def repo_file_raw(self):
819 819 """
820 820 Action for show as raw, some mimetypes are "rendered",
821 821 those include images, icons.
822 822 """
823 823 c = self.load_default_context()
824 824
825 825 commit_id, f_path = self._get_commit_and_path()
826 826 commit = self._get_commit_or_redirect(commit_id)
827 827 file_node = self._get_filenode_or_redirect(commit, f_path)
828 828
829 829 raw_mimetype_mapping = {
830 830 # map original mimetype to a mimetype used for "show as raw"
831 831 # you can also provide a content-disposition to override the
832 832 # default "attachment" disposition.
833 833 # orig_type: (new_type, new_dispo)
834 834
835 835 # show images inline:
836 836 # Do not re-add SVG: it is unsafe and permits XSS attacks. One can
837 837 # for example render an SVG with javascript inside or even render
838 838 # HTML.
839 839 'image/x-icon': ('image/x-icon', 'inline'),
840 840 'image/png': ('image/png', 'inline'),
841 841 'image/gif': ('image/gif', 'inline'),
842 842 'image/jpeg': ('image/jpeg', 'inline'),
843 843 'application/pdf': ('application/pdf', 'inline'),
844 844 }
845 845
846 846 mimetype = file_node.mimetype
847 847 try:
848 848 mimetype, disposition = raw_mimetype_mapping[mimetype]
849 849 except KeyError:
850 850 # we don't know anything special about this, handle it safely
851 851 if file_node.is_binary:
852 852 # do same as download raw for binary files
853 853 mimetype, disposition = 'application/octet-stream', 'attachment'
854 854 else:
855 855 # do not just use the original mimetype, but force text/plain,
856 856 # otherwise it would serve text/html and that might be unsafe.
857 857 # Note: underlying vcs library fakes text/plain mimetype if the
858 858 # mimetype can not be determined and it thinks it is not
859 859 # binary.This might lead to erroneous text display in some
860 860 # cases, but helps in other cases, like with text files
861 861 # without extension.
862 862 mimetype, disposition = 'text/plain', 'inline'
863 863
864 864 if disposition == 'attachment':
865 865 disposition = self._get_attachement_headers(f_path)
866 866
867 867 stream_content = file_node.stream_bytes()
868 868
869 869 response = Response(app_iter=stream_content)
870 870 response.content_disposition = disposition
871 871 response.content_type = mimetype
872 872
873 873 charset = self._get_default_encoding(c)
874 874 if charset:
875 875 response.charset = charset
876 876
877 877 return response
878 878
879 879 @LoginRequired()
880 880 @HasRepoPermissionAnyDecorator(
881 881 'repository.read', 'repository.write', 'repository.admin')
882 882 def repo_file_download(self):
883 883 c = self.load_default_context()
884 884
885 885 commit_id, f_path = self._get_commit_and_path()
886 886 commit = self._get_commit_or_redirect(commit_id)
887 887 file_node = self._get_filenode_or_redirect(commit, f_path)
888 888
889 889 if self.request.GET.get('lf'):
890 890 # only if lf get flag is passed, we download this file
891 891 # as LFS/Largefile
892 892 lf_node = file_node.get_largefile_node()
893 893 if lf_node:
894 894 # overwrite our pointer with the REAL large-file
895 895 file_node = lf_node
896 896
897 897 disposition = self._get_attachement_headers(f_path)
898 898
899 899 stream_content = file_node.stream_bytes()
900 900
901 901 response = Response(app_iter=stream_content)
902 902 response.content_disposition = disposition
903 903 response.content_type = file_node.mimetype
904 904
905 905 charset = self._get_default_encoding(c)
906 906 if charset:
907 907 response.charset = charset
908 908
909 909 return response
910 910
911 911 def _get_nodelist_at_commit(self, repo_name, repo_id, commit_id, f_path):
912 912
913 913 cache_seconds = safe_int(
914 914 rhodecode.CONFIG.get('rc_cache.cache_repo.expiration_time'))
915 915 cache_on = cache_seconds > 0
916 916 log.debug(
917 917 'Computing FILE SEARCH for repo_id %s commit_id `%s` and path `%s`'
918 918 'with caching: %s[TTL: %ss]' % (
919 919 repo_id, commit_id, f_path, cache_on, cache_seconds or 0))
920 920
921 921 cache_namespace_uid = f'repo.{repo_id}'
922 922 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
923 923
924 924 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=cache_on)
925 925 def compute_file_search(_name_hash, _repo_id, _commit_id, _f_path):
926 926 log.debug('Generating cached nodelist for repo_id:%s, %s, %s',
927 927 _repo_id, commit_id, f_path)
928 928 try:
929 929 _d, _f = ScmModel().get_quick_filter_nodes(repo_name, _commit_id, _f_path)
930 930 except (RepositoryError, CommitDoesNotExistError, Exception) as e:
931 931 log.exception(safe_str(e))
932 932 h.flash(h.escape(safe_str(e)), category='error')
933 933 raise HTTPFound(h.route_path(
934 934 'repo_files', repo_name=self.db_repo_name,
935 935 commit_id='tip', f_path='/'))
936 936
937 937 return _d + _f
938 938
939 939 result = compute_file_search(self.db_repo.repo_name_hash, self.db_repo.repo_id,
940 940 commit_id, f_path)
941 941 return filter(lambda n: self.path_filter.path_access_allowed(n['name']), result)
942 942
943 943 @LoginRequired()
944 944 @HasRepoPermissionAnyDecorator(
945 945 'repository.read', 'repository.write', 'repository.admin')
946 946 def repo_nodelist(self):
947 947 self.load_default_context()
948 948
949 949 commit_id, f_path = self._get_commit_and_path()
950 950 commit = self._get_commit_or_redirect(commit_id)
951 951
952 952 metadata = self._get_nodelist_at_commit(
953 953 self.db_repo_name, self.db_repo.repo_id, commit.raw_id, f_path)
954 954 return {'nodes': [x for x in metadata]}
955 955
956 956 def _create_references(self, branches_or_tags, symbolic_reference, f_path, ref_type):
957 957 items = []
958 958 for name, commit_id in branches_or_tags.items():
959 959 sym_ref = symbolic_reference(commit_id, name, f_path, ref_type)
960 960 items.append((sym_ref, name, ref_type))
961 961 return items
962 962
963 963 def _symbolic_reference(self, commit_id, name, f_path, ref_type):
964 964 return commit_id
965 965
966 966 def _symbolic_reference_svn(self, commit_id, name, f_path, ref_type):
967 967 return commit_id
968 968
969 969 # NOTE(dan): old code we used in "diff" mode compare
970 970 new_f_path = vcspath.join(name, f_path)
971 971 return f'{new_f_path}@{commit_id}'
972 972
973 973 def _get_node_history(self, commit_obj, f_path, commits=None):
974 974 """
975 975 get commit history for given node
976 976
977 977 :param commit_obj: commit to calculate history
978 978 :param f_path: path for node to calculate history for
979 979 :param commits: if passed don't calculate history and take
980 980 commits defined in this list
981 981 """
982 982 _ = self.request.translate
983 983
984 984 # calculate history based on tip
985 985 tip = self.rhodecode_vcs_repo.get_commit()
986 986 if commits is None:
987 987 pre_load = ["author", "branch"]
988 988 try:
989 989 commits = tip.get_path_history(f_path, pre_load=pre_load)
990 990 except (NodeDoesNotExistError, CommitError):
991 991 # this node is not present at tip!
992 992 commits = commit_obj.get_path_history(f_path, pre_load=pre_load)
993 993
994 994 history = []
995 995 commits_group = ([], _("Changesets"))
996 996 for commit in commits:
997 997 branch = ' (%s)' % commit.branch if commit.branch else ''
998 998 n_desc = f'r{commit.idx}:{commit.short_id}{branch}'
999 999 commits_group[0].append((commit.raw_id, n_desc, 'sha'))
1000 1000 history.append(commits_group)
1001 1001
1002 1002 symbolic_reference = self._symbolic_reference
1003 1003
1004 1004 if self.rhodecode_vcs_repo.alias == 'svn':
1005 1005 adjusted_f_path = RepoFilesView.adjust_file_path_for_svn(
1006 1006 f_path, self.rhodecode_vcs_repo)
1007 1007 if adjusted_f_path != f_path:
1008 1008 log.debug(
1009 1009 'Recognized svn tag or branch in file "%s", using svn '
1010 1010 'specific symbolic references', f_path)
1011 1011 f_path = adjusted_f_path
1012 1012 symbolic_reference = self._symbolic_reference_svn
1013 1013
1014 1014 branches = self._create_references(
1015 1015 self.rhodecode_vcs_repo.branches, symbolic_reference, f_path, 'branch')
1016 1016 branches_group = (branches, _("Branches"))
1017 1017
1018 1018 tags = self._create_references(
1019 1019 self.rhodecode_vcs_repo.tags, symbolic_reference, f_path, 'tag')
1020 1020 tags_group = (tags, _("Tags"))
1021 1021
1022 1022 history.append(branches_group)
1023 1023 history.append(tags_group)
1024 1024
1025 1025 return history, commits
1026 1026
1027 1027 @LoginRequired()
1028 1028 @HasRepoPermissionAnyDecorator(
1029 1029 'repository.read', 'repository.write', 'repository.admin')
1030 1030 def repo_file_history(self):
1031 1031 self.load_default_context()
1032 1032
1033 1033 commit_id, f_path = self._get_commit_and_path()
1034 1034 commit = self._get_commit_or_redirect(commit_id)
1035 1035 file_node = self._get_filenode_or_redirect(commit, f_path)
1036 1036
1037 1037 if file_node.is_file():
1038 1038 file_history, _hist = self._get_node_history(commit, f_path)
1039 1039
1040 1040 res = []
1041 1041 for section_items, section in file_history:
1042 1042 items = []
1043 1043 for obj_id, obj_text, obj_type in section_items:
1044 1044 at_rev = ''
1045 1045 if obj_type in ['branch', 'bookmark', 'tag']:
1046 1046 at_rev = obj_text
1047 1047 entry = {
1048 1048 'id': obj_id,
1049 1049 'text': obj_text,
1050 1050 'type': obj_type,
1051 1051 'at_rev': at_rev
1052 1052 }
1053 1053
1054 1054 items.append(entry)
1055 1055
1056 1056 res.append({
1057 1057 'text': section,
1058 1058 'children': items
1059 1059 })
1060 1060
1061 1061 data = {
1062 1062 'more': False,
1063 1063 'results': res
1064 1064 }
1065 1065 return data
1066 1066
1067 1067 log.warning('Cannot fetch history for directory')
1068 1068 raise HTTPBadRequest()
1069 1069
1070 1070 @LoginRequired()
1071 1071 @HasRepoPermissionAnyDecorator(
1072 1072 'repository.read', 'repository.write', 'repository.admin')
1073 1073 def repo_file_authors(self):
1074 1074 c = self.load_default_context()
1075 1075
1076 1076 commit_id, f_path = self._get_commit_and_path()
1077 1077 commit = self._get_commit_or_redirect(commit_id)
1078 1078 file_node = self._get_filenode_or_redirect(commit, f_path)
1079 1079
1080 1080 if not file_node.is_file():
1081 1081 raise HTTPBadRequest()
1082 1082
1083 1083 c.file_last_commit = file_node.last_commit
1084 1084 if self.request.GET.get('annotate') == '1':
1085 1085 # use _hist from annotation if annotation mode is on
1086 1086 commit_ids = {x[1] for x in file_node.annotate}
1087 1087 _hist = (
1088 1088 self.rhodecode_vcs_repo.get_commit(commit_id)
1089 1089 for commit_id in commit_ids)
1090 1090 else:
1091 1091 _f_history, _hist = self._get_node_history(commit, f_path)
1092 1092 c.file_author = False
1093 1093
1094 1094 unique = collections.OrderedDict()
1095 1095 for commit in _hist:
1096 1096 author = commit.author
1097 1097 if author not in unique:
1098 1098 unique[commit.author] = [
1099 1099 h.email(author),
1100 1100 h.person(author, 'username_or_name_or_email'),
1101 1101 1 # counter
1102 1102 ]
1103 1103
1104 1104 else:
1105 1105 # increase counter
1106 1106 unique[commit.author][2] += 1
1107 1107
1108 1108 c.authors = [val for val in unique.values()]
1109 1109
1110 1110 return self._get_template_context(c)
1111 1111
1112 1112 @LoginRequired()
1113 1113 @HasRepoPermissionAnyDecorator('repository.write', 'repository.admin')
1114 1114 def repo_files_check_head(self):
1115 1115 self.load_default_context()
1116 1116
1117 1117 commit_id, f_path = self._get_commit_and_path()
1118 1118 _branch_name, _sha_commit_id, is_head = \
1119 1119 self._is_valid_head(commit_id, self.rhodecode_vcs_repo,
1120 1120 landing_ref=self.db_repo.landing_ref_name)
1121 1121
1122 1122 new_path = self.request.POST.get('path')
1123 1123 operation = self.request.POST.get('operation')
1124 1124 path_exist = ''
1125 1125
1126 1126 if new_path and operation in ['create', 'upload']:
1127 1127 new_f_path = os.path.join(f_path.lstrip('/'), new_path)
1128 1128 try:
1129 1129 commit_obj = self.rhodecode_vcs_repo.get_commit(commit_id)
1130 1130 # NOTE(dan): construct whole path without leading /
1131 1131 file_node = commit_obj.get_node(new_f_path)
1132 1132 if file_node is not None:
1133 1133 path_exist = new_f_path
1134 1134 except EmptyRepositoryError:
1135 1135 pass
1136 1136 except Exception:
1137 1137 pass
1138 1138
1139 1139 return {
1140 1140 'branch': _branch_name,
1141 1141 'sha': _sha_commit_id,
1142 1142 'is_head': is_head,
1143 1143 'path_exists': path_exist
1144 1144 }
1145 1145
1146 1146 @LoginRequired()
1147 1147 @HasRepoPermissionAnyDecorator('repository.write', 'repository.admin')
1148 1148 def repo_files_remove_file(self):
1149 1149 _ = self.request.translate
1150 1150 c = self.load_default_context()
1151 1151 commit_id, f_path = self._get_commit_and_path()
1152 1152
1153 1153 self._ensure_not_locked()
1154 1154 _branch_name, _sha_commit_id, is_head = \
1155 1155 self._is_valid_head(commit_id, self.rhodecode_vcs_repo,
1156 1156 landing_ref=self.db_repo.landing_ref_name)
1157 1157
1158 1158 self.forbid_non_head(is_head, f_path)
1159 1159 self.check_branch_permission(_branch_name)
1160 1160
1161 1161 c.commit = self._get_commit_or_redirect(commit_id)
1162 1162 c.file = self._get_filenode_or_redirect(c.commit, f_path)
1163 1163
1164 1164 c.default_message = _(
1165 1165 'Deleted file {} via RhodeCode Enterprise').format(f_path)
1166 1166 c.f_path = f_path
1167 1167
1168 1168 return self._get_template_context(c)
1169 1169
1170 1170 @LoginRequired()
1171 1171 @HasRepoPermissionAnyDecorator('repository.write', 'repository.admin')
1172 1172 @CSRFRequired()
1173 1173 def repo_files_delete_file(self):
1174 1174 _ = self.request.translate
1175 1175
1176 1176 c = self.load_default_context()
1177 1177 commit_id, f_path = self._get_commit_and_path()
1178 1178
1179 1179 self._ensure_not_locked()
1180 1180 _branch_name, _sha_commit_id, is_head = \
1181 1181 self._is_valid_head(commit_id, self.rhodecode_vcs_repo,
1182 1182 landing_ref=self.db_repo.landing_ref_name)
1183 1183
1184 1184 self.forbid_non_head(is_head, f_path)
1185 1185 self.check_branch_permission(_branch_name)
1186 1186
1187 1187 c.commit = self._get_commit_or_redirect(commit_id)
1188 1188 c.file = self._get_filenode_or_redirect(c.commit, f_path)
1189 1189
1190 1190 c.default_message = _(
1191 1191 'Deleted file {} via RhodeCode Enterprise').format(f_path)
1192 1192 c.f_path = f_path
1193 1193 node_path = f_path
1194 1194 author = self._rhodecode_db_user.full_contact
1195 1195 message = self.request.POST.get('message') or c.default_message
1196 1196 try:
1197 1197 nodes = {
1198 1198 safe_bytes(node_path): {
1199 1199 'content': b''
1200 1200 }
1201 1201 }
1202 1202 ScmModel().delete_nodes(
1203 1203 user=self._rhodecode_db_user.user_id, repo=self.db_repo,
1204 1204 message=message,
1205 1205 nodes=nodes,
1206 1206 parent_commit=c.commit,
1207 1207 author=author,
1208 1208 )
1209 1209
1210 1210 h.flash(
1211 1211 _('Successfully deleted file `{}`').format(
1212 1212 h.escape(f_path)), category='success')
1213 1213 except Exception:
1214 1214 log.exception('Error during commit operation')
1215 1215 h.flash(_('Error occurred during commit'), category='error')
1216 1216 raise HTTPFound(
1217 1217 h.route_path('repo_commit', repo_name=self.db_repo_name,
1218 1218 commit_id='tip'))
1219 1219
1220 1220 @LoginRequired()
1221 1221 @HasRepoPermissionAnyDecorator('repository.write', 'repository.admin')
1222 1222 def repo_files_edit_file(self):
1223 1223 _ = self.request.translate
1224 1224 c = self.load_default_context()
1225 1225 commit_id, f_path = self._get_commit_and_path()
1226 1226
1227 1227 self._ensure_not_locked()
1228 1228 _branch_name, _sha_commit_id, is_head = \
1229 1229 self._is_valid_head(commit_id, self.rhodecode_vcs_repo,
1230 1230 landing_ref=self.db_repo.landing_ref_name)
1231 1231
1232 1232 self.forbid_non_head(is_head, f_path, commit_id=commit_id)
1233 1233 self.check_branch_permission(_branch_name, commit_id=commit_id)
1234 1234
1235 1235 c.commit = self._get_commit_or_redirect(commit_id)
1236 1236 c.file = self._get_filenode_or_redirect(c.commit, f_path)
1237 1237
1238 1238 if c.file.is_binary:
1239 1239 files_url = h.route_path(
1240 1240 'repo_files',
1241 1241 repo_name=self.db_repo_name,
1242 1242 commit_id=c.commit.raw_id, f_path=f_path)
1243 1243 raise HTTPFound(files_url)
1244 1244
1245 1245 c.default_message = _('Edited file {} via RhodeCode Enterprise').format(f_path)
1246 1246 c.f_path = f_path
1247 1247
1248 1248 return self._get_template_context(c)
1249 1249
1250 1250 @LoginRequired()
1251 1251 @HasRepoPermissionAnyDecorator('repository.write', 'repository.admin')
1252 1252 @CSRFRequired()
1253 1253 def repo_files_update_file(self):
1254 1254 _ = self.request.translate
1255 1255 c = self.load_default_context()
1256 1256 commit_id, f_path = self._get_commit_and_path()
1257 1257
1258 1258 self._ensure_not_locked()
1259 1259
1260 1260 c.commit = self._get_commit_or_redirect(commit_id)
1261 1261 c.file = self._get_filenode_or_redirect(c.commit, f_path)
1262 1262
1263 1263 if c.file.is_binary:
1264 1264 raise HTTPFound(h.route_path('repo_files', repo_name=self.db_repo_name,
1265 1265 commit_id=c.commit.raw_id, f_path=f_path))
1266 1266
1267 1267 _branch_name, _sha_commit_id, is_head = \
1268 1268 self._is_valid_head(commit_id, self.rhodecode_vcs_repo,
1269 1269 landing_ref=self.db_repo.landing_ref_name)
1270 1270
1271 1271 self.forbid_non_head(is_head, f_path, commit_id=commit_id)
1272 1272 self.check_branch_permission(_branch_name, commit_id=commit_id)
1273 1273
1274 1274 c.default_message = _('Edited file {} via RhodeCode Enterprise').format(f_path)
1275 1275 c.f_path = f_path
1276 1276
1277 1277 old_content = c.file.str_content
1278 1278 sl = old_content.splitlines(1)
1279 1279 first_line = sl[0] if sl else ''
1280 1280
1281 1281 r_post = self.request.POST
1282 1282 # line endings: 0 - Unix, 1 - Mac, 2 - DOS
1283 1283 line_ending_mode = detect_mode(first_line, 0)
1284 1284 content = convert_line_endings(r_post.get('content', ''), line_ending_mode)
1285 1285
1286 1286 message = r_post.get('message') or c.default_message
1287 1287
1288 1288 org_node_path = c.file.str_path
1289 1289 filename = r_post['filename']
1290 1290
1291 1291 root_path = c.file.dir_path
1292 1292 pure_path = self.create_pure_path(root_path, filename)
1293 1293 node_path = pure_path.as_posix()
1294 1294
1295 1295 default_redirect_url = h.route_path('repo_commit', repo_name=self.db_repo_name,
1296 1296 commit_id=commit_id)
1297 1297 if content == old_content and node_path == org_node_path:
1298 1298 h.flash(_('No changes detected on {}').format(h.escape(org_node_path)),
1299 1299 category='warning')
1300 1300 raise HTTPFound(default_redirect_url)
1301 1301
1302 1302 try:
1303 1303 mapping = {
1304 1304 c.file.bytes_path: {
1305 1305 'org_filename': org_node_path,
1306 1306 'filename': safe_bytes(node_path),
1307 1307 'content': safe_bytes(content),
1308 1308 'lexer': '',
1309 1309 'op': 'mod',
1310 1310 'mode': c.file.mode
1311 1311 }
1312 1312 }
1313 1313
1314 1314 commit = ScmModel().update_nodes(
1315 1315 user=self._rhodecode_db_user.user_id,
1316 1316 repo=self.db_repo,
1317 1317 message=message,
1318 1318 nodes=mapping,
1319 1319 parent_commit=c.commit,
1320 1320 )
1321 1321
1322 1322 h.flash(_('Successfully committed changes to file `{}`').format(
1323 1323 h.escape(f_path)), category='success')
1324 1324 default_redirect_url = h.route_path(
1325 1325 'repo_commit', repo_name=self.db_repo_name, commit_id=commit.raw_id)
1326 1326
1327 1327 except Exception:
1328 1328 log.exception('Error occurred during commit')
1329 1329 h.flash(_('Error occurred during commit'), category='error')
1330 1330
1331 1331 raise HTTPFound(default_redirect_url)
1332 1332
1333 1333 @LoginRequired()
1334 1334 @HasRepoPermissionAnyDecorator('repository.write', 'repository.admin')
1335 1335 def repo_files_add_file(self):
1336 1336 _ = self.request.translate
1337 1337 c = self.load_default_context()
1338 1338 commit_id, f_path = self._get_commit_and_path()
1339 1339
1340 1340 self._ensure_not_locked()
1341 1341
1342 1342 c.commit = self._get_commit_or_redirect(commit_id, redirect_after=False)
1343 1343 if c.commit is None:
1344 1344 c.commit = EmptyCommit(alias=self.rhodecode_vcs_repo.alias)
1345 1345
1346 1346 if self.rhodecode_vcs_repo.is_empty():
1347 1347 # for empty repository we cannot check for current branch, we rely on
1348 1348 # c.commit.branch instead
1349 1349 _branch_name, _sha_commit_id, is_head = c.commit.branch, '', True
1350 1350 else:
1351 1351 _branch_name, _sha_commit_id, is_head = \
1352 1352 self._is_valid_head(commit_id, self.rhodecode_vcs_repo,
1353 1353 landing_ref=self.db_repo.landing_ref_name)
1354 1354
1355 1355 self.forbid_non_head(is_head, f_path, commit_id=commit_id)
1356 1356 self.check_branch_permission(_branch_name, commit_id=commit_id)
1357 1357
1358 1358 c.default_message = (_('Added file via RhodeCode Enterprise'))
1359 1359 c.f_path = f_path.lstrip('/') # ensure not relative path
1360 1360
1361 1361 return self._get_template_context(c)
1362 1362
1363 1363 @LoginRequired()
1364 1364 @HasRepoPermissionAnyDecorator('repository.write', 'repository.admin')
1365 1365 @CSRFRequired()
1366 1366 def repo_files_create_file(self):
1367 1367 _ = self.request.translate
1368 1368 c = self.load_default_context()
1369 1369 commit_id, f_path = self._get_commit_and_path()
1370 1370
1371 1371 self._ensure_not_locked()
1372 1372
1373 1373 c.commit = self._get_commit_or_redirect(commit_id, redirect_after=False)
1374 1374 if c.commit is None:
1375 1375 c.commit = EmptyCommit(alias=self.rhodecode_vcs_repo.alias)
1376 1376
1377 1377 # calculate redirect URL
1378 1378 if self.rhodecode_vcs_repo.is_empty():
1379 1379 default_redirect_url = h.route_path(
1380 1380 'repo_summary', repo_name=self.db_repo_name)
1381 1381 else:
1382 1382 default_redirect_url = h.route_path(
1383 1383 'repo_commit', repo_name=self.db_repo_name, commit_id='tip')
1384 1384
1385 1385 if self.rhodecode_vcs_repo.is_empty():
1386 1386 # for empty repository we cannot check for current branch, we rely on
1387 1387 # c.commit.branch instead
1388 1388 _branch_name, _sha_commit_id, is_head = c.commit.branch, '', True
1389 1389 else:
1390 1390 _branch_name, _sha_commit_id, is_head = \
1391 1391 self._is_valid_head(commit_id, self.rhodecode_vcs_repo,
1392 1392 landing_ref=self.db_repo.landing_ref_name)
1393 1393
1394 1394 self.forbid_non_head(is_head, f_path, commit_id=commit_id)
1395 1395 self.check_branch_permission(_branch_name, commit_id=commit_id)
1396 1396
1397 1397 c.default_message = (_('Added file via RhodeCode Enterprise'))
1398 1398 c.f_path = f_path
1399 1399
1400 1400 r_post = self.request.POST
1401 1401 message = r_post.get('message') or c.default_message
1402 1402 filename = r_post.get('filename')
1403 1403 unix_mode = 0
1404 1404
1405 1405 if not filename:
1406 1406 # If there's no commit, redirect to repo summary
1407 1407 if type(c.commit) is EmptyCommit:
1408 1408 redirect_url = h.route_path(
1409 1409 'repo_summary', repo_name=self.db_repo_name)
1410 1410 else:
1411 1411 redirect_url = default_redirect_url
1412 1412 h.flash(_('No filename specified'), category='warning')
1413 1413 raise HTTPFound(redirect_url)
1414 1414
1415 1415 root_path = f_path
1416 1416 pure_path = self.create_pure_path(root_path, filename)
1417 1417 node_path = pure_path.as_posix().lstrip('/')
1418 1418
1419 1419 author = self._rhodecode_db_user.full_contact
1420 1420 content = convert_line_endings(r_post.get('content', ''), unix_mode)
1421 1421 nodes = {
1422 1422 safe_bytes(node_path): {
1423 1423 'content': safe_bytes(content)
1424 1424 }
1425 1425 }
1426 1426
1427 1427 try:
1428 1428
1429 1429 commit = ScmModel().create_nodes(
1430 1430 user=self._rhodecode_db_user.user_id,
1431 1431 repo=self.db_repo,
1432 1432 message=message,
1433 1433 nodes=nodes,
1434 1434 parent_commit=c.commit,
1435 1435 author=author,
1436 1436 )
1437 1437
1438 1438 h.flash(_('Successfully committed new file `{}`').format(
1439 1439 h.escape(node_path)), category='success')
1440 1440
1441 1441 default_redirect_url = h.route_path(
1442 1442 'repo_commit', repo_name=self.db_repo_name, commit_id=commit.raw_id)
1443 1443
1444 1444 except NonRelativePathError:
1445 1445 log.exception('Non Relative path found')
1446 1446 h.flash(_('The location specified must be a relative path and must not '
1447 1447 'contain .. in the path'), category='warning')
1448 1448 raise HTTPFound(default_redirect_url)
1449 1449 except (NodeError, NodeAlreadyExistsError) as e:
1450 1450 h.flash(h.escape(safe_str(e)), category='error')
1451 1451 except Exception:
1452 1452 log.exception('Error occurred during commit')
1453 1453 h.flash(_('Error occurred during commit'), category='error')
1454 1454
1455 1455 raise HTTPFound(default_redirect_url)
1456 1456
1457 1457 @LoginRequired()
1458 1458 @HasRepoPermissionAnyDecorator('repository.write', 'repository.admin')
1459 1459 @CSRFRequired()
1460 1460 def repo_files_upload_file(self):
1461 1461 _ = self.request.translate
1462 1462 c = self.load_default_context()
1463 1463 commit_id, f_path = self._get_commit_and_path()
1464 1464
1465 1465 self._ensure_not_locked()
1466 1466
1467 1467 c.commit = self._get_commit_or_redirect(commit_id, redirect_after=False)
1468 1468 if c.commit is None:
1469 1469 c.commit = EmptyCommit(alias=self.rhodecode_vcs_repo.alias)
1470 1470
1471 1471 # calculate redirect URL
1472 1472 if self.rhodecode_vcs_repo.is_empty():
1473 1473 default_redirect_url = h.route_path(
1474 1474 'repo_summary', repo_name=self.db_repo_name)
1475 1475 else:
1476 1476 default_redirect_url = h.route_path(
1477 1477 'repo_commit', repo_name=self.db_repo_name, commit_id='tip')
1478 1478
1479 1479 if self.rhodecode_vcs_repo.is_empty():
1480 1480 # for empty repository we cannot check for current branch, we rely on
1481 1481 # c.commit.branch instead
1482 1482 _branch_name, _sha_commit_id, is_head = c.commit.branch, '', True
1483 1483 else:
1484 1484 _branch_name, _sha_commit_id, is_head = \
1485 1485 self._is_valid_head(commit_id, self.rhodecode_vcs_repo,
1486 1486 landing_ref=self.db_repo.landing_ref_name)
1487 1487
1488 1488 error = self.forbid_non_head(is_head, f_path, json_mode=True)
1489 1489 if error:
1490 1490 return {
1491 1491 'error': error,
1492 1492 'redirect_url': default_redirect_url
1493 1493 }
1494 1494 error = self.check_branch_permission(_branch_name, json_mode=True)
1495 1495 if error:
1496 1496 return {
1497 1497 'error': error,
1498 1498 'redirect_url': default_redirect_url
1499 1499 }
1500 1500
1501 1501 c.default_message = (_('Uploaded file via RhodeCode Enterprise'))
1502 1502 c.f_path = f_path
1503 1503
1504 1504 r_post = self.request.POST
1505 1505
1506 1506 message = c.default_message
1507 1507 user_message = r_post.getall('message')
1508 1508 if isinstance(user_message, list) and user_message:
1509 1509 # we take the first from duplicated results if it's not empty
1510 1510 message = user_message[0] if user_message[0] else message
1511 1511
1512 1512 nodes = {}
1513 1513
1514 1514 for file_obj in r_post.getall('files_upload') or []:
1515 1515 content = file_obj.file
1516 1516 filename = file_obj.filename
1517 1517
1518 1518 root_path = f_path
1519 1519 pure_path = self.create_pure_path(root_path, filename)
1520 1520 node_path = pure_path.as_posix().lstrip('/')
1521 1521
1522 1522 nodes[safe_bytes(node_path)] = {
1523 1523 'content': content
1524 1524 }
1525 1525
1526 1526 if not nodes:
1527 1527 error = 'missing files'
1528 1528 return {
1529 1529 'error': error,
1530 1530 'redirect_url': default_redirect_url
1531 1531 }
1532 1532
1533 1533 author = self._rhodecode_db_user.full_contact
1534 1534
1535 1535 try:
1536 1536 commit = ScmModel().create_nodes(
1537 1537 user=self._rhodecode_db_user.user_id,
1538 1538 repo=self.db_repo,
1539 1539 message=message,
1540 1540 nodes=nodes,
1541 1541 parent_commit=c.commit,
1542 1542 author=author,
1543 1543 )
1544 1544 if len(nodes) == 1:
1545 1545 flash_message = _('Successfully committed {} new files').format(len(nodes))
1546 1546 else:
1547 1547 flash_message = _('Successfully committed 1 new file')
1548 1548
1549 1549 h.flash(flash_message, category='success')
1550 1550
1551 1551 default_redirect_url = h.route_path(
1552 1552 'repo_commit', repo_name=self.db_repo_name, commit_id=commit.raw_id)
1553 1553
1554 1554 except NonRelativePathError:
1555 1555 log.exception('Non Relative path found')
1556 1556 error = _('The location specified must be a relative path and must not '
1557 1557 'contain .. in the path')
1558 1558 h.flash(error, category='warning')
1559 1559
1560 1560 return {
1561 1561 'error': error,
1562 1562 'redirect_url': default_redirect_url
1563 1563 }
1564 1564 except (NodeError, NodeAlreadyExistsError) as e:
1565 1565 error = h.escape(e)
1566 1566 h.flash(error, category='error')
1567 1567
1568 1568 return {
1569 1569 'error': error,
1570 1570 'redirect_url': default_redirect_url
1571 1571 }
1572 1572 except Exception:
1573 1573 log.exception('Error occurred during commit')
1574 1574 error = _('Error occurred during commit')
1575 1575 h.flash(error, category='error')
1576 1576 return {
1577 1577 'error': error,
1578 1578 'redirect_url': default_redirect_url
1579 1579 }
1580 1580
1581 1581 return {
1582 1582 'error': None,
1583 1583 'redirect_url': default_redirect_url
1584 1584 }
@@ -1,197 +1,198 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 import datetime
21 21 import os
22 22 import shutil
23 23 import tarfile
24 24 import zipfile
25 25 import io
26 26
27 27 import mock
28 28 import pytest
29 29
30 30 import rhodecode
31 31 from rhodecode.lib.rc_cache.archive_cache import get_archival_config
32 32 from rhodecode.lib.str_utils import ascii_bytes
33 33 from rhodecode.lib.vcs.backends import base
34 34 from rhodecode.lib.vcs.exceptions import ImproperArchiveTypeError, VCSError
35 35 from rhodecode.lib.vcs.nodes import FileNode
36 36 from rhodecode.tests.vcs.conftest import BackendTestMixin
37 37
38 38
39 39 @pytest.fixture()
40 40 def d_cache_config():
41 41 return get_archival_config(config=rhodecode.CONFIG)
42 42
43 43
44 44 @pytest.mark.usefixtures("vcs_repository_support")
45 45 class TestArchives(BackendTestMixin):
46 46
47 47 @classmethod
48 48 def _get_commits(cls):
49 49 start_date = datetime.datetime(2010, 1, 1, 20)
50 50 yield {
51 51 'message': 'Initial Commit',
52 52 'author': 'Joe Doe <joe.doe@example.com>',
53 53 'date': start_date + datetime.timedelta(hours=12),
54 54 'added': [
55 55 FileNode(b'executable_0o100755', b'mode_755', mode=0o100755),
56 56 FileNode(b'executable_0o100500', b'mode_500', mode=0o100500),
57 57 FileNode(b'not_executable', b'mode_644', mode=0o100644),
58 58 ],
59 59 }
60 60 for x in range(5):
61 61 yield {
62 62 'message': 'Commit %d' % x,
63 63 'author': 'Joe Doe <joe.doe@example.com>',
64 64 'date': start_date + datetime.timedelta(hours=12 * x),
65 65 'added': [
66 66 FileNode(b'%d/file_%d.txt' % (x, x), content=b'Foobar %d' % x),
67 67 ],
68 68 }
69 69
70 70 @pytest.mark.parametrize('compressor', ['gz', 'bz2'])
71 71 def test_archive_tar(self, compressor, tmpdir, tmp_path, d_cache_config):
72 72
73 73 archive_node = tmp_path / 'archive-node'
74 74 archive_node.touch()
75 75
76 76 archive_lnk = self.tip.archive_repo(
77 77 str(archive_node), kind=f't{compressor}', archive_dir_name='repo', cache_config=d_cache_config)
78 78
79 79 out_dir = tmpdir
80 80 out_file = tarfile.open(str(archive_lnk), f'r|{compressor}')
81 81 out_file.extractall(out_dir)
82 82 out_file.close()
83 83
84 84 for x in range(5):
85 85 node_path = '%d/file_%d.txt' % (x, x)
86 86 with open(os.path.join(out_dir, 'repo/' + node_path), 'rb') as f:
87 87 file_content = f.read()
88 88 assert file_content == self.tip.get_node(node_path).content
89 89
90 90 shutil.rmtree(out_dir)
91 91
92 92 @pytest.mark.parametrize('compressor', ['gz', 'bz2'])
93 93 def test_archive_tar_symlink(self, compressor):
94 94 pytest.skip('Not supported')
95 95
96 96 @pytest.mark.parametrize('compressor', ['gz', 'bz2'])
97 97 def test_archive_tar_file_modes(self, compressor, tmpdir, tmp_path, d_cache_config):
98 98 archive_node = tmp_path / 'archive-node'
99 99 archive_node.touch()
100 100
101 101 archive_lnk = self.tip.archive_repo(
102 102 str(archive_node), kind='t{}'.format(compressor), archive_dir_name='repo', cache_config=d_cache_config)
103 103
104 104 out_dir = tmpdir
105 105 out_file = tarfile.open(str(archive_lnk), 'r|{}'.format(compressor))
106 106 out_file.extractall(out_dir)
107 107 out_file.close()
108 108
109 109 def dest(inp):
110 110 return os.path.join(out_dir, "repo/" + inp)
111 111
112 112 assert oct(os.stat(dest('not_executable')).st_mode) == '0o100644'
113 113
114 114 def test_archive_zip(self, tmp_path, d_cache_config):
115 115 archive_node = tmp_path / 'archive-node'
116 116 archive_node.touch()
117 117
118 118 archive_lnk = self.tip.archive_repo(str(archive_node), kind='zip',
119 119 archive_dir_name='repo', cache_config=d_cache_config)
120 120 zip_file = zipfile.ZipFile(str(archive_lnk))
121 121
122 122 for x in range(5):
123 123 node_path = '%d/file_%d.txt' % (x, x)
124 124 data = zip_file.read(f'repo/{node_path}')
125 125
126 126 decompressed = io.BytesIO()
127 127 decompressed.write(data)
128 128 assert decompressed.getvalue() == \
129 129 self.tip.get_node(node_path).content
130 130 decompressed.close()
131 131
132 132 def test_archive_zip_with_metadata(self, tmp_path, d_cache_config):
133 133 archive_node = tmp_path / 'archive-node'
134 134 archive_node.touch()
135 135
136 archive_lnk = self.tip.archive_repo(str(archive_node), kind='zip',
136 archive_lnk = self.tip.archive_repo(
137 str(archive_node), kind='zip',
137 138 archive_dir_name='repo', write_metadata=True, cache_config=d_cache_config)
138 139
139 140 zip_file = zipfile.ZipFile(str(archive_lnk))
140 141 metafile = zip_file.read('repo/.archival.txt')
141 142
142 143 raw_id = ascii_bytes(self.tip.raw_id)
143 144 assert b'commit_id:%b' % raw_id in metafile
144 145
145 146 for x in range(5):
146 147 node_path = '%d/file_%d.txt' % (x, x)
147 148 data = zip_file.read(f'repo/{node_path}')
148 149 decompressed = io.BytesIO()
149 150 decompressed.write(data)
150 151 assert decompressed.getvalue() == \
151 152 self.tip.get_node(node_path).content
152 153 decompressed.close()
153 154
154 155 def test_archive_wrong_kind(self, tmp_path, d_cache_config):
155 156 archive_node = tmp_path / 'archive-node'
156 157 archive_node.touch()
157 158
158 159 with pytest.raises(ImproperArchiveTypeError):
159 160 self.tip.archive_repo(str(archive_node), kind='wrong kind', cache_config=d_cache_config)
160 161
161 162
162 163 @pytest.fixture()
163 164 def base_commit():
164 165 """
165 166 Prepare a `base.BaseCommit` just enough for `_validate_archive_prefix`.
166 167 """
167 168 commit = base.BaseCommit()
168 169 commit.repository = mock.Mock()
169 170 commit.repository.name = 'fake_repo'
170 171 commit.short_id = 'fake_id'
171 172 return commit
172 173
173 174
174 175 def test_validate_archive_prefix_enforces_non_ascii_as_prefix(base_commit):
175 176 with pytest.raises(VCSError):
176 177 base_commit._validate_archive_prefix("Ünïcödë")
177 178
178 179
179 180 def test_validate_archive_prefix_empty_prefix(base_commit):
180 181 # TODO: johbo: Should raise a ValueError here.
181 182 with pytest.raises(VCSError):
182 183 base_commit._validate_archive_prefix('')
183 184
184 185
185 186 def test_validate_archive_prefix_with_leading_slash(base_commit):
186 187 # TODO: johbo: Should raise a ValueError here.
187 188 with pytest.raises(VCSError):
188 189 base_commit._validate_archive_prefix('/any')
189 190
190 191
191 192 def test_validate_archive_prefix_falls_back_to_repository_name(base_commit):
192 193 prefix = base_commit._validate_archive_prefix(None)
193 194 expected_prefix = base_commit._ARCHIVE_PREFIX_TEMPLATE.format(
194 195 repo_name='fake_repo',
195 196 short_id='fake_id')
196 197 assert isinstance(prefix, str)
197 198 assert prefix == expected_prefix
General Comments 0
You need to be logged in to leave comments. Login now