spelling: application
timeless@gmail.com
r5786:5be4d5cb default
@@ -1,425 +1,425 @@
1 1 """
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
import os
import subprocess
from kallithea.lib.vcs.utils.compat import deque, Event, Thread, _bytes, _bytearray


class StreamFeeder(Thread):
    """
    Normal writing into a pipe-like object blocks once the buffer is filled.
    This thread feeds data from a file-like object into a pipe without
    blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = _bytes()
        if type(source) in (type(''), _bytes, _bytearray):  # string-like
            self.bytes = _bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # file pointer it is
                # convert a file descriptor (int) stdin into a file-like object
                source = os.fdopen(source, 'rb', 16384)
            # let's see if source is file-like by now
            filelike = hasattr(source, 'read')
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        t = self.writeiface
        if self.bytes:
            os.write(t, self.bytes)
        else:
            s = self.source
            b = s.read(4096)
            while b:
                os.write(t, b)
                b = s.read(4096)
        os.close(t)

    @property
    def output(self):
        return self.readiface


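# A small sketch of how StreamFeeder is typically wired up (mirroring what
# SubprocessIOChunker does further below): the feeder's .output is a readable
# pipe file descriptor that can serve as a subprocess's stdin. The command
# ['cat'] is only an illustrative assumption.
#
#     feeder = StreamFeeder(b'some payload')
#     feeder.start()
#     p = subprocess.Popen(['cat'], stdin=feeder.output, stdout=subprocess.PIPE)
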
class InputStreamChunker(Thread):
    def __init__(self, source, target, buffer_size, chunk_size):

        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = Event()
        self.data_added.clear()

        self.keep_reading = Event()
        self.keep_reading.set()

        self.EOF = Event()
        self.EOF.clear()

        self.go = Event()
        self.go.set()

    def stop(self):
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread to
            # let go of the input because, if successful, .close() will send
            # EOF down the pipe.
            self.source.close()
        except:
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        ccm = self.chunk_count_max
        kr = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            b = ''

        while b and go.is_set():
            if len(t) > ccm:
                kr.clear()
                kr.wait(2)
                # # this only works on 2.7.x and up
                # if not kr.wait(10):
                #     raise Exception("Timed out while waiting for input to be read.")
                # instead we'll use this
                if len(t) > ccm + 3:
                    raise IOError(
                        "Timed out while waiting for input from subprocess.")
            t.append(b)
            da.set()
            try:
                b = s.read(cs)
            except ValueError:  # probably "I/O operation on closed file"
                b = ''

        self.EOF.set()
        da.set()  # for cases when done but there was no input.


class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread) from a blocking pipe, and attaches
    these to an array (deque) of chunks.
    Reading is halted in the thread when the maximum number of chunks is
    buffered internally.
    The .next() method may operate in blocking or non-blocking fashion, either
    by yielding '' if no data is ready to be sent, or by not returning until
    there is some data to send.
    When we get EOF from the underlying source pipe, we raise the marker to
    raise StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        starting_values = starting_values or []
        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Wait (in 0.2 s slices) until the worker thread has added data or
        # has reported EOF on the source pipe.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # Consuming a chunk frees buffer space, so let the worker resume reading.
            self.worker.keep_reading.set()
            return _bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        The iterator might be done reading from the underlying source, but the
        read chunks might still be available for serving through the .next()
        method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        The iterator might be done reading from the underlying source, but the
        read chunks might still be available for serving through the .next()
        method.

        :returns: A bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through bottomless content and can only
        know the length of what it has already seen.

        If __len__() returned a value, a WSGI server per PEP 3333 would
        set the response's length to that value. In order not to confuse
        WSGI PEP 3333 servers, we do not implement __len__ at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]


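# A brief sketch of how BufferedGenerator is used by the chunker below: each
# of the subprocess's output pipes gets its own buffered, threaded reader, and
# the consuming code simply iterates over the stdout generator. The command
# ['ls'] is an illustrative assumption.
#
#     p = subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#     out = BufferedGenerator(p.stdout, buffer_size=65536, chunk_size=4096)
#     err = BufferedGenerator(p.stderr, 16000, 1, bottomless=True)
#     for chunk in out:
#         pass  # stream each chunk onward without buffering the whole output
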
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out and err are each
      handled in separate threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not
      unicode) chunks. The object behaves as an iterable. You can
      "for chunk in obj:" over it.
    - We are non-blocking in more respects than communicate()
      (reading from the subprocess's output pauses when the internal buffer
      is full, but does not block the parent calling code. On the flip side,
      reading from a slow-yielding subprocess may block the iteration until
      data shows up. This does not block the inpipe reading, which occurs in
      a parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of its stream-processing ability, WSGI does not have to read
    ALL of the subprocess's output and buffer it before handing it to the WSGI
    server for the HTTP response. Instead, the class initializer reads just a bit
    of the stream to figure out if an error occurred or is likely to occur, and
    if not, just hands the further iteration over the subprocess output to the
    server for completion of the HTTP response.

    The real or perceived subprocess error is trapped and raised as one of the
    EnvironmentError family of exceptions.

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size=65536,
    #         chunk_size=4096
    #     )
    # except (EnvironmentError) as e:
    #     print str(e)
    #     raise e
    #
    # return answer
    """

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A subprocess.Popen style "cmd". Can be a string or an array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of the output queue.
        """
        starting_values = starting_values or []
        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output

        # Note: fragile cmd mangling has been removed for use in Kallithea
        assert isinstance(cmd, list), cmd

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused:
            # do this until we reach either the end of the file or the end of the buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # At this point it's still ambiguous whether we are done reading or just
        # have a full buffer. Either way, if there is an error (returned by the
        # ended process, or implied by the presence of data on stderr), we error
        # out. Else, we are happy.
        _returncode = _p.poll()
        if _returncode or (_returncode is None and bg_err.length):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            out = ''.join(bg_out)
            bg_err.stop()
            err = ''.join(bg_err)
            if (err.strip() == 'fatal: The remote end hung up unexpectedly' and
                    out.startswith('0034shallow ')):
                # hack inspired by https://github.com/schacon/grack/pull/7
                bg_out = iter([out])
                _p = None
            elif err:
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            else:
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s" % _returncode)
        self.process = _p
        self.output = bg_out
        self.error = bg_err
        self.inputstream = inputstream

    def __iter__(self):
        return self

    def next(self):
        if self.process and self.process.poll():
            err = ''.join(self.error)
            raise EnvironmentError("Subprocess exited due to an error:\n" + err)
        return self.output.next()

    def throw(self, type, value=None, traceback=None):
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        try:
            self.process.terminate()
        except:
            pass
        try:
            self.output.close()
        except:
            pass
        try:
            self.error.close()
        except:
            pass
        try:
            os.close(self.inputstream)
        except:
            pass

    def __del__(self):
        self.close()
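
# A short consumption sketch, following the "Example usage" in the class
# docstring above: the chunker is simply iterated (e.g. by a WSGI server) and
# closed afterwards. 'cmd' and 'input' stand for whatever command list and
# input stream the caller provides.
#
#     chunker = SubprocessIOChunker(cmd, input)
#     for chunk in chunker:
#         pass  # each chunk is a byte string read from the subprocess stdout
#     chunker.close()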