##// END OF EJS Templates
svn-support: Use a queue and worker thread to serialize execution of event subscriber.
Martin Bornhold -
r1017:9910c00d default
parent child Browse files
Show More
@@ -1,132 +1,156 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import logging
21 import logging
22 import Queue
22 import subprocess32
23 import subprocess32
23 from threading import Thread
24 from threading import Thread
24
25
25
26
26 from .utils import generate_mod_dav_svn_config
27 from .utils import generate_mod_dav_svn_config
27
28
28
29
29 log = logging.getLogger(__name__)
30 log = logging.getLogger(__name__)
30
31
31
32
32 def generate_config_subscriber(event):
33 def generate_config_subscriber(event):
33 """
34 """
34 Subscriber to the `rhodcode.events.RepoGroupEvent`. This triggers the
35 Subscriber to the `rhodcode.events.RepoGroupEvent`. This triggers the
35 automatic generation of mod_dav_svn config file on repository group
36 automatic generation of mod_dav_svn config file on repository group
36 changes.
37 changes.
37 """
38 """
38 try:
39 try:
39 generate_mod_dav_svn_config(event.request.registry)
40 generate_mod_dav_svn_config(event.request.registry)
40 except Exception:
41 except Exception:
41 log.exception(
42 log.exception(
42 'Exception while generating subversion mod_dav_svn configuration.')
43 'Exception while generating subversion mod_dav_svn configuration.')
43
44
44
45
45 class Subscriber(object):
46 class Subscriber(object):
46 def __call__(self, event):
47 def __call__(self, event):
47 self.run(event)
48 self.run(event)
48
49
49 def run(self, event):
50 def run(self, event):
50 raise NotImplementedError('Subclass has to implement this.')
51 raise NotImplementedError('Subclass has to implement this.')
51
52
52
53
53 class AsyncSubscriber(Subscriber):
54 class AsyncSubscriber(Subscriber):
54 def __init__(self, *args, **kwargs):
55 def __init__(self):
55 self._init_args = args
56 self._stop = False
56 self._init_kwargs = kwargs
57 self._eventq = Queue.Queue()
58 self._worker = self.create_worker()
59 self._worker.start()
57
60
58 def __call__(self, event):
61 def __call__(self, event):
59 kwargs = {'event': event}
62 self._eventq.put(event)
60 kwargs.update(self._init_kwargs)
63
61 self.thread = Thread(
64 def create_worker(self):
62 target=self.run, args=self._init_args, kwargs=kwargs)
65 worker = Thread(target=self.do_work)
63 self.thread.start()
66 worker.daemon = True
67 return worker
68
69 def stop_worker(self):
70 self._stop = False
71 self._eventq.put(None)
72 self._worker.join()
73
74 def do_work(self):
75 while not self._stop:
76 event = self._eventq.get()
77 if event is not None:
78 self.run(event)
64
79
65
80
66 class AsyncSubprocessSubscriber(AsyncSubscriber):
81 class AsyncSubprocessSubscriber(AsyncSubscriber):
67 def run(self, event, cmd, timeout=None):
82
83 def __init__(self, cmd, timeout=None):
84 super(AsyncSubprocessSubscriber, self).__init__()
85 self._cmd = cmd
86 self._timeout = timeout
87
88 def run(self, event):
89 cmd = self._cmd
90 timeout = self._timeout
68 log.debug('Executing command %s.', cmd)
91 log.debug('Executing command %s.', cmd)
92
69 try:
93 try:
70 output = subprocess32.check_output(
94 output = subprocess32.check_output(
71 cmd, timeout=timeout, stderr=subprocess32.STDOUT)
95 cmd, timeout=timeout, stderr=subprocess32.STDOUT)
72 log.debug('Command finished %s', cmd)
96 log.debug('Command finished %s', cmd)
73 if output:
97 if output:
74 log.debug('Command output: %s', output)
98 log.debug('Command output: %s', output)
75 except subprocess32.TimeoutExpired as e:
99 except subprocess32.TimeoutExpired as e:
76 log.exception('Timeout while executing command.')
100 log.exception('Timeout while executing command.')
77 if e.output:
101 if e.output:
78 log.error('Command output: %s', e.output)
102 log.error('Command output: %s', e.output)
79 except subprocess32.CalledProcessError as e:
103 except subprocess32.CalledProcessError as e:
80 log.exception('Error while executing command.')
104 log.exception('Error while executing command.')
81 if e.output:
105 if e.output:
82 log.error('Command output: %s', e.output)
106 log.error('Command output: %s', e.output)
83 except:
107 except:
84 log.exception(
108 log.exception(
85 'Exception while executing command %s.', cmd)
109 'Exception while executing command %s.', cmd)
86
110
87
111
88 # class ReloadApacheSubscriber(object):
112 # class ReloadApacheSubscriber(object):
89 # """
113 # """
90 # Subscriber to pyramids event system. It executes the Apache reload command
114 # Subscriber to pyramids event system. It executes the Apache reload command
91 # if set in ini-file. The command is executed asynchronously in a separate
115 # if set in ini-file. The command is executed asynchronously in a separate
92 # task. This is done to prevent a delay of the function which triggered the
116 # task. This is done to prevent a delay of the function which triggered the
93 # event in case of a longer running command. If a timeout is passed to the
117 # event in case of a longer running command. If a timeout is passed to the
94 # constructor the command will be terminated after expiration.
118 # constructor the command will be terminated after expiration.
95 # """
119 # """
96 # def __init__(self, settings, timeout=None):
120 # def __init__(self, settings, timeout=None):
97 # self.thread = None
121 # self.thread = None
98 # cmd = self.get_command_from_settings(settings)
122 # cmd = self.get_command_from_settings(settings)
99 # if cmd:
123 # if cmd:
100 # kwargs = {
124 # kwargs = {
101 # 'cmd': cmd,
125 # 'cmd': cmd,
102 # 'timeout': timeout,
126 # 'timeout': timeout,
103 # }
127 # }
104 # self.thread = Thread(target=self.run, kwargs=kwargs)
128 # self.thread = Thread(target=self.run, kwargs=kwargs)
105
129
106 # def __call__(self, event):
130 # def __call__(self, event):
107 # if self.thread is not None:
131 # if self.thread is not None:
108 # self.thread.start()
132 # self.thread.start()
109
133
110 # def get_command_from_settings(self, settings):
134 # def get_command_from_settings(self, settings):
111 # cmd = settings[config_keys.reload_command]
135 # cmd = settings[config_keys.reload_command]
112 # return cmd.split(' ') if cmd else cmd
136 # return cmd.split(' ') if cmd else cmd
113
137
114 # def run(self, cmd, timeout=None):
138 # def run(self, cmd, timeout=None):
115 # log.debug('Executing svn proxy reload command %s.', cmd)
139 # log.debug('Executing svn proxy reload command %s.', cmd)
116 # try:
140 # try:
117 # output = subprocess32.check_output(
141 # output = subprocess32.check_output(
118 # cmd, timeout=timeout, stderr=subprocess32.STDOUT)
142 # cmd, timeout=timeout, stderr=subprocess32.STDOUT)
119 # log.debug('Svn proxy reload command finished.')
143 # log.debug('Svn proxy reload command finished.')
120 # if output:
144 # if output:
121 # log.debug('Command output: %s', output)
145 # log.debug('Command output: %s', output)
122 # except subprocess32.TimeoutExpired as e:
146 # except subprocess32.TimeoutExpired as e:
123 # log.exception('Timeout while executing svn proxy reload command.')
147 # log.exception('Timeout while executing svn proxy reload command.')
124 # if e.output:
148 # if e.output:
125 # log.error('Command output: %s', e.output)
149 # log.error('Command output: %s', e.output)
126 # except subprocess32.CalledProcessError as e:
150 # except subprocess32.CalledProcessError as e:
127 # log.exception('Error while executing svn proxy reload command.')
151 # log.exception('Error while executing svn proxy reload command.')
128 # if e.output:
152 # if e.output:
129 # log.error('Command output: %s', e.output)
153 # log.error('Command output: %s', e.output)
130 # except:
154 # except:
131 # log.exception(
155 # log.exception(
132 # 'Exception while executing svn proxy reload command %s.', cmd)
156 # 'Exception while executing svn proxy reload command %s.', cmd)
General Comments 0
You need to be logged in to leave comments. Login now