##// END OF EJS Templates
rewrote test_scm_operations, now run by nosetests
marcink -
r2728:6341084b beta
parent child Browse files
Show More
@@ -41,12 +41,13 b' log = logging.getLogger(__name__)'
41 __all__ = [
41 __all__ = [
42 'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
42 'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
43 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
43 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
44 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
44 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
45 'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
45 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
46 'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
46 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
47 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_HG_REPO_CLONE',
47 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
48 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', 'TEST_GIT_REPO_CLONE',
48 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
49 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
49 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
50 'GIT_REMOTE_REPO', 'SCM_TESTS',
50 ]
51 ]
51
52
52 # Invoke websetup with the current config file
53 # Invoke websetup with the current config file
@@ -1,9 +1,12 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.tests.test_hg_operations
3 rhodecode.tests.test_scm_operations
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 Test suite for making push/pull operations
6 Test suite for making push/pull operations.
7 Run using::
8
9 RC_WHOOSH_TEST_DISABLE=1 nosetests rhodecode/tests/scripts/test_scm_operations.py
7
10
8 :created_on: Dec 30, 2010
11 :created_on: Dec 30, 2010
9 :author: marcink
12 :author: marcink
@@ -24,47 +27,19 b''
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
27 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25
28
26 import os
29 import os
27 import time
30 import tempfile
28 import sys
29 import shutil
30 import logging
31
32 from os.path import join as jn
31 from os.path import join as jn
33 from os.path import dirname as dn
32 from os.path import dirname as dn
34
33
35 from tempfile import _RandomNameSequence
34 from tempfile import _RandomNameSequence
36 from subprocess import Popen, PIPE
35 from subprocess import Popen, PIPE
37
36
38 from paste.deploy import appconfig
37 from rhodecode.tests import *
39 from pylons import config
40 from sqlalchemy import engine_from_config
41
42 from rhodecode.lib.utils import add_cache
43 from rhodecode.model import init_model
44 from rhodecode.model import meta
45 from rhodecode.model.db import User, Repository, UserLog
38 from rhodecode.model.db import User, Repository, UserLog
46 from rhodecode.lib.auth import get_crypt_password
39 from rhodecode.model.meta import Session
47
48 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
49 from rhodecode.config.environment import load_environment
50
51 rel_path = dn(dn(dn(os.path.abspath(__file__))))
52
40
53 conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
41 DEBUG = True
54 load_environment(conf.global_conf, conf.local_conf)
42 HOST = '127.0.0.1:5000' # test host
55
56 add_cache(conf)
57
58 USER = 'test_admin'
59 PASS = 'test12'
60 HOST = '127.0.0.1:5000'
61 DEBUG = False
62 print 'DEBUG:', DEBUG
63 log = logging.getLogger(__name__)
64
65 engine = engine_from_config(conf, 'sqlalchemy.db1.')
66 init_model(engine)
67 sa = meta.Session()
68
43
69
44
70 class Command(object):
45 class Command(object):
@@ -73,13 +48,13 b' class Command(object):'
73 self.cwd = cwd
48 self.cwd = cwd
74
49
75 def execute(self, cmd, *args):
50 def execute(self, cmd, *args):
76 """Runs command on the system with given ``args``.
51 """
52 Runs command on the system with given ``args``.
77 """
53 """
78
54
79 command = cmd + ' ' + ' '.join(args)
55 command = cmd + ' ' + ' '.join(args)
80 log.debug('Executing %s' % command)
81 if DEBUG:
56 if DEBUG:
82 print command
57 print '*** CMD %s ***' % command
83 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
58 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
84 stdout, stderr = p.communicate()
59 stdout, stderr = p.communicate()
85 if DEBUG:
60 if DEBUG:
@@ -87,324 +62,181 b' class Command(object):'
87 return stdout, stderr
62 return stdout, stderr
88
63
89
64
90 def test_wrapp(func):
65 def _get_tmp_dir():
91
66 return tempfile.mkdtemp(prefix='rc_integration_test')
92 def __wrapp(*args, **kwargs):
93 print '>>>%s' % func.__name__
94 try:
95 res = func(*args, **kwargs)
96 except Exception, e:
97 print ('###############\n-'
98 '--%s failed %s--\n'
99 '###############\n' % (func.__name__, e))
100 sys.exit()
101 print '++OK++'
102 return res
103 return __wrapp
104
67
105
68
106 def create_test_user(force=True):
69 def _construct_url(repo, dest=None, **kwargs):
107 print '\tcreating test user'
70 if dest is None:
108
71 #make temp clone
109 user = User.get_by_username(USER)
72 dest = _get_tmp_dir()
110
73 params = {
111 if force and user is not None:
74 'user': TEST_USER_ADMIN_LOGIN,
112 print '\tremoving current user'
75 'passwd': TEST_USER_ADMIN_PASS,
113 for repo in Repository.query().filter(Repository.user == user).all():
76 'host': HOST,
114 sa.delete(repo)
77 'cloned_repo': repo,
115 sa.delete(user)
78 'dest': dest
116 sa.commit()
79 }
117
80 params.update(**kwargs)
118 if user is None or force:
81 if params['user'] and params['passwd']:
119 print '\tcreating new one'
82 _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
120 new_usr = User()
83 else:
121 new_usr.username = USER
84 _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
122 new_usr.password = get_crypt_password(PASS)
85 return _url
123 new_usr.email = 'mail@mail.com'
124 new_usr.name = 'test'
125 new_usr.lastname = 'lasttestname'
126 new_usr.active = True
127 new_usr.admin = True
128 sa.add(new_usr)
129 sa.commit()
130
131 print '\tdone'
132
133
134 def create_test_repo(force=True):
135 from rhodecode.model.repo import RepoModel
136
137 user = User.get_by_username(USER)
138 if user is None:
139 raise Exception('user not found')
140
141 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
142
143 if repo is None:
144 print '\trepo not found creating'
145
146 form_data = {'repo_name':HG_REPO,
147 'repo_type':'hg',
148 'private':False,
149 'clone_uri':'' }
150 rm = RepoModel(sa)
151 rm.base_path = '/home/hg'
152 rm.create(form_data, user)
153
86
154
87
155 def set_anonymous_access(enable=True):
88 def set_anonymous_access(enable=True):
156 user = User.get_by_username('default')
89 user = User.get_by_username(User.DEFAULT_USER)
157 user.active = enable
90 user.active = enable
158 sa.add(user)
91 Session().add(user)
159 sa.commit()
92 Session().commit()
160 print '\tanonymous access is now:', enable
93 print '\tanonymous access is now:', enable
161 if enable != User.get_by_username('default').active:
94 if enable != User.get_by_username(User.DEFAULT_USER).active:
162 raise Exception('Cannot set anonymous access')
95 raise Exception('Cannot set anonymous access')
163
96
164
97
165 def get_anonymous_access():
98 def setup_module():
166 user = User.get_by_username('default')
99 #DISABLE ANONYMOUS ACCESS
167 return user.active
100 set_anonymous_access(False)
101
102
103 def test_clone_hg_repo_by_admin():
104 clone_url = _construct_url(HG_REPO)
105 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
106
107 assert 'requesting all changes' in stdout
108 assert 'adding changesets' in stdout
109 assert 'adding manifests' in stdout
110 assert 'adding file changes' in stdout
111
112 assert stderr == ''
168
113
169
114
170 #==============================================================================
115 def test_clone_git_repo_by_admin():
171 # TESTS
116 clone_url = _construct_url(GIT_REPO)
172 #==============================================================================
117 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
173 @test_wrapp
174 def test_clone_with_credentials(no_errors=False):
175 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
176
177 try:
178 shutil.rmtree(path, ignore_errors=True)
179 os.makedirs(path)
180 #print 'made dirs %s' % jn(path)
181 except OSError:
182 raise
183
118
184 print '\tchecking if anonymous access is enabled'
119 assert 'Cloning into' in stdout
185 anonymous_access = get_anonymous_access()
120 assert stderr == ''
186 if anonymous_access:
187 print '\tenabled, disabling it '
188 set_anonymous_access(enable=False)
189
190 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
191 {'user':USER,
192 'pass':PASS,
193 'host':HOST,
194 'cloned_repo':HG_REPO,
195 'dest':path}
196
197 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
198
199 if no_errors is False:
200 assert """adding file changes""" in stdout, 'no messages about cloning'
201 assert """abort""" not in stderr , 'got error from clone'
202
121
203
122
204 @test_wrapp
123 def test_clone_wrong_credentials_hg():
205 def test_clone_anonymous():
124 clone_url = _construct_url(HG_REPO, passwd='bad!')
206 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
125 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
126 assert 'abort: authorization failed' in stderr
207
127
208 try:
128
209 shutil.rmtree(path, ignore_errors=True)
129 def test_clone_wrong_credentials_git():
210 os.makedirs(path)
130 clone_url = _construct_url(GIT_REPO, passwd='bad!')
211 #print 'made dirs %s' % jn(path)
131 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
212 except OSError:
132 assert 'fatal: Authentication failed' in stderr
213 raise
214
133
215
134
216 print '\tchecking if anonymous access is enabled'
135 def test_clone_git_dir_as_hg():
217 anonymous_access = get_anonymous_access()
136 clone_url = _construct_url(GIT_REPO)
218 if not anonymous_access:
137 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
219 print '\tnot enabled, enabling it '
138 assert 'HTTP Error 404: Not Found' in stderr
220 set_anonymous_access(enable=True)
139
221
140
222 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
141 def test_clone_hg_repo_as_git():
223 {'user':USER,
142 clone_url = _construct_url(HG_REPO)
224 'pass':PASS,
143 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
225 'host':HOST,
144 assert 'not found: did you run git update-server-info on the server' in stderr
226 'cloned_repo':HG_REPO,
227 'dest':path}
228
145
229 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
230
146
231 assert """adding file changes""" in stdout, 'no messages about cloning'
147 def test_clone_non_existing_path_hg():
232 assert """abort""" not in stderr , 'got error from clone'
148 clone_url = _construct_url('trololo')
233
149 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
234 #disable if it was enabled
150 assert 'HTTP Error 404: Not Found' in stderr
235 if not anonymous_access:
236 print '\tdisabling anonymous access'
237 set_anonymous_access(enable=False)
238
151
239
152
240 @test_wrapp
153 def test_clone_non_existing_path_git():
241 def test_clone_wrong_credentials():
154 clone_url = _construct_url('trololo')
242 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
155 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
243
156 assert 'not found: did you run git update-server-info on the server' in stderr
244 try:
245 shutil.rmtree(path, ignore_errors=True)
246 os.makedirs(path)
247 #print 'made dirs %s' % jn(path)
248 except OSError:
249 raise
250
251 print '\tchecking if anonymous access is enabled'
252 anonymous_access = get_anonymous_access()
253 if anonymous_access:
254 print '\tenabled, disabling it '
255 set_anonymous_access(enable=False)
256
257 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
258 {'user':USER + 'error',
259 'pass':PASS,
260 'host':HOST,
261 'cloned_repo':HG_REPO,
262 'dest':path}
263
264 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
265
266 if not """abort: authorization failed""" in stderr:
267 raise Exception('Failure')
268
157
269
158
270 @test_wrapp
159 def test_push_new_file_hg():
271 def test_pull():
160 DEST = _get_tmp_dir()
272 pass
161 clone_url = _construct_url(HG_REPO, dest=DEST)
273
162 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
274
275 @test_wrapp
276 def test_push_modify_file(f_name='setup.py'):
277 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
278 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
279 for i in xrange(5):
280 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
281 Command(cwd).execute(cmd)
282
163
283 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
164 # commit some stuff into this repo
284 Command(cwd).execute(cmd)
165 cwd = path = jn(DEST)
285
286 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
287
288
289 @test_wrapp
290 def test_push_new_file(commits=15, with_clone=True):
291
292 if with_clone:
293 test_clone_with_credentials(no_errors=True)
294
295 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
296 added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
166 added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
297
298 Command(cwd).execute('touch %s' % added_file)
167 Command(cwd).execute('touch %s' % added_file)
299
300 Command(cwd).execute('hg add %s' % added_file)
168 Command(cwd).execute('hg add %s' % added_file)
301
169
302 for i in xrange(commits):
170 for i in xrange(3):
303 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
171 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
304 Command(cwd).execute(cmd)
172 Command(cwd).execute(cmd)
305
173
306 cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
174 cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (
307 'Marcin KuΕΊminski <marcin@python-blog.com>',
175 i,
308 added_file)
176 'Marcin KuΕΊminski <marcin@python-blog.com>',
177 added_file
178 )
309 Command(cwd).execute(cmd)
179 Command(cwd).execute(cmd)
180 # PUSH it back
181 clone_url = _construct_url(HG_REPO, dest='')
182 stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
310
183
311 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
184 assert 'pushing to' in stdout
312 {'user':USER,
185 assert 'Repository size' in stdout
313 'pass':PASS,
186 assert 'Last revision is now' in stdout
314 'host':HOST,
315 'cloned_repo':HG_REPO,
316 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
317
318 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
319
187
320
188
321 @test_wrapp
189 def test_push_new_file_git():
322 def test_push_wrong_credentials():
190 DEST = _get_tmp_dir()
323 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
191 clone_url = _construct_url(GIT_REPO, dest=DEST)
324 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
192 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
325 {'user':USER + 'xxx',
326 'pass':PASS,
327 'host':HOST,
328 'cloned_repo':HG_REPO,
329 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
330
331 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
332 for i in xrange(5):
333 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
334 Command(cwd).execute(cmd)
335
336 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
337 Command(cwd).execute(cmd)
338
193
339 Command(cwd).execute('hg push %s' % clone_url)
194 # commit some stuff into this repo
340
195 cwd = path = jn(DEST)
341
196 added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
342 @test_wrapp
197 Command(cwd).execute('touch %s' % added_file)
343 def test_push_wrong_path():
198 Command(cwd).execute('git add %s' % added_file)
344 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
345 added_file = jn(path, 'somefile.py')
346
199
347 try:
200 for i in xrange(3):
348 shutil.rmtree(path, ignore_errors=True)
349 os.makedirs(path)
350 print '\tmade dirs %s' % jn(path)
351 except OSError:
352 raise
353
354 Command(cwd).execute("""echo '' > %s""" % added_file)
355 Command(cwd).execute("""hg init %s""" % path)
356 Command(cwd).execute("""hg add %s""" % added_file)
357
358 for i in xrange(2):
359 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
201 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
360 Command(cwd).execute(cmd)
202 Command(cwd).execute(cmd)
361
203
362 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
204 cmd = """git ci -m 'commited new %s' --author '%s' %s """ % (
205 i,
206 'Marcin KuΕΊminski <marcin@python-blog.com>',
207 added_file
208 )
363 Command(cwd).execute(cmd)
209 Command(cwd).execute(cmd)
210 # PUSH it back
211 clone_url = _construct_url(GIT_REPO, dest='')
212 stdout, stderr = Command(cwd).execute('git push --verbose', clone_url)
364
213
365 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
214 #WTF git stderr ?!
366 {'user':USER,
215 assert 'master -> master' in stderr
367 'pass':PASS,
368 'host':HOST,
369 'cloned_repo':HG_REPO + '_error',
370 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
371
372 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
373 if not """abort: HTTP Error 403: Forbidden""" in stderr:
374 raise Exception('Failure')
375
216
376
217
377 @test_wrapp
218 def test_push_modify_existing_file_hg():
378 def get_logs():
219 assert 0
379 return UserLog.query().all()
380
220
381
221
382 @test_wrapp
222 def test_push_modify_existing_file_git():
383 def test_logs(initial):
223 assert 0
384 logs = UserLog.query().all()
224
385 operations = 4
225
386 if len(initial) + operations != len(logs):
226 def test_push_wrong_credentials_hg():
387 raise Exception("missing number of logs initial:%s vs current:%s" % \
227 assert 0
388 (len(initial), len(logs)))
389
228
390
229
391 if __name__ == '__main__':
230 def test_push_wrong_credentials_git():
392 create_test_user(force=False)
231 assert 0
393 create_test_repo()
232
233
234 def test_push_back_to_wrong_url_hg():
235 assert 0
394
236
395 initial_logs = get_logs()
396 print 'initial activity logs: %s' % len(initial_logs)
397 s = time.time()
398 #test_push_modify_file()
399 test_clone_with_credentials()
400 test_clone_wrong_credentials()
401
237
402 test_push_new_file(commits=2, with_clone=True)
238 def test_push_back_to_wrong_url_git():
403
239 assert 0
404 test_clone_anonymous()
405 test_push_wrong_path()
406
240
407 test_push_wrong_credentials()
408
241
409 test_logs(initial_logs)
242 #TODO: write all locking tests
410 print 'finished ok in %.3f' % (time.time() - s)
General Comments 0
You need to be logged in to leave comments. Login now