py3: drop subprocess32 backport.
marcink | r977:f12b9f4d | python3
@@ -1,46 +1,45 @@
1 1 ## dependencies
2 2
3 3 # our custom configobj
4 4 https://code.rhodecode.com/upstream/configobj/artifacts/download/0-012de99a-b1e1-4f64-a5c0-07a98a41b324.tar.gz?md5=6a513f51fe04b2c18cf84c1395a7c626#egg=configobj==5.0.6
5 5
6 6 dogpile.cache==0.9.0
7 7 dogpile.core==0.4.1
8 8 decorator==4.1.2
9 9 dulwich==0.13.0
10 10 hgsubversion==1.9.3
11 11 hg-evolve==9.1.0
12 12
13 13 mercurial==5.1.1
14 14 msgpack-python==0.5.6
15 15
16 16 pastedeploy==2.1.0
17 17 pyramid==1.10.4
18 18 pygit2==0.28.2
19 19
20 20 repoze.lru==0.7
21 21 redis==3.4.1
22 22 simplejson==3.16.0
23 subprocess32==3.5.4
24 23 subvertpy==0.10.1
25 24
26 25 six==1.11.0
27 26 translationstring==1.3
28 27 webob==1.8.5
29 28 zope.deprecation==4.4.0
30 29 zope.interface==4.6.0
31 30
32 31 ## http servers
33 32 gevent==1.5.0
34 33 greenlet==0.4.15
35 34 gunicorn==19.9.0
36 35 waitress==1.3.1
37 36
38 37 ## debug
39 38 ipdb==0.13.2
40 39 ipython==5.10.0
41 40
42 41 ## test related requirements
43 42 -r requirements_test.txt
44 43
45 44 ## uncomment to add the debug libraries
46 45 #-r requirements_debug.txt
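For context: subprocess32 backported Python 3's subprocess module (most notably timeout support) to Python 2, so on Python 3 the stdlib module makes it redundant. A hedged sketch of the guarded-import pattern such backports typically required, which this commit's import change (below) replaces with a plain stdlib import:

    # Hypothetical compatibility shim, not part of this commit:
    try:
        import subprocess32 as subprocess  # Python 2: backport with timeout support
    except ImportError:
        import subprocess                  # Python 3: stdlib has it natively

    # On Python 3, timeouts work out of the box:
    # subprocess.check_output(['echo', 'hello'], timeout=5)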
@@ -1,519 +1,519 @@
1 1 """
2 2 Module provides a class allowing one to wrap communication over subprocess.Popen
3 3 input, output, and error streams in a meaningful, non-blocking, concurrent
4 4 stream processor, exposing the output data as an iterator fit to be the
5 5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import logging
27 import subprocess32 as subprocess
27 import subprocess
28 28 from collections import deque
29 29 from threading import Event, Thread
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33
34 34 class StreamFeeder(Thread):
35 35 """
36 36 Normal writing into a pipe-like object blocks once its buffer is filled.
37 37 This thread feeds data from a file-like object into a pipe
38 38 without blocking the main thread.
39 39 We close the write end once the end of the source stream is reached.
40 40 """
41 41
42 42 def __init__(self, source):
43 43 super(StreamFeeder, self).__init__()
44 44 self.daemon = True
45 45 filelike = False
46 46 self.bytes = bytes()
47 47 if isinstance(source, (str, bytes, bytearray)): # string-like
48 48 self.bytes = source.encode('utf-8') if isinstance(source, str) else bytes(source) # text assumed utf-8
49 49 else: # can be either a file descriptor or file-like
50 50 if isinstance(source, int): # file descriptor it is
51 51 # converting file descriptor (int) stdin into file-like
52 52 try:
53 53 source = os.fdopen(source, 'rb', 16384)
54 54 except Exception:
55 55 pass
56 56 # let's see if source is file-like by now
57 57 try:
58 58 filelike = source.read
59 59 except Exception:
60 60 pass
61 61 if not filelike and not self.bytes:
62 62 raise TypeError("StreamFeeder's source object must be a readable "
63 63 "file-like, a file descriptor, or a string-like.")
64 64 self.source = source
65 65 self.readiface, self.writeiface = os.pipe()
66 66
67 67 def run(self):
68 68 t = self.writeiface
69 69 try:
70 70 if self.bytes:
71 71 os.write(t, self.bytes)
72 72 else:
73 73 s = self.source
74 74 b = s.read(4096)
75 75 while b:
76 76 os.write(t, b)
77 77 b = s.read(4096)
78 78 finally:
79 79 os.close(t)
80 80
81 81 @property
82 82 def output(self):
83 83 return self.readiface
84 84
85 85
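A minimal usage sketch of StreamFeeder as defined above (hypothetical payload; os is already imported at module top):

    feeder = StreamFeeder(b'some stdin payload')
    feeder.start()
    read_fd = feeder.output            # read end of the pipe, a file descriptor
    chunk = os.read(read_fd, 4096)     # a consumer (e.g. Popen's stdin) reads here
    feeder.join()                      # writer thread closes its end at EOF
    os.close(read_fd)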
86 86 class InputStreamChunker(Thread):
87 87 def __init__(self, source, target, buffer_size, chunk_size):
88 88
89 89 super(InputStreamChunker, self).__init__()
90 90
91 91 self.daemon = True # die die die.
92 92
93 93 self.source = source
94 94 self.target = target
95 95 self.chunk_count_max = int(buffer_size / chunk_size) + 1
96 96 self.chunk_size = chunk_size
97 97
98 98 self.data_added = Event()
99 99 self.data_added.clear()
100 100
101 101 self.keep_reading = Event()
102 102 self.keep_reading.set()
103 103
104 104 self.EOF = Event()
105 105 self.EOF.clear()
106 106
107 107 self.go = Event()
108 108 self.go.set()
109 109
110 110 def stop(self):
111 111 self.go.clear()
112 112 self.EOF.set()
113 113 try:
114 114 # this is not proper, but is done to force the reader thread to let
115 115 # go of the input because, if successful, .close() will send EOF
116 116 # down the pipe.
117 117 self.source.close()
118 118 except Exception:
119 119 pass
120 120
121 121 def run(self):
122 122 s = self.source
123 123 t = self.target
124 124 cs = self.chunk_size
125 125 chunk_count_max = self.chunk_count_max
126 126 keep_reading = self.keep_reading
127 127 da = self.data_added
128 128 go = self.go
129 129
130 130 try:
131 131 b = s.read(cs)
132 132 except ValueError:
133 133 b = ''
134 134
135 135 timeout_input = 20
136 136 while b and go.is_set():
137 137 if len(t) > chunk_count_max:
138 138 keep_reading.clear()
139 139 keep_reading.wait(timeout_input)
140 140 if len(t) > chunk_count_max + timeout_input:
141 141 log.error("Timed out while waiting for input from subprocess.")
142 142 os._exit(-1) # this will cause the worker to recycle itself
143 143
144 144 t.append(b)
145 145 da.set()
146 146
147 147 try:
148 148 b = s.read(cs)
149 149 except ValueError:
150 150 b = ''
151 151
152 152 self.EOF.set()
153 153 da.set() # for cases when done but there was no input.
154 154
155 155
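The chunker's backpressure is a two-Event handshake: the reader thread clears keep_reading and waits once the target deque exceeds chunk_count_max, and the consumer sets it again after draining a chunk (exactly what BufferedGenerator.next() does below). A hedged sketch of driving the chunker directly, with an in-memory stream standing in for a subprocess pipe:

    import io
    from collections import deque

    source = io.BytesIO(b'x' * 10000)   # stand-in for a subprocess stdout pipe
    target = deque()
    chunker = InputStreamChunker(source, target, buffer_size=8192, chunk_size=4096)
    chunker.start()
    while not (chunker.EOF.is_set() and not target):
        if target:
            target.popleft()              # consume one chunk...
            chunker.keep_reading.set()    # ...and let a paused reader resume
        else:
            chunker.data_added.clear()
            chunker.data_added.wait(0.2)  # woken when a chunk is appended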
156 156 class BufferedGenerator(object):
157 157 """
158 158 Class behaves as a non-blocking, buffered pipe reader.
159 159 Reads chunks of data (through a thread)
160 160 from a blocking pipe, and attaches these to an array (deque) of chunks.
161 161 Reading is halted in the thread when the max number of chunks is buffered.
162 162 The .next() method may operate in blocking or non-blocking fashion: it either
163 163 yields '' when no data is ready to be sent, or does not return until there
164 164 is some data to send.
165 165 When we get EOF from the underlying source pipe, we set a marker so that
166 166 StopIteration is raised after the last chunk of data is yielded.
167 167 """
168 168
169 169 def __init__(self, source, buffer_size=65536, chunk_size=4096,
170 170 starting_values=None, bottomless=False):
171 171 starting_values = starting_values or []
172 172
173 173 if bottomless:
174 174 maxlen = int(buffer_size / chunk_size)
175 175 else:
176 176 maxlen = None
177 177
178 178 self.data = deque(starting_values, maxlen)
179 179 self.worker = InputStreamChunker(source, self.data, buffer_size,
180 180 chunk_size)
181 181 if starting_values:
182 182 self.worker.data_added.set()
183 183 self.worker.start()
184 184
185 185 ####################
186 186 # Generator's methods
187 187 ####################
188 188
189 189 def __iter__(self):
190 190 return self
191 191
192 192 def next(self):
193 193 while not len(self.data) and not self.worker.EOF.is_set():
194 194 self.worker.data_added.clear()
195 195 self.worker.data_added.wait(0.2)
196 196 if len(self.data):
197 197 self.worker.keep_reading.set()
198 198 return bytes(self.data.popleft())
199 199 elif self.worker.EOF.is_set():
200 200 raise StopIteration
201 201
202 202 def throw(self, exc_type, value=None, traceback=None):
203 203 if not self.worker.EOF.is_set():
204 204 raise exc_type(value)
205 205
206 206 def start(self):
207 207 self.worker.start()
208 208
209 209 def stop(self):
210 210 self.worker.stop()
211 211
212 212 def close(self):
213 213 try:
214 214 self.worker.stop()
215 215 self.throw(GeneratorExit)
216 216 except (GeneratorExit, StopIteration):
217 217 pass
218 218
219 219 ####################
220 220 # Threaded reader's infrastructure.
221 221 ####################
222 222 @property
223 223 def input(self):
224 224 return self.worker.source
225 225
226 226 @property
227 227 def data_added_event(self):
228 228 return self.worker.data_added
229 229
230 230 @property
231 231 def data_added(self):
232 232 return self.worker.data_added.is_set()
233 233
234 234 @property
235 235 def reading_paused(self):
236 236 return not self.worker.keep_reading.is_set()
237 237
238 238 @property
239 239 def done_reading_event(self):
240 240 """
241 241 done_reading does not mean that the iterator's buffer is empty.
242 242 The iterator might be done reading from the underlying source, but the
243 243 read chunks might still be available for serving through the .next() method.
244 244
245 245 :returns: An Event class instance.
246 246 """
247 247 return self.worker.EOF
248 248
249 249 @property
250 250 def done_reading(self):
251 251 """
252 252 done_reading does not mean that the iterator's buffer is empty.
253 253 The iterator might be done reading from the underlying source, but the
254 254 read chunks might still be available for serving through the .next() method.
255 255
256 256 :returns: A bool value.
257 257 """
258 258 return self.worker.EOF.is_set()
259 259
260 260 @property
261 261 def length(self):
262 262 """
263 263 Returns an int.
264 264
265 265 This is the length of the queue of chunks, not the length of
266 266 the combined contents in those chunks.
267 267
268 268 __len__() cannot be meaningfully implemented because this
269 269 reader is just flying through a bottomless pit of content and
270 270 can only know the length of what it has already seen.
271 271
272 272 If __len__() on a WSGI server per PEP 3333 returns a value,
273 273 the response's length will be set to that. In order not to
274 274 confuse WSGI PEP 3333 servers, we will not implement __len__
275 275 at all.
276 276 """
277 277 return len(self.data)
278 278
279 279 def prepend(self, x):
280 280 self.data.appendleft(x)
281 281
282 282 def append(self, x):
283 283 self.data.append(x)
284 284
285 285 def extend(self, o):
286 286 self.data.extend(o)
287 287
288 288 def __getitem__(self, i):
289 289 return self.data[i]
290 290
291 291
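A hedged sketch of BufferedGenerator on its own, reading one end of an os.pipe (the explicit .next() call matches the Python 2 era iterator protocol this class implements):

    r, w = os.pipe()
    os.write(w, b'hello world')
    os.close(w)                          # EOF for the read end
    gen = BufferedGenerator(os.fdopen(r, 'rb'))
    chunks = []
    try:
        while True:
            chunks.append(gen.next())    # blocks briefly until data or EOF
    except StopIteration:
        pass
    print(b''.join(chunks))              # b'hello world'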
292 292 class SubprocessIOChunker(object):
293 293 """
294 294 Processor class wrapping handling of subprocess IO.
295 295
296 296 .. important::
297 297
298 298 Watch out for the method `__del__` on this class. If this object
299 299 is deleted, it will kill the subprocess, so avoid returning the
300 300 `output` attribute or using it as in the following
301 301 example::
302 302
303 303 # `args` expected to run a program that produces a lot of output
304 304 output = ''.join(SubprocessIOChunker(
305 305 args, shell=False, inputstream=inputstream, env=environ).output)
306 306
307 307 # `output` will not contain all the data, because the __del__ method
308 308 # has already killed the subprocess in this case before all output
309 309 # has been consumed.
310 310
311 311
312 312
313 313 In a way, this is a "communicate()" replacement with a twist.
314 314
315 315 - We are multithreaded. Writing stdin and reading stdout and stderr all happen in separate threads.
316 316 - We support concurrent (in and out) stream processing.
317 317 - The output is not a stream. It's a queue of read string (bytes, not unicode)
318 318 chunks. The object behaves as an iterable: you can "for chunk in obj:" it.
319 319 - We are non-blocking in more respects than communicate()
320 320 (reading from the subprocess's stdout pauses when the internal buffer is full, but
321 321 does not block the parent calling code. On the flip side, reading from a
322 322 slow-yielding subprocess may block the iteration until data shows up. This
323 323 does not block the inpipe reading occurring in a parallel thread.)
324 324
325 325 The purpose of the object is to allow us to wrap subprocess interactions into
326 326 an iterable that can be passed to a WSGI server as the application's return
327 327 value. Thanks to its stream-processing ability, WSGI does not have to read ALL
328 328 of the subprocess's output and buffer it before handing it to the WSGI server for
329 329 the HTTP response. Instead, the class initializer reads just a bit of the stream
330 330 to figure out if an error occurred or is likely to occur and, if not, hands
331 331 further iteration over the subprocess output to the server for completion of
332 332 the HTTP response.
333 333
334 334 A real or perceived subprocess error is trapped and raised as one of the
335 335 EnvironmentError family of exceptions.
336 336
337 337 Example usage:
338 338 # try:
339 339 # answer = SubprocessIOChunker(
340 340 # cmd,
341 341 # input,
342 342 # buffer_size = 65536,
343 343 # chunk_size = 4096
344 344 # )
345 345 # except (EnvironmentError) as e:
346 346 # print(str(e))
347 347 # raise e
348 348 #
349 349 # return answer
350 350
351 351
352 352 """
353 353
354 354 # TODO: johbo: This is used to make sure that the open end of the PIPE
355 355 # is closed in the end. It would be way better to wrap this into an
356 356 # object, so that it is closed automatically once it is consumed or
357 357 # something similar.
358 358 _close_input_fd = None
359 359
360 360 _closed = False
361 361
362 362 def __init__(self, cmd, inputstream=None, buffer_size=65536,
363 363 chunk_size=4096, starting_values=None, fail_on_stderr=True,
364 364 fail_on_return_code=True, **kwargs):
365 365 """
366 366 Initializes SubprocessIOChunker
367 367
368 368 :param cmd: A subprocess.Popen style "cmd". Can be a string or an array of strings.
369 369 :param inputstream: (Default: None) A file-like, string, or file pointer.
370 370 :param buffer_size: (Default: 65536) The total buffer size per stream, in bytes.
371 371 :param chunk_size: (Default: 4096) The maximum size of a chunk. Actual chunks may be smaller.
372 372 :param starting_values: (Default: []) An array of strings to put in front of the output queue.
373 373 :param fail_on_stderr: (Default: True) Whether to raise an exception in
374 374 case something is written to stderr.
375 375 :param fail_on_return_code: (Default: True) Whether to raise an
376 376 exception if the return code is not 0.
377 377 """
378 378
379 379 starting_values = starting_values or []
380 380 if inputstream:
381 381 input_streamer = StreamFeeder(inputstream)
382 382 input_streamer.start()
383 383 inputstream = input_streamer.output
384 384 self._close_input_fd = inputstream
385 385
386 386 self._fail_on_stderr = fail_on_stderr
387 387 self._fail_on_return_code = fail_on_return_code
388 388
389 389 _shell = kwargs.get('shell', True)
390 390 kwargs['shell'] = _shell
391 391
392 392 _p = subprocess.Popen(cmd, bufsize=-1,
393 393 stdin=inputstream,
394 394 stdout=subprocess.PIPE,
395 395 stderr=subprocess.PIPE,
396 396 **kwargs)
397 397
398 398 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
399 399 starting_values)
400 400 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
401 401
402 402 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
403 403 # doing this until we reach either end of file, or end of buffer.
404 404 bg_out.data_added_event.wait(1)
405 405 bg_out.data_added_event.clear()
406 406
407 407 # at this point it's still ambiguous if we are done reading or just full buffer.
408 408 # Either way, if error (returned by ended process, or implied based on
409 409 # presence of stuff in stderr output) we error out.
410 410 # Else, we are happy.
411 411 _returncode = _p.poll()
412 412
413 413 if ((_returncode and fail_on_return_code) or
414 414 (fail_on_stderr and _returncode is None and bg_err.length)):
415 415 try:
416 416 _p.terminate()
417 417 except Exception:
418 418 pass
419 419 bg_out.stop()
420 420 bg_err.stop()
421 421 if fail_on_stderr:
422 422 err = ''.join(bg_err)
423 423 raise EnvironmentError(
424 424 "Subprocess exited due to an error:\n" + err)
425 425 if _returncode and fail_on_return_code:
426 426 err = ''.join(bg_err)
427 427 if not err:
428 428 # stderr may be empty; try stdout instead, since
429 429 # in many cases git reports errors on stdout too
430 430 err = ''.join(bg_out)
431 431 raise EnvironmentError(
432 432 "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
433 433 _returncode, err))
434 434
435 435 self.process = _p
436 436 self.output = bg_out
437 437 self.error = bg_err
438 438 self.inputstream = inputstream
439 439
440 440 def __iter__(self):
441 441 return self
442 442
443 443 def next(self):
444 444 # Note: mikhail: We need to be sure that we are checking the return
445 445 # code after the stdout stream is closed. Some processes, e.g. git
446 446 # are doing some magic in between closing stdout and terminating the
447 447 # process and, as a result, we are not getting return code on "slow"
448 448 # systems.
449 449 result = None
450 450 stop_iteration = None
451 451 try:
452 452 result = self.output.next()
453 453 except StopIteration as e:
454 454 stop_iteration = e
455 455
456 456 if self.process.poll() and self._fail_on_return_code:
457 457 err = '%s' % ''.join(self.error)
458 458 raise EnvironmentError(
459 459 "Subprocess exited due to an error:\n" + err)
460 460
461 461 if stop_iteration:
462 462 raise stop_iteration
463 463 return result
464 464
465 465 def throw(self, type, value=None, traceback=None):
466 466 if self.output.length or not self.output.done_reading:
467 467 raise type(value)
468 468
469 469 def close(self):
470 470 if self._closed:
471 471 return
472 472 self._closed = True
473 473 try:
474 474 self.process.terminate()
475 475 except Exception:
476 476 pass
477 477 if self._close_input_fd:
478 478 os.close(self._close_input_fd)
479 479 try:
480 480 self.output.close()
481 481 except Exception:
482 482 pass
483 483 try:
484 484 self.error.close()
485 485 except Exception:
486 486 pass
487 487 try:
488 488 os.close(self.inputstream)
489 489 except Exception:
490 490 pass
491 491
492 492
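A hedged, runnable version of the commented example from the class docstring (Python 2 era: iteration and ''.join rely on the .next() method and str chunks; mind the __del__ caveat in the docstring):

    try:
        chunker = SubprocessIOChunker(
            ['git', '--version'],   # any short-lived command will do
            shell=False,            # note: shell defaults to True in __init__
        )
        output = ''.join(chunker)   # eager consumption; small outputs only
        chunker.close()
    except EnvironmentError as e:
        print(str(e))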
493 493 def run_command(arguments, env=None):
494 494 """
495 495 Run the specified command and return the stdout.
496 496
497 497 :param arguments: sequence of program arguments (including the program name)
498 498 :type arguments: list[str]
499 499 """
500 500
501 501 cmd = arguments
502 502 log.debug('Running subprocessio command %s', cmd)
503 503 proc = None
504 504 try:
505 505 _opts = {'shell': False, 'fail_on_stderr': False}
506 506 if env:
507 507 _opts.update({'env': env})
508 508 proc = SubprocessIOChunker(cmd, **_opts)
509 509 return ''.join(proc), ''.join(proc.error)
510 510 except (EnvironmentError, OSError) as err:
511 511 cmd = ' '.join(cmd) # human friendly CMD
512 512 tb_err = ("Couldn't run subprocessio command (%s).\n"
513 513 "Original error was: %s\n" % (cmd, err))
514 514 log.exception(tb_err)
515 515 raise Exception(tb_err)
516 516 finally:
517 517 if proc:
518 518 proc.close()
519 519
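run_command() wraps all of the above with shell=False and fail_on_stderr=False; a hedged usage sketch:

    stdout, stderr = run_command(['git', '--version'])
    log.debug('command output: %s', stdout.strip())

    # optionally pass an environment:
    # stdout, _ = run_command(['git', 'version'], env={'PATH': '/usr/bin'})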