##// END OF EJS Templates
Fixed test_hg_operations test and added concurency test
marcink -
r1529:0b268dd3 beta
parent child Browse files
Show More
@@ -0,0 +1,177 b''
1 # -*- coding: utf-8 -*-
2 """
3 rhodecode.tests.test_hg_operations
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
6 Test suite for making push/pull operations
7
8 :created_on: Dec 30, 2010
9 :copyright: (c) 2010 by marcink.
10 :license: LICENSE_NAME, see LICENSE_FILE for more details.
11 """
12
13 import os
14 import sys
15 import shutil
16 import logging
17 from os.path import join as jn
18 from os.path import dirname as dn
19
20 from tempfile import _RandomNameSequence
21 from subprocess import Popen, PIPE
22
23 from paste.deploy import appconfig
24 from pylons import config
25 from sqlalchemy import engine_from_config
26
27 from rhodecode.lib.utils import add_cache
28 from rhodecode.model import init_model
29 from rhodecode.model import meta
30 from rhodecode.model.db import User, Repository
31 from rhodecode.lib.auth import get_crypt_password
32
33 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
34 from rhodecode.config.environment import load_environment
35
36 rel_path = dn(dn(dn(os.path.abspath(__file__))))
37 conf = appconfig('config:development.ini', relative_to=rel_path)
38 load_environment(conf.global_conf, conf.local_conf)
39
40 add_cache(conf)
41
42 USER = 'test_admin'
43 PASS = 'test12'
44 HOST = '127.0.0.1:5000'
45 DEBUG = True
46 log = logging.getLogger(__name__)
47
48
49 class Command(object):
50
51 def __init__(self, cwd):
52 self.cwd = cwd
53
54 def execute(self, cmd, *args):
55 """Runs command on the system with given ``args``.
56 """
57
58 command = cmd + ' ' + ' '.join(args)
59 log.debug('Executing %s' % command)
60 if DEBUG:
61 print command
62 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
63 stdout, stderr = p.communicate()
64 if DEBUG:
65 print stdout, stderr
66 return stdout, stderr
67
68 def get_session():
69 engine = engine_from_config(conf, 'sqlalchemy.db1.')
70 init_model(engine)
71 sa = meta.Session()
72 return sa
73
74
75 def create_test_user(force=True):
76 print 'creating test user'
77 sa = get_session()
78
79 user = sa.query(User).filter(User.username == USER).scalar()
80
81 if force and user is not None:
82 print 'removing current user'
83 for repo in sa.query(Repository).filter(Repository.user == user).all():
84 sa.delete(repo)
85 sa.delete(user)
86 sa.commit()
87
88 if user is None or force:
89 print 'creating new one'
90 new_usr = User()
91 new_usr.username = USER
92 new_usr.password = get_crypt_password(PASS)
93 new_usr.email = 'mail@mail.com'
94 new_usr.name = 'test'
95 new_usr.lastname = 'lasttestname'
96 new_usr.active = True
97 new_usr.admin = True
98 sa.add(new_usr)
99 sa.commit()
100
101 print 'done'
102
103
104 def create_test_repo(force=True):
105 print 'creating test repo'
106 from rhodecode.model.repo import RepoModel
107 sa = get_session()
108
109 user = sa.query(User).filter(User.username == USER).scalar()
110 if user is None:
111 raise Exception('user not found')
112
113
114 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
115
116 if repo is None:
117 print 'repo not found creating'
118
119 form_data = {'repo_name':HG_REPO,
120 'repo_type':'hg',
121 'private':False,
122 'clone_uri':'' }
123 rm = RepoModel(sa)
124 rm.base_path = '/home/hg'
125 rm.create(form_data, user)
126
127 print 'done'
128
129 def set_anonymous_access(enable=True):
130 sa = get_session()
131 user = sa.query(User).filter(User.username == 'default').one()
132 user.active = enable
133 sa.add(user)
134 sa.commit()
135
136 def get_anonymous_access():
137 sa = get_session()
138 return sa.query(User).filter(User.username == 'default').one().active
139
140
141 #==============================================================================
142 # TESTS
143 #==============================================================================
144 def test_clone_with_credentials(no_errors=False,repo=HG_REPO):
145 cwd = path = jn(TESTS_TMP_PATH, repo)
146
147
148 try:
149 shutil.rmtree(path, ignore_errors=True)
150 os.makedirs(path)
151 #print 'made dirs %s' % jn(path)
152 except OSError:
153 raise
154
155
156 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
157 {'user':USER,
158 'pass':PASS,
159 'host':HOST,
160 'cloned_repo':repo,
161 'dest':path+_RandomNameSequence().next()}
162
163 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
164
165 if no_errors is False:
166 assert """adding file changes""" in stdout, 'no messages about cloning'
167 assert """abort""" not in stderr , 'got error from clone'
168
169 if __name__ == '__main__':
170 try:
171 create_test_user(force=False)
172
173 for i in range(int(sys.argv[2])):
174 test_clone_with_credentials(repo=sys.argv[1])
175
176 except Exception,e:
177 sys.exit('stop on %s' % e)
@@ -1,287 +1,288 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.middleware.simplehg
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 SimpleHG middleware for handling mercurial protocol request
7 7 (push/clone etc.). It's implemented with basic auth function
8 8
9 9 :created_on: Apr 28, 2010
10 10 :author: marcink
11 11 :copyright: (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
12 12 :license: GPLv3, see COPYING for more details.
13 13 """
14 14 # This program is free software: you can redistribute it and/or modify
15 15 # it under the terms of the GNU General Public License as published by
16 16 # the Free Software Foundation, either version 3 of the License, or
17 17 # (at your option) any later version.
18 18 #
19 19 # This program is distributed in the hope that it will be useful,
20 20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 22 # GNU General Public License for more details.
23 23 #
24 24 # You should have received a copy of the GNU General Public License
25 25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 26
27 27 import os
28 28 import logging
29 29 import traceback
30 30
31 31 from mercurial.error import RepoError
32 32 from mercurial.hgweb import hgweb_mod
33 33
34 34 from paste.auth.basic import AuthBasicAuthenticator
35 35 from paste.httpheaders import REMOTE_USER, AUTH_TYPE
36 36
37 37 from rhodecode.lib import safe_str
38 38 from rhodecode.lib.auth import authfunc, HasPermissionAnyMiddleware
39 39 from rhodecode.lib.utils import make_ui, invalidate_cache, \
40 40 is_valid_repo, ui_sections
41 41 from rhodecode.model.db import User
42 42
43 43 from webob.exc import HTTPNotFound, HTTPForbidden, HTTPInternalServerError
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 def is_mercurial(environ):
49 49 """Returns True if request's target is mercurial server - header
50 50 ``HTTP_ACCEPT`` of such request would start with ``application/mercurial``.
51 51 """
52 52 http_accept = environ.get('HTTP_ACCEPT')
53 53 if http_accept and http_accept.startswith('application/mercurial'):
54 54 return True
55 55 return False
56 56
57 57
58 58 class SimpleHg(object):
59 59
60 60 def __init__(self, application, config):
61 61 self.application = application
62 62 self.config = config
63 63 # base path of repo locations
64 64 self.basepath = self.config['base_path']
65 65 #authenticate this mercurial request using authfunc
66 66 self.authenticate = AuthBasicAuthenticator('', authfunc)
67 67 self.ipaddr = '0.0.0.0'
68 68
69 69 def __call__(self, environ, start_response):
70 70 if not is_mercurial(environ):
71 71 return self.application(environ, start_response)
72 72
73 73 proxy_key = 'HTTP_X_REAL_IP'
74 74 def_key = 'REMOTE_ADDR'
75 75 ipaddr = environ.get(proxy_key, environ.get(def_key, '0.0.0.0'))
76 76
77 77 # skip passing error to error controller
78 78 environ['pylons.status_code_redirect'] = True
79
79
80 80 #======================================================================
81 81 # EXTRACT REPOSITORY NAME FROM ENV
82 82 #======================================================================
83 83 try:
84 84 repo_name = environ['REPO_NAME'] = self.__get_repository(environ)
85 85 log.debug('Extracted repo name is %s' % repo_name)
86 86 except:
87 87 return HTTPInternalServerError()(environ, start_response)
88 88
89 89 #======================================================================
90 90 # GET ACTION PULL or PUSH
91 91 #======================================================================
92 92 action = self.__get_action(environ)
93
93
94 94 #======================================================================
95 95 # CHECK ANONYMOUS PERMISSION
96 96 #======================================================================
97 97 if action in ['pull', 'push']:
98 98 anonymous_user = self.__get_user('default')
99
99 100 username = anonymous_user.username
100 101 anonymous_perm = self.__check_permission(action,
101 102 anonymous_user,
102 103 repo_name)
103 104
104 105 if anonymous_perm is not True or anonymous_user.active is False:
105 106 if anonymous_perm is not True:
106 107 log.debug('Not enough credentials to access this '
107 108 'repository as anonymous user')
108 109 if anonymous_user.active is False:
109 110 log.debug('Anonymous access is disabled, running '
110 111 'authentication')
111 112 #==============================================================
112 113 # DEFAULT PERM FAILED OR ANONYMOUS ACCESS IS DISABLED SO WE
113 114 # NEED TO AUTHENTICATE AND ASK FOR AUTH USER PERMISSIONS
114 115 #==============================================================
115 116
116 117 if not REMOTE_USER(environ):
117 118 self.authenticate.realm = \
118 119 safe_str(self.config['rhodecode_realm'])
119 120 result = self.authenticate(environ)
120 121 if isinstance(result, str):
121 122 AUTH_TYPE.update(environ, 'basic')
122 123 REMOTE_USER.update(environ, result)
123 124 else:
124 125 return result.wsgi_application(environ, start_response)
125 126
126 127 #==============================================================
127 128 # CHECK PERMISSIONS FOR THIS REQUEST USING GIVEN USERNAME FROM
128 129 # BASIC AUTH
129 130 #==============================================================
130 131
131 132 if action in ['pull', 'push']:
132 133 username = REMOTE_USER(environ)
133 134 try:
134 135 user = self.__get_user(username)
135 136 username = user.username
136 137 except:
137 138 log.error(traceback.format_exc())
138 139 return HTTPInternalServerError()(environ,
139 140 start_response)
140 141
141 142 #check permissions for this repository
142 143 perm = self.__check_permission(action, user,
143 144 repo_name)
144 145 if perm is not True:
145 146 return HTTPForbidden()(environ, start_response)
146 147
147 148 extras = {'ip': ipaddr,
148 149 'username': username,
149 150 'action': action,
150 151 'repository': repo_name}
151 152
152 153 #======================================================================
153 154 # MERCURIAL REQUEST HANDLING
154 155 #======================================================================
155
156
156 157 repo_path = safe_str(os.path.join(self.basepath, repo_name))
157 158 log.debug('Repository path is %s' % repo_path)
158
159
159 160 baseui = make_ui('db')
160 161 self.__inject_extras(repo_path, baseui, extras)
161
162
162 163
163 164 # quick check if that dir exists...
164 165 if is_valid_repo(repo_name, self.basepath) is False:
165 166 return HTTPNotFound()(environ, start_response)
166 167
167 168 try:
168 169 #invalidate cache on push
169 170 if action == 'push':
170 171 self.__invalidate_cache(repo_name)
171 172
172 173 app = self.__make_app(repo_path, baseui, extras)
173 174 return app(environ, start_response)
174 175 except RepoError, e:
175 176 if str(e).find('not found') != -1:
176 177 return HTTPNotFound()(environ, start_response)
177 178 except Exception:
178 179 log.error(traceback.format_exc())
179 180 return HTTPInternalServerError()(environ, start_response)
180 181
181 182 def __make_app(self, repo_name, baseui, extras):
182 183 """
183 184 Make an wsgi application using hgweb, and inject generated baseui
184 185 instance, additionally inject some extras into ui object
185 186 """
186 187 return hgweb_mod.hgweb(repo_name, name=repo_name, baseui=baseui)
187 188
188 189
189 190 def __check_permission(self, action, user, repo_name):
190 191 """
191 192 Checks permissions using action (push/pull) user and repository
192 193 name
193 194
194 195 :param action: push or pull action
195 196 :param user: user instance
196 197 :param repo_name: repository name
197 198 """
198 199 if action == 'push':
199 200 if not HasPermissionAnyMiddleware('repository.write',
200 201 'repository.admin')(user,
201 202 repo_name):
202 203 return False
203 204
204 205 else:
205 206 #any other action need at least read permission
206 207 if not HasPermissionAnyMiddleware('repository.read',
207 208 'repository.write',
208 209 'repository.admin')(user,
209 210 repo_name):
210 211 return False
211 212
212 213 return True
213 214
214 215 def __get_repository(self, environ):
215 216 """
216 217 Get's repository name out of PATH_INFO header
217 218
218 219 :param environ: environ where PATH_INFO is stored
219 220 """
220 221 try:
221 222 repo_name = '/'.join(environ['PATH_INFO'].split('/')[1:])
222 223 if repo_name.endswith('/'):
223 224 repo_name = repo_name.rstrip('/')
224 225 except:
225 226 log.error(traceback.format_exc())
226 227 raise
227 228
228 229 return repo_name
229 230
230 231 def __get_user(self, username):
231 232 return User.by_username(username)
232 233
233 234 def __get_action(self, environ):
234 235 """
235 236 Maps mercurial request commands into a clone,pull or push command.
236 237 This should always return a valid command string
237 238
238 239 :param environ:
239 240 """
240 241 mapping = {'changegroup': 'pull',
241 242 'changegroupsubset': 'pull',
242 243 'stream_out': 'pull',
243 244 'listkeys': 'pull',
244 245 'unbundle': 'push',
245 246 'pushkey': 'push', }
246 247 for qry in environ['QUERY_STRING'].split('&'):
247 248 if qry.startswith('cmd'):
248 249 cmd = qry.split('=')[-1]
249 250 if cmd in mapping:
250 251 return mapping[cmd]
251 252 else:
252 253 return 'pull'
253 254
254 255 def __invalidate_cache(self, repo_name):
255 256 """we know that some change was made to repositories and we should
256 257 invalidate the cache to see the changes right away but only for
257 258 push requests"""
258 259 invalidate_cache('get_repo_cached_%s' % repo_name)
259 260
260 def __inject_extras(self,repo_path, baseui, extras={}):
261 def __inject_extras(self, repo_path, baseui, extras={}):
261 262 """
262 263 Injects some extra params into baseui instance
263 264
264 265 also overwrites global settings with those takes from local hgrc file
265 266
266 267 :param baseui: baseui instance
267 268 :param extras: dict with extra params to put into baseui
268 269 """
269 270
270 271 hgrc = os.path.join(repo_path, '.hg', 'hgrc')
271 272
272 273 # make our hgweb quiet so it doesn't print output
273 274 baseui.setconfig('ui', 'quiet', 'true')
274 275
275 276 #inject some additional parameters that will be available in ui
276 277 #for hooks
277 278 for k, v in extras.items():
278 279 baseui.setconfig('rhodecode_extras', k, v)
279 280
280 281 repoui = make_ui('file', hgrc, False)
281 282
282 283 if repoui:
283 284 #overwrite our ui instance with the section from hgrc file
284 285 for section in ui_sections:
285 286 for k, v in repoui.configitems(section):
286 287 baseui.setconfig(section, k, v)
287 288
@@ -1,379 +1,400 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.tests.test_hg_operations
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Test suite for making push/pull operations
7 7
8 8 :created_on: Dec 30, 2010
9 9 :copyright: (c) 2010 by marcink.
10 10 :license: LICENSE_NAME, see LICENSE_FILE for more details.
11 11 """
12 12
13 13 import os
14 14 import time
15 15 import sys
16 16 import shutil
17 17 import logging
18 18
19 19 from os.path import join as jn
20 20 from os.path import dirname as dn
21 21
22 22 from tempfile import _RandomNameSequence
23 23 from subprocess import Popen, PIPE
24 24
25 25 from paste.deploy import appconfig
26 26 from pylons import config
27 27 from sqlalchemy import engine_from_config
28 28
29 29 from rhodecode.lib.utils import add_cache
30 30 from rhodecode.model import init_model
31 31 from rhodecode.model import meta
32 from rhodecode.model.db import User, Repository
32 from rhodecode.model.db import User, Repository, UserLog
33 33 from rhodecode.lib.auth import get_crypt_password
34 34
35 35 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
36 36 from rhodecode.config.environment import load_environment
37 37
38 38 rel_path = dn(dn(dn(os.path.abspath(__file__))))
39 39 conf = appconfig('config:development.ini', relative_to=rel_path)
40 40 load_environment(conf.global_conf, conf.local_conf)
41 41
42 42 add_cache(conf)
43 43
44 44 USER = 'test_admin'
45 45 PASS = 'test12'
46 46 HOST = '127.0.0.1:5000'
47 47 DEBUG = bool(int(sys.argv[1]))
48 print 'DEBUG:',DEBUG
48 print 'DEBUG:', DEBUG
49 49 log = logging.getLogger(__name__)
50 50
51 51
52 52 class Command(object):
53 53
54 54 def __init__(self, cwd):
55 55 self.cwd = cwd
56 56
57 57 def execute(self, cmd, *args):
58 58 """Runs command on the system with given ``args``.
59 59 """
60 60
61 61 command = cmd + ' ' + ' '.join(args)
62 62 log.debug('Executing %s' % command)
63 63 if DEBUG:
64 64 print command
65 65 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
66 66 stdout, stderr = p.communicate()
67 67 if DEBUG:
68 68 print stdout, stderr
69 69 return stdout, stderr
70 70
71 71
72 72 def test_wrapp(func):
73
74 def __wrapp(*args,**kwargs):
75 print '###%s###' %func.__name__
73
74 def __wrapp(*args, **kwargs):
75 print '###%s###' % func.__name__
76 76 try:
77 res = func(*args,**kwargs)
78 except:
79 print '--%s failed--' % func.__name__
80 return
81 print 'ok'
77 res = func(*args, **kwargs)
78 except Exception, e:
79 print ('###############\n-'
80 '--%s failed %s--\n'
81 '###############\n' % (func.__name__, e))
82 sys.exit()
83 print '++OK++'
82 84 return res
83 85 return __wrapp
84 86
85 87 def get_session():
86 88 engine = engine_from_config(conf, 'sqlalchemy.db1.')
87 89 init_model(engine)
88 90 sa = meta.Session
89 91 return sa
90 92
91 93
92 94 def create_test_user(force=True):
93 print 'creating test user'
95 print '\tcreating test user'
94 96 sa = get_session()
95 97
96 98 user = sa.query(User).filter(User.username == USER).scalar()
97 99
98 100 if force and user is not None:
99 print 'removing current user'
101 print '\tremoving current user'
100 102 for repo in sa.query(Repository).filter(Repository.user == user).all():
101 103 sa.delete(repo)
102 104 sa.delete(user)
103 105 sa.commit()
104 106
105 107 if user is None or force:
106 print 'creating new one'
108 print '\tcreating new one'
107 109 new_usr = User()
108 110 new_usr.username = USER
109 111 new_usr.password = get_crypt_password(PASS)
110 112 new_usr.email = 'mail@mail.com'
111 113 new_usr.name = 'test'
112 114 new_usr.lastname = 'lasttestname'
113 115 new_usr.active = True
114 116 new_usr.admin = True
115 117 sa.add(new_usr)
116 118 sa.commit()
117 119
118 print 'done'
120 print '\tdone'
119 121
120 122
121 123 def create_test_repo(force=True):
122 124 from rhodecode.model.repo import RepoModel
123 125 sa = get_session()
124 126
125 127 user = sa.query(User).filter(User.username == USER).scalar()
126 128 if user is None:
127 129 raise Exception('user not found')
128 130
129 131
130 132 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
131 133
132 134 if repo is None:
133 print 'repo not found creating'
135 print '\trepo not found creating'
134 136
135 137 form_data = {'repo_name':HG_REPO,
136 138 'repo_type':'hg',
137 139 'private':False,
138 140 'clone_uri':'' }
139 141 rm = RepoModel(sa)
140 142 rm.base_path = '/home/hg'
141 143 rm.create(form_data, user)
142 144
143 145
144 146 def set_anonymous_access(enable=True):
145 147 sa = get_session()
146 148 user = sa.query(User).filter(User.username == 'default').one()
149 sa.expire(user)
147 150 user.active = enable
148 151 sa.add(user)
149 152 sa.commit()
150 153 sa.remove()
151
152 print 'anonymous access is now:',enable
154 import time;time.sleep(3)
155 print '\tanonymous access is now:', enable
153 156
154 157
155 158 def get_anonymous_access():
156 159 sa = get_session()
157 160 obj1 = sa.query(User).filter(User.username == 'default').one()
158 161 sa.expire(obj1)
159 162 return obj1.active
160 163
161 164
162 165 #==============================================================================
163 166 # TESTS
164 167 #==============================================================================
165 168 @test_wrapp
166 169 def test_clone_with_credentials(no_errors=False):
167 170 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
168 171
169 172 try:
170 173 shutil.rmtree(path, ignore_errors=True)
171 174 os.makedirs(path)
172 175 #print 'made dirs %s' % jn(path)
173 176 except OSError:
174 177 raise
175 178
176 print 'checking if anonymous access is enabled'
179 print '\tchecking if anonymous access is enabled'
177 180 anonymous_access = get_anonymous_access()
178 181 if anonymous_access:
179 print 'enabled, disabling it '
182 print '\tenabled, disabling it '
180 183 set_anonymous_access(enable=False)
181 184 time.sleep(1)
182
185
183 186 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
184 187 {'user':USER,
185 188 'pass':PASS,
186 189 'host':HOST,
187 190 'cloned_repo':HG_REPO,
188 191 'dest':path}
189 192
190 193 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
191 194
192 195 if no_errors is False:
193 196 assert """adding file changes""" in stdout, 'no messages about cloning'
194 197 assert """abort""" not in stderr , 'got error from clone'
195 198
196 199
197 200 @test_wrapp
198 201 def test_clone_anonymous():
199 202 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
200 203
201 204 try:
202 205 shutil.rmtree(path, ignore_errors=True)
203 206 os.makedirs(path)
204 207 #print 'made dirs %s' % jn(path)
205 208 except OSError:
206 209 raise
207 210
208 211
209 print 'checking if anonymous access is enabled'
212 print '\tchecking if anonymous access is enabled'
210 213 anonymous_access = get_anonymous_access()
211 214 if not anonymous_access:
212 print 'not enabled, enabling it '
215 print '\tnot enabled, enabling it '
213 216 set_anonymous_access(enable=True)
214 217 time.sleep(1)
215
218
216 219 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
217 220 {'user':USER,
218 221 'pass':PASS,
219 222 'host':HOST,
220 223 'cloned_repo':HG_REPO,
221 224 'dest':path}
222 225
223 226 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
224 227
225 228 assert """adding file changes""" in stdout, 'no messages about cloning'
226 229 assert """abort""" not in stderr , 'got error from clone'
227 230
228 231 #disable if it was enabled
229 232 if not anonymous_access:
230 print 'disabling anonymous access'
233 print '\tdisabling anonymous access'
231 234 set_anonymous_access(enable=False)
232 235
233 236 @test_wrapp
234 237 def test_clone_wrong_credentials():
235 238 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
236 239
237 240 try:
238 241 shutil.rmtree(path, ignore_errors=True)
239 242 os.makedirs(path)
240 243 #print 'made dirs %s' % jn(path)
241 244 except OSError:
242 245 raise
243 246
244 print 'checking if anonymous access is enabled'
247 print '\tchecking if anonymous access is enabled'
245 248 anonymous_access = get_anonymous_access()
246 249 if anonymous_access:
247 print 'enabled, disabling it '
250 print '\tenabled, disabling it '
248 251 set_anonymous_access(enable=False)
249
252
250 253 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
251 254 {'user':USER + 'error',
252 255 'pass':PASS,
253 256 'host':HOST,
254 257 'cloned_repo':HG_REPO,
255 258 'dest':path}
256 259
257 260 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
258 261
259 262 if not """abort: authorization failed""" in stderr:
260 raise Exception('Failure')
263 raise Exception('Failure')
261 264
262 265 @test_wrapp
263 266 def test_pull():
264 267 pass
265 268
266 269 @test_wrapp
267 270 def test_push_modify_file(f_name='setup.py'):
268 271 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
269 272 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
270 273 for i in xrange(5):
271 274 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
272 275 Command(cwd).execute(cmd)
273 276
274 277 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
275 278 Command(cwd).execute(cmd)
276 279
277 280 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
278 281
279 282 @test_wrapp
280 283 def test_push_new_file(commits=15, with_clone=True):
281 284
282 285 if with_clone:
283 286 test_clone_with_credentials(no_errors=True)
284 287
285 288 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
286 289 added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
287 290
288 291 Command(cwd).execute('touch %s' % added_file)
289 292
290 293 Command(cwd).execute('hg add %s' % added_file)
291 294
292 295 for i in xrange(commits):
293 296 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
294 297 Command(cwd).execute(cmd)
295 298
296 299 cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
297 300 'Marcin KuΕΊminski <marcin@python-blog.com>',
298 301 added_file)
299 302 Command(cwd).execute(cmd)
300 303
301 304 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
302 305 {'user':USER,
303 306 'pass':PASS,
304 307 'host':HOST,
305 308 'cloned_repo':HG_REPO,
306 309 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
307 310
308 311 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
309 312
310 313 @test_wrapp
311 314 def test_push_wrong_credentials():
312 315 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
313 316 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
314 317 {'user':USER + 'xxx',
315 318 'pass':PASS,
316 319 'host':HOST,
317 320 'cloned_repo':HG_REPO,
318 321 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
319 322
320 323 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
321 324 for i in xrange(5):
322 325 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
323 326 Command(cwd).execute(cmd)
324 327
325 328 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
326 329 Command(cwd).execute(cmd)
327 330
328 331 Command(cwd).execute('hg push %s' % clone_url)
329 332
330 333 @test_wrapp
331 334 def test_push_wrong_path():
332 335 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
333 336 added_file = jn(path, 'somefile.py')
334 337
335 338 try:
336 339 shutil.rmtree(path, ignore_errors=True)
337 340 os.makedirs(path)
338 print 'made dirs %s' % jn(path)
341 print '\tmade dirs %s' % jn(path)
339 342 except OSError:
340 343 raise
341 344
342 345 Command(cwd).execute("""echo '' > %s""" % added_file)
343 346 Command(cwd).execute("""hg init %s""" % path)
344 347 Command(cwd).execute("""hg add %s""" % added_file)
345 348
346 349 for i in xrange(2):
347 350 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
348 351 Command(cwd).execute(cmd)
349 352
350 353 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
351 354 Command(cwd).execute(cmd)
352 355
353 356 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
354 357 {'user':USER,
355 358 'pass':PASS,
356 359 'host':HOST,
357 360 'cloned_repo':HG_REPO + '_error',
358 361 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
359 362
360 363 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
361 364 if not """abort: HTTP Error 403: Forbidden""" in stderr:
362 365 raise Exception('Failure')
363 366
367 @test_wrapp
368 def get_logs():
369 sa = get_session()
370 return len(sa.query(UserLog).all())
371
372 @test_wrapp
373 def test_logs(initial):
374 sa = get_session()
375 logs = sa.query(UserLog).all()
376 operations = 7
377 if initial + operations != len(logs):
378 raise Exception("missing number of logs %s vs %s" % (initial, len(logs)))
379
364 380
365 381 if __name__ == '__main__':
366 382 create_test_user(force=False)
367 383 create_test_repo()
368
384
385 initial_logs = get_logs()
386
369 387 # test_push_modify_file()
370 388 test_clone_with_credentials()
371 389 test_clone_wrong_credentials()
372 390
373 391
374 392 test_push_new_file(commits=2, with_clone=True)
375 #
393
394 test_clone_anonymous()
376 395 test_push_wrong_path()
377
378 test_clone_anonymous()
379 test_push_wrong_credentials() No newline at end of file
396
397
398 test_push_wrong_credentials()
399
400 test_logs(initial_logs)
General Comments 0
You need to be logged in to leave comments. Login now