##// END OF EJS Templates
Added domain image for nv
Added domain image for nv

File last commit:

r1951:1024ce38 default
r1972:7389c05f default
Show More
manager.py
228 lines | 8.1 KiB | text/x-python | PythonLexer
import logging
from datetime import datetime, timedelta, date
from datetime import time as dtime
from django.core.exceptions import PermissionDenied
from django.db import models, transaction
from django.dispatch import Signal
from django.utils import timezone
import boards
from boards import utils
from boards.abstracts.exceptions import ArchiveException
from boards.abstracts.constants import REGEX_TAGS
from boards.mdx_neboard import Parser
from boards.models import Attachment
from boards.models.attachment import StickerPack, AttachmentSticker
from boards.models.user import Ban
__author__ = 'neko259'
POSTS_PER_DAY_RANGE = 7
NO_IP = '0.0.0.0'
post_import_deps = Signal()
class PostManager(models.Manager):
@transaction.atomic
def create_post(self, title: str, text: str, files=[], thread=None,
ip=NO_IP, tags: list=None,
tripcode='', monochrome=False, images=[],
file_urls=[], stickerpack=False):
"""
Creates new post
"""
if thread is not None and thread.is_archived():
raise ArchiveException('Cannot post into an archived thread')
if not utils.is_anonymous_mode():
is_banned = Ban.objects.filter(ip=ip).exists()
else:
is_banned = False
if is_banned:
raise PermissionDenied()
if not tags:
tags = []
posting_time = timezone.now()
new_thread = False
if not thread:
thread = boards.models.thread.Thread.objects.create(
bump_time=posting_time, last_edit_time=posting_time,
monochrome=monochrome, stickerpack=stickerpack)
list(map(thread.tags.add, tags))
new_thread = True
pre_text = Parser().preparse(text)
post = self.create(title=title,
text=pre_text,
pub_time=posting_time,
poster_ip=ip,
thread=thread,
last_edit_time=posting_time,
tripcode=tripcode,
opening=new_thread)
logger = logging.getLogger('boards.post.create')
logger.info('Created post [{}] with text [{}] by {}'.format(post,
post.get_text(),post.poster_ip))
for file in files:
self._add_file_to_post(file, post)
for image in images:
post.attachments.add(image)
for file_url in file_urls:
post.attachments.add(Attachment.objects.create_from_url(file_url))
post.set_global_id()
# Thread needs to be bumped only when the post is already created
if not new_thread:
thread.last_edit_time = posting_time
thread.bump()
thread.save()
self._create_stickers(post)
return post
def delete_posts_by_ip(self, ip):
"""
Deletes all posts of the author with same IP
"""
posts = self.filter(poster_ip=ip)
for post in posts:
post.delete()
@utils.cached_result()
def get_posts_per_day(self) -> float:
"""
Gets average count of posts per day for the last 7 days
"""
day_end = date.today()
day_start = day_end - timedelta(POSTS_PER_DAY_RANGE)
day_time_start = timezone.make_aware(datetime.combine(
day_start, dtime()), timezone.get_current_timezone())
day_time_end = timezone.make_aware(datetime.combine(
day_end, dtime()), timezone.get_current_timezone())
posts_per_period = float(self.filter(
pub_time__lte=day_time_end,
pub_time__gte=day_time_start).count())
ppd = posts_per_period / POSTS_PER_DAY_RANGE
return ppd
def get_post_per_days(self, days) -> int:
day_end = date.today() + timedelta(1)
day_start = day_end - timedelta(days)
day_time_start = timezone.make_aware(datetime.combine(
day_start, dtime()), timezone.get_current_timezone())
day_time_end = timezone.make_aware(datetime.combine(
day_end, dtime()), timezone.get_current_timezone())
return self.filter(
pub_time__lte=day_time_end,
pub_time__gte=day_time_start).count()
@transaction.atomic
def import_post(self, title: str, text: str, pub_time: str, global_id,
opening_post=None, tags=list(), files=list(),
file_urls=list(), tripcode=None, last_edit_time=None):
is_opening = opening_post is None
if is_opening:
thread = boards.models.thread.Thread.objects.create(
bump_time=pub_time, last_edit_time=pub_time)
list(map(thread.tags.add, tags))
else:
thread = opening_post.get_thread()
post = self.create(title=title,
text=text,
pub_time=pub_time,
poster_ip=NO_IP,
last_edit_time=last_edit_time or pub_time,
global_id=global_id,
opening=is_opening,
thread=thread,
tripcode=tripcode)
for file in files:
self._add_file_to_post(file, post)
for file_url in file_urls:
post.attachments.add(Attachment.objects.create_from_url(file_url))
url_to_post = '[post]{}[/post]'.format(str(global_id))
replies = self.filter(text__contains=url_to_post)
for reply in replies:
post_import_deps.send(reply)
@transaction.atomic
def update_post(self, post, title: str, text: str, pub_time: str,
tags=list(), files=list(), file_urls=list(), tripcode=None):
post.title = title
post.text = text
post.pub_time = pub_time
post.tripcode = tripcode
post.save()
post.clear_cache()
post.attachments.clear()
for file in files:
self._add_file_to_post(file, post)
for file_url in file_urls:
post.attachments.add(Attachment.objects.create_from_url(file_url))
thread = post.get_thread()
thread.tags.clear()
list(map(thread.tags.add, tags))
def _add_file_to_post(self, file, post):
post.attachments.add(Attachment.objects.create_with_hash(file))
def _create_stickers(self, post):
thread = post.get_thread()
stickerpack_thread = thread.is_stickerpack()
if stickerpack_thread:
logger = logging.getLogger('boards.stickers')
if not post.is_opening():
has_title = len(post.title) > 0
has_one_attachment = post.attachments.count() == 1
opening_post = thread.get_opening_post()
valid_name = REGEX_TAGS.match(post.title)
if has_title and has_one_attachment and valid_name:
existing_sticker = AttachmentSticker.objects.filter(
name=post.get_title()).first()
attachment = post.attachments.first()
if existing_sticker:
existing_sticker.attachment = attachment
existing_sticker.save()
logger.info('Updated sticker {} with new attachment'.format(existing_sticker))
else:
try:
stickerpack = StickerPack.objects.get(
name=opening_post.get_title(), tripcode=post.tripcode)
sticker = AttachmentSticker.objects.create(
stickerpack=stickerpack, name=post.get_title(),
attachment=attachment)
logger.info('Created sticker {}'.format(sticker))
except StickerPack.DoesNotExist:
pass
else:
stickerpack, created = StickerPack.objects.get_or_create(
name=post.get_title(), tripcode=post.tripcode)
if created:
logger.info('Created stickerpack {}'.format(stickerpack))