##// END OF EJS Templates
Added option to swap the db connection uri based on ENV TEST_DB string
Added option to swap the db connection uri based on ENV TEST_DB string

File last commit:

r2543:03a77098 beta
r2560:380d6d86 beta
Show More
nodes.py
613 lines | 18.6 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
"""
vcs.nodes
~~~~~~~~~
Module holding everything related to vcs nodes.
:created_on: Apr 8, 2010
:copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
"""
import os
import stat
import posixpath
import mimetypes
from pygments import lexers
from rhodecode.lib.vcs.utils.lazy import LazyProperty
from rhodecode.lib.vcs.utils import safe_unicode
from rhodecode.lib.vcs.exceptions import NodeError
from rhodecode.lib.vcs.exceptions import RemovedFileNodeError
from rhodecode.lib.vcs.backends.base import EmptyChangeset
class NodeKind:
    """
    Integer markers telling apart the kinds of nodes used in this module.
    """
    # kind assigned to submodule nodes
    SUBMODULE = -1
    # kind assigned to directory nodes
    DIR = 1
    # kind assigned to file nodes
    FILE = 2
class NodeState:
    """
    Textual markers describing what happened to a node in a changeset.
    """
    # node was introduced by the changeset
    ADDED = u'added'
    # node was modified by the changeset
    CHANGED = u'changed'
    # node was left untouched by the changeset
    NOT_CHANGED = u'not changed'
    # node was deleted by the changeset
    REMOVED = u'removed'
class NodeGeneratorBase(object):
    """
    Lazy container shared by the added/changed/removed filenode generators.

    Only paths are stored; nodes are requested from the changeset while
    iterating, slicing or calling the instance. ``len()`` is answered from
    the stored paths alone, so it never creates a node.
    """

    def __init__(self, current_paths, cs):
        """
        :param current_paths: sequence of node paths this generator covers
        :param cs: changeset object; its ``get_node(path)`` builds the nodes
        """
        self.cs = cs
        self.current_paths = current_paths

    def __call__(self):
        """
        Returns a list with every node of this generator.
        """
        return list(self)

    def __getslice__(self, i, j):
        """
        Lazily yields nodes for the paths found in ``current_paths[i:j]``.
        """
        for path in self.current_paths[i:j]:
            yield self.cs.get_node(path)

    def __len__(self):
        """
        Returns number of stored paths (no node gets created).
        """
        return len(self.current_paths)

    def __iter__(self):
        """
        Lazily yields one node per stored path, in the stored order.
        """
        for path in self.current_paths:
            yield self.cs.get_node(path)
class AddedFileNodesGenerator(NodeGeneratorBase):
    """
    Lazy collection of the files added in the current changeset.

    Behaviour is inherited unchanged from ``NodeGeneratorBase``.
    """
class ChangedFileNodesGenerator(NodeGeneratorBase):
    """
    Lazy collection of the files changed in the current changeset.

    Behaviour is inherited unchanged from ``NodeGeneratorBase``.
    """
class RemovedFileNodesGenerator(NodeGeneratorBase):
    """
    Lazy collection of the files removed in the current changeset.

    Nodes are not requested from the changeset here; every stored path is
    wrapped in a ``RemovedFileNode`` instead.
    """

    def __iter__(self):
        """
        Lazily yields a ``RemovedFileNode`` for each stored path.
        """
        for removed_path in self.current_paths:
            yield RemovedFileNode(path=removed_path)

    def __getslice__(self, i, j):
        """
        Lazily yields a ``RemovedFileNode`` for each path found in
        ``current_paths[i:j]``.
        """
        for removed_path in self.current_paths[i:j]:
            yield RemovedFileNode(path=removed_path)
class Node(object):
    """
    Simplest class representing file or directory on repository. SCM backends
    should use ``FileNode`` and ``DirNode`` subclasses rather than ``Node``
    directly.

    Node's ``path`` cannot start with slash as we operate on *relative* paths
    only. Moreover, every single node is identified by the ``path`` attribute,
    so it cannot end with slash, too. Otherwise, path could lead to mistakes.
    """

    def __init__(self, path, kind):
        """
        :param path: relative path of the node; trailing slashes are stripped
        :param kind: one of the ``NodeKind`` values
        :raises NodeError: if ``path`` starts with a slash, or if it is empty
          while ``kind`` is not ``NodeKind.DIR``
        """
        if path.startswith('/'):
            raise NodeError("Cannot initialize Node objects with slash at "
                            "the beginning as only relative paths are supported")
        self.path = path.rstrip('/')
        if path == '' and kind != NodeKind.DIR:
            raise NodeError("Only DirNode and its subclasses may be "
                            "initialized with empty path")
        # ``path`` has to be set before ``kind``: the kind setter inspects it
        self.kind = kind
        if self.is_root() and not self.is_dir():
            raise NodeError("Root node cannot be FILE kind")

    @LazyProperty
    def parent(self):
        """
        Returns parent node, or ``None`` if parent path is empty (root node).

        If ``changeset`` attribute is truthy the parent is fetched from it,
        otherwise a standalone ``DirNode`` is created. ``changeset`` is not
        set by ``Node`` itself, it is expected to be set by subclasses.
        """
        parent_path = self.get_parent_path()
        if parent_path:
            if self.changeset:
                return self.changeset.get_node(parent_path)
            return DirNode(parent_path)
        return None

    @LazyProperty
    def unicode_path(self):
        """
        Returns ``path`` passed through ``safe_unicode``.
        """
        return safe_unicode(self.path)

    @LazyProperty
    def name(self):
        """
        Returns name of the node so if its path
        then only last part is returned.
        """
        return safe_unicode(self.path.rstrip('/').split('/')[-1])

    def _get_kind(self):
        return self._kind

    def _set_kind(self, kind):
        # kind may be assigned only once
        if hasattr(self, '_kind'):
            raise NodeError("Cannot change node's kind")
        else:
            self._kind = kind
            # Post setter check (path's trailing slash)
            if self.path.endswith('/'):
                raise NodeError("Node's path cannot end with slash")

    kind = property(_get_kind, _set_kind)

    def __cmp__(self, other):
        """
        Comparator using kind and then name of the node, needed for quick
        list sorting.
        """
        kind_cmp = cmp(self.kind, other.kind)
        if kind_cmp:
            return kind_cmp
        return cmp(self.name, other.name)

    def __eq__(self, other):
        """
        Returns ``True`` if both nodes have same name, path and kind and,
        additionally, same content (file nodes) or same paths of direct
        children (directory nodes).
        """
        for attr in ('name', 'path', 'kind'):
            if getattr(self, attr) != getattr(other, attr):
                return False
        if self.is_file():
            if self.content != other.content:
                return False
        elif self.is_dir():
            # For DirNode's check without entering each dir; the right hand
            # side has to come from ``other``, not from ``self``
            self_nodes_paths = sorted(n.path for n in self.nodes)
            other_nodes_paths = sorted(n.path for n in other.nodes)
            if self_nodes_paths != other_nodes_paths:
                return False
        # other kinds (i.e. submodules) have neither content nor nodes, so
        # name, path and kind are all there is to compare
        return True

    def __ne__(self, other):
        """
        Negation of ``__eq__`` so ``!=`` stays consistent with ``==``.
        """
        return not self.__eq__(other)

    # former (misspelled) name of ``__ne__``, kept for backward compatibility
    __nq__ = __ne__

    def __repr__(self):
        return '<%s %r>' % (self.__class__.__name__, self.path)

    def __str__(self):
        return self.__repr__()

    def __unicode__(self):
        return self.name

    def get_parent_path(self):
        """
        Returns node's parent path or empty string if node is root.
        For any other node returned path ends with slash.
        """
        if self.is_root():
            return ''
        return posixpath.dirname(self.path.rstrip('/')) + '/'

    def is_file(self):
        """
        Returns ``True`` if node's kind is ``NodeKind.FILE``, ``False``
        otherwise.
        """
        return self.kind == NodeKind.FILE

    def is_dir(self):
        """
        Returns ``True`` if node's kind is ``NodeKind.DIR``, ``False``
        otherwise.
        """
        return self.kind == NodeKind.DIR

    def is_root(self):
        """
        Returns ``True`` if node is a root node and ``False`` otherwise.
        """
        return self.kind == NodeKind.DIR and self.path == ''

    def is_submodule(self):
        """
        Returns ``True`` if node's kind is ``NodeKind.SUBMODULE``, ``False``
        otherwise.
        """
        return self.kind == NodeKind.SUBMODULE

    # State flags below compare by value (``==``): identity checks on strings
    # only work as long as the very same string object is handed around.

    @LazyProperty
    def added(self):
        """
        ``True`` if node's ``state`` equals ``NodeState.ADDED``.
        """
        return self.state == NodeState.ADDED

    @LazyProperty
    def changed(self):
        """
        ``True`` if node's ``state`` equals ``NodeState.CHANGED``.
        """
        return self.state == NodeState.CHANGED

    @LazyProperty
    def not_changed(self):
        """
        ``True`` if node's ``state`` equals ``NodeState.NOT_CHANGED``.
        """
        return self.state == NodeState.NOT_CHANGED

    @LazyProperty
    def removed(self):
        """
        ``True`` if node's ``state`` equals ``NodeState.REMOVED``.
        """
        return self.state == NodeState.REMOVED
class FileNode(Node):
"""
Class representing file nodes.
:attribute: path: path to the node, relative to repostiory's root
:attribute: content: if given arbitrary sets content of the file
:attribute: changeset: if given, first time content is accessed, callback
:attribute: mode: octal stat mode for a node. Default is 0100644.
"""
def __init__(self, path, content=None, changeset=None, mode=None):
"""
Only one of ``content`` and ``changeset`` may be given. Passing both
would raise ``NodeError`` exception.
:param path: relative path to the node
:param content: content may be passed to constructor
:param changeset: if given, will use it to lazily fetch content
:param mode: octal representation of ST_MODE (i.e. 0100644)
"""
if content and changeset:
raise NodeError("Cannot use both content and changeset")
super(FileNode, self).__init__(path, kind=NodeKind.FILE)
self.changeset = changeset
self._content = content
self._mode = mode or 0100644
@LazyProperty
def mode(self):
"""
Returns lazily mode of the FileNode. If ``changeset`` is not set, would
use value given at initialization or 0100644 (default).
"""
if self.changeset:
mode = self.changeset.get_file_mode(self.path)
else:
mode = self._mode
return mode
@property
def content(self):
"""
Returns lazily content of the FileNode. If possible, would try to
decode content from UTF-8.
"""
if self.changeset:
content = self.changeset.get_file_content(self.path)
else:
content = self._content
if bool(content and '\0' in content):
return content
return safe_unicode(content)
@LazyProperty
def size(self):
if self.changeset:
return self.changeset.get_file_size(self.path)
raise NodeError("Cannot retrieve size of the file without related "
"changeset attribute")
@LazyProperty
def message(self):
if self.changeset:
return self.last_changeset.message
raise NodeError("Cannot retrieve message of the file without related "
"changeset attribute")
@LazyProperty
def last_changeset(self):
if self.changeset:
return self.changeset.get_file_changeset(self.path)
raise NodeError("Cannot retrieve last changeset of the file without "
"related changeset attribute")
def get_mimetype(self):
"""
Mimetype is calculated based on the file's content. If ``_mimetype``
attribute is available, it will be returned (backends which store
mimetypes or can easily recognize them, should set this private
attribute to indicate that type should *NOT* be calculated).
"""
if hasattr(self, '_mimetype'):
if (isinstance(self._mimetype, (tuple, list,)) and
len(self._mimetype) == 2):
return self._mimetype
else:
raise NodeError('given _mimetype attribute must be an 2 '
'element list or tuple')
mtype, encoding = mimetypes.guess_type(self.name)
if mtype is None:
if self.is_binary:
mtype = 'application/octet-stream'
encoding = None
else:
mtype = 'text/plain'
encoding = None
return mtype, encoding
@LazyProperty
def mimetype(self):
"""
Wrapper around full mimetype info. It returns only type of fetched
mimetype without the encoding part. use get_mimetype function to fetch
full set of (type,encoding)
"""
return self.get_mimetype()[0]
@LazyProperty
def mimetype_main(self):
return self.mimetype.split('/')[0]
@LazyProperty
def lexer(self):
"""
Returns pygment's lexer class. Would try to guess lexer taking file's
content, name and mimetype.
"""
try:
lexer = lexers.guess_lexer_for_filename(self.name, self.content)
except lexers.ClassNotFound:
lexer = lexers.TextLexer()
# returns first alias
return lexer
@LazyProperty
def lexer_alias(self):
"""
Returns first alias of the lexer guessed for this file.
"""
return self.lexer.aliases[0]
@LazyProperty
def history(self):
"""
Returns a list of changeset for this file in which the file was changed
"""
if self.changeset is None:
raise NodeError('Unable to get changeset for this FileNode')
return self.changeset.get_file_history(self.path)
@LazyProperty
def annotate(self):
"""
Returns a list of three element tuples with lineno,changeset and line
"""
if self.changeset is None:
raise NodeError('Unable to get changeset for this FileNode')
return self.changeset.get_file_annotate(self.path)
@LazyProperty
def state(self):
if not self.changeset:
raise NodeError("Cannot check state of the node if it's not "
"linked with changeset")
elif self.path in (node.path for node in self.changeset.added):
return NodeState.ADDED
elif self.path in (node.path for node in self.changeset.changed):
return NodeState.CHANGED
else:
return NodeState.NOT_CHANGED
@property
def is_binary(self):
"""
Returns True if file has binary content.
"""
_bin = '\0' in self.content
return _bin
@LazyProperty
def extension(self):
"""Returns filenode extension"""
return self.name.split('.')[-1]
def is_executable(self):
"""
Returns ``True`` if file has executable flag turned on.
"""
return bool(self.mode & stat.S_IXUSR)
def __repr__(self):
return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
getattr(self.changeset, 'short_id', ''))
class RemovedFileNode(FileNode):
    """
    Dummy FileNode class - trying to access any public attribute except path,
    name, kind or state (or methods/attributes checking those) would raise
    RemovedFileNodeError.
    """
    # Public attributes that may be read; everything else (unless its name
    # starts with an underscore) is rejected by ``__getattribute__``.
    ALLOWED_ATTRIBUTES = [
        'name', 'path', 'state', 'is_root', 'is_file', 'is_dir',
        'is_submodule', 'kind',
        'added', 'changed', 'not_changed', 'removed'
    ]

    def __init__(self, path):
        """
        :param path: relative path to the node
        """
        super(RemovedFileNode, self).__init__(path=path)

    def __getattribute__(self, attr):
        """
        Returns attribute if its name starts with underscore or is listed in
        ``ALLOWED_ATTRIBUTES``.

        :raises RemovedFileNodeError: for any other attribute
        """
        if attr.startswith('_') or attr in RemovedFileNode.ALLOWED_ATTRIBUTES:
            return super(RemovedFileNode, self).__getattribute__(attr)
        raise RemovedFileNodeError("Cannot access attribute %s on "
                                   "RemovedFileNode" % attr)

    @LazyProperty
    def state(self):
        """
        Always ``NodeState.REMOVED``.
        """
        return NodeState.REMOVED

    def __repr__(self):
        # ``FileNode.__repr__`` reads ``self.changeset`` which is not
        # whitelisted above, so repr()/str() would raise. Same format is
        # produced here with an empty changeset id.
        return '<%s %r @ >' % (self.__class__.__name__, self.path)
class DirNode(Node):
    """
    Directory node keeping the files and directories located directly
    within it. It may be used standalone (with ``nodes`` given explicitly)
    or linked with a changeset, in which case children are fetched lazily
    from that changeset.
    """

    def __init__(self, path, nodes=(), changeset=None):
        """
        ``nodes`` and ``changeset`` exclude each other; giving both raises
        ``NodeError``.

        :param path: relative path to the node
        :param nodes: iterable of child nodes
        :param changeset: if given, will use it to lazily fetch children
        """
        if nodes and changeset:
            raise NodeError("Cannot use both nodes and changeset")
        super(DirNode, self).__init__(path, NodeKind.DIR)
        self.changeset = changeset
        self._nodes = nodes

    @LazyProperty
    def content(self):
        """
        Directories have no content, ``NodeError`` is always raised.
        """
        raise NodeError("%s represents a dir and has no ``content`` attribute"
                        % self)

    @LazyProperty
    def nodes(self):
        """
        Sorted list of direct children, taken from ``changeset`` when one is
        set and from the nodes given at initialization otherwise. As a side
        effect ``_nodes_dict`` (path -> node) is (re)built.
        """
        if self.changeset:
            children = self.changeset.get_nodes(self.path)
        else:
            children = self._nodes
        self._nodes_dict = dict((child.path, child) for child in children)
        return sorted(children)

    @LazyProperty
    def files(self):
        """
        Sorted list of direct children which are files.
        """
        return sorted([child for child in self.nodes if child.is_file()])

    @LazyProperty
    def dirs(self):
        """
        Sorted list of direct children which are directories.
        """
        return sorted([child for child in self.nodes if child.is_dir()])

    def __iter__(self):
        for child in self.nodes:
            yield child

    def get_node(self, path):
        """
        Returns node from within this particular ``DirNode``, so it is not
        allowed to fetch, i.e. node located at 'docs/api/index.rst' from node
        'docs'. In order to access deeper nodes one must fetch nodes between
        them first - this would work::

           docs = root.get_node('docs')
           docs.get_node('api').get_node('index.rst')

        :param: path - relative to the current node
        :raises NodeError: if path is empty, if node cannot be found or if
          deeper node is requested while no changeset is set

        .. note::
           To access lazily (as in example above) node have to be initialized
           with related changeset object - without it node is out of
           context and may know nothing about anything else than nearest
           (located at same level) nodes.
        """
        try:
            path = path.rstrip('/')
            if path == '':
                raise NodeError("Cannot retrieve node without path")
            self.nodes  # access nodes first in order to set _nodes_dict
            parts = path.split('/')
            if len(parts) > 1:
                # deeper node: resolve first component here and hand the
                # rest over to the node found
                if self.changeset is None:
                    raise NodeError("Cannot access deeper "
                                    "nodes without changeset")
                head, tail = parts[0], '/'.join(parts[1:])
                return self.get_node(head).get_node(tail)
            # direct child: ``_nodes_dict`` is keyed by full paths
            if self.is_root():
                path = parts[0]
            else:
                path = '/'.join((self.path, parts[0]))
            return self._nodes_dict[path]
        except KeyError:
            raise NodeError("Node does not exist at %s" % path)

    @LazyProperty
    def state(self):
        """
        Directories have no state, ``NodeError`` is always raised.
        """
        raise NodeError("Cannot access state of DirNode")

    @LazyProperty
    def size(self):
        """
        Sum of ``size`` of all the files yielded by ``changeset.walk`` for
        this node's path.
        """
        total = 0
        for _root, _dirs, files in self.changeset.walk(self.path):
            for filenode in files:
                total += filenode.size
        return total

    def __repr__(self):
        short_id = getattr(self.changeset, 'short_id', '')
        return '<%s %r @ %s>' % (self.__class__.__name__, self.path, short_id)
class RootNode(DirNode):
    """
    ``DirNode`` with an empty path, i.e. the root node of the repository.
    """

    def __init__(self, nodes=(), changeset=None):
        """
        :param nodes: iterable of child nodes, passed on to ``DirNode``
        :param changeset: changeset object, passed on to ``DirNode``
        """
        super(RootNode, self).__init__(
            path='',
            nodes=nodes,
            changeset=changeset,
        )

    def __repr__(self):
        # path is always empty, so class name alone is shown
        return '<%s>' % (self.__class__.__name__,)
class SubModuleNode(Node):
    """
    Represents a SubModule of Git or SubRepo of Mercurial.
    """
    # fixed values, defined at class level
    is_binary = False
    size = 0

    def __init__(self, name, url=None, changeset=None, alias=None):
        """
        ``Node.__init__`` is not called here, attributes are set directly.

        :param name: used as ``path`` of the node
        :param url: url of the submodule; if not given (or empty) it is
          derived by ``_extract_submodule_url``
        :param changeset: its ``str()`` is handed over to ``EmptyChangeset``
        :param alias: backend alias, i.e. ``git`` or ``hg``
        """
        # ``path`` goes first, the ``kind`` setter inspects it
        self.path = name
        self.kind = NodeKind.SUBMODULE
        self.alias = alias
        # we have to use emptyChangeset here since this can point to svn/git/hg
        # submodules we cannot get from repository
        self.changeset = EmptyChangeset(str(changeset), alias=alias)
        if not url:
            url = self._extract_submodule_url()
        self.url = url

    def __repr__(self):
        short_id = getattr(self.changeset, 'short_id', '')
        return '<%s %r @ %s>' % (self.__class__.__name__, self.path, short_id)

    def _extract_submodule_url(self):
        """
        Returns ``path`` for ``git`` and ``hg`` aliases and ``None`` for
        anything else.
        """
        #TODO: find a way to parse gits submodule file and extract the
        # linking URL
        if self.alias in ('git', 'hg'):
            return self.path
        return None

    @LazyProperty
    def name(self):
        """
        Returns last part of the node's path followed by `` @ `` and short
        id of the changeset.
        """
        last_part = self.path.rstrip('/').split('/')[-1]
        return u'%s @ %s' % (safe_unicode(last_part), self.changeset.short_id)