# -*- coding: utf-8 -*- """ rhodecode.lib.compat ~~~~~~~~~~~~~~~~~~~~ Python backward compatibility functions and common libs :created_on: Oct 7, 2011 :author: marcink :copyright: (C) 2010-2010 Marcin Kuzminski :license: GPLv3, see COPYING for more details. """ # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os from rhodecode import __platform__, PLATFORM_WIN #============================================================================== # json #============================================================================== try: import json except ImportError: import simplejson as json #============================================================================== # izip_longest #============================================================================== try: from itertools import izip_longest except ImportError: import itertools def izip_longest(*args, **kwds): # noqa fillvalue = kwds.get("fillvalue") def sentinel(counter=([fillvalue] * (len(args) - 1)).pop): yield counter() # yields the fillvalue, or raises IndexError fillers = itertools.repeat(fillvalue) iters = [itertools.chain(it, sentinel(), fillers) for it in args] try: for tup in itertools.izip(*iters): yield tup except IndexError: pass #============================================================================== # OrderedDict #============================================================================== # Python Software Foundation License # XXX: it feels like using the class with "is" and "is not" instead of "==" and # "!=" should be faster. class _Nil(object): def __repr__(self): return "nil" def __eq__(self, other): if (isinstance(other, _Nil)): return True else: return NotImplemented def __ne__(self, other): if (isinstance(other, _Nil)): return False else: return NotImplemented _nil = _Nil() class _odict(object): """Ordered dict data structure, with O(1) complexity for dict operations that modify one element. Overwriting values doesn't change their original sequential order. """ def _dict_impl(self): return None def __init__(self, data=(), **kwds): """This doesn't accept keyword initialization as normal dicts to avoid a trap - inside a function or method the keyword args are accessible only as a dict, without a defined order, so their original order is lost. """ if kwds: raise TypeError("__init__() of ordered dict takes no keyword " "arguments to avoid an ordering trap.") self._dict_impl().__init__(self) # If you give a normal dict, then the order of elements is undefined if hasattr(data, "iteritems"): for key, val in data.iteritems(): self[key] = val else: for key, val in data: self[key] = val # Double-linked list header def _get_lh(self): dict_impl = self._dict_impl() if not hasattr(self, '_lh'): dict_impl.__setattr__(self, '_lh', _nil) return dict_impl.__getattribute__(self, '_lh') def _set_lh(self, val): self._dict_impl().__setattr__(self, '_lh', val) lh = property(_get_lh, _set_lh) # Double-linked list tail def _get_lt(self): dict_impl = self._dict_impl() if not hasattr(self, '_lt'): dict_impl.__setattr__(self, '_lt', _nil) return dict_impl.__getattribute__(self, '_lt') def _set_lt(self, val): self._dict_impl().__setattr__(self, '_lt', val) lt = property(_get_lt, _set_lt) def __getitem__(self, key): return self._dict_impl().__getitem__(self, key)[1] def __setitem__(self, key, val): dict_impl = self._dict_impl() try: dict_impl.__getitem__(self, key)[1] = val except KeyError: new = [dict_impl.__getattribute__(self, 'lt'), val, _nil] dict_impl.__setitem__(self, key, new) if dict_impl.__getattribute__(self, 'lt') == _nil: dict_impl.__setattr__(self, 'lh', key) else: dict_impl.__getitem__( self, dict_impl.__getattribute__(self, 'lt'))[2] = key dict_impl.__setattr__(self, 'lt', key) def __delitem__(self, key): dict_impl = self._dict_impl() pred, _, succ = self._dict_impl().__getitem__(self, key) if pred == _nil: dict_impl.__setattr__(self, 'lh', succ) else: dict_impl.__getitem__(self, pred)[2] = succ if succ == _nil: dict_impl.__setattr__(self, 'lt', pred) else: dict_impl.__getitem__(self, succ)[0] = pred dict_impl.__delitem__(self, key) def __contains__(self, key): return key in self.keys() def __len__(self): return len(self.keys()) def __str__(self): pairs = ("%r: %r" % (k, v) for k, v in self.iteritems()) return "{%s}" % ", ".join(pairs) def __repr__(self): if self: pairs = ("(%r, %r)" % (k, v) for k, v in self.iteritems()) return "odict([%s])" % ", ".join(pairs) else: return "odict()" def get(self, k, x=None): if k in self: return self._dict_impl().__getitem__(self, k)[1] else: return x def __iter__(self): dict_impl = self._dict_impl() curr_key = dict_impl.__getattribute__(self, 'lh') while curr_key != _nil: yield curr_key curr_key = dict_impl.__getitem__(self, curr_key)[2] iterkeys = __iter__ def keys(self): return list(self.iterkeys()) def itervalues(self): dict_impl = self._dict_impl() curr_key = dict_impl.__getattribute__(self, 'lh') while curr_key != _nil: _, val, curr_key = dict_impl.__getitem__(self, curr_key) yield val def values(self): return list(self.itervalues()) def iteritems(self): dict_impl = self._dict_impl() curr_key = dict_impl.__getattribute__(self, 'lh') while curr_key != _nil: _, val, next_key = dict_impl.__getitem__(self, curr_key) yield curr_key, val curr_key = next_key def items(self): return list(self.iteritems()) def sort(self, cmp=None, key=None, reverse=False): items = [(k, v) for k, v in self.items()] if cmp is not None: items = sorted(items, cmp=cmp) elif key is not None: items = sorted(items, key=key) else: items = sorted(items, key=lambda x: x[1]) if reverse: items.reverse() self.clear() self.__init__(items) def clear(self): dict_impl = self._dict_impl() dict_impl.clear(self) dict_impl.__setattr__(self, 'lh', _nil) dict_impl.__setattr__(self, 'lt', _nil) def copy(self): return self.__class__(self) def update(self, data=(), **kwds): if kwds: raise TypeError("update() of ordered dict takes no keyword " "arguments to avoid an ordering trap.") if hasattr(data, "iteritems"): data = data.iteritems() for key, val in data: self[key] = val def setdefault(self, k, x=None): try: return self[k] except KeyError: self[k] = x return x def pop(self, k, x=_nil): try: val = self[k] del self[k] return val except KeyError: if x == _nil: raise return x def popitem(self): try: dict_impl = self._dict_impl() key = dict_impl.__getattribute__(self, 'lt') return key, self.pop(key) except KeyError: raise KeyError("'popitem(): ordered dictionary is empty'") def riterkeys(self): """To iterate on keys in reversed order. """ dict_impl = self._dict_impl() curr_key = dict_impl.__getattribute__(self, 'lt') while curr_key != _nil: yield curr_key curr_key = dict_impl.__getitem__(self, curr_key)[0] __reversed__ = riterkeys def rkeys(self): """List of the keys in reversed order. """ return list(self.riterkeys()) def ritervalues(self): """To iterate on values in reversed order. """ dict_impl = self._dict_impl() curr_key = dict_impl.__getattribute__(self, 'lt') while curr_key != _nil: curr_key, val, _ = dict_impl.__getitem__(self, curr_key) yield val def rvalues(self): """List of the values in reversed order. """ return list(self.ritervalues()) def riteritems(self): """To iterate on (key, value) in reversed order. """ dict_impl = self._dict_impl() curr_key = dict_impl.__getattribute__(self, 'lt') while curr_key != _nil: pred_key, val, _ = dict_impl.__getitem__(self, curr_key) yield curr_key, val curr_key = pred_key def ritems(self): """List of the (key, value) in reversed order. """ return list(self.riteritems()) def firstkey(self): if self: return self._dict_impl().__getattribute__(self, 'lh') else: raise KeyError("'firstkey(): ordered dictionary is empty'") def lastkey(self): if self: return self._dict_impl().__getattribute__(self, 'lt') else: raise KeyError("'lastkey(): ordered dictionary is empty'") def as_dict(self): return self._dict_impl()(self.items()) def _repr(self): """_repr(): low level repr of the whole data contained in the odict. Useful for debugging. """ dict_impl = self._dict_impl() form = "odict low level repr lh,lt,data: %r, %r, %s" return form % (dict_impl.__getattribute__(self, 'lh'), dict_impl.__getattribute__(self, 'lt'), dict_impl.__repr__(self)) class OrderedDict(_odict, dict): def _dict_impl(self): return dict #============================================================================== # OrderedSet #============================================================================== from sqlalchemy.util import OrderedSet #============================================================================== # kill FUNCTIONS #============================================================================== if __platform__ in PLATFORM_WIN: import ctypes def kill(pid, sig): """kill function for Win32""" kernel32 = ctypes.windll.kernel32 handle = kernel32.OpenProcess(1, 0, pid) return (0 != kernel32.TerminateProcess(handle, 0)) else: kill = os.kill #============================================================================== # itertools.product #============================================================================== try: from itertools import product except ImportError: def product(*args, **kwds): # product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy # product(range(2), repeat=3) --> 000 001 010 011 100 101 110 111 pools = map(tuple, args) * kwds.get('repeat', 1) result = [[]] for pool in pools: result = [x + [y] for x in result for y in pool] for prod in result: yield tuple(prod)