Show More
utils2.py
609 lines
| 16.5 KiB
| text/x-python
|
PythonLexer
r2109 | # -*- coding: utf-8 -*- | |||
""" | ||||
rhodecode.lib.utils | ||||
~~~~~~~~~~~~~~~~~~~ | ||||
Some simple helper functions | ||||
:created_on: Jan 5, 2011 | ||||
:author: marcink | ||||
:copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com> | ||||
:license: GPLv3, see COPYING for more details. | ||||
""" | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
r3577 | import os | |||
r2109 | import re | |||
r3577 | import sys | |||
r2699 | import time | |||
r2726 | import datetime | |||
r3577 | import traceback | |||
r2969 | import webob | |||
Vincent Duvert
|
r2303 | from pylons.i18n.translation import _, ungettext | ||
r2109 | from rhodecode.lib.vcs.utils.lazy import LazyProperty | |||
r3577 | from rhodecode.lib.compat import json | |||
r2109 | ||||
def __get_lem(): | ||||
""" | ||||
Get language extension map based on what's inside pygments lexers | ||||
""" | ||||
from pygments import lexers | ||||
from string import lower | ||||
from collections import defaultdict | ||||
d = defaultdict(lambda: []) | ||||
def __clean(s): | ||||
s = s.lstrip('*') | ||||
s = s.lstrip('.') | ||||
if s.find('[') != -1: | ||||
exts = [] | ||||
start, stop = s.find('['), s.find(']') | ||||
for suffix in s[start + 1:stop]: | ||||
exts.append(s[:s.find('[')] + suffix) | ||||
return map(lower, exts) | ||||
else: | ||||
return map(lower, [s]) | ||||
for lx, t in sorted(lexers.LEXERS.items()): | ||||
m = map(__clean, t[-2]) | ||||
if m: | ||||
m = reduce(lambda x, y: x + y, m) | ||||
for ext in m: | ||||
desc = lx.replace('Lexer', '') | ||||
d[ext].append(desc) | ||||
return dict(d) | ||||
r3008 | ||||
r2109 | def str2bool(_str): | |||
""" | ||||
returs True/False value from given string, it tries to translate the | ||||
string into boolean | ||||
:param _str: string value to translate into boolean | ||||
:rtype: boolean | ||||
:returns: boolean from given string | ||||
""" | ||||
if _str is None: | ||||
return False | ||||
if _str in (True, False): | ||||
return _str | ||||
_str = str(_str).strip().lower() | ||||
return _str in ('t', 'true', 'y', 'yes', 'on', '1') | ||||
r3008 | def aslist(obj, sep=None, strip=True): | |||
""" | ||||
Returns given string separated by sep as list | ||||
:param obj: | ||||
:param sep: | ||||
:param strip: | ||||
""" | ||||
if isinstance(obj, (basestring)): | ||||
lst = obj.split(sep) | ||||
if strip: | ||||
lst = [v.strip() for v in lst] | ||||
return lst | ||||
elif isinstance(obj, (list, tuple)): | ||||
return obj | ||||
elif obj is None: | ||||
return [] | ||||
else: | ||||
return [obj] | ||||
r2109 | def convert_line_endings(line, mode): | |||
""" | ||||
Converts a given line "line end" accordingly to given mode | ||||
Available modes are:: | ||||
0 - Unix | ||||
1 - Mac | ||||
2 - DOS | ||||
:param line: given line to convert | ||||
:param mode: mode to convert to | ||||
:rtype: str | ||||
:return: converted line according to mode | ||||
""" | ||||
from string import replace | ||||
if mode == 0: | ||||
line = replace(line, '\r\n', '\n') | ||||
line = replace(line, '\r', '\n') | ||||
elif mode == 1: | ||||
line = replace(line, '\r\n', '\r') | ||||
line = replace(line, '\n', '\r') | ||||
elif mode == 2: | ||||
line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line) | ||||
return line | ||||
def detect_mode(line, default): | ||||
""" | ||||
Detects line break for given line, if line break couldn't be found | ||||
given default value is returned | ||||
:param line: str line | ||||
:param default: default | ||||
:rtype: int | ||||
:return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS | ||||
""" | ||||
if line.endswith('\r\n'): | ||||
return 2 | ||||
elif line.endswith('\n'): | ||||
return 0 | ||||
elif line.endswith('\r'): | ||||
return 1 | ||||
else: | ||||
return default | ||||
def generate_api_key(username, salt=None): | ||||
""" | ||||
Generates unique API key for given username, if salt is not given | ||||
it'll be generated from some random string | ||||
:param username: username as string | ||||
:param salt: salt to hash generate KEY | ||||
:rtype: str | ||||
:returns: sha1 hash from username+salt | ||||
""" | ||||
from tempfile import _RandomNameSequence | ||||
import hashlib | ||||
if salt is None: | ||||
salt = _RandomNameSequence().next() | ||||
return hashlib.sha1(username + salt).hexdigest() | ||||
r2845 | def safe_int(val, default=None): | |||
""" | ||||
Returns int() of val if val is not convertable to int use default | ||||
instead | ||||
:param val: | ||||
:param default: | ||||
""" | ||||
try: | ||||
val = int(val) | ||||
r3218 | except (ValueError, TypeError): | |||
r2845 | val = default | |||
return val | ||||
r2109 | def safe_unicode(str_, from_encoding=None): | |||
""" | ||||
safe unicode function. Does few trick to turn str_ into unicode | ||||
In case of UnicodeDecode error we try to return it with encoding detected | ||||
by chardet library if it fails fallback to unicode with errors replaced | ||||
:param str_: string to decode | ||||
:rtype: unicode | ||||
:returns: unicode object | ||||
""" | ||||
if isinstance(str_, unicode): | ||||
return str_ | ||||
if not from_encoding: | ||||
import rhodecode | ||||
r3008 | DEFAULT_ENCODINGS = aslist(rhodecode.CONFIG.get('default_encoding', | |||
'utf8'), sep=',') | ||||
from_encoding = DEFAULT_ENCODINGS | ||||
if not isinstance(from_encoding, (list, tuple)): | ||||
from_encoding = [from_encoding] | ||||
r2109 | ||||
try: | ||||
return unicode(str_) | ||||
except UnicodeDecodeError: | ||||
pass | ||||
r3008 | for enc in from_encoding: | |||
try: | ||||
return unicode(str_, enc) | ||||
except UnicodeDecodeError: | ||||
pass | ||||
r2109 | ||||
try: | ||||
import chardet | ||||
encoding = chardet.detect(str_)['encoding'] | ||||
if encoding is None: | ||||
raise Exception() | ||||
return str_.decode(encoding) | ||||
except (ImportError, UnicodeDecodeError, Exception): | ||||
r3008 | return unicode(str_, from_encoding[0], 'replace') | |||
r2109 | ||||
def safe_str(unicode_, to_encoding=None): | ||||
""" | ||||
safe str function. Does few trick to turn unicode_ into string | ||||
In case of UnicodeEncodeError we try to return it with encoding detected | ||||
by chardet library if it fails fallback to string with errors replaced | ||||
:param unicode_: unicode to encode | ||||
:rtype: str | ||||
:returns: str object | ||||
""" | ||||
# if it's not basestr cast to str | ||||
if not isinstance(unicode_, basestring): | ||||
return str(unicode_) | ||||
if isinstance(unicode_, str): | ||||
return unicode_ | ||||
if not to_encoding: | ||||
import rhodecode | ||||
r3008 | DEFAULT_ENCODINGS = aslist(rhodecode.CONFIG.get('default_encoding', | |||
'utf8'), sep=',') | ||||
to_encoding = DEFAULT_ENCODINGS | ||||
r2109 | ||||
r3008 | if not isinstance(to_encoding, (list, tuple)): | |||
to_encoding = [to_encoding] | ||||
for enc in to_encoding: | ||||
try: | ||||
return unicode_.encode(enc) | ||||
except UnicodeEncodeError: | ||||
pass | ||||
r2109 | ||||
try: | ||||
import chardet | ||||
encoding = chardet.detect(unicode_)['encoding'] | ||||
if encoding is None: | ||||
raise UnicodeEncodeError() | ||||
return unicode_.encode(encoding) | ||||
except (ImportError, UnicodeEncodeError): | ||||
r3008 | return unicode_.encode(to_encoding[0], 'replace') | |||
r2109 | ||||
return safe_str | ||||
r3018 | def remove_suffix(s, suffix): | |||
if s.endswith(suffix): | ||||
s = s[:-1 * len(suffix)] | ||||
return s | ||||
def remove_prefix(s, prefix): | ||||
if s.startswith(prefix): | ||||
r3056 | s = s[len(prefix):] | |||
r3018 | return s | |||
r2109 | def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs): | |||
""" | ||||
Custom engine_from_config functions that makes sure we use NullPool for | ||||
file based sqlite databases. This prevents errors on sqlite. This only | ||||
applies to sqlalchemy versions < 0.7.0 | ||||
""" | ||||
import sqlalchemy | ||||
from sqlalchemy import engine_from_config as efc | ||||
import logging | ||||
if int(sqlalchemy.__version__.split('.')[1]) < 7: | ||||
# This solution should work for sqlalchemy < 0.7.0, and should use | ||||
# proxy=TimerProxy() for execution time profiling | ||||
from sqlalchemy.pool import NullPool | ||||
url = configuration[prefix + 'url'] | ||||
if url.startswith('sqlite'): | ||||
kwargs.update({'poolclass': NullPool}) | ||||
return efc(configuration, prefix, **kwargs) | ||||
else: | ||||
import time | ||||
from sqlalchemy import event | ||||
from sqlalchemy.engine import Engine | ||||
log = logging.getLogger('sqlalchemy.engine') | ||||
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38) | ||||
engine = efc(configuration, prefix, **kwargs) | ||||
def color_sql(sql): | ||||
COLOR_SEQ = "\033[1;%dm" | ||||
COLOR_SQL = YELLOW | ||||
normal = '\x1b[0m' | ||||
return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal]) | ||||
if configuration['debug']: | ||||
#attach events only for debug configuration | ||||
def before_cursor_execute(conn, cursor, statement, | ||||
parameters, context, executemany): | ||||
context._query_start_time = time.time() | ||||
log.info(color_sql(">>>>> STARTING QUERY >>>>>")) | ||||
def after_cursor_execute(conn, cursor, statement, | ||||
parameters, context, executemany): | ||||
total = time.time() - context._query_start_time | ||||
log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total)) | ||||
event.listen(engine, "before_cursor_execute", | ||||
before_cursor_execute) | ||||
event.listen(engine, "after_cursor_execute", | ||||
after_cursor_execute) | ||||
return engine | ||||
r3644 | def age(prevdate, show_short_version=False, now=None): | |||
r2109 | """ | |||
turns a datetime into an age string. | ||||
Leonardo
|
r3536 | If show_short_version is True, then it will generate a not so accurate but shorter string, | ||
example: 2days ago, instead of 2 days and 23 hours ago. | ||||
Vincent Duvert
|
r2303 | :param prevdate: datetime object | ||
Leonardo
|
r3536 | :param show_short_version: if it should aproximate the date and return a shorter string | ||
r2109 | :rtype: unicode | |||
:returns: unicode words describing age | ||||
""" | ||||
r3644 | now = now or datetime.datetime.now() | |||
Vincent Duvert
|
r2303 | order = ['year', 'month', 'day', 'hour', 'minute', 'second'] | ||
deltas = {} | ||||
r2902 | future = False | |||
Vincent Duvert
|
r2303 | |||
r2902 | if prevdate > now: | |||
now, prevdate = prevdate, now | ||||
future = True | ||||
r3644 | if future: | |||
prevdate = prevdate.replace(microsecond=0) | ||||
r3261 | # Get date parts deltas | |||
r3644 | from dateutil import relativedelta | |||
Vincent Duvert
|
r2303 | for part in order: | ||
r3644 | d = relativedelta.relativedelta(now, prevdate) | |||
deltas[part] = getattr(d, part + 's') | ||||
r2109 | ||||
Vincent Duvert
|
r2303 | # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00, | ||
# not 1 hour, -59 minutes and -59 seconds) | ||||
for num, length in [(5, 60), (4, 60), (3, 24)]: # seconds, minutes, hours | ||||
part = order[num] | ||||
carry_part = order[num - 1] | ||||
r2367 | ||||
Vincent Duvert
|
r2303 | if deltas[part] < 0: | ||
deltas[part] += length | ||||
deltas[carry_part] -= 1 | ||||
r2109 | ||||
Vincent Duvert
|
r2303 | # Same thing for days except that the increment depends on the (variable) | ||
# number of days in the month | ||||
month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] | ||||
if deltas['day'] < 0: | ||||
if prevdate.month == 2 and (prevdate.year % 4 == 0 and | ||||
(prevdate.year % 100 != 0 or prevdate.year % 400 == 0)): | ||||
deltas['day'] += 29 | ||||
else: | ||||
deltas['day'] += month_lengths[prevdate.month - 1] | ||||
r2367 | ||||
Vincent Duvert
|
r2303 | deltas['month'] -= 1 | ||
r2367 | ||||
Vincent Duvert
|
r2303 | if deltas['month'] < 0: | ||
deltas['month'] += 12 | ||||
deltas['year'] -= 1 | ||||
r2367 | ||||
Vincent Duvert
|
r2303 | # Format the result | ||
fmt_funcs = { | ||||
'year': lambda d: ungettext(u'%d year', '%d years', d) % d, | ||||
'month': lambda d: ungettext(u'%d month', '%d months', d) % d, | ||||
'day': lambda d: ungettext(u'%d day', '%d days', d) % d, | ||||
'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d, | ||||
'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d, | ||||
'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d, | ||||
} | ||||
r2367 | ||||
Vincent Duvert
|
r2303 | for i, part in enumerate(order): | ||
value = deltas[part] | ||||
if value == 0: | ||||
continue | ||||
r2367 | ||||
Vincent Duvert
|
r2303 | if i < 5: | ||
sub_part = order[i + 1] | ||||
sub_value = deltas[sub_part] | ||||
else: | ||||
sub_value = 0 | ||||
r2367 | ||||
Leonardo
|
r3536 | if sub_value == 0 or show_short_version: | ||
r2902 | if future: | |||
return _(u'in %s') % fmt_funcs[part](value) | ||||
else: | ||||
return _(u'%s ago') % fmt_funcs[part](value) | ||||
if future: | ||||
return _(u'in %s and %s') % (fmt_funcs[part](value), | ||||
fmt_funcs[sub_part](sub_value)) | ||||
else: | ||||
return _(u'%s and %s ago') % (fmt_funcs[part](value), | ||||
fmt_funcs[sub_part](sub_value)) | ||||
r2109 | ||||
return _(u'just now') | ||||
def uri_filter(uri): | ||||
""" | ||||
Removes user:password from given url string | ||||
:param uri: | ||||
:rtype: unicode | ||||
:returns: filtered list of strings | ||||
""" | ||||
if not uri: | ||||
return '' | ||||
proto = '' | ||||
for pat in ('https://', 'http://'): | ||||
if uri.startswith(pat): | ||||
uri = uri[len(pat):] | ||||
proto = pat | ||||
break | ||||
# remove passwords and username | ||||
uri = uri[uri.find('@') + 1:] | ||||
# get the port | ||||
cred_pos = uri.find(':') | ||||
if cred_pos == -1: | ||||
host, port = uri, None | ||||
else: | ||||
host, port = uri[:cred_pos], uri[cred_pos + 1:] | ||||
return filter(None, [proto, host, port]) | ||||
def credentials_filter(uri): | ||||
""" | ||||
Returns a url with removed credentials | ||||
:param uri: | ||||
""" | ||||
uri = uri_filter(uri) | ||||
#check if we have port | ||||
if len(uri) > 2 and uri[2]: | ||||
uri[2] = ':' + uri[2] | ||||
return ''.join(uri) | ||||
def get_changeset_safe(repo, rev): | ||||
""" | ||||
Safe version of get_changeset if this changeset doesn't exists for a | ||||
repo it returns a Dummy one instead | ||||
:param repo: | ||||
:param rev: | ||||
""" | ||||
from rhodecode.lib.vcs.backends.base import BaseRepository | ||||
from rhodecode.lib.vcs.exceptions import RepositoryError | ||||
r2684 | from rhodecode.lib.vcs.backends.base import EmptyChangeset | |||
r2109 | if not isinstance(repo, BaseRepository): | |||
raise Exception('You must pass an Repository ' | ||||
'object as first argument got %s', type(repo)) | ||||
try: | ||||
cs = repo.get_changeset(rev) | ||||
except RepositoryError: | ||||
cs = EmptyChangeset(requested_revision=rev) | ||||
return cs | ||||
r2699 | def datetime_to_time(dt): | |||
if dt: | ||||
return time.mktime(dt.timetuple()) | ||||
r2726 | def time_to_datetime(tm): | |||
if tm: | ||||
if isinstance(tm, basestring): | ||||
try: | ||||
tm = float(tm) | ||||
except ValueError: | ||||
return | ||||
return datetime.datetime.fromtimestamp(tm) | ||||
r2201 | MENTIONS_REGEX = r'(?:^@|\s@)([a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+)(?:\s{1})' | |||
r2109 | def extract_mentioned_users(s): | |||
""" | ||||
Returns unique usernames from given string s that have @mention | ||||
:param s: string to get mentions | ||||
""" | ||||
r2201 | usrs = set() | |||
for username in re.findall(MENTIONS_REGEX, s): | ||||
usrs.add(username) | ||||
r2109 | ||||
r2201 | return sorted(list(usrs), key=lambda k: k.lower()) | |||
r2674 | ||||
r2699 | ||||
r2674 | class AttributeDict(dict): | |||
def __getattr__(self, attr): | ||||
return self.get(attr, None) | ||||
__setattr__ = dict.__setitem__ | ||||
__delattr__ = dict.__delitem__ | ||||
r2869 | ||||
def fix_PATH(os_=None): | ||||
""" | ||||
Get current active python path, and append it to PATH variable to fix issues | ||||
of subprocess calls and different python versions | ||||
""" | ||||
if os_ is None: | ||||
import os | ||||
else: | ||||
os = os_ | ||||
cur_path = os.path.split(sys.executable)[0] | ||||
if not os.environ['PATH'].startswith(cur_path): | ||||
os.environ['PATH'] = '%s:%s' % (cur_path, os.environ['PATH']) | ||||
r2882 | ||||
def obfuscate_url_pw(engine): | ||||
r3454 | _url = engine or '' | |||
from sqlalchemy.engine import url as sa_url | ||||
try: | ||||
_url = sa_url.make_url(engine) | ||||
if _url.password: | ||||
_url.password = 'XXXXX' | ||||
r3631 | except Exception: | |||
r3454 | pass | |||
return str(_url) | ||||
r2969 | ||||
def get_server_url(environ): | ||||
req = webob.Request(environ) | ||||
return req.host_url + req.script_name | ||||
r3577 | ||||
r3590 | def _extract_extras(env=None): | |||
r3577 | """ | |||
Extracts the rc extras data from os.environ, and wraps it into named | ||||
AttributeDict object | ||||
""" | ||||
r3590 | if not env: | |||
env = os.environ | ||||
r3577 | try: | |||
r3590 | rc_extras = json.loads(env['RC_SCM_DATA']) | |||
r3631 | except Exception: | |||
r3577 | print os.environ | |||
print >> sys.stderr, traceback.format_exc() | ||||
rc_extras = {} | ||||
try: | ||||
for k in ['username', 'repository', 'locked_by', 'scm', 'make_lock', | ||||
'action', 'ip']: | ||||
rc_extras[k] | ||||
except KeyError, e: | ||||
raise Exception('Missing key %s in os.environ %s' % (e, rc_extras)) | ||||
return AttributeDict(rc_extras) | ||||
def _set_extras(extras): | ||||
os.environ['RC_SCM_DATA'] = json.dumps(extras) | ||||