##// END OF EJS Templates
Speed up tag local cache
Speed up tag local cache

File last commit:

r2021:b8cdd5b1 default
r2021:b8cdd5b1 default
Show More
tag.py
212 lines | 6.4 KiB | text/x-python | PythonLexer
import hashlib
from django.urls import reverse
from django.db import models
from django.db.models import Count, Q
from django.utils.translation import get_language
import boards
from boards.models import Attachment
from boards.models.attachment import FILE_TYPES_IMAGE
from boards.models.base import Viewable
from boards.models.thread import STATUS_ACTIVE, STATUS_BUMPLIMIT, STATUS_ARCHIVE
from boards.utils import cached_result
__author__ = 'neko259'
RELATED_TAGS_COUNT = 5
DEFAULT_LOCALE = 'default'
class TagAliasManager(models.Manager):
def filter_localized(self, *args, **kwargs):
locale = get_language()
tag_aliases = (self.filter(locale=locale)
| self.filter(Q(locale=DEFAULT_LOCALE)
& ~Q(parent__aliases__locale=locale)))
return tag_aliases.filter(**kwargs)
class TagAlias(models.Model, Viewable):
class Meta:
app_label = 'boards'
ordering = ('name',)
objects = TagAliasManager()
name = models.CharField(max_length=100, db_index=True)
locale = models.CharField(max_length=10, db_index=True)
parent = models.ForeignKey('Tag', on_delete=models.CASCADE, null=True,
blank=True, related_name='aliases')
class TagManager(models.Manager):
def get_not_empty_tags(self):
"""
Gets tags that have non-archived threads.
"""
return self.annotate(num_threads=Count('thread_tags'))\
.filter(num_threads__gt=0)\
.filter(aliases__in=TagAlias.objects.filter_localized())\
.order_by('aliases__name')
def get_tag_url_list(self, tags: list) -> str:
"""
Gets a comma-separated list of tag links.
"""
return ', '.join([tag.get_view() for tag in tags])
def get_by_alias(self, alias):
tag = None
aliases = TagAlias.objects.filter(name=alias).all()
if aliases:
tag = aliases[0].parent
return tag
def get_or_create_with_alias(self, name, required=False):
tag = self.get_by_alias(name)
created = False
if not tag:
tag = self.create(required=required)
TagAlias.objects.create(name=name, locale=DEFAULT_LOCALE, parent=tag)
created = True
return tag, created
class Tag(models.Model, Viewable):
"""
A tag is a text node assigned to the thread. The tag serves as a board
section. There can be multiple tags for each thread
"""
objects = TagManager()
class Meta:
app_label = 'boards'
required = models.BooleanField(default=False, db_index=True)
description = models.TextField(blank=True)
parent = models.ForeignKey('Tag', on_delete=models.CASCADE, null=True,
blank=True, related_name='children')
def get_name(self):
return self.aliases.get(locale=DEFAULT_LOCALE).name
def __str__(self):
return self.get_name()
def is_empty(self) -> bool:
"""
Checks if the tag has some threads.
"""
return self.get_thread_count() == 0
def get_thread_count(self, status=None) -> int:
threads = self.get_threads()
if status is not None:
threads = threads.filter(status=status)
return threads.count()
def get_active_thread_count(self) -> int:
return self.get_thread_count(status=STATUS_ACTIVE)
def get_bumplimit_thread_count(self) -> int:
return self.get_thread_count(status=STATUS_BUMPLIMIT)
def get_archived_thread_count(self) -> int:
return self.get_thread_count(status=STATUS_ARCHIVE)
@cached_result()
def get_absolute_url(self):
return reverse('tag', kwargs={'tag_name': self.get_name()})
def get_threads(self):
return self.thread_tags.order_by('-bump_time')
def is_required(self):
return self.required
def _get_locale_cache_key(self):
return [get_language()]
@cached_result(key_method=_get_locale_cache_key)
def get_localized_name(self):
locale = get_language()
aliases = self.aliases.filter(Q(locale=locale) | Q(locale=DEFAULT_LOCALE))
localized_tag_name = None
default_tag_name = None
for alias in aliases:
if alias.locale == locale:
localized_tag_name = alias.name
elif alias.locale == DEFAULT_LOCALE:
default_tag_name = alias.name
return localized_tag_name if localized_tag_name else default_tag_name
def get_view(self):
name = self.get_localized_name()
link = '<a class="tag" href="{}">{}</a>'.format(
self.get_absolute_url(), name)
if self.is_required():
link = '<b>{}</b>'.format(link)
return link
@cached_result()
def get_post_count(self):
return self.get_threads().aggregate(num_posts=Count('replies'))['num_posts']
def get_description(self):
return self.description
def get_random_image_post(self, status=[STATUS_ACTIVE, STATUS_BUMPLIMIT]):
posts = boards.models.Post.objects.filter(attachments__mimetype__in=FILE_TYPES_IMAGE)\
.annotate(images_count=Count(
'attachments')).filter(images_count__gt=0, thread__tags__in=[self])
if status is not None:
posts = posts.filter(thread__status__in=status)
return posts.order_by('?').first()
def get_first_letter(self):
name = self.get_localized_name()
return name and name[0] or ''
def get_related_tags(self):
return set(Tag.objects.filter(thread_tags__in=self.get_threads()).exclude(
id=self.id).order_by('?')[:RELATED_TAGS_COUNT])
@cached_result()
def get_color(self):
"""
Gets color hashed from the tag name.
"""
return hashlib.md5(self.get_name().encode()).hexdigest()[:6]
def get_parent(self):
return self.parent
def get_all_parents(self):
parents = list()
parent = self.get_parent()
if parent and parent not in parents:
parents.insert(0, parent)
parents = parent.get_all_parents() + parents
return parents
def get_children(self):
return self.children
def get_images(self):
return Attachment.objects.filter(
attachment_posts__thread__tags__in=[self]).filter(
mimetype__in=FILE_TYPES_IMAGE).order_by('-attachment_posts__pub_time')