##// END OF EJS Templates
subprocessio: use safe b.read() and prevent potential valueErrors
marcink -
r375:270ea144 stable
parent child Browse files
Show More
@@ -1,511 +1,516 b''
1 1 """
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import logging
27 27 import subprocess32 as subprocess
28 28 from collections import deque
29 29 from threading import Event, Thread
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33
34 34 class StreamFeeder(Thread):
35 35 """
36 36 Normal writing into pipe-like is blocking once the buffer is filled.
37 37 This thread allows a thread to seep data from a file-like into a pipe
38 38 without blocking the main thread.
39 39 We close inpipe once the end of the source stream is reached.
40 40 """
41 41
42 42 def __init__(self, source):
43 43 super(StreamFeeder, self).__init__()
44 44 self.daemon = True
45 45 filelike = False
46 46 self.bytes = bytes()
47 47 if type(source) in (type(''), bytes, bytearray): # string-like
48 48 self.bytes = bytes(source)
49 49 else: # can be either file pointer or file-like
50 50 if type(source) in (int, long): # file pointer it is
51 51 # converting file descriptor (int) stdin into file-like
52 52 try:
53 53 source = os.fdopen(source, 'rb', 16384)
54 54 except Exception:
55 55 pass
56 56 # let's see if source is file-like by now
57 57 try:
58 58 filelike = source.read
59 59 except Exception:
60 60 pass
61 61 if not filelike and not self.bytes:
62 62 raise TypeError("StreamFeeder's source object must be a readable "
63 63 "file-like, a file descriptor, or a string-like.")
64 64 self.source = source
65 65 self.readiface, self.writeiface = os.pipe()
66 66
67 67 def run(self):
68 68 t = self.writeiface
69 69 if self.bytes:
70 70 os.write(t, self.bytes)
71 71 else:
72 72 s = self.source
73 73 b = s.read(4096)
74 74 while b:
75 75 os.write(t, b)
76 76 b = s.read(4096)
77 77 os.close(t)
78 78
79 79 @property
80 80 def output(self):
81 81 return self.readiface
82 82
83 83
84 84 class InputStreamChunker(Thread):
85 85 def __init__(self, source, target, buffer_size, chunk_size):
86 86
87 87 super(InputStreamChunker, self).__init__()
88 88
89 89 self.daemon = True # die die die.
90 90
91 91 self.source = source
92 92 self.target = target
93 93 self.chunk_count_max = int(buffer_size / chunk_size) + 1
94 94 self.chunk_size = chunk_size
95 95
96 96 self.data_added = Event()
97 97 self.data_added.clear()
98 98
99 99 self.keep_reading = Event()
100 100 self.keep_reading.set()
101 101
102 102 self.EOF = Event()
103 103 self.EOF.clear()
104 104
105 105 self.go = Event()
106 106 self.go.set()
107 107
108 108 def stop(self):
109 109 self.go.clear()
110 110 self.EOF.set()
111 111 try:
112 112 # this is not proper, but is done to force the reader thread let
113 113 # go of the input because, if successful, .close() will send EOF
114 114 # down the pipe.
115 115 self.source.close()
116 116 except:
117 117 pass
118 118
119 119 def run(self):
120 120 s = self.source
121 121 t = self.target
122 122 cs = self.chunk_size
123 123 chunk_count_max = self.chunk_count_max
124 124 keep_reading = self.keep_reading
125 125 da = self.data_added
126 126 go = self.go
127 127
128 128 try:
129 129 b = s.read(cs)
130 130 except ValueError:
131 131 b = ''
132 132
133 133 timeout_input = 20
134 134 while b and go.is_set():
135 135 if len(t) > chunk_count_max:
136 136 keep_reading.clear()
137 137 keep_reading.wait(timeout_input)
138 138 if len(t) > chunk_count_max + timeout_input:
139 139 log.error("Timed out while waiting for input from subprocess.")
140 140 os._exit(-1) # this will cause the worker to recycle itself
141 141
142 142 t.append(b)
143 143 da.set()
144 b = s.read(cs)
144
145 try:
146 b = s.read(cs)
147 except ValueError:
148 b = ''
149
145 150 self.EOF.set()
146 151 da.set() # for cases when done but there was no input.
147 152
148 153
149 154 class BufferedGenerator(object):
150 155 """
151 156 Class behaves as a non-blocking, buffered pipe reader.
152 157 Reads chunks of data (through a thread)
153 158 from a blocking pipe, and attaches these to an array (Deque) of chunks.
154 159 Reading is halted in the thread when max chunks is internally buffered.
155 160 The .next() may operate in blocking or non-blocking fashion by yielding
156 161 '' if no data is ready
157 162 to be sent or by not returning until there is some data to send
158 163 When we get EOF from underlying source pipe we raise the marker to raise
159 164 StopIteration after the last chunk of data is yielded.
160 165 """
161 166
162 167 def __init__(self, source, buffer_size=65536, chunk_size=4096,
163 168 starting_values=None, bottomless=False):
164 169 starting_values = starting_values or []
165 170
166 171 if bottomless:
167 172 maxlen = int(buffer_size / chunk_size)
168 173 else:
169 174 maxlen = None
170 175
171 176 self.data = deque(starting_values, maxlen)
172 177 self.worker = InputStreamChunker(source, self.data, buffer_size,
173 178 chunk_size)
174 179 if starting_values:
175 180 self.worker.data_added.set()
176 181 self.worker.start()
177 182
178 183 ####################
179 184 # Generator's methods
180 185 ####################
181 186
182 187 def __iter__(self):
183 188 return self
184 189
185 190 def next(self):
186 191 while not len(self.data) and not self.worker.EOF.is_set():
187 192 self.worker.data_added.clear()
188 193 self.worker.data_added.wait(0.2)
189 194 if len(self.data):
190 195 self.worker.keep_reading.set()
191 196 return bytes(self.data.popleft())
192 197 elif self.worker.EOF.is_set():
193 198 raise StopIteration
194 199
195 200 def throw(self, exc_type, value=None, traceback=None):
196 201 if not self.worker.EOF.is_set():
197 202 raise exc_type(value)
198 203
199 204 def start(self):
200 205 self.worker.start()
201 206
202 207 def stop(self):
203 208 self.worker.stop()
204 209
205 210 def close(self):
206 211 try:
207 212 self.worker.stop()
208 213 self.throw(GeneratorExit)
209 214 except (GeneratorExit, StopIteration):
210 215 pass
211 216
212 217 def __del__(self):
213 218 self.close()
214 219
215 220 ####################
216 221 # Threaded reader's infrastructure.
217 222 ####################
218 223 @property
219 224 def input(self):
220 225 return self.worker.w
221 226
222 227 @property
223 228 def data_added_event(self):
224 229 return self.worker.data_added
225 230
226 231 @property
227 232 def data_added(self):
228 233 return self.worker.data_added.is_set()
229 234
230 235 @property
231 236 def reading_paused(self):
232 237 return not self.worker.keep_reading.is_set()
233 238
234 239 @property
235 240 def done_reading_event(self):
236 241 """
237 242 Done_reding does not mean that the iterator's buffer is empty.
238 243 Iterator might have done reading from underlying source, but the read
239 244 chunks might still be available for serving through .next() method.
240 245
241 246 :returns: An Event class instance.
242 247 """
243 248 return self.worker.EOF
244 249
245 250 @property
246 251 def done_reading(self):
247 252 """
248 253 Done_reding does not mean that the iterator's buffer is empty.
249 254 Iterator might have done reading from underlying source, but the read
250 255 chunks might still be available for serving through .next() method.
251 256
252 257 :returns: An Bool value.
253 258 """
254 259 return self.worker.EOF.is_set()
255 260
256 261 @property
257 262 def length(self):
258 263 """
259 264 returns int.
260 265
261 266 This is the lenght of the que of chunks, not the length of
262 267 the combined contents in those chunks.
263 268
264 269 __len__() cannot be meaningfully implemented because this
265 270 reader is just flying throuh a bottomless pit content and
266 271 can only know the lenght of what it already saw.
267 272
268 273 If __len__() on WSGI server per PEP 3333 returns a value,
269 274 the responce's length will be set to that. In order not to
270 275 confuse WSGI PEP3333 servers, we will not implement __len__
271 276 at all.
272 277 """
273 278 return len(self.data)
274 279
275 280 def prepend(self, x):
276 281 self.data.appendleft(x)
277 282
278 283 def append(self, x):
279 284 self.data.append(x)
280 285
281 286 def extend(self, o):
282 287 self.data.extend(o)
283 288
284 289 def __getitem__(self, i):
285 290 return self.data[i]
286 291
287 292
288 293 class SubprocessIOChunker(object):
289 294 """
290 295 Processor class wrapping handling of subprocess IO.
291 296
292 297 .. important::
293 298
294 299 Watch out for the method `__del__` on this class. If this object
295 300 is deleted, it will kill the subprocess, so avoid to
296 301 return the `output` attribute or usage of it like in the following
297 302 example::
298 303
299 304 # `args` expected to run a program that produces a lot of output
300 305 output = ''.join(SubprocessIOChunker(
301 306 args, shell=False, inputstream=inputstream, env=environ).output)
302 307
303 308 # `output` will not contain all the data, because the __del__ method
304 309 # has already killed the subprocess in this case before all output
305 310 # has been consumed.
306 311
307 312
308 313
309 314 In a way, this is a "communicate()" replacement with a twist.
310 315
311 316 - We are multithreaded. Writing in and reading out, err are all sep threads.
312 317 - We support concurrent (in and out) stream processing.
313 318 - The output is not a stream. It's a queue of read string (bytes, not unicode)
314 319 chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
315 320 - We are non-blocking in more respects than communicate()
316 321 (reading from subprocess out pauses when internal buffer is full, but
317 322 does not block the parent calling code. On the flip side, reading from
318 323 slow-yielding subprocess may block the iteration until data shows up. This
319 324 does not block the parallel inpipe reading occurring parallel thread.)
320 325
321 326 The purpose of the object is to allow us to wrap subprocess interactions into
322 327 and interable that can be passed to a WSGI server as the application's return
323 328 value. Because of stream-processing-ability, WSGI does not have to read ALL
324 329 of the subprocess's output and buffer it, before handing it to WSGI server for
325 330 HTTP response. Instead, the class initializer reads just a bit of the stream
326 331 to figure out if error ocurred or likely to occur and if not, just hands the
327 332 further iteration over subprocess output to the server for completion of HTTP
328 333 response.
329 334
330 335 The real or perceived subprocess error is trapped and raised as one of
331 336 EnvironmentError family of exceptions
332 337
333 338 Example usage:
334 339 # try:
335 340 # answer = SubprocessIOChunker(
336 341 # cmd,
337 342 # input,
338 343 # buffer_size = 65536,
339 344 # chunk_size = 4096
340 345 # )
341 346 # except (EnvironmentError) as e:
342 347 # print str(e)
343 348 # raise e
344 349 #
345 350 # return answer
346 351
347 352
348 353 """
349 354
350 355 # TODO: johbo: This is used to make sure that the open end of the PIPE
351 356 # is closed in the end. It would be way better to wrap this into an
352 357 # object, so that it is closed automatically once it is consumed or
353 358 # something similar.
354 359 _close_input_fd = None
355 360
356 361 _closed = False
357 362
358 363 def __init__(self, cmd, inputstream=None, buffer_size=65536,
359 364 chunk_size=4096, starting_values=None, fail_on_stderr=True,
360 365 fail_on_return_code=True, **kwargs):
361 366 """
362 367 Initializes SubprocessIOChunker
363 368
364 369 :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
365 370 :param inputstream: (Default: None) A file-like, string, or file pointer.
366 371 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
367 372 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
368 373 :param starting_values: (Default: []) An array of strings to put in front of output que.
369 374 :param fail_on_stderr: (Default: True) Whether to raise an exception in
370 375 case something is written to stderr.
371 376 :param fail_on_return_code: (Default: True) Whether to raise an
372 377 exception if the return code is not 0.
373 378 """
374 379
375 380 starting_values = starting_values or []
376 381 if inputstream:
377 382 input_streamer = StreamFeeder(inputstream)
378 383 input_streamer.start()
379 384 inputstream = input_streamer.output
380 385 self._close_input_fd = inputstream
381 386
382 387 self._fail_on_stderr = fail_on_stderr
383 388 self._fail_on_return_code = fail_on_return_code
384 389
385 390 _shell = kwargs.get('shell', True)
386 391 kwargs['shell'] = _shell
387 392
388 393 _p = subprocess.Popen(cmd, bufsize=-1,
389 394 stdin=inputstream,
390 395 stdout=subprocess.PIPE,
391 396 stderr=subprocess.PIPE,
392 397 **kwargs)
393 398
394 399 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
395 400 starting_values)
396 401 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
397 402
398 403 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
399 404 # doing this until we reach either end of file, or end of buffer.
400 405 bg_out.data_added_event.wait(1)
401 406 bg_out.data_added_event.clear()
402 407
403 408 # at this point it's still ambiguous if we are done reading or just full buffer.
404 409 # Either way, if error (returned by ended process, or implied based on
405 410 # presence of stuff in stderr output) we error out.
406 411 # Else, we are happy.
407 412 _returncode = _p.poll()
408 413
409 414 if ((_returncode and fail_on_return_code) or
410 415 (fail_on_stderr and _returncode is None and bg_err.length)):
411 416 try:
412 417 _p.terminate()
413 418 except Exception:
414 419 pass
415 420 bg_out.stop()
416 421 bg_err.stop()
417 422 if fail_on_stderr:
418 423 err = ''.join(bg_err)
419 424 raise EnvironmentError(
420 425 "Subprocess exited due to an error:\n" + err)
421 426 if _returncode and fail_on_return_code:
422 427 err = ''.join(bg_err)
423 428 if not err:
424 429 # maybe get empty stderr, try stdout instead
425 430 # in many cases git reports the errors on stdout too
426 431 err = ''.join(bg_out)
427 432 raise EnvironmentError(
428 433 "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
429 434 _returncode, err))
430 435
431 436 self.process = _p
432 437 self.output = bg_out
433 438 self.error = bg_err
434 439
435 440 def __iter__(self):
436 441 return self
437 442
438 443 def next(self):
439 444 # Note: mikhail: We need to be sure that we are checking the return
440 445 # code after the stdout stream is closed. Some processes, e.g. git
441 446 # are doing some magic in between closing stdout and terminating the
442 447 # process and, as a result, we are not getting return code on "slow"
443 448 # systems.
444 449 result = None
445 450 stop_iteration = None
446 451 try:
447 452 result = self.output.next()
448 453 except StopIteration as e:
449 454 stop_iteration = e
450 455
451 456 if self.process.poll() and self._fail_on_return_code:
452 457 err = '%s' % ''.join(self.error)
453 458 raise EnvironmentError(
454 459 "Subprocess exited due to an error:\n" + err)
455 460
456 461 if stop_iteration:
457 462 raise stop_iteration
458 463 return result
459 464
460 465 def throw(self, type, value=None, traceback=None):
461 466 if self.output.length or not self.output.done_reading:
462 467 raise type(value)
463 468
464 469 def close(self):
465 470 if self._closed:
466 471 return
467 472 self._closed = True
468 473 try:
469 474 self.process.terminate()
470 475 except:
471 476 pass
472 477 if self._close_input_fd:
473 478 os.close(self._close_input_fd)
474 479 try:
475 480 self.output.close()
476 481 except:
477 482 pass
478 483 try:
479 484 self.error.close()
480 485 except:
481 486 pass
482 487
483 488 def __del__(self):
484 489 self.close()
485 490
486 491
487 492 def run_command(arguments, env=None):
488 493 """
489 494 Run the specified command and return the stdout.
490 495
491 496 :param arguments: sequence of program arguments (including the program name)
492 497 :type arguments: list[str]
493 498 """
494 499
495 500 cmd = arguments
496 501 log.debug('Running subprocessio command %s', cmd)
497 502 try:
498 503 _opts = {'shell': False, 'fail_on_stderr': False}
499 504 if env:
500 505 _opts.update({'env': env})
501 506 p = SubprocessIOChunker(cmd, **_opts)
502 507 stdout = ''.join(p)
503 508 stderr = ''.join(''.join(p.error))
504 509 except (EnvironmentError, OSError) as err:
505 510 cmd = ' '.join(cmd) # human friendly CMD
506 511 tb_err = ("Couldn't run subprocessio command (%s).\n"
507 512 "Original error was:%s\n" % (cmd, err))
508 513 log.exception(tb_err)
509 514 raise Exception(tb_err)
510 515
511 516 return stdout, stderr
General Comments 0
You need to be logged in to leave comments. Login now