# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see
' ' + ls + ' | ')
yield 0, ('
" %s ")
tooltip_html = tooltip_html % (author, date, message)
lnk_format = '%5s:%s' % ('r%s' % changeset.revision,
uri = link_to(
url('changeset_home', repo_name=repo_name,
uri += '\n'
return uri
return _url_func
return literal(annotate_highlight(filenode, url_func(repo_name), **kwargs))
def is_following_repo(repo_name, user_id):
from rhodecode.model.scm import ScmModel
return ScmModel().is_following_repo(repo_name, user_id)
class _Message(object):
"""A message returned by ``Flash.pop_messages()``.
Converting the message to a string returns the message text. Instances
also have the following attributes:
* ``message``: the message text.
* ``category``: the category specified when the message was created.
def __init__(self, category, message):
self.category = category
self.message = message
def __str__(self):
return self.message
__unicode__ = __str__
def __html__(self):
return escape(safe_unicode(self.message))
class Flash(_Flash):
def pop_messages(self):
"""Return all accumulated messages and delete them from the session.
The return value is a list of ``Message`` objects.
from pylons import session
messages = session.pop(self.session_key, [])
return [_Message(*m) for m in messages]
flash = Flash()
# SCM FILTERS available via h.
from rhodecode.lib.vcs.utils import author_name, author_email
from rhodecode.lib.utils2 import credentials_filter, age as _age
from rhodecode.model.db import User, ChangesetStatus
age = lambda x, y=False: _age(x, y)
capitalize = lambda x: x.capitalize()
email = author_email
short_id = lambda x: x[:12]
hide_credentials = lambda x: ''.join(credentials_filter(x))
def show_id(cs):
Configurable function that shows ID
by default it's r123:fffeeefffeee
:param cs: changeset instance
from rhodecode import CONFIG
def_len = safe_int(CONFIG.get('show_sha_length', 12))
show_rev = str2bool(CONFIG.get('show_revision_number', True))
raw_id = cs.raw_id[:def_len]
if show_rev:
return 'r%s:%s' % (cs.revision, raw_id)
return '%s' % (raw_id)
def fmt_date(date):
if date:
_fmt = u"%a, %d %b %Y %H:%M:%S".encode('utf8')
return date.strftime(_fmt).decode('utf8')
return ""
def is_git(repository):
if hasattr(repository, 'alias'):
_type = repository.alias
elif hasattr(repository, 'repo_type'):
_type = repository.repo_type
_type = repository
return _type == 'git'
def is_hg(repository):
if hasattr(repository, 'alias'):
_type = repository.alias
elif hasattr(repository, 'repo_type'):
_type = repository.repo_type
_type = repository
return _type == 'hg'
def email_or_none(author):
# extract email from the commit string
_email = email(author)
if _email != '':
# check it against RhodeCode database, and use the MAIN email for this
# user
user = User.get_by_email(_email, case_insensitive=True, cache=True)
if user is not None:
return user.email
return _email
# See if it contains a username we can get an email from
user = User.get_by_username(author_name(author), case_insensitive=True,
if user is not None:
return user.email
# No valid email, not a valid user in the system, none!
return None
def person(author, show_attr="username_and_name"):
# attr to return from fetched user
person_getter = lambda usr: getattr(usr, show_attr)
# if author is already an instance use it for extraction
if isinstance(author, User):
return person_getter(author)
# Valid email in the attribute passed, see if they're in the system
_email = email(author)
if _email != '':
user = User.get_by_email(_email, case_insensitive=True, cache=True)
if user is not None:
return person_getter(user)
# Maybe it's a username?
_author = author_name(author)
user = User.get_by_username(_author, case_insensitive=True,
if user is not None:
return person_getter(user)
# Still nothing? Just pass back the author name if any, else the email
return _author or _email
def person_by_id(id_, show_attr="username_and_name"):
# attr to return from fetched user
person_getter = lambda usr: getattr(usr, show_attr)
#maybe it's an ID ?
if str(id_).isdigit() or isinstance(id_, int):
id_ = int(id_)
user = User.get(id_)
if user is not None:
return person_getter(user)
return id_
def desc_stylize(value):
converts tags from value into html equivalent
:param value:
if not value:
return ''
value = re.sub(r'\[see\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
' ', value)
value = re.sub(r'\[license\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
' ', value)
value = re.sub(r'\[(requires|recommends|conflicts|base)\ \=\>\ *([a-zA-Z0-9\-\/]*)\]',
' ', value)
value = re.sub(r'\[(lang|language)\ \=\>\ *([a-zA-Z\-\/\#\+]*)\]',
' ', value)
value = re.sub(r'\[([a-z]+)\]',
' ', value)
return value
def boolicon(value):
"""Returns boolean value of a value, represented as small html image of true/false
:param value: value
if value:
return HTML.tag('i', class_="icon-ok-sign")
return HTML.tag('i', class_="icon-minus-sign")
def action_parser(user_log, feed=False, parse_cs=False):
This helper will action_map the specified string action into translated
fancy names with icons and links
:param user_log: user log instance
:param feed: use output for feeds (no html and fancy icons)
:param parse_cs: parse Changesets into VCS instances
action = user_log.action
action_params = ' '
x = action.split(':')
if len(x) > 1:
action, action_params = x
def get_cs_links():
revs_limit = 3 # display this amount always
revs_top_limit = 50 # show upto this amount of changesets hidden
revs_ids = action_params.split(',')
deleted = user_log.repository is None
if deleted:
return ','.join(revs_ids)
repo_name = user_log.repository.repo_name
def lnk(rev, repo_name):
if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict):
lazy_cs = True
if getattr(rev, 'op', None) and getattr(rev, 'ref_name', None):
lazy_cs = False
lbl = '?'
if rev.op == 'delete_branch':
lbl = '%s' % _('Deleted branch: %s') % rev.ref_name
title = ''
elif rev.op == 'tag':
lbl = '%s' % _('Created tag: %s') % rev.ref_name
title = ''
_url = '#'
lbl = '%s' % (rev.short_id[:8])
_url = url('changeset_home', repo_name=repo_name,
title = tooltip(rev.message)
## changeset cannot be found/striped/removed etc.
lbl = ('%s' % rev)[:12]
_url = '#'
title = _('Changeset not found')
if parse_cs:
return link_to(lbl, _url, title=title, class_='tooltip')
return link_to(lbl, _url, raw_id=rev.raw_id, repo_name=repo_name,
class_='lazy-cs' if lazy_cs else '')
def _get_op(rev_txt):
_op = None
_name = rev_txt
if len(rev_txt.split('=>')) == 2:
_op, _name = rev_txt.split('=>')
return _op, _name
revs = []
if len(filter(lambda v: v != '', revs_ids)) > 0:
repo = None
for rev in revs_ids[:revs_top_limit]:
_op, _name = _get_op(rev)
# we want parsed changesets, or new log store format is bad
if parse_cs:
if repo is None:
repo = user_log.repository.scm_instance
_rev = repo.get_changeset(rev)
except ChangesetDoesNotExistError:
log.error('cannot find revision %s in this repo' % rev)
_rev = AttributeDict({
'short_id': rev[:12],
'raw_id': rev,
'message': '',
'op': _op,
'ref_name': _name
cs_links = [" " + ', '.join(
[lnk(rev, repo_name) for rev in revs[:revs_limit]]
_op1, _name1 = _get_op(revs_ids[0])
_op2, _name2 = _get_op(revs_ids[-1])
_rev = '%s...%s' % (_name1, _name2)
compare_view = (
' Date: %s Message:" " %s '
'%s ' % (
_('Show all combined changesets %s->%s') % (
revs_ids[0][:12], revs_ids[-1][:12]
url('changeset_home', repo_name=repo_name,
_('compare view')
# if we have exactly one more than normally displayed
# just display it, takes less space than displaying
# "and 1 more revisions"
if len(revs_ids) == revs_limit + 1:
rev = revs[revs_limit]
cs_links.append(", " + lnk(rev, repo_name))
# hidden-by-default ones
if len(revs_ids) > revs_limit + 1:
uniq_id = revs_ids[0]
html_tmpl = (
' %s %s %s'
if not feed:
cs_links.append(html_tmpl % (
uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
if not feed:
html_tmpl = ' '
html_tmpl = ' %s '
morelinks = ', '.join(
[lnk(rev, repo_name) for rev in revs[revs_limit:]]
if len(revs_ids) > revs_top_limit:
morelinks += ', ...'
cs_links.append(html_tmpl % (uniq_id, morelinks))
if len(revs) > 1:
return ''.join(cs_links)
def get_fork_name():
repo_name = action_params
_url = url('summary_home', repo_name=repo_name)
return _('fork name %s') % link_to(action_params, _url)
def get_user_name():
user_name = action_params
return user_name
def get_users_group():
group_name = action_params
return group_name
def get_pull_request():
pull_request_id = action_params
deleted = user_log.repository is None
if deleted:
repo_name = user_log.repository_name
repo_name = user_log.repository.repo_name
return link_to(_('Pull request #%s') % pull_request_id,
url('pullrequest_show', repo_name=repo_name,
def get_archive_name():
archive_name = action_params
return archive_name
# action : translated str, callback(extractor), icon
action_map = {
'user_deleted_repo': (_('[deleted] repository'),
None, 'icon-trash'),
'user_created_repo': (_('[created] repository'),
None, 'icon-plus icon-plus-colored'),
'user_created_fork': (_('[created] repository as fork'),
None, 'icon-code-fork'),
'user_forked_repo': (_('[forked] repository'),
get_fork_name, 'icon-code-fork'),
'user_updated_repo': (_('[updated] repository'),
None, 'icon-pencil icon-pencil-colored'),
'user_downloaded_archive': (_('[downloaded] archive from repository'),
get_archive_name, 'icon-download-alt'),
'admin_deleted_repo': (_('[delete] repository'),
None, 'icon-trash'),
'admin_created_repo': (_('[created] repository'),
None, 'icon-plus icon-plus-colored'),
'admin_forked_repo': (_('[forked] repository'),
None, 'icon-code-fork icon-fork-colored'),
'admin_updated_repo': (_('[updated] repository'),
None, 'icon-pencil icon-pencil-colored'),
'admin_created_user': (_('[created] user'),
get_user_name, 'icon-user icon-user-colored'),
'admin_updated_user': (_('[updated] user'),
get_user_name, 'icon-user icon-user-colored'),
'admin_created_users_group': (_('[created] user group'),
get_users_group, 'icon-pencil icon-pencil-colored'),
'admin_updated_users_group': (_('[updated] user group'),
get_users_group, 'icon-pencil icon-pencil-colored'),
'user_commented_revision': (_('[commented] on revision in repository'),
get_cs_links, 'icon-comment icon-comment-colored'),
'user_commented_pull_request': (_('[commented] on pull request for'),
get_pull_request, 'icon-comment icon-comment-colored'),
'user_closed_pull_request': (_('[closed] pull request for'),
get_pull_request, 'icon-check'),
'push': (_('[pushed] into'),
get_cs_links, 'icon-arrow-up'),
'push_local': (_('[committed via RhodeCode] into repository'),
get_cs_links, 'icon-pencil icon-pencil-colored'),
'push_remote': (_('[pulled from remote] into repository'),
get_cs_links, 'icon-arrow-up'),
'pull': (_('[pulled] from'),
None, 'icon-arrow-down'),
'started_following_repo': (_('[started following] repository'),
None, 'icon-heart icon-heart-colored'),
'stopped_following_repo': (_('[stopped following] repository'),
None, 'icon-heart-empty icon-heart-colored'),
action_str = action_map.get(action, action)
if feed:
action = action_str[0].replace('[', '').replace(']', '')
action = action_str[0]\
.replace('[', '')\
.replace(']', '')
action_params_func = lambda: ""
if callable(action_str[1]):
action_params_func = action_str[1]
def action_parser_icon():
action = user_log.action
action_params = None
x = action.split(':')
if len(x) > 1:
action, action_params = x
tmpl = """"""
ico = action_map.get(action, ['', '', ''])[2]
return literal(tmpl % (ico, action))
# returned callbacks we need to call to get
return [lambda: literal(action), action_params_func, action_parser_icon]
from rhodecode.lib.auth import HasPermissionAny, HasPermissionAll, \
HasRepoPermissionAny, HasRepoPermissionAll, HasRepoGroupPermissionAll, \
def gravatar_url(email_address, size=30, ssl_enabled=True):
# doh, we need to re-import those to mock it later
from pylons import url
from pylons import tmpl_context as c
_def = 'anonymous@rhodecode.org' # default gravatar
_use_gravatar = c.visual.use_gravatar
_gravatar_url = c.visual.gravatar_url or User.DEFAULT_GRAVATAR_URL
email_address = email_address or _def
if isinstance(email_address, unicode):
#hashlib crashes on unicode items
email_address = safe_str(email_address)
if not _use_gravatar or not email_address or email_address == _def:
# pick best matching size to one given in size param
f = lambda a, l: min(l, key=lambda x: abs(x - a))
return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))
if _use_gravatar:
_md5 = lambda s: hashlib.md5(s).hexdigest()
tmpl = _gravatar_url
parsed_url = urlparse.urlparse(url.current(qualified=True))
tmpl = tmpl.replace('{email}', email_address)\
.replace('{md5email}', _md5(email_address.lower())) \
.replace('{netloc}', parsed_url.netloc)\
.replace('{scheme}', parsed_url.scheme)\
.replace('{size}', safe_str(size))
return tmpl
class Page(_Page):
Custom pager to match rendering style with YUI paginator
def _get_pos(self, cur_page, max_page, items):
edge = (items / 2) + 1
if (cur_page <= edge):
radius = max(items / 2, items - cur_page)
elif (max_page - cur_page) < edge:
radius = (items - 1) - (max_page - cur_page)
radius = items / 2
left = max(1, (cur_page - (radius)))
right = min(max_page, cur_page + (radius))
return left, cur_page, right
def _range(self, regexp_match):
Return range of linked pages (e.g. '1 2 [3] 4 5 6 7 8').
A "re" (regular expressions) match object containing the
radius of linked pages around the current page in
regexp_match.group(1) as a string
This function is supposed to be called as a callable in
radius = int(regexp_match.group(1))
# Compute the first and last page number within the radius
# e.g. '1 .. 5 6 [7] 8 9 .. 12'
# -> leftmost_page = 5
# -> rightmost_page = 9
leftmost_page, _cur, rightmost_page = self._get_pos(self.page,
(radius * 2) + 1)
nav_items = []
# Create a link to the first page (unless we are on the first page
# or there would be no need to insert '..' spacers)
if self.page != self.first_page and self.first_page < leftmost_page:
nav_items.append(self._pagerlink(self.first_page, self.first_page))
# Insert dots if there are pages between the first page
# and the currently displayed page range
if leftmost_page - self.first_page > 1:
# Wrap in a SPAN tag if nolink_attr is set
text = '..'
if self.dotdot_attr:
text = HTML.span(c=text, **self.dotdot_attr)
for thispage in xrange(leftmost_page, rightmost_page + 1):
# Hilight the current page number and do not use a link
if thispage == self.page:
text = '%s' % (thispage,)
# Wrap in a SPAN tag if nolink_attr is set
if self.curpage_attr:
text = HTML.span(c=text, **self.curpage_attr)
# Otherwise create just a link to that page
text = '%s' % (thispage,)
nav_items.append(self._pagerlink(thispage, text))
# Insert dots if there are pages between the displayed
# page numbers and the end of the page range
if self.last_page - rightmost_page > 1:
text = '..'
# Wrap in a SPAN tag if nolink_attr is set
if self.dotdot_attr:
text = HTML.span(c=text, **self.dotdot_attr)
# Create a link to the very last page (unless we are on the last
# page or there would be no need to insert '..' spacers)
if self.page != self.last_page and rightmost_page < self.last_page:
nav_items.append(self._pagerlink(self.last_page, self.last_page))
## prerender links
#_page_link = url.current()
#nav_items.append(literal('' % (_page_link, str(int(self.page)+1))))
#nav_items.append(literal('' % (_page_link, str(int(self.page)+1))))
return self.separator.join(nav_items)
def pager(self, format='~2~', page_param='page', partial_param='partial',
show_if_single_page=False, separator=' ', onclick=None,
symbol_first='<<', symbol_last='>>',
symbol_previous='<', symbol_next='>',
link_attr={'class': 'pager_link', 'rel': 'prerender'},
curpage_attr={'class': 'pager_curpage'},
dotdot_attr={'class': 'pager_dotdot'}, **kwargs):
self.curpage_attr = curpage_attr
self.separator = separator
self.pager_kwargs = kwargs
self.page_param = page_param
self.partial_param = partial_param
self.onclick = onclick
self.link_attr = link_attr
self.dotdot_attr = dotdot_attr
# Don't show navigator if there is no more than one page
if self.page_count == 0 or (self.page_count == 1 and not show_if_single_page):
return ''
from string import Template
# Replace ~...~ in token format by range of pages
result = re.sub(r'~(\d+)~', self._range, format)
# Interpolate '%' variables
result = Template(result).safe_substitute({
'first_page': self.first_page,
'last_page': self.last_page,
'page': self.page,
'page_count': self.page_count,
'items_per_page': self.items_per_page,
'first_item': self.first_item,
'last_item': self.last_item,
'item_count': self.item_count,
'link_first': self.page > self.first_page and \
self._pagerlink(self.first_page, symbol_first) or '',
'link_last': self.page < self.last_page and \
self._pagerlink(self.last_page, symbol_last) or '',
'link_previous': self.previous_page and \
self._pagerlink(self.previous_page, symbol_previous) \
or HTML.span(symbol_previous, class_="yui-pg-previous"),
'link_next': self.next_page and \
self._pagerlink(self.next_page, symbol_next) \
or HTML.span(symbol_next, class_="yui-pg-next")
return literal(result)
class RepoPage(Page):
def __init__(self, collection, page=1, items_per_page=20,
item_count=None, url=None, **kwargs):
"""Create a "RepoPage" instance. special pager for paging
self._url_generator = url
# Safe the kwargs class-wide so they can be used in the pager() method
self.kwargs = kwargs
# Save a reference to the collection
self.original_collection = collection
self.collection = collection
# The self.page is the number of the current page.
# The first page has the number 1!
self.page = int(page) # make it int() if we get it as a string
except (ValueError, TypeError):
self.page = 1
self.items_per_page = items_per_page
# Unless the user tells us how many items the collections has
# we calculate that ourselves.
if item_count is not None:
self.item_count = item_count
self.item_count = len(self.collection)
# Compute the number of the first and last available page
if self.item_count > 0:
self.first_page = 1
self.page_count = int(math.ceil(float(self.item_count) /
self.last_page = self.first_page + self.page_count - 1
# Make sure that the requested page number is the range of
# valid pages
if self.page > self.last_page:
self.page = self.last_page
elif self.page < self.first_page:
self.page = self.first_page
# Note: the number of items on this page can be less than
# items_per_page if the last page is not full
self.first_item = max(0, (self.item_count) - (self.page *
self.last_item = ((self.item_count - 1) - items_per_page *
(self.page - 1))
self.items = list(self.collection[self.first_item:self.last_item + 1])
# Links to previous and next page
if self.page > self.first_page:
self.previous_page = self.page - 1
self.previous_page = None
if self.page < self.last_page:
self.next_page = self.page + 1
self.next_page = None
# No items available
self.first_page = None
self.page_count = 0
self.last_page = None
self.first_item = None
self.last_item = None
self.previous_page = None
self.next_page = None
self.items = []
# This is a subclass of the 'list' type. Initialise the list now.
list.__init__(self, reversed(self.items))
def changed_tooltip(nodes):
Generates a html string for changed nodes in changeset page.
It limits the output to 30 entries
:param nodes: LazyNodesGenerator
if nodes:
pref = ': ' suf = '' if len(nodes) > 30: suf = ' ' + _(' and %s more') % (len(nodes) - 30) return literal(pref + ' '.join([safe_unicode(x.path) for x in nodes[:30]]) + suf) else: return ': ' + _('No Files') def repo_link(groups_and_repos): """ Makes a breadcrumbs link to repo within a group joins » on each group to create a fancy link ex:: group >> subgroup >> repo :param groups_and_repos: :param last_url: """ groups, just_name, repo_name = groups_and_repos last_url = url('summary_home', repo_name=repo_name) last_link = link_to(just_name, last_url) def make_link(group): return link_to(group.name, url('repos_group_home', group_name=group.group_name)) return literal(' » '.join(map(make_link, groups) + ['%s' % last_link])) def fancy_file_stats(stats): """ Displays a fancy two colored bar for number of added/deleted lines of code on file :param stats: two element list of added/deleted lines of code """ from rhodecode.lib.diffs import NEW_FILENODE, DEL_FILENODE, \ MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE def cgen(l_type, a_v, d_v): mapping = {'tr': 'top-right-rounded-corner-mid', 'tl': 'top-left-rounded-corner-mid', 'br': 'bottom-right-rounded-corner-mid', 'bl': 'bottom-left-rounded-corner-mid'} map_getter = lambda x: mapping[x] if l_type == 'a' and d_v: #case when added and deleted are present return ' '.join(map(map_getter, ['tl', 'bl'])) if l_type == 'a' and not d_v: return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl'])) if l_type == 'd' and a_v: return ' '.join(map(map_getter, ['tr', 'br'])) if l_type == 'd' and not a_v: return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl'])) a, d = stats['added'], stats['deleted'] width = 100 if stats['binary']: #binary mode lbl = '' bin_op = 1 if BIN_FILENODE in stats['ops']: lbl = 'bin+' if NEW_FILENODE in stats['ops']: lbl += _('new file') bin_op = NEW_FILENODE elif MOD_FILENODE in stats['ops']: lbl += _('mod') bin_op = MOD_FILENODE elif DEL_FILENODE in stats['ops']: lbl += _('del') bin_op = DEL_FILENODE elif RENAMED_FILENODE in stats['ops']: lbl += _('rename') bin_op = RENAMED_FILENODE #chmod can go with other operations if CHMOD_FILENODE in stats['ops']: _org_lbl = _('chmod') lbl += _org_lbl if lbl.endswith('+') else '+%s' % _org_lbl #import ipdb;ipdb.set_trace() b_d = ' %s ' % (bin_op, cgen('a', a_v='', d_v=0), lbl)
b_a = ''
return literal('%s%s ' % (width, b_a, b_d))
t = stats['added'] + stats['deleted']
unit = float(width) / (t or 1)
# needs > 9% of width to be visible or 0 to be hidden
a_p = max(9, unit * a) if a > 0 else 0
d_p = max(9, unit * d) if d > 0 else 0
p_sum = a_p + d_p
if p_sum > width:
#adjust the percentage to be == 100% since we adjusted to 9
if a_p > d_p:
a_p = a_p - (p_sum - width)
d_p = d_p - (p_sum - width)
a_v = a if a > 0 else ''
d_v = d if d > 0 else ''
d_a = '%s ' % (
cgen('a', a_v, d_v), a_p, a_v
d_d = '%s ' % (
cgen('d', a_v, d_v), d_p, d_v
return literal('%s%s ' % (width, d_a, d_d))
def urlify_text(text_, safe=True):
Extrac urls from text and make html links out of them
:param text_:
url_pat = re.compile(r'''(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]'''
def url_func(match_obj):
url_full = match_obj.groups()[0]
return '%(url)s' % ({'url': url_full})
_newtext = url_pat.sub(url_func, text_)
if safe:
return literal(_newtext)
return _newtext
def urlify_changesets(text_, repository):
Extract revision ids from changeset and make link from them
:param text_:
:param repository: repo name to build the URL with
from pylons import url # doh, we need to re-import url to mock it later
URL_PAT = re.compile(r'(^|\s)([0-9a-fA-F]{12,40})($|\s)')
def url_func(match_obj):
rev = match_obj.groups()[1]
pref = match_obj.groups()[0]
suf = match_obj.groups()[2]
tmpl = (
return tmpl % {
'pref': pref,
'cls': 'revision-link',
'url': url('changeset_home', repo_name=repository, revision=rev),
'rev': rev,
'suf': suf
newtext = URL_PAT.sub(url_func, text_)
return newtext
def urlify_commit(text_, repository=None, link_=None):
Parses given text message and makes proper links.
issues are linked to given issue-server, and rest is a changeset link
if link_ is given, in other case it's a plain text
:param text_:
:param repository:
:param link_: changeset link
import traceback
from pylons import url # doh, we need to re-import url to mock it later
def escaper(string):
return string.replace('<', '<').replace('>', '>')
def linkify_others(t, l):
urls = re.compile(r'(\%s ' %
def rst_w_mentions(source):
Wrapped rst renderer with @mention highlighting
:param source:
return literal('%s ' %
def changeset_status(repo, revision):
return ChangesetStatusModel().get_status(repo, revision)
def changeset_status_lbl(changeset_status):
return dict(ChangesetStatus.STATUSES).get(changeset_status)
def get_permission_name(key):
return dict(Permission.PERMS).get(key)
def journal_filter_help():
return _(textwrap.dedent('''
Example filter terms:
date:[20120101100000 TO 20120102]
Generate wildcards using '*' character:
"repositroy:vcs*" - search everything starting with 'vcs'
"repository:*vcs*" - search for repository containing 'vcs'
Optional AND / OR operators in queries
"repository:vcs OR repository:test"
"username:test AND repository:test*"
def not_mapped_error(repo_name):
flash(_('%s repository is not mapped to db perhaps'
' it was created or renamed from the filesystem'
' please run the application again'
' in order to rescan repositories') % repo_name, category='error')
def ip_range(ip_addr):
from rhodecode.model.db import UserIpMap
s, e = UserIpMap._get_ip_range(ip_addr)
return '%s - %s' % (s, e)