subprocessio: limit fd leaks
marcink -
r624:560860c2 default
@@ -1,516 +1,523 @@
1 1 """
2 2 This module provides a class that wraps communication over subprocess.Popen
3 3 input, output, and error streams in a meaningful, non-blocking, concurrent
4 4 stream processor, exposing the output data as an iterator fit to be the
5 5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import logging
27 27 import subprocess32 as subprocess
28 28 from collections import deque
29 29 from threading import Event, Thread
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33
34 34 class StreamFeeder(Thread):
35 35 """
36 36 Normal writing into a pipe-like object blocks once the buffer is filled.
37 37 This thread feeds data from a file-like object into a pipe
38 38 without blocking the main thread.
39 39 We close the write end of the pipe once the end of the source stream is reached.
40 40 """
41 41
42 42 def __init__(self, source):
43 43 super(StreamFeeder, self).__init__()
44 44 self.daemon = True
45 45 filelike = False
46 46 self.bytes = bytes()
47 47 if type(source) in (type(''), bytes, bytearray): # string-like
48 48 self.bytes = bytes(source)
49 49 else: # can be either file pointer or file-like
50 50 if type(source) in (int, long): # file pointer it is
51 51 # converting file descriptor (int) stdin into file-like
52 52 try:
53 53 source = os.fdopen(source, 'rb', 16384)
54 54 except Exception:
55 55 pass
56 56 # let's see if source is file-like by now
57 57 try:
58 58 filelike = source.read
59 59 except Exception:
60 60 pass
61 61 if not filelike and not self.bytes:
62 62 raise TypeError("StreamFeeder's source object must be a readable "
63 63 "file-like, a file descriptor, or a string-like.")
64 64 self.source = source
65 65 self.readiface, self.writeiface = os.pipe()
66 66
67 67 def run(self):
68 68 t = self.writeiface
69 if self.bytes:
70 os.write(t, self.bytes)
71 else:
72 s = self.source
73 b = s.read(4096)
74 while b:
75 os.write(t, b)
69 try:
70 if self.bytes:
71 os.write(t, self.bytes)
72 else:
73 s = self.source
76 74 b = s.read(4096)
77 os.close(t)
75 while b:
76 os.write(t, b)
77 b = s.read(4096)
78 finally:
79 os.close(t)
78 80
79 81 @property
80 82 def output(self):
81 83 return self.readiface
82 84
83 85
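The try/finally added above is the heart of this changeset for StreamFeeder: the pipe's write end is now closed even when reading the source fails, and the read end exposed via `.output` is tracked and closed by SubprocessIOChunker.close() further down. A minimal, hypothetical usage sketch (assumes this file is importable as `subprocessio`):

    import os
    import subprocessio

    feeder = subprocessio.StreamFeeder(b'hello subprocess\n')
    feeder.start()
    fd = feeder.output           # read end of the pipe, an int fd
    data = os.read(fd, 4096)     # a real caller passes fd as Popen's stdin
    feeder.join()                # writer thread has closed the write end
    os.close(fd)                 # the caller must close the read end too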
84 86 class InputStreamChunker(Thread):
85 87 def __init__(self, source, target, buffer_size, chunk_size):
86 88
87 89 super(InputStreamChunker, self).__init__()
88 90
89 91 self.daemon = True # die die die.
90 92
91 93 self.source = source
92 94 self.target = target
93 95 self.chunk_count_max = int(buffer_size / chunk_size) + 1
94 96 self.chunk_size = chunk_size
95 97
96 98 self.data_added = Event()
97 99 self.data_added.clear()
98 100
99 101 self.keep_reading = Event()
100 102 self.keep_reading.set()
101 103
102 104 self.EOF = Event()
103 105 self.EOF.clear()
104 106
105 107 self.go = Event()
106 108 self.go.set()
107 109
108 110 def stop(self):
109 111 self.go.clear()
110 112 self.EOF.set()
111 113 try:
112 114 # this is not proper, but is done to force the reader thread to
113 115 # let go of the input because, if successful, .close() will send EOF
114 116 # down the pipe.
115 117 self.source.close()
116 118 except:
117 119 pass
118 120
119 121 def run(self):
120 122 s = self.source
121 123 t = self.target
122 124 cs = self.chunk_size
123 125 chunk_count_max = self.chunk_count_max
124 126 keep_reading = self.keep_reading
125 127 da = self.data_added
126 128 go = self.go
127 129
128 130 try:
129 131 b = s.read(cs)
130 132 except ValueError:
131 133 b = ''
132 134
133 135 timeout_input = 20
134 136 while b and go.is_set():
135 137 if len(t) > chunk_count_max:
136 138 keep_reading.clear()
137 139 keep_reading.wait(timeout_input)
138 140 if len(t) > chunk_count_max + timeout_input:
139 141 log.error("Timed out while waiting for input from subprocess.")
140 142 os._exit(-1) # this will cause the worker to recycle itself
141 143
142 144 t.append(b)
143 145 da.set()
144 146
145 147 try:
146 148 b = s.read(cs)
147 149 except ValueError:
148 150 b = ''
149 151
150 152 self.EOF.set()
151 153 da.set() # for cases when done but there was no input.
152 154
153 155
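The chunker pauses itself via the keep_reading event once the target deque holds more than chunk_count_max chunks; a consumer drains the deque and re-sets the event to let reading resume. A rough, hypothetical illustration of that handshake (not from this changeset):

    import subprocess32 as subprocess
    from collections import deque

    proc = subprocess.Popen(['cat', '/etc/hosts'], stdout=subprocess.PIPE)
    chunks = deque()
    chunker = InputStreamChunker(proc.stdout, chunks, 65536, 4096)
    chunker.start()
    body = []
    while not (chunker.EOF.is_set() and not chunks):
        chunker.data_added.wait(0.2)   # block briefly until data or EOF
        chunker.data_added.clear()
        while chunks:
            body.append(chunks.popleft())
        chunker.keep_reading.set()     # wake a reader paused on a full buffer
    print ''.join(body)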
154 156 class BufferedGenerator(object):
155 157 """
156 158 This class behaves as a non-blocking, buffered pipe reader.
157 159 It reads chunks of data (through a thread) from a blocking pipe
158 160 and appends them to a deque of chunks.
159 161 Reading is halted in the thread when the maximum number of chunks is buffered.
160 162 The .next() method may operate in a blocking or a non-blocking fashion: it
161 163 can yield '' if no data is ready to be sent, or it can refrain from
162 164 returning until there is some data to send.
163 165 When we get EOF from the underlying source pipe, we raise a marker so that
164 166 StopIteration is raised after the last chunk of data is yielded.
165 167 """
166 168
167 169 def __init__(self, source, buffer_size=65536, chunk_size=4096,
168 170 starting_values=None, bottomless=False):
169 171 starting_values = starting_values or []
170 172
171 173 if bottomless:
172 174 maxlen = int(buffer_size / chunk_size)
173 175 else:
174 176 maxlen = None
175 177
176 178 self.data = deque(starting_values, maxlen)
177 179 self.worker = InputStreamChunker(source, self.data, buffer_size,
178 180 chunk_size)
179 181 if starting_values:
180 182 self.worker.data_added.set()
181 183 self.worker.start()
182 184
183 185 ####################
184 186 # Generator's methods
185 187 ####################
186 188
187 189 def __iter__(self):
188 190 return self
189 191
190 192 def next(self):
191 193 while not len(self.data) and not self.worker.EOF.is_set():
192 194 self.worker.data_added.clear()
193 195 self.worker.data_added.wait(0.2)
194 196 if len(self.data):
195 197 self.worker.keep_reading.set()
196 198 return bytes(self.data.popleft())
197 199 elif self.worker.EOF.is_set():
198 200 raise StopIteration
199 201
200 202 def throw(self, exc_type, value=None, traceback=None):
201 203 if not self.worker.EOF.is_set():
202 204 raise exc_type(value)
203 205
204 206 def start(self):
205 207 self.worker.start()
206 208
207 209 def stop(self):
208 210 self.worker.stop()
209 211
210 212 def close(self):
211 213 try:
212 214 self.worker.stop()
213 215 self.throw(GeneratorExit)
214 216 except (GeneratorExit, StopIteration):
215 217 pass
216 218
217 219 def __del__(self):
218 220 self.close()
219 221
220 222 ####################
221 223 # Threaded reader's infrastructure.
222 224 ####################
223 225 @property
224 226 def input(self):
225 227 return self.worker.source  # the worker has no `w`; source is its pipe
226 228
227 229 @property
228 230 def data_added_event(self):
229 231 return self.worker.data_added
230 232
231 233 @property
232 234 def data_added(self):
233 235 return self.worker.data_added.is_set()
234 236
235 237 @property
236 238 def reading_paused(self):
237 239 return not self.worker.keep_reading.is_set()
238 240
239 241 @property
240 242 def done_reading_event(self):
241 243 """
242 244 Done reading does not mean that the iterator's buffer is empty.
243 245 The iterator might be done reading from the underlying source, but the
244 246 read chunks might still be available for serving through the .next() method.
245 247
246 248 :returns: An Event class instance.
247 249 """
248 250 return self.worker.EOF
249 251
250 252 @property
251 253 def done_reading(self):
252 254 """
253 255 Done reading does not mean that the iterator's buffer is empty.
254 256 The iterator might be done reading from the underlying source, but the
255 257 read chunks might still be available for serving through the .next() method.
256 258
257 259 :returns: A bool value.
258 260 """
259 261 return self.worker.EOF.is_set()
260 262
261 263 @property
262 264 def length(self):
263 265 """
264 266 Returns an int.
265 267
266 268 This is the length of the queue of chunks, not the length of
267 269 the combined contents in those chunks.
268 270
269 271 __len__() cannot be meaningfully implemented because this
270 272 reader is just flying through a bottomless pit of content and
271 273 can only know the length of what it has already seen.
272 274
273 275 If __len__() returns a value, a WSGI server per PEP 3333 will
274 276 set the response's length to that value. In order not to
275 277 confuse WSGI PEP 3333 servers, we will not implement __len__
276 278 at all.
277 279 """
278 280 return len(self.data)
279 281
280 282 def prepend(self, x):
281 283 self.data.appendleft(x)
282 284
283 285 def append(self, x):
284 286 self.data.append(x)
285 287
286 288 def extend(self, o):
287 289 self.data.extend(o)
288 290
289 291 def __getitem__(self, i):
290 292 return self.data[i]
291 293
292 294
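BufferedGenerator is what SubprocessIOChunker's next() ultimately drains. A minimal sketch, not part of this changeset, of pointing one directly at a process's stdout:

    import subprocess32 as subprocess

    proc = subprocess.Popen(['ls', '-la'], stdout=subprocess.PIPE)
    gen = BufferedGenerator(proc.stdout)   # worker thread starts immediately
    listing = ''.join(gen)                 # pulls chunks until EOF is signalled
    gen.close()                            # stops the worker thread
    proc.wait()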
293 295 class SubprocessIOChunker(object):
294 296 """
295 297 Processor class wrapping handling of subprocess IO.
296 298
297 299 .. important::
298 300
299 301 Watch out for the method `__del__` on this class. If this object
300 302 is deleted, it will kill the subprocess, so avoid returning
301 303 the `output` attribute or using it like in the following
302 304 example::
303 305
304 306 # `args` expected to run a program that produces a lot of output
305 307 output = ''.join(SubprocessIOChunker(
306 308 args, shell=False, inputstream=inputstream, env=environ).output)
307 309
308 310 # `output` will not contain all the data, because the __del__ method
309 311 # has already killed the subprocess in this case before all output
310 312 # has been consumed.
311 313
312 314
313 315
314 316 In a way, this is a "communicate()" replacement with a twist.
315 317
316 318 - We are multithreaded. Writing stdin and reading stdout and stderr each happen in separate threads.
317 319 - We support concurrent (in and out) stream processing.
318 320 - The output is not a stream. It's a queue of read string (bytes, not unicode)
319 321 chunks. The object behaves as an iterable; you can "for chunk in obj:" over it.
320 322 - We are non-blocking in more respects than communicate()
321 323 (reading from the subprocess's out pauses when the internal buffer is full,
322 324 but does not block the parent calling code. On the flip side, reading from
323 325 a slow-yielding subprocess may block the iteration until data shows up. This
324 326 does not block the inpipe reading occurring in a parallel thread.)
325 327
326 328 The purpose of the object is to allow us to wrap subprocess interactions into
327 329 an iterable that can be passed to a WSGI server as the application's return
328 330 value. Because of its stream-processing ability, WSGI does not have to read
329 331 ALL of the subprocess's output and buffer it before handing it to the WSGI
330 332 server for the HTTP response. Instead, the class initializer reads just a bit
331 333 of the stream to figure out if an error occurred or is likely to occur, and
332 334 if not, hands further iteration over the subprocess output to the server to
333 335 complete the HTTP response.
334 336
335 337 The real or perceived subprocess error is trapped and raised as one of
336 338 the EnvironmentError family of exceptions.
337 339
338 340 Example usage:
339 341 # try:
340 342 # answer = SubprocessIOChunker(
341 343 # cmd,
342 344 # input,
343 345 # buffer_size = 65536,
344 346 # chunk_size = 4096
345 347 # )
346 348 # except (EnvironmentError) as e:
347 349 # print str(e)
348 350 # raise e
349 351 #
350 352 # return answer
351 353
352 354
353 355 """
354 356
355 357 # TODO: johbo: This is used to make sure that the open end of the PIPE
356 358 # is closed in the end. It would be way better to wrap this into an
357 359 # object, so that it is closed automatically once it is consumed or
358 360 # something similar.
359 361 _close_input_fd = None
360 362
361 363 _closed = False
362 364
363 365 def __init__(self, cmd, inputstream=None, buffer_size=65536,
364 366 chunk_size=4096, starting_values=None, fail_on_stderr=True,
365 367 fail_on_return_code=True, **kwargs):
366 368 """
367 369 Initializes SubprocessIOChunker
368 370
369 371 :param cmd: A subprocess.Popen-style "cmd". Can be a string or an array of strings.
370 372 :param inputstream: (Default: None) A file-like, string, or file pointer.
371 373 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
372 374 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
373 375 :param starting_values: (Default: []) An array of strings to put in front of the output queue.
374 376 :param fail_on_stderr: (Default: True) Whether to raise an exception in
375 377 case something is written to stderr.
376 378 :param fail_on_return_code: (Default: True) Whether to raise an
377 379 exception if the return code is not 0.
378 380 """
379 381
380 382 starting_values = starting_values or []
381 383 if inputstream:
382 384 input_streamer = StreamFeeder(inputstream)
383 385 input_streamer.start()
384 386 inputstream = input_streamer.output
385 387 self._close_input_fd = inputstream
386 388
387 389 self._fail_on_stderr = fail_on_stderr
388 390 self._fail_on_return_code = fail_on_return_code
389 391
390 392 _shell = kwargs.get('shell', True)
391 393 kwargs['shell'] = _shell
392 394
393 395 _p = subprocess.Popen(cmd, bufsize=-1,
394 396 stdin=inputstream,
395 397 stdout=subprocess.PIPE,
396 398 stderr=subprocess.PIPE,
397 399 **kwargs)
398 400
399 401 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
400 402 starting_values)
401 403 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
402 404
403 405 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
404 406 # doing this until we reach either end of file, or end of buffer.
405 407 bg_out.data_added_event.wait(1)
406 408 bg_out.data_added_event.clear()
407 409
408 410 # At this point it's still ambiguous whether we are done reading or the
409 411 # buffer is just full. Either way, if there is an error (returned by the
410 412 # ended process, or implied by the presence of data in the stderr output),
411 413 # we error out. Else, we are happy.
412 414 _returncode = _p.poll()
413 415
414 416 if ((_returncode and fail_on_return_code) or
415 417 (fail_on_stderr and _returncode is None and bg_err.length)):
416 418 try:
417 419 _p.terminate()
418 420 except Exception:
419 421 pass
420 422 bg_out.stop()
421 423 bg_err.stop()
422 424 if fail_on_stderr:
423 425 err = ''.join(bg_err)
424 426 raise EnvironmentError(
425 427 "Subprocess exited due to an error:\n" + err)
426 428 if _returncode and fail_on_return_code:
427 429 err = ''.join(bg_err)
428 430 if not err:
429 431 # stderr may be empty; try stdout instead, since
430 432 # in many cases git reports its errors on stdout too
431 433 err = ''.join(bg_out)
432 434 raise EnvironmentError(
433 435 "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
434 436 _returncode, err))
435 437
436 438 self.process = _p
437 439 self.output = bg_out
438 440 self.error = bg_err
441 self.inputstream = inputstream
439 442
440 443 def __iter__(self):
441 444 return self
442 445
443 446 def next(self):
444 447 # Note: mikhail: We need to be sure that we are checking the return
445 448 # code after the stdout stream is closed. Some processes, e.g. git,
446 449 # do some magic in between closing stdout and terminating the
447 450 # process, and as a result we would not get the return code on "slow"
448 451 # systems.
449 452 result = None
450 453 stop_iteration = None
451 454 try:
452 455 result = self.output.next()
453 456 except StopIteration as e:
454 457 stop_iteration = e
455 458
456 459 if self.process.poll() and self._fail_on_return_code:
457 460 err = ''.join(self.error)
458 461 raise EnvironmentError(
459 462 "Subprocess exited due to an error:\n" + err)
460 463
461 464 if stop_iteration:
462 465 raise stop_iteration
463 466 return result
464 467
465 468 def throw(self, exc_type, value=None, traceback=None):
466 469 if self.output.length or not self.output.done_reading:
467 470 raise exc_type(value)
468 471
469 472 def close(self):
470 473 if self._closed:
471 474 return
472 475 self._closed = True
473 476 try:
474 477 self.process.terminate()
475 478 except:
476 479 pass
477 480 if self._close_input_fd:
478 481 os.close(self._close_input_fd)
479 482 try:
480 483 self.output.close()
481 484 except:
482 485 pass
483 486 try:
484 487 self.error.close()
485 488 except:
486 489 pass
490 try:
491 os.close(self.inputstream)
492 except:
493 pass
487 494
488 495 def __del__(self):
489 496 self.close()
490 497
491 498
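An end-to-end sketch (hypothetical, assuming a POSIX `cat` on PATH) that also exercises the fd cleanup this changeset adds: close() now closes the inputstream descriptor that StreamFeeder handed to the child as stdin.

    chunker = SubprocessIOChunker(['cat'], inputstream='ping\n', shell=False)
    for chunk in chunker:        # a WSGI server would iterate the same way
        pass                     # chunks are str (bytes), not unicode
    chunker.close()              # terminates the process, closes both pipe fds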
492 499 def run_command(arguments, env=None):
493 500 """
494 501 Run the specified command and return the stdout.
495 502
496 503 :param arguments: sequence of program arguments (including the program name)
497 504 :type arguments: list[str]
498 505 """
499 506
500 507 cmd = arguments
501 508 log.debug('Running subprocessio command %s', cmd)
502 509 try:
503 510 _opts = {'shell': False, 'fail_on_stderr': False}
504 511 if env:
505 512 _opts.update({'env': env})
506 513 p = SubprocessIOChunker(cmd, **_opts)
507 514 stdout = ''.join(p)
508 515 stderr = ''.join(p.error)
509 516 except (EnvironmentError, OSError) as err:
510 517 cmd = ' '.join(cmd) # human friendly CMD
511 518 tb_err = ("Couldn't run subprocessio command (%s).\n"
512 519 "Original error was:%s\n" % (cmd, err))
513 520 log.exception(tb_err)
514 521 raise Exception(tb_err)
515 522
516 523 return stdout, stderr
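A hedged usage sketch of the convenience wrapper (assumes `git` is on PATH; this is a Python 2 module, hence the print statement):

    stdout, stderr = run_command(['git', '--version'])
    print stdout    # e.g. "git version ..."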