import logging from datetime import datetime, timedelta, date from datetime import time as dtime from django.core.exceptions import PermissionDenied from django.db import models, transaction from django.dispatch import Signal from django.utils import timezone import boards from boards import utils from boards.abstracts.exceptions import ArchiveException from boards.abstracts.constants import REGEX_TAGS from boards.mdx_neboard import Parser from boards.models import Attachment from boards.models.attachment import StickerPack, AttachmentSticker from boards.models.user import Ban __author__ = 'neko259' POSTS_PER_DAY_RANGE = 7 NO_IP = '0.0.0.0' post_import_deps = Signal() class PostManager(models.Manager): @transaction.atomic def create_post(self, title: str, text: str, files=[], thread=None, ip=NO_IP, tags: list=None, tripcode='', monochrome=False, images=[], file_urls=[], stickerpack=False): """ Creates new post """ if thread is not None and thread.is_archived(): raise ArchiveException('Cannot post into an archived thread') if not utils.is_anonymous_mode(): is_banned = Ban.objects.filter(ip=ip).exists() else: is_banned = False if is_banned: raise PermissionDenied() if not tags: tags = [] posting_time = timezone.now() new_thread = False if not thread: thread = boards.models.thread.Thread.objects.create( bump_time=posting_time, last_edit_time=posting_time, monochrome=monochrome, stickerpack=stickerpack) list(map(thread.tags.add, tags)) new_thread = True pre_text = Parser().preparse(text) post = self.create(title=title, text=pre_text, pub_time=posting_time, poster_ip=ip, thread=thread, last_edit_time=posting_time, tripcode=tripcode, opening=new_thread) logger = logging.getLogger('boards.post.create') logger.info('Created post [{}] with text [{}] by {}'.format(post, post.get_text(),post.poster_ip)) for file in files: self._add_file_to_post(file, post) for image in images: post.attachments.add(image) for file_url in file_urls: post.attachments.add(Attachment.objects.create_from_url(file_url)) post.set_global_id() # Thread needs to be bumped only when the post is already created if not new_thread: thread.last_edit_time = posting_time thread.bump() thread.save() self._create_stickers(post) return post def delete_posts_by_ip(self, ip): """ Deletes all posts of the author with same IP """ posts = self.filter(poster_ip=ip) for post in posts: post.delete() @utils.cached_result() def get_posts_per_day(self) -> float: """ Gets average count of posts per day for the last 7 days """ day_end = date.today() day_start = day_end - timedelta(POSTS_PER_DAY_RANGE) day_time_start = timezone.make_aware(datetime.combine( day_start, dtime()), timezone.get_current_timezone()) day_time_end = timezone.make_aware(datetime.combine( day_end, dtime()), timezone.get_current_timezone()) posts_per_period = float(self.filter( pub_time__lte=day_time_end, pub_time__gte=day_time_start).count()) ppd = posts_per_period / POSTS_PER_DAY_RANGE return ppd def get_post_per_days(self, days) -> int: day_end = date.today() + timedelta(1) day_start = day_end - timedelta(days) day_time_start = timezone.make_aware(datetime.combine( day_start, dtime()), timezone.get_current_timezone()) day_time_end = timezone.make_aware(datetime.combine( day_end, dtime()), timezone.get_current_timezone()) return self.filter( pub_time__lte=day_time_end, pub_time__gte=day_time_start).count() @transaction.atomic def import_post(self, title: str, text: str, pub_time: str, global_id, opening_post=None, tags=list(), files=list(), file_urls=list(), tripcode=None, last_edit_time=None): is_opening = opening_post is None if is_opening: thread = boards.models.thread.Thread.objects.create( bump_time=pub_time, last_edit_time=pub_time) list(map(thread.tags.add, tags)) else: thread = opening_post.get_thread() post = self.create(title=title, text=text, pub_time=pub_time, poster_ip=NO_IP, last_edit_time=last_edit_time or pub_time, global_id=global_id, opening=is_opening, thread=thread, tripcode=tripcode) for file in files: self._add_file_to_post(file, post) for file_url in file_urls: post.attachments.add(Attachment.objects.create_from_url(file_url)) url_to_post = '[post]{}[/post]'.format(str(global_id)) replies = self.filter(text__contains=url_to_post) for reply in replies: post_import_deps.send(reply) @transaction.atomic def update_post(self, post, title: str, text: str, pub_time: str, tags=list(), files=list(), file_urls=list(), tripcode=None): post.title = title post.text = text post.pub_time = pub_time post.tripcode = tripcode post.save() post.clear_cache() post.attachments.clear() for file in files: self._add_file_to_post(file, post) for file_url in file_urls: post.attachments.add(Attachment.objects.create_from_url(file_url)) thread = post.get_thread() thread.tags.clear() list(map(thread.tags.add, tags)) def _add_file_to_post(self, file, post): post.attachments.add(Attachment.objects.create_with_hash(file)) def _create_stickers(self, post): thread = post.get_thread() stickerpack_thread = thread.is_stickerpack() if stickerpack_thread: logger = logging.getLogger('boards.stickers') if not post.is_opening(): has_title = len(post.title) > 0 has_one_attachment = post.attachments.count() == 1 opening_post = thread.get_opening_post() valid_name = REGEX_TAGS.match(post.title) if has_title and has_one_attachment and valid_name: existing_sticker = AttachmentSticker.objects.filter( name=post.get_title()).first() attachment = post.attachments.first() if existing_sticker: existing_sticker.attachment = attachment existing_sticker.save() logger.info('Updated sticker {} with new attachment'.format(existing_sticker)) else: try: stickerpack = StickerPack.objects.get( name=opening_post.get_title(), tripcode=post.tripcode) sticker = AttachmentSticker.objects.create( stickerpack=stickerpack, name=post.get_title(), attachment=attachment) logger.info('Created sticker {}'.format(sticker)) except StickerPack.DoesNotExist: pass else: stickerpack, created = StickerPack.objects.get_or_create( name=post.get_title(), tripcode=post.tripcode) if created: logger.info('Created stickerpack {}'.format(stickerpack))