##// END OF EJS Templates
tests for changeset comments
marcink -
r1715:e1e48209 beta
parent child Browse files
Show More
@@ -0,0 +1,143 b''
1 from rhodecode.tests import *
2 from rhodecode.model.db import ChangesetComment, Notification, User, \
3 UserNotification
4
5 class TestChangeSetCommentrController(TestController):
6
7 def setUp(self):
8 for x in ChangesetComment.query().all():
9 self.Session().delete(x)
10 self.Session().commit()
11
12 for x in Notification.query().all():
13 self.Session().delete(x)
14 self.Session().commit()
15
16 def tearDown(self):
17 for x in ChangesetComment.query().all():
18 self.Session().delete(x)
19 self.Session().commit()
20
21 for x in Notification.query().all():
22 self.Session().delete(x)
23 self.Session().commit()
24
25 def test_create(self):
26 self.log_user()
27 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
28 text = u'CommentOnRevision'
29
30 params = {'text':text}
31 response = self.app.post(url(controller='changeset', action='comment',
32 repo_name=HG_REPO, revision=rev),
33 params=params)
34 # Test response...
35 self.assertEqual(response.status, '302 Found')
36 response.follow()
37
38 response = self.app.get(url(controller='changeset', action='index',
39 repo_name=HG_REPO, revision=rev))
40 # test DB
41 self.assertEqual(ChangesetComment.query().count(), 1)
42 self.assertTrue('''<div class="comments-number">%s '''
43 '''comment(s) (0 inline)</div>''' % 1 in response.body)
44
45
46 self.assertEqual(Notification.query().count(), 1)
47 notification = Notification.query().all()[0]
48
49 self.assertEqual(notification.type_, Notification.TYPE_CHANGESET_COMMENT)
50 self.assertTrue((u'/vcs_test_hg/changeset/27cd5cce30c96924232df'
51 'fcd24178a07ffeb5dfc#comment-1') in notification.subject)
52
53 def test_create_inline(self):
54 self.log_user()
55 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
56 text = u'CommentOnRevision'
57 f_path = 'vcs/web/simplevcs/views/repository.py'
58 line = 'n1'
59
60 params = {'text':text, 'f_path':f_path, 'line':line}
61 response = self.app.post(url(controller='changeset', action='comment',
62 repo_name=HG_REPO, revision=rev),
63 params=params)
64 # Test response...
65 self.assertEqual(response.status, '302 Found')
66 response.follow()
67
68 response = self.app.get(url(controller='changeset', action='index',
69 repo_name=HG_REPO, revision=rev))
70 #test DB
71 self.assertEqual(ChangesetComment.query().count(), 1)
72 self.assertTrue('''<div class="comments-number">0 comment(s)'''
73 ''' (%s inline)</div>''' % 1 in response.body)
74 self.assertTrue('''<div class="inline-comment-placeholder-line"'''
75 ''' line="n1" target_id="vcswebsimplevcsviews'''
76 '''repositorypy">''' in response.body)
77
78 self.assertEqual(Notification.query().count(), 1)
79 notification = Notification.query().all()[0]
80
81 self.assertEqual(notification.type_, Notification.TYPE_CHANGESET_COMMENT)
82 self.assertTrue((u'/vcs_test_hg/changeset/27cd5cce30c96924232df'
83 'fcd24178a07ffeb5dfc#comment-1') in notification.subject)
84
85 def test_create_with_mention(self):
86 self.log_user()
87
88 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
89 text = u'@test_regular check CommentOnRevision'
90
91 params = {'text':text}
92 response = self.app.post(url(controller='changeset', action='comment',
93 repo_name=HG_REPO, revision=rev),
94 params=params)
95 # Test response...
96 self.assertEqual(response.status, '302 Found')
97 response.follow()
98
99 response = self.app.get(url(controller='changeset', action='index',
100 repo_name=HG_REPO, revision=rev))
101 # test DB
102 self.assertEqual(ChangesetComment.query().count(), 1)
103 self.assertTrue('''<div class="comments-number">%s '''
104 '''comment(s) (0 inline)</div>''' % 1 in response.body)
105
106
107 self.assertEqual(Notification.query().count(), 2)
108 users = [x.user.username for x in UserNotification.query().all()]
109
110 # test_regular get's notification by @mention
111 self.assertEqual(users, [u'test_admin', u'test_regular'])
112
113 def test_delete(self):
114 self.log_user()
115 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
116 text = u'CommentOnRevision'
117
118 params = {'text':text}
119 response = self.app.post(url(controller='changeset', action='comment',
120 repo_name=HG_REPO, revision=rev),
121 params=params)
122
123 comments = ChangesetComment.query().all()
124 self.assertEqual(len(comments), 1)
125 comment_id = comments[0].comment_id
126
127
128 self.app.delete(url(controller='changeset',
129 action='delete_comment',
130 repo_name=HG_REPO,
131 comment_id = comment_id))
132
133 comments = ChangesetComment.query().all()
134 self.assertEqual(len(comments), 0)
135
136 response = self.app.get(url(controller='changeset', action='index',
137 repo_name=HG_REPO, revision=rev))
138 self.assertTrue('''<div class="comments-number">0 comment(s)'''
139 ''' (0 inline)</div>''' in response.body)
140
141
142
143
@@ -1,401 +1,400 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.tests.test_hg_operations
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Test suite for making push/pull operations
7 7
8 8 :created_on: Dec 30, 2010
9 9 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
10 10 :license: GPLv3, see COPYING for more details.
11 11 """
12 12 # This program is free software: you can redistribute it and/or modify
13 13 # it under the terms of the GNU General Public License as published by
14 14 # the Free Software Foundation, either version 3 of the License, or
15 15 # (at your option) any later version.
16 16 #
17 17 # This program is distributed in the hope that it will be useful,
18 18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 20 # GNU General Public License for more details.
21 21 #
22 22 # You should have received a copy of the GNU General Public License
23 23 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 24
25 25 import os
26 26 import time
27 27 import sys
28 28 import shutil
29 29 import logging
30 30
31 31 from os.path import join as jn
32 32 from os.path import dirname as dn
33 33
34 34 from tempfile import _RandomNameSequence
35 35 from subprocess import Popen, PIPE
36 36
37 37 from paste.deploy import appconfig
38 38 from pylons import config
39 39 from sqlalchemy import engine_from_config
40 40
41 41 from rhodecode.lib.utils import add_cache
42 42 from rhodecode.model import init_model
43 43 from rhodecode.model import meta
44 44 from rhodecode.model.db import User, Repository, UserLog
45 45 from rhodecode.lib.auth import get_crypt_password
46 46
47 47 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
48 48 from rhodecode.config.environment import load_environment
49 49
50 50 rel_path = dn(dn(dn(os.path.abspath(__file__))))
51 51
52 52 conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
53 53 load_environment(conf.global_conf, conf.local_conf)
54 54
55 55 add_cache(conf)
56 56
57 57 USER = 'test_admin'
58 58 PASS = 'test12'
59 59 HOST = '127.0.0.1:5000'
60 60 DEBUG = False
61 61 print 'DEBUG:', DEBUG
62 62 log = logging.getLogger(__name__)
63 63
64 64 engine = engine_from_config(conf, 'sqlalchemy.db1.')
65 65 init_model(engine)
66 66 sa = meta.Session
67 67
68 68 class Command(object):
69 69
70 70 def __init__(self, cwd):
71 71 self.cwd = cwd
72 72
73 73 def execute(self, cmd, *args):
74 74 """Runs command on the system with given ``args``.
75 75 """
76 76
77 77 command = cmd + ' ' + ' '.join(args)
78 78 log.debug('Executing %s' % command)
79 79 if DEBUG:
80 80 print command
81 81 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
82 82 stdout, stderr = p.communicate()
83 83 if DEBUG:
84 84 print stdout, stderr
85 85 return stdout, stderr
86 86
87 87
88 88 def test_wrapp(func):
89 89
90 90 def __wrapp(*args, **kwargs):
91 91 print '>>>%s' % func.__name__
92 92 try:
93 93 res = func(*args, **kwargs)
94 94 except Exception, e:
95 95 print ('###############\n-'
96 96 '--%s failed %s--\n'
97 97 '###############\n' % (func.__name__, e))
98 98 sys.exit()
99 99 print '++OK++'
100 100 return res
101 101 return __wrapp
102 102
103 103
104 104 def create_test_user(force=True):
105 105 print '\tcreating test user'
106 106
107 107 user = User.get_by_username(USER)
108 108
109 109 if force and user is not None:
110 110 print '\tremoving current user'
111 111 for repo in Repository.query().filter(Repository.user == user).all():
112 112 sa.delete(repo)
113 113 sa.delete(user)
114 114 sa.commit()
115 115
116 116 if user is None or force:
117 117 print '\tcreating new one'
118 118 new_usr = User()
119 119 new_usr.username = USER
120 120 new_usr.password = get_crypt_password(PASS)
121 121 new_usr.email = 'mail@mail.com'
122 122 new_usr.name = 'test'
123 123 new_usr.lastname = 'lasttestname'
124 124 new_usr.active = True
125 125 new_usr.admin = True
126 126 sa.add(new_usr)
127 127 sa.commit()
128 128
129 129 print '\tdone'
130 130
131 131
132 132 def create_test_repo(force=True):
133 133 from rhodecode.model.repo import RepoModel
134 134
135 135 user = User.get_by_username(USER)
136 136 if user is None:
137 137 raise Exception('user not found')
138 138
139 139
140 140 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
141 141
142 142 if repo is None:
143 143 print '\trepo not found creating'
144 144
145 145 form_data = {'repo_name':HG_REPO,
146 146 'repo_type':'hg',
147 147 'private':False,
148 148 'clone_uri':'' }
149 149 rm = RepoModel(sa)
150 150 rm.base_path = '/home/hg'
151 151 rm.create(form_data, user)
152 152
153 153
154 154 def set_anonymous_access(enable=True):
155 155 user = User.get_by_username('default')
156 156 user.active = enable
157 157 sa.add(user)
158 158 sa.commit()
159 159 print '\tanonymous access is now:', enable
160 160 if enable != User.get_by_username('default').active:
161 161 raise Exception('Cannot set anonymous access')
162 162
163 163 def get_anonymous_access():
164 164 user = User.get_by_username('default')
165 165 return user.active
166 166
167 167
168 168 #==============================================================================
169 169 # TESTS
170 170 #==============================================================================
171 171 @test_wrapp
172 172 def test_clone_with_credentials(no_errors=False):
173 173 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
174 174
175 175 try:
176 176 shutil.rmtree(path, ignore_errors=True)
177 177 os.makedirs(path)
178 178 #print 'made dirs %s' % jn(path)
179 179 except OSError:
180 180 raise
181 181
182 182 print '\tchecking if anonymous access is enabled'
183 183 anonymous_access = get_anonymous_access()
184 184 if anonymous_access:
185 185 print '\tenabled, disabling it '
186 186 set_anonymous_access(enable=False)
187 time.sleep(1)
188 187
189 188 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
190 189 {'user':USER,
191 190 'pass':PASS,
192 191 'host':HOST,
193 192 'cloned_repo':HG_REPO,
194 193 'dest':path}
195 194
196 195 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
197 196
198 197 if no_errors is False:
199 198 assert """adding file changes""" in stdout, 'no messages about cloning'
200 199 assert """abort""" not in stderr , 'got error from clone'
201 200
202 201
203 202 @test_wrapp
204 203 def test_clone_anonymous():
205 204 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
206 205
207 206 try:
208 207 shutil.rmtree(path, ignore_errors=True)
209 208 os.makedirs(path)
210 209 #print 'made dirs %s' % jn(path)
211 210 except OSError:
212 211 raise
213 212
214 213
215 214 print '\tchecking if anonymous access is enabled'
216 215 anonymous_access = get_anonymous_access()
217 216 if not anonymous_access:
218 217 print '\tnot enabled, enabling it '
219 218 set_anonymous_access(enable=True)
220 time.sleep(1)
221 219
222 220 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
223 221 {'user':USER,
224 222 'pass':PASS,
225 223 'host':HOST,
226 224 'cloned_repo':HG_REPO,
227 225 'dest':path}
228 226
229 227 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
230 228
231 229 assert """adding file changes""" in stdout, 'no messages about cloning'
232 230 assert """abort""" not in stderr , 'got error from clone'
233 231
234 232 #disable if it was enabled
235 233 if not anonymous_access:
236 234 print '\tdisabling anonymous access'
237 235 set_anonymous_access(enable=False)
238 236
239 237 @test_wrapp
240 238 def test_clone_wrong_credentials():
241 239 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
242 240
243 241 try:
244 242 shutil.rmtree(path, ignore_errors=True)
245 243 os.makedirs(path)
246 244 #print 'made dirs %s' % jn(path)
247 245 except OSError:
248 246 raise
249 247
250 248 print '\tchecking if anonymous access is enabled'
251 249 anonymous_access = get_anonymous_access()
252 250 if anonymous_access:
253 251 print '\tenabled, disabling it '
254 252 set_anonymous_access(enable=False)
255 253
256 254 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
257 255 {'user':USER + 'error',
258 256 'pass':PASS,
259 257 'host':HOST,
260 258 'cloned_repo':HG_REPO,
261 259 'dest':path}
262 260
263 261 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
264 262
265 263 if not """abort: authorization failed""" in stderr:
266 264 raise Exception('Failure')
267 265
268 266 @test_wrapp
269 267 def test_pull():
270 268 pass
271 269
272 270 @test_wrapp
273 271 def test_push_modify_file(f_name='setup.py'):
274 272 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
275 273 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
276 274 for i in xrange(5):
277 275 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
278 276 Command(cwd).execute(cmd)
279 277
280 278 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
281 279 Command(cwd).execute(cmd)
282 280
283 281 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
284 282
285 283 @test_wrapp
286 284 def test_push_new_file(commits=15, with_clone=True):
287 285
288 286 if with_clone:
289 287 test_clone_with_credentials(no_errors=True)
290 288
291 289 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
292 290 added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
293 291
294 292 Command(cwd).execute('touch %s' % added_file)
295 293
296 294 Command(cwd).execute('hg add %s' % added_file)
297 295
298 296 for i in xrange(commits):
299 297 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
300 298 Command(cwd).execute(cmd)
301 299
302 300 cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
303 301 'Marcin KuΕΊminski <marcin@python-blog.com>',
304 302 added_file)
305 303 Command(cwd).execute(cmd)
306 304
307 305 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
308 306 {'user':USER,
309 307 'pass':PASS,
310 308 'host':HOST,
311 309 'cloned_repo':HG_REPO,
312 310 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
313 311
314 312 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
315 313
316 314 @test_wrapp
317 315 def test_push_wrong_credentials():
318 316 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
319 317 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
320 318 {'user':USER + 'xxx',
321 319 'pass':PASS,
322 320 'host':HOST,
323 321 'cloned_repo':HG_REPO,
324 322 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
325 323
326 324 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
327 325 for i in xrange(5):
328 326 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
329 327 Command(cwd).execute(cmd)
330 328
331 329 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
332 330 Command(cwd).execute(cmd)
333 331
334 332 Command(cwd).execute('hg push %s' % clone_url)
335 333
336 334 @test_wrapp
337 335 def test_push_wrong_path():
338 336 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
339 337 added_file = jn(path, 'somefile.py')
340 338
341 339 try:
342 340 shutil.rmtree(path, ignore_errors=True)
343 341 os.makedirs(path)
344 342 print '\tmade dirs %s' % jn(path)
345 343 except OSError:
346 344 raise
347 345
348 346 Command(cwd).execute("""echo '' > %s""" % added_file)
349 347 Command(cwd).execute("""hg init %s""" % path)
350 348 Command(cwd).execute("""hg add %s""" % added_file)
351 349
352 350 for i in xrange(2):
353 351 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
354 352 Command(cwd).execute(cmd)
355 353
356 354 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
357 355 Command(cwd).execute(cmd)
358 356
359 357 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
360 358 {'user':USER,
361 359 'pass':PASS,
362 360 'host':HOST,
363 361 'cloned_repo':HG_REPO + '_error',
364 362 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
365 363
366 364 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
367 365 if not """abort: HTTP Error 403: Forbidden""" in stderr:
368 366 raise Exception('Failure')
369 367
370 368 @test_wrapp
371 369 def get_logs():
372 370 return UserLog.query().all()
373 371
374 372 @test_wrapp
375 373 def test_logs(initial):
376 374 logs = UserLog.query().all()
377 375 operations = 4
378 376 if len(initial) + operations != len(logs):
379 377 raise Exception("missing number of logs initial:%s vs current:%s" % \
380 378 (len(initial), len(logs)))
381 379
382 380
383 381 if __name__ == '__main__':
384 382 create_test_user(force=False)
385 383 create_test_repo()
386 384
387 385 initial_logs = get_logs()
388 386 print 'initial activity logs: %s' % len(initial_logs)
389
387 s = time.time()
390 388 #test_push_modify_file()
391 389 test_clone_with_credentials()
392 390 test_clone_wrong_credentials()
393 391
394 392 test_push_new_file(commits=2, with_clone=True)
395 393
396 394 test_clone_anonymous()
397 395 test_push_wrong_path()
398 396
399 397 test_push_wrong_credentials()
400 398
401 399 test_logs(initial_logs)
400 print 'finished ok in %.3f' % (time.time() - s)
General Comments 0
You need to be logged in to leave comments. Login now