##// END OF EJS Templates
events: Move subscriber classes from svn_support to rhodecode.subscribers....
Martin Bornhold -
r1019:1a069e4a default
parent child Browse files
Show More
@@ -19,14 +19,21 b''
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 import logging
22 23 import pylons
24 import Queue
25 import subprocess32
23 26
24 27 from pyramid.i18n import get_localizer
25 28 from pyramid.threadlocal import get_current_request
29 from threading import Thread
26 30
27 31 from rhodecode.translation import _ as tsf
28 32
29 33
34 log = logging.getLogger(__name__)
35
36
30 37 def add_renderer_globals(event):
31 38 # Put pylons stuff into the context. This will be removed as soon as
32 39 # migration to pyramid is finished.
@@ -68,3 +75,69 b' def scan_repositories_if_enabled(event):'
68 75 if vcs_server_enabled and import_on_startup:
69 76 repositories = ScmModel().repo_scan(get_rhodecode_base_path())
70 77 repo2db_mapper(repositories, remove_obsolete=False)
78
79
80 class Subscriber(object):
81 def __call__(self, event):
82 self.run(event)
83
84 def run(self, event):
85 raise NotImplementedError('Subclass has to implement this.')
86
87
88 class AsyncSubscriber(Subscriber):
89 def __init__(self):
90 self._stop = False
91 self._eventq = Queue.Queue()
92 self._worker = self.create_worker()
93 self._worker.start()
94
95 def __call__(self, event):
96 self._eventq.put(event)
97
98 def create_worker(self):
99 worker = Thread(target=self.do_work)
100 worker.daemon = True
101 return worker
102
103 def stop_worker(self):
104 self._stop = False
105 self._eventq.put(None)
106 self._worker.join()
107
108 def do_work(self):
109 while not self._stop:
110 event = self._eventq.get()
111 if event is not None:
112 self.run(event)
113
114
115 class AsyncSubprocessSubscriber(AsyncSubscriber):
116
117 def __init__(self, cmd, timeout=None):
118 super(AsyncSubprocessSubscriber, self).__init__()
119 self._cmd = cmd
120 self._timeout = timeout
121
122 def run(self, event):
123 cmd = self._cmd
124 timeout = self._timeout
125 log.debug('Executing command %s.', cmd)
126
127 try:
128 output = subprocess32.check_output(
129 cmd, timeout=timeout, stderr=subprocess32.STDOUT)
130 log.debug('Command finished %s', cmd)
131 if output:
132 log.debug('Command output: %s', output)
133 except subprocess32.TimeoutExpired as e:
134 log.exception('Timeout while executing command.')
135 if e.output:
136 log.error('Command output: %s', e.output)
137 except subprocess32.CalledProcessError as e:
138 log.exception('Error while executing command.')
139 if e.output:
140 log.error('Command output: %s', e.output)
141 except:
142 log.exception(
143 'Exception while executing command %s.', cmd)
@@ -25,11 +25,12 b' import shlex'
25 25 # Do not use `from rhodecode import events` here, it will be overridden by the
26 26 # events module in this package due to pythons import mechanism.
27 27 from rhodecode.events import RepoGroupEvent
28 from rhodecode.subscribers import AsyncSubprocessSubscriber
28 29 from rhodecode.config.middleware import (
29 30 _bool_setting, _string_setting, _int_setting)
30 31
31 32 from .events import ModDavSvnConfigChange
32 from .subscribers import generate_config_subscriber, AsyncSubprocessSubscriber
33 from .subscribers import generate_config_subscriber
33 34 from . import config_keys
34 35
35 36
@@ -19,9 +19,6 b''
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import logging
22 import Queue
23 import subprocess32
24 from threading import Thread
25 22
26 23
27 24 from .utils import generate_mod_dav_svn_config
@@ -41,69 +38,3 b' def generate_config_subscriber(event):'
41 38 except Exception:
42 39 log.exception(
43 40 'Exception while generating subversion mod_dav_svn configuration.')
44
45
46 class Subscriber(object):
47 def __call__(self, event):
48 self.run(event)
49
50 def run(self, event):
51 raise NotImplementedError('Subclass has to implement this.')
52
53
54 class AsyncSubscriber(Subscriber):
55 def __init__(self):
56 self._stop = False
57 self._eventq = Queue.Queue()
58 self._worker = self.create_worker()
59 self._worker.start()
60
61 def __call__(self, event):
62 self._eventq.put(event)
63
64 def create_worker(self):
65 worker = Thread(target=self.do_work)
66 worker.daemon = True
67 return worker
68
69 def stop_worker(self):
70 self._stop = False
71 self._eventq.put(None)
72 self._worker.join()
73
74 def do_work(self):
75 while not self._stop:
76 event = self._eventq.get()
77 if event is not None:
78 self.run(event)
79
80
81 class AsyncSubprocessSubscriber(AsyncSubscriber):
82
83 def __init__(self, cmd, timeout=None):
84 super(AsyncSubprocessSubscriber, self).__init__()
85 self._cmd = cmd
86 self._timeout = timeout
87
88 def run(self, event):
89 cmd = self._cmd
90 timeout = self._timeout
91 log.debug('Executing command %s.', cmd)
92
93 try:
94 output = subprocess32.check_output(
95 cmd, timeout=timeout, stderr=subprocess32.STDOUT)
96 log.debug('Command finished %s', cmd)
97 if output:
98 log.debug('Command output: %s', output)
99 except subprocess32.TimeoutExpired as e:
100 log.exception('Timeout while executing command.')
101 if e.output:
102 log.error('Command output: %s', e.output)
103 except subprocess32.CalledProcessError as e:
104 log.exception('Error while executing command.')
105 if e.output:
106 log.error('Command output: %s', e.output)
107 except:
108 log.exception(
109 'Exception while executing command %s.', cmd)
General Comments 0
You need to be logged in to leave comments. Login now