##// END OF EJS Templates
git: handle flaky and slow connection issues with git....
marcink -
r357:305b33c6 stable
parent child Browse files
Show More
@@ -1,481 +1,480 b''
1 1 """
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningful, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import subprocess32 as subprocess
27 27 from collections import deque
28 28 from threading import Event, Thread
29 29
30 30
class StreamFeeder(Thread):
    """
    Pumps data from a string-like or file-like source into an OS pipe.

    Writing into a pipe blocks once its buffer is full; doing the writes
    from this daemon thread keeps the caller's thread responsive. The
    write end of the pipe is closed when the source is exhausted, which
    delivers EOF to whoever reads the other end.
    """

    def __init__(self, source):
        super(StreamFeeder, self).__init__()
        self.daemon = True
        has_reader = False
        self.bytes = bytes()
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # either a raw file descriptor or a file-like object
            if type(source) in (int, long):  # a file descriptor
                try:
                    # wrap the descriptor so it can be .read() from
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # probe whether source now quacks like a file
            try:
                has_reader = source.read
            except Exception:
                pass
        if not has_reader and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        wfd = self.writeiface
        if self.bytes:
            # whole payload is already in memory; one write is enough
            os.write(wfd, self.bytes)
        else:
            reader = self.source
            chunk = reader.read(4096)
            while chunk:
                os.write(wfd, chunk)
                chunk = reader.read(4096)
        # closing the write end signals EOF to the pipe's reader
        os.close(wfd)

    @property
    def output(self):
        # read end of the pipe; hand this to the consumer (e.g. Popen stdin)
        return self.readiface
79 79
80 80
class InputStreamChunker(Thread):
    """
    Drains a blocking stream in a background thread, appending fixed-size
    chunks to a shared target deque.

    Reading pauses (``keep_reading`` cleared) once more than
    ``chunk_count_max`` chunks are buffered, and resumes when the consumer
    sets ``keep_reading`` again. ``EOF`` is set once the source is
    exhausted or the chunker is stopped.
    """

    def __init__(self, source, target, buffer_size, chunk_size):

        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # signalled every time a chunk lands in the target (or on EOF)
        self.data_added = Event()
        self.data_added.clear()

        # cleared by the reader when the buffer is full; set by the
        # consumer once it pops data and reading may resume
        self.keep_reading = Event()
        self.keep_reading.set()

        self.EOF = Event()
        self.EOF.clear()

        # master switch; cleared by stop() to abort the read loop
        self.go = Event()
        self.go.set()

    def stop(self):
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        chunk_count_max = self.chunk_count_max
        keep_reading = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            b = ''

        # seconds to wait for the consumer before declaring it stuck
        timeout_input = 20
        while b and go.is_set():
            if len(t) > chunk_count_max:
                keep_reading.clear()
                keep_reading.wait(timeout_input)
                # NOTE(review): heuristic threshold — buffer did not shrink
                # back below the limit within the wait window; presumably the
                # consumer is gone, so bail instead of hanging forever.
                if len(t) > chunk_count_max + timeout_input:
                    raise IOError(
                        "Timed out while waiting for input from subprocess.")
            t.append(b)
            da.set()
            b = s.read(cs)
        self.EOF.set()
        da.set()  # for cases when done but there was no input.
144 143
145 144
class BufferedGenerator(object):
    """
    Non-blocking, buffered reader over a blocking pipe.

    A background InputStreamChunker thread pulls chunks from the pipe and
    appends them to an internal deque; the thread pauses itself once the
    buffer holds the maximum number of chunks. Iterating this object pops
    chunks off the deque, and StopIteration is raised once the worker has
    signalled EOF and the buffer is drained.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        starting_values = starting_values or []

        # A "bottomless" buffer never pauses the reader: the deque gets a
        # maxlen and silently discards the oldest chunks once full.
        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        worker = self.worker
        # Poll in 0.2s slices until a chunk shows up or the reader hits EOF.
        while not len(self.data) and not worker.EOF.is_set():
            worker.data_added.clear()
            worker.data_added.wait(0.2)
        if len(self.data):
            # consumer made room in the buffer; let the reader resume
            worker.keep_reading.set()
            return bytes(self.data.popleft())
        if worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute 'w', so
        # touching this property raises AttributeError — confirm it is unused.
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done reading does not mean the iterator's buffer is empty. The worker
        may be done with the underlying source while buffered chunks are
        still waiting to be served through .next().

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done reading does not mean the iterator's buffer is empty. The worker
        may be done with the underlying source while buffered chunks are
        still waiting to be served through .next().

        :returns: An Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit of content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
283 282
284 283
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

          # `args` expected to run a program that produces a lot of output
          output = ''.join(SubprocessIOChunker(
              args, shell=False, inputstream=inputstream, env=environ).output)

          # `output` will not contain all the data, because the __del__ method
          # has already killed the subprocess in this case before all output
          # has been consumed.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    #    try:
    #        answer = SubprocessIOChunker(
    #            cmd,
    #            input,
    #            buffer_size = 65536,
    #            chunk_size = 4096
    #        )
    #    except (EnvironmentError) as e:
    #        print str(e)
    #        raise e
    #
    #    return answer


    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    # guards close() against running its cleanup twice
    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        """

        starting_values = starting_values or []
        if inputstream:
            # feed the input through a pipe so the child never blocks us
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                if not err:
                    # maybe get empty stderr, try stdout instead
                    # in many cases git reports the errors on stdout too
                    err = ''.join(bg_out)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        # re-raise StopIteration only after the return-code check above
        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, type, value=None, traceback=None):
        # `type` shadows the builtin but is kept for backward compatibility
        # with existing callers of this generator-like interface.
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        if self._closed:
            return
        self._closed = True
        # best-effort cleanup: was a bare `except:`; narrowed to Exception so
        # SystemExit/KeyboardInterrupt are not swallowed (matches the
        # `except Exception` style used elsewhere in this module)
        try:
            self.process.terminate()
        except Exception:
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now