##// END OF EJS Templates
svn-support: Use a queue and worker thread to serialize execution of event subscriber.
Martin Bornhold -
r1017:9910c00d default
parent child Browse files
Show More
@@ -1,132 +1,156 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import logging
22 import Queue
22 23 import subprocess32
23 24 from threading import Thread
24 25
25 26
26 27 from .utils import generate_mod_dav_svn_config
27 28
28 29
29 30 log = logging.getLogger(__name__)
30 31
31 32
32 33 def generate_config_subscriber(event):
33 34 """
34 35 Subscriber to the `rhodcode.events.RepoGroupEvent`. This triggers the
35 36 automatic generation of mod_dav_svn config file on repository group
36 37 changes.
37 38 """
38 39 try:
39 40 generate_mod_dav_svn_config(event.request.registry)
40 41 except Exception:
41 42 log.exception(
42 43 'Exception while generating subversion mod_dav_svn configuration.')
43 44
44 45
45 46 class Subscriber(object):
46 47 def __call__(self, event):
47 48 self.run(event)
48 49
49 50 def run(self, event):
50 51 raise NotImplementedError('Subclass has to implement this.')
51 52
52 53
53 54 class AsyncSubscriber(Subscriber):
54 def __init__(self, *args, **kwargs):
55 self._init_args = args
56 self._init_kwargs = kwargs
55 def __init__(self):
56 self._stop = False
57 self._eventq = Queue.Queue()
58 self._worker = self.create_worker()
59 self._worker.start()
57 60
58 61 def __call__(self, event):
59 kwargs = {'event': event}
60 kwargs.update(self._init_kwargs)
61 self.thread = Thread(
62 target=self.run, args=self._init_args, kwargs=kwargs)
63 self.thread.start()
62 self._eventq.put(event)
63
64 def create_worker(self):
65 worker = Thread(target=self.do_work)
66 worker.daemon = True
67 return worker
68
69 def stop_worker(self):
70 self._stop = False
71 self._eventq.put(None)
72 self._worker.join()
73
74 def do_work(self):
75 while not self._stop:
76 event = self._eventq.get()
77 if event is not None:
78 self.run(event)
64 79
65 80
66 81 class AsyncSubprocessSubscriber(AsyncSubscriber):
67 def run(self, event, cmd, timeout=None):
82
83 def __init__(self, cmd, timeout=None):
84 super(AsyncSubprocessSubscriber, self).__init__()
85 self._cmd = cmd
86 self._timeout = timeout
87
88 def run(self, event):
89 cmd = self._cmd
90 timeout = self._timeout
68 91 log.debug('Executing command %s.', cmd)
92
69 93 try:
70 94 output = subprocess32.check_output(
71 95 cmd, timeout=timeout, stderr=subprocess32.STDOUT)
72 96 log.debug('Command finished %s', cmd)
73 97 if output:
74 98 log.debug('Command output: %s', output)
75 99 except subprocess32.TimeoutExpired as e:
76 100 log.exception('Timeout while executing command.')
77 101 if e.output:
78 102 log.error('Command output: %s', e.output)
79 103 except subprocess32.CalledProcessError as e:
80 104 log.exception('Error while executing command.')
81 105 if e.output:
82 106 log.error('Command output: %s', e.output)
83 107 except:
84 108 log.exception(
85 109 'Exception while executing command %s.', cmd)
86 110
87 111
88 112 # class ReloadApacheSubscriber(object):
89 113 # """
90 114 # Subscriber to pyramids event system. It executes the Apache reload command
91 115 # if set in ini-file. The command is executed asynchronously in a separate
92 116 # task. This is done to prevent a delay of the function which triggered the
93 117 # event in case of a longer running command. If a timeout is passed to the
94 118 # constructor the command will be terminated after expiration.
95 119 # """
96 120 # def __init__(self, settings, timeout=None):
97 121 # self.thread = None
98 122 # cmd = self.get_command_from_settings(settings)
99 123 # if cmd:
100 124 # kwargs = {
101 125 # 'cmd': cmd,
102 126 # 'timeout': timeout,
103 127 # }
104 128 # self.thread = Thread(target=self.run, kwargs=kwargs)
105 129
106 130 # def __call__(self, event):
107 131 # if self.thread is not None:
108 132 # self.thread.start()
109 133
110 134 # def get_command_from_settings(self, settings):
111 135 # cmd = settings[config_keys.reload_command]
112 136 # return cmd.split(' ') if cmd else cmd
113 137
114 138 # def run(self, cmd, timeout=None):
115 139 # log.debug('Executing svn proxy reload command %s.', cmd)
116 140 # try:
117 141 # output = subprocess32.check_output(
118 142 # cmd, timeout=timeout, stderr=subprocess32.STDOUT)
119 143 # log.debug('Svn proxy reload command finished.')
120 144 # if output:
121 145 # log.debug('Command output: %s', output)
122 146 # except subprocess32.TimeoutExpired as e:
123 147 # log.exception('Timeout while executing svn proxy reload command.')
124 148 # if e.output:
125 149 # log.error('Command output: %s', e.output)
126 150 # except subprocess32.CalledProcessError as e:
127 151 # log.exception('Error while executing svn proxy reload command.')
128 152 # if e.output:
129 153 # log.error('Command output: %s', e.output)
130 154 # except:
131 155 # log.exception(
132 156 # 'Exception while executing svn proxy reload command %s.', cmd)
General Comments 0
You need to be logged in to leave comments. Login now