##// END OF EJS Templates
tests for changeset comments
marcink -
r1715:e1e48209 beta
parent child Browse files
Show More
@@ -0,0 +1,143 b''
1 from rhodecode.tests import *
2 from rhodecode.model.db import ChangesetComment, Notification, User, \
3 UserNotification
4
5 class TestChangeSetCommentrController(TestController):
6
7 def setUp(self):
8 for x in ChangesetComment.query().all():
9 self.Session().delete(x)
10 self.Session().commit()
11
12 for x in Notification.query().all():
13 self.Session().delete(x)
14 self.Session().commit()
15
16 def tearDown(self):
17 for x in ChangesetComment.query().all():
18 self.Session().delete(x)
19 self.Session().commit()
20
21 for x in Notification.query().all():
22 self.Session().delete(x)
23 self.Session().commit()
24
25 def test_create(self):
26 self.log_user()
27 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
28 text = u'CommentOnRevision'
29
30 params = {'text':text}
31 response = self.app.post(url(controller='changeset', action='comment',
32 repo_name=HG_REPO, revision=rev),
33 params=params)
34 # Test response...
35 self.assertEqual(response.status, '302 Found')
36 response.follow()
37
38 response = self.app.get(url(controller='changeset', action='index',
39 repo_name=HG_REPO, revision=rev))
40 # test DB
41 self.assertEqual(ChangesetComment.query().count(), 1)
42 self.assertTrue('''<div class="comments-number">%s '''
43 '''comment(s) (0 inline)</div>''' % 1 in response.body)
44
45
46 self.assertEqual(Notification.query().count(), 1)
47 notification = Notification.query().all()[0]
48
49 self.assertEqual(notification.type_, Notification.TYPE_CHANGESET_COMMENT)
50 self.assertTrue((u'/vcs_test_hg/changeset/27cd5cce30c96924232df'
51 'fcd24178a07ffeb5dfc#comment-1') in notification.subject)
52
53 def test_create_inline(self):
54 self.log_user()
55 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
56 text = u'CommentOnRevision'
57 f_path = 'vcs/web/simplevcs/views/repository.py'
58 line = 'n1'
59
60 params = {'text':text, 'f_path':f_path, 'line':line}
61 response = self.app.post(url(controller='changeset', action='comment',
62 repo_name=HG_REPO, revision=rev),
63 params=params)
64 # Test response...
65 self.assertEqual(response.status, '302 Found')
66 response.follow()
67
68 response = self.app.get(url(controller='changeset', action='index',
69 repo_name=HG_REPO, revision=rev))
70 #test DB
71 self.assertEqual(ChangesetComment.query().count(), 1)
72 self.assertTrue('''<div class="comments-number">0 comment(s)'''
73 ''' (%s inline)</div>''' % 1 in response.body)
74 self.assertTrue('''<div class="inline-comment-placeholder-line"'''
75 ''' line="n1" target_id="vcswebsimplevcsviews'''
76 '''repositorypy">''' in response.body)
77
78 self.assertEqual(Notification.query().count(), 1)
79 notification = Notification.query().all()[0]
80
81 self.assertEqual(notification.type_, Notification.TYPE_CHANGESET_COMMENT)
82 self.assertTrue((u'/vcs_test_hg/changeset/27cd5cce30c96924232df'
83 'fcd24178a07ffeb5dfc#comment-1') in notification.subject)
84
85 def test_create_with_mention(self):
86 self.log_user()
87
88 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
89 text = u'@test_regular check CommentOnRevision'
90
91 params = {'text':text}
92 response = self.app.post(url(controller='changeset', action='comment',
93 repo_name=HG_REPO, revision=rev),
94 params=params)
95 # Test response...
96 self.assertEqual(response.status, '302 Found')
97 response.follow()
98
99 response = self.app.get(url(controller='changeset', action='index',
100 repo_name=HG_REPO, revision=rev))
101 # test DB
102 self.assertEqual(ChangesetComment.query().count(), 1)
103 self.assertTrue('''<div class="comments-number">%s '''
104 '''comment(s) (0 inline)</div>''' % 1 in response.body)
105
106
107 self.assertEqual(Notification.query().count(), 2)
108 users = [x.user.username for x in UserNotification.query().all()]
109
110 # test_regular get's notification by @mention
111 self.assertEqual(users, [u'test_admin', u'test_regular'])
112
113 def test_delete(self):
114 self.log_user()
115 rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
116 text = u'CommentOnRevision'
117
118 params = {'text':text}
119 response = self.app.post(url(controller='changeset', action='comment',
120 repo_name=HG_REPO, revision=rev),
121 params=params)
122
123 comments = ChangesetComment.query().all()
124 self.assertEqual(len(comments), 1)
125 comment_id = comments[0].comment_id
126
127
128 self.app.delete(url(controller='changeset',
129 action='delete_comment',
130 repo_name=HG_REPO,
131 comment_id = comment_id))
132
133 comments = ChangesetComment.query().all()
134 self.assertEqual(len(comments), 0)
135
136 response = self.app.get(url(controller='changeset', action='index',
137 repo_name=HG_REPO, revision=rev))
138 self.assertTrue('''<div class="comments-number">0 comment(s)'''
139 ''' (0 inline)</div>''' in response.body)
140
141
142
143
@@ -1,401 +1,400 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.tests.test_hg_operations
3 rhodecode.tests.test_hg_operations
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 Test suite for making push/pull operations
6 Test suite for making push/pull operations
7
7
8 :created_on: Dec 30, 2010
8 :created_on: Dec 30, 2010
9 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
9 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
10 :license: GPLv3, see COPYING for more details.
10 :license: GPLv3, see COPYING for more details.
11 """
11 """
12 # This program is free software: you can redistribute it and/or modify
12 # This program is free software: you can redistribute it and/or modify
13 # it under the terms of the GNU General Public License as published by
13 # it under the terms of the GNU General Public License as published by
14 # the Free Software Foundation, either version 3 of the License, or
14 # the Free Software Foundation, either version 3 of the License, or
15 # (at your option) any later version.
15 # (at your option) any later version.
16 #
16 #
17 # This program is distributed in the hope that it will be useful,
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
20 # GNU General Public License for more details.
21 #
21 #
22 # You should have received a copy of the GNU General Public License
22 # You should have received a copy of the GNU General Public License
23 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24
24
25 import os
25 import os
26 import time
26 import time
27 import sys
27 import sys
28 import shutil
28 import shutil
29 import logging
29 import logging
30
30
31 from os.path import join as jn
31 from os.path import join as jn
32 from os.path import dirname as dn
32 from os.path import dirname as dn
33
33
34 from tempfile import _RandomNameSequence
34 from tempfile import _RandomNameSequence
35 from subprocess import Popen, PIPE
35 from subprocess import Popen, PIPE
36
36
37 from paste.deploy import appconfig
37 from paste.deploy import appconfig
38 from pylons import config
38 from pylons import config
39 from sqlalchemy import engine_from_config
39 from sqlalchemy import engine_from_config
40
40
41 from rhodecode.lib.utils import add_cache
41 from rhodecode.lib.utils import add_cache
42 from rhodecode.model import init_model
42 from rhodecode.model import init_model
43 from rhodecode.model import meta
43 from rhodecode.model import meta
44 from rhodecode.model.db import User, Repository, UserLog
44 from rhodecode.model.db import User, Repository, UserLog
45 from rhodecode.lib.auth import get_crypt_password
45 from rhodecode.lib.auth import get_crypt_password
46
46
47 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
47 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
48 from rhodecode.config.environment import load_environment
48 from rhodecode.config.environment import load_environment
49
49
50 rel_path = dn(dn(dn(os.path.abspath(__file__))))
50 rel_path = dn(dn(dn(os.path.abspath(__file__))))
51
51
52 conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
52 conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
53 load_environment(conf.global_conf, conf.local_conf)
53 load_environment(conf.global_conf, conf.local_conf)
54
54
55 add_cache(conf)
55 add_cache(conf)
56
56
57 USER = 'test_admin'
57 USER = 'test_admin'
58 PASS = 'test12'
58 PASS = 'test12'
59 HOST = '127.0.0.1:5000'
59 HOST = '127.0.0.1:5000'
60 DEBUG = False
60 DEBUG = False
61 print 'DEBUG:', DEBUG
61 print 'DEBUG:', DEBUG
62 log = logging.getLogger(__name__)
62 log = logging.getLogger(__name__)
63
63
64 engine = engine_from_config(conf, 'sqlalchemy.db1.')
64 engine = engine_from_config(conf, 'sqlalchemy.db1.')
65 init_model(engine)
65 init_model(engine)
66 sa = meta.Session
66 sa = meta.Session
67
67
68 class Command(object):
68 class Command(object):
69
69
70 def __init__(self, cwd):
70 def __init__(self, cwd):
71 self.cwd = cwd
71 self.cwd = cwd
72
72
73 def execute(self, cmd, *args):
73 def execute(self, cmd, *args):
74 """Runs command on the system with given ``args``.
74 """Runs command on the system with given ``args``.
75 """
75 """
76
76
77 command = cmd + ' ' + ' '.join(args)
77 command = cmd + ' ' + ' '.join(args)
78 log.debug('Executing %s' % command)
78 log.debug('Executing %s' % command)
79 if DEBUG:
79 if DEBUG:
80 print command
80 print command
81 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
81 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
82 stdout, stderr = p.communicate()
82 stdout, stderr = p.communicate()
83 if DEBUG:
83 if DEBUG:
84 print stdout, stderr
84 print stdout, stderr
85 return stdout, stderr
85 return stdout, stderr
86
86
87
87
88 def test_wrapp(func):
88 def test_wrapp(func):
89
89
90 def __wrapp(*args, **kwargs):
90 def __wrapp(*args, **kwargs):
91 print '>>>%s' % func.__name__
91 print '>>>%s' % func.__name__
92 try:
92 try:
93 res = func(*args, **kwargs)
93 res = func(*args, **kwargs)
94 except Exception, e:
94 except Exception, e:
95 print ('###############\n-'
95 print ('###############\n-'
96 '--%s failed %s--\n'
96 '--%s failed %s--\n'
97 '###############\n' % (func.__name__, e))
97 '###############\n' % (func.__name__, e))
98 sys.exit()
98 sys.exit()
99 print '++OK++'
99 print '++OK++'
100 return res
100 return res
101 return __wrapp
101 return __wrapp
102
102
103
103
104 def create_test_user(force=True):
104 def create_test_user(force=True):
105 print '\tcreating test user'
105 print '\tcreating test user'
106
106
107 user = User.get_by_username(USER)
107 user = User.get_by_username(USER)
108
108
109 if force and user is not None:
109 if force and user is not None:
110 print '\tremoving current user'
110 print '\tremoving current user'
111 for repo in Repository.query().filter(Repository.user == user).all():
111 for repo in Repository.query().filter(Repository.user == user).all():
112 sa.delete(repo)
112 sa.delete(repo)
113 sa.delete(user)
113 sa.delete(user)
114 sa.commit()
114 sa.commit()
115
115
116 if user is None or force:
116 if user is None or force:
117 print '\tcreating new one'
117 print '\tcreating new one'
118 new_usr = User()
118 new_usr = User()
119 new_usr.username = USER
119 new_usr.username = USER
120 new_usr.password = get_crypt_password(PASS)
120 new_usr.password = get_crypt_password(PASS)
121 new_usr.email = 'mail@mail.com'
121 new_usr.email = 'mail@mail.com'
122 new_usr.name = 'test'
122 new_usr.name = 'test'
123 new_usr.lastname = 'lasttestname'
123 new_usr.lastname = 'lasttestname'
124 new_usr.active = True
124 new_usr.active = True
125 new_usr.admin = True
125 new_usr.admin = True
126 sa.add(new_usr)
126 sa.add(new_usr)
127 sa.commit()
127 sa.commit()
128
128
129 print '\tdone'
129 print '\tdone'
130
130
131
131
132 def create_test_repo(force=True):
132 def create_test_repo(force=True):
133 from rhodecode.model.repo import RepoModel
133 from rhodecode.model.repo import RepoModel
134
134
135 user = User.get_by_username(USER)
135 user = User.get_by_username(USER)
136 if user is None:
136 if user is None:
137 raise Exception('user not found')
137 raise Exception('user not found')
138
138
139
139
140 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
140 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
141
141
142 if repo is None:
142 if repo is None:
143 print '\trepo not found creating'
143 print '\trepo not found creating'
144
144
145 form_data = {'repo_name':HG_REPO,
145 form_data = {'repo_name':HG_REPO,
146 'repo_type':'hg',
146 'repo_type':'hg',
147 'private':False,
147 'private':False,
148 'clone_uri':'' }
148 'clone_uri':'' }
149 rm = RepoModel(sa)
149 rm = RepoModel(sa)
150 rm.base_path = '/home/hg'
150 rm.base_path = '/home/hg'
151 rm.create(form_data, user)
151 rm.create(form_data, user)
152
152
153
153
154 def set_anonymous_access(enable=True):
154 def set_anonymous_access(enable=True):
155 user = User.get_by_username('default')
155 user = User.get_by_username('default')
156 user.active = enable
156 user.active = enable
157 sa.add(user)
157 sa.add(user)
158 sa.commit()
158 sa.commit()
159 print '\tanonymous access is now:', enable
159 print '\tanonymous access is now:', enable
160 if enable != User.get_by_username('default').active:
160 if enable != User.get_by_username('default').active:
161 raise Exception('Cannot set anonymous access')
161 raise Exception('Cannot set anonymous access')
162
162
163 def get_anonymous_access():
163 def get_anonymous_access():
164 user = User.get_by_username('default')
164 user = User.get_by_username('default')
165 return user.active
165 return user.active
166
166
167
167
168 #==============================================================================
168 #==============================================================================
169 # TESTS
169 # TESTS
170 #==============================================================================
170 #==============================================================================
171 @test_wrapp
171 @test_wrapp
172 def test_clone_with_credentials(no_errors=False):
172 def test_clone_with_credentials(no_errors=False):
173 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
173 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
174
174
175 try:
175 try:
176 shutil.rmtree(path, ignore_errors=True)
176 shutil.rmtree(path, ignore_errors=True)
177 os.makedirs(path)
177 os.makedirs(path)
178 #print 'made dirs %s' % jn(path)
178 #print 'made dirs %s' % jn(path)
179 except OSError:
179 except OSError:
180 raise
180 raise
181
181
182 print '\tchecking if anonymous access is enabled'
182 print '\tchecking if anonymous access is enabled'
183 anonymous_access = get_anonymous_access()
183 anonymous_access = get_anonymous_access()
184 if anonymous_access:
184 if anonymous_access:
185 print '\tenabled, disabling it '
185 print '\tenabled, disabling it '
186 set_anonymous_access(enable=False)
186 set_anonymous_access(enable=False)
187 time.sleep(1)
188
187
189 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
188 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
190 {'user':USER,
189 {'user':USER,
191 'pass':PASS,
190 'pass':PASS,
192 'host':HOST,
191 'host':HOST,
193 'cloned_repo':HG_REPO,
192 'cloned_repo':HG_REPO,
194 'dest':path}
193 'dest':path}
195
194
196 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
195 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
197
196
198 if no_errors is False:
197 if no_errors is False:
199 assert """adding file changes""" in stdout, 'no messages about cloning'
198 assert """adding file changes""" in stdout, 'no messages about cloning'
200 assert """abort""" not in stderr , 'got error from clone'
199 assert """abort""" not in stderr , 'got error from clone'
201
200
202
201
203 @test_wrapp
202 @test_wrapp
204 def test_clone_anonymous():
203 def test_clone_anonymous():
205 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
204 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
206
205
207 try:
206 try:
208 shutil.rmtree(path, ignore_errors=True)
207 shutil.rmtree(path, ignore_errors=True)
209 os.makedirs(path)
208 os.makedirs(path)
210 #print 'made dirs %s' % jn(path)
209 #print 'made dirs %s' % jn(path)
211 except OSError:
210 except OSError:
212 raise
211 raise
213
212
214
213
215 print '\tchecking if anonymous access is enabled'
214 print '\tchecking if anonymous access is enabled'
216 anonymous_access = get_anonymous_access()
215 anonymous_access = get_anonymous_access()
217 if not anonymous_access:
216 if not anonymous_access:
218 print '\tnot enabled, enabling it '
217 print '\tnot enabled, enabling it '
219 set_anonymous_access(enable=True)
218 set_anonymous_access(enable=True)
220 time.sleep(1)
221
219
222 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
220 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
223 {'user':USER,
221 {'user':USER,
224 'pass':PASS,
222 'pass':PASS,
225 'host':HOST,
223 'host':HOST,
226 'cloned_repo':HG_REPO,
224 'cloned_repo':HG_REPO,
227 'dest':path}
225 'dest':path}
228
226
229 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
227 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
230
228
231 assert """adding file changes""" in stdout, 'no messages about cloning'
229 assert """adding file changes""" in stdout, 'no messages about cloning'
232 assert """abort""" not in stderr , 'got error from clone'
230 assert """abort""" not in stderr , 'got error from clone'
233
231
234 #disable if it was enabled
232 #disable if it was enabled
235 if not anonymous_access:
233 if not anonymous_access:
236 print '\tdisabling anonymous access'
234 print '\tdisabling anonymous access'
237 set_anonymous_access(enable=False)
235 set_anonymous_access(enable=False)
238
236
239 @test_wrapp
237 @test_wrapp
240 def test_clone_wrong_credentials():
238 def test_clone_wrong_credentials():
241 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
239 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
242
240
243 try:
241 try:
244 shutil.rmtree(path, ignore_errors=True)
242 shutil.rmtree(path, ignore_errors=True)
245 os.makedirs(path)
243 os.makedirs(path)
246 #print 'made dirs %s' % jn(path)
244 #print 'made dirs %s' % jn(path)
247 except OSError:
245 except OSError:
248 raise
246 raise
249
247
250 print '\tchecking if anonymous access is enabled'
248 print '\tchecking if anonymous access is enabled'
251 anonymous_access = get_anonymous_access()
249 anonymous_access = get_anonymous_access()
252 if anonymous_access:
250 if anonymous_access:
253 print '\tenabled, disabling it '
251 print '\tenabled, disabling it '
254 set_anonymous_access(enable=False)
252 set_anonymous_access(enable=False)
255
253
256 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
254 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
257 {'user':USER + 'error',
255 {'user':USER + 'error',
258 'pass':PASS,
256 'pass':PASS,
259 'host':HOST,
257 'host':HOST,
260 'cloned_repo':HG_REPO,
258 'cloned_repo':HG_REPO,
261 'dest':path}
259 'dest':path}
262
260
263 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
261 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
264
262
265 if not """abort: authorization failed""" in stderr:
263 if not """abort: authorization failed""" in stderr:
266 raise Exception('Failure')
264 raise Exception('Failure')
267
265
268 @test_wrapp
266 @test_wrapp
269 def test_pull():
267 def test_pull():
270 pass
268 pass
271
269
272 @test_wrapp
270 @test_wrapp
273 def test_push_modify_file(f_name='setup.py'):
271 def test_push_modify_file(f_name='setup.py'):
274 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
272 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
275 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
273 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
276 for i in xrange(5):
274 for i in xrange(5):
277 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
275 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
278 Command(cwd).execute(cmd)
276 Command(cwd).execute(cmd)
279
277
280 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
278 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
281 Command(cwd).execute(cmd)
279 Command(cwd).execute(cmd)
282
280
283 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
281 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
284
282
285 @test_wrapp
283 @test_wrapp
286 def test_push_new_file(commits=15, with_clone=True):
284 def test_push_new_file(commits=15, with_clone=True):
287
285
288 if with_clone:
286 if with_clone:
289 test_clone_with_credentials(no_errors=True)
287 test_clone_with_credentials(no_errors=True)
290
288
291 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
289 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
292 added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
290 added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
293
291
294 Command(cwd).execute('touch %s' % added_file)
292 Command(cwd).execute('touch %s' % added_file)
295
293
296 Command(cwd).execute('hg add %s' % added_file)
294 Command(cwd).execute('hg add %s' % added_file)
297
295
298 for i in xrange(commits):
296 for i in xrange(commits):
299 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
297 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
300 Command(cwd).execute(cmd)
298 Command(cwd).execute(cmd)
301
299
302 cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
300 cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
303 'Marcin KuΕΊminski <marcin@python-blog.com>',
301 'Marcin KuΕΊminski <marcin@python-blog.com>',
304 added_file)
302 added_file)
305 Command(cwd).execute(cmd)
303 Command(cwd).execute(cmd)
306
304
307 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
305 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
308 {'user':USER,
306 {'user':USER,
309 'pass':PASS,
307 'pass':PASS,
310 'host':HOST,
308 'host':HOST,
311 'cloned_repo':HG_REPO,
309 'cloned_repo':HG_REPO,
312 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
310 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
313
311
314 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
312 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
315
313
316 @test_wrapp
314 @test_wrapp
317 def test_push_wrong_credentials():
315 def test_push_wrong_credentials():
318 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
316 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
319 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
317 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
320 {'user':USER + 'xxx',
318 {'user':USER + 'xxx',
321 'pass':PASS,
319 'pass':PASS,
322 'host':HOST,
320 'host':HOST,
323 'cloned_repo':HG_REPO,
321 'cloned_repo':HG_REPO,
324 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
322 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
325
323
326 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
324 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
327 for i in xrange(5):
325 for i in xrange(5):
328 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
326 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
329 Command(cwd).execute(cmd)
327 Command(cwd).execute(cmd)
330
328
331 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
329 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
332 Command(cwd).execute(cmd)
330 Command(cwd).execute(cmd)
333
331
334 Command(cwd).execute('hg push %s' % clone_url)
332 Command(cwd).execute('hg push %s' % clone_url)
335
333
336 @test_wrapp
334 @test_wrapp
337 def test_push_wrong_path():
335 def test_push_wrong_path():
338 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
336 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
339 added_file = jn(path, 'somefile.py')
337 added_file = jn(path, 'somefile.py')
340
338
341 try:
339 try:
342 shutil.rmtree(path, ignore_errors=True)
340 shutil.rmtree(path, ignore_errors=True)
343 os.makedirs(path)
341 os.makedirs(path)
344 print '\tmade dirs %s' % jn(path)
342 print '\tmade dirs %s' % jn(path)
345 except OSError:
343 except OSError:
346 raise
344 raise
347
345
348 Command(cwd).execute("""echo '' > %s""" % added_file)
346 Command(cwd).execute("""echo '' > %s""" % added_file)
349 Command(cwd).execute("""hg init %s""" % path)
347 Command(cwd).execute("""hg init %s""" % path)
350 Command(cwd).execute("""hg add %s""" % added_file)
348 Command(cwd).execute("""hg add %s""" % added_file)
351
349
352 for i in xrange(2):
350 for i in xrange(2):
353 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
351 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
354 Command(cwd).execute(cmd)
352 Command(cwd).execute(cmd)
355
353
356 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
354 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
357 Command(cwd).execute(cmd)
355 Command(cwd).execute(cmd)
358
356
359 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
357 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
360 {'user':USER,
358 {'user':USER,
361 'pass':PASS,
359 'pass':PASS,
362 'host':HOST,
360 'host':HOST,
363 'cloned_repo':HG_REPO + '_error',
361 'cloned_repo':HG_REPO + '_error',
364 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
362 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
365
363
366 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
364 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
367 if not """abort: HTTP Error 403: Forbidden""" in stderr:
365 if not """abort: HTTP Error 403: Forbidden""" in stderr:
368 raise Exception('Failure')
366 raise Exception('Failure')
369
367
370 @test_wrapp
368 @test_wrapp
371 def get_logs():
369 def get_logs():
372 return UserLog.query().all()
370 return UserLog.query().all()
373
371
374 @test_wrapp
372 @test_wrapp
375 def test_logs(initial):
373 def test_logs(initial):
376 logs = UserLog.query().all()
374 logs = UserLog.query().all()
377 operations = 4
375 operations = 4
378 if len(initial) + operations != len(logs):
376 if len(initial) + operations != len(logs):
379 raise Exception("missing number of logs initial:%s vs current:%s" % \
377 raise Exception("missing number of logs initial:%s vs current:%s" % \
380 (len(initial), len(logs)))
378 (len(initial), len(logs)))
381
379
382
380
383 if __name__ == '__main__':
381 if __name__ == '__main__':
384 create_test_user(force=False)
382 create_test_user(force=False)
385 create_test_repo()
383 create_test_repo()
386
384
387 initial_logs = get_logs()
385 initial_logs = get_logs()
388 print 'initial activity logs: %s' % len(initial_logs)
386 print 'initial activity logs: %s' % len(initial_logs)
389
387 s = time.time()
390 #test_push_modify_file()
388 #test_push_modify_file()
391 test_clone_with_credentials()
389 test_clone_with_credentials()
392 test_clone_wrong_credentials()
390 test_clone_wrong_credentials()
393
391
394 test_push_new_file(commits=2, with_clone=True)
392 test_push_new_file(commits=2, with_clone=True)
395
393
396 test_clone_anonymous()
394 test_clone_anonymous()
397 test_push_wrong_path()
395 test_push_wrong_path()
398
396
399 test_push_wrong_credentials()
397 test_push_wrong_credentials()
400
398
401 test_logs(initial_logs)
399 test_logs(initial_logs)
400 print 'finished ok in %.3f' % (time.time() - s)
General Comments 0
You need to be logged in to leave comments. Login now