##// END OF EJS Templates
python2.5 fixes !
marcink -
r2826:4b7ad342 beta
parent child Browse files
Show More
@@ -1,591 +1,720 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.lib.compat
3 rhodecode.lib.compat
4 ~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~
5
5
6 Python backward compatibility functions and common libs
6 Python backward compatibility functions and common libs
7
7
8
8
9 :created_on: Oct 7, 2011
9 :created_on: Oct 7, 2011
10 :author: marcink
10 :author: marcink
11 :copyright: (C) 2010-2010 Marcin Kuzminski <marcin@python-works.com>
11 :copyright: (C) 2010-2010 Marcin Kuzminski <marcin@python-works.com>
12 :license: GPLv3, see COPYING for more details.
12 :license: GPLv3, see COPYING for more details.
13 """
13 """
14 # This program is free software: you can redistribute it and/or modify
14 # This program is free software: you can redistribute it and/or modify
15 # it under the terms of the GNU General Public License as published by
15 # it under the terms of the GNU General Public License as published by
16 # the Free Software Foundation, either version 3 of the License, or
16 # the Free Software Foundation, either version 3 of the License, or
17 # (at your option) any later version.
17 # (at your option) any later version.
18 #
18 #
19 # This program is distributed in the hope that it will be useful,
19 # This program is distributed in the hope that it will be useful,
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 # GNU General Public License for more details.
22 # GNU General Public License for more details.
23 #
23 #
24 # You should have received a copy of the GNU General Public License
24 # You should have received a copy of the GNU General Public License
25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26
26
27 import os
27 import os
28 from rhodecode import __platform__, PLATFORM_WIN, __py_version__
28 from rhodecode import __platform__, PLATFORM_WIN, __py_version__
29
29
30 #==============================================================================
30 #==============================================================================
31 # json
31 # json
32 #==============================================================================
32 #==============================================================================
33 from rhodecode.lib.ext_json import json
33 from rhodecode.lib.ext_json import json
34 import array
34
35
35
36
36 #==============================================================================
37 #==============================================================================
37 # izip_longest
38 # izip_longest
38 #==============================================================================
39 #==============================================================================
try:
    from itertools import izip_longest
except ImportError:
    try:
        # Newer interpreters ship the very same function under the name
        # ``zip_longest``; prefer it over the hand written fallback.
        from itertools import zip_longest as izip_longest
    except ImportError:
        import itertools

        def izip_longest(*args, **kwds):
            """Fallback implementation of ``itertools.izip_longest``.

            Yields tuples with one element from each iterable in ``args``
            until the longest iterable is exhausted. Shorter iterables are
            padded with the ``fillvalue`` keyword argument (default None).
            Any other keyword argument is ignored.
            """
            fillvalue = kwds.get("fillvalue")

            # ``counter`` is the ``pop`` of one list shared by all sentinels
            # of this call. The list holds ``len(args) - 1`` items, so the
            # first ``len(args) - 1`` exhausted inputs each yield a single
            # fillvalue, and the last one raises IndexError.
            def sentinel(counter=([fillvalue] * (len(args) - 1)).pop):
                yield counter()  # yields the fillvalue, or raises IndexError

            fillers = itertools.repeat(fillvalue)
            # input, then its sentinel, then an endless stream of fillvalues
            iters = [itertools.chain(it, sentinel(), fillers)
                     for it in args]
            try:
                for tup in itertools.izip(*iters):
                    yield tup
            except IndexError:
                # raised by the sentinel of the last exhausted input: done
                pass
58
59
59
60
60 #==============================================================================
61 #==============================================================================
61 # OrderedDict
62 # OrderedDict
62 #==============================================================================
63 #==============================================================================
63
64
64 # Python Software Foundation License
65 # Python Software Foundation License
65
66
66 # XXX: it feels like using the class with "is" and "is not" instead of "==" and
67 # XXX: it feels like using the class with "is" and "is not" instead of "==" and
67 # "!=" should be faster.
68 # "!=" should be faster.
class _Nil(object):
    """Marker type meaning "no key" / "no value given".

    Every instance compares equal to every other ``_Nil`` instance; the
    comparison with any other type is left to the other operand.
    """

    def __repr__(self):
        return "nil"

    def __eq__(self, other):
        if isinstance(other, _Nil):
            return True
        return NotImplemented

    def __ne__(self, other):
        if isinstance(other, _Nil):
            return False
        return NotImplemented

    def __hash__(self):
        # All instances are equal, so they have to share a single hash value.
        # Without this, equal instances hash differently (identity based
        # default) or, where defining __eq__ disables the default hash, the
        # marker cannot be used for a dict lookup at all.
        return hash(_Nil)


# module wide marker instance
_nil = _Nil()
86
87
87
88
class _odict(object):
    """Ordered dict data structure, with O(1) complexity for dict operations
    that modify one element.

    Overwriting values doesn't change their original sequential order.

    Each key is stored in the backing mapping (see ``_dict_impl``) as a three
    item list ``[previous_key, value, next_key]``, so the entries form a
    double-linked list. ``lh`` and ``lt`` hold the first and the last key and
    ``_nil`` marks "no key" at either end.
    """

    def _dict_impl(self):
        """Return the mapping type used as storage.

        The base class returns None; a usable subclass has to override this
        (and inherit from the returned type).
        """
        return None

    def __init__(self, data=(), **kwds):
        """This doesn't accept keyword initialization as normal dicts to avoid
        a trap - inside a function or method the keyword args are accessible
        only as a dict, without a defined order, so their original order is
        lost.

        :param data: a mapping exposing ``iteritems()`` or an iterable of
            ``(key, value)`` pairs
        :raises TypeError: if any keyword argument is given
        """
        if kwds:
            raise TypeError("__init__() of ordered dict takes no keyword "
                            "arguments to avoid an ordering trap.")
        self._dict_impl().__init__(self)
        # If you give a normal dict, then the order of elements is undefined
        if hasattr(data, "iteritems"):
            for key, val in data.iteritems():
                self[key] = val
        else:
            for key, val in data:
                self[key] = val

    # Double-linked list header
    def _get_lh(self):
        dict_impl = self._dict_impl()
        # created lazily so that it also exists before/without __init__
        if not hasattr(self, '_lh'):
            dict_impl.__setattr__(self, '_lh', _nil)
        return dict_impl.__getattribute__(self, '_lh')

    def _set_lh(self, val):
        self._dict_impl().__setattr__(self, '_lh', val)

    lh = property(_get_lh, _set_lh)

    # Double-linked list tail
    def _get_lt(self):
        dict_impl = self._dict_impl()
        # created lazily so that it also exists before/without __init__
        if not hasattr(self, '_lt'):
            dict_impl.__setattr__(self, '_lt', _nil)
        return dict_impl.__getattribute__(self, '_lt')

    def _set_lt(self, val):
        self._dict_impl().__setattr__(self, '_lt', val)

    lt = property(_get_lt, _set_lt)

    def __getitem__(self, key):
        # stored entry is [pred, value, succ]; expose only the value
        return self._dict_impl().__getitem__(self, key)[1]

    def __setitem__(self, key, val):
        dict_impl = self._dict_impl()
        try:
            # existing key: replace the value in place, position is kept
            dict_impl.__getitem__(self, key)[1] = val
        except KeyError:
            # new key: link it after the current tail
            new = [dict_impl.__getattribute__(self, 'lt'), val, _nil]
            dict_impl.__setitem__(self, key, new)
            if dict_impl.__getattribute__(self, 'lt') == _nil:
                # first entry ever, it is also the head
                dict_impl.__setattr__(self, 'lh', key)
            else:
                dict_impl.__getitem__(
                    self, dict_impl.__getattribute__(self, 'lt'))[2] = key
            dict_impl.__setattr__(self, 'lt', key)

    def __delitem__(self, key):
        dict_impl = self._dict_impl()
        pred, _, succ = dict_impl.__getitem__(self, key)
        # unlink the entry: reconnect its neighbours (or head/tail)
        if pred == _nil:
            dict_impl.__setattr__(self, 'lh', succ)
        else:
            dict_impl.__getitem__(self, pred)[2] = succ
        if succ == _nil:
            dict_impl.__setattr__(self, 'lt', pred)
        else:
            dict_impl.__getitem__(self, succ)[0] = pred
        dict_impl.__delitem__(self, key)

    def __contains__(self, key):
        # Ask the backing mapping directly instead of building a list of all
        # keys on every membership test. Like for a plain dict this means an
        # unhashable ``key`` raises TypeError.
        return self._dict_impl().__contains__(self, key)

    def __len__(self):
        # the backing mapping holds exactly one entry per key
        return self._dict_impl().__len__(self)

    def __str__(self):
        pairs = ("%r: %r" % (k, v) for k, v in self.iteritems())
        return "{%s}" % ", ".join(pairs)

    def __repr__(self):
        if self:
            pairs = ("(%r, %r)" % (k, v) for k, v in self.iteritems())
            return "odict([%s])" % ", ".join(pairs)
        else:
            return "odict()"

    def get(self, k, x=None):
        """Return the value stored for ``k``, or ``x`` if ``k`` is missing."""
        if k in self:
            return self._dict_impl().__getitem__(self, k)[1]
        else:
            return x

    def __iter__(self):
        """Iterate over the keys in insertion order."""
        dict_impl = self._dict_impl()
        curr_key = dict_impl.__getattribute__(self, 'lh')
        while curr_key != _nil:
            yield curr_key
            # follow the "next" link of the current entry
            curr_key = dict_impl.__getitem__(self, curr_key)[2]

    iterkeys = __iter__

    def keys(self):
        """List of the keys in insertion order."""
        return list(self.iterkeys())

    def itervalues(self):
        """Iterate over the values in insertion order."""
        dict_impl = self._dict_impl()
        curr_key = dict_impl.__getattribute__(self, 'lh')
        while curr_key != _nil:
            _, val, curr_key = dict_impl.__getitem__(self, curr_key)
            yield val

    def values(self):
        """List of the values in insertion order."""
        return list(self.itervalues())

    def iteritems(self):
        """Iterate over ``(key, value)`` pairs in insertion order."""
        dict_impl = self._dict_impl()
        curr_key = dict_impl.__getattribute__(self, 'lh')
        while curr_key != _nil:
            _, val, next_key = dict_impl.__getitem__(self, curr_key)
            yield curr_key, val
            curr_key = next_key

    def items(self):
        """List of the ``(key, value)`` pairs in insertion order."""
        return list(self.iteritems())

    def sort(self, cmp=None, key=None, reverse=False):
        """Re-order the entries in place.

        ``cmp`` and ``key`` are handed to ``sorted()`` and operate on
        ``(key, value)`` tuples; if both are given only ``cmp`` is used.
        Without either the entries are ordered by value. ``reverse`` flips
        the resulting order.
        """
        items = self.items()
        if cmp is not None:
            items = sorted(items, cmp=cmp)
        elif key is not None:
            items = sorted(items, key=key)
        else:
            items = sorted(items, key=lambda x: x[1])
        if reverse:
            items.reverse()
        # rebuild the linked list in the new order
        self.clear()
        self.__init__(items)

    def clear(self):
        """Remove every entry and reset head and tail."""
        dict_impl = self._dict_impl()
        dict_impl.clear(self)
        dict_impl.__setattr__(self, 'lh', _nil)
        dict_impl.__setattr__(self, 'lt', _nil)

    def copy(self):
        """Return a new instance of the same class with the same entries."""
        return self.__class__(self)

    def update(self, data=(), **kwds):
        """Set all pairs from ``data`` (mapping with ``iteritems()`` or an
        iterable of pairs); new keys are appended in iteration order.

        :raises TypeError: if any keyword argument is given
        """
        if kwds:
            raise TypeError("update() of ordered dict takes no keyword "
                            "arguments to avoid an ordering trap.")
        if hasattr(data, "iteritems"):
            data = data.iteritems()
        for key, val in data:
            self[key] = val

    def setdefault(self, k, x=None):
        """Return the value for ``k``; store and return ``x`` if missing."""
        try:
            return self[k]
        except KeyError:
            self[k] = x
            return x

    def pop(self, k, x=_nil):
        """Remove ``k`` and return its value.

        If ``k`` is missing return ``x``, or re-raise the KeyError when no
        ``x`` was given.
        """
        try:
            val = self[k]
            del self[k]
            return val
        except KeyError:
            if x == _nil:
                raise
            return x

    def popitem(self):
        """Remove and return the last ``(key, value)`` pair.

        :raises KeyError: if the dictionary is empty
        """
        try:
            dict_impl = self._dict_impl()
            key = dict_impl.__getattribute__(self, 'lt')
            return key, self.pop(key)
        except KeyError:
            raise KeyError("'popitem(): ordered dictionary is empty'")

    def riterkeys(self):
        """To iterate on keys in reversed order.
        """
        dict_impl = self._dict_impl()
        curr_key = dict_impl.__getattribute__(self, 'lt')
        while curr_key != _nil:
            yield curr_key
            # follow the "previous" link of the current entry
            curr_key = dict_impl.__getitem__(self, curr_key)[0]

    __reversed__ = riterkeys

    def rkeys(self):
        """List of the keys in reversed order.
        """
        return list(self.riterkeys())

    def ritervalues(self):
        """To iterate on values in reversed order.
        """
        dict_impl = self._dict_impl()
        curr_key = dict_impl.__getattribute__(self, 'lt')
        while curr_key != _nil:
            curr_key, val, _ = dict_impl.__getitem__(self, curr_key)
            yield val

    def rvalues(self):
        """List of the values in reversed order.
        """
        return list(self.ritervalues())

    def riteritems(self):
        """To iterate on (key, value) in reversed order.
        """
        dict_impl = self._dict_impl()
        curr_key = dict_impl.__getattribute__(self, 'lt')
        while curr_key != _nil:
            pred_key, val, _ = dict_impl.__getitem__(self, curr_key)
            yield curr_key, val
            curr_key = pred_key

    def ritems(self):
        """List of the (key, value) in reversed order.
        """
        return list(self.riteritems())

    def firstkey(self):
        """Return the first key.

        :raises KeyError: if the dictionary is empty
        """
        if self:
            return self._dict_impl().__getattribute__(self, 'lh')
        else:
            raise KeyError("'firstkey(): ordered dictionary is empty'")

    def lastkey(self):
        """Return the last key.

        :raises KeyError: if the dictionary is empty
        """
        if self:
            return self._dict_impl().__getattribute__(self, 'lt')
        else:
            raise KeyError("'lastkey(): ordered dictionary is empty'")

    def as_dict(self):
        """Return the entries as an instance of the backing mapping type."""
        return self._dict_impl()(self.items())

    def _repr(self):
        """_repr(): low level repr of the whole data contained in the odict.
        Useful for debugging.
        """
        dict_impl = self._dict_impl()
        form = "odict low level repr lh,lt,data: %r, %r, %s"
        return form % (dict_impl.__getattribute__(self, 'lh'),
                       dict_impl.__getattribute__(self, 'lt'),
                       dict_impl.__repr__(self))
351
352
352
353
class OrderedDict(_odict, dict):
    """Ordered dictionary that keeps its entries in a builtin ``dict``.

    All behaviour comes from ``_odict``; this class only names the mapping
    type used as storage.
    """

    def _dict_impl(self):
        # storage type every ``_odict`` method delegates to
        return dict
357
358
358
359
359 #==============================================================================
360 #==============================================================================
360 # OrderedSet
361 # OrderedSet
361 #==============================================================================
362 #==============================================================================
362 from sqlalchemy.util import OrderedSet
363 from sqlalchemy.util import OrderedSet
363
364
364
365
365 #==============================================================================
366 #==============================================================================
366 # kill FUNCTIONS
367 # kill FUNCTIONS
367 #==============================================================================
368 #==============================================================================
if __platform__ in PLATFORM_WIN:
    import ctypes

    def kill(pid, sig):
        """kill function for Win32

        Terminates the process ``pid`` with exit code 0.

        :param pid: id of the process to terminate
        :param sig: ignored, present only to match the signature of
            ``os.kill``
        :returns: True if ``TerminateProcess`` reported success, else False
        """
        kernel32 = ctypes.windll.kernel32
        # 1 == PROCESS_TERMINATE access right, 0 == handle not inheritable
        handle = kernel32.OpenProcess(1, 0, pid)
        if not handle:
            # process could not be opened, there is nothing to terminate
            return False
        try:
            return (0 != kernel32.TerminateProcess(handle, 0))
        finally:
            # handles returned by OpenProcess have to be closed explicitly
            kernel32.CloseHandle(handle)

else:
    kill = os.kill
379
380
380
381
381 #==============================================================================
382 #==============================================================================
382 # itertools.product
383 # itertools.product
383 #==============================================================================
384 #==============================================================================
384
385
try:
    from itertools import product
except ImportError:
    def product(*args, **kwds):
        """Fallback for ``itertools.product``: cartesian product of the
        input iterables, optionally repeated ``repeat`` times.

        product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy
        product(range(2), repeat=3) --> 000 001 010 011 100 101 110 111
        """
        repeat = kwds.get('repeat', 1)
        pools = [tuple(pool) for pool in args] * repeat
        # grow every partial combination by one pool at a time
        partials = [[]]
        for pool in pools:
            extended = []
            for prefix in partials:
                for item in pool:
                    extended.append(prefix + [item])
            partials = extended
        for combo in partials:
            yield tuple(combo)
397
398
398
399
399 #==============================================================================
400 #==============================================================================
400 # BytesIO
401 # BytesIO
401 #==============================================================================
402 #==============================================================================
402
403
403 try:
404 try:
404 from io import BytesIO
405 from io import BytesIO
405 except ImportError:
406 except ImportError:
406 from cStringIO import StringIO as BytesIO
407 from cStringIO import StringIO as BytesIO
407
408
408
409
409 #==============================================================================
410 #==============================================================================
410 # bytes
411 # bytes
411 #==============================================================================
412 #==============================================================================
if __py_version__ >= (2, 6):
    # from py2.6 on ``bytes`` and ``bytearray`` are builtins
    _bytes = bytes
    _bytearray = bytearray
else:
    # older interpreters have no ``bytes`` name; ``str`` is the byte string
    _bytes = str
    # no idea if this is correct but all integration tests are passing
    # i think we never use bytearray anyway
    # NOTE(review): ``array`` is the module imported at the top of the
    # file, not a type with the bytearray interface - confirm before use
    _bytearray = array
425
426
418 #==============================================================================
427 #==============================================================================
419 # deque
428 # deque
420 #==============================================================================
429 #==============================================================================
421
430
if __py_version__ >= (2, 6):
    from collections import deque
else:
    #need to implement our own deque with maxlen
    class deque(object):
        """Pure python double-ended queue with optional ``maxlen``.

        Elements live in the dict ``data`` under consecutive integer keys
        ``left`` (inclusive) to ``right`` (exclusive). ``maxlen == -1``
        means the deque is unbounded.
        """

        def __init__(self, iterable=(), maxlen= -1):
            # storage is created only once, so calling __init__ again on an
            # existing instance appends to the current content
            if not hasattr(self, 'data'):
                self.left = self.right = 0
                self.data = {}
            # any falsy maxlen (None, 0) is treated as "unbounded"
            self.maxlen = maxlen or -1
            self.extend(iterable)

        def append(self, x):
            """Add ``x`` on the right; drop the leftmost element if the
            deque grew beyond ``maxlen``."""
            self.data[self.right] = x
            self.right += 1
            if self.maxlen != -1 and len(self) > self.maxlen:
                self.popleft()

        def appendleft(self, x):
            """Add ``x`` on the left; drop the rightmost element if the
            deque grew beyond ``maxlen``."""
            self.left -= 1
            self.data[self.left] = x
            if self.maxlen != -1 and len(self) > self.maxlen:
                self.pop()

        def pop(self):
            """Remove and return the rightmost element.

            :raises IndexError: if the deque is empty
            """
            if self.left == self.right:
                raise IndexError('cannot pop from empty deque')
            self.right -= 1
            elem = self.data[self.right]
            del self.data[self.right]
            return elem

        def popleft(self):
            """Remove and return the leftmost element.

            :raises IndexError: if the deque is empty
            """
            if self.left == self.right:
                raise IndexError('cannot pop from empty deque')
            elem = self.data[self.left]
            del self.data[self.left]
            self.left += 1
            return elem

        def clear(self):
            self.data.clear()
            self.left = self.right = 0

        def extend(self, iterable):
            for elem in iterable:
                self.append(elem)

        def extendleft(self, iterable):
            for elem in iterable:
                self.appendleft(elem)

        def rotate(self, n=1):
            """Rotate ``n`` steps to the right (to the left for negative
            ``n``, via the modulo below)."""
            if self:
                n %= len(self)
                for i in xrange(n):
                    self.appendleft(self.pop())

        def __getitem__(self, i):
            if i < 0:
                i += len(self)
            try:
                return self.data[i + self.left]
            except KeyError:
                # no such slot: index is out of range
                raise IndexError

        def __setitem__(self, i, value):
            # Assigning to a dict key never raises KeyError, so the range has
            # to be checked up front; otherwise an out of range index would
            # silently store a stray key outside of [left, right).
            size = len(self)
            if not (-size <= i < size):
                raise IndexError
            if i < 0:
                i += size
            self.data[i + self.left] = value

        def __delitem__(self, i):
            size = len(self)
            if not (-size <= i < size):
                raise IndexError
            data = self.data
            if i < 0:
                i += size
            # shift everything right of ``i`` one slot to the left, then
            # drop the now duplicated last slot
            for j in xrange(self.left + i, self.right - 1):
                data[j] = data[j + 1]
            self.pop()

        def __len__(self):
            return self.right - self.left

        def __cmp__(self, other):
            if type(self) != type(other):
                return cmp(type(self), type(other))
            return cmp(list(self), list(other))

        def __repr__(self, _track=[]):
            # ``_track`` is deliberately shared between calls: it holds the
            # ids of deques currently being printed to stop on recursion
            if id(self) in _track:
                return '...'
            _track.append(id(self))
            try:
                r = 'deque(%r, maxlen=%s)' % (list(self), self.maxlen)
            finally:
                # always unregister, also if the repr of an element raised
                _track.remove(id(self))
            return r

        def __getstate__(self):
            return (tuple(self), self.maxlen)

        def __setstate__(self, s):
            self.__init__(s[0])
            # states written before maxlen was stored have a single item
            if len(s) > 1:
                self.maxlen = s[1]

        def __hash__(self):
            # mutable container: not hashable
            raise TypeError

        def __copy__(self):
            result = self.__class__(self)
            # keep the bound of the original
            result.maxlen = self.maxlen
            return result

        def __deepcopy__(self, memo=None):
            from copy import deepcopy
            if memo is None:
                memo = {}
            result = self.__class__()
            # register before copying the elements so that self references
            # resolve to the new deque
            memo[id(self)] = result
            result.__init__(deepcopy(tuple(self), memo))
            # keep the bound of the original
            result.maxlen = self.maxlen
            return result
542
551
543
552
544 #==============================================================================
553 #==============================================================================
545 # threading.Event
554 # threading.Event
546 #==============================================================================
555 #==============================================================================
547
556
if __py_version__ >= (2, 6):
    from threading import Event, Thread
else:
    # Older interpreters: provide local Condition/Event implementations that
    # expose the ``notify_all`` / ``is_set`` spellings used by callers.
    from threading import _Verbose, Condition, Lock, Thread, _time, \
        _allocate_lock, RLock, _sleep

    def Condition(*args, **kwargs):
        """Factory returning a :class:`_Condition` (shadows the import)."""
        return _Condition(*args, **kwargs)

    class _Condition(_Verbose):
        """Condition variable built on top of a (re-entrant) lock."""

        def __init__(self, lock=None, verbose=None):
            """
            :param lock: lock to wrap; an ``RLock`` is created when omitted
            :param verbose: passed through to ``_Verbose``
            """
            _Verbose.__init__(self, verbose)
            if lock is None:
                lock = RLock()
            self.__lock = lock
            # Export the lock's acquire() and release() methods
            self.acquire = lock.acquire
            self.release = lock.release
            # If the lock defines _release_save() and/or _acquire_restore(),
            # these override the default implementations (which just call
            # release() and acquire() on the lock). Ditto for _is_owned().
            try:
                self._release_save = lock._release_save
            except AttributeError:
                pass
            try:
                self._acquire_restore = lock._acquire_restore
            except AttributeError:
                pass
            try:
                self._is_owned = lock._is_owned
            except AttributeError:
                pass
            self.__waiters = []

        def __enter__(self):
            return self.__lock.__enter__()

        def __exit__(self, *args):
            return self.__lock.__exit__(*args)

        def __repr__(self):
            return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

        def _release_save(self):
            self.__lock.release()           # No state to save

        def _acquire_restore(self, x):
            self.__lock.acquire()           # Ignore saved state

        def _is_owned(self):
            # Return True if lock is owned by current_thread.
            # This method is called only if __lock doesn't have _is_owned().
            if self.__lock.acquire(0):
                self.__lock.release()
                return False
            else:
                return True

        def wait(self, timeout=None):
            """
            Release the underlying lock and block until notified or until
            ``timeout`` seconds elapsed; the lock is re-acquired on return.

            :raises RuntimeError: when the lock is not held by the caller
            """
            if not self._is_owned():
                raise RuntimeError("cannot wait on un-acquired lock")
            waiter = _allocate_lock()
            waiter.acquire()
            self.__waiters.append(waiter)
            saved_state = self._release_save()
            try:    # restore state no matter what (e.g., KeyboardInterrupt)
                if timeout is None:
                    waiter.acquire()
                    if __debug__:
                        self._note("%s.wait(): got it", self)
                else:
                    # Balancing act: We can't afford a pure busy loop, so we
                    # have to sleep; but if we sleep the whole timeout time,
                    # we'll be unresponsive. The scheme here sleeps very
                    # little at first, longer as time goes on, but never
                    # longer than 20 times per second (or the timeout time
                    # remaining).
                    endtime = _time() + timeout
                    delay = 0.0005  # 500 us -> initial delay of 1 ms
                    while True:
                        gotit = waiter.acquire(0)
                        if gotit:
                            break
                        remaining = endtime - _time()
                        if remaining <= 0:
                            break
                        delay = min(delay * 2, remaining, .05)
                        _sleep(delay)
                    if not gotit:
                        if __debug__:
                            self._note("%s.wait(%s): timed out", self, timeout)
                        try:
                            self.__waiters.remove(waiter)
                        except ValueError:
                            pass
                    else:
                        if __debug__:
                            self._note("%s.wait(%s): got it", self, timeout)
            finally:
                self._acquire_restore(saved_state)

        def notify(self, n=1):
            """
            Wake up at most ``n`` threads blocked in :meth:`wait`.

            :raises RuntimeError: when the lock is not held by the caller
            """
            if not self._is_owned():
                raise RuntimeError("cannot notify on un-acquired lock")
            all_waiters = self.__waiters
            waiters = all_waiters[:n]
            if not waiters:
                if __debug__:
                    self._note("%s.notify(): no waiters", self)
                return
            # guarded like every other _note() call in this class
            if __debug__:
                self._note("%s.notify(): notifying %d waiter%s", self, n,
                           n != 1 and "s" or "")
            for waiter in waiters:
                waiter.release()
                try:
                    all_waiters.remove(waiter)
                except ValueError:
                    pass

        def notifyAll(self):
            """Wake up every thread currently blocked in :meth:`wait`."""
            self.notify(len(self.__waiters))

        notify_all = notifyAll

    def Event(*args, **kwargs):
        """Factory returning an :class:`_Event`."""
        return _Event(*args, **kwargs)

    class _Event(_Verbose):
        """Boolean flag threads can wait on until another thread sets it."""

        # After Tim Peters' event class (without is_posted())

        def __init__(self, verbose=None):
            _Verbose.__init__(self, verbose)
            self.__cond = Condition(Lock())
            self.__flag = False

        def isSet(self):
            """Return the current state of the flag."""
            return self.__flag

        is_set = isSet

        def set(self):
            """Raise the flag and wake up all waiting threads."""
            self.__cond.acquire()
            try:
                self.__flag = True
                self.__cond.notify_all()
            finally:
                self.__cond.release()

        def clear(self):
            """Lower the flag; later :meth:`wait` calls block again."""
            self.__cond.acquire()
            try:
                self.__flag = False
            finally:
                self.__cond.release()

        def wait(self, timeout=None):
            """Block until the flag is set or ``timeout`` seconds elapsed."""
            self.__cond.acquire()
            try:
                if not self.__flag:
                    self.__cond.wait(timeout)
            finally:
                self.__cond.release()
@@ -1,409 +1,409 b''
1 '''
1 '''
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningful, non-blocking, concurrent
3 input, output, error streams into a meaningful, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 '''
24 '''
25 import os
25 import os
26 import subprocess
26 import subprocess
27 from rhodecode.lib.compat import deque, Event, Thread, _bytes
27 from rhodecode.lib.compat import deque, Event, Thread, _bytes, _bytearray
28
28
29
29
class StreamFeeder(Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """
    def __init__(self, source):
        """
        :param source: a string-like object, a readable file-like object or
            an integer file descriptor
        :raises TypeError: when ``source`` is none of the above (an empty
            string is rejected as well, as there is nothing to feed)
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = _bytes()
        if isinstance(source, (type(''), _bytes, _bytearray)):  # string-like
            self.bytes = _bytes(source)
        else:  # can be either file pointer or file-like
            # exact type check on purpose: isinstance() would also let
            # ``bool`` through and treat True/False as descriptors 1/0
            if type(source) in (int, long):  # file pointer it is
                ## converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except Exception:
                pass
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        """Pump the whole source into the pipe, then close its write end."""
        t = self.writeiface
        try:
            if self.bytes:
                os.write(t, self.bytes)
            else:
                s = self.source
                b = s.read(4096)
                while b:
                    os.write(t, b)
                    b = s.read(4096)
        finally:
            # always close the write end, even when reading/writing failed,
            # otherwise the descriptor leaks and the reader never sees EOF
            os.close(t)

    @property
    def output(self):
        """File descriptor of the read end of the pipe."""
        return self.readiface
77
77
78
78
class InputStreamChunker(Thread):
    """
    Reader thread: pulls chunks of ``chunk_size`` from ``source`` and appends
    them to ``target``, pausing while the consumer lags behind.

    Events exposed to the consumer:

    - ``data_added``: set after every appended chunk and when reading ends
    - ``keep_reading``: cleared by the reader when the buffer is full,
      expected to be set again by the consumer
    - ``EOF``: set once the reader is done (end of input, stop or error)
    - ``go``: cleared by :meth:`stop` to make the reader leave its loop
    """
    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: object with ``read(size)`` (and ideally ``close()``)
        :param target: container with ``append()`` and ``len()`` support
        :param buffer_size: total number of bytes to buffer
        :param chunk_size: max size of a single read
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = Event()
        self.data_added.clear()

        self.keep_reading = Event()
        self.keep_reading.set()

        self.EOF = Event()
        self.EOF.clear()

        self.go = Event()
        self.go.set()

    def stop(self):
        """Ask the reader to finish and mark the stream as exhausted."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            # best effort only: source may be closed or have no close()
            pass

    def run(self):
        """
        Read until end of input or until :meth:`stop` is called.

        :raises IOError: when the consumer does not drain the buffer in time
        """
        s = self.source
        t = self.target
        cs = self.chunk_size
        ccm = self.chunk_count_max
        kr = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
            while b and go.is_set():
                if len(t) > ccm:
                    kr.clear()
                    kr.wait(2)
                    # # this only works on 2.7.x and up
                    # if not kr.wait(10):
                    #     raise Exception("Timed out while waiting for input to be read.")
                    # instead we'll use this
                    if len(t) > ccm + 3:
                        raise IOError("Timed out while waiting for input from subprocess.")
                t.append(b)
                da.set()
                b = s.read(cs)
        finally:
            # signal the end even when reading failed, otherwise consumers
            # waiting for EOF/data_added would poll forever
            self.EOF.set()
            da.set()  # for cases when done but there was no input.
139
139
140
140
class BufferedGenerator(object):
    '''
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    '''

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        '''
        :param source: object with ``read(size)`` to pull the data from
        :param buffer_size: (Default: 65536) total buffer size in bytes
        :param chunk_size: (Default: 4096) max size of a single chunk
        :param starting_values: (Default: None) chunks to put in front of
            the queue before anything read from ``source``
        :param bottomless: (Default: False) bound the queue to
            ``buffer_size / chunk_size`` entries instead of letting it grow
        '''
        if starting_values is None:
            starting_values = []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)

        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        '''
        Return the next buffered chunk, waiting for the reader if needed.

        :raises StopIteration: when the queue is empty and reading is done
        '''
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # room was made in the queue, let a paused reader continue
            self.worker.keep_reading.set()
            return _bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, type, value=None, traceback=None):
        '''Raise ``type(value)`` unless the reader already hit EOF.'''
        if not self.worker.EOF.is_set():
            raise type(value)

    def start(self):
        # NOTE(review): the worker is already started by __init__ - confirm
        # whether this method is still meant to be called.
        self.worker.start()

    def stop(self):
        '''Stop the reader thread.'''
        self.worker.stop()

    def close(self):
        '''Stop the reader thread; mirrors a generator's ``close()``.'''
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker, as visible in this file, never
        # sets a ``w`` attribute, so this looks like it raises
        # AttributeError - confirm the intended attribute.
        return self.worker.w

    @property
    def data_added_event(self):
        '''The reader's ``data_added`` Event instance.'''
        return self.worker.data_added

    @property
    def data_added(self):
        '''True when the reader flagged new data (or the end of input).'''
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        '''True when the reader paused itself because the buffer is full.'''
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        '''
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return An Event class instance.
        '''
        return self.worker.EOF

    @property
    def done_reading(self):
        '''
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return A Bool value.
        '''
        return self.worker.EOF.is_set()

    @property
    def length(self):
        '''
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        '''
        return len(self.data)

    def prepend(self, x):
        '''Put chunk ``x`` at the front of the queue.'''
        self.data.appendleft(x)

    def append(self, x):
        '''Put chunk ``x`` at the end of the queue.'''
        self.data.append(x)

    def extend(self, o):
        '''Append every chunk from iterable ``o`` to the queue.'''
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
278
278
279
279
class SubprocessIOChunker(object):
    '''
    Processor class wrapping handling of subprocess IO.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
       does not block the parent calling code. On the flip side, reading from
       slow-yielding subprocess may block the iteration until data shows up. This
       does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    #    try:
    #        answer = SubprocessIOChunker(
    #            cmd,
    #            input,
    #            buffer_size = 65536,
    #            chunk_size = 4096
    #            )
    #    except (EnvironmentError) as e:
    #        print str(e)
    #        raise e
    #
    #    return answer


    '''
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, **kwargs):
        '''
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: None) An array of strings to put in front of output queue.
        :param kwargs: passed to ``subprocess.Popen``; ``shell`` defaults to True
        :raises EnvironmentError: when the subprocess failed, or wrote to
            stderr, while the first part of its output was being read
        '''
        if starting_values is None:
            starting_values = []

        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output

        # honour an explicit shell=False: ``kwargs.get('shell') or True``
        # always evaluated to True, so the flag could never be turned off
        _shell = kwargs.get('shell')
        if _shell is None:
            _shell = True
        kwargs['shell'] = _shell

        # a shell takes the command as one string, while without a shell
        # Popen needs the argument list untouched
        if _shell and isinstance(cmd, (list, tuple)):
            cmd = ' '.join(cmd)

        _p = subprocess.Popen(cmd,
            bufsize=-1,
            stdin=inputstream,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            **kwargs
            )

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size, starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()
        if _returncode or (_returncode is None and bg_err.length):
            try:
                _p.terminate()
            except Exception:
                # best effort: process may be gone already
                pass
            bg_out.stop()
            bg_err.stop()
            err = '%s' % ''.join(bg_err)
            raise EnvironmentError("Subprocess exited due to an error:\n" + err)

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        '''
        Return the next chunk of the subprocess' standard output.

        :raises EnvironmentError: when the subprocess exited with an error
        :raises StopIteration: when the output is exhausted
        '''
        if self.process.poll():
            err = '%s' % ''.join(self.error)
            raise EnvironmentError("Subprocess exited due to an error:\n" + err)
        return self.output.next()

    def throw(self, type, value=None, traceback=None):
        '''Raise ``type(value)`` while there still is output to serve.'''
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        '''Terminate the subprocess and stop both reader threads.'''
        # every step is best effort: this also runs from __del__, possibly
        # on an instance whose __init__ raised before the attributes were set
        try:
            self.process.terminate()
        except Exception:
            pass
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now