|
|
# -*- coding: utf-8 -*-
|
|
|
"""
|
|
|
rhodecode.lib.__init__
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
|
|
Some simple helper functions
|
|
|
|
|
|
:created_on: Jan 5, 2011
|
|
|
:author: marcink
|
|
|
:copyright: (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
|
|
|
:license: GPLv3, see COPYING for more details.
|
|
|
"""
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
# (at your option) any later version.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
import os
|
|
|
|
|
|
def __get_lem():
    """
    Builds the extension -> lexer names map out of the pygments lexer
    registry

    Every filename pattern registered for a lexer is reduced to its lower
    cased extension and the lexer name (with ``Lexer`` removed) is stored
    under that extension.

    :rtype: dict
    :returns: dict with extensions as keys and lists of lexer names as values
    """
    from pygments import lexers
    from collections import defaultdict

    names_by_ext = defaultdict(list)

    def _extensions(pattern):
        # drop the glob prefix, e.g. '*.py' -> 'py'
        stem = pattern.lstrip('*').lstrip('.')

        opening = stem.find('[')
        if opening == -1:
            return [stem.lower()]

        # a character class like 'php[345]' expands to php3, php4 and php5
        closing = stem.find(']')
        base = stem[:opening]
        return [(base + char).lower() for char in stem[opening + 1:closing]]

    for lexer_name, lexer_info in sorted(lexers.LEXERS.items()):
        description = lexer_name.replace('Lexer', '')
        # second to last field of a registry entry holds the filename patterns
        for pattern in lexer_info[-2]:
            for ext in _extensions(pattern):
                names_by_ext[ext].append(description)

    return dict(names_by_ext)
|
|
|
|
|
|
# The language map is also used by the whoosh indexer, which will index the
# content of files with the extensions listed here
LANGUAGES_EXTENSIONS_MAP = __get_lem()

# Additional mappings that are not present in the pygments lexers
# NOTE: this will override any mappings in LANGUAGES_EXTENSIONS_MAP
# NOTE(review): values produced by __get_lem() are lists of names while the
# values below are plain strings - confirm that consumers handle both
ADDITIONAL_MAPPINGS = {'xaml': 'XAML'}

LANGUAGES_EXTENSIONS_MAP.update(ADDITIONAL_MAPPINGS)
|
|
|
|
|
|
|
|
|
def str2bool(_str):
    """
    Returns True/False value from given string, it tries to translate the
    string into boolean

    ``None`` translates to False, real booleans are returned untouched and
    anything else is converted with ``str()``, stripped, lower cased and
    compared against the known "true" spellings.

    :param _str: string value to translate into boolean
    :rtype: boolean
    :returns: boolean from given string
    """
    if _str is None:
        return False
    # isinstance() rather than ``in (True, False)``: 1 == True and 0 == False,
    # so the membership test returned integers unconverted
    if isinstance(_str, bool):
        return _str
    return str(_str).strip().lower() in ('t', 'true', 'y', 'yes', 'on', '1')
|
|
|
|
|
|
|
|
|
def convert_line_endings(line, mode):
    """
    Converts a given line "line end" accordingly to given mode

    Available modes are::
        0 - Unix
        1 - Mac
        2 - DOS

    Any other mode leaves the line untouched.

    :param line: given line to convert
    :param mode: mode to convert to
    :rtype: str
    :return: converted line according to mode
    """
    import re

    if mode == 0:
        # DOS pairs first, otherwise '\r\n' would turn into two newlines
        line = line.replace('\r\n', '\n').replace('\r', '\n')
    elif mode == 1:
        line = line.replace('\r\n', '\r').replace('\n', '\r')
    elif mode == 2:
        # only touch a lone '\r' or a lone '\n', existing '\r\n' pairs stay
        line = re.sub(r"\r(?!\n)|(?<!\r)\n", "\r\n", line)
    return line
|
|
|
|
|
|
|
|
|
def detect_mode(line, default):
    """
    Detects the line break used at the end of given line, when the line does
    not end with a line break the given default value is returned

    :param line: str line
    :param default: value returned when no line break is found
    :rtype: int
    :return: value of line end, one of 0 - Unix, 1 - Mac, 2 - DOS
    """
    # '\r\n' has to be checked first since it also ends with '\n'
    for ending, mode in (('\r\n', 2), ('\n', 0), ('\r', 1)):
        if line.endswith(ending):
            return mode
    return default
|
|
|
|
|
|
|
|
|
def generate_api_key(username, salt=None):
    """
    Generates unique API key for given username, if salt is not given
    it'll be generated from the operating system random source

    :param username: username as string
    :param salt: salt to hash generate KEY
    :rtype: str
    :returns: sha1 hash from username+salt
    """
    import hashlib
    import os

    if salt is None:
        # os.urandom instead of the private tempfile._RandomNameSequence,
        # which is backed by the non-cryptographic ``random`` module
        salt = hashlib.sha1(os.urandom(32)).hexdigest()

    text_type = type(u'')

    def _as_bytes(value):
        # sha1 works on bytes; encode text explicitly so that non-ascii
        # user names do not blow up inside the hash call
        if isinstance(value, text_type):
            return value.encode('utf8')
        return value

    return hashlib.sha1(_as_bytes(username) + _as_bytes(salt)).hexdigest()
|
|
|
|
|
|
|
|
|
def safe_unicode(str_, from_encoding='utf8'):
|
|
|
"""
|
|
|
safe unicode function. Does few trick to turn str_ into unicode
|
|
|
|
|
|
In case of UnicodeDecode error we try to return it with encoding detected
|
|
|
by chardet library if it fails fallback to unicode with errors replaced
|
|
|
|
|
|
:param str_: string to decode
|
|
|
:rtype: unicode
|
|
|
:returns: unicode object
|
|
|
"""
|
|
|
if isinstance(str_, unicode):
|
|
|
return str_
|
|
|
|
|
|
try:
|
|
|
return unicode(str_)
|
|
|
except UnicodeDecodeError:
|
|
|
pass
|
|
|
|
|
|
try:
|
|
|
return unicode(str_, from_encoding)
|
|
|
except UnicodeDecodeError:
|
|
|
pass
|
|
|
|
|
|
try:
|
|
|
import chardet
|
|
|
encoding = chardet.detect(str_)['encoding']
|
|
|
if encoding is None:
|
|
|
raise Exception()
|
|
|
return str_.decode(encoding)
|
|
|
except (ImportError, UnicodeDecodeError, Exception):
|
|
|
return unicode(str_, from_encoding, 'replace')
|
|
|
|
|
|
def safe_str(unicode_, to_encoding='utf8'):
    """
    Safe str function. Tries a few tricks to turn unicode_ into string

    In case of UnicodeEncodeError we try to return it with encoding detected
    by chardet library, if it fails fallback to string with errors replaced

    :param unicode_: unicode to encode
    :param to_encoding: encoding tried first and used for the final fallback
    :rtype: str
    :returns: str object
    """
    if isinstance(unicode_, str):
        return unicode_

    try:
        return unicode_.encode(to_encoding)
    except UnicodeEncodeError:
        pass

    encoding = None
    try:
        import chardet
        encoding = chardet.detect(unicode_)['encoding']
    except (ImportError, TypeError, ValueError):
        # chardet is optional and detection may fail on this input; either
        # way we simply continue with the lossy fallback
        pass

    if encoding is not None:
        try:
            return unicode_.encode(encoding)
        except (UnicodeEncodeError, LookupError):
            pass

    return unicode_.encode(to_encoding, 'replace')
|
|
|
|
|
|
|
|
|
|
|
|
def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
    """
    Custom engine_from_config function that makes sure we use NullPool for
    file based sqlite databases. This prevents errors on sqlite. This only
    applies to sqlalchemy versions < 0.7.0

    For newer sqlalchemy versions the engine is created unchanged and, when
    ``configuration['debug']`` is true, cursor execute listeners are attached
    that log the start of every query and its total execution time.

    :param configuration: mapping holding the ``<prefix>url`` and ``debug``
        keys among the sqlalchemy settings
    :param prefix: prefix of the sqlalchemy keys in configuration
    :param kwargs: extra keyword arguments passed to sqlalchemy's
        engine_from_config
    :returns: sqlalchemy engine
    """
    import logging
    import re
    import sqlalchemy
    from sqlalchemy import engine_from_config as efc

    # compare (major, minor) as a whole: looking at the minor number alone
    # would treat e.g. 1.0 as older than 0.7; the regexp also copes with
    # suffixed versions such as '0.7b4'
    version = tuple(int(number) for number
                    in re.findall(r'\d+', sqlalchemy.__version__)[:2])

    if version < (0, 7):
        # This solution should work for sqlalchemy < 0.7.0, and should use
        # proxy=TimerProxy() for execution time profiling
        from sqlalchemy.pool import NullPool

        if configuration[prefix + 'url'].startswith('sqlite'):
            kwargs['poolclass'] = NullPool
        return efc(configuration, prefix, **kwargs)

    engine = efc(configuration, prefix, **kwargs)

    if configuration['debug']:
        # attach events only for debug configuration
        import time
        from sqlalchemy import event

        log = logging.getLogger('sqlalchemy.engine')
        YELLOW = 33

        def color_sql(sql):
            # wrap the message into a bold yellow ANSI escape sequence
            COLOR_SEQ = "\033[1;%dm"
            normal = '\x1b[0m'
            return ''.join([COLOR_SEQ % YELLOW, sql, normal])

        def before_cursor_execute(conn, cursor, statement,
                                  parameters, context, executemany):
            # remember the start time on the execution context so that the
            # matching "after" listener can compute the duration
            context._query_start_time = time.time()
            log.info(color_sql(">>>>> STARTING QUERY >>>>>"))

        def after_cursor_execute(conn, cursor, statement,
                                 parameters, context, executemany):
            total = time.time() - context._query_start_time
            log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))

        event.listen(engine, "before_cursor_execute",
                     before_cursor_execute)
        event.listen(engine, "after_cursor_execute",
                     after_cursor_execute)

    return engine
|
|
|
|
|
|
|
|
|
def age(curdate):
    """
    Turns a datetime into an age string, e.g. how long ago it was.

    The biggest time unit that fits into the elapsed time is looked up and
    the description is rendered with a granularity one unit finer than it.

    :param curdate: datetime object
    :rtype: unicode
    :returns: unicode words describing age, empty string for empty curdate
    """
    from datetime import datetime
    from webhelpers.date import time_ago_in_words

    def _(s):
        # translation placeholder, returns the message unchanged
        return s

    if not curdate:
        return ''

    day_seconds = 3600 * 24
    agescales = [
        (_(u"year"), day_seconds * 365),
        (_(u"month"), day_seconds * 30),
        (_(u"day"), day_seconds),
        (_(u"hour"), 3600),
        (_(u"minute"), 60),
        (_(u"second"), 1),
    ]
    finest = len(agescales) - 1

    elapsed = datetime.now() - curdate
    elapsed_seconds = elapsed.days * day_seconds + elapsed.seconds

    for index, (_unit, unit_seconds) in enumerate(agescales):
        if unit_seconds <= elapsed_seconds:
            # report one unit finer than the matched one, seconds at most
            granularity = agescales[min(index + 1, finest)][0]
            return '%s %s' % (time_ago_in_words(curdate, granularity),
                              _('ago'))

    return _(u'just now')
|
|
|
|
|
|
|
|
|
def uri_filter(uri):
    """
    Removes user:password from given url string

    The uri is split into protocol (``http://`` or ``https://`` only), host
    and the remainder that follows the first ``:`` after the credentials
    (the port together with anything that comes after it). Empty parts are
    left out of the result.

    :param uri: url string, may contain credentials
    :rtype: list
    :returns: filtered list of strings, or an empty string for an empty uri
    """
    if not uri:
        return ''

    proto = ''
    for pat in ('https://', 'http://'):
        if uri.startswith(pat):
            proto, uri = pat, uri[len(pat):]
            break

    # remove passwords and username; credentials can only live in front of
    # the first '/', so an '@' inside the path must not cut off the host,
    # and the last '@' is used so a password containing '@' is fully removed
    slash_pos = uri.find('/')
    authority_end = len(uri) if slash_pos == -1 else slash_pos
    uri = uri[uri.rfind('@', 0, authority_end) + 1:]

    # get the port
    host, _sep, port = uri.partition(':')

    return [part for part in (proto, host, port) if part]
|
|
|
|
|
|
|
|
|
def credentials_filter(uri):
    """
    Returns a url with removed credentials

    :param uri: url string, may contain credentials
    :rtype: str
    :returns: url put back together from the parts given by uri_filter
    """
    parts = list(uri_filter(uri))
    if not parts:
        return ''

    # uri_filter leaves out empty parts, so the port is the third item only
    # when a protocol is present; without one it directly follows the host
    port_index = 2 if parts[0] in ('https://', 'http://') else 1

    # check if we have port
    if len(parts) > port_index:
        parts[port_index] = ':' + parts[port_index]

    return ''.join(parts)
|
|
|
|
|
|
def get_changeset_safe(repo, rev):
    """
    Safe version of get_changeset, if this changeset doesn't exist for a
    repo it returns a Dummy one instead

    :param repo: repository object, must be a vcs BaseRepository instance
    :param rev: revision to fetch
    :returns: changeset for rev, or an EmptyChangeset when fetching it
        raises RepositoryError
    :raises Exception: when repo is not a BaseRepository instance
    """
    from vcs.backends.base import BaseRepository
    from vcs.exceptions import RepositoryError

    if not isinstance(repo, BaseRepository):
        # interpolate with %: passing the type as a second argument left the
        # '%s' placeholder unformatted in the message
        raise Exception('You must pass an Repository '
                        'object as first argument got %s' % (type(repo),))

    try:
        return repo.get_changeset(rev)
    except RepositoryError:
        from rhodecode.lib.utils import EmptyChangeset
        return EmptyChangeset(requested_revision=rev)
|
|
|
|
|
|
|
|
|
def get_current_revision(quiet=False):
    """
    Returns tuple of (number, id) from repository containing this package
    or None if repository could not be found.

    :param quiet: when True the error for fetching revision is not printed
    :rtype: tuple
    :returns: (revision, short_id) of the tip changeset or None
    """
    try:
        from vcs import get_repo
        from vcs.utils.helpers import get_scm
        from vcs.exceptions import RepositoryError, VCSError
    except ImportError as import_err:
        # handled separately: with a single try block the except clause
        # referenced the vcs exception classes before they were bound when
        # the import itself failed
        err = import_err
    else:
        try:
            repopath = os.path.join(os.path.dirname(__file__), '..', '..')
            scm = get_scm(repopath)[0]
            repo = get_repo(path=repopath, alias=scm)
            tip = repo.get_changeset()
            return (tip.revision, tip.short_id)
        except (ImportError, RepositoryError, VCSError) as vcs_err:
            err = vcs_err

    if not quiet:
        print ("Cannot retrieve rhodecode's revision. Original error "
               "was: %s" % err)
    return None
|
|
|
|
|
|
|