# -*- coding: utf-8 -*-
"""
    rhodecode.lib.__init__
    ~~~~~~~~~~~~~~~~~~~~~~~

    Some simple helper functions

    :created_on: Jan 5, 2011
    :author: marcink
    :copyright: (C) 2011-2012 Marcin Kuzminski
    :license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import re
from rhodecode.lib.vcs.utils.lazy import LazyProperty


def __get_lem():
    """
    Build the "language extensions map" from the pygments lexer registry.

    Every pattern found in the second-to-last field of a
    ``pygments.lexers.LEXERS`` entry (presumably the filename globs, see the
    pygments lexer registry docs) is stripped of its leading ``*`` and ``.``,
    lower-cased and used as a key. The value is the list of lexer names
    (with ``Lexer`` removed) that declared that pattern.

    :rtype: dict
    :returns: mapping of lower-cased extension to list of lexer descriptions
    """
    from pygments import lexers
    from collections import defaultdict

    extension_map = defaultdict(list)

    def __clean(pattern):
        # '*.py' -> ['py'];  '*.php[345]' -> ['php3', 'php4', 'php5']
        stem = pattern.lstrip('*').lstrip('.')
        start = stem.find('[')
        if start == -1:
            return [stem.lower()]

        # one extension per character listed between the brackets
        stop = stem.find(']')
        prefix = stem[:start]
        return [(prefix + suffix).lower() for suffix in stem[start + 1:stop]]

    # sorted() keeps the order of descriptions per extension deterministic
    for lexer_name, lexer_info in sorted(lexers.LEXERS.items()):
        description = lexer_name.replace('Lexer', '')
        for pattern in lexer_info[-2]:
            for ext in __clean(pattern):
                extension_map[ext].append(description)

    return dict(extension_map)

# language map is also used by whoosh indexer, which for those specified
# extensions will index its content
LANGUAGES_EXTENSIONS_MAP = __get_lem()

# Additional mappings that are not present in the pygments lexers
# NOTE: that this will override any mappings in LANGUAGES_EXTENSIONS_MAP
# NOTE(review): values generated by __get_lem() are lists of names, while the
# value below is a plain string -- confirm consumers only rely on the keys.
ADDITIONAL_MAPPINGS = {'xaml': 'XAML'}

LANGUAGES_EXTENSIONS_MAP.update(ADDITIONAL_MAPPINGS)

# list of readme files to search in file tree and display in summary
# attached weights defines the search order lower is first
ALL_READMES = [
    ('readme', 0), ('README', 0), ('Readme', 0),
    ('doc/readme', 1), ('doc/README', 1), ('doc/Readme', 1),
    ('Docs/readme', 2), ('Docs/README', 2), ('Docs/Readme', 2),
    ('DOCS/readme', 2), ('DOCS/README', 2), ('DOCS/Readme', 2),
    ('docs/readme', 2), ('docs/README', 2), ('docs/Readme', 2),
]

# extension together with weights to search lower is first
RST_EXTS = [
    ('', 0), ('.rst', 1), ('.rest', 1),
    ('.RST', 2), ('.REST', 2),
    ('.txt', 3), ('.TXT', 3)
]

MARKDOWN_EXTS = [
    ('.md', 1), ('.MD', 1),
    ('.mkdn', 2), ('.MKDN', 2),
    ('.mdown', 3), ('.MDOWN', 3),
    ('.markdown', 4), ('.MARKDOWN', 4)
]

PLAIN_EXTS = [('.text', 2), ('.TEXT', 2)]

ALL_EXTS = MARKDOWN_EXTS + RST_EXTS + PLAIN_EXTS


def str2bool(_str):
    """
    returns True/False value from given string, it tries to translate the
    string into boolean

    :param _str: string value to translate into boolean
    :rtype: boolean
    :returns: boolean from given string
    """
    if _str is None:
        return False
    if _str in (True, False):
        # 1/0 (and 1.0/0.0) compare equal to True/False as well; coerce so
        # the caller always gets a real bool back, as documented
        return bool(_str)
    return str(_str).strip().lower() in ('t', 'true', 'y', 'yes', 'on', '1')


def convert_line_endings(line, mode):
    """
    Converts a given line  "line end" accordingly to given mode

    Available modes are::
        0 - Unix
        1 - Mac
        2 - DOS

    Any other mode leaves the line untouched.

    :param line: given line to convert
    :param mode: mode to convert to
    :rtype: str
    :return: converted line according to mode
    """
    if mode == 0:
        line = line.replace('\r\n', '\n').replace('\r', '\n')
    elif mode == 1:
        line = line.replace('\r\n', '\r').replace('\n', '\r')
    elif mode == 2:
        # NOTE(review): this statement was truncated after
        # `re.sub("\r(?!\n)|(?` in the reviewed text; the look-behind half and
        # the replacement were reconstructed from the documented DOS mode
        # (a lone CR or a lone LF becomes CRLF). Verify against history.
        line = re.sub(r"\r(?!\n)|(?<!\r)\n", "\r\n", line)
    return line


# NOTE(review): everything between the regex in convert_line_endings() and
# the fragment quoted below was missing from the reviewed text (it looks like
# markup stripping removed the span between a "<" and a ">"). The fragment is
# the tail of a function that attaches SQLAlchemy cursor-execute listeners to
# an engine and returns it; its head, and any definitions that preceded it,
# must be restored from version control. The surviving text is kept verbatim
# here (original indentation unknown) instead of being guessed at:
#
#   >>>> STARTING QUERY >>>>>"))
#   def after_cursor_execute(conn, cursor, statement, parameters, context,
#                            executemany):
#       total = time.time() - context._query_start_time
#       log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
#   event.listen(engine, "before_cursor_execute", before_cursor_execute)
#   event.listen(engine, "after_cursor_execute", after_cursor_execute)
#   return engine


def age(curdate):
    """
    turns a datetime into an age string.

    :param curdate: datetime object
    :rtype: unicode
    :returns: unicode words describing age
    """
    from datetime import datetime
    from webhelpers.date import time_ago_in_words

    def _(s):
        # identity stand-in for a translation function
        return s

    if not curdate:
        return ''

    agescales = [(_(u"year"), 3600 * 24 * 365),
                 (_(u"month"), 3600 * 24 * 30),
                 (_(u"day"), 3600 * 24),
                 (_(u"hour"), 3600),
                 (_(u"minute"), 60),
                 (_(u"second"), 1), ]

    delta = datetime.now() - curdate
    age_seconds = (delta.days * agescales[2][1]) + delta.seconds

    finest = len(agescales) - 1
    for index, (_name, seconds) in enumerate(agescales):
        if seconds <= age_seconds:
            # report with one unit finer than the first scale that fits,
            # clamped to the finest unit available
            granularity = agescales[min(index + 1, finest)][0]
            return '%s %s' % (time_ago_in_words(curdate, granularity),
                              _('ago'))

    # less than a second old, or dated in the future
    return _(u'just now')


def _split_uri(uri):
    """
    Splits a non-empty uri into its parts, dropping any credentials.

    :param uri: uri string
    :returns: tuple of (proto, host, port); proto is '' unless the uri starts
        with http:// or https://, port is None when there is no ':' after the
        credentials were removed (it keeps any trailing path otherwise)
    """
    proto = ''
    for pat in ('https://', 'http://'):
        if uri.startswith(pat):
            uri = uri[len(pat):]
            proto = pat
            break

    # remove passwords and username; only look inside the authority part
    # (optional scheme, then everything up to the first '/', '?' or '#') so
    # that an '@' inside the path or query does not swallow the host
    userinfo = re.match(r'(?:[A-Za-z][A-Za-z0-9+.\-]*://)?[^/?#]*@', uri)
    if userinfo:
        uri = uri[userinfo.end():]

    # get the port
    cred_pos = uri.find(':')
    if cred_pos == -1:
        host, port = uri, None
    else:
        host, port = uri[:cred_pos], uri[cred_pos + 1:]

    return proto, host, port


def uri_filter(uri):
    """
    Removes user:password from given url string

    :param uri:
    :rtype: list
    :returns: filtered list of strings (empty parts are left out); an empty
        string when ``uri`` itself is empty
    """
    if not uri:
        return ''

    return [part for part in _split_uri(uri) if part]


def credentials_filter(uri):
    """
    Returns a url with removed credentials

    :param uri:
    :rtype: str
    :returns: the url without the ``user:password@`` part
    """
    if not uri:
        return ''

    proto, host, port = _split_uri(uri)
    if port:
        # put back the separator that _split_uri() consumed, also when the
        # uri carried no http(s) prefix
        port = ':' + port
    return ''.join(part for part in (proto, host, port) if part)


def get_changeset_safe(repo, rev):
    """
    Safe version of get_changeset if this changeset doesn't exists for a
    repo it returns a Dummy one instead

    :param repo: repository, must be a ``BaseRepository`` instance
    :param rev: revision to fetch
    :raises Exception: when ``repo`` is not a ``BaseRepository``
    """
    from rhodecode.lib.vcs.backends.base import BaseRepository
    from rhodecode.lib.vcs.exceptions import RepositoryError
    if not isinstance(repo, BaseRepository):
        # interpolate the type into the message instead of passing it as a
        # second exception argument, which left the '%s' unformatted
        raise Exception('You must pass an Repository '
                        'object as first argument got %s' % (type(repo),))

    try:
        cs = repo.get_changeset(rev)
    except RepositoryError:
        from rhodecode.lib.utils import EmptyChangeset
        cs = EmptyChangeset(requested_revision=rev)
    return cs


def get_current_revision(quiet=False):
    """
    Returns tuple of (number, id) from repository containing this package
    or None if repository could not be found.

    :param quiet: when True, do not print the error raised while fetching
        the revision
    """
    # deliberately best-effort: any failure means "no revision available"
    try:
        from rhodecode.lib.vcs import get_repo
        from rhodecode.lib.vcs.utils.helpers import get_scm
        repopath = os.path.join(os.path.dirname(__file__), '..', '..')
        scm = get_scm(repopath)[0]
        repo = get_repo(path=repopath, alias=scm)
        tip = repo.get_changeset()
        return (tip.revision, tip.short_id)
    except Exception as err:
        if not quiet:
            print("Cannot retrieve rhodecode's revision. Original error "
                  "was: %s" % err)
        return None


def extract_mentioned_users(s):
    """
    Returns unique usernames from given string s that have @mention

    :param s: string to get mentions
    :rtype: list
    :returns: sorted list of unique usernames, empty for empty input
    """
    if not s:
        return []

    # a mention is an '@' at the very start or right after whitespace, so
    # e-mail addresses such as user@example.com are not picked up
    return sorted(set(re.findall(r'(?:^@|\s@)(\w+)', s)))