ui: introduce user-bookmarks for creation of quick shortcuts
ui: introduce user-bookmarks for creation of quick shortcuts

File last commit:

r3363:f08e98b1 default
r3424:c7ed0ba5 default
Show More
db_4_3_0_0.py
3480 lines | 129.4 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright (C) 2010-2019 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Database Models for RhodeCode Enterprise
"""
import os
import sys
import time
import hashlib
import logging
import datetime
import warnings
import ipaddress
import functools
import traceback
import collections
from sqlalchemy import *
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import (
relationship, joinedload, class_mapper, validates, aliased)
from sqlalchemy.sql.expression import true
from beaker.cache import cache_region, region_invalidate
from webob.exc import HTTPNotFound
from zope.cachedescriptors.property import Lazy as LazyProperty
# replace pylons with fake url for migration
from rhodecode.lib.dbmigrate.schema import url
from rhodecode.translation import _
from rhodecode.lib.vcs import get_backend
from rhodecode.lib.vcs.utils.helpers import get_scm
from rhodecode.lib.vcs.exceptions import VCSError
from rhodecode.lib.vcs.backends.base import (
EmptyCommit, Reference, MergeFailureReason)
from rhodecode.lib.utils2 import (
str2bool, safe_str, get_commit_safe, safe_unicode, remove_prefix, md5_safe,
time_to_datetime, aslist, Optional, safe_int, get_clone_url, AttributeDict)
from rhodecode.lib.ext_json import json
from rhodecode.lib.caching_query import FromCache
from rhodecode.lib.encrypt import AESCipher
from rhodecode.model.meta import Base, Session
URL_SEP = '/'
log = logging.getLogger(__name__)
# =============================================================================
# BASE CLASSES
# =============================================================================
# this is propagated from .ini file rhodecode.encrypted_values.secret or
# beaker.session.secret if first is not set.
# and initialized at environment.py
ENCRYPTION_KEY = None
# used to sort permissions by types, '#' used here is not allowed to be in
# usernames, and it's very early in sorted string.printable table.
PERMISSION_TYPE_SORT = {
'admin': '####',
'write': '###',
'read': '##',
'none': '#',
}
def display_sort(obj):
"""
Sort function used to sort permissions in .permissions() function of
Repository, RepoGroup, UserGroup. Also it put the default user in front
of all other resources
"""
if obj.username == User.DEFAULT_USER:
return '#####'
prefix = PERMISSION_TYPE_SORT.get(obj.permission.split('.')[-1], '')
return prefix + obj.username
def _hash_key(k):
return md5_safe(k)
class EncryptedTextValue(TypeDecorator):
"""
Special column for encrypted long text data, use like::
value = Column("encrypted_value", EncryptedValue(), nullable=False)
This column is intelligent so if value is in unencrypted form it return
unencrypted form, but on save it always encrypts
"""
impl = Text
def process_bind_param(self, value, dialect):
if not value:
return value
if value.startswith('enc$aes$') or value.startswith('enc$aes_hmac$'):
# protect against double encrypting if someone manually starts
# doing
raise ValueError('value needs to be in unencrypted format, ie. '
'not starting with enc$aes')
return 'enc$aes_hmac$%s' % AESCipher(
ENCRYPTION_KEY, hmac=True).encrypt(value)
def process_result_value(self, value, dialect):
import rhodecode
if not value:
return value
parts = value.split('$', 3)
if not len(parts) == 3:
# probably not encrypted values
return value
else:
if parts[0] != 'enc':
# parts ok but without our header ?
return value
enc_strict_mode = str2bool(rhodecode.CONFIG.get(
'rhodecode.encrypted_values.strict') or True)
# at that stage we know it's our encryption
if parts[1] == 'aes':
decrypted_data = AESCipher(ENCRYPTION_KEY).decrypt(parts[2])
elif parts[1] == 'aes_hmac':
decrypted_data = AESCipher(
ENCRYPTION_KEY, hmac=True,
strict_verification=enc_strict_mode).decrypt(parts[2])
else:
raise ValueError(
'Encryption type part is wrong, must be `aes` '
'or `aes_hmac`, got `%s` instead' % (parts[1]))
return decrypted_data
class BaseModel(object):
"""
Base Model for all classes
"""
@classmethod
def _get_keys(cls):
"""return column names for this model """
return class_mapper(cls).c.keys()
def get_dict(self):
"""
return dict with keys and values corresponding
to this model data """
d = {}
for k in self._get_keys():
d[k] = getattr(self, k)
# also use __json__() if present to get additional fields
_json_attr = getattr(self, '__json__', None)
if _json_attr:
# update with attributes from __json__
if callable(_json_attr):
_json_attr = _json_attr()
for k, val in _json_attr.iteritems():
d[k] = val
return d
def get_appstruct(self):
"""return list with keys and values tuples corresponding
to this model data """
l = []
for k in self._get_keys():
l.append((k, getattr(self, k),))
return l
def populate_obj(self, populate_dict):
"""populate model with data from given populate_dict"""
for k in self._get_keys():
if k in populate_dict:
setattr(self, k, populate_dict[k])
@classmethod
def query(cls):
return Session().query(cls)
@classmethod
def get(cls, id_):
if id_:
return cls.query().get(id_)
@classmethod
def get_or_404(cls, id_):
try:
id_ = int(id_)
except (TypeError, ValueError):
raise HTTPNotFound
res = cls.query().get(id_)
if not res:
raise HTTPNotFound
return res
@classmethod
def getAll(cls):
# deprecated and left for backward compatibility
return cls.get_all()
@classmethod
def get_all(cls):
return cls.query().all()
@classmethod
def delete(cls, id_):
obj = cls.query().get(id_)
Session().delete(obj)
@classmethod
def identity_cache(cls, session, attr_name, value):
exist_in_session = []
for (item_cls, pkey), instance in session.identity_map.items():
if cls == item_cls and getattr(instance, attr_name) == value:
exist_in_session.append(instance)
if exist_in_session:
if len(exist_in_session) == 1:
return exist_in_session[0]
log.exception(
'multiple objects with attr %s and '
'value %s found with same name: %r',
attr_name, value, exist_in_session)
def __repr__(self):
if hasattr(self, '__unicode__'):
# python repr needs to return str
try:
return safe_str(self.__unicode__())
except UnicodeDecodeError:
pass
return '<DB:%s>' % (self.__class__.__name__)
class RhodeCodeSetting(Base, BaseModel):
__tablename__ = 'rhodecode_settings'
__table_args__ = (
UniqueConstraint('app_settings_name'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True}
)
SETTINGS_TYPES = {
'str': safe_str,
'int': safe_int,
'unicode': safe_unicode,
'bool': str2bool,
'list': functools.partial(aslist, sep=',')
}
DEFAULT_UPDATE_URL = 'https://rhodecode.com/api/v1/info/versions'
GLOBAL_CONF_KEY = 'app_settings'
app_settings_id = Column("app_settings_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
app_settings_name = Column("app_settings_name", String(255), nullable=True, unique=None, default=None)
_app_settings_value = Column("app_settings_value", String(4096), nullable=True, unique=None, default=None)
_app_settings_type = Column("app_settings_type", String(255), nullable=True, unique=None, default=None)
def __init__(self, key='', val='', type='unicode'):
self.app_settings_name = key
self.app_settings_type = type
self.app_settings_value = val
@validates('_app_settings_value')
def validate_settings_value(self, key, val):
assert type(val) == unicode
return val
@hybrid_property
def app_settings_value(self):
v = self._app_settings_value
_type = self.app_settings_type
if _type:
_type = self.app_settings_type.split('.')[0]
# decode the encrypted value
if 'encrypted' in self.app_settings_type:
cipher = EncryptedTextValue()
v = safe_unicode(cipher.process_result_value(v, None))
converter = self.SETTINGS_TYPES.get(_type) or \
self.SETTINGS_TYPES['unicode']
return converter(v)
@app_settings_value.setter
def app_settings_value(self, val):
"""
Setter that will always make sure we use unicode in app_settings_value
:param val:
"""
val = safe_unicode(val)
# encode the encrypted value
if 'encrypted' in self.app_settings_type:
cipher = EncryptedTextValue()
val = safe_unicode(cipher.process_bind_param(val, None))
self._app_settings_value = val
@hybrid_property
def app_settings_type(self):
return self._app_settings_type
@app_settings_type.setter
def app_settings_type(self, val):
if val.split('.')[0] not in self.SETTINGS_TYPES:
raise Exception('type must be one of %s got %s'
% (self.SETTINGS_TYPES.keys(), val))
self._app_settings_type = val
def __unicode__(self):
return u"<%s('%s:%s[%s]')>" % (
self.__class__.__name__,
self.app_settings_name, self.app_settings_value,
self.app_settings_type
)
class RhodeCodeUi(Base, BaseModel):
__tablename__ = 'rhodecode_ui'
__table_args__ = (
UniqueConstraint('ui_key'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True}
)
HOOK_REPO_SIZE = 'changegroup.repo_size'
# HG
HOOK_PRE_PULL = 'preoutgoing.pre_pull'
HOOK_PULL = 'outgoing.pull_logger'
HOOK_PRE_PUSH = 'prechangegroup.pre_push'
HOOK_PUSH = 'changegroup.push_logger'
# TODO: johbo: Unify way how hooks are configured for git and hg,
# git part is currently hardcoded.
# SVN PATTERNS
SVN_BRANCH_ID = 'vcs_svn_branch'
SVN_TAG_ID = 'vcs_svn_tag'
ui_id = Column(
"ui_id", Integer(), nullable=False, unique=True, default=None,
primary_key=True)
ui_section = Column(
"ui_section", String(255), nullable=True, unique=None, default=None)
ui_key = Column(
"ui_key", String(255), nullable=True, unique=None, default=None)
ui_value = Column(
"ui_value", String(255), nullable=True, unique=None, default=None)
ui_active = Column(
"ui_active", Boolean(), nullable=True, unique=None, default=True)
def __repr__(self):
return '<%s[%s]%s=>%s]>' % (self.__class__.__name__, self.ui_section,
self.ui_key, self.ui_value)
class RepoRhodeCodeSetting(Base, BaseModel):
__tablename__ = 'repo_rhodecode_settings'
__table_args__ = (
UniqueConstraint(
'app_settings_name', 'repository_id',
name='uq_repo_rhodecode_setting_name_repo_id'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True}
)
repository_id = Column(
"repository_id", Integer(), ForeignKey('repositories.repo_id'),
nullable=False)
app_settings_id = Column(
"app_settings_id", Integer(), nullable=False, unique=True,
default=None, primary_key=True)
app_settings_name = Column(
"app_settings_name", String(255), nullable=True, unique=None,
default=None)
_app_settings_value = Column(
"app_settings_value", String(4096), nullable=True, unique=None,
default=None)
_app_settings_type = Column(
"app_settings_type", String(255), nullable=True, unique=None,
default=None)
repository = relationship('Repository')
def __init__(self, repository_id, key='', val='', type='unicode'):
self.repository_id = repository_id
self.app_settings_name = key
self.app_settings_type = type
self.app_settings_value = val
@validates('_app_settings_value')
def validate_settings_value(self, key, val):
assert type(val) == unicode
return val
@hybrid_property
def app_settings_value(self):
v = self._app_settings_value
type_ = self.app_settings_type
SETTINGS_TYPES = RhodeCodeSetting.SETTINGS_TYPES
converter = SETTINGS_TYPES.get(type_) or SETTINGS_TYPES['unicode']
return converter(v)
@app_settings_value.setter
def app_settings_value(self, val):
"""
Setter that will always make sure we use unicode in app_settings_value
:param val:
"""
self._app_settings_value = safe_unicode(val)
@hybrid_property
def app_settings_type(self):
return self._app_settings_type
@app_settings_type.setter
def app_settings_type(self, val):
SETTINGS_TYPES = RhodeCodeSetting.SETTINGS_TYPES
if val not in SETTINGS_TYPES:
raise Exception('type must be one of %s got %s'
% (SETTINGS_TYPES.keys(), val))
self._app_settings_type = val
def __unicode__(self):
return u"<%s('%s:%s:%s[%s]')>" % (
self.__class__.__name__, self.repository.repo_name,
self.app_settings_name, self.app_settings_value,
self.app_settings_type
)
class RepoRhodeCodeUi(Base, BaseModel):
__tablename__ = 'repo_rhodecode_ui'
__table_args__ = (
UniqueConstraint(
'repository_id', 'ui_section', 'ui_key',
name='uq_repo_rhodecode_ui_repository_id_section_key'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True}
)
repository_id = Column(
"repository_id", Integer(), ForeignKey('repositories.repo_id'),
nullable=False)
ui_id = Column(
"ui_id", Integer(), nullable=False, unique=True, default=None,
primary_key=True)
ui_section = Column(
"ui_section", String(255), nullable=True, unique=None, default=None)
ui_key = Column(
"ui_key", String(255), nullable=True, unique=None, default=None)
ui_value = Column(
"ui_value", String(255), nullable=True, unique=None, default=None)
ui_active = Column(
"ui_active", Boolean(), nullable=True, unique=None, default=True)
repository = relationship('Repository')
def __repr__(self):
return '<%s[%s:%s]%s=>%s]>' % (
self.__class__.__name__, self.repository.repo_name,
self.ui_section, self.ui_key, self.ui_value)
class User(Base, BaseModel):
__tablename__ = 'users'
__table_args__ = (
UniqueConstraint('username'), UniqueConstraint('email'),
Index('u_username_idx', 'username'),
Index('u_email_idx', 'email'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True}
)
DEFAULT_USER = 'default'
DEFAULT_USER_EMAIL = 'anonymous@rhodecode.org'
DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", String(255), nullable=True, unique=None, default=None)
password = Column("password", String(255), nullable=True, unique=None, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=True)
admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
name = Column("firstname", String(255), nullable=True, unique=None, default=None)
lastname = Column("lastname", String(255), nullable=True, unique=None, default=None)
_email = Column("email", String(255), nullable=True, unique=None, default=None)
last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
extern_type = Column("extern_type", String(255), nullable=True, unique=None, default=None)
extern_name = Column("extern_name", String(255), nullable=True, unique=None, default=None)
api_key = Column("api_key", String(255), nullable=True, unique=None, default=None)
inherit_default_permissions = Column("inherit_default_permissions", Boolean(), nullable=False, unique=None, default=True)
created_on = Column('created_on', DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
_user_data = Column("user_data", LargeBinary(), nullable=True) # JSON data
user_log = relationship('UserLog')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
repository_groups = relationship('RepoGroup')
user_groups = relationship('UserGroup')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
user_group_to_perm = relationship('UserUserGroupToPerm', primaryjoin='UserUserGroupToPerm.user_id==User.user_id', cascade='all')
group_member = relationship('UserGroupMember', cascade='all')
notifications = relationship('UserNotification', cascade='all')
# notifications assigned to this user
user_created_notifications = relationship('Notification', cascade='all')
# comments created by this user
user_comments = relationship('ChangesetComment', cascade='all')
# user profile extra info
user_emails = relationship('UserEmailMap', cascade='all')
user_ip_map = relationship('UserIpMap', cascade='all')
user_auth_tokens = relationship('UserApiKeys', cascade='all')
# gists
user_gists = relationship('Gist', cascade='all')
# user pull requests
user_pull_requests = relationship('PullRequest', cascade='all')
# external identities
extenal_identities = relationship(
'ExternalIdentity',
primaryjoin="User.user_id==ExternalIdentity.local_user_id",
cascade='all')
def __unicode__(self):
return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
self.user_id, self.username)
@hybrid_property
def email(self):
return self._email
@email.setter
def email(self, val):
self._email = val.lower() if val else None
@property
def firstname(self):
# alias for future
return self.name
@property
def emails(self):
other = UserEmailMap.query().filter(UserEmailMap.user==self).all()
return [self.email] + [x.email for x in other]
@property
def auth_tokens(self):
return [self.api_key] + [x.api_key for x in self.extra_auth_tokens]
@property
def extra_auth_tokens(self):
return UserApiKeys.query().filter(UserApiKeys.user == self).all()
@property
def feed_token(self):
feed_tokens = UserApiKeys.query()\
.filter(UserApiKeys.user == self)\
.filter(UserApiKeys.role == UserApiKeys.ROLE_FEED)\
.all()
if feed_tokens:
return feed_tokens[0].api_key
else:
# use the main token so we don't end up with nothing...
return self.api_key
@classmethod
def extra_valid_auth_tokens(cls, user, role=None):
tokens = UserApiKeys.query().filter(UserApiKeys.user == user)\
.filter(or_(UserApiKeys.expires == -1,
UserApiKeys.expires >= time.time()))
if role:
tokens = tokens.filter(or_(UserApiKeys.role == role,
UserApiKeys.role == UserApiKeys.ROLE_ALL))
return tokens.all()
@property
def ip_addresses(self):
ret = UserIpMap.query().filter(UserIpMap.user == self).all()
return [x.ip_addr for x in ret]
@property
def username_and_name(self):
return '%s (%s %s)' % (self.username, self.firstname, self.lastname)
@property
def username_or_name_or_email(self):
full_name = self.full_name if self.full_name is not ' ' else None
return self.username or full_name or self.email
@property
def full_name(self):
return '%s %s' % (self.firstname, self.lastname)
@property
def full_name_or_username(self):
return ('%s %s' % (self.firstname, self.lastname)
if (self.firstname and self.lastname) else self.username)
@property
def full_contact(self):
return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
@property
def short_contact(self):
return '%s %s' % (self.firstname, self.lastname)
@property
def is_admin(self):
return self.admin
@property
def AuthUser(self):
"""
Returns instance of AuthUser for this user
"""
from rhodecode.lib.auth import AuthUser
return AuthUser(user_id=self.user_id, api_key=self.api_key,
username=self.username)
@hybrid_property
def user_data(self):
if not self._user_data:
return {}
try:
return json.loads(self._user_data)
except TypeError:
return {}
@user_data.setter
def user_data(self, val):
if not isinstance(val, dict):
raise Exception('user_data must be dict, got %s' % type(val))
try:
self._user_data = json.dumps(val)
except Exception:
log.error(traceback.format_exc())
@classmethod
def get_by_username(cls, username, case_insensitive=False,
cache=False, identity_cache=False):
session = Session()
if case_insensitive:
q = cls.query().filter(
func.lower(cls.username) == func.lower(username))
else:
q = cls.query().filter(cls.username == username)
if cache:
if identity_cache:
val = cls.identity_cache(session, 'username', username)
if val:
return val
else:
q = q.options(
FromCache("sql_cache_short",
"get_user_by_name_%s" % _hash_key(username)))
return q.scalar()
@classmethod
def get_by_auth_token(cls, auth_token, cache=False, fallback=True):
q = cls.query().filter(cls.api_key == auth_token)
if cache:
q = q.options(FromCache("sql_cache_short",
"get_auth_token_%s" % auth_token))
res = q.scalar()
if fallback and not res:
#fallback to additional keys
_res = UserApiKeys.query()\
.filter(UserApiKeys.api_key == auth_token)\
.filter(or_(UserApiKeys.expires == -1,
UserApiKeys.expires >= time.time()))\
.first()
if _res:
res = _res.user
return res
@classmethod
def get_by_email(cls, email, case_insensitive=False, cache=False):
if case_insensitive:
q = cls.query().filter(func.lower(cls.email) == func.lower(email))
else:
q = cls.query().filter(cls.email == email)
if cache:
q = q.options(FromCache("sql_cache_short",
"get_email_key_%s" % email))
ret = q.scalar()
if ret is None:
q = UserEmailMap.query()
# try fetching in alternate email map
if case_insensitive:
q = q.filter(func.lower(UserEmailMap.email) == func.lower(email))
else:
q = q.filter(UserEmailMap.email == email)
q = q.options(joinedload(UserEmailMap.user))
if cache:
q = q.options(FromCache("sql_cache_short",
"get_email_map_key_%s" % email))
ret = getattr(q.scalar(), 'user', None)
return ret
@classmethod
def get_from_cs_author(cls, author):
"""
Tries to get User objects out of commit author string
:param author:
"""
from rhodecode.lib.helpers import email, author_name
# Valid email in the attribute passed, see if they're in the system
_email = email(author)
if _email:
user = cls.get_by_email(_email, case_insensitive=True)
if user:
return user
# Maybe we can match by username?
_author = author_name(author)
user = cls.get_by_username(_author, case_insensitive=True)
if user:
return user
def update_userdata(self, **kwargs):
usr = self
old = usr.user_data
old.update(**kwargs)
usr.user_data = old
Session().add(usr)
log.debug('updated userdata with ', kwargs)
def update_lastlogin(self):
"""Update user lastlogin"""
self.last_login = datetime.datetime.now()
Session().add(self)
log.debug('updated user %s lastlogin', self.username)
def update_lastactivity(self):
"""Update user lastactivity"""
usr = self
old = usr.user_data
old.update({'last_activity': time.time()})
usr.user_data = old
Session().add(usr)
log.debug('updated user %s lastactivity', usr.username)
def update_password(self, new_password, change_api_key=False):
from rhodecode.lib.auth import get_crypt_password,generate_auth_token
self.password = get_crypt_password(new_password)
if change_api_key:
self.api_key = generate_auth_token(self.username)
Session().add(self)
@classmethod
def get_first_super_admin(cls):
user = User.query().filter(User.admin == true()).first()
if user is None:
raise Exception('FATAL: Missing administrative account!')
return user
@classmethod
def get_all_super_admins(cls):
"""
Returns all admin accounts sorted by username
"""
return User.query().filter(User.admin == true())\
.order_by(User.username.asc()).all()
@classmethod
def get_default_user(cls, cache=False):
user = User.get_by_username(User.DEFAULT_USER, cache=cache)
if user is None:
raise Exception('FATAL: Missing default account!')
return user
def _get_default_perms(self, user, suffix=''):
from rhodecode.model.permission import PermissionModel
return PermissionModel().get_default_perms(user.user_perms, suffix)
def get_default_perms(self, suffix=''):
return self._get_default_perms(self, suffix)
def get_api_data(self, include_secrets=False, details='full'):
"""
Common function for generating user related data for API
:param include_secrets: By default secrets in the API data will be replaced
by a placeholder value to prevent exposing this data by accident. In case
this data shall be exposed, set this flag to ``True``.
:param details: details can be 'basic|full' basic gives only a subset of
the available user information that includes user_id, name and emails.
"""
user = self
user_data = self.user_data
data = {
'user_id': user.user_id,
'username': user.username,
'firstname': user.name,
'lastname': user.lastname,
'email': user.email,
'emails': user.emails,
}
if details == 'basic':
return data
api_key_length = 40
api_key_replacement = '*' * api_key_length
extras = {
'api_key': api_key_replacement,
'api_keys': [api_key_replacement],
'active': user.active,
'admin': user.admin,
'extern_type': user.extern_type,
'extern_name': user.extern_name,
'last_login': user.last_login,
'ip_addresses': user.ip_addresses,
'language': user_data.get('language')
}
data.update(extras)
if include_secrets:
data['api_key'] = user.api_key
data['api_keys'] = user.auth_tokens
return data
def __json__(self):
data = {
'full_name': self.full_name,
'full_name_or_username': self.full_name_or_username,
'short_contact': self.short_contact,
'full_contact': self.full_contact,
}
data.update(self.get_api_data())
return data
class UserApiKeys(Base, BaseModel):
__tablename__ = 'user_api_keys'
__table_args__ = (
Index('uak_api_key_idx', 'api_key'),
Index('uak_api_key_expires_idx', 'api_key', 'expires'),
UniqueConstraint('api_key'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True}
)
__mapper_args__ = {}
# ApiKey role
ROLE_ALL = 'token_role_all'
ROLE_HTTP = 'token_role_http'
ROLE_VCS = 'token_role_vcs'
ROLE_API = 'token_role_api'
ROLE_FEED = 'token_role_feed'
ROLES = [ROLE_ALL, ROLE_HTTP, ROLE_VCS, ROLE_API, ROLE_FEED]
user_api_key_id = Column("user_api_key_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
api_key = Column("api_key", String(255), nullable=False, unique=True)
description = Column('description', UnicodeText().with_variant(UnicodeText(1024), 'mysql'))
expires = Column('expires', Float(53), nullable=False)
role = Column('role', String(255), nullable=True)
created_on = Column('created_on', DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
user = relationship('User', lazy='joined')
@classmethod
def _get_role_name(cls, role):
return {
cls.ROLE_ALL: _('all'),
cls.ROLE_HTTP: _('http/web interface'),
cls.ROLE_VCS: _('vcs (git/hg/svn protocol)'),
cls.ROLE_API: _('api calls'),
cls.ROLE_FEED: _('feed access'),
}.get(role, role)
@property
def expired(self):
if self.expires == -1:
return False
return time.time() > self.expires
@property
def role_humanized(self):
return self._get_role_name(self.role)
class UserEmailMap(Base, BaseModel):
__tablename__ = 'user_email_map'
__table_args__ = (
Index('uem_email_idx', 'email'),
UniqueConstraint('email'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True}
)
__mapper_args__ = {}
email_id = Column("email_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
_email = Column("email", String(255), nullable=True, unique=False, default=None)
user = relationship('User', lazy='joined')
@validates('_email')
def validate_email(self, key, email):
# check if this email is not main one
main_email = Session().query(User).filter(User.email == email).scalar()
if main_email is not None:
raise AttributeError('email %s is present is user table' % email)
return email
@hybrid_property
def email(self):
return self._email
@email.setter
def email(self, val):
self._email = val.lower() if val else None
class UserIpMap(Base, BaseModel):
__tablename__ = 'user_ip_map'
__table_args__ = (
UniqueConstraint('user_id', 'ip_addr'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True}
)
__mapper_args__ = {}
ip_id = Column("ip_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
ip_addr = Column("ip_addr", String(255), nullable=True, unique=False, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=True)
description = Column("description", String(10000), nullable=True, unique=None, default=None)
user = relationship('User', lazy='joined')
@classmethod
def _get_ip_range(cls, ip_addr):
net = ipaddress.ip_network(ip_addr, strict=False)
return [str(net.network_address), str(net.broadcast_address)]
def __json__(self):
return {
'ip_addr': self.ip_addr,
'ip_range': self._get_ip_range(self.ip_addr),
}
def __unicode__(self):
return u"<%s('user_id:%s=>%s')>" % (self.__class__.__name__,
self.user_id, self.ip_addr)
class UserLog(Base, BaseModel):
__tablename__ = 'user_logs'
__table_args__ = (
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True},
)
user_log_id = Column("user_log_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
username = Column("username", String(255), nullable=True, unique=None, default=None)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True)
repository_name = Column("repository_name", String(255), nullable=True, unique=None, default=None)
user_ip = Column("user_ip", String(255), nullable=True, unique=None, default=None)
action = Column("action", Text().with_variant(Text(1200000), 'mysql'), nullable=True, unique=None, default=None)
action_date = Column("action_date", DateTime(timezone=False), nullable=True, unique=None, default=None)
def __unicode__(self):
return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
self.repository_name,
self.action)
@property
def action_as_day(self):
return datetime.date(*self.action_date.timetuple()[:3])
user = relationship('User')
repository = relationship('Repository', cascade='')
class UserGroup(Base, BaseModel):
__tablename__ = 'users_groups'
__table_args__ = (
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True},
)
users_group_id = Column("users_group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_name = Column("users_group_name", String(255), nullable=False, unique=True, default=None)
user_group_description = Column("user_group_description", String(10000), nullable=True, unique=None, default=None)
users_group_active = Column("users_group_active", Boolean(), nullable=True, unique=None, default=None)
inherit_default_permissions = Column("users_group_inherit_default_permissions", Boolean(), nullable=False, unique=None, default=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
created_on = Column('created_on', DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
_group_data = Column("group_data", LargeBinary(), nullable=True) # JSON data
members = relationship('UserGroupMember', cascade="all, delete, delete-orphan", lazy="joined")
users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
user_user_group_to_perm = relationship('UserUserGroupToPerm', cascade='all')
user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm ', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
user = relationship('User')
@hybrid_property
def group_data(self):
if not self._group_data:
return {}
try:
return json.loads(self._group_data)
except TypeError:
return {}
@group_data.setter
def group_data(self, val):
try:
self._group_data = json.dumps(val)
except Exception:
log.error(traceback.format_exc())
def __unicode__(self):
return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
self.users_group_id,
self.users_group_name)
@classmethod
def get_by_group_name(cls, group_name, cache=False,
case_insensitive=False):
if case_insensitive:
q = cls.query().filter(func.lower(cls.users_group_name) ==
func.lower(group_name))
else:
q = cls.query().filter(cls.users_group_name == group_name)
if cache:
q = q.options(FromCache(
"sql_cache_short",
"get_group_%s" % _hash_key(group_name)))
return q.scalar()
@classmethod
def get(cls, user_group_id, cache=False):
user_group = cls.query()
if cache:
user_group = user_group.options(FromCache("sql_cache_short",
"get_users_group_%s" % user_group_id))
return user_group.get(user_group_id)
def permissions(self, with_admins=True, with_owner=True):
q = UserUserGroupToPerm.query().filter(UserUserGroupToPerm.user_group == self)
q = q.options(joinedload(UserUserGroupToPerm.user_group),
joinedload(UserUserGroupToPerm.user),
joinedload(UserUserGroupToPerm.permission),)
# get owners and admins and permissions. We do a trick of re-writing
# objects from sqlalchemy to named-tuples due to sqlalchemy session
# has a global reference and changing one object propagates to all
# others. This means if admin is also an owner admin_row that change
# would propagate to both objects
perm_rows = []
for _usr in q.all():
usr = AttributeDict(_usr.user.get_dict())
usr.permission = _usr.permission.permission_name
perm_rows.append(usr)
# filter the perm rows by 'default' first and then sort them by
# admin,write,read,none permissions sorted again alphabetically in
# each group
perm_rows = sorted(perm_rows, key=display_sort)
_admin_perm = 'usergroup.admin'
owner_row = []
if with_owner:
usr = AttributeDict(self.user.get_dict())
usr.owner_row = True
usr.permission = _admin_perm
owner_row.append(usr)
super_admin_rows = []
if with_admins:
for usr in User.get_all_super_admins():
# if this admin is also owner, don't double the record
if usr.user_id == owner_row[0].user_id:
owner_row[0].admin_row = True
else:
usr = AttributeDict(usr.get_dict())
usr.admin_row = True
usr.permission = _admin_perm
super_admin_rows.append(usr)
return super_admin_rows + owner_row + perm_rows
def permission_user_groups(self):
q = UserGroupUserGroupToPerm.query().filter(UserGroupUserGroupToPerm.target_user_group == self)
q = q.options(joinedload(UserGroupUserGroupToPerm.user_group),
joinedload(UserGroupUserGroupToPerm.target_user_group),
joinedload(UserGroupUserGroupToPerm.permission),)
perm_rows = []
for _user_group in q.all():
usr = AttributeDict(_user_group.user_group.get_dict())
usr.permission = _user_group.permission.permission_name
perm_rows.append(usr)
return perm_rows
def _get_default_perms(self, user_group, suffix=''):
from rhodecode.model.permission import PermissionModel
return PermissionModel().get_default_perms(user_group.users_group_to_perm, suffix)
def get_default_perms(self, suffix=''):
return self._get_default_perms(self, suffix)
def get_api_data(self, with_group_members=True, include_secrets=False):
"""
:param include_secrets: See :meth:`User.get_api_data`, this parameter is
basically forwarded.
"""
user_group = self
data = {
'users_group_id': user_group.users_group_id,
'group_name': user_group.users_group_name,
'group_description': user_group.user_group_description,
'active': user_group.users_group_active,
'owner': user_group.user.username,
}
if with_group_members:
users = []
for user in user_group.members:
user = user.user
users.append(user.get_api_data(include_secrets=include_secrets))
data['users'] = users
return data
class UserGroupMember(Base, BaseModel):
__tablename__ = 'users_groups_members'
__table_args__ = (
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True},
)
users_group_member_id = Column("users_group_member_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
user = relationship('User', lazy='joined')
users_group = relationship('UserGroup')
def __init__(self, gr_id='', u_id=''):
self.users_group_id = gr_id
self.user_id = u_id
class RepositoryField(Base, BaseModel):
__tablename__ = 'repositories_fields'
__table_args__ = (
UniqueConstraint('repository_id', 'field_key'), # no-multi field
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True},
)
PREFIX = 'ex_' # prefix used in form to not conflict with already existing fields
repo_field_id = Column("repo_field_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=None, default=None)
field_key = Column("field_key", String(250))
field_label = Column("field_label", String(1024), nullable=False)
field_value = Column("field_value", String(10000), nullable=False)
field_desc = Column("field_desc", String(1024), nullable=False)
field_type = Column("field_type", String(255), nullable=False, unique=None)
created_on = Column('created_on', DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
repository = relationship('Repository')
@property
def field_key_prefixed(self):
return 'ex_%s' % self.field_key
@classmethod
def un_prefix_key(cls, key):
if key.startswith(cls.PREFIX):
return key[len(cls.PREFIX):]
return key
@classmethod
def get_by_key_name(cls, key, repo):
row = cls.query()\
.filter(cls.repository == repo)\
.filter(cls.field_key == key).scalar()
return row
class Repository(Base, BaseModel):
__tablename__ = 'repositories'
__table_args__ = (
Index('r_repo_name_idx', 'repo_name', mysql_length=255),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True},
)
DEFAULT_CLONE_URI = '{scheme}://{user}@{netloc}/{repo}'
DEFAULT_CLONE_URI_ID = '{scheme}://{user}@{netloc}/_{repoid}'
STATE_CREATED = 'repo_state_created'
STATE_PENDING = 'repo_state_pending'
STATE_ERROR = 'repo_state_error'
LOCK_AUTOMATIC = 'lock_auto'
LOCK_API = 'lock_api'
LOCK_WEB = 'lock_web'
LOCK_PULL = 'lock_pull'
NAME_SEP = URL_SEP
repo_id = Column(
"repo_id", Integer(), nullable=False, unique=True, default=None,
primary_key=True)
_repo_name = Column(
"repo_name", Text(), nullable=False, default=None)
_repo_name_hash = Column(
"repo_name_hash", String(255), nullable=False, unique=True)
repo_state = Column("repo_state", String(255), nullable=True)
clone_uri = Column(
"clone_uri", EncryptedTextValue(), nullable=True, unique=False,
default=None)
repo_type = Column(
"repo_type", String(255), nullable=False, unique=False, default=None)
user_id = Column(
"user_id", Integer(), ForeignKey('users.user_id'), nullable=False,
unique=False, default=None)
private = Column(
"private", Boolean(), nullable=True, unique=None, default=None)
enable_statistics = Column(
"statistics", Boolean(), nullable=True, unique=None, default=True)
enable_downloads = Column(
"downloads", Boolean(), nullable=True, unique=None, default=True)
description = Column(
"description", String(10000), nullable=True, unique=None, default=None)
created_on = Column(
'created_on', DateTime(timezone=False), nullable=True, unique=None,
default=datetime.datetime.now)
updated_on = Column(
'updated_on', DateTime(timezone=False), nullable=True, unique=None,
default=datetime.datetime.now)
_landing_revision = Column(
"landing_revision", String(255), nullable=False, unique=False,
default=None)
enable_locking = Column(
"enable_locking", Boolean(), nullable=False, unique=None,
default=False)
_locked = Column(
"locked", String(255), nullable=True, unique=False, default=None)
_changeset_cache = Column(
"changeset_cache", LargeBinary(), nullable=True) # JSON data
fork_id = Column(
"fork_id", Integer(), ForeignKey('repositories.repo_id'),
nullable=True, unique=False, default=None)
group_id = Column(
"group_id", Integer(), ForeignKey('groups.group_id'), nullable=True,
unique=False, default=None)
user = relationship('User', lazy='joined')
fork = relationship('Repository', remote_side=repo_id, lazy='joined')
group = relationship('RepoGroup', lazy='joined')
repo_to_perm = relationship(
'UserRepoToPerm', cascade='all',
order_by='UserRepoToPerm.repo_to_perm_id')
users_group_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
stats = relationship('Statistics', cascade='all', uselist=False)
followers = relationship(
'UserFollowing',
primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id',
cascade='all')
extra_fields = relationship(
'RepositoryField', cascade="all, delete, delete-orphan")
logs = relationship('UserLog')
comments = relationship(
'ChangesetComment', cascade="all, delete, delete-orphan")
pull_requests_source = relationship(
'PullRequest',
primaryjoin='PullRequest.source_repo_id==Repository.repo_id',
cascade="all, delete, delete-orphan")
pull_requests_target = relationship(
'PullRequest',
primaryjoin='PullRequest.target_repo_id==Repository.repo_id',
cascade="all, delete, delete-orphan")
ui = relationship('RepoRhodeCodeUi', cascade="all")
settings = relationship('RepoRhodeCodeSetting', cascade="all")
def __unicode__(self):
return u"<%s('%s:%s')>" % (self.__class__.__name__, self.repo_id,
safe_unicode(self.repo_name))
@hybrid_property
def landing_rev(self):
# always should return [rev_type, rev]
if self._landing_revision:
_rev_info = self._landing_revision.split(':')
if len(_rev_info) < 2:
_rev_info.insert(0, 'rev')
return [_rev_info[0], _rev_info[1]]
return [None, None]
@landing_rev.setter
def landing_rev(self, val):
if ':' not in val:
raise ValueError('value must be delimited with `:` and consist '
'of <rev_type>:<rev>, got %s instead' % val)
self._landing_revision = val
@hybrid_property
def locked(self):
if self._locked:
user_id, timelocked, reason = self._locked.split(':')
lock_values = int(user_id), timelocked, reason
else:
lock_values = [None, None, None]
return lock_values
@locked.setter
def locked(self, val):
if val and isinstance(val, (list, tuple)):
self._locked = ':'.join(map(str, val))
else:
self._locked = None
@hybrid_property
def changeset_cache(self):
from rhodecode.lib.vcs.backends.base import EmptyCommit
dummy = EmptyCommit().__json__()
if not self._changeset_cache:
return dummy
try:
return json.loads(self._changeset_cache)
except TypeError:
return dummy
except Exception:
log.error(traceback.format_exc())
return dummy
@changeset_cache.setter
def changeset_cache(self, val):
try:
self._changeset_cache = json.dumps(val)
except Exception:
log.error(traceback.format_exc())
@hybrid_property
def repo_name(self):
return self._repo_name
@repo_name.setter
def repo_name(self, value):
self._repo_name = value
self._repo_name_hash = hashlib.sha1(safe_str(value)).hexdigest()
@classmethod
def normalize_repo_name(cls, repo_name):
"""
Normalizes os specific repo_name to the format internally stored inside
database using URL_SEP
:param cls:
:param repo_name:
"""
return cls.NAME_SEP.join(repo_name.split(os.sep))
@classmethod
def get_by_repo_name(cls, repo_name, cache=False, identity_cache=False):
session = Session()
q = session.query(cls).filter(cls.repo_name == repo_name)
if cache:
if identity_cache:
val = cls.identity_cache(session, 'repo_name', repo_name)
if val:
return val
else:
q = q.options(
FromCache("sql_cache_short",
"get_repo_by_name_%s" % _hash_key(repo_name)))
return q.scalar()
@classmethod
def get_by_full_path(cls, repo_full_path):
repo_name = repo_full_path.split(cls.base_path(), 1)[-1]
repo_name = cls.normalize_repo_name(repo_name)
return cls.get_by_repo_name(repo_name.strip(URL_SEP))
@classmethod
def get_repo_forks(cls, repo_id):
return cls.query().filter(Repository.fork_id == repo_id)
@classmethod
def base_path(cls):
"""
Returns base path when all repos are stored
:param cls:
"""
q = Session().query(RhodeCodeUi)\
.filter(RhodeCodeUi.ui_key == cls.NAME_SEP)
q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
return q.one().ui_value
@classmethod
def is_valid(cls, repo_name):
"""
returns True if given repo name is a valid filesystem repository
:param cls:
:param repo_name:
"""
from rhodecode.lib.utils import is_valid_repo
return is_valid_repo(repo_name, cls.base_path())
@classmethod
def get_all_repos(cls, user_id=Optional(None), group_id=Optional(None),
case_insensitive=True):
q = Repository.query()
if not isinstance(user_id, Optional):
q = q.filter(Repository.user_id == user_id)
if not isinstance(group_id, Optional):
q = q.filter(Repository.group_id == group_id)
if case_insensitive:
q = q.order_by(func.lower(Repository.repo_name))
else:
q = q.order_by(Repository.repo_name)
return q.all()
@property
def forks(self):
"""
Return forks of this repo
"""
return Repository.get_repo_forks(self.repo_id)
@property
def parent(self):
"""
Returns fork parent
"""
return self.fork
@property
def just_name(self):
return self.repo_name.split(self.NAME_SEP)[-1]
@property
def groups_with_parents(self):
groups = []
if self.group is None:
return groups
cur_gr = self.group
groups.insert(0, cur_gr)
while 1:
gr = getattr(cur_gr, 'parent_group', None)
cur_gr = cur_gr.parent_group
if gr is None:
break
groups.insert(0, gr)
return groups
@property
def groups_and_repo(self):
return self.groups_with_parents, self
@LazyProperty
def repo_path(self):
"""
Returns base full path for that repository means where it actually
exists on a filesystem
"""
q = Session().query(RhodeCodeUi).filter(
RhodeCodeUi.ui_key == self.NAME_SEP)
q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
return q.one().ui_value
@property
def repo_full_path(self):
p = [self.repo_path]
# we need to split the name by / since this is how we store the
# names in the database, but that eventually needs to be converted
# into a valid system path
p += self.repo_name.split(self.NAME_SEP)
return os.path.join(*map(safe_unicode, p))
@property
def cache_keys(self):
"""
Returns associated cache keys for that repo
"""
return CacheKey.query()\
.filter(CacheKey.cache_args == self.repo_name)\
.order_by(CacheKey.cache_key)\
.all()
def get_new_name(self, repo_name):
"""
returns new full repository name based on assigned group and new new
:param group_name:
"""
path_prefix = self.group.full_path_splitted if self.group else []
return self.NAME_SEP.join(path_prefix + [repo_name])
@property
def _config(self):
"""
Returns db based config object.
"""
from rhodecode.lib.utils import make_db_config
return make_db_config(clear_session=False, repo=self)
def permissions(self, with_admins=True, with_owner=True):
q = UserRepoToPerm.query().filter(UserRepoToPerm.repository == self)
q = q.options(joinedload(UserRepoToPerm.repository),
joinedload(UserRepoToPerm.user),
joinedload(UserRepoToPerm.permission),)
# get owners and admins and permissions. We do a trick of re-writing
# objects from sqlalchemy to named-tuples due to sqlalchemy session
# has a global reference and changing one object propagates to all
# others. This means if admin is also an owner admin_row that change
# would propagate to both objects
perm_rows = []
for _usr in q.all():
usr = AttributeDict(_usr.user.get_dict())
usr.permission = _usr.permission.permission_name
perm_rows.append(usr)
# filter the perm rows by 'default' first and then sort them by
# admin,write,read,none permissions sorted again alphabetically in
# each group
perm_rows = sorted(perm_rows, key=display_sort)
_admin_perm = 'repository.admin'
owner_row = []
if with_owner:
usr = AttributeDict(self.user.get_dict())
usr.owner_row = True
usr.permission = _admin_perm
owner_row.append(usr)
super_admin_rows = []
if with_admins:
for usr in User.get_all_super_admins():
# if this admin is also owner, don't double the record
if usr.user_id == owner_row[0].user_id:
owner_row[0].admin_row = True
else:
usr = AttributeDict(usr.get_dict())
usr.admin_row = True
usr.permission = _admin_perm
super_admin_rows.append(usr)
return super_admin_rows + owner_row + perm_rows
def permission_user_groups(self):
q = UserGroupRepoToPerm.query().filter(
UserGroupRepoToPerm.repository == self)
q = q.options(joinedload(UserGroupRepoToPerm.repository),
joinedload(UserGroupRepoToPerm.users_group),
joinedload(UserGroupRepoToPerm.permission),)
perm_rows = []
for _user_group in q.all():
usr = AttributeDict(_user_group.users_group.get_dict())
usr.permission = _user_group.permission.permission_name
perm_rows.append(usr)
return perm_rows
def get_api_data(self, include_secrets=False):
"""
Common function for generating repo api data
:param include_secrets: See :meth:`User.get_api_data`.
"""
# TODO: mikhail: Here there is an anti-pattern, we probably need to
# move this methods on models level.
from rhodecode.model.settings import SettingsModel
repo = self
_user_id, _time, _reason = self.locked
data = {
'repo_id': repo.repo_id,
'repo_name': repo.repo_name,
'repo_type': repo.repo_type,
'clone_uri': repo.clone_uri or '',
'url': url('summary_home', repo_name=self.repo_name, qualified=True),
'private': repo.private,
'created_on': repo.created_on,
'description': repo.description,
'landing_rev': repo.landing_rev,
'owner': repo.user.username,
'fork_of': repo.fork.repo_name if repo.fork else None,
'enable_statistics': repo.enable_statistics,
'enable_locking': repo.enable_locking,
'enable_downloads': repo.enable_downloads,
'last_changeset': repo.changeset_cache,
'locked_by': User.get(_user_id).get_api_data(
include_secrets=include_secrets) if _user_id else None,
'locked_date': time_to_datetime(_time) if _time else None,
'lock_reason': _reason if _reason else None,
}
# TODO: mikhail: should be per-repo settings here
rc_config = SettingsModel().get_all_settings()
repository_fields = str2bool(
rc_config.get('rhodecode_repository_fields'))
if repository_fields:
for f in self.extra_fields:
data[f.field_key_prefixed] = f.field_value
return data
@classmethod
def lock(cls, repo, user_id, lock_time=None, lock_reason=None):
if not lock_time:
lock_time = time.time()
if not lock_reason:
lock_reason = cls.LOCK_AUTOMATIC
repo.locked = [user_id, lock_time, lock_reason]
Session().add(repo)
Session().commit()
@classmethod
def unlock(cls, repo):
repo.locked = None
Session().add(repo)
Session().commit()
@classmethod
def getlock(cls, repo):
return repo.locked
def is_user_lock(self, user_id):
if self.lock[0]:
lock_user_id = safe_int(self.lock[0])
user_id = safe_int(user_id)
# both are ints, and they are equal
return