__init__.py
408 lines
| 11.2 KiB
| text/x-python
|
PythonLexer
r914 | # -*- coding: utf-8 -*- | |||
""" | ||||
rhodecode.lib.__init__ | ||||
~~~~~~~~~~~~~~~~~~~~~~~ | ||||
Some simple helper functions | ||||
r1203 | ||||
r914 | :created_on: Jan 5, 2011 | |||
:author: marcink | ||||
r1203 | :copyright: (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com> | |||
r914 | :license: GPLv3, see COPYING for more details. | |||
""" | ||||
r1206 | # This program is free software: you can redistribute it and/or modify | |||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
r1203 | # | |||
r914 | # This program is distributed in the hope that it will be useful, | |||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
r1203 | # | |||
r914 | # You should have received a copy of the GNU General Public License | |||
r1206 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |||
r914 | ||||
r1541 | import os | |||
r1302 | def __get_lem(): | |||
from pygments import lexers | ||||
from string import lower | ||||
from collections import defaultdict | ||||
d = defaultdict(lambda: []) | ||||
def __clean(s): | ||||
s = s.lstrip('*') | ||||
s = s.lstrip('.') | ||||
if s.find('[') != -1: | ||||
exts = [] | ||||
start, stop = s.find('['), s.find(']') | ||||
for suffix in s[start + 1:stop]: | ||||
exts.append(s[:s.find('[')] + suffix) | ||||
return map(lower, exts) | ||||
else: | ||||
return map(lower, [s]) | ||||
for lx, t in sorted(lexers.LEXERS.items()): | ||||
m = map(__clean, t[-2]) | ||||
if m: | ||||
m = reduce(lambda x, y: x + y, m) | ||||
for ext in m: | ||||
desc = lx.replace('Lexer', '') | ||||
d[ext].append(desc) | ||||
return dict(d) | ||||
# language map is also used by whoosh indexer, which for those specified | ||||
# extensions will index it's content | ||||
LANGUAGES_EXTENSIONS_MAP = __get_lem() | ||||
r1307 | # Additional mappings that are not present in the pygments lexers | |||
r1302 | # NOTE: that this will overide any mappings in LANGUAGES_EXTENSIONS_MAP | |||
ADDITIONAL_MAPPINGS = {'xaml': 'XAML'} | ||||
LANGUAGES_EXTENSIONS_MAP.update(ADDITIONAL_MAPPINGS) | ||||
r1307 | ||||
r1223 | def str2bool(_str): | |||
""" | ||||
returs True/False value from given string, it tries to translate the | ||||
string into boolean | ||||
:param _str: string value to translate into boolean | ||||
:rtype: boolean | ||||
:returns: boolean from given string | ||||
""" | ||||
if _str is None: | ||||
r1154 | return False | |||
r1223 | if _str in (True, False): | |||
return _str | ||||
_str = str(_str).strip().lower() | ||||
return _str in ('t', 'true', 'y', 'yes', 'on', '1') | ||||
r1307 | ||||
r1373 | def convert_line_endings(line, mode): | |||
""" | ||||
Converts a given line "line end" accordingly to given mode | ||||
Available modes are:: | ||||
0 - Unix | ||||
1 - Mac | ||||
2 - DOS | ||||
:param line: given line to convert | ||||
:param mode: mode to convert to | ||||
:rtype: str | ||||
:return: converted line according to mode | ||||
""" | ||||
r1305 | from string import replace | |||
r1373 | ||||
r1305 | if mode == 0: | |||
r1373 | line = replace(line, '\r\n', '\n') | |||
line = replace(line, '\r', '\n') | ||||
r1305 | elif mode == 1: | |||
r1373 | line = replace(line, '\r\n', '\r') | |||
line = replace(line, '\n', '\r') | ||||
r1305 | elif mode == 2: | |||
import re | ||||
r1373 | line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line) | |||
return line | ||||
r1305 | ||||
def detect_mode(line, default): | ||||
r1307 | """ | |||
Detects line break for given line, if line break couldn't be found | ||||
given default value is returned | ||||
:param line: str line | ||||
:param default: default | ||||
:rtype: int | ||||
:return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS | ||||
""" | ||||
r1305 | if line.endswith('\r\n'): | |||
return 2 | ||||
elif line.endswith('\n'): | ||||
return 0 | ||||
elif line.endswith('\r'): | ||||
return 1 | ||||
else: | ||||
return default | ||||
r1116 | ||||
r1307 | ||||
r1116 | def generate_api_key(username, salt=None): | |||
r1154 | """ | |||
r1466 | Generates unique API key for given username, if salt is not given | |||
r1223 | it'll be generated from some random string | |||
r1203 | ||||
r1154 | :param username: username as string | |||
:param salt: salt to hash generate KEY | ||||
r1223 | :rtype: str | |||
:returns: sha1 hash from username+salt | ||||
r1154 | """ | |||
r1116 | from tempfile import _RandomNameSequence | |||
import hashlib | ||||
if salt is None: | ||||
salt = _RandomNameSequence().next() | ||||
return hashlib.sha1(username + salt).hexdigest() | ||||
r1154 | ||||
r1223 | ||||
r1490 | def safe_unicode(str_, from_encoding='utf8'): | |||
r1154 | """ | |||
r1490 | safe unicode function. Does few trick to turn str_ into unicode | |||
In case of UnicodeDecode error we try to return it with encoding detected | ||||
by chardet library if it fails fallback to unicode with errors replaced | ||||
r1223 | ||||
r1490 | :param str_: string to decode | |||
r1222 | :rtype: unicode | |||
:returns: unicode object | ||||
r1154 | """ | |||
r1490 | if isinstance(str_, unicode): | |||
return str_ | ||||
r1199 | ||||
r1154 | try: | |||
r1502 | return unicode(str_) | |||
except UnicodeDecodeError: | ||||
pass | ||||
try: | ||||
r1490 | return unicode(str_, from_encoding) | |||
r1154 | except UnicodeDecodeError: | |||
r1490 | pass | |||
r1502 | ||||
try: | ||||
r1490 | import chardet | |||
encoding = chardet.detect(str_)['encoding'] | ||||
if encoding is None: | ||||
r1502 | raise Exception() | |||
r1490 | return str_.decode(encoding) | |||
r1502 | except (ImportError, UnicodeDecodeError, Exception): | |||
return unicode(str_, from_encoding, 'replace') | ||||
r1300 | ||||
r1490 | def safe_str(unicode_, to_encoding='utf8'): | |||
r1401 | """ | |||
r1490 | safe str function. Does few trick to turn unicode_ into string | |||
In case of UnicodeEncodeError we try to return it with encoding detected | ||||
by chardet library if it fails fallback to string with errors replaced | ||||
r1401 | ||||
r1490 | :param unicode_: unicode to encode | |||
r1401 | :rtype: str | |||
:returns: str object | ||||
""" | ||||
r1490 | if isinstance(unicode_, str): | |||
return unicode_ | ||||
r1401 | ||||
try: | ||||
r1494 | return unicode_.encode(to_encoding) | |||
r1401 | except UnicodeEncodeError: | |||
r1490 | pass | |||
r1514 | ||||
r1490 | try: | |||
import chardet | ||||
encoding = chardet.detect(unicode_)['encoding'] | ||||
print encoding | ||||
if encoding is None: | ||||
raise UnicodeEncodeError() | ||||
r1514 | ||||
r1490 | return unicode_.encode(encoding) | |||
except (ImportError, UnicodeEncodeError): | ||||
return unicode_.encode(to_encoding, 'replace') | ||||
r1401 | ||||
return safe_str | ||||
r1300 | def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs): | |||
""" | ||||
Custom engine_from_config functions that makes sure we use NullPool for | ||||
r1360 | file based sqlite databases. This prevents errors on sqlite. This only | |||
applies to sqlalchemy versions < 0.7.0 | ||||
r1307 | ||||
r1300 | """ | |||
r1360 | import sqlalchemy | |||
r1300 | from sqlalchemy import engine_from_config as efc | |||
r1360 | import logging | |||
if int(sqlalchemy.__version__.split('.')[1]) < 7: | ||||
# This solution should work for sqlalchemy < 0.7.0, and should use | ||||
# proxy=TimerProxy() for execution time profiling | ||||
from sqlalchemy.pool import NullPool | ||||
url = configuration[prefix + 'url'] | ||||
r1300 | ||||
r1360 | if url.startswith('sqlite'): | |||
kwargs.update({'poolclass': NullPool}) | ||||
return efc(configuration, prefix, **kwargs) | ||||
else: | ||||
import time | ||||
from sqlalchemy import event | ||||
from sqlalchemy.engine import Engine | ||||
r1362 | log = logging.getLogger('sqlalchemy.engine') | |||
r1360 | BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38) | |||
engine = efc(configuration, prefix, **kwargs) | ||||
r1300 | ||||
r1360 | def color_sql(sql): | |||
COLOR_SEQ = "\033[1;%dm" | ||||
COLOR_SQL = YELLOW | ||||
normal = '\x1b[0m' | ||||
return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal]) | ||||
if configuration['debug']: | ||||
#attach events only for debug configuration | ||||
r1300 | ||||
r1360 | def before_cursor_execute(conn, cursor, statement, | |||
parameters, context, executemany): | ||||
context._query_start_time = time.time() | ||||
log.info(color_sql(">>>>> STARTING QUERY >>>>>")) | ||||
def after_cursor_execute(conn, cursor, statement, | ||||
parameters, context, executemany): | ||||
total = time.time() - context._query_start_time | ||||
log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total)) | ||||
event.listen(engine, "before_cursor_execute", | ||||
before_cursor_execute) | ||||
event.listen(engine, "after_cursor_execute", | ||||
after_cursor_execute) | ||||
return engine | ||||
r1342 | ||||
def age(curdate): | ||||
""" | ||||
turns a datetime into an age string. | ||||
:param curdate: datetime object | ||||
:rtype: unicode | ||||
:returns: unicode words describing age | ||||
""" | ||||
from datetime import datetime | ||||
from webhelpers.date import time_ago_in_words | ||||
_ = lambda s:s | ||||
if not curdate: | ||||
return '' | ||||
agescales = [(_(u"year"), 3600 * 24 * 365), | ||||
(_(u"month"), 3600 * 24 * 30), | ||||
(_(u"day"), 3600 * 24), | ||||
(_(u"hour"), 3600), | ||||
(_(u"minute"), 60), | ||||
(_(u"second"), 1), ] | ||||
age = datetime.now() - curdate | ||||
age_seconds = (age.days * agescales[2][1]) + age.seconds | ||||
pos = 1 | ||||
for scale in agescales: | ||||
if scale[1] <= age_seconds: | ||||
if pos == 6:pos = 5 | ||||
return '%s %s' % (time_ago_in_words(curdate, | ||||
agescales[pos][0]), _('ago')) | ||||
pos += 1 | ||||
return _(u'just now') | ||||
r1373 | def uri_filter(uri): | |||
r1342 | """ | |||
Removes user:password from given url string | ||||
:param uri: | ||||
:rtype: unicode | ||||
r1373 | :returns: filtered list of strings | |||
r1342 | """ | |||
if not uri: | ||||
return '' | ||||
proto = '' | ||||
for pat in ('https://', 'http://'): | ||||
if uri.startswith(pat): | ||||
uri = uri[len(pat):] | ||||
proto = pat | ||||
break | ||||
# remove passwords and username | ||||
uri = uri[uri.find('@') + 1:] | ||||
# get the port | ||||
cred_pos = uri.find(':') | ||||
if cred_pos == -1: | ||||
host, port = uri, None | ||||
else: | ||||
host, port = uri[:cred_pos], uri[cred_pos + 1:] | ||||
return filter(None, [proto, host, port]) | ||||
r1373 | ||||
def credentials_filter(uri): | ||||
""" | ||||
Returns a url with removed credentials | ||||
:param uri: | ||||
""" | ||||
uri = uri_filter(uri) | ||||
#check if we have port | ||||
if len(uri) > 2 and uri[2]: | ||||
uri[2] = ':' + uri[2] | ||||
return ''.join(uri) | ||||
r1466 | def get_changeset_safe(repo, rev): | |||
""" | ||||
Safe version of get_changeset if this changeset doesn't exists for a | ||||
repo it returns a Dummy one instead | ||||
:param repo: | ||||
:param rev: | ||||
""" | ||||
from vcs.backends.base import BaseRepository | ||||
from vcs.exceptions import RepositoryError | ||||
if not isinstance(repo, BaseRepository): | ||||
raise Exception('You must pass an Repository ' | ||||
'object as first argument got %s', type(repo)) | ||||
try: | ||||
cs = repo.get_changeset(rev) | ||||
except RepositoryError: | ||||
from rhodecode.lib.utils import EmptyChangeset | ||||
cs = EmptyChangeset(requested_revision=rev) | ||||
r1514 | return cs | |||
r1541 | ||||
r1571 | def get_current_revision(quiet=False): | |||
r1541 | """ | |||
Returns tuple of (number, id) from repository containing this package | ||||
or None if repository could not be found. | ||||
r1571 | ||||
:param quiet: prints error for fetching revision if True | ||||
r1541 | """ | |||
try: | ||||
from vcs import get_repo | ||||
from vcs.utils.helpers import get_scm | ||||
from vcs.exceptions import RepositoryError, VCSError | ||||
repopath = os.path.join(os.path.dirname(__file__), '..', '..') | ||||
scm = get_scm(repopath)[0] | ||||
repo = get_repo(path=repopath, alias=scm) | ||||
tip = repo.get_changeset() | ||||
return (tip.revision, tip.short_id) | ||||
except (ImportError, RepositoryError, VCSError), err: | ||||
r1571 | if not quiet: | |||
print ("Cannot retrieve rhodecode's revision. Original error " | ||||
"was: %s" % err) | ||||
r1541 | return None | |||
r1571 | ||||