##// END OF EJS Templates
Fixed test_hg_operations test and added concurency test
marcink -
r1529:0b268dd3 beta
parent child Browse files
Show More
@@ -0,0 +1,177 b''
1 # -*- coding: utf-8 -*-
2 """
3 rhodecode.tests.test_hg_operations
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
6 Test suite for making push/pull operations
7
8 :created_on: Dec 30, 2010
9 :copyright: (c) 2010 by marcink.
10 :license: LICENSE_NAME, see LICENSE_FILE for more details.
11 """
12
13 import os
14 import sys
15 import shutil
16 import logging
17 from os.path import join as jn
18 from os.path import dirname as dn
19
20 from tempfile import _RandomNameSequence
21 from subprocess import Popen, PIPE
22
23 from paste.deploy import appconfig
24 from pylons import config
25 from sqlalchemy import engine_from_config
26
27 from rhodecode.lib.utils import add_cache
28 from rhodecode.model import init_model
29 from rhodecode.model import meta
30 from rhodecode.model.db import User, Repository
31 from rhodecode.lib.auth import get_crypt_password
32
33 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
34 from rhodecode.config.environment import load_environment
35
36 rel_path = dn(dn(dn(os.path.abspath(__file__))))
37 conf = appconfig('config:development.ini', relative_to=rel_path)
38 load_environment(conf.global_conf, conf.local_conf)
39
40 add_cache(conf)
41
42 USER = 'test_admin'
43 PASS = 'test12'
44 HOST = '127.0.0.1:5000'
45 DEBUG = True
46 log = logging.getLogger(__name__)
47
48
49 class Command(object):
50
51 def __init__(self, cwd):
52 self.cwd = cwd
53
54 def execute(self, cmd, *args):
55 """Runs command on the system with given ``args``.
56 """
57
58 command = cmd + ' ' + ' '.join(args)
59 log.debug('Executing %s' % command)
60 if DEBUG:
61 print command
62 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
63 stdout, stderr = p.communicate()
64 if DEBUG:
65 print stdout, stderr
66 return stdout, stderr
67
68 def get_session():
69 engine = engine_from_config(conf, 'sqlalchemy.db1.')
70 init_model(engine)
71 sa = meta.Session()
72 return sa
73
74
75 def create_test_user(force=True):
76 print 'creating test user'
77 sa = get_session()
78
79 user = sa.query(User).filter(User.username == USER).scalar()
80
81 if force and user is not None:
82 print 'removing current user'
83 for repo in sa.query(Repository).filter(Repository.user == user).all():
84 sa.delete(repo)
85 sa.delete(user)
86 sa.commit()
87
88 if user is None or force:
89 print 'creating new one'
90 new_usr = User()
91 new_usr.username = USER
92 new_usr.password = get_crypt_password(PASS)
93 new_usr.email = 'mail@mail.com'
94 new_usr.name = 'test'
95 new_usr.lastname = 'lasttestname'
96 new_usr.active = True
97 new_usr.admin = True
98 sa.add(new_usr)
99 sa.commit()
100
101 print 'done'
102
103
104 def create_test_repo(force=True):
105 print 'creating test repo'
106 from rhodecode.model.repo import RepoModel
107 sa = get_session()
108
109 user = sa.query(User).filter(User.username == USER).scalar()
110 if user is None:
111 raise Exception('user not found')
112
113
114 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
115
116 if repo is None:
117 print 'repo not found creating'
118
119 form_data = {'repo_name':HG_REPO,
120 'repo_type':'hg',
121 'private':False,
122 'clone_uri':'' }
123 rm = RepoModel(sa)
124 rm.base_path = '/home/hg'
125 rm.create(form_data, user)
126
127 print 'done'
128
129 def set_anonymous_access(enable=True):
130 sa = get_session()
131 user = sa.query(User).filter(User.username == 'default').one()
132 user.active = enable
133 sa.add(user)
134 sa.commit()
135
136 def get_anonymous_access():
137 sa = get_session()
138 return sa.query(User).filter(User.username == 'default').one().active
139
140
141 #==============================================================================
142 # TESTS
143 #==============================================================================
144 def test_clone_with_credentials(no_errors=False,repo=HG_REPO):
145 cwd = path = jn(TESTS_TMP_PATH, repo)
146
147
148 try:
149 shutil.rmtree(path, ignore_errors=True)
150 os.makedirs(path)
151 #print 'made dirs %s' % jn(path)
152 except OSError:
153 raise
154
155
156 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
157 {'user':USER,
158 'pass':PASS,
159 'host':HOST,
160 'cloned_repo':repo,
161 'dest':path+_RandomNameSequence().next()}
162
163 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
164
165 if no_errors is False:
166 assert """adding file changes""" in stdout, 'no messages about cloning'
167 assert """abort""" not in stderr , 'got error from clone'
168
169 if __name__ == '__main__':
170 try:
171 create_test_user(force=False)
172
173 for i in range(int(sys.argv[2])):
174 test_clone_with_credentials(repo=sys.argv[1])
175
176 except Exception,e:
177 sys.exit('stop on %s' % e)
@@ -76,7 +76,7 b' class SimpleHg(object):'
76 76
77 77 # skip passing error to error controller
78 78 environ['pylons.status_code_redirect'] = True
79
79
80 80 #======================================================================
81 81 # EXTRACT REPOSITORY NAME FROM ENV
82 82 #======================================================================
@@ -90,12 +90,13 b' class SimpleHg(object):'
90 90 # GET ACTION PULL or PUSH
91 91 #======================================================================
92 92 action = self.__get_action(environ)
93
93
94 94 #======================================================================
95 95 # CHECK ANONYMOUS PERMISSION
96 96 #======================================================================
97 97 if action in ['pull', 'push']:
98 98 anonymous_user = self.__get_user('default')
99
99 100 username = anonymous_user.username
100 101 anonymous_perm = self.__check_permission(action,
101 102 anonymous_user,
@@ -152,13 +153,13 b' class SimpleHg(object):'
152 153 #======================================================================
153 154 # MERCURIAL REQUEST HANDLING
154 155 #======================================================================
155
156
156 157 repo_path = safe_str(os.path.join(self.basepath, repo_name))
157 158 log.debug('Repository path is %s' % repo_path)
158
159
159 160 baseui = make_ui('db')
160 161 self.__inject_extras(repo_path, baseui, extras)
161
162
162 163
163 164 # quick check if that dir exists...
164 165 if is_valid_repo(repo_name, self.basepath) is False:
@@ -257,7 +258,7 b' class SimpleHg(object):'
257 258 push requests"""
258 259 invalidate_cache('get_repo_cached_%s' % repo_name)
259 260
260 def __inject_extras(self,repo_path, baseui, extras={}):
261 def __inject_extras(self, repo_path, baseui, extras={}):
261 262 """
262 263 Injects some extra params into baseui instance
263 264
@@ -29,7 +29,7 b' from sqlalchemy import engine_from_confi'
29 29 from rhodecode.lib.utils import add_cache
30 30 from rhodecode.model import init_model
31 31 from rhodecode.model import meta
32 from rhodecode.model.db import User, Repository
32 from rhodecode.model.db import User, Repository, UserLog
33 33 from rhodecode.lib.auth import get_crypt_password
34 34
35 35 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
@@ -45,7 +45,7 b" USER = 'test_admin'"
45 45 PASS = 'test12'
46 46 HOST = '127.0.0.1:5000'
47 47 DEBUG = bool(int(sys.argv[1]))
48 print 'DEBUG:',DEBUG
48 print 'DEBUG:', DEBUG
49 49 log = logging.getLogger(__name__)
50 50
51 51
@@ -70,15 +70,17 b' class Command(object):'
70 70
71 71
72 72 def test_wrapp(func):
73
74 def __wrapp(*args,**kwargs):
75 print '###%s###' %func.__name__
73
74 def __wrapp(*args, **kwargs):
75 print '###%s###' % func.__name__
76 76 try:
77 res = func(*args,**kwargs)
78 except:
79 print '--%s failed--' % func.__name__
80 return
81 print 'ok'
77 res = func(*args, **kwargs)
78 except Exception, e:
79 print ('###############\n-'
80 '--%s failed %s--\n'
81 '###############\n' % (func.__name__, e))
82 sys.exit()
83 print '++OK++'
82 84 return res
83 85 return __wrapp
84 86
@@ -90,20 +92,20 b' def get_session():'
90 92
91 93
92 94 def create_test_user(force=True):
93 print 'creating test user'
95 print '\tcreating test user'
94 96 sa = get_session()
95 97
96 98 user = sa.query(User).filter(User.username == USER).scalar()
97 99
98 100 if force and user is not None:
99 print 'removing current user'
101 print '\tremoving current user'
100 102 for repo in sa.query(Repository).filter(Repository.user == user).all():
101 103 sa.delete(repo)
102 104 sa.delete(user)
103 105 sa.commit()
104 106
105 107 if user is None or force:
106 print 'creating new one'
108 print '\tcreating new one'
107 109 new_usr = User()
108 110 new_usr.username = USER
109 111 new_usr.password = get_crypt_password(PASS)
@@ -115,7 +117,7 b' def create_test_user(force=True):'
115 117 sa.add(new_usr)
116 118 sa.commit()
117 119
118 print 'done'
120 print '\tdone'
119 121
120 122
121 123 def create_test_repo(force=True):
@@ -130,7 +132,7 b' def create_test_repo(force=True):'
130 132 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
131 133
132 134 if repo is None:
133 print 'repo not found creating'
135 print '\trepo not found creating'
134 136
135 137 form_data = {'repo_name':HG_REPO,
136 138 'repo_type':'hg',
@@ -144,12 +146,13 b' def create_test_repo(force=True):'
144 146 def set_anonymous_access(enable=True):
145 147 sa = get_session()
146 148 user = sa.query(User).filter(User.username == 'default').one()
149 sa.expire(user)
147 150 user.active = enable
148 151 sa.add(user)
149 152 sa.commit()
150 153 sa.remove()
151
152 print 'anonymous access is now:',enable
154 import time;time.sleep(3)
155 print '\tanonymous access is now:', enable
153 156
154 157
155 158 def get_anonymous_access():
@@ -173,13 +176,13 b' def test_clone_with_credentials(no_error'
173 176 except OSError:
174 177 raise
175 178
176 print 'checking if anonymous access is enabled'
179 print '\tchecking if anonymous access is enabled'
177 180 anonymous_access = get_anonymous_access()
178 181 if anonymous_access:
179 print 'enabled, disabling it '
182 print '\tenabled, disabling it '
180 183 set_anonymous_access(enable=False)
181 184 time.sleep(1)
182
185
183 186 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
184 187 {'user':USER,
185 188 'pass':PASS,
@@ -206,13 +209,13 b' def test_clone_anonymous():'
206 209 raise
207 210
208 211
209 print 'checking if anonymous access is enabled'
212 print '\tchecking if anonymous access is enabled'
210 213 anonymous_access = get_anonymous_access()
211 214 if not anonymous_access:
212 print 'not enabled, enabling it '
215 print '\tnot enabled, enabling it '
213 216 set_anonymous_access(enable=True)
214 217 time.sleep(1)
215
218
216 219 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
217 220 {'user':USER,
218 221 'pass':PASS,
@@ -227,7 +230,7 b' def test_clone_anonymous():'
227 230
228 231 #disable if it was enabled
229 232 if not anonymous_access:
230 print 'disabling anonymous access'
233 print '\tdisabling anonymous access'
231 234 set_anonymous_access(enable=False)
232 235
233 236 @test_wrapp
@@ -241,12 +244,12 b' def test_clone_wrong_credentials():'
241 244 except OSError:
242 245 raise
243 246
244 print 'checking if anonymous access is enabled'
247 print '\tchecking if anonymous access is enabled'
245 248 anonymous_access = get_anonymous_access()
246 249 if anonymous_access:
247 print 'enabled, disabling it '
250 print '\tenabled, disabling it '
248 251 set_anonymous_access(enable=False)
249
252
250 253 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
251 254 {'user':USER + 'error',
252 255 'pass':PASS,
@@ -257,7 +260,7 b' def test_clone_wrong_credentials():'
257 260 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
258 261
259 262 if not """abort: authorization failed""" in stderr:
260 raise Exception('Failure')
263 raise Exception('Failure')
261 264
262 265 @test_wrapp
263 266 def test_pull():
@@ -335,7 +338,7 b' def test_push_wrong_path():'
335 338 try:
336 339 shutil.rmtree(path, ignore_errors=True)
337 340 os.makedirs(path)
338 print 'made dirs %s' % jn(path)
341 print '\tmade dirs %s' % jn(path)
339 342 except OSError:
340 343 raise
341 344
@@ -361,19 +364,37 b' def test_push_wrong_path():'
361 364 if not """abort: HTTP Error 403: Forbidden""" in stderr:
362 365 raise Exception('Failure')
363 366
367 @test_wrapp
368 def get_logs():
369 sa = get_session()
370 return len(sa.query(UserLog).all())
371
372 @test_wrapp
373 def test_logs(initial):
374 sa = get_session()
375 logs = sa.query(UserLog).all()
376 operations = 7
377 if initial + operations != len(logs):
378 raise Exception("missing number of logs %s vs %s" % (initial, len(logs)))
379
364 380
365 381 if __name__ == '__main__':
366 382 create_test_user(force=False)
367 383 create_test_repo()
368
384
385 initial_logs = get_logs()
386
369 387 # test_push_modify_file()
370 388 test_clone_with_credentials()
371 389 test_clone_wrong_credentials()
372 390
373 391
374 392 test_push_new_file(commits=2, with_clone=True)
375 #
393
394 test_clone_anonymous()
376 395 test_push_wrong_path()
377
378 test_clone_anonymous()
379 test_push_wrong_credentials() No newline at end of file
396
397
398 test_push_wrong_credentials()
399
400 test_logs(initial_logs)
General Comments 0
You need to be logged in to leave comments. Login now