##// END OF EJS Templates
updated testing script for hg operations
marcink -
r1008:a9421a8a beta
parent child Browse files
Show More
@@ -1,276 +1,303
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.tests.test_hg_operations
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Test suite for making push/pull operations
7 7
8 8 :created_on: Dec 30, 2010
9 9 :copyright: (c) 2010 by marcink.
10 10 :license: LICENSE_NAME, see LICENSE_FILE for more details.
11 11 """
12 12
13 13 import os
14 14 import shutil
15 15 import logging
16 16 from os.path import join as jn
17 17
18 18 from tempfile import _RandomNameSequence
19 19 from subprocess import Popen, PIPE
20 20
21 21 from paste.deploy import appconfig
22 22 from pylons import config
23 23 from sqlalchemy import engine_from_config
24 24
25 25 from rhodecode.lib.utils import add_cache
26 26 from rhodecode.model import init_model
27 27 from rhodecode.model import meta
28 from rhodecode.model.db import User
28 from rhodecode.model.db import User, Repository
29 29 from rhodecode.lib.auth import get_crypt_password
30 30
31 31 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
32 32 from rhodecode.config.environment import load_environment
33 33
34 34 conf = appconfig('config:development.ini', relative_to='./../../')
35 35 load_environment(conf.global_conf, conf.local_conf)
36 36
37 37 add_cache(conf)
38 38
39 39 USER = 'test_admin'
40 40 PASS = 'test12'
41 41 HOST = '127.0.0.1:5000'
42 42 DEBUG = True
43 43 log = logging.getLogger(__name__)
44 44
45 45
46 46 class Command(object):
47 47
48 48 def __init__(self, cwd):
49 49 self.cwd = cwd
50 50
51 51 def execute(self, cmd, *args):
52 52 """Runs command on the system with given ``args``.
53 53 """
54 54
55 55 command = cmd + ' ' + ' '.join(args)
56 56 log.debug('Executing %s' % command)
57 57 if DEBUG:
58 58 print command
59 59 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
60 60 stdout, stderr = p.communicate()
61 61 if DEBUG:
62 62 print stdout, stderr
63 63 return stdout, stderr
64 64
65 65 def get_session():
66 66 engine = engine_from_config(conf, 'sqlalchemy.db1.')
67 67 init_model(engine)
68 68 sa = meta.Session()
69 69 return sa
70 70
71 71
72 72 def create_test_user(force=True):
73 print 'creating test user'
73 74 sa = get_session()
74 75
75 76 user = sa.query(User).filter(User.username == USER).scalar()
76 77
77 if force and user:
78 if force and user is not None:
79 print 'removing current user'
78 80 sa.delete(user)
79 81 sa.commit()
80 82
81 83 if user is None or force:
84 print 'creating new one'
82 85 new_usr = User()
83 86 new_usr.username = USER
84 87 new_usr.password = get_crypt_password(PASS)
85 88 new_usr.email = 'mail@mail.com'
86 89 new_usr.name = 'test'
87 90 new_usr.lastname = 'lasttestname'
88 91 new_usr.active = True
89
92 new_usr.admin = True
90 93 sa.add(new_usr)
91 94 sa.commit()
92 95
96 print 'done'
97
98
99 def create_test_repo(force=True):
100 from rhodecode.model.repo import RepoModel
101 sa = get_session()
102
103 user = sa.query(User).filter(User.username == USER).scalar()
104 if user is None:
105 raise Exception('user not found')
93 106
94 107
108 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
109
110 if repo is None:
111 print 'repo not found creating'
112
113 form_data = {'repo_name':HG_REPO,
114 'repo_type':'hg',
115 'private':False, }
116 rm = RepoModel(sa)
117 rm.base_path = '/home/hg'
118 rm.create(form_data, user)
95 119
96 120 #==============================================================================
97 121 # TESTS
98 122 #==============================================================================
99 def test_clone():
123 def test_clone(no_errors=False):
100 124 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
101 125
102 126 try:
103 127 shutil.rmtree(path, ignore_errors=True)
104 128 os.makedirs(path)
105 129 #print 'made dirs %s' % jn(path)
106 130 except OSError:
107 131 raise
108 132
109 133
110 134 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
111 135 {'user':USER,
112 136 'pass':PASS,
113 137 'host':HOST,
114 138 'cloned_repo':HG_REPO,
115 139 'dest':path}
116 140
117 141 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
118 142
143 if no_errors is False:
119 144 assert """adding file changes""" in stdout, 'no messages about cloning'
120 145 assert """abort""" not in stderr , 'got error from clone'
121 146
122 147
123 148
124 149 def test_clone_anonymous_ok():
125 150 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
126 151
127 152 try:
128 153 shutil.rmtree(path, ignore_errors=True)
129 154 os.makedirs(path)
130 155 #print 'made dirs %s' % jn(path)
131 156 except OSError:
132 157 raise
133 158
134 159
135 160 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
136 161 {'user':USER,
137 162 'pass':PASS,
138 163 'host':HOST,
139 164 'cloned_repo':HG_REPO,
140 165 'dest':path}
141 166
142 167 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
143 168 print stdout, stderr
144 169 assert """adding file changes""" in stdout, 'no messages about cloning'
145 170 assert """abort""" not in stderr , 'got error from clone'
146 171
147 172 def test_clone_wrong_credentials():
148 173 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
149 174
150 175 try:
151 176 shutil.rmtree(path, ignore_errors=True)
152 177 os.makedirs(path)
153 178 #print 'made dirs %s' % jn(path)
154 179 except OSError:
155 180 raise
156 181
157 182
158 183 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
159 184 {'user':USER + 'error',
160 185 'pass':PASS,
161 186 'host':HOST,
162 187 'cloned_repo':HG_REPO,
163 188 'dest':path}
164 189
165 190 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
166 191
167 192 assert """abort: authorization failed""" in stderr , 'no error from wrong credentials'
168 193
169 194
170 195 def test_pull():
171 196 pass
172 197
173 def test_push():
174
175 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
198 def test_push_modify_file(f_name='setup.py'):
199 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
200 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
176 201 for i in xrange(5):
177 202 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
178 203 Command(cwd).execute(cmd)
179 204
180 205 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
181 206 Command(cwd).execute(cmd)
182 207
183 208 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
184 209
185 210 def test_push_new_file(commits=15):
186 211
187 test_clone()
212 test_clone(no_errors=True)
188 213
189 214 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
190 215 added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
191 216
192 217 Command(cwd).execute('touch %s' % added_file)
193 218
194 219 Command(cwd).execute('hg add %s' % added_file)
195 220
196 221 for i in xrange(commits):
197 222 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
198 223 Command(cwd).execute(cmd)
199 224
200 225 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
201 226 Command(cwd).execute(cmd)
202 227
203 228 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
204 229 {'user':USER,
205 230 'pass':PASS,
206 231 'host':HOST,
207 232 'cloned_repo':HG_REPO,
208 233 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
209 234
210 235 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
211 236
212 237 def test_push_wrong_credentials():
213
238 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
214 239 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
215 240 {'user':USER + 'xxx',
216 241 'pass':PASS,
217 242 'host':HOST,
218 243 'cloned_repo':HG_REPO,
219 244 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
220 245
221 246 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
222 247 for i in xrange(5):
223 248 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
224 249 Command(cwd).execute(cmd)
225 250
226 251 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
227 252 Command(cwd).execute(cmd)
228 253
229 254 Command(cwd).execute('hg push %s' % clone_url)
230 255
231 256 def test_push_wrong_path():
232 257 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
233 258 added_file = jn(path, 'somefile.py')
234 259
235 260 try:
236 261 shutil.rmtree(path, ignore_errors=True)
237 262 os.makedirs(path)
238 263 print 'made dirs %s' % jn(path)
239 264 except OSError:
240 265 raise
241 266
242 267 Command(cwd).execute("""echo '' > %s""" % added_file)
243 268 Command(cwd).execute("""hg init %s""" % path)
244 269 Command(cwd).execute("""hg add %s""" % added_file)
245 270
246 271 for i in xrange(2):
247 272 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
248 273 Command(cwd).execute(cmd)
249 274
250 275 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
251 276 Command(cwd).execute(cmd)
252 277
253 278 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
254 279 {'user':USER,
255 280 'pass':PASS,
256 281 'host':HOST,
257 282 'cloned_repo':HG_REPO + '_error',
258 283 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
259 284
260 285 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
261 286 assert """abort: HTTP Error 403: Forbidden""" in stderr
262 287
263 288
264 289 if __name__ == '__main__':
265 #create_test_user()
266 test_clone()
267 test_clone_anonymous_ok()
290 create_test_user(force=False)
291 create_test_repo()
292 #test_push_modify_file()
293 #test_clone()
294 #test_clone_anonymous_ok()
268 295
269 296 #test_clone_wrong_credentials()
270 297
271 298 #test_pull()
272 #test_push_new_file(3)
299 test_push_new_file(commits=3)
273 300 #test_push_wrong_path()
274 301 #test_push_wrong_credentials()
275 302
276 303
General Comments 0
You need to be logged in to leave comments. Login now