##// END OF EJS Templates
made concurrency test also for git
marcink -
r2895:f5dc0417 beta
parent child Browse files
Show More
@@ -1,213 +1,221 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.tests.test_hg_operations
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Test suite for making push/pull operations
7 7
8 8 :created_on: Dec 30, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import os
27 27 import sys
28 28 import shutil
29 29 import logging
30 30 from os.path import join as jn
31 31 from os.path import dirname as dn
32 32
33 33 from tempfile import _RandomNameSequence
34 34 from subprocess import Popen, PIPE
35 35
36 36 from paste.deploy import appconfig
37 37 from pylons import config
38 38 from sqlalchemy import engine_from_config
39 39
40 40 from rhodecode.lib.utils import add_cache
41 41 from rhodecode.model import init_model
42 42 from rhodecode.model import meta
43 43 from rhodecode.model.db import User, Repository
44 44 from rhodecode.lib.auth import get_crypt_password
45 45
46 46 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
47 47 from rhodecode.config.environment import load_environment
48 48
49 rel_path = dn(dn(dn(os.path.abspath(__file__))))
50 conf = appconfig('config:development.ini', relative_to=rel_path)
49 rel_path = dn(dn(dn(dn(os.path.abspath(__file__)))))
50 conf = appconfig('config:rc.ini', relative_to=rel_path)
51 51 load_environment(conf.global_conf, conf.local_conf)
52 52
53 53 add_cache(conf)
54 54
55 55 USER = 'test_admin'
56 56 PASS = 'test12'
57 HOST = 'hg.local'
57 HOST = 'rc.local'
58 58 METHOD = 'pull'
59 59 DEBUG = True
60 60 log = logging.getLogger(__name__)
61 61
62 62
63 63 class Command(object):
64 64
65 65 def __init__(self, cwd):
66 66 self.cwd = cwd
67 67
68 68 def execute(self, cmd, *args):
69 69 """Runs command on the system with given ``args``.
70 70 """
71 71
72 72 command = cmd + ' ' + ' '.join(args)
73 73 log.debug('Executing %s' % command)
74 74 if DEBUG:
75 75 print command
76 76 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
77 77 stdout, stderr = p.communicate()
78 78 if DEBUG:
79 79 print stdout, stderr
80 80 return stdout, stderr
81 81
82 82
83 83 def get_session():
84 84 engine = engine_from_config(conf, 'sqlalchemy.db1.')
85 85 init_model(engine)
86 86 sa = meta.Session
87 87 return sa
88 88
89 89
90 90 def create_test_user(force=True):
91 91 print 'creating test user'
92 92 sa = get_session()
93 93
94 94 user = sa.query(User).filter(User.username == USER).scalar()
95 95
96 96 if force and user is not None:
97 97 print 'removing current user'
98 98 for repo in sa.query(Repository).filter(Repository.user == user).all():
99 99 sa.delete(repo)
100 100 sa.delete(user)
101 101 sa.commit()
102 102
103 103 if user is None or force:
104 104 print 'creating new one'
105 105 new_usr = User()
106 106 new_usr.username = USER
107 107 new_usr.password = get_crypt_password(PASS)
108 108 new_usr.email = 'mail@mail.com'
109 109 new_usr.name = 'test'
110 110 new_usr.lastname = 'lasttestname'
111 111 new_usr.active = True
112 112 new_usr.admin = True
113 113 sa.add(new_usr)
114 114 sa.commit()
115 115
116 116 print 'done'
117 117
118 118
119 119 def create_test_repo(force=True):
120 120 print 'creating test repo'
121 121 from rhodecode.model.repo import RepoModel
122 122 sa = get_session()
123 123
124 124 user = sa.query(User).filter(User.username == USER).scalar()
125 125 if user is None:
126 126 raise Exception('user not found')
127 127
128 128 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
129 129
130 130 if repo is None:
131 131 print 'repo not found creating'
132 132
133 form_data = {'repo_name':HG_REPO,
134 'repo_type':'hg',
133 form_data = {'repo_name': HG_REPO,
134 'repo_type': 'hg',
135 135 'private':False,
136 'clone_uri':'' }
136 'clone_uri': '' }
137 137 rm = RepoModel(sa)
138 138 rm.base_path = '/home/hg'
139 139 rm.create(form_data, user)
140 140
141 141 print 'done'
142 142
143 143
144 144 def set_anonymous_access(enable=True):
145 145 sa = get_session()
146 146 user = sa.query(User).filter(User.username == 'default').one()
147 147 user.active = enable
148 148 sa.add(user)
149 149 sa.commit()
150 150
151 151
152 152 def get_anonymous_access():
153 153 sa = get_session()
154 154 return sa.query(User).filter(User.username == 'default').one().active
155 155
156 156
157 157 #==============================================================================
158 158 # TESTS
159 159 #==============================================================================
160 160 def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
161 seq=None):
161 seq=None, backend='hg'):
162 162 cwd = path = jn(TESTS_TMP_PATH, repo)
163 163
164 164 if seq == None:
165 165 seq = _RandomNameSequence().next()
166 166
167 167 try:
168 168 shutil.rmtree(path, ignore_errors=True)
169 169 os.makedirs(path)
170 170 #print 'made dirs %s' % jn(path)
171 171 except OSError:
172 172 raise
173 173
174 174 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
175 {'user':USER,
176 'pass':PASS,
177 'host':HOST,
178 'cloned_repo':repo, }
175 {'user': USER,
176 'pass': PASS,
177 'host': HOST,
178 'cloned_repo': repo, }
179 179
180 180 dest = path + seq
181 181 if method == 'pull':
182 stdout, stderr = Command(cwd).execute('hg', method, '--cwd', dest, clone_url)
182 stdout, stderr = Command(cwd).execute(backend, method, '--cwd', dest, clone_url)
183 183 else:
184 stdout, stderr = Command(cwd).execute('hg', method, clone_url, dest)
185
184 stdout, stderr = Command(cwd).execute(backend, method, clone_url, dest)
185 print stdout,'sdasdsadsa'
186 186 if no_errors is False:
187 assert """adding file changes""" in stdout, 'no messages about cloning'
188 assert """abort""" not in stderr , 'got error from clone'
187 if backend == 'hg':
188 assert """adding file changes""" in stdout, 'no messages about cloning'
189 assert """abort""" not in stderr , 'got error from clone'
190 elif backend == 'git':
191 assert """Cloning into""" in stdout, 'no messages about cloning'
189 192
190 193 if __name__ == '__main__':
191 194 try:
192 195 create_test_user(force=False)
193 196 seq = None
194 197 import time
195 198
196 199 try:
197 200 METHOD = sys.argv[3]
198 201 except:
199 202 pass
200 203
204 try:
205 backend = sys.argv[4]
206 except:
207 backend = 'hg'
208
201 209 if METHOD == 'pull':
202 210 seq = _RandomNameSequence().next()
203 211 test_clone_with_credentials(repo=sys.argv[1], method='clone',
204 seq=seq)
212 seq=seq, backend=backend)
205 213 s = time.time()
206 214 for i in range(1, int(sys.argv[2]) + 1):
207 215 print 'take', i
208 216 test_clone_with_credentials(repo=sys.argv[1], method=METHOD,
209 seq=seq)
217 seq=seq, backend=backend)
210 218 print 'time taken %.3f' % (time.time() - s)
211 219 except Exception, e:
212 220 raise
213 221 sys.exit('stop on %s' % e)
General Comments 0
You need to be logged in to leave comments. Login now