##// END OF EJS Templates
tests: Fix infinite loop.
Martin Bornhold -
r965:7c1d7296 default
parent child Browse files
Show More
@@ -1,69 +1,69 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import Pyro4
23 23 import pytest
24 24 from Pyro4.errors import TimeoutError, ConnectionClosedError
25 25
26 26 from rhodecode.lib.vcs import client, create_vcsserver_proxy
27 27 from rhodecode.lib.vcs.conf import settings
28 28
29 29
30 30 @pytest.fixture
31 31 def short_timeout(request):
32 32 old_timeout = Pyro4.config.COMMTIMEOUT
33 33 Pyro4.config.COMMTIMEOUT = 0.5
34 34
35 35 @request.addfinalizer
36 36 def cleanup():
37 37 Pyro4.config.COMMTIMEOUT = old_timeout
38 38
39 39 return Pyro4.config.COMMTIMEOUT
40 40
41 41
42 42 @pytest.mark.timeout(20)
43 43 def test_vcs_connections_have_a_timeout_set(pylonsapp, short_timeout):
44 44 server_and_port = pylonsapp.config['vcs.server']
45 45 proxy_objects = []
46 46 with pytest.raises(TimeoutError):
47 47 # TODO: johbo: Find a better way to set this number
48 while xrange(100):
48 for number in xrange(100):
49 49 server = create_vcsserver_proxy(server_and_port)
50 50 server.ping()
51 51 proxy_objects.append(server)
52 52
53 53
54 54 def test_vcs_remote_calls_are_bound_by_timeout(pylonsapp, short_timeout):
55 55 server_and_port = pylonsapp.config['vcs.server']
56 56 with pytest.raises(TimeoutError):
57 57 server = create_vcsserver_proxy(server_and_port)
58 58 server.sleep(short_timeout + 1.0)
59 59
60 60
61 61 def test_wrap_remote_call_triggers_reconnect():
62 62 # First call fails, the second is successful
63 63 func = mock.Mock(side_effect=[ConnectionClosedError, None])
64 64 proxy_mock = mock.Mock()
65 65
66 66 wrapped_func = client._wrap_remote_call(proxy_mock, func)
67 67 wrapped_func()
68 68 proxy_mock._pyroReconnect.assert_called_with(
69 69 tries=settings.PYRO_RECONNECT_TRIES)
General Comments 0
You need to be logged in to leave comments. Login now