diff --git a/rhodecode/svn_support/subscribers.py b/rhodecode/svn_support/subscribers.py --- a/rhodecode/svn_support/subscribers.py +++ b/rhodecode/svn_support/subscribers.py @@ -19,6 +19,7 @@ # and proprietary license terms, please see https://rhodecode.com/licenses/ import logging +import Queue import subprocess32 from threading import Thread @@ -51,21 +52,44 @@ class Subscriber(object): class AsyncSubscriber(Subscriber): - def __init__(self, *args, **kwargs): - self._init_args = args - self._init_kwargs = kwargs + def __init__(self): + self._stop = False + self._eventq = Queue.Queue() + self._worker = self.create_worker() + self._worker.start() def __call__(self, event): - kwargs = {'event': event} - kwargs.update(self._init_kwargs) - self.thread = Thread( - target=self.run, args=self._init_args, kwargs=kwargs) - self.thread.start() + self._eventq.put(event) + + def create_worker(self): + worker = Thread(target=self.do_work) + worker.daemon = True + return worker + + def stop_worker(self): + self._stop = False + self._eventq.put(None) + self._worker.join() + + def do_work(self): + while not self._stop: + event = self._eventq.get() + if event is not None: + self.run(event) class AsyncSubprocessSubscriber(AsyncSubscriber): - def run(self, event, cmd, timeout=None): + + def __init__(self, cmd, timeout=None): + super(AsyncSubprocessSubscriber, self).__init__() + self._cmd = cmd + self._timeout = timeout + + def run(self, event): + cmd = self._cmd + timeout = self._timeout log.debug('Executing command %s.', cmd) + try: output = subprocess32.check_output( cmd, timeout=timeout, stderr=subprocess32.STDOUT)