##// END OF EJS Templates
subprocessio: instead of raising a Thread error we send a signal to gunicorn....
marcink -
r358:12f7a361 stable
parent child Browse files
Show More
@@ -1,480 +1,484 b''
1 1 """
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 import logging
26 27 import subprocess32 as subprocess
27 28 from collections import deque
28 29 from threading import Event, Thread
29 30
31 log = logging.getLogger(__name__)
32
30 33
class StreamFeeder(Thread):
    """
    Feeds data from a string-like or file-like source into an OS pipe.

    Writing into a pipe blocks once the kernel buffer fills, so the feed
    runs in a daemon thread to keep the caller responsive. The write end
    of the pipe is closed once the source is exhausted.
    """

    def __init__(self, source):
        super(StreamFeeder, self).__init__()
        self.daemon = True
        self.bytes = bytes()
        has_read = False
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:
            # source may be a raw file descriptor or an existing file-like
            if type(source) in (int, long):  # file descriptor
                # wrap the descriptor into a buffered file-like object
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # duck-type check: anything exposing .read qualifies
            try:
                has_read = source.read
            except Exception:
                pass
            if not has_read and not self.bytes:
                raise TypeError("StreamFeeder's source object must be a readable "
                                "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        wfd = self.writeiface
        if self.bytes:
            # whole payload is already in memory; one write is enough
            os.write(wfd, self.bytes)
        else:
            reader = self.source
            chunk = reader.read(4096)
            while chunk:
                os.write(wfd, chunk)
                chunk = reader.read(4096)
        # closing the write end delivers EOF to whoever reads the pipe
        os.close(wfd)

    @property
    def output(self):
        # read end of the pipe, suitable e.g. as a subprocess's stdin
        return self.readiface
79 82
80 83
class InputStreamChunker(Thread):
    """
    Daemon thread that reads fixed-size chunks from a blocking ``source``
    stream and appends them to a shared ``target`` deque, coordinating
    with the consumer through threading.Event flags (``data_added``,
    ``keep_reading``, ``EOF``, ``go``).
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: blocking file-like object to read from.
        :param target: deque shared with the consumer; chunks get appended.
        :param buffer_size: total buffer budget in bytes; together with
            ``chunk_size`` it bounds how many chunks may queue up.
        :param chunk_size: maximum bytes read per iteration.
        """

        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        # max number of queued chunks before reading is paused
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # set each time a chunk lands in ``target`` (and once at EOF)
        self.data_added = Event()
        self.data_added.clear()

        # cleared by this thread when the buffer is full; re-set by the
        # consumer after it drains some chunks
        self.keep_reading = Event()
        self.keep_reading.set()

        # signals that the underlying source is exhausted
        self.EOF = Event()
        self.EOF.clear()

        # cleared by stop() to ask the read loop to terminate
        self.go = Event()
        self.go.set()

    def stop(self):
        """Ask the read loop to stop and force-close the source stream."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except:
            pass

    def run(self):
        # Bind hot attributes to locals once, outside the loop.
        s = self.source
        t = self.target
        cs = self.chunk_size
        chunk_count_max = self.chunk_count_max
        keep_reading = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            # source already closed before we got to read it
            b = ''

        timeout_input = 20
        while b and go.is_set():
            if len(t) > chunk_count_max:
                # buffer full: pause until the consumer drains it (or timeout)
                keep_reading.clear()
                keep_reading.wait(timeout_input)
                # NOTE(review): this compares a chunk COUNT against
                # ``chunk_count_max + timeout_input`` (a count plus seconds)
                # — looks unit-inconsistent; confirm the intended threshold.
                if len(t) > chunk_count_max + timeout_input:
                    log.error("Timed out while waiting for input from subprocess.")
                    os._exit(-1)  # this will cause the worker to recycle itself

            t.append(b)
            da.set()
            b = s.read(cs)
        self.EOF.set()
        da.set()  # for cases when done but there was no input.
143 147
144 148
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        """
        :param source: blocking file-like object to read from.
        :param buffer_size: total buffer budget in bytes.
        :param chunk_size: max bytes per read.
        :param starting_values: chunks to pre-seed the output queue with.
        :param bottomless: if True, the deque is bounded and silently drops
            oldest chunks once full (used for stderr capture).
        """
        starting_values = starting_values or []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Block (in 0.2s slices) until a chunk arrives or EOF is flagged.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # consumer made room; let the reader thread resume
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # FIX: this used to return ``self.worker.w``, an attribute that
        # InputStreamChunker never defines, so any access raised
        # AttributeError. Expose the worker's source stream instead.
        return self.worker.source

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: A Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit of content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
282 286
283 287
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

          # `args` expected to run a program that produces a lot of output
          output = ''.join(SubprocessIOChunker(
             args, shell=False, inputstream=inputstream, env=environ).output)

          # `output` will not contain all the data, because the __del__ method
          # has already killed the subprocess in this case before all output
          # has been consumed.



    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size = 65536,
    #         chunk_size = 4096
    #     )
    # except (EnvironmentError) as e:
    #     print str(e)
    #     raise e
    #
    # return answer


    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    # guards close() against running its teardown twice
    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        """

        starting_values = starting_values or []
        if inputstream:
            # feed the input through a pipe in a background thread so the
            # child process can read it without blocking us
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        # NOTE(review): shell defaults to True here; callers passing a
        # string-built cmd with untrusted input should pass shell=False
        # with an argv list instead.
        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        # stdout is buffered up to buffer_size; stderr is a small bounded
        # ("bottomless") buffer that only keeps the most recent output
        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                if not err:
                    # maybe get empty stderr, try stdout instead
                    # in many cases git reports the errors on stdout too
                    err = ''.join(bg_out)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            # defer re-raising until after the return code check below
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, type, value=None, traceback=None):
        # only propagate while there is still unread output in flight
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        """Terminate the subprocess and release all streams (idempotent)."""
        if self._closed:
            return
        self._closed = True
        try:
            self.process.terminate()
        except:
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.output.close()
        except:
            pass
        try:
            self.error.close()
        except:
            pass

    def __del__(self):
        # see the class docstring: garbage collection kills the subprocess
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now