##// END OF EJS Templates
changed dulwich git interface to gitweb + subprocessio
marcink -
r2382:034e4fe1 beta
parent child Browse files
Show More
@@ -0,0 +1,181 b''
1 import os
2 import socket
3 import logging
4 import subprocess
5
6 from webob import Request, Response, exc
7
8 from rhodecode.lib import subprocessio
9
10 log = logging.getLogger(__name__)
11
12
13 class FileWrapper(object):
14
15 def __init__(self, fd, content_length):
16 self.fd = fd
17 self.content_length = content_length
18 self.remain = content_length
19
20 def read(self, size):
21 if size <= self.remain:
22 try:
23 data = self.fd.read(size)
24 except socket.error:
25 raise IOError(self)
26 self.remain -= size
27 elif self.remain:
28 data = self.fd.read(self.remain)
29 self.remain = 0
30 else:
31 data = None
32 return data
33
34 def __repr__(self):
35 return '<FileWrapper %s len: %s, read: %s>' % (
36 self.fd, self.content_length, self.content_length - self.remain
37 )
38
39
40 class GitRepository(object):
41 git_folder_signature = set(['config', 'head', 'info', 'objects', 'refs'])
42 commands = ['git-upload-pack', 'git-receive-pack']
43
44 def __init__(self, repo_name, content_path):
45 files = set([f.lower() for f in os.listdir(content_path)])
46 if not (self.git_folder_signature.intersection(files)
47 == self.git_folder_signature):
48 raise OSError('%s missing git signature' % content_path)
49 self.content_path = content_path
50 self.valid_accepts = ['application/x-%s-result' %
51 c for c in self.commands]
52 self.repo_name = repo_name
53
54 def _get_fixedpath(self, path):
55 """
56 Small fix for repo_path
57
58 :param path:
59 :type path:
60 """
61 return path.split(self.repo_name, 1)[-1].strip('/')
62
63 def inforefs(self, request, environ):
64 """
65 WSGI Response producer for HTTP GET Git Smart
66 HTTP /info/refs request.
67 """
68
69 git_command = request.GET['service']
70 if git_command not in self.commands:
71 log.debug('command %s not allowed' % git_command)
72 return exc.HTTPMethodNotAllowed()
73
74 # note to self:
75 # please, resist the urge to add '\n' to git capture and increment
76 # line count by 1.
77 # The code in Git client not only does NOT need '\n', but actually
78 # blows up if you sprinkle "flush" (0000) as "0001\n".
79 # It reads binary, per number of bytes specified.
80 # if you do add '\n' as part of data, count it.
81 smart_server_advert = '# service=%s' % git_command
82 try:
83 out = subprocessio.SubprocessIOChunker(
84 r'git %s --stateless-rpc --advertise-refs "%s"' % (
85 git_command[4:], self.content_path),
86 starting_values=[
87 str(hex(len(smart_server_advert) + 4)[2:]
88 .rjust(4, '0') + smart_server_advert + '0000')
89 ]
90 )
91 except EnvironmentError, e:
92 log.exception(e)
93 raise exc.HTTPExpectationFailed()
94 resp = Response()
95 resp.content_type = 'application/x-%s-advertisement' % str(git_command)
96 resp.app_iter = out
97 return resp
98
99 def backend(self, request, environ):
100 """
101 WSGI Response producer for HTTP POST Git Smart HTTP requests.
102 Reads commands and data from HTTP POST's body.
103 returns an iterator obj with contents of git command's
104 response to stdout
105 """
106 git_command = self._get_fixedpath(request.path_info)
107 if git_command not in self.commands:
108 log.debug('command %s not allowed' % git_command)
109 return exc.HTTPMethodNotAllowed()
110
111 if 'CONTENT_LENGTH' in environ:
112 inputstream = FileWrapper(environ['wsgi.input'],
113 request.content_length)
114 else:
115 inputstream = environ['wsgi.input']
116
117 try:
118 out = subprocessio.SubprocessIOChunker(
119 r'git %s --stateless-rpc "%s"' % (git_command[4:],
120 self.content_path),
121 inputstream=inputstream
122 )
123 except EnvironmentError, e:
124 log.exception(e)
125 raise exc.HTTPExpectationFailed()
126
127 if git_command in [u'git-receive-pack']:
128 # updating refs manually after each push.
129 # Needed for pre-1.7.0.4 git clients using regular HTTP mode.
130 subprocess.call(u'git --git-dir "%s" '
131 'update-server-info' % self.content_path,
132 shell=True)
133
134 resp = Response()
135 resp.content_type = 'application/x-%s-result' % git_command.encode('utf8')
136 resp.app_iter = out
137 return resp
138
139 def __call__(self, environ, start_response):
140 request = Request(environ)
141 _path = self._get_fixedpath(request.path_info)
142 if _path.startswith('info/refs'):
143 app = self.inforefs
144 elif [a for a in self.valid_accepts if a in request.accept]:
145 app = self.backend
146 try:
147 resp = app(request, environ)
148 except exc.HTTPException, e:
149 resp = e
150 log.exception(e)
151 except Exception, e:
152 log.exception(e)
153 resp = exc.HTTPInternalServerError()
154 return resp(environ, start_response)
155
156
157 class GitDirectory(object):
158
159 def __init__(self, repo_root, repo_name):
160 repo_location = os.path.join(repo_root, repo_name)
161 if not os.path.isdir(repo_location):
162 raise OSError(repo_location)
163
164 self.content_path = repo_location
165 self.repo_name = repo_name
166 self.repo_location = repo_location
167
168 def __call__(self, environ, start_response):
169 content_path = self.content_path
170 try:
171 app = GitRepository(self.repo_name, content_path)
172 except (AssertionError, OSError):
173 if os.path.isdir(os.path.join(content_path, '.git')):
174 app = GitRepository(os.path.join(content_path, '.git'))
175 else:
176 return exc.HTTPNotFound()(environ, start_response)
177 return app(environ, start_response)
178
179
180 def make_wsgi_app(repo_name, repo_root):
181 return GitDirectory(repo_root, repo_name)
@@ -0,0 +1,401 b''
1 '''
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
8
9 This file is part of git_http_backend.py Project.
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
20
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
24 '''
25 import os
26 import subprocess
27 import threading
28 from collections import deque
29
30
31 class StreamFeeder(threading.Thread):
32 """
33 Normal writing into pipe-like is blocking once the buffer is filled.
34 This thread allows a thread to seep data from a file-like into a pipe
35 without blocking the main thread.
36 We close inpipe once the end of the source stream is reached.
37 """
38 def __init__(self, source):
39 super(StreamFeeder, self).__init__()
40 self.daemon = True
41 filelike = False
42 self.bytes = b''
43 if type(source) in (type(''), bytes, bytearray): # string-like
44 self.bytes = bytes(source)
45 else: # can be either file pointer or file-like
46 if type(source) in (int, long): # file pointer it is
47 ## converting file descriptor (int) stdin into file-like
48 try:
49 source = os.fdopen(source, 'rb', 16384)
50 except:
51 pass
52 # let's see if source is file-like by now
53 try:
54 filelike = source.read
55 except:
56 pass
57 if not filelike and not self.bytes:
58 raise TypeError("StreamFeeder's source object must be a readable file-like, a file descriptor, or a string-like.")
59 self.source = source
60 self.readiface, self.writeiface = os.pipe()
61
62 def run(self):
63 t = self.writeiface
64 if self.bytes:
65 os.write(t, self.bytes)
66 else:
67 s = self.source
68 b = s.read(4096)
69 while b:
70 os.write(t, b)
71 b = s.read(4096)
72 os.close(t)
73
74 @property
75 def output(self):
76 return self.readiface
77
78
79 class InputStreamChunker(threading.Thread):
80 def __init__(self, source, target, buffer_size, chunk_size):
81
82 super(InputStreamChunker, self).__init__()
83
84 self.daemon = True # die die die.
85
86 self.source = source
87 self.target = target
88 self.chunk_count_max = int(buffer_size / chunk_size) + 1
89 self.chunk_size = chunk_size
90
91 self.data_added = threading.Event()
92 self.data_added.clear()
93
94 self.keep_reading = threading.Event()
95 self.keep_reading.set()
96
97 self.EOF = threading.Event()
98 self.EOF.clear()
99
100 self.go = threading.Event()
101 self.go.set()
102
103 def stop(self):
104 self.go.clear()
105 self.EOF.set()
106 try:
107 # this is not proper, but is done to force the reader thread let
108 # go of the input because, if successful, .close() will send EOF
109 # down the pipe.
110 self.source.close()
111 except:
112 pass
113
114 def run(self):
115 s = self.source
116 t = self.target
117 cs = self.chunk_size
118 ccm = self.chunk_count_max
119 kr = self.keep_reading
120 da = self.data_added
121 go = self.go
122 b = s.read(cs)
123 while b and go.is_set():
124 if len(t) > ccm:
125 kr.clear()
126 kr.wait(2)
127 # # this only works on 2.7.x and up
128 # if not kr.wait(10):
129 # raise Exception("Timed out while waiting for input to be read.")
130 # instead we'll use this
131 if len(t) > ccm + 3:
132 raise IOError("Timed out while waiting for input from subprocess.")
133 t.append(b)
134 da.set()
135 b = s.read(cs)
136 self.EOF.set()
137 da.set() # for cases when done but there was no input.
138
139
140 class BufferedGenerator():
141 '''
142 Class behaves as a non-blocking, buffered pipe reader.
143 Reads chunks of data (through a thread)
144 from a blocking pipe, and attaches these to an array (Deque) of chunks.
145 Reading is halted in the thread when max chunks is internally buffered.
146 The .next() may operate in blocking or non-blocking fashion by yielding
147 '' if no data is ready
148 to be sent or by not returning until there is some data to send
149 When we get EOF from underlying source pipe we raise the marker to raise
150 StopIteration after the last chunk of data is yielded.
151 '''
152
153 def __init__(self, source, buffer_size=65536, chunk_size=4096,
154 starting_values=[], bottomless=False):
155
156 if bottomless:
157 maxlen = int(buffer_size / chunk_size)
158 else:
159 maxlen = None
160
161 self.data = deque(starting_values, maxlen)
162
163 self.worker = InputStreamChunker(source, self.data, buffer_size,
164 chunk_size)
165 if starting_values:
166 self.worker.data_added.set()
167 self.worker.start()
168
169 ####################
170 # Generator's methods
171 ####################
172
173 def __iter__(self):
174 return self
175
176 def next(self):
177 while not len(self.data) and not self.worker.EOF.is_set():
178 self.worker.data_added.clear()
179 self.worker.data_added.wait(0.2)
180 if len(self.data):
181 self.worker.keep_reading.set()
182 return bytes(self.data.popleft())
183 elif self.worker.EOF.is_set():
184 raise StopIteration
185
186 def throw(self, type, value=None, traceback=None):
187 if not self.worker.EOF.is_set():
188 raise type(value)
189
190 def start(self):
191 self.worker.start()
192
193 def stop(self):
194 self.worker.stop()
195
196 def close(self):
197 try:
198 self.worker.stop()
199 self.throw(GeneratorExit)
200 except (GeneratorExit, StopIteration):
201 pass
202
203 def __del__(self):
204 self.close()
205
206 ####################
207 # Threaded reader's infrastructure.
208 ####################
209 @property
210 def input(self):
211 return self.worker.w
212
213 @property
214 def data_added_event(self):
215 return self.worker.data_added
216
217 @property
218 def data_added(self):
219 return self.worker.data_added.is_set()
220
221 @property
222 def reading_paused(self):
223 return not self.worker.keep_reading.is_set()
224
225 @property
226 def done_reading_event(self):
227 '''
228 Done_reding does not mean that the iterator's buffer is empty.
229 Iterator might have done reading from underlying source, but the read
230 chunks might still be available for serving through .next() method.
231
232 @return An Event class instance.
233 '''
234 return self.worker.EOF
235
236 @property
237 def done_reading(self):
238 '''
239 Done_reding does not mean that the iterator's buffer is empty.
240 Iterator might have done reading from underlying source, but the read
241 chunks might still be available for serving through .next() method.
242
243 @return An Bool value.
244 '''
245 return self.worker.EOF.is_set()
246
247 @property
248 def length(self):
249 '''
250 returns int.
251
252 This is the lenght of the que of chunks, not the length of
253 the combined contents in those chunks.
254
255 __len__() cannot be meaningfully implemented because this
256 reader is just flying throuh a bottomless pit content and
257 can only know the lenght of what it already saw.
258
259 If __len__() on WSGI server per PEP 3333 returns a value,
260 the responce's length will be set to that. In order not to
261 confuse WSGI PEP3333 servers, we will not implement __len__
262 at all.
263 '''
264 return len(self.data)
265
266 def prepend(self, x):
267 self.data.appendleft(x)
268
269 def append(self, x):
270 self.data.append(x)
271
272 def extend(self, o):
273 self.data.extend(o)
274
275 def __getitem__(self, i):
276 return self.data[i]
277
278
279 class SubprocessIOChunker():
280 '''
281 Processor class wrapping handling of subprocess IO.
282
283 In a way, this is a "communicate()" replacement with a twist.
284
285 - We are multithreaded. Writing in and reading out, err are all sep threads.
286 - We support concurrent (in and out) stream processing.
287 - The output is not a stream. It's a queue of read string (bytes, not unicode)
288 chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
289 - We are non-blocking in more respects than communicate()
290 (reading from subprocess out pauses when internal buffer is full, but
291 does not block the parent calling code. On the flip side, reading from
292 slow-yielding subprocess may block the iteration until data shows up. This
293 does not block the parallel inpipe reading occurring parallel thread.)
294
295 The purpose of the object is to allow us to wrap subprocess interactions into
296 and interable that can be passed to a WSGI server as the application's return
297 value. Because of stream-processing-ability, WSGI does not have to read ALL
298 of the subprocess's output and buffer it, before handing it to WSGI server for
299 HTTP response. Instead, the class initializer reads just a bit of the stream
300 to figure out if error ocurred or likely to occur and if not, just hands the
301 further iteration over subprocess output to the server for completion of HTTP
302 response.
303
304 The real or perceived subprocess error is trapped and raised as one of
305 EnvironmentError family of exceptions
306
307 Example usage:
308 # try:
309 # answer = SubprocessIOChunker(
310 # cmd,
311 # input,
312 # buffer_size = 65536,
313 # chunk_size = 4096
314 # )
315 # except (EnvironmentError) as e:
316 # print str(e)
317 # raise e
318 #
319 # return answer
320
321
322 '''
323 def __init__(self, cmd, inputstream=None, buffer_size=65536,
324 chunk_size=4096, starting_values=[]):
325 '''
326 Initializes SubprocessIOChunker
327
328 @param cmd A Subprocess.Popen style "cmd". Can be string or array of strings
329 @param inputstream (Default: None) A file-like, string, or file pointer.
330 @param buffer_size (Default: 65536) A size of total buffer per stream in bytes.
331 @param chunk_size (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
332 @param starting_values (Default: []) An array of strings to put in front of output que.
333 '''
334
335 if inputstream:
336 input_streamer = StreamFeeder(inputstream)
337 input_streamer.start()
338 inputstream = input_streamer.output
339
340 _p = subprocess.Popen(cmd,
341 bufsize=-1,
342 shell=True,
343 stdin=inputstream,
344 stdout=subprocess.PIPE,
345 stderr=subprocess.PIPE
346 )
347
348 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size, starting_values)
349 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
350
351 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
352 # doing this until we reach either end of file, or end of buffer.
353 bg_out.data_added_event.wait(1)
354 bg_out.data_added_event.clear()
355
356 # at this point it's still ambiguous if we are done reading or just full buffer.
357 # Either way, if error (returned by ended process, or implied based on
358 # presence of stuff in stderr output) we error out.
359 # Else, we are happy.
360 _returncode = _p.poll()
361 if _returncode or (_returncode == None and bg_err.length):
362 try:
363 _p.terminate()
364 except:
365 pass
366 bg_out.stop()
367 bg_err.stop()
368 raise EnvironmentError("Subprocess exited due to an error.\n" + "".join(bg_err))
369
370 self.process = _p
371 self.output = bg_out
372 self.error = bg_err
373
374 def __iter__(self):
375 return self
376
377 def next(self):
378 if self.process.poll():
379 raise EnvironmentError("Subprocess exited due to an error:\n" + ''.join(self.error))
380 return self.output.next()
381
382 def throw(self, type, value=None, traceback=None):
383 if self.output.length or not self.output.done_reading:
384 raise type(value)
385
386 def close(self):
387 try:
388 self.process.terminate()
389 except:
390 pass
391 try:
392 self.output.close()
393 except:
394 pass
395 try:
396 self.error.close()
397 except:
398 pass
399
400 def __del__(self):
401 self.close()
@@ -23,6 +23,8 b' news'
23 23 - #469 added --update-only option to whoosh to re-index only given list
24 24 of repos in index
25 25 - rhodecode-api CLI client
26 - new git http protocol replaced buggy dulwich implementation.
27 Now based on pygrack & gitweb
26 28
27 29 fixes
28 30 +++++
@@ -218,11 +218,13 b' class SimpleGit(BaseVCSController):'
218 218 :param repo_name: name of the repository
219 219 :param repo_path: full path to the repository
220 220 """
221 _d = {'/' + repo_name: Repo(repo_path)}
222 backend = dulserver.DictBackend(_d)
223 gitserve = make_wsgi_chain(backend)
224 221
225 return gitserve
222 from rhodecode.lib.middleware.pygrack import make_wsgi_app
223 app = make_wsgi_app(
224 repo_root=os.path.dirname(repo_path),
225 repo_name=repo_name,
226 )
227 return app
226 228
227 229 def __get_repository(self, environ):
228 230 """
General Comments 0
You need to be logged in to leave comments. Login now