##// END OF EJS Templates
codingstyle: replace upper-case variable names with lower-case ones...
Lars Kruse -
r6790:94bbb7eb default
parent child Browse files
Show More
@@ -1,143 +1,145 b''
1 1 from kallithea.tests.base import *
2 2 from kallithea.model.db import ChangesetComment, Notification, \
3 3 UserNotification
4 4 from kallithea.model.meta import Session
5 5
6 6
7 7 class TestChangeSetCommentsController(TestController):
8 8
9 9 def setup_method(self, method):
10 10 for x in ChangesetComment.query().all():
11 11 Session().delete(x)
12 12 Session().commit()
13 13
14 14 self.remove_all_notifications()
15 15
16 16 def test_create(self):
17 17 self.log_user()
18 18 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
19 19 text = u'CommentOnRevision'
20 20
21 21 params = {'text': text, '_authentication_token': self.authentication_token()}
22 22 response = self.app.post(url(controller='changeset', action='comment',
23 23 repo_name=HG_REPO, revision=rev),
24 24 params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
25 25 # Test response...
26 26 assert response.status == '200 OK'
27 27
28 28 response = self.app.get(url(controller='changeset', action='index',
29 29 repo_name=HG_REPO, revision=rev))
30 30 # test DB
31 31 assert ChangesetComment.query().count() == 1
32 32 response.mustcontain(
33 33 '''<div class="comments-number">'''
34 34 ''' 1 comment (0 inline, 1 general)'''
35 35 )
36 36
37 37 assert Notification.query().count() == 1
38 38 assert ChangesetComment.query().count() == 1
39 39
40 40 notification = Notification.query().all()[0]
41 41
42 ID = ChangesetComment.query().first().comment_id
42 commit_id = ChangesetComment.query().first().comment_id
43 43 assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
44 44 sbj = (u'/%s/changeset/'
45 '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
45 '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s'
46 % (HG_REPO, commit_id))
46 47 print "%s vs %s" % (sbj, notification.subject)
47 48 assert sbj in notification.subject
48 49
49 50 def test_create_inline(self):
50 51 self.log_user()
51 52 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
52 53 text = u'CommentOnRevision'
53 54 f_path = 'vcs/web/simplevcs/views/repository.py'
54 55 line = 'n1'
55 56
56 57 params = {'text': text, 'f_path': f_path, 'line': line, '_authentication_token': self.authentication_token()}
57 58 response = self.app.post(url(controller='changeset', action='comment',
58 59 repo_name=HG_REPO, revision=rev),
59 60 params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
60 61 # Test response...
61 62 assert response.status == '200 OK'
62 63
63 64 response = self.app.get(url(controller='changeset', action='index',
64 65 repo_name=HG_REPO, revision=rev))
65 66 # test DB
66 67 assert ChangesetComment.query().count() == 1
67 68 response.mustcontain(
68 69 '''<div class="comments-number">'''
69 70 ''' 1 comment (1 inline, 0 general)'''
70 71 )
71 72 response.mustcontain(
72 73 '''<div class="comments-list-chunk" '''
73 74 '''data-f_path="vcs/web/simplevcs/views/repository.py" '''
74 75 '''data-line_no="n1" data-target-id="vcswebsimplevcsviewsrepositorypy_n1">'''
75 76 )
76 77
77 78 assert Notification.query().count() == 1
78 79 assert ChangesetComment.query().count() == 1
79 80
80 81 notification = Notification.query().all()[0]
81 ID = ChangesetComment.query().first().comment_id
82 commit_id = ChangesetComment.query().first().comment_id
82 83 assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
83 84 sbj = (u'/%s/changeset/'
84 '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
85 '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s'
86 % (HG_REPO, commit_id))
85 87 print "%s vs %s" % (sbj, notification.subject)
86 88 assert sbj in notification.subject
87 89
88 90 def test_create_with_mention(self):
89 91 self.log_user()
90 92
91 93 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
92 94 text = u'@%s check CommentOnRevision' % TEST_USER_REGULAR_LOGIN
93 95
94 96 params = {'text': text, '_authentication_token': self.authentication_token()}
95 97 response = self.app.post(url(controller='changeset', action='comment',
96 98 repo_name=HG_REPO, revision=rev),
97 99 params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
98 100 # Test response...
99 101 assert response.status == '200 OK'
100 102
101 103 response = self.app.get(url(controller='changeset', action='index',
102 104 repo_name=HG_REPO, revision=rev))
103 105 # test DB
104 106 assert ChangesetComment.query().count() == 1
105 107 response.mustcontain(
106 108 '''<div class="comments-number">'''
107 109 ''' 1 comment (0 inline, 1 general)'''
108 110 )
109 111
110 112 assert Notification.query().count() == 2
111 113 users = [x.user.username for x in UserNotification.query().all()]
112 114
113 115 # test_regular gets notification by @mention
114 116 assert sorted(users) == [TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN]
115 117
116 118 def test_delete(self):
117 119 self.log_user()
118 120 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
119 121 text = u'CommentOnRevision'
120 122
121 123 params = {'text': text, '_authentication_token': self.authentication_token()}
122 124 response = self.app.post(url(controller='changeset', action='comment',
123 125 repo_name=HG_REPO, revision=rev),
124 126 params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
125 127
126 128 comments = ChangesetComment.query().all()
127 129 assert len(comments) == 1
128 130 comment_id = comments[0].comment_id
129 131
130 132 self.app.post(url("changeset_comment_delete",
131 133 repo_name=HG_REPO,
132 134 comment_id=comment_id),
133 135 params={'_authentication_token': self.authentication_token()})
134 136
135 137 comments = ChangesetComment.query().all()
136 138 assert len(comments) == 0
137 139
138 140 response = self.app.get(url(controller='changeset', action='index',
139 141 repo_name=HG_REPO, revision=rev))
140 142 response.mustcontain(
141 143 '''<div class="comments-number">'''
142 144 ''' 0 comments (0 inline, 0 general)'''
143 145 )
@@ -1,426 +1,426 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.tests.other.test_libs
16 16 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
17 17
18 18 Package for testing various lib/helper functions in kallithea
19 19
20 20 This file was forked by the Kallithea project in July 2014.
21 21 Original author and date, and relevant copyright and licensing information is below:
22 22 :created_on: Jun 9, 2011
23 23 :author: marcink
24 24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 25 :license: GPLv3, see LICENSE.md for more details.
26 26 """
27 27
28 28 import datetime
29 29 import hashlib
30 30 import mock
31 31 from kallithea.tests.base import *
32 32 from kallithea.lib.utils2 import AttributeDict
33 33 from kallithea.model.db import Repository
34 34 from tg.util.webtest import test_context
35 35
36 36 proto = 'http'
37 37 TEST_URLS = [
38 38 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
39 39 '%s://127.0.0.1' % proto),
40 40 ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
41 41 '%s://127.0.0.1' % proto),
42 42 ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
43 43 '%s://127.0.0.1' % proto),
44 44 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
45 45 '%s://127.0.0.1:8080' % proto),
46 46 ('%s://example.com' % proto, ['%s://' % proto, 'example.com'],
47 47 '%s://example.com' % proto),
48 48 ('%s://user:pass@example.com:8080' % proto, ['%s://' % proto, 'example.com',
49 49 '8080'],
50 50 '%s://example.com:8080' % proto),
51 51 ]
52 52
53 53 proto = 'https'
54 54 TEST_URLS += [
55 55 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
56 56 '%s://127.0.0.1' % proto),
57 57 ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
58 58 '%s://127.0.0.1' % proto),
59 59 ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
60 60 '%s://127.0.0.1' % proto),
61 61 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
62 62 '%s://127.0.0.1:8080' % proto),
63 63 ('%s://example.com' % proto, ['%s://' % proto, 'example.com'],
64 64 '%s://example.com' % proto),
65 65 ('%s://user:pass@example.com:8080' % proto, ['%s://' % proto, 'example.com',
66 66 '8080'],
67 67 '%s://example.com:8080' % proto),
68 68 ]
69 69
70 70
71 71 class FakeUrlGenerator(object):
72 72
73 73 def __init__(self, current_url=None, default_route=None, **routes):
74 74 """Initialize using specified 'current' URL template,
75 75 default route template, and all other aguments describing known
76 76 routes (format: route=template)"""
77 77 self.current_url = current_url
78 78 self.default_route = default_route
79 79 self.routes = routes
80 80
81 81 def __call__(self, route_name, *args, **kwargs):
82 82 if route_name in self.routes:
83 83 return self.routes[route_name] % kwargs
84 84
85 85 return self.default_route % kwargs
86 86
87 87 def current(self, *args, **kwargs):
88 88 return self.current_url % kwargs
89 89
90 90
91 91 class TestLibs(TestController):
92 92
93 93 @parametrize('test_url,expected,expected_creds', TEST_URLS)
94 94 def test_uri_filter(self, test_url, expected, expected_creds):
95 95 from kallithea.lib.utils2 import uri_filter
96 96 assert uri_filter(test_url) == expected
97 97
98 98 @parametrize('test_url,expected,expected_creds', TEST_URLS)
99 99 def test_credentials_filter(self, test_url, expected, expected_creds):
100 100 from kallithea.lib.utils2 import credentials_filter
101 101 assert credentials_filter(test_url) == expected_creds
102 102
103 103 @parametrize('str_bool,expected', [
104 104 ('t', True),
105 105 ('true', True),
106 106 ('y', True),
107 107 ('yes', True),
108 108 ('on', True),
109 109 ('1', True),
110 110 ('Y', True),
111 111 ('yeS', True),
112 112 ('Y', True),
113 113 ('TRUE', True),
114 114 ('T', True),
115 115 ('False', False),
116 116 ('F', False),
117 117 ('FALSE', False),
118 118 ('0', False),
119 119 ('-1', False),
120 120 ('', False)
121 121 ])
122 122 def test_str2bool(self, str_bool, expected):
123 123 from kallithea.lib.utils2 import str2bool
124 124 assert str2bool(str_bool) == expected
125 125
126 126 def test_mention_extractor(self):
127 127 from kallithea.lib.utils2 import extract_mentioned_usernames
128 128 sample = (
129 129 "@first hi there @world here's my email username@example.com "
130 130 "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
131 131 "@UPPER @cAmEL @2one_more22 @john please see this http://org.pl "
132 132 "@marian.user just do it @marco-polo and next extract @marco_polo "
133 133 "user.dot hej ! not-needed maril@example.com"
134 134 )
135 135
136 136 expected = set([
137 137 '2one_more22', 'first', 'lukaszb', 'one', 'one_more22', 'UPPER', 'cAmEL', 'john',
138 138 'marian.user', 'marco-polo', 'marco_polo', 'world'])
139 139 assert expected == set(extract_mentioned_usernames(sample))
140 140
141 141 @parametrize('age_args,expected', [
142 142 (dict(), u'just now'),
143 143 (dict(seconds= -1), u'1 second ago'),
144 144 (dict(seconds= -60 * 2), u'2 minutes ago'),
145 145 (dict(hours= -1), u'1 hour ago'),
146 146 (dict(hours= -24), u'1 day ago'),
147 147 (dict(hours= -24 * 5), u'5 days ago'),
148 148 (dict(months= -1), u'1 month ago'),
149 149 (dict(months= -1, days= -2), u'1 month and 2 days ago'),
150 150 (dict(months= -1, days= -20), u'1 month and 19 days ago'),
151 151 (dict(years= -1, months= -1), u'1 year and 1 month ago'),
152 152 (dict(years= -1, months= -10), u'1 year and 10 months ago'),
153 153 (dict(years= -2, months= -4), u'2 years and 4 months ago'),
154 154 (dict(years= -2, months= -11), u'2 years and 11 months ago'),
155 155 (dict(years= -3, months= -2), u'3 years and 2 months ago'),
156 156 ])
157 157 def test_age(self, age_args, expected):
158 158 from kallithea.lib.utils2 import age
159 159 from dateutil import relativedelta
160 160 with test_context(self.app):
161 161 n = datetime.datetime(year=2012, month=5, day=17)
162 162 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
163 163 assert age(n + delt(**age_args), now=n) == expected
164 164
165 165 @parametrize('age_args,expected', [
166 166 (dict(), u'just now'),
167 167 (dict(seconds= -1), u'1 second ago'),
168 168 (dict(seconds= -60 * 2), u'2 minutes ago'),
169 169 (dict(hours= -1), u'1 hour ago'),
170 170 (dict(hours= -24), u'1 day ago'),
171 171 (dict(hours= -24 * 5), u'5 days ago'),
172 172 (dict(months= -1), u'1 month ago'),
173 173 (dict(months= -1, days= -2), u'1 month ago'),
174 174 (dict(months= -1, days= -20), u'1 month ago'),
175 175 (dict(years= -1, months= -1), u'13 months ago'),
176 176 (dict(years= -1, months= -10), u'22 months ago'),
177 177 (dict(years= -2, months= -4), u'2 years ago'),
178 178 (dict(years= -2, months= -11), u'3 years ago'),
179 179 (dict(years= -3, months= -2), u'3 years ago'),
180 180 (dict(years= -4, months= -8), u'5 years ago'),
181 181 ])
182 182 def test_age_short(self, age_args, expected):
183 183 from kallithea.lib.utils2 import age
184 184 from dateutil import relativedelta
185 185 with test_context(self.app):
186 186 n = datetime.datetime(year=2012, month=5, day=17)
187 187 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
188 188 assert age(n + delt(**age_args), show_short_version=True, now=n) == expected
189 189
190 190 @parametrize('age_args,expected', [
191 191 (dict(), u'just now'),
192 192 (dict(seconds=1), u'in 1 second'),
193 193 (dict(seconds=60 * 2), u'in 2 minutes'),
194 194 (dict(hours=1), u'in 1 hour'),
195 195 (dict(hours=24), u'in 1 day'),
196 196 (dict(hours=24 * 5), u'in 5 days'),
197 197 (dict(months=1), u'in 1 month'),
198 198 (dict(months=1, days=1), u'in 1 month and 1 day'),
199 199 (dict(years=1, months=1), u'in 1 year and 1 month')
200 200 ])
201 201 def test_age_in_future(self, age_args, expected):
202 202 from kallithea.lib.utils2 import age
203 203 from dateutil import relativedelta
204 204 with test_context(self.app):
205 205 n = datetime.datetime(year=2012, month=5, day=17)
206 206 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
207 207 assert age(n + delt(**age_args), now=n) == expected
208 208
209 209 def test_tag_extractor(self):
210 210 sample = (
211 211 "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
212 212 "[requires] [stale] [see<>=>] [see => http://example.com]"
213 213 "[requires => url] [lang => python] [just a tag]"
214 214 "[,d] [ => ULR ] [obsolete] [desc]]"
215 215 )
216 216 from kallithea.lib.helpers import urlify_text
217 217 res = urlify_text(sample, stylize=True)
218 218 assert '<div class="metatag" data-tag="tag">tag</div>' in res
219 219 assert '<div class="metatag" data-tag="obsolete">obsolete</div>' in res
220 220 assert '<div class="metatag" data-tag="stale">stale</div>' in res
221 221 assert '<div class="metatag" data-tag="lang">python</div>' in res
222 222 assert '<div class="metatag" data-tag="requires">requires =&gt; <a href="/url">url</a></div>' in res
223 223 assert '<div class="metatag" data-tag="tag">tag</div>' in res
224 224
225 225 def test_alternative_gravatar(self):
226 226 from kallithea.lib.helpers import gravatar_url
227 227 _md5 = lambda s: hashlib.md5(s).hexdigest()
228 228
229 229 # mock tg.tmpl_context
230 230 def fake_tmpl_context(_url):
231 231 _c = AttributeDict()
232 232 _c.visual = AttributeDict()
233 233 _c.visual.use_gravatar = True
234 234 _c.visual.gravatar_url = _url
235 235
236 236 return _c
237 237
238 238 fake_url = FakeUrlGenerator(current_url='https://example.com')
239 239 with mock.patch('kallithea.config.routing.url', fake_url):
240 240 fake = fake_tmpl_context(_url='http://example.com/{email}')
241 241 with mock.patch('tg.tmpl_context', fake):
242 242 from kallithea.config.routing import url
243 243 assert url.current() == 'https://example.com'
244 244 grav = gravatar_url(email_address='test@example.com', size=24)
245 245 assert grav == 'http://example.com/test@example.com'
246 246
247 247 fake = fake_tmpl_context(_url='http://example.com/{email}')
248 248 with mock.patch('tg.tmpl_context', fake):
249 249 grav = gravatar_url(email_address='test@example.com', size=24)
250 250 assert grav == 'http://example.com/test@example.com'
251 251
252 252 fake = fake_tmpl_context(_url='http://example.com/{md5email}')
253 253 with mock.patch('tg.tmpl_context', fake):
254 254 em = 'test@example.com'
255 255 grav = gravatar_url(email_address=em, size=24)
256 256 assert grav == 'http://example.com/%s' % (_md5(em))
257 257
258 258 fake = fake_tmpl_context(_url='http://example.com/{md5email}/{size}')
259 259 with mock.patch('tg.tmpl_context', fake):
260 260 em = 'test@example.com'
261 261 grav = gravatar_url(email_address=em, size=24)
262 262 assert grav == 'http://example.com/%s/%s' % (_md5(em), 24)
263 263
264 264 fake = fake_tmpl_context(_url='{scheme}://{netloc}/{md5email}/{size}')
265 265 with mock.patch('tg.tmpl_context', fake):
266 266 em = 'test@example.com'
267 267 grav = gravatar_url(email_address=em, size=24)
268 268 assert grav == 'https://example.com/%s/%s' % (_md5(em), 24)
269 269
270 270 @parametrize('tmpl,repo_name,overrides,prefix,expected', [
271 271 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '', 'http://vps1:8000/group/repo1'),
272 272 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'username'}, '', 'http://username@vps1:8000/group/repo1'),
273 273 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '/prefix', 'http://vps1:8000/prefix/group/repo1'),
274 274 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'user'}, '/prefix', 'http://user@vps1:8000/prefix/group/repo1'),
275 275 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'username'}, '/prefix', 'http://username@vps1:8000/prefix/group/repo1'),
276 276 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'user'}, '/prefix/', 'http://user@vps1:8000/prefix/group/repo1'),
277 277 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'username'}, '/prefix/', 'http://username@vps1:8000/prefix/group/repo1'),
278 278 ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {}, '', 'http://vps1:8000/_23'),
279 279 ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'username'}, '', 'http://username@vps1:8000/_23'),
280 280 ('http://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'username'}, '', 'http://username@vps1:8000/_23'),
281 281 ('http://{netloc}/_{repoid}', 'group/repo1', {'user': 'username'}, '', 'http://vps1:8000/_23'),
282 282 ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', {'user': 'username'}, '', 'https://username@proxy1.example.com/group/repo1'),
283 283 ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', {}, '', 'https://proxy1.example.com/group/repo1'),
284 284 ('https://proxy1.example.com/{user}/{repo}', 'group/repo1', {'user': 'username'}, '', 'https://proxy1.example.com/username/group/repo1'),
285 285 ])
286 286 def test_clone_url_generator(self, tmpl, repo_name, overrides, prefix, expected):
287 287 from kallithea.lib.utils2 import get_clone_url
288 288 clone_url = get_clone_url(uri_tmpl=tmpl, qualified_home_url='http://vps1:8000'+prefix,
289 289 repo_name=repo_name, repo_id=23, **overrides)
290 290 assert clone_url == expected
291 291
292 292 def _quick_url(self, text, tmpl="""<a class="changeset_hash" href="%s">%s</a>""", url_=None):
293 293 """
294 294 Changes `some text url[foo]` => `some text <a href="/">foo</a>
295 295
296 296 :param text:
297 297 """
298 298 import re
299 299 # quickly change expected url[] into a link
300 URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
300 url_pattern = re.compile(r'(?:url\[)(.+?)(?:\])')
301 301
302 302 def url_func(match_obj):
303 303 _url = match_obj.groups()[0]
304 304 return tmpl % (url_ or '/repo_name/changeset/%s' % _url, _url)
305 return URL_PAT.sub(url_func, text)
305 return url_pattern.sub(url_func, text)
306 306
307 307 @parametrize('sample,expected', [
308 308 ("",
309 309 ""),
310 310 ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
311 311 """git-svn-id: <a href="https://svn.apache.org/repos/asf/libcloud/trunk@1441655">https://svn.apache.org/repos/asf/libcloud/trunk@1441655</a> 13f79535-47bb-0310-9956-ffa450edef68"""),
312 312 ("from rev 000000000000",
313 313 """from rev url[000000000000]"""),
314 314 ("from rev 000000000000123123 also rev 000000000000",
315 315 """from rev url[000000000000123123] also rev url[000000000000]"""),
316 316 ("this should-000 00",
317 317 """this should-000 00"""),
318 318 ("longtextffffffffff rev 123123123123",
319 319 """longtextffffffffff rev url[123123123123]"""),
320 320 ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
321 321 """rev ffffffffffffffffffffffffffffffffffffffffffffffffff"""),
322 322 ("ffffffffffff some text traalaa",
323 323 """url[ffffffffffff] some text traalaa"""),
324 324 ("""Multi line
325 325 123123123123
326 326 some text 123123123123
327 327 sometimes !
328 328 """,
329 329 """Multi line<br/>"""
330 330 """ url[123123123123]<br/>"""
331 331 """ some text url[123123123123]<br/>"""
332 332 """ sometimes !"""),
333 333 ])
334 334 def test_urlify_text(self, sample, expected):
335 335 expected = self._quick_url(expected)
336 336 fake_url = FakeUrlGenerator(changeset_home='/%(repo_name)s/changeset/%(revision)s')
337 337 with mock.patch('kallithea.config.routing.url', fake_url):
338 338 from kallithea.lib.helpers import urlify_text
339 339 assert urlify_text(sample, 'repo_name') == expected
340 340
341 341 @parametrize('sample,expected,url_', [
342 342 ("",
343 343 "",
344 344 ""),
345 345 ("https://svn.apache.org/repos",
346 346 """url[https://svn.apache.org/repos]""",
347 347 "https://svn.apache.org/repos"),
348 348 ("http://svn.apache.org/repos",
349 349 """url[http://svn.apache.org/repos]""",
350 350 "http://svn.apache.org/repos"),
351 351 ("from rev a also rev http://google.com",
352 352 """from rev a also rev url[http://google.com]""",
353 353 "http://google.com"),
354 354 ("http://imgur.com/foo.gif inline http://imgur.com/foo.gif ending http://imgur.com/foo.gif",
355 355 """url[http://imgur.com/foo.gif] inline url[http://imgur.com/foo.gif] ending url[http://imgur.com/foo.gif]""",
356 356 "http://imgur.com/foo.gif"),
357 357 ("""Multi line
358 358 https://foo.bar.example.com
359 359 some text lalala""",
360 360 """Multi line<br/>"""
361 361 """ url[https://foo.bar.example.com]<br/>"""
362 362 """ some text lalala""",
363 363 "https://foo.bar.example.com"),
364 364 ("@mention @someone",
365 365 """<b>@mention</b> <b>@someone</b>""",
366 366 ""),
367 367 ("deadbeefcafe 123412341234",
368 368 """<a class="changeset_hash" href="/repo_name/changeset/deadbeefcafe">deadbeefcafe</a> <a class="changeset_hash" href="/repo_name/changeset/123412341234">123412341234</a>""",
369 369 ""),
370 370 ("We support * markup for *bold* markup of *single or multiple* words, "
371 371 "*a bit @like http://slack.com*. "
372 372 "The first * must come after whitespace and not be followed by whitespace, "
373 373 "contain anything but * and newline until the next *, "
374 374 "which must not come after whitespace "
375 375 "and not be followed by * or alphanumerical *characters*.",
376 376 """We support * markup for <b>*bold*</b> markup of <b>*single or multiple*</b> words, """
377 377 """<b>*a bit <b>@like</b> <a href="http://slack.com">http://slack.com</a>*</b>. """
378 378 """The first * must come after whitespace and not be followed by whitespace, """
379 379 """contain anything but * and newline until the next *, """
380 380 """which must not come after whitespace """
381 381 """and not be followed by * or alphanumerical <b>*characters*</b>.""",
382 382 "-"),
383 383 # tags are covered by test_tag_extractor
384 384 ])
385 385 def test_urlify_test(self, sample, expected, url_):
386 386 expected = self._quick_url(expected,
387 387 tmpl="""<a href="%s">%s</a>""", url_=url_)
388 388 fake_url = FakeUrlGenerator(changeset_home='/%(repo_name)s/changeset/%(revision)s')
389 389 with mock.patch('kallithea.config.routing.url', fake_url):
390 390 from kallithea.lib.helpers import urlify_text
391 391 assert urlify_text(sample, 'repo_name', stylize=True) == expected
392 392
393 393 @parametrize('sample,expected', [
394 394 ("deadbeefcafe @mention, and http://foo.bar/ yo",
395 395 """<a class="changeset_hash" href="/repo_name/changeset/deadbeefcafe">deadbeefcafe</a>"""
396 396 """<a class="message-link" href="#the-link"> <b>@mention</b>, and </a>"""
397 397 """<a href="http://foo.bar/">http://foo.bar/</a>"""
398 398 """<a class="message-link" href="#the-link"> yo</a>"""),
399 399 ])
400 400 def test_urlify_link(self, sample, expected):
401 401 fake_url = FakeUrlGenerator(changeset_home='/%(repo_name)s/changeset/%(revision)s')
402 402 with mock.patch('kallithea.config.routing.url', fake_url):
403 403 from kallithea.lib.helpers import urlify_text
404 404 assert urlify_text(sample, 'repo_name', link_='#the-link') == expected
405 405
406 406 @parametrize('test,expected', [
407 407 ("", None),
408 408 ("/_2", '2'),
409 409 ("_2", '2'),
410 410 ("/_2/", '2'),
411 411 ("_2/", '2'),
412 412
413 413 ("/_21", '21'),
414 414 ("_21", '21'),
415 415 ("/_21/", '21'),
416 416 ("_21/", '21'),
417 417
418 418 ("/_21/foobar", '21'),
419 419 ("_21/121", '21'),
420 420 ("/_21/_12", '21'),
421 421 ("_21/prefix/foo", '21'),
422 422 ])
423 423 def test_get_repo_by_id(self, test, expected):
424 424 from kallithea.lib.utils import _extract_id_from_repo_name
425 425 _test = _extract_id_from_repo_name(test)
426 426 assert _test == expected, 'url:%s, got:`%s` expected: `%s`' % (test, _test, expected)
@@ -1,550 +1,552 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 Test suite for vcs push/pull operations.
16 16
17 17 The tests need Git > 1.8.1.
18 18
19 19 This file was forked by the Kallithea project in July 2014.
20 20 Original author and date, and relevant copyright and licensing information is below:
21 21 :created_on: Dec 30, 2010
22 22 :author: marcink
23 23 :copyright: (c) 2013 RhodeCode GmbH, and others.
24 24 :license: GPLv3, see LICENSE.md for more details.
25 25
26 26 """
27 27
28 28 import os
29 29 import re
30 30 import tempfile
31 31 import time
32 32 import pytest
33 33
34 34 from tempfile import _RandomNameSequence
35 35 from subprocess import Popen, PIPE
36 36
37 37 from kallithea.tests.base import *
38 38 from kallithea.tests.fixture import Fixture
39 39 from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation
40 40 from kallithea.model.meta import Session
41 41 from kallithea.model.repo import RepoModel
42 42 from kallithea.model.user import UserModel
43 43
44 44 DEBUG = True
45 45 HOST = '127.0.0.1:4999' # test host
46 46
47 47 fixture = Fixture()
48 48
49 49
50 50 class Command(object):
51 51
52 52 def __init__(self, cwd):
53 53 self.cwd = cwd
54 54
55 55 def execute(self, cmd, *args, **environ):
56 56 """
57 57 Runs command on the system with given ``args``.
58 58 """
59 59
60 60 command = cmd + ' ' + ' '.join(args)
61 61 ignoreReturnCode = environ.pop('ignoreReturnCode', False)
62 62 if DEBUG:
63 63 print '*** CMD %s ***' % command
64 64 testenv = dict(os.environ)
65 65 testenv['LANG'] = 'en_US.UTF-8'
66 66 testenv['LANGUAGE'] = 'en_US:en'
67 67 testenv['HGPLAIN'] = ''
68 68 testenv['HGRCPATH'] = ''
69 69 testenv.update(environ)
70 70 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
71 71 stdout, stderr = p.communicate()
72 72 if DEBUG:
73 73 if stdout:
74 74 print 'stdout:', repr(stdout)
75 75 if stderr:
76 76 print 'stderr:', repr(stderr)
77 77 if not ignoreReturnCode:
78 78 assert p.returncode == 0
79 79 return stdout, stderr
80 80
81 81
82 82 def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
83 83 return tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix=prefix, suffix=suffix)
84 84
85 85
86 def _add_files_and_push(webserver, vcs, DEST, ignoreReturnCode=False, files_no=3,
86 def _add_files_and_push(webserver, vcs, dest_dir, ignoreReturnCode=False, files_no=3,
87 87 clone_url=None, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
88 88 """
89 Generate some files, add it to DEST repo and push back
89 Generate some files, add it to dest_dir repo and push back
90 90 vcs is git or hg and defines what VCS we want to make those files for
91 91
92 92 :param vcs:
93 :param DEST:
93 :param dest_dir:
94 94 """
95 95 # commit some stuff into this repo
96 cwd = os.path.join(DEST)
96 cwd = os.path.join(dest_dir)
97 97 #added_file = '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next()
98 98 added_file = '%ssetup.py' % _RandomNameSequence().next()
99 99 open(os.path.join(cwd, added_file), 'a').close()
100 100 Command(cwd).execute('%s add %s' % (vcs, added_file))
101 101
102 102 email = 'me@example.com'
103 103 if os.name == 'nt':
104 104 author_str = 'User <%s>' % email
105 105 else:
106 106 author_str = 'User ǝɯɐᴎ <%s>' % email
107 107 for i in xrange(files_no):
108 108 cmd = """echo "added_line%s" >> %s""" % (i, added_file)
109 109 Command(cwd).execute(cmd)
110 110 if vcs == 'hg':
111 111 cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
112 112 i, author_str, added_file
113 113 )
114 114 elif vcs == 'git':
115 115 cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
116 116 i, author_str, added_file
117 117 )
118 118 # git commit needs EMAIL on some machines
119 119 Command(cwd).execute(cmd, EMAIL=email)
120 120
121 121 # PUSH it back
122 122 _REPO = None
123 123 if vcs == 'hg':
124 124 _REPO = HG_REPO
125 125 elif vcs == 'git':
126 126 _REPO = GIT_REPO
127 127
128 128 if clone_url is None:
129 129 clone_url = webserver.repo_url(_REPO, username=username, password=password)
130 130
131 131 stdout = stderr = None
132 132 if vcs == 'hg':
133 133 stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
134 134 elif vcs == 'git':
135 135 stdout, stderr = Command(cwd).execute('git push --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)
136 136
137 137 return stdout, stderr
138 138
139 139
140 140 def set_anonymous_access(enable=True):
141 141 user = User.get_default_user()
142 142 user.active = enable
143 143 Session().commit()
144 144 print '\tanonymous access is now:', enable
145 145 if enable != User.get_default_user().active:
146 146 raise Exception('Cannot set anonymous access')
147 147
148 148
149 149 #==============================================================================
150 150 # TESTS
151 151 #==============================================================================
152 152
153 153
154 154 def _check_proper_git_push(stdout, stderr):
155 155 # WTF Git stderr is output ?!
156 156 assert 'fatal' not in stderr
157 157 assert 'rejected' not in stderr
158 158 assert 'Pushing to' in stderr
159 159 assert 'master -> master' in stderr
160 160
161 161
162 162 @pytest.mark.usefixtures("test_context_fixture")
163 163 class TestVCSOperations(TestController):
164 164
165 165 @classmethod
166 166 def setup_class(cls):
167 167 # DISABLE ANONYMOUS ACCESS
168 168 set_anonymous_access(False)
169 169
170 170 def setup_method(self, method):
171 171 r = Repository.get_by_repo_name(GIT_REPO)
172 172 Repository.unlock(r)
173 173 r.enable_locking = False
174 174 Session().commit()
175 175
176 176 r = Repository.get_by_repo_name(HG_REPO)
177 177 Repository.unlock(r)
178 178 r.enable_locking = False
179 179 Session().commit()
180 180
181 181 def test_clone_hg_repo_by_admin(self, webserver):
182 182 clone_url = webserver.repo_url(HG_REPO)
183 183 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir())
184 184
185 185 assert 'requesting all changes' in stdout
186 186 assert 'adding changesets' in stdout
187 187 assert 'adding manifests' in stdout
188 188 assert 'adding file changes' in stdout
189 189
190 190 assert stderr == ''
191 191
192 192 def test_clone_git_repo_by_admin(self, webserver):
193 193 clone_url = webserver.repo_url(GIT_REPO)
194 194 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir())
195 195
196 196 assert 'Cloning into' in stdout + stderr
197 197 assert stderr == '' or stdout == ''
198 198
199 199 def test_clone_wrong_credentials_hg(self, webserver):
200 200 clone_url = webserver.repo_url(HG_REPO, password='bad!')
201 201 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
202 202 assert 'abort: authorization failed' in stderr
203 203
204 204 def test_clone_wrong_credentials_git(self, webserver):
205 205 clone_url = webserver.repo_url(GIT_REPO, password='bad!')
206 206 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
207 207 assert 'fatal: Authentication failed' in stderr
208 208
209 209 def test_clone_git_dir_as_hg(self, webserver):
210 210 clone_url = webserver.repo_url(GIT_REPO)
211 211 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
212 212 assert 'HTTP Error 404: Not Found' in stderr
213 213
214 214 def test_clone_hg_repo_as_git(self, webserver):
215 215 clone_url = webserver.repo_url(HG_REPO)
216 216 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
217 217 assert 'not found' in stderr
218 218
219 219 def test_clone_non_existing_path_hg(self, webserver):
220 220 clone_url = webserver.repo_url('trololo')
221 221 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
222 222 assert 'HTTP Error 404: Not Found' in stderr
223 223
224 224 def test_clone_non_existing_path_git(self, webserver):
225 225 clone_url = webserver.repo_url('trololo')
226 226 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
227 227 assert 'not found' in stderr
228 228
229 229 def test_push_new_file_hg(self, webserver):
230 DEST = _get_tmp_dir()
230 dest_dir = _get_tmp_dir()
231 231 clone_url = webserver.repo_url(HG_REPO)
232 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
232 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
233 233
234 234 fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
235 235 fixture.create_fork(HG_REPO, fork_name)
236 236 clone_url = webserver.repo_url(fork_name)
237 stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url)
237 stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, clone_url=clone_url)
238 238
239 239 assert 'pushing to' in stdout
240 240 assert 'Repository size' in stdout
241 241 assert 'Last revision is now' in stdout
242 242
243 243 def test_push_new_file_git(self, webserver):
244 DEST = _get_tmp_dir()
244 dest_dir = _get_tmp_dir()
245 245 clone_url = webserver.repo_url(GIT_REPO)
246 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
246 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
247 247
248 248 # commit some stuff into this repo
249 249 fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
250 250 fixture.create_fork(GIT_REPO, fork_name)
251 251 clone_url = webserver.repo_url(fork_name)
252 stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url)
252 stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, clone_url=clone_url)
253 253 print [(x.repo_full_path,x.repo_path) for x in Repository.query()] # TODO: what is this for
254 254 _check_proper_git_push(stdout, stderr)
255 255
256 256 def test_push_invalidates_cache_hg(self, webserver):
257 257 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
258 258 == HG_REPO).scalar()
259 259 if not key:
260 260 key = CacheInvalidation(HG_REPO, HG_REPO)
261 261 Session().add(key)
262 262
263 263 key.cache_active = True
264 264 Session().commit()
265 265
266 DEST = _get_tmp_dir()
266 dest_dir = _get_tmp_dir()
267 267 clone_url = webserver.repo_url(HG_REPO)
268 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
268 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
269 269
270 270 fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
271 271 fixture.create_fork(HG_REPO, fork_name)
272 272 clone_url = webserver.repo_url(fork_name)
273 stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, files_no=1, clone_url=clone_url)
273 stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, files_no=1, clone_url=clone_url)
274 274
275 275 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
276 276 == fork_name).all()
277 277 assert key == []
278 278
279 279 def test_push_invalidates_cache_git(self, webserver):
280 280 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
281 281 == GIT_REPO).scalar()
282 282 if not key:
283 283 key = CacheInvalidation(GIT_REPO, GIT_REPO)
284 284 Session().add(key)
285 285
286 286 key.cache_active = True
287 287 Session().commit()
288 288
289 DEST = _get_tmp_dir()
289 dest_dir = _get_tmp_dir()
290 290 clone_url = webserver.repo_url(GIT_REPO)
291 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
291 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
292 292
293 293 # commit some stuff into this repo
294 294 fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
295 295 fixture.create_fork(GIT_REPO, fork_name)
296 296 clone_url = webserver.repo_url(fork_name)
297 stdout, stderr = _add_files_and_push(webserver, 'git', DEST, files_no=1, clone_url=clone_url)
297 stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, files_no=1, clone_url=clone_url)
298 298 _check_proper_git_push(stdout, stderr)
299 299
300 300 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
301 301 == fork_name).all()
302 302 assert key == []
303 303
304 304 def test_push_wrong_credentials_hg(self, webserver):
305 DEST = _get_tmp_dir()
305 dest_dir = _get_tmp_dir()
306 306 clone_url = webserver.repo_url(HG_REPO)
307 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
307 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
308 308
309 stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, username='bad',
309 stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, username='bad',
310 310 password='name', ignoreReturnCode=True)
311 311
312 312 assert 'abort: authorization failed' in stderr
313 313
314 314 def test_push_wrong_credentials_git(self, webserver):
315 DEST = _get_tmp_dir()
315 dest_dir = _get_tmp_dir()
316 316 clone_url = webserver.repo_url(GIT_REPO)
317 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
317 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
318 318
319 stdout, stderr = _add_files_and_push(webserver, 'git', DEST, username='bad',
319 stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, username='bad',
320 320 password='name', ignoreReturnCode=True)
321 321
322 322 assert 'fatal: Authentication failed' in stderr
323 323
324 324 def test_push_back_to_wrong_url_hg(self, webserver):
325 DEST = _get_tmp_dir()
325 dest_dir = _get_tmp_dir()
326 326 clone_url = webserver.repo_url(HG_REPO)
327 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
327 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
328 328
329 stdout, stderr = _add_files_and_push(webserver, 'hg', DEST,
330 clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]),
331 ignoreReturnCode = True)
329 stdout, stderr = _add_files_and_push(
330 webserver, 'hg', dest_dir, clone_url='http://%s:%s/tmp' % (
331 webserver.server_address[0], webserver.server_address[1]),
332 ignoreReturnCode=True)
332 333
333 334 assert 'HTTP Error 404: Not Found' in stderr
334 335
335 336 def test_push_back_to_wrong_url_git(self, webserver):
336 DEST = _get_tmp_dir()
337 dest_dir = _get_tmp_dir()
337 338 clone_url = webserver.repo_url(GIT_REPO)
338 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
339 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
339 340
340 stdout, stderr = _add_files_and_push(webserver, 'git', DEST,
341 clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]),
342 ignoreReturnCode = True)
341 stdout, stderr = _add_files_and_push(
342 webserver, 'git', dest_dir, clone_url='http://%s:%s/tmp' % (
343 webserver.server_address[0], webserver.server_address[1]),
344 ignoreReturnCode=True)
343 345
344 346 assert 'not found' in stderr
345 347
346 348 def test_clone_and_create_lock_hg(self, webserver):
347 349 # enable locking
348 350 r = Repository.get_by_repo_name(HG_REPO)
349 351 r.enable_locking = True
350 352 Session().commit()
351 353 # clone
352 354 clone_url = webserver.repo_url(HG_REPO)
353 355 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir())
354 356
355 357 # check if lock was made
356 358 r = Repository.get_by_repo_name(HG_REPO)
357 359 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
358 360
359 361 def test_clone_and_create_lock_git(self, webserver):
360 362 # enable locking
361 363 r = Repository.get_by_repo_name(GIT_REPO)
362 364 r.enable_locking = True
363 365 Session().commit()
364 366 # clone
365 367 clone_url = webserver.repo_url(GIT_REPO)
366 368 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir())
367 369
368 370 # check if lock was made
369 371 r = Repository.get_by_repo_name(GIT_REPO)
370 372 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
371 373
372 374 def test_clone_after_repo_was_locked_hg(self, webserver):
373 375 # lock repo
374 376 r = Repository.get_by_repo_name(HG_REPO)
375 377 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
376 378 # pull fails since repo is locked
377 379 clone_url = webserver.repo_url(HG_REPO)
378 380 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
379 381 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
380 382 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
381 383 assert msg in stderr
382 384
383 385 def test_clone_after_repo_was_locked_git(self, webserver):
384 386 # lock repo
385 387 r = Repository.get_by_repo_name(GIT_REPO)
386 388 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
387 389 # pull fails since repo is locked
388 390 clone_url = webserver.repo_url(GIT_REPO)
389 391 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
390 392 msg = ("""The requested URL returned error: 423""")
391 393 assert msg in stderr
392 394
393 395 def test_push_on_locked_repo_by_other_user_hg(self, webserver):
394 396 # clone some temp
395 DEST = _get_tmp_dir()
397 dest_dir = _get_tmp_dir()
396 398 clone_url = webserver.repo_url(HG_REPO)
397 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
399 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
398 400
399 401 # lock repo
400 402 r = Repository.get_by_repo_name(HG_REPO)
401 403 # let this user actually push !
402 404 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
403 405 perm='repository.write')
404 406 Session().commit()
405 407 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
406 408
407 409 # push fails repo is locked by other user !
408 stdout, stderr = _add_files_and_push(webserver, 'hg', DEST,
410 stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir,
409 411 username=TEST_USER_REGULAR_LOGIN,
410 412 password=TEST_USER_REGULAR_PASS,
411 413 ignoreReturnCode=True)
412 414 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
413 415 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
414 416 assert msg in stderr
415 417
416 418 def test_push_on_locked_repo_by_other_user_git(self, webserver):
417 419 # Note: Git hooks must be executable on unix. This test will thus fail
418 420 # for example on Linux if /tmp is mounted noexec.
419 421
420 422 # clone some temp
421 DEST = _get_tmp_dir()
423 dest_dir = _get_tmp_dir()
422 424 clone_url = webserver.repo_url(GIT_REPO)
423 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
425 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
424 426
425 427 # lock repo
426 428 r = Repository.get_by_repo_name(GIT_REPO)
427 429 # let this user actually push !
428 430 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
429 431 perm='repository.write')
430 432 Session().commit()
431 433 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
432 434
433 435 # push fails repo is locked by other user !
434 stdout, stderr = _add_files_and_push(webserver, 'git', DEST,
436 stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir,
435 437 username=TEST_USER_REGULAR_LOGIN,
436 438 password=TEST_USER_REGULAR_PASS,
437 439 ignoreReturnCode=True)
438 440 err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN)
439 441 assert err in stderr
440 442
441 443 # TODO: fix this somehow later on Git, Git is stupid and even if we throw
442 444 # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
443 445
444 446 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
445 447 % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
446 448 #msg = "405 Method Not Allowed"
447 449 #assert msg in stderr
448 450
449 451 def test_push_unlocks_repository_hg(self, webserver):
450 452 # enable locking
451 453 fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
452 454 fixture.create_fork(HG_REPO, fork_name)
453 455 r = Repository.get_by_repo_name(fork_name)
454 456 r.enable_locking = True
455 457 Session().commit()
456 458 # clone some temp
457 DEST = _get_tmp_dir()
459 dest_dir = _get_tmp_dir()
458 460 clone_url = webserver.repo_url(fork_name)
459 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
461 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
460 462
461 463 # check for lock repo after clone
462 464 r = Repository.get_by_repo_name(fork_name)
463 465 uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
464 466 assert r.locked[0] == uid
465 467
466 468 # push is ok and repo is now unlocked
467 stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url)
469 stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, clone_url=clone_url)
468 470 assert str('remote: Released lock on repo `%s`' % fork_name) in stdout
469 471 # we need to cleanup the Session Here !
470 472 Session.remove()
471 473 r = Repository.get_by_repo_name(fork_name)
472 474 assert r.locked == [None, None]
473 475
474 476 # TODO: fix me ! somehow during tests hooks don't get called on Git
475 477 def test_push_unlocks_repository_git(self, webserver):
476 478 # enable locking
477 479 fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
478 480 fixture.create_fork(GIT_REPO, fork_name)
479 481 r = Repository.get_by_repo_name(fork_name)
480 482 r.enable_locking = True
481 483 Session().commit()
482 484 # clone some temp
483 DEST = _get_tmp_dir()
485 dest_dir = _get_tmp_dir()
484 486 clone_url = webserver.repo_url(fork_name)
485 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
487 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
486 488
487 489 # check for lock repo after clone
488 490 r = Repository.get_by_repo_name(fork_name)
489 491 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
490 492
491 493 # push is ok and repo is now unlocked
492 stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url)
494 stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, clone_url=clone_url)
493 495 _check_proper_git_push(stdout, stderr)
494 496
495 497 assert ('remote: Released lock on repo `%s`' % fork_name) in stderr
496 498 # we need to cleanup the Session Here !
497 499 Session.remove()
498 500 r = Repository.get_by_repo_name(fork_name)
499 501 assert r.locked == [None, None]
500 502
501 503 def test_ip_restriction_hg(self, webserver):
502 504 user_model = UserModel()
503 505 try:
504 506 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
505 507 Session().commit()
506 508 clone_url = webserver.repo_url(HG_REPO)
507 509 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
508 510 assert 'abort: HTTP Error 403: Forbidden' in stderr
509 511 finally:
510 512 # release IP restrictions
511 513 for ip in UserIpMap.query():
512 514 UserIpMap.delete(ip.ip_id)
513 515 Session().commit()
514 516
515 517 # IP permissions are cached, need to wait for the cache in the server process to expire
516 518 time.sleep(1.5)
517 519
518 520 clone_url = webserver.repo_url(HG_REPO)
519 521 stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir())
520 522
521 523 assert 'requesting all changes' in stdout
522 524 assert 'adding changesets' in stdout
523 525 assert 'adding manifests' in stdout
524 526 assert 'adding file changes' in stdout
525 527
526 528 assert stderr == ''
527 529
528 530 def test_ip_restriction_git(self, webserver):
529 531 user_model = UserModel()
530 532 try:
531 533 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
532 534 Session().commit()
533 535 clone_url = webserver.repo_url(GIT_REPO)
534 536 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
535 537 # The message apparently changed in Git 1.8.3, so match it loosely.
536 538 assert re.search(r'\b403\b', stderr)
537 539 finally:
538 540 # release IP restrictions
539 541 for ip in UserIpMap.query():
540 542 UserIpMap.delete(ip.ip_id)
541 543 Session().commit()
542 544
543 545 # IP permissions are cached, need to wait for the cache in the server process to expire
544 546 time.sleep(1.5)
545 547
546 548 clone_url = webserver.repo_url(GIT_REPO)
547 549 stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir())
548 550
549 551 assert 'Cloning into' in stdout + stderr
550 552 assert stderr == '' or stdout == ''
@@ -1,851 +1,851 b''
1 1
2 2 import os
3 3 import sys
4 4 import mock
5 5 import datetime
6 6 import urllib2
7 7
8 8 import pytest
9 9
10 10 from kallithea.lib.vcs.backends.git import GitRepository, GitChangeset
11 11 from kallithea.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
12 12 from kallithea.lib.vcs.nodes import NodeKind, FileNode, DirNode, NodeState
13 13 from kallithea.lib.vcs.utils.compat import unittest
14 14 from kallithea.model.scm import ScmModel
15 15 from kallithea.tests.vcs.base import _BackendTestMixin
16 16 from kallithea.tests.vcs.conf import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, TEST_TMP_PATH, get_new_dir
17 17
18 18
19 19 class GitRepositoryTest(unittest.TestCase):
20 20
21 21 def __check_for_existing_repo(self):
22 22 if os.path.exists(TEST_GIT_REPO_CLONE):
23 23 pytest.fail('Cannot test git clone repo as location %s already '
24 24 'exists. You should manually remove it first.'
25 25 % TEST_GIT_REPO_CLONE)
26 26
27 27 def setUp(self):
28 28 self.repo = GitRepository(TEST_GIT_REPO)
29 29
30 30 def test_wrong_repo_path(self):
31 31 wrong_repo_path = os.path.join(TEST_TMP_PATH, 'errorrepo')
32 32 self.assertRaises(RepositoryError, GitRepository, wrong_repo_path)
33 33
34 34 def test_git_cmd_injection(self):
35 35 repo_inject_path = TEST_GIT_REPO + '; echo "Cake";'
36 36 with self.assertRaises(urllib2.URLError):
37 37 # Should fail because URL will contain the parts after ; too
38 38 urlerror_fail_repo = GitRepository(get_new_dir('injection-repo'), src_url=repo_inject_path, update_after_clone=True, create=True)
39 39
40 40 with self.assertRaises(RepositoryError):
41 41 # Should fail on direct clone call, which as of this writing does not happen outside of class
42 42 clone_fail_repo = GitRepository(get_new_dir('injection-repo'), create=True)
43 43 clone_fail_repo.clone(repo_inject_path, update_after_clone=True,)
44 44
45 45 # Verify correct quoting of evil characters that should work on posix file systems
46 46 if sys.platform == 'win32':
47 47 # windows does not allow '"' in dir names
48 48 # and some versions of the git client don't like ` and '
49 49 tricky_path = get_new_dir("tricky-path-repo-$")
50 50 else:
51 51 tricky_path = get_new_dir("tricky-path-repo-$'\"`")
52 52 successfully_cloned = GitRepository(tricky_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
53 53 # Repo should have been created
54 54 self.assertFalse(successfully_cloned._repo.bare)
55 55
56 56 if sys.platform == 'win32':
57 57 # windows does not allow '"' in dir names
58 58 # and some versions of the git client don't like ` and '
59 59 tricky_path_2 = get_new_dir("tricky-path-2-repo-$")
60 60 else:
61 61 tricky_path_2 = get_new_dir("tricky-path-2-repo-$'\"`")
62 62 successfully_cloned2 = GitRepository(tricky_path_2, src_url=tricky_path, bare=True, create=True)
63 63 # Repo should have been created and thus used correct quoting for clone
64 64 self.assertTrue(successfully_cloned2._repo.bare)
65 65
66 66 # Should pass because URL has been properly quoted
67 67 successfully_cloned.pull(tricky_path_2)
68 68 successfully_cloned2.fetch(tricky_path)
69 69
70 70 def test_repo_create_with_spaces_in_path(self):
71 71 repo_path = get_new_dir("path with spaces")
72 72 repo = GitRepository(repo_path, src_url=None, bare=True, create=True)
73 73 # Repo should have been created
74 74 self.assertTrue(repo._repo.bare)
75 75
76 76 def test_repo_clone(self):
77 77 self.__check_for_existing_repo()
78 78 repo = GitRepository(TEST_GIT_REPO)
79 79 repo_clone = GitRepository(TEST_GIT_REPO_CLONE,
80 80 src_url=TEST_GIT_REPO, create=True, update_after_clone=True)
81 81 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
82 82 # Checking hashes of changesets should be enough
83 83 for changeset in repo.get_changesets():
84 84 raw_id = changeset.raw_id
85 85 self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
86 86
87 87 def test_repo_clone_with_spaces_in_path(self):
88 88 repo_path = get_new_dir("path with spaces")
89 89 successfully_cloned = GitRepository(repo_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
90 90 # Repo should have been created
91 91 self.assertFalse(successfully_cloned._repo.bare)
92 92
93 93 successfully_cloned.pull(TEST_GIT_REPO)
94 94 self.repo.fetch(repo_path)
95 95
96 96 def test_repo_clone_without_create(self):
97 97 self.assertRaises(RepositoryError, GitRepository,
98 98 TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
99 99
100 100 def test_repo_clone_with_update(self):
101 101 repo = GitRepository(TEST_GIT_REPO)
102 102 clone_path = TEST_GIT_REPO_CLONE + '_with_update'
103 103 repo_clone = GitRepository(clone_path,
104 104 create=True, src_url=TEST_GIT_REPO, update_after_clone=True)
105 105 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
106 106
107 107 # check if current workdir was updated
108 108 fpath = os.path.join(clone_path, 'MANIFEST.in')
109 109 self.assertEqual(True, os.path.isfile(fpath),
110 110 'Repo was cloned and updated but file %s could not be found'
111 111 % fpath)
112 112
113 113 def test_repo_clone_without_update(self):
114 114 repo = GitRepository(TEST_GIT_REPO)
115 115 clone_path = TEST_GIT_REPO_CLONE + '_without_update'
116 116 repo_clone = GitRepository(clone_path,
117 117 create=True, src_url=TEST_GIT_REPO, update_after_clone=False)
118 118 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
119 119 # check if current workdir was *NOT* updated
120 120 fpath = os.path.join(clone_path, 'MANIFEST.in')
121 121 # Make sure it's not bare repo
122 122 self.assertFalse(repo_clone._repo.bare)
123 123 self.assertEqual(False, os.path.isfile(fpath),
124 124 'Repo was cloned and updated but file %s was found'
125 125 % fpath)
126 126
127 127 def test_repo_clone_into_bare_repo(self):
128 128 repo = GitRepository(TEST_GIT_REPO)
129 129 clone_path = TEST_GIT_REPO_CLONE + '_bare.git'
130 130 repo_clone = GitRepository(clone_path, create=True,
131 131 src_url=repo.path, bare=True)
132 132 self.assertTrue(repo_clone._repo.bare)
133 133
134 134 def test_create_repo_is_not_bare_by_default(self):
135 135 repo = GitRepository(get_new_dir('not-bare-by-default'), create=True)
136 136 self.assertFalse(repo._repo.bare)
137 137
138 138 def test_create_bare_repo(self):
139 139 repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True)
140 140 self.assertTrue(repo._repo.bare)
141 141
142 142 def test_revisions(self):
143 143 # there are 112 revisions (by now)
144 144 # so we can assume they would be available from now on
145 145 subset = set([
146 146 'c1214f7e79e02fc37156ff215cd71275450cffc3',
147 147 '38b5fe81f109cb111f549bfe9bb6b267e10bc557',
148 148 'fa6600f6848800641328adbf7811fd2372c02ab2',
149 149 '102607b09cdd60e2793929c4f90478be29f85a17',
150 150 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
151 151 '2d1028c054665b962fa3d307adfc923ddd528038',
152 152 'd7e0d30fbcae12c90680eb095a4f5f02505ce501',
153 153 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
154 154 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
155 155 '8430a588b43b5d6da365400117c89400326e7992',
156 156 'd955cd312c17b02143c04fa1099a352b04368118',
157 157 'f67b87e5c629c2ee0ba58f85197e423ff28d735b',
158 158 'add63e382e4aabc9e1afdc4bdc24506c269b7618',
159 159 'f298fe1189f1b69779a4423f40b48edf92a703fc',
160 160 'bd9b619eb41994cac43d67cf4ccc8399c1125808',
161 161 '6e125e7c890379446e98980d8ed60fba87d0f6d1',
162 162 'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd',
163 163 '0b05e4ed56c802098dfc813cbe779b2f49e92500',
164 164 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
165 165 '45223f8f114c64bf4d6f853e3c35a369a6305520',
166 166 'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
167 167 'f5ea29fc42ef67a2a5a7aecff10e1566699acd68',
168 168 '27d48942240f5b91dfda77accd2caac94708cc7d',
169 169 '622f0eb0bafd619d2560c26f80f09e3b0b0d78af',
170 170 'e686b958768ee96af8029fe19c6050b1a8dd3b2b'])
171 171 self.assertTrue(subset.issubset(set(self.repo.revisions)))
172 172
173 173 def test_slicing(self):
174 174 # 4 1 5 10 95
175 175 for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
176 176 (10, 20, 10), (5, 100, 95)]:
177 177 revs = list(self.repo[sfrom:sto])
178 178 self.assertEqual(len(revs), size)
179 179 self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
180 180 self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
181 181
182 182 def test_branches(self):
183 183 # TODO: Need more tests here
184 184 # Removed (those are 'remotes' branches for cloned repo)
185 185 #self.assertTrue('master' in self.repo.branches)
186 186 #self.assertTrue('gittree' in self.repo.branches)
187 187 #self.assertTrue('web-branch' in self.repo.branches)
188 188 for name, id in self.repo.branches.items():
189 189 self.assertTrue(isinstance(
190 190 self.repo.get_changeset(id), GitChangeset))
191 191
192 192 def test_tags(self):
193 193 # TODO: Need more tests here
194 194 self.assertTrue('v0.1.1' in self.repo.tags)
195 195 self.assertTrue('v0.1.2' in self.repo.tags)
196 196 for name, id in self.repo.tags.items():
197 197 self.assertTrue(isinstance(
198 198 self.repo.get_changeset(id), GitChangeset))
199 199
200 200 def _test_single_changeset_cache(self, revision):
201 201 chset = self.repo.get_changeset(revision)
202 202 self.assertTrue(revision in self.repo.changesets)
203 203 self.assertTrue(chset is self.repo.changesets[revision])
204 204
205 205 def test_initial_changeset(self):
206 206 id = self.repo.revisions[0]
207 207 init_chset = self.repo.get_changeset(id)
208 208 self.assertEqual(init_chset.message, 'initial import\n')
209 209 self.assertEqual(init_chset.author,
210 210 'Marcin Kuzminski <marcin@python-blog.com>')
211 211 for path in ('vcs/__init__.py',
212 212 'vcs/backends/BaseRepository.py',
213 213 'vcs/backends/__init__.py'):
214 214 self.assertTrue(isinstance(init_chset.get_node(path), FileNode))
215 215 for path in ('', 'vcs', 'vcs/backends'):
216 216 self.assertTrue(isinstance(init_chset.get_node(path), DirNode))
217 217
218 218 self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar')
219 219
220 220 node = init_chset.get_node('vcs/')
221 221 self.assertTrue(hasattr(node, 'kind'))
222 222 self.assertEqual(node.kind, NodeKind.DIR)
223 223
224 224 node = init_chset.get_node('vcs')
225 225 self.assertTrue(hasattr(node, 'kind'))
226 226 self.assertEqual(node.kind, NodeKind.DIR)
227 227
228 228 node = init_chset.get_node('vcs/__init__.py')
229 229 self.assertTrue(hasattr(node, 'kind'))
230 230 self.assertEqual(node.kind, NodeKind.FILE)
231 231
232 232 def test_not_existing_changeset(self):
233 233 self.assertRaises(RepositoryError, self.repo.get_changeset,
234 234 'f' * 40)
235 235
236 236 def test_changeset10(self):
237 237
238 238 chset10 = self.repo.get_changeset(self.repo.revisions[9])
239 README = """===
239 readme = """===
240 240 VCS
241 241 ===
242 242
243 243 Various Version Control System management abstraction layer for Python.
244 244
245 245 Introduction
246 246 ------------
247 247
248 248 TODO: To be written...
249 249
250 250 """
251 251 node = chset10.get_node('README.rst')
252 252 self.assertEqual(node.kind, NodeKind.FILE)
253 self.assertEqual(node.content, README)
253 self.assertEqual(node.content, readme)
254 254
255 255
256 256 class GitChangesetTest(unittest.TestCase):
257 257
258 258 def setUp(self):
259 259 self.repo = GitRepository(TEST_GIT_REPO)
260 260
261 261 def test_default_changeset(self):
262 262 tip = self.repo.get_changeset()
263 263 self.assertEqual(tip, self.repo.get_changeset(None))
264 264 self.assertEqual(tip, self.repo.get_changeset('tip'))
265 265
266 266 def test_root_node(self):
267 267 tip = self.repo.get_changeset()
268 268 self.assertTrue(tip.root is tip.get_node(''))
269 269
270 270 def test_lazy_fetch(self):
271 271 """
272 272 Test if changeset's nodes expands and are cached as we walk through
273 273 the revision. This test is somewhat hard to write as order of tests
274 274 is a key here. Written by running command after command in a shell.
275 275 """
276 276 hex = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
277 277 self.assertTrue(hex in self.repo.revisions)
278 278 chset = self.repo.get_changeset(hex)
279 279 self.assertTrue(len(chset.nodes) == 0)
280 280 root = chset.root
281 281 self.assertTrue(len(chset.nodes) == 1)
282 282 self.assertTrue(len(root.nodes) == 8)
283 283 # accessing root.nodes updates chset.nodes
284 284 self.assertTrue(len(chset.nodes) == 9)
285 285
286 286 docs = root.get_node('docs')
287 287 # we haven't yet accessed anything new as docs dir was already cached
288 288 self.assertTrue(len(chset.nodes) == 9)
289 289 self.assertTrue(len(docs.nodes) == 8)
290 290 # accessing docs.nodes updates chset.nodes
291 291 self.assertTrue(len(chset.nodes) == 17)
292 292
293 293 self.assertTrue(docs is chset.get_node('docs'))
294 294 self.assertTrue(docs is root.nodes[0])
295 295 self.assertTrue(docs is root.dirs[0])
296 296 self.assertTrue(docs is chset.get_node('docs'))
297 297
298 298 def test_nodes_with_changeset(self):
299 299 hex = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
300 300 chset = self.repo.get_changeset(hex)
301 301 root = chset.root
302 302 docs = root.get_node('docs')
303 303 self.assertTrue(docs is chset.get_node('docs'))
304 304 api = docs.get_node('api')
305 305 self.assertTrue(api is chset.get_node('docs/api'))
306 306 index = api.get_node('index.rst')
307 307 self.assertTrue(index is chset.get_node('docs/api/index.rst'))
308 308 self.assertTrue(index is chset.get_node('docs') \
309 309 .get_node('api') \
310 310 .get_node('index.rst'))
311 311
312 312 def test_branch_and_tags(self):
313 313 """
314 314 rev0 = self.repo.revisions[0]
315 315 chset0 = self.repo.get_changeset(rev0)
316 316 self.assertEqual(chset0.branch, 'master')
317 317 self.assertEqual(chset0.tags, [])
318 318
319 319 rev10 = self.repo.revisions[10]
320 320 chset10 = self.repo.get_changeset(rev10)
321 321 self.assertEqual(chset10.branch, 'master')
322 322 self.assertEqual(chset10.tags, [])
323 323
324 324 rev44 = self.repo.revisions[44]
325 325 chset44 = self.repo.get_changeset(rev44)
326 326 self.assertEqual(chset44.branch, 'web-branch')
327 327
328 328 tip = self.repo.get_changeset('tip')
329 329 self.assertTrue('tip' in tip.tags)
330 330 """
331 331 # Those tests would fail - branches are now going
332 332 # to be changed at main API in order to support git backend
333 333 pass
334 334
335 335 def _test_slices(self, limit, offset):
336 336 count = self.repo.count()
337 337 changesets = self.repo.get_changesets(limit=limit, offset=offset)
338 338 idx = 0
339 339 for changeset in changesets:
340 340 rev = offset + idx
341 341 idx += 1
342 342 rev_id = self.repo.revisions[rev]
343 343 if idx > limit:
344 344 pytest.fail("Exceeded limit already (getting revision %s, "
345 345 "there are %s total revisions, offset=%s, limit=%s)"
346 346 % (rev_id, count, offset, limit))
347 347 self.assertEqual(changeset, self.repo.get_changeset(rev_id))
348 348 result = list(self.repo.get_changesets(limit=limit, offset=offset))
349 349 start = offset
350 350 end = limit and offset + limit or None
351 351 sliced = list(self.repo[start:end])
352 352 pytest.failUnlessEqual(result, sliced,
353 353 msg="Comparison failed for limit=%s, offset=%s"
354 354 "(get_changeset returned: %s and sliced: %s"
355 355 % (limit, offset, result, sliced))
356 356
357 357 def _test_file_size(self, revision, path, size):
358 358 node = self.repo.get_changeset(revision).get_node(path)
359 359 self.assertTrue(node.is_file())
360 360 self.assertEqual(node.size, size)
361 361
362 362 def test_file_size(self):
363 363 to_check = (
364 364 ('c1214f7e79e02fc37156ff215cd71275450cffc3',
365 365 'vcs/backends/BaseRepository.py', 502),
366 366 ('d7e0d30fbcae12c90680eb095a4f5f02505ce501',
367 367 'vcs/backends/hg.py', 854),
368 368 ('6e125e7c890379446e98980d8ed60fba87d0f6d1',
369 369 'setup.py', 1068),
370 370 ('d955cd312c17b02143c04fa1099a352b04368118',
371 371 'vcs/backends/base.py', 2921),
372 372 ('ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
373 373 'vcs/backends/base.py', 3936),
374 374 ('f50f42baeed5af6518ef4b0cb2f1423f3851a941',
375 375 'vcs/backends/base.py', 6189),
376 376 )
377 377 for revision, path, size in to_check:
378 378 self._test_file_size(revision, path, size)
379 379
380 380 def _test_dir_size(self, revision, path, size):
381 381 node = self.repo.get_changeset(revision).get_node(path)
382 382 self.assertEqual(node.size, size)
383 383
384 384 def test_dir_size(self):
385 385 to_check = (
386 386 ('5f2c6ee195929b0be80749243c18121c9864a3b3', '/', 674076),
387 387 ('7ab37bc680b4aa72c34d07b230c866c28e9fc204', '/', 674049),
388 388 ('6892503fb8f2a552cef5f4d4cc2cdbd13ae1cd2f', '/', 671830),
389 389 )
390 390 for revision, path, size in to_check:
391 391 self._test_dir_size(revision, path, size)
392 392
393 393 def test_repo_size(self):
394 394 self.assertEqual(self.repo.size, 674076)
395 395
396 396 def test_file_history(self):
397 397 # we can only check if those revisions are present in the history
398 398 # as we cannot update this test every time file is changed
399 399 files = {
400 400 'setup.py': [
401 401 '54386793436c938cff89326944d4c2702340037d',
402 402 '51d254f0ecf5df2ce50c0b115741f4cf13985dab',
403 403 '998ed409c795fec2012b1c0ca054d99888b22090',
404 404 '5e0eb4c47f56564395f76333f319d26c79e2fb09',
405 405 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
406 406 '7cb3fd1b6d8c20ba89e2264f1c8baebc8a52d36e',
407 407 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
408 408 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
409 409 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
410 410 ],
411 411 'vcs/nodes.py': [
412 412 '33fa3223355104431402a888fa77a4e9956feb3e',
413 413 'fa014c12c26d10ba682fadb78f2a11c24c8118e1',
414 414 'e686b958768ee96af8029fe19c6050b1a8dd3b2b',
415 415 'ab5721ca0a081f26bf43d9051e615af2cc99952f',
416 416 'c877b68d18e792a66b7f4c529ea02c8f80801542',
417 417 '4313566d2e417cb382948f8d9d7c765330356054',
418 418 '6c2303a793671e807d1cfc70134c9ca0767d98c2',
419 419 '54386793436c938cff89326944d4c2702340037d',
420 420 '54000345d2e78b03a99d561399e8e548de3f3203',
421 421 '1c6b3677b37ea064cb4b51714d8f7498f93f4b2b',
422 422 '2d03ca750a44440fb5ea8b751176d1f36f8e8f46',
423 423 '2a08b128c206db48c2f0b8f70df060e6db0ae4f8',
424 424 '30c26513ff1eb8e5ce0e1c6b477ee5dc50e2f34b',
425 425 'ac71e9503c2ca95542839af0ce7b64011b72ea7c',
426 426 '12669288fd13adba2a9b7dd5b870cc23ffab92d2',
427 427 '5a0c84f3e6fe3473e4c8427199d5a6fc71a9b382',
428 428 '12f2f5e2b38e6ff3fbdb5d722efed9aa72ecb0d5',
429 429 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
430 430 'f50f42baeed5af6518ef4b0cb2f1423f3851a941',
431 431 'd7e390a45f6aa96f04f5e7f583ad4f867431aa25',
432 432 'f15c21f97864b4f071cddfbf2750ec2e23859414',
433 433 'e906ef056cf539a4e4e5fc8003eaf7cf14dd8ade',
434 434 'ea2b108b48aa8f8c9c4a941f66c1a03315ca1c3b',
435 435 '84dec09632a4458f79f50ddbbd155506c460b4f9',
436 436 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
437 437 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
438 438 '3bf1c5868e570e39569d094f922d33ced2fa3b2b',
439 439 'b8d04012574729d2c29886e53b1a43ef16dd00a1',
440 440 '6970b057cffe4aab0a792aa634c89f4bebf01441',
441 441 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
442 442 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
443 443 ],
444 444 'vcs/backends/git.py': [
445 445 '4cf116ad5a457530381135e2f4c453e68a1b0105',
446 446 '9a751d84d8e9408e736329767387f41b36935153',
447 447 'cb681fb539c3faaedbcdf5ca71ca413425c18f01',
448 448 '428f81bb652bcba8d631bce926e8834ff49bdcc6',
449 449 '180ab15aebf26f98f714d8c68715e0f05fa6e1c7',
450 450 '2b8e07312a2e89e92b90426ab97f349f4bce2a3a',
451 451 '50e08c506174d8645a4bb517dd122ac946a0f3bf',
452 452 '54000345d2e78b03a99d561399e8e548de3f3203',
453 453 ],
454 454 }
455 455 for path, revs in files.items():
456 456 node = self.repo.get_changeset(revs[0]).get_node(path)
457 457 node_revs = [chset.raw_id for chset in node.history]
458 458 self.assertTrue(set(revs).issubset(set(node_revs)),
459 459 "We assumed that %s is subset of revisions for which file %s "
460 460 "has been changed, and history of that node returned: %s"
461 461 % (revs, path, node_revs))
462 462
463 463 def test_file_annotate(self):
464 464 files = {
465 465 'vcs/backends/__init__.py': {
466 466 'c1214f7e79e02fc37156ff215cd71275450cffc3': {
467 467 'lines_no': 1,
468 468 'changesets': [
469 469 'c1214f7e79e02fc37156ff215cd71275450cffc3',
470 470 ],
471 471 },
472 472 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647': {
473 473 'lines_no': 21,
474 474 'changesets': [
475 475 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
476 476 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
477 477 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
478 478 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
479 479 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
480 480 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
481 481 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
482 482 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
483 483 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
484 484 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
485 485 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
486 486 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
487 487 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
488 488 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
489 489 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
490 490 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
491 491 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
492 492 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
493 493 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
494 494 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
495 495 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
496 496 ],
497 497 },
498 498 'e29b67bd158580fc90fc5e9111240b90e6e86064': {
499 499 'lines_no': 32,
500 500 'changesets': [
501 501 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
502 502 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
503 503 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
504 504 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
505 505 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
506 506 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
507 507 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
508 508 '54000345d2e78b03a99d561399e8e548de3f3203',
509 509 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
510 510 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
511 511 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
512 512 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
513 513 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
514 514 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
515 515 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
516 516 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
517 517 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
518 518 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
519 519 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
520 520 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
521 521 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
522 522 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
523 523 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
524 524 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
525 525 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
526 526 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
527 527 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
528 528 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
529 529 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
530 530 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
531 531 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
532 532 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
533 533 ],
534 534 },
535 535 },
536 536 }
537 537
538 538 for fname, revision_dict in files.items():
539 539 for rev, data in revision_dict.items():
540 540 cs = self.repo.get_changeset(rev)
541 541
542 542 l1_1 = [x[1] for x in cs.get_file_annotate(fname)]
543 543 l1_2 = [x[2]().raw_id for x in cs.get_file_annotate(fname)]
544 544 self.assertEqual(l1_1, l1_2)
545 545 l1 = l1_1
546 546 l2 = files[fname][rev]['changesets']
547 547 self.assertTrue(l1 == l2, "The lists of revision for %s@rev %s"
548 548 "from annotation list should match each other, "
549 549 "got \n%s \nvs \n%s " % (fname, rev, l1, l2))
550 550
551 551 def test_files_state(self):
552 552 """
553 553 Tests state of FileNodes.
554 554 """
555 555 node = self.repo \
556 556 .get_changeset('e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0') \
557 557 .get_node('vcs/utils/diffs.py')
558 558 self.assertTrue(node.state, NodeState.ADDED)
559 559 self.assertTrue(node.added)
560 560 self.assertFalse(node.changed)
561 561 self.assertFalse(node.not_changed)
562 562 self.assertFalse(node.removed)
563 563
564 564 node = self.repo \
565 565 .get_changeset('33fa3223355104431402a888fa77a4e9956feb3e') \
566 566 .get_node('.hgignore')
567 567 self.assertTrue(node.state, NodeState.CHANGED)
568 568 self.assertFalse(node.added)
569 569 self.assertTrue(node.changed)
570 570 self.assertFalse(node.not_changed)
571 571 self.assertFalse(node.removed)
572 572
573 573 node = self.repo \
574 574 .get_changeset('e29b67bd158580fc90fc5e9111240b90e6e86064') \
575 575 .get_node('setup.py')
576 576 self.assertTrue(node.state, NodeState.NOT_CHANGED)
577 577 self.assertFalse(node.added)
578 578 self.assertFalse(node.changed)
579 579 self.assertTrue(node.not_changed)
580 580 self.assertFalse(node.removed)
581 581
582 582 # If node has REMOVED state then trying to fetch it would raise
583 583 # ChangesetError exception
584 584 chset = self.repo.get_changeset(
585 585 'fa6600f6848800641328adbf7811fd2372c02ab2')
586 586 path = 'vcs/backends/BaseRepository.py'
587 587 self.assertRaises(NodeDoesNotExistError, chset.get_node, path)
588 588 # but it would be one of ``removed`` (changeset's attribute)
589 589 self.assertTrue(path in [rf.path for rf in chset.removed])
590 590
591 591 chset = self.repo.get_changeset(
592 592 '54386793436c938cff89326944d4c2702340037d')
593 593 changed = ['setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py',
594 594 'vcs/nodes.py']
595 595 self.assertEqual(set(changed), set([f.path for f in chset.changed]))
596 596
597 597 def test_commit_message_is_unicode(self):
598 598 for cs in self.repo:
599 599 self.assertEqual(type(cs.message), unicode)
600 600
601 601 def test_changeset_author_is_unicode(self):
602 602 for cs in self.repo:
603 603 self.assertEqual(type(cs.author), unicode)
604 604
605 605 def test_repo_files_content_is_unicode(self):
606 606 changeset = self.repo.get_changeset()
607 607 for node in changeset.get_node('/'):
608 608 if node.is_file():
609 609 self.assertEqual(type(node.content), unicode)
610 610
611 611 def test_wrong_path(self):
612 612 # There is 'setup.py' in the root dir but not there:
613 613 path = 'foo/bar/setup.py'
614 614 tip = self.repo.get_changeset()
615 615 self.assertRaises(VCSError, tip.get_node, path)
616 616
617 617 def test_author_email(self):
618 618 self.assertEqual('marcin@python-blog.com',
619 619 self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3') \
620 620 .author_email)
621 621 self.assertEqual('lukasz.balcerzak@python-center.pl',
622 622 self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b') \
623 623 .author_email)
624 624 self.assertEqual('',
625 625 self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992') \
626 626 .author_email)
627 627
628 628 def test_author_username(self):
629 629 self.assertEqual('Marcin Kuzminski',
630 630 self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3') \
631 631 .author_name)
632 632 self.assertEqual('Lukasz Balcerzak',
633 633 self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b') \
634 634 .author_name)
635 635 self.assertEqual('marcink none@none',
636 636 self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992') \
637 637 .author_name)
638 638
639 639
640 640 class GitSpecificTest(unittest.TestCase):
641 641
642 642 def test_error_is_raised_for_added_if_diff_name_status_is_wrong(self):
643 643 repo = mock.MagicMock()
644 644 changeset = GitChangeset(repo, 'foobar')
645 645 changeset._diff_name_status = 'foobar'
646 646 with self.assertRaises(VCSError):
647 647 changeset.added
648 648
649 649 def test_error_is_raised_for_changed_if_diff_name_status_is_wrong(self):
650 650 repo = mock.MagicMock()
651 651 changeset = GitChangeset(repo, 'foobar')
652 652 changeset._diff_name_status = 'foobar'
653 653 with self.assertRaises(VCSError):
654 654 changeset.added
655 655
656 656 def test_error_is_raised_for_removed_if_diff_name_status_is_wrong(self):
657 657 repo = mock.MagicMock()
658 658 changeset = GitChangeset(repo, 'foobar')
659 659 changeset._diff_name_status = 'foobar'
660 660 with self.assertRaises(VCSError):
661 661 changeset.added
662 662
663 663
664 664 class GitSpecificWithRepoTest(_BackendTestMixin, unittest.TestCase):
665 665 backend_alias = 'git'
666 666
667 667 @classmethod
668 668 def _get_commits(cls):
669 669 return [
670 670 {
671 671 'message': 'Initial',
672 672 'author': 'Joe Doe <joe.doe@example.com>',
673 673 'date': datetime.datetime(2010, 1, 1, 20),
674 674 'added': [
675 675 FileNode('foobar/static/js/admin/base.js', content='base'),
676 676 FileNode('foobar/static/admin', content='admin',
677 677 mode=0120000), # this is a link
678 678 FileNode('foo', content='foo'),
679 679 ],
680 680 },
681 681 {
682 682 'message': 'Second',
683 683 'author': 'Joe Doe <joe.doe@example.com>',
684 684 'date': datetime.datetime(2010, 1, 1, 22),
685 685 'added': [
686 686 FileNode('foo2', content='foo2'),
687 687 ],
688 688 },
689 689 ]
690 690
691 691 def test_paths_slow_traversing(self):
692 692 cs = self.repo.get_changeset()
693 693 self.assertEqual(cs.get_node('foobar').get_node('static').get_node('js')
694 694 .get_node('admin').get_node('base.js').content, 'base')
695 695
696 696 def test_paths_fast_traversing(self):
697 697 cs = self.repo.get_changeset()
698 698 self.assertEqual(cs.get_node('foobar/static/js/admin/base.js').content,
699 699 'base')
700 700
701 701 def test_workdir_get_branch(self):
702 702 self.repo.run_git_command(['checkout', '-b', 'production'])
703 703 # Regression test: one of following would fail if we don't check
704 704 # .git/HEAD file
705 705 self.repo.run_git_command(['checkout', 'production'])
706 706 self.assertEqual(self.repo.workdir.get_branch(), 'production')
707 707 self.repo.run_git_command(['checkout', 'master'])
708 708 self.assertEqual(self.repo.workdir.get_branch(), 'master')
709 709
710 710 def test_get_diff_runs_git_command_with_hashes(self):
711 711 self.repo.run_git_command = mock.Mock(return_value=['', ''])
712 712 self.repo.get_diff(0, 1)
713 713 self.repo.run_git_command.assert_called_once_with(
714 714 ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
715 715 self.repo._get_revision(0), self.repo._get_revision(1)])
716 716
717 717 def test_get_diff_runs_git_command_with_str_hashes(self):
718 718 self.repo.run_git_command = mock.Mock(return_value=['', ''])
719 719 self.repo.get_diff(self.repo.EMPTY_CHANGESET, 1)
720 720 self.repo.run_git_command.assert_called_once_with(
721 721 ['show', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
722 722 self.repo._get_revision(1)])
723 723
724 724 def test_get_diff_runs_git_command_with_path_if_its_given(self):
725 725 self.repo.run_git_command = mock.Mock(return_value=['', ''])
726 726 self.repo.get_diff(0, 1, 'foo')
727 727 self.repo.run_git_command.assert_called_once_with(
728 728 ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
729 729 self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'])
730 730
731 731
732 732 class GitRegressionTest(_BackendTestMixin, unittest.TestCase):
733 733 backend_alias = 'git'
734 734
735 735 @classmethod
736 736 def _get_commits(cls):
737 737 return [
738 738 {
739 739 'message': 'Initial',
740 740 'author': 'Joe Doe <joe.doe@example.com>',
741 741 'date': datetime.datetime(2010, 1, 1, 20),
742 742 'added': [
743 743 FileNode('bot/__init__.py', content='base'),
744 744 FileNode('bot/templates/404.html', content='base'),
745 745 FileNode('bot/templates/500.html', content='base'),
746 746 ],
747 747 },
748 748 {
749 749 'message': 'Second',
750 750 'author': 'Joe Doe <joe.doe@example.com>',
751 751 'date': datetime.datetime(2010, 1, 1, 22),
752 752 'added': [
753 753 FileNode('bot/build/migrations/1.py', content='foo2'),
754 754 FileNode('bot/build/migrations/2.py', content='foo2'),
755 755 FileNode('bot/build/static/templates/f.html', content='foo2'),
756 756 FileNode('bot/build/static/templates/f1.html', content='foo2'),
757 757 FileNode('bot/build/templates/err.html', content='foo2'),
758 758 FileNode('bot/build/templates/err2.html', content='foo2'),
759 759 ],
760 760 },
761 761 ]
762 762
763 763 def test_similar_paths(self):
764 764 cs = self.repo.get_changeset()
765 765 paths = lambda *n: [x.path for x in n]
766 766 self.assertEqual(paths(*cs.get_nodes('bot')), ['bot/build', 'bot/templates', 'bot/__init__.py'])
767 767 self.assertEqual(paths(*cs.get_nodes('bot/build')), ['bot/build/migrations', 'bot/build/static', 'bot/build/templates'])
768 768 self.assertEqual(paths(*cs.get_nodes('bot/build/static')), ['bot/build/static/templates'])
769 769 # this get_nodes below causes troubles !
770 770 self.assertEqual(paths(*cs.get_nodes('bot/build/static/templates')), ['bot/build/static/templates/f.html', 'bot/build/static/templates/f1.html'])
771 771 self.assertEqual(paths(*cs.get_nodes('bot/build/templates')), ['bot/build/templates/err.html', 'bot/build/templates/err2.html'])
772 772 self.assertEqual(paths(*cs.get_nodes('bot/templates/')), ['bot/templates/404.html', 'bot/templates/500.html'])
773 773
774 774
775 775 class GitHooksTest(unittest.TestCase):
776 776 """
777 777 Tests related to hook functionality of Git repositories.
778 778 """
779 779
780 780 def setUp(self):
781 781 # For each run we want a fresh repo.
782 782 self.repo_directory = get_new_dir("githookrepo")
783 783 self.repo = GitRepository(self.repo_directory, create=True)
784 784
785 785 # Create a dictionary where keys are hook names, and values are paths to
786 786 # them. Deduplicates code in tests a bit.
787 787 self.hook_directory = self.repo.get_hook_location()
788 788 self.kallithea_hooks = dict((h, os.path.join(self.hook_directory, h)) for h in ("pre-receive", "post-receive"))
789 789
790 790 def test_hooks_created_if_missing(self):
791 791 """
792 792 Tests if hooks are installed in repository if they are missing.
793 793 """
794 794
795 795 for hook, hook_path in self.kallithea_hooks.iteritems():
796 796 if os.path.exists(hook_path):
797 797 os.remove(hook_path)
798 798
799 799 ScmModel().install_git_hooks(repo=self.repo)
800 800
801 801 for hook, hook_path in self.kallithea_hooks.iteritems():
802 802 self.assertTrue(os.path.exists(hook_path))
803 803
804 804 def test_kallithea_hooks_updated(self):
805 805 """
806 806 Tests if hooks are updated if they are Kallithea hooks already.
807 807 """
808 808
809 809 for hook, hook_path in self.kallithea_hooks.iteritems():
810 810 with open(hook_path, "w") as f:
811 811 f.write("KALLITHEA_HOOK_VER=0.0.0\nJUST_BOGUS")
812 812
813 813 ScmModel().install_git_hooks(repo=self.repo)
814 814
815 815 for hook, hook_path in self.kallithea_hooks.iteritems():
816 816 with open(hook_path) as f:
817 817 self.assertNotIn("JUST_BOGUS", f.read())
818 818
819 819 def test_custom_hooks_untouched(self):
820 820 """
821 821 Tests if hooks are left untouched if they are not Kallithea hooks.
822 822 """
823 823
824 824 for hook, hook_path in self.kallithea_hooks.iteritems():
825 825 with open(hook_path, "w") as f:
826 826 f.write("#!/bin/bash\n#CUSTOM_HOOK")
827 827
828 828 ScmModel().install_git_hooks(repo=self.repo)
829 829
830 830 for hook, hook_path in self.kallithea_hooks.iteritems():
831 831 with open(hook_path) as f:
832 832 self.assertIn("CUSTOM_HOOK", f.read())
833 833
834 834 def test_custom_hooks_forced_update(self):
835 835 """
836 836 Tests if hooks are forcefully updated even though they are custom hooks.
837 837 """
838 838
839 839 for hook, hook_path in self.kallithea_hooks.iteritems():
840 840 with open(hook_path, "w") as f:
841 841 f.write("#!/bin/bash\n#CUSTOM_HOOK")
842 842
843 843 ScmModel().install_git_hooks(repo=self.repo, force_create=True)
844 844
845 845 for hook, hook_path in self.kallithea_hooks.iteritems():
846 846 with open(hook_path) as f:
847 847 self.assertIn("KALLITHEA_HOOK_VER", f.read())
848 848
849 849
850 850 if __name__ == '__main__':
851 851 unittest.main()
@@ -1,568 +1,568 b''
1 1
2 2 import os
3 3
4 4 import pytest
5 5
6 6 from kallithea.lib.utils2 import safe_str
7 7 from kallithea.lib.vcs.backends.hg import MercurialRepository, MercurialChangeset
8 8 from kallithea.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
9 9 from kallithea.lib.vcs.nodes import NodeKind, NodeState
10 10 from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_HG_REPO_CLONE, \
11 11 TEST_HG_REPO_PULL, TEST_TMP_PATH
12 12 from kallithea.lib.vcs.utils.compat import unittest
13 13
14 14
15 15 class MercurialRepositoryTest(unittest.TestCase):
16 16
17 17 def __check_for_existing_repo(self):
18 18 if os.path.exists(TEST_HG_REPO_CLONE):
19 19 pytest.fail('Cannot test mercurial clone repo as location %s already '
20 20 'exists. You should manually remove it first.'
21 21 % TEST_HG_REPO_CLONE)
22 22
23 23 def setUp(self):
24 24 self.repo = MercurialRepository(safe_str(TEST_HG_REPO))
25 25
26 26 def test_wrong_repo_path(self):
27 27 wrong_repo_path = os.path.join(TEST_TMP_PATH, 'errorrepo')
28 28 self.assertRaises(RepositoryError, MercurialRepository, wrong_repo_path)
29 29
30 30 def test_unicode_path_repo(self):
31 31 self.assertRaises(VCSError, lambda: MercurialRepository(u'iShouldFail'))
32 32
33 33 def test_repo_clone(self):
34 34 self.__check_for_existing_repo()
35 35 repo = MercurialRepository(safe_str(TEST_HG_REPO))
36 36 repo_clone = MercurialRepository(TEST_HG_REPO_CLONE,
37 37 src_url=TEST_HG_REPO, update_after_clone=True)
38 38 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
39 39 # Checking hashes of changesets should be enough
40 40 for changeset in repo.get_changesets():
41 41 raw_id = changeset.raw_id
42 42 self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
43 43
44 44 def test_repo_clone_with_update(self):
45 45 repo = MercurialRepository(safe_str(TEST_HG_REPO))
46 46 repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_w_update',
47 47 src_url=TEST_HG_REPO, update_after_clone=True)
48 48 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
49 49
50 50 # check if current workdir was updated
51 51 self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
52 52 + '_w_update',
53 53 'MANIFEST.in')), True,)
54 54
55 55 def test_repo_clone_without_update(self):
56 56 repo = MercurialRepository(safe_str(TEST_HG_REPO))
57 57 repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_wo_update',
58 58 src_url=TEST_HG_REPO, update_after_clone=False)
59 59 self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
60 60 self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
61 61 + '_wo_update',
62 62 'MANIFEST.in')), False,)
63 63
64 64 def test_pull(self):
65 65 if os.path.exists(TEST_HG_REPO_PULL):
66 66 pytest.fail('Cannot test mercurial pull command as location %s '
67 67 'already exists. You should manually remove it first'
68 68 % TEST_HG_REPO_PULL)
69 69 repo_new = MercurialRepository(TEST_HG_REPO_PULL, create=True)
70 70 self.assertTrue(len(self.repo.revisions) > len(repo_new.revisions))
71 71
72 72 repo_new.pull(self.repo.path)
73 73 repo_new = MercurialRepository(TEST_HG_REPO_PULL)
74 74 self.assertTrue(len(self.repo.revisions) == len(repo_new.revisions))
75 75
76 76 def test_revisions(self):
77 77 # there are 21 revisions at bitbucket now
78 78 # so we can assume they would be available from now on
79 79 subset = set(['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
80 80 '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
81 81 '6cba7170863a2411822803fa77a0a264f1310b35',
82 82 '56349e29c2af3ac913b28bde9a2c6154436e615b',
83 83 '2dda4e345facb0ccff1a191052dd1606dba6781d',
84 84 '6fff84722075f1607a30f436523403845f84cd9e',
85 85 '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
86 86 '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
87 87 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
88 88 'be90031137367893f1c406e0a8683010fd115b79',
89 89 'db8e58be770518cbb2b1cdfa69146e47cd481481',
90 90 '84478366594b424af694a6c784cb991a16b87c21',
91 91 '17f8e105dddb9f339600389c6dc7175d395a535c',
92 92 '20a662e756499bde3095ffc9bc0643d1def2d0eb',
93 93 '2e319b85e70a707bba0beff866d9f9de032aa4f9',
94 94 '786facd2c61deb9cf91e9534735124fb8fc11842',
95 95 '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
96 96 'aa6a0de05b7612707db567078e130a6cd114a9a7',
97 97 'eada5a770da98ab0dd7325e29d00e0714f228d09'
98 98 ])
99 99 self.assertTrue(subset.issubset(set(self.repo.revisions)))
100 100
101 101 # check if we have the proper order of revisions
102 102 org = ['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
103 103 '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
104 104 '6cba7170863a2411822803fa77a0a264f1310b35',
105 105 '56349e29c2af3ac913b28bde9a2c6154436e615b',
106 106 '2dda4e345facb0ccff1a191052dd1606dba6781d',
107 107 '6fff84722075f1607a30f436523403845f84cd9e',
108 108 '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
109 109 '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
110 110 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
111 111 'be90031137367893f1c406e0a8683010fd115b79',
112 112 'db8e58be770518cbb2b1cdfa69146e47cd481481',
113 113 '84478366594b424af694a6c784cb991a16b87c21',
114 114 '17f8e105dddb9f339600389c6dc7175d395a535c',
115 115 '20a662e756499bde3095ffc9bc0643d1def2d0eb',
116 116 '2e319b85e70a707bba0beff866d9f9de032aa4f9',
117 117 '786facd2c61deb9cf91e9534735124fb8fc11842',
118 118 '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
119 119 'aa6a0de05b7612707db567078e130a6cd114a9a7',
120 120 'eada5a770da98ab0dd7325e29d00e0714f228d09',
121 121 '2c1885c735575ca478bf9e17b0029dca68824458',
122 122 'd9bcd465040bf869799b09ad732c04e0eea99fe9',
123 123 '469e9c847fe1f6f7a697b8b25b4bc5b48780c1a7',
124 124 '4fb8326d78e5120da2c7468dcf7098997be385da',
125 125 '62b4a097164940bd66030c4db51687f3ec035eed',
126 126 '536c1a19428381cfea92ac44985304f6a8049569',
127 127 '965e8ab3c44b070cdaa5bf727ddef0ada980ecc4',
128 128 '9bb326a04ae5d98d437dece54be04f830cf1edd9',
129 129 'f8940bcb890a98c4702319fbe36db75ea309b475',
130 130 'ff5ab059786ebc7411e559a2cc309dfae3625a3b',
131 131 '6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08',
132 132 'ee87846a61c12153b51543bf860e1026c6d3dcba', ]
133 133 self.assertEqual(org, self.repo.revisions[:31])
134 134
135 135 def test_iter_slice(self):
136 136 sliced = list(self.repo[:10])
137 137 itered = list(self.repo)[:10]
138 138 self.assertEqual(sliced, itered)
139 139
140 140 def test_slicing(self):
141 141 # 4 1 5 10 95
142 142 for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
143 143 (10, 20, 10), (5, 100, 95)]:
144 144 revs = list(self.repo[sfrom:sto])
145 145 self.assertEqual(len(revs), size)
146 146 self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
147 147 self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
148 148
149 149 def test_branches(self):
150 150 # TODO: Need more tests here
151 151
152 152 # active branches
153 153 self.assertTrue('default' in self.repo.branches)
154 154 self.assertTrue('stable' in self.repo.branches)
155 155
156 156 # closed
157 157 self.assertTrue('git' in self.repo._get_branches(closed=True))
158 158 self.assertTrue('web' in self.repo._get_branches(closed=True))
159 159
160 160 for name, id in self.repo.branches.items():
161 161 self.assertTrue(isinstance(
162 162 self.repo.get_changeset(id), MercurialChangeset))
163 163
164 164 def test_tip_in_tags(self):
165 165 # tip is always a tag
166 166 self.assertIn('tip', self.repo.tags)
167 167
168 168 def test_tip_changeset_in_tags(self):
169 169 tip = self.repo.get_changeset()
170 170 self.assertEqual(self.repo.tags['tip'], tip.raw_id)
171 171
172 172 def test_initial_changeset(self):
173 173
174 174 init_chset = self.repo.get_changeset(0)
175 175 self.assertEqual(init_chset.message, 'initial import')
176 176 self.assertEqual(init_chset.author,
177 177 'Marcin Kuzminski <marcin@python-blog.com>')
178 178 self.assertEqual(sorted(init_chset._file_paths),
179 179 sorted([
180 180 'vcs/__init__.py',
181 181 'vcs/backends/BaseRepository.py',
182 182 'vcs/backends/__init__.py',
183 183 ])
184 184 )
185 185 self.assertEqual(sorted(init_chset._dir_paths),
186 186 sorted(['', 'vcs', 'vcs/backends']))
187 187
188 188 self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar')
189 189
190 190 node = init_chset.get_node('vcs/')
191 191 self.assertTrue(hasattr(node, 'kind'))
192 192 self.assertEqual(node.kind, NodeKind.DIR)
193 193
194 194 node = init_chset.get_node('vcs')
195 195 self.assertTrue(hasattr(node, 'kind'))
196 196 self.assertEqual(node.kind, NodeKind.DIR)
197 197
198 198 node = init_chset.get_node('vcs/__init__.py')
199 199 self.assertTrue(hasattr(node, 'kind'))
200 200 self.assertEqual(node.kind, NodeKind.FILE)
201 201
202 202 def test_not_existing_changeset(self):
203 203 # rawid
204 204 self.assertRaises(RepositoryError, self.repo.get_changeset,
205 205 'abcd' * 10)
206 206 # shortid
207 207 self.assertRaises(RepositoryError, self.repo.get_changeset,
208 208 'erro' * 4)
209 209 # numeric
210 210 self.assertRaises(RepositoryError, self.repo.get_changeset,
211 211 self.repo.count() + 1)
212 212
213 213
214 214 # Small chance we ever get to this one
215 215 revision = pow(2, 30)
216 216 self.assertRaises(RepositoryError, self.repo.get_changeset, revision)
217 217
218 218 def test_changeset10(self):
219 219
220 220 chset10 = self.repo.get_changeset(10)
221 README = """===
221 readme = """===
222 222 VCS
223 223 ===
224 224
225 225 Various Version Control System management abstraction layer for Python.
226 226
227 227 Introduction
228 228 ------------
229 229
230 230 TODO: To be written...
231 231
232 232 """
233 233 node = chset10.get_node('README.rst')
234 234 self.assertEqual(node.kind, NodeKind.FILE)
235 self.assertEqual(node.content, README)
235 self.assertEqual(node.content, readme)
236 236
237 237
238 238 class MercurialChangesetTest(unittest.TestCase):
239 239
240 240 def setUp(self):
241 241 self.repo = MercurialRepository(safe_str(TEST_HG_REPO))
242 242
243 243 def _test_equality(self, changeset):
244 244 revision = changeset.revision
245 245 self.assertEqual(changeset, self.repo.get_changeset(revision))
246 246
247 247 def test_equality(self):
248 248 self.setUp()
249 249 revs = [0, 10, 20]
250 250 changesets = [self.repo.get_changeset(rev) for rev in revs]
251 251 for changeset in changesets:
252 252 self._test_equality(changeset)
253 253
254 254 def test_default_changeset(self):
255 255 tip = self.repo.get_changeset('tip')
256 256 self.assertEqual(tip, self.repo.get_changeset())
257 257 self.assertEqual(tip, self.repo.get_changeset(revision=None))
258 258 self.assertEqual(tip, list(self.repo[-1:])[0])
259 259
260 260 def test_root_node(self):
261 261 tip = self.repo.get_changeset('tip')
262 262 self.assertTrue(tip.root is tip.get_node(''))
263 263
264 264 def test_lazy_fetch(self):
265 265 """
266 266 Test if changeset's nodes expands and are cached as we walk through
267 267 the revision. This test is somewhat hard to write as order of tests
268 268 is a key here. Written by running command after command in a shell.
269 269 """
270 270 self.setUp()
271 271 chset = self.repo.get_changeset(45)
272 272 self.assertTrue(len(chset.nodes) == 0)
273 273 root = chset.root
274 274 self.assertTrue(len(chset.nodes) == 1)
275 275 self.assertTrue(len(root.nodes) == 8)
276 276 # accessing root.nodes updates chset.nodes
277 277 self.assertTrue(len(chset.nodes) == 9)
278 278
279 279 docs = root.get_node('docs')
280 280 # we haven't yet accessed anything new as docs dir was already cached
281 281 self.assertTrue(len(chset.nodes) == 9)
282 282 self.assertTrue(len(docs.nodes) == 8)
283 283 # accessing docs.nodes updates chset.nodes
284 284 self.assertTrue(len(chset.nodes) == 17)
285 285
286 286 self.assertTrue(docs is chset.get_node('docs'))
287 287 self.assertTrue(docs is root.nodes[0])
288 288 self.assertTrue(docs is root.dirs[0])
289 289 self.assertTrue(docs is chset.get_node('docs'))
290 290
291 291 def test_nodes_with_changeset(self):
292 292 self.setUp()
293 293 chset = self.repo.get_changeset(45)
294 294 root = chset.root
295 295 docs = root.get_node('docs')
296 296 self.assertTrue(docs is chset.get_node('docs'))
297 297 api = docs.get_node('api')
298 298 self.assertTrue(api is chset.get_node('docs/api'))
299 299 index = api.get_node('index.rst')
300 300 self.assertTrue(index is chset.get_node('docs/api/index.rst'))
301 301 self.assertTrue(index is chset.get_node('docs') \
302 302 .get_node('api') \
303 303 .get_node('index.rst'))
304 304
305 305 def test_branch_and_tags(self):
306 306 chset0 = self.repo.get_changeset(0)
307 307 self.assertEqual(chset0.branch, 'default')
308 308 self.assertEqual(chset0.tags, [])
309 309
310 310 chset10 = self.repo.get_changeset(10)
311 311 self.assertEqual(chset10.branch, 'default')
312 312 self.assertEqual(chset10.tags, [])
313 313
314 314 chset44 = self.repo.get_changeset(44)
315 315 self.assertEqual(chset44.branch, 'web')
316 316
317 317 tip = self.repo.get_changeset('tip')
318 318 self.assertTrue('tip' in tip.tags)
319 319
320 320 def _test_file_size(self, revision, path, size):
321 321 node = self.repo.get_changeset(revision).get_node(path)
322 322 self.assertTrue(node.is_file())
323 323 self.assertEqual(node.size, size)
324 324
325 325 def test_file_size(self):
326 326 to_check = (
327 327 (10, 'setup.py', 1068),
328 328 (20, 'setup.py', 1106),
329 329 (60, 'setup.py', 1074),
330 330
331 331 (10, 'vcs/backends/base.py', 2921),
332 332 (20, 'vcs/backends/base.py', 3936),
333 333 (60, 'vcs/backends/base.py', 6189),
334 334 )
335 335 for revision, path, size in to_check:
336 336 self._test_file_size(revision, path, size)
337 337
338 338 def _test_dir_size(self, revision, path, size):
339 339 node = self.repo.get_changeset(revision).get_node(path)
340 340 self.assertFalse(node.is_file())
341 341 self.assertEqual(node.size, size)
342 342
343 343 def test_dir_size(self):
344 344 to_check = (
345 345 ('96507bd11ecc', '/', 682421),
346 346 ('a53d9201d4bc', '/', 682410),
347 347 ('90243de06161', '/', 682006),
348 348 )
349 349 for revision, path, size in to_check:
350 350 self._test_dir_size(revision, path, size)
351 351
352 352 def test_repo_size(self):
353 353 self.assertEqual(self.repo.size, 682421)
354 354
355 355 def test_file_history(self):
356 356 # we can only check if those revisions are present in the history
357 357 # as we cannot update this test every time file is changed
358 358 files = {
359 359 'setup.py': [7, 18, 45, 46, 47, 69, 77],
360 360 'vcs/nodes.py': [7, 8, 24, 26, 30, 45, 47, 49, 56, 57, 58, 59, 60,
361 361 61, 73, 76],
362 362 'vcs/backends/hg.py': [4, 5, 6, 11, 12, 13, 14, 15, 16, 21, 22, 23,
363 363 26, 27, 28, 30, 31, 33, 35, 36, 37, 38, 39, 40, 41, 44, 45, 47,
364 364 48, 49, 53, 54, 55, 58, 60, 61, 67, 68, 69, 70, 73, 77, 78, 79,
365 365 82],
366 366 }
367 367 for path, revs in files.items():
368 368 tip = self.repo.get_changeset(revs[-1])
369 369 node = tip.get_node(path)
370 370 node_revs = [chset.revision for chset in node.history]
371 371 self.assertTrue(set(revs).issubset(set(node_revs)),
372 372 "We assumed that %s is subset of revisions for which file %s "
373 373 "has been changed, and history of that node returned: %s"
374 374 % (revs, path, node_revs))
375 375
376 376 def test_file_annotate(self):
377 377 files = {
378 378 'vcs/backends/__init__.py':
379 379 {89: {'lines_no': 31,
380 380 'changesets': [32, 32, 61, 32, 32, 37, 32, 32, 32, 44,
381 381 37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
382 382 32, 32, 32, 32, 37, 32, 37, 37, 32,
383 383 32, 32]},
384 384 20: {'lines_no': 1,
385 385 'changesets': [4]},
386 386 55: {'lines_no': 31,
387 387 'changesets': [32, 32, 45, 32, 32, 37, 32, 32, 32, 44,
388 388 37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
389 389 32, 32, 32, 32, 37, 32, 37, 37, 32,
390 390 32, 32]}},
391 391 'vcs/exceptions.py':
392 392 {89: {'lines_no': 18,
393 393 'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
394 394 16, 16, 17, 16, 16, 18, 18, 18]},
395 395 20: {'lines_no': 18,
396 396 'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
397 397 16, 16, 17, 16, 16, 18, 18, 18]},
398 398 55: {'lines_no': 18, 'changesets': [16, 16, 16, 16, 16, 16,
399 399 16, 16, 16, 16, 16, 16,
400 400 17, 16, 16, 18, 18, 18]}},
401 401 'MANIFEST.in': {89: {'lines_no': 5,
402 402 'changesets': [7, 7, 7, 71, 71]},
403 403 20: {'lines_no': 3,
404 404 'changesets': [7, 7, 7]},
405 405 55: {'lines_no': 3,
406 406 'changesets': [7, 7, 7]}}}
407 407
408 408 for fname, revision_dict in files.items():
409 409 for rev, data in revision_dict.items():
410 410 cs = self.repo.get_changeset(rev)
411 411 l1_1 = [x[1] for x in cs.get_file_annotate(fname)]
412 412 l1_2 = [x[2]().raw_id for x in cs.get_file_annotate(fname)]
413 413 self.assertEqual(l1_1, l1_2)
414 414 l1 = l1_2 = [x[2]().revision for x in cs.get_file_annotate(fname)]
415 415 l2 = files[fname][rev]['changesets']
416 416 self.assertTrue(l1 == l2, "The lists of revision for %s@rev%s"
417 417 "from annotation list should match each other,"
418 418 "got \n%s \nvs \n%s " % (fname, rev, l1, l2))
419 419
420 420 def test_changeset_state(self):
421 421 """
422 422 Tests which files have been added/changed/removed at particular revision
423 423 """
424 424
425 425 # rev 46ad32a4f974:
426 426 # hg st --rev 46ad32a4f974
427 427 # changed: 13
428 428 # added: 20
429 429 # removed: 1
430 430 changed = set(['.hgignore'
431 431 , 'README.rst' , 'docs/conf.py' , 'docs/index.rst' , 'setup.py'
432 432 , 'tests/test_hg.py' , 'tests/test_nodes.py' , 'vcs/__init__.py'
433 433 , 'vcs/backends/__init__.py' , 'vcs/backends/base.py'
434 434 , 'vcs/backends/hg.py' , 'vcs/nodes.py' , 'vcs/utils/__init__.py'])
435 435
436 436 added = set(['docs/api/backends/hg.rst'
437 437 , 'docs/api/backends/index.rst' , 'docs/api/index.rst'
438 438 , 'docs/api/nodes.rst' , 'docs/api/web/index.rst'
439 439 , 'docs/api/web/simplevcs.rst' , 'docs/installation.rst'
440 440 , 'docs/quickstart.rst' , 'setup.cfg' , 'vcs/utils/baseui_config.py'
441 441 , 'vcs/utils/web.py' , 'vcs/web/__init__.py' , 'vcs/web/exceptions.py'
442 442 , 'vcs/web/simplevcs/__init__.py' , 'vcs/web/simplevcs/exceptions.py'
443 443 , 'vcs/web/simplevcs/middleware.py' , 'vcs/web/simplevcs/models.py'
444 444 , 'vcs/web/simplevcs/settings.py' , 'vcs/web/simplevcs/utils.py'
445 445 , 'vcs/web/simplevcs/views.py'])
446 446
447 447 removed = set(['docs/api.rst'])
448 448
449 449 chset64 = self.repo.get_changeset('46ad32a4f974')
450 450 self.assertEqual(set((node.path for node in chset64.added)), added)
451 451 self.assertEqual(set((node.path for node in chset64.changed)), changed)
452 452 self.assertEqual(set((node.path for node in chset64.removed)), removed)
453 453
454 454 # rev b090f22d27d6:
455 455 # hg st --rev b090f22d27d6
456 456 # changed: 13
457 457 # added: 20
458 458 # removed: 1
459 459 chset88 = self.repo.get_changeset('b090f22d27d6')
460 460 self.assertEqual(set((node.path for node in chset88.added)), set())
461 461 self.assertEqual(set((node.path for node in chset88.changed)),
462 462 set(['.hgignore']))
463 463 self.assertEqual(set((node.path for node in chset88.removed)), set())
464 464
465 465 # 85:
466 466 # added: 2 ['vcs/utils/diffs.py', 'vcs/web/simplevcs/views/diffs.py']
467 467 # changed: 4 ['vcs/web/simplevcs/models.py', ...]
468 468 # removed: 1 ['vcs/utils/web.py']
469 469 chset85 = self.repo.get_changeset(85)
470 470 self.assertEqual(set((node.path for node in chset85.added)), set([
471 471 'vcs/utils/diffs.py',
472 472 'vcs/web/simplevcs/views/diffs.py']))
473 473 self.assertEqual(set((node.path for node in chset85.changed)), set([
474 474 'vcs/web/simplevcs/models.py',
475 475 'vcs/web/simplevcs/utils.py',
476 476 'vcs/web/simplevcs/views/__init__.py',
477 477 'vcs/web/simplevcs/views/repository.py',
478 478 ]))
479 479 self.assertEqual(set((node.path for node in chset85.removed)),
480 480 set(['vcs/utils/web.py']))
481 481
482 482
483 483 def test_files_state(self):
484 484 """
485 485 Tests state of FileNodes.
486 486 """
487 487 chset = self.repo.get_changeset(85)
488 488 node = chset.get_node('vcs/utils/diffs.py')
489 489 self.assertTrue(node.state, NodeState.ADDED)
490 490 self.assertTrue(node.added)
491 491 self.assertFalse(node.changed)
492 492 self.assertFalse(node.not_changed)
493 493 self.assertFalse(node.removed)
494 494
495 495 chset = self.repo.get_changeset(88)
496 496 node = chset.get_node('.hgignore')
497 497 self.assertTrue(node.state, NodeState.CHANGED)
498 498 self.assertFalse(node.added)
499 499 self.assertTrue(node.changed)
500 500 self.assertFalse(node.not_changed)
501 501 self.assertFalse(node.removed)
502 502
503 503 chset = self.repo.get_changeset(85)
504 504 node = chset.get_node('setup.py')
505 505 self.assertTrue(node.state, NodeState.NOT_CHANGED)
506 506 self.assertFalse(node.added)
507 507 self.assertFalse(node.changed)
508 508 self.assertTrue(node.not_changed)
509 509 self.assertFalse(node.removed)
510 510
511 511 # If node has REMOVED state then trying to fetch it would raise
512 512 # ChangesetError exception
513 513 chset = self.repo.get_changeset(2)
514 514 path = 'vcs/backends/BaseRepository.py'
515 515 self.assertRaises(NodeDoesNotExistError, chset.get_node, path)
516 516 # but it would be one of ``removed`` (changeset's attribute)
517 517 self.assertTrue(path in [rf.path for rf in chset.removed])
518 518
519 519 def test_commit_message_is_unicode(self):
520 520 for cm in self.repo:
521 521 self.assertEqual(type(cm.message), unicode)
522 522
523 523 def test_changeset_author_is_unicode(self):
524 524 for cm in self.repo:
525 525 self.assertEqual(type(cm.author), unicode)
526 526
527 527 def test_repo_files_content_is_unicode(self):
528 528 test_changeset = self.repo.get_changeset(100)
529 529 for node in test_changeset.get_node('/'):
530 530 if node.is_file():
531 531 self.assertEqual(type(node.content), unicode)
532 532
533 533 def test_wrong_path(self):
534 534 # There is 'setup.py' in the root dir but not there:
535 535 path = 'foo/bar/setup.py'
536 536 self.assertRaises(VCSError, self.repo.get_changeset().get_node, path)
537 537
538 538 def test_archival_file(self):
539 539 # TODO:
540 540 pass
541 541
542 542 def test_archival_as_generator(self):
543 543 # TODO:
544 544 pass
545 545
546 546 def test_archival_wrong_kind(self):
547 547 tip = self.repo.get_changeset()
548 548 self.assertRaises(VCSError, tip.fill_archive, kind='error')
549 549
550 550 def test_archival_empty_prefix(self):
551 551 # TODO:
552 552 pass
553 553
554 554 def test_author_email(self):
555 555 self.assertEqual('marcin@python-blog.com',
556 556 self.repo.get_changeset('b986218ba1c9').author_email)
557 557 self.assertEqual('lukasz.balcerzak@python-center.pl',
558 558 self.repo.get_changeset('3803844fdbd3').author_email)
559 559 self.assertEqual('',
560 560 self.repo.get_changeset('84478366594b').author_email)
561 561
562 562 def test_author_username(self):
563 563 self.assertEqual('Marcin Kuzminski',
564 564 self.repo.get_changeset('b986218ba1c9').author_name)
565 565 self.assertEqual('Lukasz Balcerzak',
566 566 self.repo.get_changeset('3803844fdbd3').author_name)
567 567 self.assertEqual('marcink',
568 568 self.repo.get_changeset('84478366594b').author_name)
General Comments 0
You need to be logged in to leave comments. Login now