##// END OF EJS Templates
tests: refactor TestINI to different name so it doesn't confuse pytest.
marcink -
r131:39302703 default
parent child Browse files
Show More
@@ -1,71 +1,71 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2016 RodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import os
19 19 import shutil
20 20 import tempfile
21 21
22 22 import configobj
23 23
24 24
25 class TestINI(object):
25 class ContextINI(object):
26 26 """
27 27 Allows to create a new test.ini file as a copy of existing one with edited
28 28 data. If existing file is not present, it creates a new one. Example usage::
29 29
30 30 with TestINI('test.ini', [{'section': {'key': 'val'}}]) as new_test_ini_path:
31 31 print 'vcsserver --config=%s' % new_test_ini
32 32 """
33 33
34 34 def __init__(self, ini_file_path, ini_params, new_file_prefix=None,
35 35 destroy=True):
36 36 self.ini_file_path = ini_file_path
37 37 self.ini_params = ini_params
38 38 self.new_path = None
39 39 self.new_path_prefix = new_file_prefix or 'test'
40 40 self.destroy = destroy
41 41
42 42 def __enter__(self):
43 43 _, pref = tempfile.mkstemp()
44 44 loc = tempfile.gettempdir()
45 45 self.new_path = os.path.join(loc, '{}_{}_{}'.format(
46 46 pref, self.new_path_prefix, self.ini_file_path))
47 47
48 48 # copy ini file and modify according to the params, if we re-use a file
49 49 if os.path.isfile(self.ini_file_path):
50 50 shutil.copy(self.ini_file_path, self.new_path)
51 51 else:
52 52 # create new dump file for configObj to write to.
53 53 with open(self.new_path, 'wb'):
54 54 pass
55 55
56 56 config = configobj.ConfigObj(
57 57 self.new_path, file_error=True, write_empty_values=True)
58 58
59 59 for data in self.ini_params:
60 60 section, ini_params = data.items()[0]
61 61 key, val = ini_params.items()[0]
62 62 if section not in config:
63 63 config[section] = {}
64 64 config[section][key] = val
65 65
66 66 config.write()
67 67 return self.new_path
68 68
69 69 def __exit__(self, exc_type, exc_val, exc_tb):
70 70 if self.destroy:
71 71 os.remove(self.new_path)
@@ -1,132 +1,132 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2016 RodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import subprocess
19 19 import StringIO
20 20 import time
21 21
22 22 import pytest
23 23
24 from fixture import TestINI
24 from fixture import ContextINI
25 25
26 26
27 27 @pytest.mark.parametrize("arguments, expected_texts", [
28 28 (['--threadpool=192'], [
29 29 'threadpool_size: 192',
30 30 'worker pool of size 192 created',
31 31 'Threadpool size set to 192']),
32 32 (['--locale=fake'], [
33 33 'Cannot set locale, not configuring the locale system']),
34 34 (['--timeout=5'], [
35 35 'Timeout for RPC calls set to 5.0 seconds']),
36 36 (['--log-level=info'], [
37 37 'log_level: info']),
38 38 (['--port={port}'], [
39 39 'port: {port}',
40 40 'created daemon on localhost:{port}']),
41 41 (['--host=127.0.0.1', '--port={port}'], [
42 42 'port: {port}',
43 43 'host: 127.0.0.1',
44 44 'created daemon on 127.0.0.1:{port}']),
45 45 (['--config=/bad/file'], ['OSError: File /bad/file does not exist']),
46 46 ])
47 47 def test_vcsserver_calls(arguments, expected_texts, vcsserver_port):
48 48 port_argument = '--port={port}'
49 49 if port_argument not in arguments:
50 50 arguments.append(port_argument)
51 51 arguments = _replace_port(arguments, vcsserver_port)
52 52 expected_texts = _replace_port(expected_texts, vcsserver_port)
53 53 output = call_vcs_server_with_arguments(arguments)
54 54 for text in expected_texts:
55 55 assert text in output
56 56
57 57
58 58 def _replace_port(values, port):
59 59 return [value.format(port=port) for value in values]
60 60
61 61
62 62 def test_vcsserver_with_config(vcsserver_port):
63 63 ini_def = [
64 64 {'DEFAULT': {'host': '127.0.0.1'}},
65 65 {'DEFAULT': {'threadpool_size': '111'}},
66 66 {'DEFAULT': {'port': vcsserver_port}},
67 67 ]
68 68
69 with TestINI('test.ini', ini_def) as new_test_ini_path:
69 with ContextINI('test.ini', ini_def) as new_test_ini_path:
70 70 output = call_vcs_server_with_arguments(
71 71 ['--config=' + new_test_ini_path])
72 72
73 73 expected_texts = [
74 74 'host: 127.0.0.1',
75 75 'Threadpool size set to 111',
76 76 ]
77 77 for text in expected_texts:
78 78 assert text in output
79 79
80 80
81 81 def test_vcsserver_with_config_cli_overwrite(vcsserver_port):
82 82 ini_def = [
83 83 {'DEFAULT': {'host': '127.0.0.1'}},
84 84 {'DEFAULT': {'port': vcsserver_port}},
85 85 {'DEFAULT': {'threadpool_size': '111'}},
86 86 {'DEFAULT': {'timeout': '0'}},
87 87 ]
88 with TestINI('test.ini', ini_def) as new_test_ini_path:
88 with ContextINI('test.ini', ini_def) as new_test_ini_path:
89 89 output = call_vcs_server_with_arguments([
90 90 '--config=' + new_test_ini_path,
91 91 '--host=128.0.0.1',
92 92 '--threadpool=256',
93 93 '--timeout=5'])
94 94 expected_texts = [
95 95 'host: 128.0.0.1',
96 96 'Threadpool size set to 256',
97 97 'Timeout for RPC calls set to 5.0 seconds',
98 98 ]
99 99 for text in expected_texts:
100 100 assert text in output
101 101
102 102
103 103 def call_vcs_server_with_arguments(args):
104 104 vcs = subprocess.Popen(
105 105 ["vcsserver"] + args,
106 106 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
107 107
108 108 output = read_output_until(
109 109 "Starting vcsserver.main", vcs.stdout)
110 110 vcs.terminate()
111 111 return output
112 112
113 113
114 114 def call_vcs_server_with_non_existing_config_file(args):
115 115 vcs = subprocess.Popen(
116 116 ["vcsserver", "--config=/tmp/bad"] + args,
117 117 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
118 118 output = read_output_until(
119 119 "Starting vcsserver.main", vcs.stdout)
120 120 vcs.terminate()
121 121 return output
122 122
123 123
124 124 def read_output_until(expected, source, timeout=5):
125 125 ts = time.time()
126 126 buf = StringIO.StringIO()
127 127 while time.time() - ts < timeout:
128 128 line = source.readline()
129 129 buf.write(line)
130 130 if expected in line:
131 131 break
132 132 return buf.getvalue()
General Comments 0
You need to be logged in to leave comments. Login now