##// END OF EJS Templates
small fixes to test_hg_operations script
marcink -
r1296:1bd6ed0f beta
parent child Browse files
Show More
@@ -1,332 +1,337
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.tests.test_hg_operations
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Test suite for making push/pull operations
7 7
8 8 :created_on: Dec 30, 2010
9 9 :copyright: (c) 2010 by marcink.
10 10 :license: LICENSE_NAME, see LICENSE_FILE for more details.
11 11 """
12 12
13 13 import os
14 14 import shutil
15 15 import logging
16 16 from os.path import join as jn
17 17 from os.path import dirname as dn
18 18
19 19 from tempfile import _RandomNameSequence
20 20 from subprocess import Popen, PIPE
21 21
22 22 from paste.deploy import appconfig
23 23 from pylons import config
24 24 from sqlalchemy import engine_from_config
25 25
26 26 from rhodecode.lib.utils import add_cache
27 27 from rhodecode.model import init_model
28 28 from rhodecode.model import meta
29 29 from rhodecode.model.db import User, Repository
30 30 from rhodecode.lib.auth import get_crypt_password
31 31
32 32 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
33 33 from rhodecode.config.environment import load_environment
34 34
35 35 rel_path = dn(dn(dn(os.path.abspath(__file__))))
36 36 conf = appconfig('config:development.ini', relative_to=rel_path)
37 37 load_environment(conf.global_conf, conf.local_conf)
38 38
39 39 add_cache(conf)
40 40
41 41 USER = 'test_admin'
42 42 PASS = 'test12'
43 43 HOST = '127.0.0.1:5000'
44 44 DEBUG = True
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 class Command(object):
49 49
50 50 def __init__(self, cwd):
51 51 self.cwd = cwd
52 52
53 53 def execute(self, cmd, *args):
54 54 """Runs command on the system with given ``args``.
55 55 """
56 56
57 57 command = cmd + ' ' + ' '.join(args)
58 58 log.debug('Executing %s' % command)
59 59 if DEBUG:
60 60 print command
61 61 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
62 62 stdout, stderr = p.communicate()
63 63 if DEBUG:
64 64 print stdout, stderr
65 65 return stdout, stderr
66 66
67 67 def get_session():
68 68 engine = engine_from_config(conf, 'sqlalchemy.db1.')
69 69 init_model(engine)
70 70 sa = meta.Session()
71 71 return sa
72 72
73 73
74 74 def create_test_user(force=True):
75 75 print 'creating test user'
76 76 sa = get_session()
77 77
78 78 user = sa.query(User).filter(User.username == USER).scalar()
79 79
80 80 if force and user is not None:
81 81 print 'removing current user'
82 82 for repo in sa.query(Repository).filter(Repository.user == user).all():
83 83 sa.delete(repo)
84 84 sa.delete(user)
85 85 sa.commit()
86 86
87 87 if user is None or force:
88 88 print 'creating new one'
89 89 new_usr = User()
90 90 new_usr.username = USER
91 91 new_usr.password = get_crypt_password(PASS)
92 92 new_usr.email = 'mail@mail.com'
93 93 new_usr.name = 'test'
94 94 new_usr.lastname = 'lasttestname'
95 95 new_usr.active = True
96 96 new_usr.admin = True
97 97 sa.add(new_usr)
98 98 sa.commit()
99 99
100 100 print 'done'
101 101
102 102
103 103 def create_test_repo(force=True):
104 104 from rhodecode.model.repo import RepoModel
105 105 sa = get_session()
106 106
107 107 user = sa.query(User).filter(User.username == USER).scalar()
108 108 if user is None:
109 109 raise Exception('user not found')
110 110
111 111
112 112 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
113 113
114 114 if repo is None:
115 115 print 'repo not found creating'
116 116
117 117 form_data = {'repo_name':HG_REPO,
118 118 'repo_type':'hg',
119 'private':False, }
119 'private':False,
120 'clone_uri':'' }
120 121 rm = RepoModel(sa)
121 122 rm.base_path = '/home/hg'
122 123 rm.create(form_data, user)
123 124
124 125
125 126 def set_anonymous_access(enable=True):
126 127 sa = get_session()
127 128 user = sa.query(User).filter(User.username == 'default').one()
128 129 user.active = enable
129 130 sa.add(user)
130 131 sa.commit()
131 132
132 133 def get_anonymous_access():
133 134 sa = get_session()
134 135 return sa.query(User).filter(User.username == 'default').one().active
135 136
136 137
137 138 #==============================================================================
138 139 # TESTS
139 140 #==============================================================================
140 141 def test_clone(no_errors=False):
141 142 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
142 143
143 144 try:
144 145 shutil.rmtree(path, ignore_errors=True)
145 146 os.makedirs(path)
146 147 #print 'made dirs %s' % jn(path)
147 148 except OSError:
148 149 raise
149 150
150 151
151 152 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
152 153 {'user':USER,
153 154 'pass':PASS,
154 155 'host':HOST,
155 156 'cloned_repo':HG_REPO,
156 157 'dest':path}
157 158
158 159 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
159 160
160 161 if no_errors is False:
161 162 assert """adding file changes""" in stdout, 'no messages about cloning'
162 163 assert """abort""" not in stderr , 'got error from clone'
163 164
164 165
165 166
166 167 def test_clone_anonymous_ok():
167 168 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
168 169
169 170 try:
170 171 shutil.rmtree(path, ignore_errors=True)
171 172 os.makedirs(path)
172 173 #print 'made dirs %s' % jn(path)
173 174 except OSError:
174 175 raise
175 176
176 177
177 178 print 'checking if anonymous access is enabled'
178 179 anonymous_access = get_anonymous_access()
179 180 if not anonymous_access:
180 181 print 'not enabled, enabling it '
181 182 set_anonymous_access(enable=True)
182 183
183 184 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
184 185 {'user':USER,
185 186 'pass':PASS,
186 187 'host':HOST,
187 188 'cloned_repo':HG_REPO,
188 189 'dest':path}
189 190
190 191 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
191 192 print stdout, stderr
192 193
193 194
194 195 assert """adding file changes""" in stdout, 'no messages about cloning'
195 196 assert """abort""" not in stderr , 'got error from clone'
196 197
197 198 #disable if it was enabled
198 199 if not anonymous_access:
199 200 print 'disabling anonymous access'
200 201 set_anonymous_access(enable=False)
201 202
202 203
203 204 def test_clone_wrong_credentials():
204 205 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
205 206
206 207 try:
207 208 shutil.rmtree(path, ignore_errors=True)
208 209 os.makedirs(path)
209 210 #print 'made dirs %s' % jn(path)
210 211 except OSError:
211 212 raise
212 213
213 214
214 215 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
215 216 {'user':USER + 'error',
216 217 'pass':PASS,
217 218 'host':HOST,
218 219 'cloned_repo':HG_REPO,
219 220 'dest':path}
220 221
221 222 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
222 223
223 224 assert """abort: authorization failed""" in stderr , 'no error from wrong credentials'
224 225
225 226
226 227 def test_pull():
227 228 pass
228 229
229 230 def test_push_modify_file(f_name='setup.py'):
230 231 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
231 232 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
232 233 for i in xrange(5):
233 234 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
234 235 Command(cwd).execute(cmd)
235 236
236 237 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
237 238 Command(cwd).execute(cmd)
238 239
239 240 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
240 241
241 def test_push_new_file(commits=15):
242 def test_push_new_file(commits=15, with_clone=True):
242 243
243 test_clone(no_errors=True)
244 if with_clone:
245 test_clone(no_errors=True)
244 246
245 247 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
246 248 added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
247 249
248 250 Command(cwd).execute('touch %s' % added_file)
249 251
250 252 Command(cwd).execute('hg add %s' % added_file)
251 253
252 254 for i in xrange(commits):
253 255 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
254 256 Command(cwd).execute(cmd)
255 257
256 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
258 cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
259 'Marcin KuΕΊminski <marcin@python-blog.com>',
260 added_file)
257 261 Command(cwd).execute(cmd)
258 262
259 263 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
260 264 {'user':USER,
261 265 'pass':PASS,
262 266 'host':HOST,
263 267 'cloned_repo':HG_REPO,
264 268 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
265 269
266 270 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
267 271
268 272 def test_push_wrong_credentials():
269 273 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
270 274 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
271 275 {'user':USER + 'xxx',
272 276 'pass':PASS,
273 277 'host':HOST,
274 278 'cloned_repo':HG_REPO,
275 279 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
276 280
277 281 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
278 282 for i in xrange(5):
279 283 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
280 284 Command(cwd).execute(cmd)
281 285
282 286 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
283 287 Command(cwd).execute(cmd)
284 288
285 289 Command(cwd).execute('hg push %s' % clone_url)
286 290
287 291 def test_push_wrong_path():
288 292 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
289 293 added_file = jn(path, 'somefile.py')
290 294
291 295 try:
292 296 shutil.rmtree(path, ignore_errors=True)
293 297 os.makedirs(path)
294 298 print 'made dirs %s' % jn(path)
295 299 except OSError:
296 300 raise
297 301
298 302 Command(cwd).execute("""echo '' > %s""" % added_file)
299 303 Command(cwd).execute("""hg init %s""" % path)
300 304 Command(cwd).execute("""hg add %s""" % added_file)
301 305
302 306 for i in xrange(2):
303 307 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
304 308 Command(cwd).execute(cmd)
305 309
306 310 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
307 311 Command(cwd).execute(cmd)
308 312
309 313 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
310 314 {'user':USER,
311 315 'pass':PASS,
312 316 'host':HOST,
313 317 'cloned_repo':HG_REPO + '_error',
314 318 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
315 319
316 320 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
317 321 assert """abort: HTTP Error 403: Forbidden""" in stderr
318 322
319 323
320 324 if __name__ == '__main__':
321 325 create_test_user(force=False)
322 326 create_test_repo()
323 327 #test_push_modify_file()
324 328 #test_clone()
325 329 #test_clone_anonymous_ok()
326 330
327 331 #test_clone_wrong_credentials()
328 332
329 #test_pull()
330 test_push_new_file(commits=15)
333 test_pull()
334 test_push_new_file(commits=2, with_clone=True)
335
331 336 #test_push_wrong_path()
332 337 #test_push_wrong_credentials()
General Comments 0
You need to be logged in to leave comments. Login now