__init__.py
385 lines
| 10.5 KiB
| text/x-python
|
PythonLexer
r914 | # -*- coding: utf-8 -*- | ||
""" | |||
rhodecode.lib.__init__ | |||
~~~~~~~~~~~~~~~~~~~~~~~ | |||
Some simple helper functions | |||
r1203 | |||
r914 | :created_on: Jan 5, 2011 | ||
:author: marcink | |||
r1203 | :copyright: (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com> | ||
r914 | :license: GPLv3, see COPYING for more details. | ||
""" | |||
r1206 | # This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | |||
# the Free Software Foundation, either version 3 of the License, or | |||
# (at your option) any later version. | |||
r1203 | # | ||
r914 | # This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
# GNU General Public License for more details. | |||
r1203 | # | ||
r914 | # You should have received a copy of the GNU General Public License | ||
r1206 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
r914 | |||
r1223 | |||
r1387 | try: | ||
import json | |||
except ImportError: | |||
#python 2.5 compatibility | |||
import simplejson as json | |||
def __get_lem():
    """
    Builds a map of file extensions to pygments lexer descriptions

    The map is derived from ``pygments.lexers.LEXERS``: for every lexer the
    second-to-last item of its entry is iterated as a list of filename
    patterns, and the lexer name with ``Lexer`` removed is stored as the
    description (``PythonLexer`` -> ``Python``).

    :rtype: dict
    :returns: dict with the lower-cased extension (leading ``*`` and ``.``
        stripped) as key and a list of lexer descriptions as value
    """
    from collections import defaultdict
    from pygments import lexers

    extensions = defaultdict(list)

    def _extensions_of(pattern):
        """
        Turns one filename pattern into a list of lower-cased extensions,
        a bracket set such as ``*.[ch]`` gives one entry per character
        """
        pattern = pattern.lstrip('*').lstrip('.')
        start = pattern.find('[')
        if start == -1:
            return [pattern.lower()]
        stop = pattern.find(']')
        prefix = pattern[:start]
        # anything that follows the closing bracket is ignored
        return [(prefix + suffix).lower()
                for suffix in pattern[start + 1:stop]]

    for lexer_name, lexer_info in sorted(lexers.LEXERS.items()):
        desc = lexer_name.replace('Lexer', '')
        for pattern in lexer_info[-2]:
            for ext in _extensions_of(pattern):
                extensions[ext].append(desc)

    return dict(extensions)
# The language map is also used by the whoosh indexer, which indexes the
# content of files that carry one of the extensions listed here.
LANGUAGES_EXTENSIONS_MAP = __get_lem()

# Additional mappings that are not present in the pygments lexers.
# NOTE: these override any mapping already in LANGUAGES_EXTENSIONS_MAP.
ADDITIONAL_MAPPINGS = dict(xaml='XAML')
LANGUAGES_EXTENSIONS_MAP.update(ADDITIONAL_MAPPINGS)
r1307 | |||
def str2bool(_str):
    """
    returns True/False value from given string, it tries to translate the
    string into boolean

    :param _str: string value to translate into boolean, ``None`` gives
        False, anything else is compared by its stripped, lower-cased
        ``str()`` form
    :rtype: boolean
    :returns: boolean from given string
    """
    if _str is None:
        return False
    # 0, 1 and 1.0 compare equal to False/True as well; bool() makes sure
    # that a real boolean is returned for them and not the number itself
    if _str in (True, False):
        return bool(_str)
    return str(_str).strip().lower() in ('t', 'true', 'y', 'yes', 'on', '1')
r1307 | |||
def convert_line_endings(line, mode):
    """
    Converts a given line "line end" accordingly to given mode

    Available modes are::
        0 - Unix
        1 - Mac
        2 - DOS

    :param line: given line to convert
    :param mode: mode to convert to, any other value than listed above
        leaves the line untouched
    :rtype: str
    :return: converted line according to mode
    """
    if mode == 0:
        # '\r\n' goes first, otherwise it would end up as two line breaks
        line = line.replace('\r\n', '\n').replace('\r', '\n')
    elif mode == 1:
        line = line.replace('\r\n', '\r').replace('\n', '\r')
    elif mode == 2:
        import re
        # only a lone '\r' or a lone '\n' is rewritten, an existing '\r\n'
        # is kept as it is
        line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
    return line
r1305 | |||
def detect_mode(line, default):
    """
    Tells which line break terminates the given line, the given default
    is handed back when the line doesn't end with any line break

    :param line: str line
    :param default: value returned when no line break is found
    :rtype: int
    :return: one of 0 - Unix, 1 - Mac, 2 - DOS or the default
    """
    # '\r\n' has to be probed before '\n' as it ends with '\n' as well
    for ending, mode in (('\r\n', 2), ('\n', 0), ('\r', 1)):
        if line.endswith(ending):
            return mode
    return default
r1116 | |||
r1307 | |||
def generate_api_key(username, salt=None):
    """
    Generates unique API key for given username, if salt is not given
    it'll be generated from random bytes supplied by the operating system

    :param username: username as string
    :param salt: salt to hash generate KEY
    :rtype: str
    :returns: sha1 hash from username+salt
    """
    import hashlib
    import os
    from binascii import hexlify

    def _as_bytes(value):
        # text is encoded explicitly, handing it to sha1 directly relies on
        # an implicit ascii conversion that fails for non-ascii user names
        if isinstance(value, type(u'')):
            return value.encode('utf8')
        return value

    if salt is None:
        # os.urandom replaces the private tempfile._RandomNameSequence
        # helper, which is an implementation detail of tempfile and not a
        # source of secrets
        salt = hexlify(os.urandom(16))
    return hashlib.sha1(_as_bytes(username) + _as_bytes(salt)).hexdigest()
r1154 | |||
r1223 | |||
def safe_unicode(str_, from_encoding='utf8'):
    """
    safe unicode function. Does few trick to turn str_ into unicode

    In case of UnicodeDecode error we try to return it with encoding detected
    by chardet library if it fails fallback to unicode with errors replaced

    :param str_: string to decode
    :param from_encoding: encoding tried first, also used for the final
        fallback with errors replaced
    :rtype: unicode
    :returns: unicode object
    """
    # the text type, spelled without the python 2 only ``unicode`` name
    text_type = type(u'')
    if isinstance(str_, text_type):
        return str_

    try:
        return text_type(str_, from_encoding)
    except UnicodeDecodeError:
        pass

    try:
        import chardet
        encoding = chardet.detect(str_)['encoding']
        # no guess from chardet simply means falling through to the
        # replacing decode below; raising an argument-less
        # UnicodeDecodeError here ends in a TypeError instead
        if encoding is not None:
            return str_.decode(encoding)
    except (ImportError, UnicodeDecodeError, LookupError):
        # chardet missing, wrong guess, or a codec name python doesn't know
        pass
    return text_type(str_, from_encoding, 'replace')
r1300 | |||
def safe_str(unicode_, to_encoding='utf8'):
    """
    safe str function. Does few trick to turn unicode_ into string

    In case of UnicodeEncodeError we try to return it with encoding detected
    by chardet library if it fails fallback to string with errors replaced

    :param unicode_: unicode to encode
    :param to_encoding: encoding tried first, also used for the final
        fallback with errors replaced
    :rtype: str
    :returns: str object
    """
    # type of an encoded string, that is plain ``str`` on python 2
    if isinstance(unicode_, type(u''.encode('ascii'))):
        return unicode_

    try:
        return unicode_.encode(to_encoding)
    except UnicodeEncodeError:
        pass

    try:
        import chardet
        encoding = chardet.detect(unicode_)['encoding']
        if encoding is not None:
            return unicode_.encode(encoding)
    except (ImportError, UnicodeError, LookupError, TypeError, ValueError):
        # NOTE(review): chardet is fed text here although it is meant to
        # sniff encoded data, releases that refuse such input presumably do
        # so with TypeError/ValueError - confirm whether this step is still
        # wanted at all. Any failure ends in the replacing encode below.
        pass
    return unicode_.encode(to_encoding, 'replace')
def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
    """
    Custom engine_from_config functions that makes sure we use NullPool for
    file based sqlite databases. This prevents errors on sqlite. This only
    applies to sqlalchemy versions < 0.7.0

    For sqlalchemy >= 0.7 the engine is created as is and, when
    ``configuration['debug']`` is true, two cursor execute listeners are
    attached that log the start of every query and the time it took to the
    ``sqlalchemy.engine`` logger.

    :param configuration: mapping with the settings, ``prefix + 'url'`` is
        read for sqlalchemy < 0.7 and ``'debug'`` for sqlalchemy >= 0.7
    :param prefix: prefix of the sqlalchemy keys in configuration
    :param kwargs: passed on to sqlalchemy's own engine_from_config
    :returns: engine created by sqlalchemy's engine_from_config
    :raises ValueError: if sqlalchemy.__version__ doesn't start with
        ``<major>.<minor>``
    """
    import logging
    import re

    import sqlalchemy
    from sqlalchemy import engine_from_config as efc

    # major and minor are compared together, looking at the minor number
    # alone puts 1.0 .. 1.6 into the "older than 0.7" branch; only leading
    # digits are read so that a minor like '7b1' doesn't break int()
    version_match = re.match(r'(\d+)\.(\d+)', sqlalchemy.__version__)
    if version_match is None:
        raise ValueError('Unable to parse sqlalchemy version %r'
                         % (sqlalchemy.__version__,))
    version = tuple([int(part) for part in version_match.groups()])

    if version < (0, 7):
        # This solution should work for sqlalchemy < 0.7.0, and should use
        # proxy=TimerProxy() for execution time profiling
        from sqlalchemy.pool import NullPool
        url = configuration[prefix + 'url']

        if url.startswith('sqlite'):
            kwargs.update({'poolclass': NullPool})
        return efc(configuration, prefix, **kwargs)

    import time
    from sqlalchemy import event

    log = logging.getLogger('sqlalchemy.engine')
    # ANSI foreground colour code used for the log lines below
    YELLOW = 33
    engine = efc(configuration, prefix, **kwargs)

    def color_sql(sql):
        COLOR_SEQ = "\033[1;%dm"
        COLOR_SQL = YELLOW
        normal = '\x1b[0m'
        return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal])

    if configuration['debug']:
        #attach events only for debug configuration

        def before_cursor_execute(conn, cursor, statement,
                                  parameters, context, executemany):
            # start time is kept on the execution context, it's picked up
            # again by after_cursor_execute
            context._query_start_time = time.time()
            log.info(color_sql(">>>>> STARTING QUERY >>>>>"))

        def after_cursor_execute(conn, cursor, statement,
                                 parameters, context, executemany):
            total = time.time() - context._query_start_time
            log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))

        event.listen(engine, "before_cursor_execute",
                     before_cursor_execute)
        event.listen(engine, "after_cursor_execute",
                     after_cursor_execute)

    return engine
r1342 | |||
def age(curdate):
    """
    Describes how long ago the given datetime was, in words.

    :param curdate: datetime object, a false value gives an empty string
    :rtype: unicode
    :returns: unicode words describing age
    """
    from datetime import datetime
    from webhelpers.date import time_ago_in_words

    # placeholder for a translation function, hands the string back as is
    _ = lambda s: s

    if not curdate:
        return ''

    agescales = [
        (_(u"year"), 3600 * 24 * 365),
        (_(u"month"), 3600 * 24 * 30),
        (_(u"day"), 3600 * 24),
        (_(u"hour"), 3600),
        (_(u"minute"), 60),
        (_(u"second"), 1),
    ]

    elapsed = datetime.now() - curdate
    elapsed_seconds = elapsed.days * agescales[2][1] + elapsed.seconds

    # the first scale that fits into the elapsed time is looked up and the
    # name of the scale right after it is passed on as granularity; the
    # last scale has no successor, so it is used itself
    last = len(agescales) - 1
    for idx, (scale_name, scale_seconds) in enumerate(agescales):
        if scale_seconds <= elapsed_seconds:
            granularity = agescales[min(idx + 1, last)][0]
            return '%s %s' % (time_ago_in_words(curdate, granularity),
                              _('ago'))

    return _(u'just now')
def uri_filter(uri):
    """
    Removes user:password from given url string

    :param uri: url string, may start with ``http://`` or ``https://``
    :rtype: list
    :returns: filtered list of strings, the non-empty ones of protocol,
        host and everything after the first ``:`` behind the credentials
        (port and whatever follows it); an empty string for an empty uri
    """
    if not uri:
        return ''

    proto = ''
    for pat in ('https://', 'http://'):
        if uri.startswith(pat):
            uri = uri[len(pat):]
            proto = pat
            break

    # remove passwords and username. Credentials can only sit in front of
    # the first '/', and the last '@' of that part ends them: an '@' in the
    # path must not cut the url, an '@' inside a password must not leak it
    authority_end = uri.find('/')
    if authority_end == -1:
        authority_end = len(uri)
    uri = uri[uri.rfind('@', 0, authority_end) + 1:]

    # get the port
    cred_pos = uri.find(':')
    if cred_pos == -1:
        host, port = uri, None
    else:
        host, port = uri[:cred_pos], uri[cred_pos + 1:]

    # a real list, callers index into the result and assign to it
    return [part for part in (proto, host, port) if part]
r1373 | |||
def credentials_filter(uri):
    """
    Returns a url with removed credentials

    :param uri: url string, passed through uri_filter
    :rtype: str
    :returns: the parts given by uri_filter joined back together, with the
        ``:`` restored in front of the port part
    """
    parts = list(uri_filter(uri))
    if not parts:
        return ''

    # uri_filter leaves out empty parts, so the port part is the third item
    # only when the list starts with a protocol; without one it's the
    # second item and used to be glued to the host without the ':'
    port_idx = 1
    if parts[0] in ('https://', 'http://'):
        port_idx = 2

    #check if we have port
    if len(parts) > port_idx:
        parts[port_idx] = ':' + parts[port_idx]

    return ''.join(parts)
def get_changeset_safe(repo, rev):
    """
    Safe version of get_changeset if this changeset doesn't exists for a
    repo it returns a Dummy one instead

    :param repo: instance of vcs BaseRepository
    :param rev: revision passed to repo.get_changeset
    :returns: the changeset given by repo.get_changeset, or an
        EmptyChangeset created with ``requested_revision=rev`` when that
        call raises RepositoryError
    :raises Exception: if repo is not a BaseRepository instance
    """
    from vcs.backends.base import BaseRepository
    from vcs.exceptions import RepositoryError

    if not isinstance(repo, BaseRepository):
        # the type is interpolated into the message; handing it over as a
        # second argument leaves the '%s' unfilled
        raise Exception('You must pass an Repository '
                        'object as first argument got %s' % (type(repo),))

    try:
        return repo.get_changeset(rev)
    except RepositoryError:
        from rhodecode.lib.utils import EmptyChangeset
        return EmptyChangeset(requested_revision=rev)