git: report errors from stdout if stderr is empty
marcink
r350:5473f1d4 default
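For context, the change near the bottom of this diff makes the error report fall back to stdout when stderr comes back empty. Below is a minimal standalone sketch of the same fallback; the run_git helper and its use of plain subprocess are illustrative assumptions, not part of this commit:

    import subprocess

    def run_git(args):
        # Capture stdout and stderr separately, as SubprocessIOChunker does.
        proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        out, err = proc.communicate()
        if proc.returncode != 0:
            # git often reports errors on stdout; if stderr is empty,
            # use stdout as the error text instead.
            if not err:
                err = out
            raise EnvironmentError(
                "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                    proc.returncode, err))
        return out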
@@ -1,477 +1,481 @@
1 1 """
2 2 Module provides a class that wraps communication over subprocess.Popen
3 3 input, output, and error streams in a meaningful, non-blocking, concurrent
4 4 stream processor, exposing the output data as an iterator fit to be a
5 5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import subprocess32 as subprocess
27 27 from collections import deque
28 28 from threading import Event, Thread
29 29
30 30
31 31 class StreamFeeder(Thread):
32 32 """
33 33 Normal writing into a pipe-like object blocks once the buffer is filled.
34 34 This thread feeds data from a file-like object into a pipe
35 35 without blocking the main thread.
36 36 We close the write end of the pipe once the end of the source stream is reached.
37 37 """
38 38
39 39 def __init__(self, source):
40 40 super(StreamFeeder, self).__init__()
41 41 self.daemon = True
42 42 filelike = False
43 43 self.bytes = bytes()
44 44 if type(source) in (type(''), bytes, bytearray): # string-like
45 45 self.bytes = bytes(source)
46 46 else: # can be either file pointer or file-like
47 47 if type(source) in (int, long): # file pointer it is
48 48 # converting file descriptor (int) stdin into file-like
49 49 try:
50 50 source = os.fdopen(source, 'rb', 16384)
51 51 except Exception:
52 52 pass
53 53 # let's see if source is file-like by now
54 54 try:
55 55 filelike = source.read
56 56 except Exception:
57 57 pass
58 58 if not filelike and not self.bytes:
59 59 raise TypeError("StreamFeeder's source object must be a readable "
60 60 "file-like, a file descriptor, or a string-like.")
61 61 self.source = source
62 62 self.readiface, self.writeiface = os.pipe()
63 63
64 64 def run(self):
65 65 t = self.writeiface
66 66 if self.bytes:
67 67 os.write(t, self.bytes)
68 68 else:
69 69 s = self.source
70 70 b = s.read(4096)
71 71 while b:
72 72 os.write(t, b)
73 73 b = s.read(4096)
74 74 os.close(t)
75 75
76 76 @property
77 77 def output(self):
78 78 return self.readiface
79 79
80 80
81 81 class InputStreamChunker(Thread):
82 82 def __init__(self, source, target, buffer_size, chunk_size):
83 83
84 84 super(InputStreamChunker, self).__init__()
85 85
86 86 self.daemon = True # die die die.
87 87
88 88 self.source = source
89 89 self.target = target
90 90 self.chunk_count_max = int(buffer_size / chunk_size) + 1
91 91 self.chunk_size = chunk_size
92 92
93 93 self.data_added = Event()
94 94 self.data_added.clear()
95 95
96 96 self.keep_reading = Event()
97 97 self.keep_reading.set()
98 98
99 99 self.EOF = Event()
100 100 self.EOF.clear()
101 101
102 102 self.go = Event()
103 103 self.go.set()
104 104
105 105 def stop(self):
106 106 self.go.clear()
107 107 self.EOF.set()
108 108 try:
109 109 # this is not proper, but is done to force the reader thread to
110 110 # let go of the input because, if successful, .close() will send
111 111 # EOF down the pipe.
112 112 self.source.close()
113 113 except:
114 114 pass
115 115
116 116 def run(self):
117 117 s = self.source
118 118 t = self.target
119 119 cs = self.chunk_size
120 120 ccm = self.chunk_count_max
121 121 keep_reading = self.keep_reading
122 122 da = self.data_added
123 123 go = self.go
124 124
125 125 try:
126 126 b = s.read(cs)
127 127 except ValueError:
128 128 b = ''
129 129
130 130 while b and go.is_set():
131 131 if len(t) > ccm:
132 132 keep_reading.clear()
133 133 keep_reading.wait(2)
134 134
135 135 if not keep_reading.wait(10):
136 136 raise Exception(
137 137 "Timed out while waiting for input to be read.")
138 138
139 139 t.append(b)
140 140 da.set()
141 141 b = s.read(cs)
142 142 self.EOF.set()
143 143 da.set() # for cases when done but there was no input.
144 144
145 145
146 146 class BufferedGenerator(object):
147 147 """
148 148 Class behaves as a non-blocking, buffered pipe reader.
149 149 It reads chunks of data (through a thread) from a blocking pipe
150 150 and appends them to a deque of chunks.
151 151 Reading is halted in the thread when the maximum number of chunks is buffered.
152 152 The .next() method may operate in blocking or non-blocking fashion: by
153 153 yielding '' if no data is ready
154 154 to be sent, or by not returning until there is some data to send.
155 155 When we get EOF from the underlying source pipe, we set a marker to raise
156 156 StopIteration after the last chunk of data is yielded.
157 157 """
158 158
159 159 def __init__(self, source, buffer_size=65536, chunk_size=4096,
160 160 starting_values=None, bottomless=False):
161 161 starting_values = starting_values or []
162 162
163 163 if bottomless:
164 164 maxlen = int(buffer_size / chunk_size)
165 165 else:
166 166 maxlen = None
167 167
168 168 self.data = deque(starting_values, maxlen)
169 169 self.worker = InputStreamChunker(source, self.data, buffer_size,
170 170 chunk_size)
171 171 if starting_values:
172 172 self.worker.data_added.set()
173 173 self.worker.start()
174 174
175 175 ####################
176 176 # Generator's methods
177 177 ####################
178 178
179 179 def __iter__(self):
180 180 return self
181 181
182 182 def next(self):
183 183 while not len(self.data) and not self.worker.EOF.is_set():
184 184 self.worker.data_added.clear()
185 185 self.worker.data_added.wait(0.2)
186 186 if len(self.data):
187 187 self.worker.keep_reading.set()
188 188 return bytes(self.data.popleft())
189 189 elif self.worker.EOF.is_set():
190 190 raise StopIteration
191 191
192 192 def throw(self, exc_type, value=None, traceback=None):
193 193 if not self.worker.EOF.is_set():
194 194 raise exc_type(value)
195 195
196 196 def start(self):
197 197 self.worker.start()
198 198
199 199 def stop(self):
200 200 self.worker.stop()
201 201
202 202 def close(self):
203 203 try:
204 204 self.worker.stop()
205 205 self.throw(GeneratorExit)
206 206 except (GeneratorExit, StopIteration):
207 207 pass
208 208
209 209 def __del__(self):
210 210 self.close()
211 211
212 212 ####################
213 213 # Threaded reader's infrastructure.
214 214 ####################
215 215 @property
216 216 def input(self):
217 217 return self.worker.w
218 218
219 219 @property
220 220 def data_added_event(self):
221 221 return self.worker.data_added
222 222
223 223 @property
224 224 def data_added(self):
225 225 return self.worker.data_added.is_set()
226 226
227 227 @property
228 228 def reading_paused(self):
229 229 return not self.worker.keep_reading.is_set()
230 230
231 231 @property
232 232 def done_reading_event(self):
233 233 """
234 234 Done reading does not mean that the iterator's buffer is empty.
235 235 The iterator might be done reading from the underlying source, but the
236 236 read chunks might still be available for serving through the .next() method.
237 237
238 238 :returns: An Event class instance.
239 239 """
240 240 return self.worker.EOF
241 241
242 242 @property
243 243 def done_reading(self):
244 244 """
245 245 Done reading does not mean that the iterator's buffer is empty.
246 246 The iterator might be done reading from the underlying source, but the
247 247 read chunks might still be available for serving through the .next() method.
248 248
249 249 :returns: A bool value.
250 250 """
251 251 return self.worker.EOF.is_set()
252 252
253 253 @property
254 254 def length(self):
255 255 """
256 256 returns int.
257 257
258 258 This is the length of the queue of chunks, not the length of
259 259 the combined contents of those chunks.
260 260
261 261 __len__() cannot be meaningfully implemented because this
262 262 reader is just flying through bottomless content and
263 263 can only know the length of what it has already seen.
264 264
265 265 Per PEP 3333, if the iterable handed to a WSGI server implements
266 266 __len__(), the response's length will be set to that value. In order
267 267 not to confuse WSGI servers, we will not implement __len__
268 268 at all.
269 269 """
270 270 return len(self.data)
271 271
272 272 def prepend(self, x):
273 273 self.data.appendleft(x)
274 274
275 275 def append(self, x):
276 276 self.data.append(x)
277 277
278 278 def extend(self, o):
279 279 self.data.extend(o)
280 280
281 281 def __getitem__(self, i):
282 282 return self.data[i]
283 283
284 284
285 285 class SubprocessIOChunker(object):
286 286 """
287 287 Processor class wrapping handling of subprocess IO.
288 288
289 289 .. important::
290 290
291 291 Watch out for the method `__del__` on this class. If this object
292 292 is deleted, it will kill the subprocess, so avoid returning the
293 293 `output` attribute or using it as in the following
294 294 example::
295 295
296 296 # `args` expected to run a program that produces a lot of output
297 297 output = ''.join(SubprocessIOChunker(
298 298 args, shell=False, inputstream=inputstream, env=environ).output)
299 299
300 300 # `output` will not contain all the data, because the __del__ method
301 301 # has already killed the subprocess in this case before all output
302 302 # has been consumed.
303 303
304 304
305 305
306 306 In a way, this is a "communicate()" replacement with a twist.
307 307
308 308 - We are multithreaded. Writing to stdin and reading from stdout and stderr happen in separate threads.
309 309 - We support concurrent (in and out) stream processing.
310 310 - The output is not a stream. It is a queue of read chunks (bytes, not
311 311 unicode). The object behaves as an iterable: you can "for chunk in obj:" it.
312 312 - We are non-blocking in more respects than communicate()
313 313 (reading from the subprocess's stdout pauses when the internal buffer is
314 314 full, but does not block the parent calling code. On the flip side, reading
315 315 from a slow-yielding subprocess may block the iteration until data shows up.
316 316 This does not block the inpipe reading occurring in a parallel thread.)
317 317
318 318 The purpose of the object is to allow us to wrap subprocess interactions into
319 319 an iterable that can be passed to a WSGI server as the application's return
320 320 value. Because of this stream-processing ability, the application does not
321 321 have to read ALL of the subprocess's output and buffer it before handing it
322 322 to the WSGI server for the HTTP response. Instead, the class initializer
323 323 reads just a bit of the stream to figure out if an error occurred or is
324 324 likely to occur, and if not, hands the further iteration over the subprocess
325 325 output to the server for completion of the HTTP response.
326 326
327 327 A real or perceived subprocess error is trapped and raised as one of the
328 328 EnvironmentError family of exceptions.
329 329
330 330 Example usage:
331 331 # try:
332 332 # answer = SubprocessIOChunker(
333 333 # cmd,
334 334 # input,
335 335 # buffer_size = 65536,
336 336 # chunk_size = 4096
337 337 # )
338 338 # except (EnvironmentError) as e:
339 339 # print str(e)
340 340 # raise e
341 341 #
342 342 # return answer
343 343
344 344
345 345 """
346 346
347 347 # TODO: johbo: This is used to make sure that the open end of the PIPE
348 348 # is closed in the end. It would be way better to wrap this into an
349 349 # object, so that it is closed automatically once it is consumed or
350 350 # something similar.
351 351 _close_input_fd = None
352 352
353 353 _closed = False
354 354
355 355 def __init__(self, cmd, inputstream=None, buffer_size=65536,
356 356 chunk_size=4096, starting_values=None, fail_on_stderr=True,
357 357 fail_on_return_code=True, **kwargs):
358 358 """
359 359 Initializes SubprocessIOChunker
360 360
361 361 :param cmd: A subprocess.Popen-style "cmd". Can be a string or an array of strings.
362 362 :param inputstream: (Default: None) A file-like, string, or file descriptor.
363 363 :param buffer_size: (Default: 65536) The size of the total buffer per stream, in bytes.
364 364 :param chunk_size: (Default: 4096) The max size of a chunk. Actual chunks may be smaller.
365 365 :param starting_values: (Default: []) An array of strings to put in front of the output queue.
366 366 :param fail_on_stderr: (Default: True) Whether to raise an exception in
367 367 case something is written to stderr.
368 368 :param fail_on_return_code: (Default: True) Whether to raise an
369 369 exception if the return code is not 0.
370 370 """
371 371
372 372 starting_values = starting_values or []
373 373 if inputstream:
374 374 input_streamer = StreamFeeder(inputstream)
375 375 input_streamer.start()
376 376 inputstream = input_streamer.output
377 377 self._close_input_fd = inputstream
378 378
379 379 self._fail_on_stderr = fail_on_stderr
380 380 self._fail_on_return_code = fail_on_return_code
381 381
382 382 _shell = kwargs.get('shell', True)
383 383 kwargs['shell'] = _shell
384 384
385 385 _p = subprocess.Popen(cmd, bufsize=-1,
386 386 stdin=inputstream,
387 387 stdout=subprocess.PIPE,
388 388 stderr=subprocess.PIPE,
389 389 **kwargs)
390 390
391 391 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
392 392 starting_values)
393 393 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
394 394
395 395 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
396 396 # doing this until we reach either end of file, or end of buffer.
397 397 bg_out.data_added_event.wait(1)
398 398 bg_out.data_added_event.clear()
399 399
400 400 # at this point it's still ambiguous whether we are done reading or the buffer is just full.
401 401 # Either way, if there is an error (returned by the ended process, or implied
402 402 # by the presence of data on stderr), we error out.
403 403 # Else, we are happy.
404 404 _returncode = _p.poll()
405 405
406 406 if ((_returncode and fail_on_return_code) or
407 407 (fail_on_stderr and _returncode is None and bg_err.length)):
408 408 try:
409 409 _p.terminate()
410 410 except Exception:
411 411 pass
412 412 bg_out.stop()
413 413 bg_err.stop()
414 414 if fail_on_stderr:
415 415 err = ''.join(bg_err)
416 416 raise EnvironmentError(
417 417 "Subprocess exited due to an error:\n" + err)
418 418 if _returncode and fail_on_return_code:
419 419 err = ''.join(bg_err)
420 if not err:
421 # stderr may come back empty; try stdout instead, since
422 # in many cases git reports errors on stdout too
423 err = ''.join(bg_out)
420 424 raise EnvironmentError(
421 425 "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
422 426 _returncode, err))
423 427
424 428 self.process = _p
425 429 self.output = bg_out
426 430 self.error = bg_err
427 431
428 432 def __iter__(self):
429 433 return self
430 434
431 435 def next(self):
432 436 # Note: mikhail: We need to be sure that we are checking the return
433 437 # code after the stdout stream is closed. Some processes, e.g. git,
434 438 # do some magic in between closing stdout and terminating the
435 439 # process and, as a result, we do not get a return code on "slow"
436 440 # systems.
437 441 result = None
438 442 stop_iteration = None
439 443 try:
440 444 result = self.output.next()
441 445 except StopIteration as e:
442 446 stop_iteration = e
443 447
444 448 if self.process.poll() and self._fail_on_return_code:
445 449 err = '%s' % ''.join(self.error)
446 450 raise EnvironmentError(
447 451 "Subprocess exited due to an error:\n" + err)
448 452
449 453 if stop_iteration:
450 454 raise stop_iteration
451 455 return result
452 456
453 457 def throw(self, type, value=None, traceback=None):
454 458 if self.output.length or not self.output.done_reading:
455 459 raise type(value)
456 460
457 461 def close(self):
458 462 if self._closed:
459 463 return
460 464 self._closed = True
461 465 try:
462 466 self.process.terminate()
463 467 except:
464 468 pass
465 469 if self._close_input_fd:
466 470 os.close(self._close_input_fd)
467 471 try:
468 472 self.output.close()
469 473 except:
470 474 pass
471 475 try:
472 476 self.error.close()
473 477 except:
474 478 pass
475 479
476 480 def __del__(self):
477 481 self.close()
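A few usage sketches for the classes in this file follow. The commands and helper names are illustrative assumptions, the sketches target Python 2 as the module itself does, and they assume the classes above plus a plain `import subprocess`.

StreamFeeder turns a string or file-like source into a real file descriptor suitable as Popen's stdin, with a daemon thread doing the writing:

    import subprocess

    feeder = StreamFeeder(b'some data for stdin')
    feeder.start()
    proc = subprocess.Popen(['cat'], stdin=feeder.output,
                            stdout=subprocess.PIPE)
    print proc.communicate()[0]  # prints 'some data for stdin'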
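InputStreamChunker's flow control deserves a note: the reader thread parks itself on the keep_reading event once the deque holds more than chunk_count_max chunks, and the consumer (BufferedGenerator.next) re-arms it after popping a chunk. A condensed sketch of that handshake, simplified from the class above rather than taken from it:

    from collections import deque
    from threading import Event

    buf = deque()
    keep_reading = Event(); keep_reading.set()
    data_added = Event()

    def reader_loop(pipe, max_chunks, chunk_size):
        b = pipe.read(chunk_size)
        while b:
            if len(buf) > max_chunks:
                keep_reading.clear()   # pause: buffer is full
                keep_reading.wait(10)  # consumer sets it after a pop
            buf.append(b)
            data_added.set()           # wake a consumer waiting for data
            b = pipe.read(chunk_size)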
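BufferedGenerator then drains that deque as an iterator of byte chunks, blocking in short data_added waits until data or EOF arrives:

    proc = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE)
    gen = BufferedGenerator(proc.stdout, buffer_size=65536, chunk_size=4096)
    for chunk in gen:   # iteration ends after EOF is reached
        handle(chunk)   # handle() is a hypothetical consumer; chunks are bytes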
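Finally, the commented example from the SubprocessIOChunker docstring, written out as it might look inside a WSGI application. The surrounding app, the status lines, and the git command are assumptions, not part of this module:

    def application(environ, start_response):
        try:
            answer = SubprocessIOChunker(
                'git upload-pack --stateless-rpc .',  # illustrative command
                inputstream=environ['wsgi.input'],
                buffer_size=65536,
                chunk_size=4096)
        except EnvironmentError as e:
            start_response('500 Internal Server Error', [])
            return [str(e)]
        start_response('200 OK', [('Content-Type', 'application/octet-stream')])
        return answer  # the server iterates; chunks stream out as they arrive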