# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see
'
' ' + ls + ' | '
'')
else:
yield 0, ('
" "Date: %s " "Message: %s") % (author, date, message) lnk_format = show_id(changeset) uri = link_to( lnk_format, url('changeset_home', repo_name=repo_name, revision=changeset.raw_id), style=get_color_string(changeset.raw_id), **{'data-toggle': 'popover', 'data-content': tooltip_html} ) uri += '\n' return uri return literal(markup_whitespace(annotate_highlight(filenode, url_func, **kwargs))) class _Message(object): """A message returned by ``pop_flash_messages()``. Converting the message to a string returns the message text. Instances also have the following attributes: * ``category``: the category specified when the message was created. * ``message``: the html-safe message text. """ def __init__(self, category, message): self.category = category self.message = message def _session_flash_messages(append=None, clear=False): """Manage a message queue in tg.session: return the current message queue after appending the given message, and possibly clearing the queue.""" key = 'flash' from tg import session if key in session: flash_messages = session[key] else: if append is None: # common fast path - also used for clearing empty queue return [] # don't bother saving flash_messages = [] session[key] = flash_messages if append is not None and append not in flash_messages: flash_messages.append(append) if clear: session.pop(key, None) session.save() return flash_messages def flash(message, category, logf=None): """ Show a message to the user _and_ log it through the specified function category: notice (default), warning, error, success logf: a custom log function - such as log.debug logf defaults to log.info, unless category equals 'success', in which case logf defaults to log.debug. """ assert category in ('error', 'success', 'warning'), category if hasattr(message, '__html__'): # render to HTML for storing in cookie safe_message = unicode(message) else: # Apply str - the message might be an exception with __str__ # Escape, so we can trust the result without further escaping, without any risk of injection safe_message = html_escape(unicode(message)) if logf is None: logf = log.info if category == 'success': logf = log.debug logf('Flash %s: %s', category, safe_message) _session_flash_messages(append=(category, safe_message)) def pop_flash_messages(): """Return all accumulated messages and delete them from the session. The return value is a list of ``Message`` objects. """ return [_Message(category, message) for category, message in _session_flash_messages(clear=True)] age = lambda x, y=False: _age(x, y) capitalize = lambda x: x.capitalize() email = author_email short_id = lambda x: x[:12] hide_credentials = lambda x: ''.join(credentials_filter(x)) def show_id(cs): """ Configurable function that shows ID by default it's r123:fffeeefffeee :param cs: changeset instance """ from kallithea import CONFIG def_len = safe_int(CONFIG.get('show_sha_length', 12)) show_rev = str2bool(CONFIG.get('show_revision_number', False)) raw_id = cs.raw_id[:def_len] if show_rev: return 'r%s:%s' % (cs.revision, raw_id) else: return raw_id def fmt_date(date): if date: return date.strftime("%Y-%m-%d %H:%M:%S") return "" def is_git(repository): if hasattr(repository, 'alias'): _type = repository.alias elif hasattr(repository, 'repo_type'): _type = repository.repo_type else: _type = repository return _type == 'git' def is_hg(repository): if hasattr(repository, 'alias'): _type = repository.alias elif hasattr(repository, 'repo_type'): _type = repository.repo_type else: _type = repository return _type == 'hg' @cache_region('long_term', 'user_attr_or_none') def user_attr_or_none(author, show_attr): """Try to match email part of VCS committer string with a local user and return show_attr - or return None if user not found""" email = author_email(author) if email: from kallithea.model.db import User user = User.get_by_email(email, cache=True) # cache will only use sql_cache_short if user is not None: return getattr(user, show_attr) return None def email_or_none(author): """Try to match email part of VCS committer string with a local user. Return primary email of user, email part of the specified author name, or None.""" if not author: return None email = user_attr_or_none(author, 'email') if email is not None: return email # always use user's main email address - not necessarily the one used to find user # extract email from the commit string email = author_email(author) if email: return email # No valid email, not a valid user in the system, none! return None def person(author, show_attr="username"): """Find the user identified by 'author', return one of the users attributes, default to the username attribute, None if there is no user""" from kallithea.model.db import User # if author is already an instance use it for extraction if isinstance(author, User): return getattr(author, show_attr) value = user_attr_or_none(author, show_attr) if value is not None: return value # Still nothing? Just pass back the author name if any, else the email return author_name(author) or email(author) def person_by_id(id_, show_attr="username"): from kallithea.model.db import User # attr to return from fetched user person_getter = lambda usr: getattr(usr, show_attr) # maybe it's an ID ? if str(id_).isdigit() or isinstance(id_, int): id_ = int(id_) user = User.get(id_) if user is not None: return person_getter(user) return id_ def boolicon(value): """Returns boolean value of a value, represented as small html image of true/false icons :param value: value """ if value: return HTML.tag('i', class_="icon-ok") else: return HTML.tag('i', class_="icon-minus-circled") def action_parser(user_log, feed=False, parse_cs=False): """ This helper will action_map the specified string action into translated fancy names with icons and links :param user_log: user log instance :param feed: use output for feeds (no html and fancy icons) :param parse_cs: parse Changesets into VCS instances """ action = user_log.action action_params = ' ' x = action.split(':') if len(x) > 1: action, action_params = x def get_cs_links(): revs_limit = 3 # display this amount always revs_top_limit = 50 # show upto this amount of changesets hidden revs_ids = action_params.split(',') deleted = user_log.repository is None if deleted: return ','.join(revs_ids) repo_name = user_log.repository.repo_name def lnk(rev, repo_name): lazy_cs = False title_ = None url_ = '#' if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict): if rev.op and rev.ref_name: if rev.op == 'delete_branch': lbl = _('Deleted branch: %s') % rev.ref_name elif rev.op == 'tag': lbl = _('Created tag: %s') % rev.ref_name else: lbl = 'Unknown operation %s' % rev.op else: lazy_cs = True lbl = rev.short_id[:8] url_ = url('changeset_home', repo_name=repo_name, revision=rev.raw_id) else: # changeset cannot be found - it might have been stripped or removed lbl = rev[:12] title_ = _('Changeset %s not found') % lbl if parse_cs: return link_to(lbl, url_, title=title_, **{'data-toggle': 'tooltip'}) return link_to(lbl, url_, class_='lazy-cs' if lazy_cs else '', **{'data-raw_id': rev.raw_id, 'data-repo_name': repo_name}) def _get_op(rev_txt): _op = None _name = rev_txt if len(rev_txt.split('=>')) == 2: _op, _name = rev_txt.split('=>') return _op, _name revs = [] if len([v for v in revs_ids if v != '']) > 0: repo = None for rev in revs_ids[:revs_top_limit]: _op, _name = _get_op(rev) # we want parsed changesets, or new log store format is bad if parse_cs: try: if repo is None: repo = user_log.repository.scm_instance _rev = repo.get_changeset(rev) revs.append(_rev) except ChangesetDoesNotExistError: log.error('cannot find revision %s in this repo', rev) revs.append(rev) else: _rev = AttributeDict({ 'short_id': rev[:12], 'raw_id': rev, 'message': '', 'op': _op, 'ref_name': _name }) revs.append(_rev) cs_links = [" " + ', '.join( [lnk(rev, repo_name) for rev in revs[:revs_limit]] )] _op1, _name1 = _get_op(revs_ids[0]) _op2, _name2 = _get_op(revs_ids[-1]) _rev = '%s...%s' % (_name1, _name2) compare_view = ( ' '
'%s ' % (
_('Show all combined changesets %s->%s') % (
revs_ids[0][:12], revs_ids[-1][:12]
),
url('changeset_home', repo_name=repo_name,
revision=_rev
),
_('Compare view')
)
)
# if we have exactly one more than normally displayed
# just display it, takes less space than displaying
# "and 1 more revisions"
if len(revs_ids) == revs_limit + 1:
cs_links.append(", " + lnk(revs[revs_limit], repo_name))
# hidden-by-default ones
if len(revs_ids) > revs_limit + 1:
uniq_id = revs_ids[0]
html_tmpl = (
' %s %s %s'
)
if not feed:
cs_links.append(html_tmpl % (
_('and'),
uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
_('revisions')
)
)
if not feed:
html_tmpl = ' '
else:
html_tmpl = ' %s '
morelinks = ', '.join(
[lnk(rev, repo_name) for rev in revs[revs_limit:]]
)
if len(revs_ids) > revs_top_limit:
morelinks += ', ...'
cs_links.append(html_tmpl % (uniq_id, morelinks))
if len(revs) > 1:
cs_links.append(compare_view)
return ''.join(cs_links)
def get_fork_name():
repo_name = action_params
url_ = url('summary_home', repo_name=repo_name)
return _('Fork name %s') % link_to(action_params, url_)
def get_user_name():
user_name = action_params
return user_name
def get_users_group():
group_name = action_params
return group_name
def get_pull_request():
from kallithea.model.db import PullRequest
pull_request_id = action_params
nice_id = PullRequest.make_nice_id(pull_request_id)
deleted = user_log.repository is None
if deleted:
repo_name = user_log.repository_name
else:
repo_name = user_log.repository.repo_name
return link_to(_('Pull request %s') % nice_id,
url('pullrequest_show', repo_name=repo_name,
pull_request_id=pull_request_id))
def get_archive_name():
archive_name = action_params
return archive_name
# action : translated str, callback(extractor), icon
action_map = {
'user_deleted_repo': (_('[deleted] repository'),
None, 'icon-trashcan'),
'user_created_repo': (_('[created] repository'),
None, 'icon-plus'),
'user_created_fork': (_('[created] repository as fork'),
None, 'icon-fork'),
'user_forked_repo': (_('[forked] repository'),
get_fork_name, 'icon-fork'),
'user_updated_repo': (_('[updated] repository'),
None, 'icon-pencil'),
'user_downloaded_archive': (_('[downloaded] archive from repository'),
get_archive_name, 'icon-download-cloud'),
'admin_deleted_repo': (_('[delete] repository'),
None, 'icon-trashcan'),
'admin_created_repo': (_('[created] repository'),
None, 'icon-plus'),
'admin_forked_repo': (_('[forked] repository'),
None, 'icon-fork'),
'admin_updated_repo': (_('[updated] repository'),
None, 'icon-pencil'),
'admin_created_user': (_('[created] user'),
get_user_name, 'icon-user'),
'admin_updated_user': (_('[updated] user'),
get_user_name, 'icon-user'),
'admin_created_users_group': (_('[created] user group'),
get_users_group, 'icon-pencil'),
'admin_updated_users_group': (_('[updated] user group'),
get_users_group, 'icon-pencil'),
'user_commented_revision': (_('[commented] on revision in repository'),
get_cs_links, 'icon-comment'),
'user_commented_pull_request': (_('[commented] on pull request for'),
get_pull_request, 'icon-comment'),
'user_closed_pull_request': (_('[closed] pull request for'),
get_pull_request, 'icon-ok'),
'push': (_('[pushed] into'),
get_cs_links, 'icon-move-up'),
'push_local': (_('[committed via Kallithea] into repository'),
get_cs_links, 'icon-pencil'),
'push_remote': (_('[pulled from remote] into repository'),
get_cs_links, 'icon-move-up'),
'pull': (_('[pulled] from'),
None, 'icon-move-down'),
'started_following_repo': (_('[started following] repository'),
None, 'icon-heart'),
'stopped_following_repo': (_('[stopped following] repository'),
None, 'icon-heart-empty'),
}
action_str = action_map.get(action, action)
if feed:
action = action_str[0].replace('[', '').replace(']', '')
else:
action = action_str[0] \
.replace('[', '') \
.replace(']', '')
action_params_func = lambda: ""
if callable(action_str[1]):
action_params_func = action_str[1]
def action_parser_icon():
action = user_log.action
action_params = None
x = action.split(':')
if len(x) > 1:
action, action_params = x
ico = action_map.get(action, ['', '', ''])[2]
html = """""" % ico
return literal(html)
# returned callbacks we need to call to get
return [lambda: literal(action), action_params_func, action_parser_icon]
#==============================================================================
# GRAVATAR URL
#==============================================================================
def gravatar_div(email_address, cls='', size=30, **div_attributes):
"""Return an html literal with a span around a gravatar if they are enabled.
Extra keyword parameters starting with 'div_' will get the prefix removed
and '_' changed to '-' and be used as attributes on the div. The default
class is 'gravatar'.
"""
from tg import tmpl_context as c
if not c.visual.use_gravatar:
return ''
if 'div_class' not in div_attributes:
div_attributes['div_class'] = "gravatar"
attributes = []
for k, v in sorted(div_attributes.items()):
assert k.startswith('div_'), k
attributes.append(' %s="%s"' % (k[4:].replace('_', '-'), escape(v)))
return literal("""%s""" %
(''.join(attributes),
gravatar(email_address, cls=cls, size=size)))
def gravatar(email_address, cls='', size=30):
"""return html element of the gravatar
This method will return an with the resolution double the size (for
retina screens) of the image. If the url returned from gravatar_url is
empty then we fallback to using an icon.
"""
from tg import tmpl_context as c
if not c.visual.use_gravatar:
return ''
src = gravatar_url(email_address, size * 2)
if src:
# here it makes sense to use style="width: ..." (instead of, say, a
# stylesheet) because we using this to generate a high-res (retina) size
html = ('').format(cls=cls, size=size, src=src)
else:
# if src is empty then there was no gravatar, so we use a font icon
html = (""""""
.format(cls=cls, size=size, src=src))
return literal(html)
def gravatar_url(email_address, size=30, default=''):
# doh, we need to re-import those to mock it later
from kallithea.config.routing import url
from kallithea.model.db import User
from tg import tmpl_context as c
if not c.visual.use_gravatar:
return ""
_def = 'anonymous@kallithea-scm.org' # default gravatar
email_address = email_address or _def
if email_address == _def:
return default
parsed_url = urllib.parse.urlparse(url.current(qualified=True))
url = (c.visual.gravatar_url or User.DEFAULT_GRAVATAR_URL) \
.replace('{email}', email_address) \
.replace('{md5email}', hashlib.md5(safe_bytes(email_address).lower()).hexdigest()) \
.replace('{netloc}', parsed_url.netloc) \
.replace('{scheme}', parsed_url.scheme) \
.replace('{size}', str(size))
return url
def changed_tooltip(nodes):
"""
Generates a html string for changed nodes in changeset page.
It limits the output to 30 entries
:param nodes: LazyNodesGenerator
"""
if nodes:
pref = ': ' suf = '' if len(nodes) > 30: suf = ' ' + _(' and %s more') % (len(nodes) - 30) return literal(pref + ' '.join([x.path for x in nodes[:30]]) + suf) else: return ': ' + _('No files') def fancy_file_stats(stats): """ Displays a fancy two colored bar for number of added/deleted lines of code on file :param stats: two element list of added/deleted lines of code """ from kallithea.lib.diffs import NEW_FILENODE, DEL_FILENODE, \ MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE a, d = stats['added'], stats['deleted'] width = 100 if stats['binary']: # binary mode lbl = '' bin_op = 1 if BIN_FILENODE in stats['ops']: lbl = 'bin+' if NEW_FILENODE in stats['ops']: lbl += _('new file') bin_op = NEW_FILENODE elif MOD_FILENODE in stats['ops']: lbl += _('mod') bin_op = MOD_FILENODE elif DEL_FILENODE in stats['ops']: lbl += _('del') bin_op = DEL_FILENODE elif RENAMED_FILENODE in stats['ops']: lbl += _('rename') bin_op = RENAMED_FILENODE # chmod can go with other operations if CHMOD_FILENODE in stats['ops']: _org_lbl = _('chmod') lbl += _org_lbl if lbl.endswith('+') else '+%s' % _org_lbl #import ipdb;ipdb.set_trace() b_d = ' ' % (bin_op, lbl) b_a = '' return literal(' %s%s ' % (width, b_a, b_d))
t = stats['added'] + stats['deleted']
unit = float(width) / (t or 1)
# needs > 9% of width to be visible or 0 to be hidden
a_p = max(9, unit * a) if a > 0 else 0
d_p = max(9, unit * d) if d > 0 else 0
p_sum = a_p + d_p
if p_sum > width:
# adjust the percentage to be == 100% since we adjusted to 9
if a_p > d_p:
a_p = a_p - (p_sum - width)
else:
d_p = d_p - (p_sum - width)
a_v = a if a > 0 else ''
d_v = d if d > 0 else ''
d_a = ' ' % (
a_p, a_v
)
d_d = ' ' % (
d_p, d_v
)
return literal('%s%s ' % (width, d_a, d_d))
_URLIFY_RE = re.compile(r'''
# URL markup
(?P').replace('\n', ' ') # Turn HTML5 into more valid HTML4 as required by some mail readers. # (This is not done in one step in html_escape, because character codes like # { risk to be seen as an issue reference due to the presence of '#'.) s = s.replace("'", "'") return literal(s) def linkify_others(t, l): """Add a default link to html with links. HTML doesn't allow nesting of links, so the outer link must be broken up in pieces and give space for other links. """ urls = re.compile(r'(\ %s ' % s)
def short_ref(ref_type, ref_name):
if ref_type == 'rev':
return short_id(ref_name)
return ref_name
def link_to_ref(repo_name, ref_type, ref_name, rev=None):
"""
Return full markup for a href to changeset_home for a changeset.
If ref_type is branch it will link to changelog.
ref_name is shortened if ref_type is 'rev'.
if rev is specified show it too, explicitly linking to that revision.
"""
txt = short_ref(ref_type, ref_name)
if ref_type == 'branch':
u = url('changelog_home', repo_name=repo_name, branch=ref_name)
else:
u = url('changeset_home', repo_name=repo_name, revision=ref_name)
l = link_to(repo_name + '#' + txt, u)
if rev and ref_type != 'rev':
l = literal('%s (%s)' % (l, link_to(short_id(rev), url('changeset_home', repo_name=repo_name, revision=rev))))
return l
def changeset_status(repo, revision):
from kallithea.model.changeset_status import ChangesetStatusModel
return ChangesetStatusModel().get_status(repo, revision)
def changeset_status_lbl(changeset_status):
from kallithea.model.db import ChangesetStatus
return ChangesetStatus.get_status_lbl(changeset_status)
def get_permission_name(key):
from kallithea.model.db import Permission
return dict(Permission.PERMS).get(key)
def journal_filter_help():
return _(textwrap.dedent('''
Example filter terms:
repository:vcs
username:developer
action:*push*
ip:127.0.0.1
date:20120101
date:[20120101100000 TO 20120102]
Generate wildcards using '*' character:
"repository:vcs*" - search everything starting with 'vcs'
"repository:*vcs*" - search for repository containing 'vcs'
Optional AND / OR operators in queries
"repository:vcs OR repository:test"
"username:test AND repository:test*"
'''))
def not_mapped_error(repo_name):
flash(_('%s repository is not mapped to db perhaps'
' it was created or renamed from the filesystem'
' please run the application again'
' in order to rescan repositories') % repo_name, category='error')
def ip_range(ip_addr):
from kallithea.model.db import UserIpMap
s, e = UserIpMap._get_ip_range(ip_addr)
return '%s - %s' % (s, e)
session_csrf_secret_name = "_session_csrf_secret_token"
def session_csrf_secret_token():
"""Return (and create) the current session's CSRF protection token."""
from tg import session
if not session_csrf_secret_name in session:
session[session_csrf_secret_name] = str(random.getrandbits(128))
session.save()
return session[session_csrf_secret_name]
def form(url, method="post", **attrs):
"""Like webhelpers.html.tags.form , but automatically adding
session_csrf_secret_token for POST. The secret is thus never leaked in GET
URLs.
"""
form = insecure_form(url, method, **attrs)
if method.lower() == 'get':
return form
return form + HTML.div(hidden(session_csrf_secret_name, session_csrf_secret_token()), style="display: none;")
|