##// END OF EJS Templates
updated testing script for hg operations
marcink -
r1008:a9421a8a beta
parent child Browse files
Show More
@@ -1,276 +1,303
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.tests.test_hg_operations
3 rhodecode.tests.test_hg_operations
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 Test suite for making push/pull operations
6 Test suite for making push/pull operations
7
7
8 :created_on: Dec 30, 2010
8 :created_on: Dec 30, 2010
9 :copyright: (c) 2010 by marcink.
9 :copyright: (c) 2010 by marcink.
10 :license: LICENSE_NAME, see LICENSE_FILE for more details.
10 :license: LICENSE_NAME, see LICENSE_FILE for more details.
11 """
11 """
12
12
13 import os
13 import os
14 import shutil
14 import shutil
15 import logging
15 import logging
16 from os.path import join as jn
16 from os.path import join as jn
17
17
18 from tempfile import _RandomNameSequence
18 from tempfile import _RandomNameSequence
19 from subprocess import Popen, PIPE
19 from subprocess import Popen, PIPE
20
20
21 from paste.deploy import appconfig
21 from paste.deploy import appconfig
22 from pylons import config
22 from pylons import config
23 from sqlalchemy import engine_from_config
23 from sqlalchemy import engine_from_config
24
24
25 from rhodecode.lib.utils import add_cache
25 from rhodecode.lib.utils import add_cache
26 from rhodecode.model import init_model
26 from rhodecode.model import init_model
27 from rhodecode.model import meta
27 from rhodecode.model import meta
28 from rhodecode.model.db import User
28 from rhodecode.model.db import User, Repository
29 from rhodecode.lib.auth import get_crypt_password
29 from rhodecode.lib.auth import get_crypt_password
30
30
31 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
31 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
32 from rhodecode.config.environment import load_environment
32 from rhodecode.config.environment import load_environment
33
33
34 conf = appconfig('config:development.ini', relative_to='./../../')
34 conf = appconfig('config:development.ini', relative_to='./../../')
35 load_environment(conf.global_conf, conf.local_conf)
35 load_environment(conf.global_conf, conf.local_conf)
36
36
37 add_cache(conf)
37 add_cache(conf)
38
38
39 USER = 'test_admin'
39 USER = 'test_admin'
40 PASS = 'test12'
40 PASS = 'test12'
41 HOST = '127.0.0.1:5000'
41 HOST = '127.0.0.1:5000'
42 DEBUG = True
42 DEBUG = True
43 log = logging.getLogger(__name__)
43 log = logging.getLogger(__name__)
44
44
45
45
46 class Command(object):
46 class Command(object):
47
47
48 def __init__(self, cwd):
48 def __init__(self, cwd):
49 self.cwd = cwd
49 self.cwd = cwd
50
50
51 def execute(self, cmd, *args):
51 def execute(self, cmd, *args):
52 """Runs command on the system with given ``args``.
52 """Runs command on the system with given ``args``.
53 """
53 """
54
54
55 command = cmd + ' ' + ' '.join(args)
55 command = cmd + ' ' + ' '.join(args)
56 log.debug('Executing %s' % command)
56 log.debug('Executing %s' % command)
57 if DEBUG:
57 if DEBUG:
58 print command
58 print command
59 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
59 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
60 stdout, stderr = p.communicate()
60 stdout, stderr = p.communicate()
61 if DEBUG:
61 if DEBUG:
62 print stdout, stderr
62 print stdout, stderr
63 return stdout, stderr
63 return stdout, stderr
64
64
65 def get_session():
65 def get_session():
66 engine = engine_from_config(conf, 'sqlalchemy.db1.')
66 engine = engine_from_config(conf, 'sqlalchemy.db1.')
67 init_model(engine)
67 init_model(engine)
68 sa = meta.Session()
68 sa = meta.Session()
69 return sa
69 return sa
70
70
71
71
72 def create_test_user(force=True):
72 def create_test_user(force=True):
73 print 'creating test user'
73 sa = get_session()
74 sa = get_session()
74
75
75 user = sa.query(User).filter(User.username == USER).scalar()
76 user = sa.query(User).filter(User.username == USER).scalar()
76
77
77 if force and user:
78 if force and user is not None:
79 print 'removing current user'
78 sa.delete(user)
80 sa.delete(user)
79 sa.commit()
81 sa.commit()
80
82
81 if user is None or force:
83 if user is None or force:
84 print 'creating new one'
82 new_usr = User()
85 new_usr = User()
83 new_usr.username = USER
86 new_usr.username = USER
84 new_usr.password = get_crypt_password(PASS)
87 new_usr.password = get_crypt_password(PASS)
85 new_usr.email = 'mail@mail.com'
88 new_usr.email = 'mail@mail.com'
86 new_usr.name = 'test'
89 new_usr.name = 'test'
87 new_usr.lastname = 'lasttestname'
90 new_usr.lastname = 'lasttestname'
88 new_usr.active = True
91 new_usr.active = True
89
92 new_usr.admin = True
90 sa.add(new_usr)
93 sa.add(new_usr)
91 sa.commit()
94 sa.commit()
92
95
96 print 'done'
97
98
99 def create_test_repo(force=True):
100 from rhodecode.model.repo import RepoModel
101 sa = get_session()
102
103 user = sa.query(User).filter(User.username == USER).scalar()
104 if user is None:
105 raise Exception('user not found')
93
106
94
107
108 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
109
110 if repo is None:
111 print 'repo not found creating'
112
113 form_data = {'repo_name':HG_REPO,
114 'repo_type':'hg',
115 'private':False, }
116 rm = RepoModel(sa)
117 rm.base_path = '/home/hg'
118 rm.create(form_data, user)
95
119
96 #==============================================================================
120 #==============================================================================
97 # TESTS
121 # TESTS
98 #==============================================================================
122 #==============================================================================
99 def test_clone():
123 def test_clone(no_errors=False):
100 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
124 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
101
125
102 try:
126 try:
103 shutil.rmtree(path, ignore_errors=True)
127 shutil.rmtree(path, ignore_errors=True)
104 os.makedirs(path)
128 os.makedirs(path)
105 #print 'made dirs %s' % jn(path)
129 #print 'made dirs %s' % jn(path)
106 except OSError:
130 except OSError:
107 raise
131 raise
108
132
109
133
110 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
134 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
111 {'user':USER,
135 {'user':USER,
112 'pass':PASS,
136 'pass':PASS,
113 'host':HOST,
137 'host':HOST,
114 'cloned_repo':HG_REPO,
138 'cloned_repo':HG_REPO,
115 'dest':path}
139 'dest':path}
116
140
117 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
141 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
118
142
119 assert """adding file changes""" in stdout, 'no messages about cloning'
143 if no_errors is False:
120 assert """abort""" not in stderr , 'got error from clone'
144 assert """adding file changes""" in stdout, 'no messages about cloning'
145 assert """abort""" not in stderr , 'got error from clone'
121
146
122
147
123
148
124 def test_clone_anonymous_ok():
149 def test_clone_anonymous_ok():
125 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
150 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
126
151
127 try:
152 try:
128 shutil.rmtree(path, ignore_errors=True)
153 shutil.rmtree(path, ignore_errors=True)
129 os.makedirs(path)
154 os.makedirs(path)
130 #print 'made dirs %s' % jn(path)
155 #print 'made dirs %s' % jn(path)
131 except OSError:
156 except OSError:
132 raise
157 raise
133
158
134
159
135 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
160 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
136 {'user':USER,
161 {'user':USER,
137 'pass':PASS,
162 'pass':PASS,
138 'host':HOST,
163 'host':HOST,
139 'cloned_repo':HG_REPO,
164 'cloned_repo':HG_REPO,
140 'dest':path}
165 'dest':path}
141
166
142 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
167 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
143 print stdout, stderr
168 print stdout, stderr
144 assert """adding file changes""" in stdout, 'no messages about cloning'
169 assert """adding file changes""" in stdout, 'no messages about cloning'
145 assert """abort""" not in stderr , 'got error from clone'
170 assert """abort""" not in stderr , 'got error from clone'
146
171
147 def test_clone_wrong_credentials():
172 def test_clone_wrong_credentials():
148 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
173 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
149
174
150 try:
175 try:
151 shutil.rmtree(path, ignore_errors=True)
176 shutil.rmtree(path, ignore_errors=True)
152 os.makedirs(path)
177 os.makedirs(path)
153 #print 'made dirs %s' % jn(path)
178 #print 'made dirs %s' % jn(path)
154 except OSError:
179 except OSError:
155 raise
180 raise
156
181
157
182
158 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
183 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
159 {'user':USER + 'error',
184 {'user':USER + 'error',
160 'pass':PASS,
185 'pass':PASS,
161 'host':HOST,
186 'host':HOST,
162 'cloned_repo':HG_REPO,
187 'cloned_repo':HG_REPO,
163 'dest':path}
188 'dest':path}
164
189
165 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
190 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
166
191
167 assert """abort: authorization failed""" in stderr , 'no error from wrong credentials'
192 assert """abort: authorization failed""" in stderr , 'no error from wrong credentials'
168
193
169
194
170 def test_pull():
195 def test_pull():
171 pass
196 pass
172
197
173 def test_push():
198 def test_push_modify_file(f_name='setup.py'):
174
199 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
175 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
200 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
176 for i in xrange(5):
201 for i in xrange(5):
177 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
202 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
178 Command(cwd).execute(cmd)
203 Command(cwd).execute(cmd)
179
204
180 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
205 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
181 Command(cwd).execute(cmd)
206 Command(cwd).execute(cmd)
182
207
183 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
208 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
184
209
185 def test_push_new_file(commits=15):
210 def test_push_new_file(commits=15):
186
211
187 test_clone()
212 test_clone(no_errors=True)
188
213
189 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
214 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
190 added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
215 added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
191
216
192 Command(cwd).execute('touch %s' % added_file)
217 Command(cwd).execute('touch %s' % added_file)
193
218
194 Command(cwd).execute('hg add %s' % added_file)
219 Command(cwd).execute('hg add %s' % added_file)
195
220
196 for i in xrange(commits):
221 for i in xrange(commits):
197 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
222 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
198 Command(cwd).execute(cmd)
223 Command(cwd).execute(cmd)
199
224
200 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
225 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
201 Command(cwd).execute(cmd)
226 Command(cwd).execute(cmd)
202
227
203 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
228 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
204 {'user':USER,
229 {'user':USER,
205 'pass':PASS,
230 'pass':PASS,
206 'host':HOST,
231 'host':HOST,
207 'cloned_repo':HG_REPO,
232 'cloned_repo':HG_REPO,
208 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
233 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
209
234
210 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
235 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
211
236
212 def test_push_wrong_credentials():
237 def test_push_wrong_credentials():
213
238 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
214 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
239 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
215 {'user':USER + 'xxx',
240 {'user':USER + 'xxx',
216 'pass':PASS,
241 'pass':PASS,
217 'host':HOST,
242 'host':HOST,
218 'cloned_repo':HG_REPO,
243 'cloned_repo':HG_REPO,
219 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
244 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
220
245
221 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
246 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
222 for i in xrange(5):
247 for i in xrange(5):
223 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
248 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
224 Command(cwd).execute(cmd)
249 Command(cwd).execute(cmd)
225
250
226 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
251 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
227 Command(cwd).execute(cmd)
252 Command(cwd).execute(cmd)
228
253
229 Command(cwd).execute('hg push %s' % clone_url)
254 Command(cwd).execute('hg push %s' % clone_url)
230
255
231 def test_push_wrong_path():
256 def test_push_wrong_path():
232 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
257 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
233 added_file = jn(path, 'somefile.py')
258 added_file = jn(path, 'somefile.py')
234
259
235 try:
260 try:
236 shutil.rmtree(path, ignore_errors=True)
261 shutil.rmtree(path, ignore_errors=True)
237 os.makedirs(path)
262 os.makedirs(path)
238 print 'made dirs %s' % jn(path)
263 print 'made dirs %s' % jn(path)
239 except OSError:
264 except OSError:
240 raise
265 raise
241
266
242 Command(cwd).execute("""echo '' > %s""" % added_file)
267 Command(cwd).execute("""echo '' > %s""" % added_file)
243 Command(cwd).execute("""hg init %s""" % path)
268 Command(cwd).execute("""hg init %s""" % path)
244 Command(cwd).execute("""hg add %s""" % added_file)
269 Command(cwd).execute("""hg add %s""" % added_file)
245
270
246 for i in xrange(2):
271 for i in xrange(2):
247 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
272 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
248 Command(cwd).execute(cmd)
273 Command(cwd).execute(cmd)
249
274
250 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
275 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
251 Command(cwd).execute(cmd)
276 Command(cwd).execute(cmd)
252
277
253 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
278 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
254 {'user':USER,
279 {'user':USER,
255 'pass':PASS,
280 'pass':PASS,
256 'host':HOST,
281 'host':HOST,
257 'cloned_repo':HG_REPO + '_error',
282 'cloned_repo':HG_REPO + '_error',
258 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
283 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
259
284
260 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
285 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
261 assert """abort: HTTP Error 403: Forbidden""" in stderr
286 assert """abort: HTTP Error 403: Forbidden""" in stderr
262
287
263
288
264 if __name__ == '__main__':
289 if __name__ == '__main__':
265 #create_test_user()
290 create_test_user(force=False)
266 test_clone()
291 create_test_repo()
267 test_clone_anonymous_ok()
292 #test_push_modify_file()
293 #test_clone()
294 #test_clone_anonymous_ok()
268
295
269 #test_clone_wrong_credentials()
296 #test_clone_wrong_credentials()
270
297
271 #test_pull()
298 #test_pull()
272 #test_push_new_file(3)
299 test_push_new_file(commits=3)
273 #test_push_wrong_path()
300 #test_push_wrong_path()
274 #test_push_wrong_credentials()
301 #test_push_wrong_credentials()
275
302
276
303
General Comments 0
You need to be logged in to leave comments. Login now