##// END OF EJS Templates
pyro: remove db Session when callback finishes to avoid leaving...
dan -
r670:8607f5e9 stable
parent child Browse files
Show More
@@ -1,278 +1,279 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import json
21 import json
22 import logging
22 import logging
23 import urlparse
23 import urlparse
24 import threading
24 import threading
25 from BaseHTTPServer import BaseHTTPRequestHandler
25 from BaseHTTPServer import BaseHTTPRequestHandler
26 from SocketServer import TCPServer
26 from SocketServer import TCPServer
27 from routes.util import URLGenerator
27 from routes.util import URLGenerator
28
28
29 import Pyro4
29 import Pyro4
30 import pylons
30 import pylons
31 import rhodecode
31 import rhodecode
32
32
33 from rhodecode.model import meta
33 from rhodecode.model import meta
34 from rhodecode.lib import hooks_base
34 from rhodecode.lib import hooks_base
35 from rhodecode.lib.utils2 import (
35 from rhodecode.lib.utils2 import (
36 AttributeDict, safe_str, get_routes_generator_for_server_url)
36 AttributeDict, safe_str, get_routes_generator_for_server_url)
37
37
38
38
39 log = logging.getLogger(__name__)
39 log = logging.getLogger(__name__)
40
40
41
41
42 class HooksHttpHandler(BaseHTTPRequestHandler):
42 class HooksHttpHandler(BaseHTTPRequestHandler):
43 def do_POST(self):
43 def do_POST(self):
44 method, extras = self._read_request()
44 method, extras = self._read_request()
45 try:
45 try:
46 result = self._call_hook(method, extras)
46 result = self._call_hook(method, extras)
47 except Exception as e:
47 except Exception as e:
48 result = {
48 result = {
49 'exception': e.__class__.__name__,
49 'exception': e.__class__.__name__,
50 'exception_args': e.args
50 'exception_args': e.args
51 }
51 }
52 self._write_response(result)
52 self._write_response(result)
53
53
54 def _read_request(self):
54 def _read_request(self):
55 length = int(self.headers['Content-Length'])
55 length = int(self.headers['Content-Length'])
56 body = self.rfile.read(length).decode('utf-8')
56 body = self.rfile.read(length).decode('utf-8')
57 data = json.loads(body)
57 data = json.loads(body)
58 return data['method'], data['extras']
58 return data['method'], data['extras']
59
59
60 def _write_response(self, result):
60 def _write_response(self, result):
61 self.send_response(200)
61 self.send_response(200)
62 self.send_header("Content-type", "text/json")
62 self.send_header("Content-type", "text/json")
63 self.end_headers()
63 self.end_headers()
64 self.wfile.write(json.dumps(result))
64 self.wfile.write(json.dumps(result))
65
65
66 def _call_hook(self, method, extras):
66 def _call_hook(self, method, extras):
67 hooks = Hooks()
67 hooks = Hooks()
68 try:
68 try:
69 result = getattr(hooks, method)(extras)
69 result = getattr(hooks, method)(extras)
70 finally:
70 finally:
71 meta.Session.remove()
71 meta.Session.remove()
72 return result
72 return result
73
73
74 def log_message(self, format, *args):
74 def log_message(self, format, *args):
75 """
75 """
76 This is an overriden method of BaseHTTPRequestHandler which logs using
76 This is an overriden method of BaseHTTPRequestHandler which logs using
77 logging library instead of writing directly to stderr.
77 logging library instead of writing directly to stderr.
78 """
78 """
79
79
80 message = format % args
80 message = format % args
81
81
82 # TODO: mikhail: add different log levels support
82 # TODO: mikhail: add different log levels support
83 log.debug(
83 log.debug(
84 "%s - - [%s] %s", self.client_address[0],
84 "%s - - [%s] %s", self.client_address[0],
85 self.log_date_time_string(), message)
85 self.log_date_time_string(), message)
86
86
87
87
88 class DummyHooksCallbackDaemon(object):
88 class DummyHooksCallbackDaemon(object):
89 def __init__(self):
89 def __init__(self):
90 self.hooks_module = Hooks.__module__
90 self.hooks_module = Hooks.__module__
91
91
92 def __enter__(self):
92 def __enter__(self):
93 log.debug('Running dummy hooks callback daemon')
93 log.debug('Running dummy hooks callback daemon')
94 return self
94 return self
95
95
96 def __exit__(self, exc_type, exc_val, exc_tb):
96 def __exit__(self, exc_type, exc_val, exc_tb):
97 log.debug('Exiting dummy hooks callback daemon')
97 log.debug('Exiting dummy hooks callback daemon')
98
98
99
99
100 class ThreadedHookCallbackDaemon(object):
100 class ThreadedHookCallbackDaemon(object):
101
101
102 _callback_thread = None
102 _callback_thread = None
103 _daemon = None
103 _daemon = None
104 _done = False
104 _done = False
105
105
106 def __init__(self):
106 def __init__(self):
107 self._prepare()
107 self._prepare()
108
108
109 def __enter__(self):
109 def __enter__(self):
110 self._run()
110 self._run()
111 return self
111 return self
112
112
113 def __exit__(self, exc_type, exc_val, exc_tb):
113 def __exit__(self, exc_type, exc_val, exc_tb):
114 self._stop()
114 self._stop()
115
115
116 def _prepare(self):
116 def _prepare(self):
117 raise NotImplementedError()
117 raise NotImplementedError()
118
118
119 def _run(self):
119 def _run(self):
120 raise NotImplementedError()
120 raise NotImplementedError()
121
121
122 def _stop(self):
122 def _stop(self):
123 raise NotImplementedError()
123 raise NotImplementedError()
124
124
125
125
126 class Pyro4HooksCallbackDaemon(ThreadedHookCallbackDaemon):
126 class Pyro4HooksCallbackDaemon(ThreadedHookCallbackDaemon):
127 """
127 """
128 Context manager which will run a callback daemon in a background thread.
128 Context manager which will run a callback daemon in a background thread.
129 """
129 """
130
130
131 hooks_uri = None
131 hooks_uri = None
132
132
133 def _prepare(self):
133 def _prepare(self):
134 log.debug("Preparing callback daemon and registering hook object")
134 log.debug("Preparing callback daemon and registering hook object")
135 self._daemon = Pyro4.Daemon()
135 self._daemon = Pyro4.Daemon()
136 hooks_interface = Hooks()
136 hooks_interface = Hooks()
137 self.hooks_uri = str(self._daemon.register(hooks_interface))
137 self.hooks_uri = str(self._daemon.register(hooks_interface))
138 log.debug("Hooks uri is: %s", self.hooks_uri)
138 log.debug("Hooks uri is: %s", self.hooks_uri)
139
139
140 def _run(self):
140 def _run(self):
141 log.debug("Running event loop of callback daemon in background thread")
141 log.debug("Running event loop of callback daemon in background thread")
142 callback_thread = threading.Thread(
142 callback_thread = threading.Thread(
143 target=self._daemon.requestLoop,
143 target=self._daemon.requestLoop,
144 kwargs={'loopCondition': lambda: not self._done})
144 kwargs={'loopCondition': lambda: not self._done})
145 callback_thread.daemon = True
145 callback_thread.daemon = True
146 callback_thread.start()
146 callback_thread.start()
147 self._callback_thread = callback_thread
147 self._callback_thread = callback_thread
148
148
149 def _stop(self):
149 def _stop(self):
150 log.debug("Waiting for background thread to finish.")
150 log.debug("Waiting for background thread to finish.")
151 self._done = True
151 self._done = True
152 self._callback_thread.join()
152 self._callback_thread.join()
153 self._daemon.close()
153 self._daemon.close()
154 self._daemon = None
154 self._daemon = None
155 self._callback_thread = None
155 self._callback_thread = None
156
156
157
157
158 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
158 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
159 """
159 """
160 Context manager which will run a callback daemon in a background thread.
160 Context manager which will run a callback daemon in a background thread.
161 """
161 """
162
162
163 hooks_uri = None
163 hooks_uri = None
164
164
165 IP_ADDRESS = '127.0.0.1'
165 IP_ADDRESS = '127.0.0.1'
166
166
167 # From Python docs: Polling reduces our responsiveness to a shutdown
167 # From Python docs: Polling reduces our responsiveness to a shutdown
168 # request and wastes cpu at all other times.
168 # request and wastes cpu at all other times.
169 POLL_INTERVAL = 0.1
169 POLL_INTERVAL = 0.1
170
170
171 def _prepare(self):
171 def _prepare(self):
172 log.debug("Preparing callback daemon and registering hook object")
172 log.debug("Preparing callback daemon and registering hook object")
173
173
174 self._done = False
174 self._done = False
175 self._daemon = TCPServer((self.IP_ADDRESS, 0), HooksHttpHandler)
175 self._daemon = TCPServer((self.IP_ADDRESS, 0), HooksHttpHandler)
176 _, port = self._daemon.server_address
176 _, port = self._daemon.server_address
177 self.hooks_uri = '{}:{}'.format(self.IP_ADDRESS, port)
177 self.hooks_uri = '{}:{}'.format(self.IP_ADDRESS, port)
178
178
179 log.debug("Hooks uri is: %s", self.hooks_uri)
179 log.debug("Hooks uri is: %s", self.hooks_uri)
180
180
181 def _run(self):
181 def _run(self):
182 log.debug("Running event loop of callback daemon in background thread")
182 log.debug("Running event loop of callback daemon in background thread")
183 callback_thread = threading.Thread(
183 callback_thread = threading.Thread(
184 target=self._daemon.serve_forever,
184 target=self._daemon.serve_forever,
185 kwargs={'poll_interval': self.POLL_INTERVAL})
185 kwargs={'poll_interval': self.POLL_INTERVAL})
186 callback_thread.daemon = True
186 callback_thread.daemon = True
187 callback_thread.start()
187 callback_thread.start()
188 self._callback_thread = callback_thread
188 self._callback_thread = callback_thread
189
189
190 def _stop(self):
190 def _stop(self):
191 log.debug("Waiting for background thread to finish.")
191 log.debug("Waiting for background thread to finish.")
192 self._daemon.shutdown()
192 self._daemon.shutdown()
193 self._callback_thread.join()
193 self._callback_thread.join()
194 self._daemon = None
194 self._daemon = None
195 self._callback_thread = None
195 self._callback_thread = None
196
196
197
197
198 def prepare_callback_daemon(extras, protocol=None, use_direct_calls=False):
198 def prepare_callback_daemon(extras, protocol=None, use_direct_calls=False):
199 callback_daemon = None
199 callback_daemon = None
200 protocol = protocol.lower() if protocol else None
200 protocol = protocol.lower() if protocol else None
201
201
202 if use_direct_calls:
202 if use_direct_calls:
203 callback_daemon = DummyHooksCallbackDaemon()
203 callback_daemon = DummyHooksCallbackDaemon()
204 extras['hooks_module'] = callback_daemon.hooks_module
204 extras['hooks_module'] = callback_daemon.hooks_module
205 else:
205 else:
206 if protocol == 'pyro4':
206 if protocol == 'pyro4':
207 callback_daemon = Pyro4HooksCallbackDaemon()
207 callback_daemon = Pyro4HooksCallbackDaemon()
208 elif protocol == 'http':
208 elif protocol == 'http':
209 callback_daemon = HttpHooksCallbackDaemon()
209 callback_daemon = HttpHooksCallbackDaemon()
210 else:
210 else:
211 log.error('Unsupported callback daemon protocol "%s"', protocol)
211 log.error('Unsupported callback daemon protocol "%s"', protocol)
212 raise Exception('Unsupported callback daemon protocol.')
212 raise Exception('Unsupported callback daemon protocol.')
213
213
214 extras['hooks_uri'] = callback_daemon.hooks_uri
214 extras['hooks_uri'] = callback_daemon.hooks_uri
215 extras['hooks_protocol'] = protocol
215 extras['hooks_protocol'] = protocol
216
216
217 return callback_daemon, extras
217 return callback_daemon, extras
218
218
219
219
220 class Hooks(object):
220 class Hooks(object):
221 """
221 """
222 Exposes the hooks for remote call backs
222 Exposes the hooks for remote call backs
223 """
223 """
224
224
225 @Pyro4.callback
225 @Pyro4.callback
226 def repo_size(self, extras):
226 def repo_size(self, extras):
227 log.debug("Called repo_size of Hooks object")
227 log.debug("Called repo_size of Hooks object")
228 return self._call_hook(hooks_base.repo_size, extras)
228 return self._call_hook(hooks_base.repo_size, extras)
229
229
230 @Pyro4.callback
230 @Pyro4.callback
231 def pre_pull(self, extras):
231 def pre_pull(self, extras):
232 log.debug("Called pre_pull of Hooks object")
232 log.debug("Called pre_pull of Hooks object")
233 return self._call_hook(hooks_base.pre_pull, extras)
233 return self._call_hook(hooks_base.pre_pull, extras)
234
234
235 @Pyro4.callback
235 @Pyro4.callback
236 def post_pull(self, extras):
236 def post_pull(self, extras):
237 log.debug("Called post_pull of Hooks object")
237 log.debug("Called post_pull of Hooks object")
238 return self._call_hook(hooks_base.post_pull, extras)
238 return self._call_hook(hooks_base.post_pull, extras)
239
239
240 @Pyro4.callback
240 @Pyro4.callback
241 def pre_push(self, extras):
241 def pre_push(self, extras):
242 log.debug("Called pre_push of Hooks object")
242 log.debug("Called pre_push of Hooks object")
243 return self._call_hook(hooks_base.pre_push, extras)
243 return self._call_hook(hooks_base.pre_push, extras)
244
244
245 @Pyro4.callback
245 @Pyro4.callback
246 def post_push(self, extras):
246 def post_push(self, extras):
247 log.debug("Called post_push of Hooks object")
247 log.debug("Called post_push of Hooks object")
248 return self._call_hook(hooks_base.post_push, extras)
248 return self._call_hook(hooks_base.post_push, extras)
249
249
250 def _call_hook(self, hook, extras):
250 def _call_hook(self, hook, extras):
251 extras = AttributeDict(extras)
251 extras = AttributeDict(extras)
252 pylons_router = get_routes_generator_for_server_url(extras.server_url)
252 pylons_router = get_routes_generator_for_server_url(extras.server_url)
253 pylons.url._push_object(pylons_router)
253 pylons.url._push_object(pylons_router)
254
254
255 try:
255 try:
256 result = hook(extras)
256 result = hook(extras)
257 except Exception as error:
257 except Exception as error:
258 log.exception('Exception when handling hook %s', hook)
258 log.exception('Exception when handling hook %s', hook)
259 error_args = error.args
259 error_args = error.args
260 return {
260 return {
261 'status': 128,
261 'status': 128,
262 'output': '',
262 'output': '',
263 'exception': type(error).__name__,
263 'exception': type(error).__name__,
264 'exception_args': error_args,
264 'exception_args': error_args,
265 }
265 }
266 finally:
266 finally:
267 pylons.url._pop_object()
267 pylons.url._pop_object()
268 meta.Session.remove()
268
269
269 return {
270 return {
270 'status': result.status,
271 'status': result.status,
271 'output': result.output,
272 'output': result.output,
272 }
273 }
273
274
274 def __enter__(self):
275 def __enter__(self):
275 return self
276 return self
276
277
277 def __exit__(self, exc_type, exc_val, exc_tb):
278 def __exit__(self, exc_type, exc_val, exc_tb):
278 pass
279 pass
General Comments 0
You need to be logged in to leave comments. Login now