##// END OF EJS Templates
tests: Fix --without-vcsserver parameter...
johbo -
r816:43fb99bf default
parent child Browse files
Show More
@@ -1,331 +1,331 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Helpers for fixture generation
23 23 """
24 24
25 25 import os
26 26 import time
27 27 import tempfile
28 28 import shutil
29 29
30 30 import configobj
31 31
32 32 from rhodecode.tests import *
33 33 from rhodecode.model.db import Repository, User, RepoGroup, UserGroup, Gist
34 34 from rhodecode.model.meta import Session
35 35 from rhodecode.model.repo import RepoModel
36 36 from rhodecode.model.user import UserModel
37 37 from rhodecode.model.repo_group import RepoGroupModel
38 38 from rhodecode.model.user_group import UserGroupModel
39 39 from rhodecode.model.gist import GistModel
40 40
41 41 dn = os.path.dirname
42 42 FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'tests', 'fixtures')
43 43
44 44
45 45 def error_function(*args, **kwargs):
46 46 raise Exception('Total Crash !')
47 47
48 48
49 49 class TestINI(object):
50 50 """
51 51 Allows to create a new test.ini file as a copy of existing one with edited
52 52 data. Example usage::
53 53
54 54 with TestINI('test.ini', [{'section':{'key':val'}]) as new_test_ini_path:
55 55 print 'paster server %s' % new_test_ini
56 56 """
57 57
58 58 def __init__(self, ini_file_path, ini_params, new_file_prefix='DEFAULT',
59 59 destroy=True, dir=None):
60 60 self.ini_file_path = ini_file_path
61 61 self.ini_params = ini_params
62 62 self.new_path = None
63 63 self.new_path_prefix = new_file_prefix
64 64 self._destroy = destroy
65 65 self._dir = dir
66 66
67 67 def __enter__(self):
68 68 return self.create()
69 69
70 70 def __exit__(self, exc_type, exc_val, exc_tb):
71 71 self.destroy()
72 72
73 73 def create(self):
74 74 config = configobj.ConfigObj(
75 75 self.ini_file_path, file_error=True, write_empty_values=True)
76 76
77 77 for data in self.ini_params:
78 78 section, ini_params = data.items()[0]
79 key, val = ini_params.items()[0]
80 config[section][key] = val
79 for key, val in ini_params.items():
80 config[section][key] = val
81 81 with tempfile.NamedTemporaryFile(
82 82 prefix=self.new_path_prefix, suffix='.ini', dir=self._dir,
83 83 delete=False) as new_ini_file:
84 84 config.write(new_ini_file)
85 85 self.new_path = new_ini_file.name
86 86
87 87 return self.new_path
88 88
89 89 def destroy(self):
90 90 if self._destroy:
91 91 os.remove(self.new_path)
92 92
93 93
94 94 class Fixture(object):
95 95
96 96 def anon_access(self, status):
97 97 """
98 98 Context process for disabling anonymous access. use like:
99 99 fixture = Fixture()
100 100 with fixture.anon_access(False):
101 101 #tests
102 102
103 103 after this block anon access will be set to `not status`
104 104 """
105 105
106 106 class context(object):
107 107 def __enter__(self):
108 108 anon = User.get_default_user()
109 109 anon.active = status
110 110 Session().add(anon)
111 111 Session().commit()
112 112 time.sleep(1.5) # must sleep for cache (1s to expire)
113 113
114 114 def __exit__(self, exc_type, exc_val, exc_tb):
115 115 anon = User.get_default_user()
116 116 anon.active = not status
117 117 Session().add(anon)
118 118 Session().commit()
119 119
120 120 return context()
121 121
122 122 def _get_repo_create_params(self, **custom):
123 123 defs = {
124 124 'repo_name': None,
125 125 'repo_type': 'hg',
126 126 'clone_uri': '',
127 127 'repo_group': '-1',
128 128 'repo_description': 'DESC',
129 129 'repo_private': False,
130 130 'repo_landing_rev': 'rev:tip',
131 131 'repo_copy_permissions': False,
132 132 'repo_state': Repository.STATE_CREATED,
133 133 }
134 134 defs.update(custom)
135 135 if 'repo_name_full' not in custom:
136 136 defs.update({'repo_name_full': defs['repo_name']})
137 137
138 138 # fix the repo name if passed as repo_name_full
139 139 if defs['repo_name']:
140 140 defs['repo_name'] = defs['repo_name'].split('/')[-1]
141 141
142 142 return defs
143 143
144 144 def _get_group_create_params(self, **custom):
145 145 defs = {
146 146 'group_name': None,
147 147 'group_description': 'DESC',
148 148 'perm_updates': [],
149 149 'perm_additions': [],
150 150 'perm_deletions': [],
151 151 'group_parent_id': -1,
152 152 'enable_locking': False,
153 153 'recursive': False,
154 154 }
155 155 defs.update(custom)
156 156
157 157 return defs
158 158
159 159 def _get_user_create_params(self, name, **custom):
160 160 defs = {
161 161 'username': name,
162 162 'password': 'qweqwe',
163 163 'email': '%s+test@rhodecode.org' % name,
164 164 'firstname': 'TestUser',
165 165 'lastname': 'Test',
166 166 'active': True,
167 167 'admin': False,
168 168 'extern_type': 'rhodecode',
169 169 'extern_name': None,
170 170 }
171 171 defs.update(custom)
172 172
173 173 return defs
174 174
175 175 def _get_user_group_create_params(self, name, **custom):
176 176 defs = {
177 177 'users_group_name': name,
178 178 'user_group_description': 'DESC',
179 179 'users_group_active': True,
180 180 'user_group_data': {},
181 181 }
182 182 defs.update(custom)
183 183
184 184 return defs
185 185
186 186 def create_repo(self, name, **kwargs):
187 187 repo_group = kwargs.get('repo_group')
188 188 if isinstance(repo_group, RepoGroup):
189 189 kwargs['repo_group'] = repo_group.group_id
190 190 name = name.split(Repository.NAME_SEP)[-1]
191 191 name = Repository.NAME_SEP.join((repo_group.group_name, name))
192 192
193 193 if 'skip_if_exists' in kwargs:
194 194 del kwargs['skip_if_exists']
195 195 r = Repository.get_by_repo_name(name)
196 196 if r:
197 197 return r
198 198
199 199 form_data = self._get_repo_create_params(repo_name=name, **kwargs)
200 200 cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
201 201 RepoModel().create(form_data, cur_user)
202 202 Session().commit()
203 203 repo = Repository.get_by_repo_name(name)
204 204 assert repo
205 205 return repo
206 206
207 207 def create_fork(self, repo_to_fork, fork_name, **kwargs):
208 208 repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
209 209
210 210 form_data = self._get_repo_create_params(repo_name=fork_name,
211 211 fork_parent_id=repo_to_fork.repo_id,
212 212 repo_type=repo_to_fork.repo_type,
213 213 **kwargs)
214 214 #TODO: fix it !!
215 215 form_data['description'] = form_data['repo_description']
216 216 form_data['private'] = form_data['repo_private']
217 217 form_data['landing_rev'] = form_data['repo_landing_rev']
218 218
219 219 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
220 220 RepoModel().create_fork(form_data, cur_user=owner)
221 221 Session().commit()
222 222 r = Repository.get_by_repo_name(fork_name)
223 223 assert r
224 224 return r
225 225
226 226 def destroy_repo(self, repo_name, **kwargs):
227 227 RepoModel().delete(repo_name, **kwargs)
228 228 Session().commit()
229 229
230 230 def destroy_repo_on_filesystem(self, repo_name):
231 231 rm_path = os.path.join(RepoModel().repos_path, repo_name)
232 232 if os.path.isdir(rm_path):
233 233 shutil.rmtree(rm_path)
234 234
235 235 def create_repo_group(self, name, **kwargs):
236 236 if 'skip_if_exists' in kwargs:
237 237 del kwargs['skip_if_exists']
238 238 gr = RepoGroup.get_by_group_name(group_name=name)
239 239 if gr:
240 240 return gr
241 241 form_data = self._get_group_create_params(group_name=name, **kwargs)
242 242 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
243 243 gr = RepoGroupModel().create(
244 244 group_name=form_data['group_name'],
245 245 group_description=form_data['group_name'],
246 246 owner=owner)
247 247 Session().commit()
248 248 gr = RepoGroup.get_by_group_name(gr.group_name)
249 249 return gr
250 250
251 251 def destroy_repo_group(self, repogroupid):
252 252 RepoGroupModel().delete(repogroupid)
253 253 Session().commit()
254 254
255 255 def create_user(self, name, **kwargs):
256 256 if 'skip_if_exists' in kwargs:
257 257 del kwargs['skip_if_exists']
258 258 user = User.get_by_username(name)
259 259 if user:
260 260 return user
261 261 form_data = self._get_user_create_params(name, **kwargs)
262 262 user = UserModel().create(form_data)
263 263 Session().commit()
264 264 user = User.get_by_username(user.username)
265 265 return user
266 266
267 267 def destroy_user(self, userid):
268 268 UserModel().delete(userid)
269 269 Session().commit()
270 270
271 271 def destroy_users(self, userid_iter):
272 272 for user_id in userid_iter:
273 273 if User.get_by_username(user_id):
274 274 UserModel().delete(user_id)
275 275 Session().commit()
276 276
277 277 def create_user_group(self, name, **kwargs):
278 278 if 'skip_if_exists' in kwargs:
279 279 del kwargs['skip_if_exists']
280 280 gr = UserGroup.get_by_group_name(group_name=name)
281 281 if gr:
282 282 return gr
283 283 form_data = self._get_user_group_create_params(name, **kwargs)
284 284 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
285 285 user_group = UserGroupModel().create(
286 286 name=form_data['users_group_name'],
287 287 description=form_data['user_group_description'],
288 288 owner=owner, active=form_data['users_group_active'],
289 289 group_data=form_data['user_group_data'])
290 290 Session().commit()
291 291 user_group = UserGroup.get_by_group_name(user_group.users_group_name)
292 292 return user_group
293 293
294 294 def destroy_user_group(self, usergroupid):
295 295 UserGroupModel().delete(user_group=usergroupid, force=True)
296 296 Session().commit()
297 297
298 298 def create_gist(self, **kwargs):
299 299 form_data = {
300 300 'description': 'new-gist',
301 301 'owner': TEST_USER_ADMIN_LOGIN,
302 302 'gist_type': GistModel.cls.GIST_PUBLIC,
303 303 'lifetime': -1,
304 304 'acl_level': Gist.ACL_LEVEL_PUBLIC,
305 305 'gist_mapping': {'filename1.txt': {'content': 'hello world'},}
306 306 }
307 307 form_data.update(kwargs)
308 308 gist = GistModel().create(
309 309 description=form_data['description'], owner=form_data['owner'],
310 310 gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
311 311 lifetime=form_data['lifetime'], gist_acl_level=form_data['acl_level']
312 312 )
313 313 Session().commit()
314 314 return gist
315 315
316 316 def destroy_gists(self, gistid=None):
317 317 for g in GistModel.cls.get_all():
318 318 if gistid:
319 319 if gistid == g.gist_access_id:
320 320 GistModel().delete(g)
321 321 else:
322 322 GistModel().delete(g)
323 323 Session().commit()
324 324
325 325 def load_resource(self, resource_name, strip=False):
326 326 with open(os.path.join(FIXTURES, resource_name)) as f:
327 327 source = f.read()
328 328 if strip:
329 329 source = source.strip()
330 330
331 331 return source
@@ -1,432 +1,439 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import json
22 22 import logging.config
23 23 import os
24 24 import platform
25 25 import socket
26 26 import subprocess
27 27 import time
28 28 from urllib2 import urlopen, URLError
29 29
30 30 import configobj
31 31 import pylons
32 32 import pytest
33 33 import webob
34 34 from beaker.session import SessionObject
35 35 from paste.deploy import loadapp
36 36 from pylons.i18n.translation import _get_translator
37 37 from pylons.util import ContextObj
38 38 from Pyro4.errors import CommunicationError
39 39 from routes.util import URLGenerator
40 40
41 41 from rhodecode.lib import vcs
42 42 from rhodecode.tests.fixture import TestINI
43 43 import rhodecode
44 44
45 45
46 46 def _parse_json(value):
47 47 return json.loads(value) if value else None
48 48
49 49
50 50 def pytest_addoption(parser):
51 51 group = parser.getgroup('pylons')
52 52 group.addoption(
53 53 '--with-pylons', dest='pylons_config',
54 54 help="Set up a Pylons environment with the specified config file.")
55 55 group.addoption(
56 56 '--pylons-config-override', action='store', type=_parse_json,
57 57 default=None, dest='pylons_config_override', help=(
58 58 "Overrides the .ini file settings. Should be specified in JSON"
59 59 " format, e.g. '{\"section\": {\"parameter\": \"value\", ...}}'"
60 60 )
61 61 )
62 62 parser.addini(
63 63 'pylons_config',
64 64 "Set up a Pylons environment with the specified config file.")
65 65
66 66 vcsgroup = parser.getgroup('vcs')
67 67 vcsgroup.addoption(
68 68 '--without-vcsserver', dest='with_vcsserver', action='store_false',
69 69 help="Do not start the VCSServer in a background process.")
70 70 vcsgroup.addoption(
71 71 '--with-vcsserver', dest='vcsserver_config',
72 72 help="Start the VCSServer with the specified config file.")
73 73 vcsgroup.addoption(
74 74 '--with-vcsserver-http', dest='vcsserver_config_http',
75 75 help="Start the HTTP VCSServer with the specified config file.")
76 76 vcsgroup.addoption(
77 77 '--vcsserver-protocol', dest='vcsserver_protocol',
78 78 help="Start the VCSServer with HTTP / Pyro4 protocol support.")
79 79 vcsgroup.addoption(
80 80 '--vcsserver-config-override', action='store', type=_parse_json,
81 81 default=None, dest='vcsserver_config_override', help=(
82 82 "Overrides the .ini file settings for the VCSServer. "
83 83 "Should be specified in JSON "
84 84 "format, e.g. '{\"section\": {\"parameter\": \"value\", ...}}'"
85 85 )
86 86 )
87 87 vcsgroup.addoption(
88 88 '--vcsserver-port', action='store', type=int,
89 89 default=None, help=(
90 90 "Allows to set the port of the vcsserver. Useful when testing "
91 91 "against an already running server and random ports cause "
92 92 "trouble."))
93 93 parser.addini(
94 94 'vcsserver_config',
95 95 "Start the VCSServer with the specified config file.")
96 96 parser.addini(
97 97 'vcsserver_config_http',
98 98 "Start the HTTP VCSServer with the specified config file.")
99 99 parser.addini(
100 100 'vcsserver_protocol',
101 101 "Start the VCSServer with HTTP / Pyro4 protocol support.")
102 102
103 103
104 104 @pytest.fixture(scope='session')
105 105 def vcsserver(request, vcsserver_port, vcsserver_factory):
106 106 """
107 107 Session scope VCSServer.
108 108
109 109 Tests wich need the VCSServer have to rely on this fixture in order
110 110 to ensure it will be running.
111 111
112 112 For specific needs, the fixture vcsserver_factory can be used. It allows to
113 113 adjust the configuration file for the test run.
114 114
115 115 Command line args:
116 116
117 117 --without-vcsserver: Allows to switch this fixture off. You have to
118 118 manually start the server.
119 119
120 120 --vcsserver-port: Will expect the VCSServer to listen on this port.
121 121 """
122 122
123 123 if not request.config.getoption('with_vcsserver'):
124 124 return None
125 125
126 126 use_http = _use_vcs_http_server(request.config)
127 127 return vcsserver_factory(
128 128 request, use_http=use_http, vcsserver_port=vcsserver_port)
129 129
130 130
131 131 @pytest.fixture(scope='session')
132 132 def vcsserver_factory(tmpdir_factory):
133 133 """
134 134 Use this if you need a running vcsserver with a special configuration.
135 135 """
136 136
137 137 def factory(request, use_http=False, overrides=(), vcsserver_port=None):
138 138
139 139 if vcsserver_port is None:
140 140 vcsserver_port = get_available_port()
141 141
142 142 overrides = list(overrides)
143 143 if use_http:
144 144 overrides.append({'server:main': {'port': vcsserver_port}})
145 145 else:
146 146 overrides.append({'DEFAULT': {'port': vcsserver_port}})
147 147
148 148 if is_cygwin():
149 149 platform_override = {'DEFAULT': {
150 150 'beaker.cache.repo_object.type': 'nocache'}}
151 151 overrides.append(platform_override)
152 152
153 153 option_name = (
154 154 'vcsserver_config_http' if use_http else 'vcsserver_config')
155 155 override_option_name = 'vcsserver_config_override'
156 156 config_file = get_config(
157 157 request.config, option_name=option_name,
158 158 override_option_name=override_option_name, overrides=overrides,
159 159 basetemp=tmpdir_factory.getbasetemp().strpath,
160 160 prefix='test_vcs_')
161 161
162 162 print "Using the VCSServer configuration", config_file
163 163 ServerClass = HttpVCSServer if use_http else Pyro4VCSServer
164 164 server = ServerClass(config_file)
165 165 server.start()
166 166
167 167 @request.addfinalizer
168 168 def cleanup():
169 169 server.shutdown()
170 170
171 171 server.wait_until_ready()
172 172 return server
173 173
174 174 return factory
175 175
176 176
177 177 def is_cygwin():
178 178 return 'cygwin' in platform.system().lower()
179 179
180 180
181 181 def _use_vcs_http_server(config):
182 182 protocol_option = 'vcsserver_protocol'
183 183 protocol = (
184 184 config.getoption(protocol_option) or
185 185 config.getini(protocol_option) or
186 186 'pyro4')
187 187 return protocol == 'http'
188 188
189 189
190 190 class VCSServer(object):
191 191 """
192 192 Represents a running VCSServer instance.
193 193 """
194 194
195 195 _args = []
196 196
197 197 def start(self):
198 198 print("Starting the VCSServer: {}".format(self._args))
199 199 self.process = subprocess.Popen(self._args)
200 200
201 201 def wait_until_ready(self, timeout=30):
202 202 raise NotImplementedError()
203 203
204 204 def shutdown(self):
205 205 self.process.kill()
206 206
207 207
208 208 class Pyro4VCSServer(VCSServer):
209 209 def __init__(self, config_file):
210 210 """
211 211 :param config_file: The config file to start the server with
212 212 """
213 213
214 214 config_data = configobj.ConfigObj(config_file)
215 215 self._config = config_data['DEFAULT']
216 216
217 217 args = ['vcsserver', '--config', config_file]
218 218 self._args = args
219 219
220 220 def wait_until_ready(self, timeout=30):
221 221 remote_server = vcs.create_vcsserver_proxy(self.server_and_port)
222 222 start = time.time()
223 223 with remote_server:
224 224 while time.time() - start < timeout:
225 225 try:
226 226 remote_server.ping()
227 227 break
228 228 except CommunicationError:
229 229 time.sleep(0.2)
230 230 else:
231 231 pytest.exit(
232 232 "Starting the VCSServer failed or took more than {} "
233 233 "seconds.".format(timeout))
234 234
235 235 @property
236 236 def server_and_port(self):
237 237 return '{host}:{port}'.format(**self._config)
238 238
239 239
240 240 class HttpVCSServer(VCSServer):
241 241 """
242 242 Represents a running VCSServer instance.
243 243 """
244 244 def __init__(self, config_file):
245 245 config_data = configobj.ConfigObj(config_file)
246 246 self._config = config_data['server:main']
247 247
248 248 args = ['pserve', config_file, 'http_host=0.0.0.0']
249 249 self._args = args
250 250
251 251 @property
252 252 def http_url(self):
253 253 template = 'http://{host}:{port}/'
254 254 return template.format(**self._config)
255 255
256 256 def start(self):
257 257 self.process = subprocess.Popen(self._args)
258 258
259 259 def wait_until_ready(self, timeout=30):
260 260 host = self._config['host']
261 261 port = self._config['port']
262 262 status_url = 'http://{host}:{port}/status'.format(host=host, port=port)
263 263 start = time.time()
264 264
265 265 while time.time() - start < timeout:
266 266 try:
267 267 urlopen(status_url)
268 268 break
269 269 except URLError:
270 270 time.sleep(0.2)
271 271 else:
272 272 pytest.exit(
273 273 "Starting the VCSServer failed or took more than {} "
274 274 "seconds.".format(timeout))
275 275
276 276 def shutdown(self):
277 277 self.process.kill()
278 278
279 279
280 280 @pytest.fixture(scope='session')
281 281 def pylons_config(request, tmpdir_factory, rcserver_port, vcsserver_port):
282 282 option_name = 'pylons_config'
283 283
284 284 overrides = [
285 285 {'server:main': {'port': rcserver_port}},
286 {'app:main': {'vcs.server': 'localhost:%s' % vcsserver_port}}]
286 {'app:main': {
287 'vcs.server': 'localhost:%s' % vcsserver_port,
288 # johbo: We will always start the VCSServer on our own based on the
289 # fixtures of the test cases. For the test run it must always be
290 # off in the INI file.
291 'vcs.start_server': 'false',
292 }},
293 ]
287 294 if _use_vcs_http_server(request.config):
288 295 overrides.append({'app:main': {'vcs.server.protocol': 'http'}})
289 296
290 297 filename = get_config(
291 298 request.config, option_name=option_name,
292 299 override_option_name='{}_override'.format(option_name),
293 300 overrides=overrides,
294 301 basetemp=tmpdir_factory.getbasetemp().strpath,
295 302 prefix='test_rce_')
296 303 return filename
297 304
298 305
299 306 @pytest.fixture(scope='session')
300 307 def rcserver_port(request):
301 308 port = get_available_port()
302 309 print 'Using rcserver port %s' % (port, )
303 310 return port
304 311
305 312
306 313 @pytest.fixture(scope='session')
307 314 def vcsserver_port(request):
308 315 port = request.config.getoption('--vcsserver-port')
309 316 if port is None:
310 317 port = get_available_port()
311 318 print 'Using vcsserver port %s' % (port, )
312 319 return port
313 320
314 321
315 322 def get_available_port():
316 323 family = socket.AF_INET
317 324 socktype = socket.SOCK_STREAM
318 325 host = '127.0.0.1'
319 326
320 327 mysocket = socket.socket(family, socktype)
321 328 mysocket.bind((host, 0))
322 329 port = mysocket.getsockname()[1]
323 330 mysocket.close()
324 331 del mysocket
325 332 return port
326 333
327 334
328 335 @pytest.fixture(scope='session')
329 336 def available_port_factory():
330 337 """
331 338 Returns a callable which returns free port numbers.
332 339 """
333 340 return get_available_port
334 341
335 342
336 343 @pytest.fixture
337 344 def available_port(available_port_factory):
338 345 """
339 346 Gives you one free port for the current test.
340 347
341 348 Uses "available_port_factory" to retrieve the port.
342 349 """
343 350 return available_port_factory()
344 351
345 352
346 353 @pytest.fixture(scope='session')
347 354 def pylonsapp(pylons_config, vcsserver, http_environ_session):
348 355 logging.config.fileConfig(
349 356 pylons_config, disable_existing_loggers=False)
350 357 app = _setup_pylons_environment(pylons_config, http_environ_session)
351 358 return app
352 359
353 360
354 361 @pytest.fixture(scope='session')
355 362 def testini_factory(tmpdir_factory, pylons_config):
356 363 """
357 364 Factory to create an INI file based on TestINI.
358 365
359 366 It will make sure to place the INI file in the correct directory.
360 367 """
361 368 basetemp = tmpdir_factory.getbasetemp().strpath
362 369 return TestIniFactory(basetemp, pylons_config)
363 370
364 371
365 372 class TestIniFactory(object):
366 373
367 374 def __init__(self, basetemp, template_ini):
368 375 self._basetemp = basetemp
369 376 self._template_ini = template_ini
370 377
371 378 def __call__(self, ini_params, new_file_prefix='test'):
372 379 ini_file = TestINI(
373 380 self._template_ini, ini_params=ini_params,
374 381 new_file_prefix=new_file_prefix, dir=self._basetemp)
375 382 result = ini_file.create()
376 383 return result
377 384
378 385
379 386 def get_config(
380 387 config, option_name, override_option_name, overrides=None,
381 388 basetemp=None, prefix='test'):
382 389 """
383 390 Find a configuration file and apply overrides for the given `prefix`.
384 391 """
385 392 config_file = (
386 393 config.getoption(option_name) or config.getini(option_name))
387 394 if not config_file:
388 395 pytest.exit(
389 396 "Configuration error, could not extract {}.".format(option_name))
390 397
391 398 overrides = overrides or []
392 399 config_override = config.getoption(override_option_name)
393 400 if config_override:
394 401 overrides.append(config_override)
395 402 temp_ini_file = TestINI(
396 403 config_file, ini_params=overrides, new_file_prefix=prefix,
397 404 dir=basetemp)
398 405
399 406 return temp_ini_file.create()
400 407
401 408
402 409 def _setup_pylons_environment(pylons_config, http_environ):
403 410 current_path = os.getcwd()
404 411 pylonsapp = loadapp(
405 412 'config:' + pylons_config, relative_to=current_path)
406 413
407 414 # Using rhodecode.CONFIG which is assigned during "load_environment".
408 415 # The indirect approach is used, because "pylonsapp" may actually be
409 416 # the Pyramid application.
410 417 pylonsapp_config = rhodecode.CONFIG
411 418 _init_stack(pylonsapp_config, environ=http_environ)
412 419
413 420 # For compatibility add the attribute "config" which would be
414 421 # present on the Pylons application.
415 422 pylonsapp.config = pylonsapp_config
416 423 return pylonsapp
417 424
418 425
419 426 def _init_stack(config=None, environ=None):
420 427 if not config:
421 428 config = pylons.test.pylonsapp.config
422 429 if not environ:
423 430 environ = {}
424 431 pylons.url._push_object(URLGenerator(config['routes.map'], environ or {}))
425 432 pylons.app_globals._push_object(config['pylons.app_globals'])
426 433 pylons.config._push_object(config)
427 434 pylons.tmpl_context._push_object(ContextObj())
428 435 # Initialize a translator for tests that utilize i18n
429 436 translator = _get_translator(pylons.config.get('lang'))
430 437 pylons.translator._push_object(translator)
431 438 pylons.session._push_object(SessionObject(environ or {}))
432 439 pylons.request._push_object(webob.Request.blank('', environ=environ))
General Comments 0
You need to be logged in to leave comments. Login now