manager.py
228 lines
| 8.1 KiB
| text/x-python
|
PythonLexer
neko259
|
r1359 | import logging | ||
from datetime import datetime, timedelta, date | ||||
from datetime import time as dtime | ||||
neko259
|
r1951 | from django.core.exceptions import PermissionDenied | ||
neko259
|
r1359 | from django.db import models, transaction | ||
neko259
|
r1951 | from django.dispatch import Signal | ||
neko259
|
r1359 | from django.utils import timezone | ||
import boards | ||||
neko259
|
r1951 | from boards import utils | ||
from boards.abstracts.exceptions import ArchiveException | ||||
from boards.abstracts.constants import REGEX_TAGS | ||||
neko259
|
r1359 | from boards.mdx_neboard import Parser | ||
neko259
|
r1590 | from boards.models import Attachment | ||
neko259
|
r1951 | from boards.models.attachment import StickerPack, AttachmentSticker | ||
from boards.models.user import Ban | ||||
neko259
|
r1359 | |||
__author__ = 'neko259'

# Number of days over which the average posts-per-day value is computed
POSTS_PER_DAY_RANGE = 7

# Placeholder poster IP: default for created posts, always used for
# imported ones
NO_IP = '0.0.0.0'

# Sent (with the referencing post as the sender) for every stored post whose
# text references a post that has just been imported
post_import_deps = Signal()
neko259
|
class PostManager(models.Manager):
    """
    Manager that creates, imports, updates and deletes posts and counts the
    posts published within day ranges.
    """

    @transaction.atomic
    def create_post(self, title: str, text: str, files=None, thread=None,
                    ip=NO_IP, tags: list = None,
                    tripcode='', monochrome=False, images=None,
                    file_urls=None, stickerpack=False):
        """
        Creates new post

        When no thread is given, a new thread is created with the given tags,
        monochrome and stickerpack flags, and the post becomes its opening
        post. Otherwise the post is added to the given thread, which is then
        bumped.

        :param title: post title
        :param text: post text, passed through Parser().preparse() before
            it is saved
        :param files: iterable of files, each stored with
            Attachment.objects.create_with_hash()
        :param thread: thread to post into, or None to start a new thread
        :param ip: poster IP, checked against bans unless the anonymous mode
            is enabled
        :param tags: tags added to the thread; used for a new thread only
        :param tripcode: tripcode saved with the post
        :param monochrome: flag passed to a newly created thread
        :param images: iterable of objects added to the post attachments
            as is (presumably already saved attachments -- TODO confirm)
        :param file_urls: iterable of URLs, each turned into an attachment
            with Attachment.objects.create_from_url()
        :param stickerpack: flag passed to a newly created thread
        :raises ArchiveException: if the given thread is archived
        :raises PermissionDenied: if the IP is banned
        :return: the created post
        """
        if thread is not None and thread.is_archived():
            raise ArchiveException('Cannot post into an archived thread')

        # Bans are not checked in the anonymous mode
        if (not utils.is_anonymous_mode()
                and Ban.objects.filter(ip=ip).exists()):
            raise PermissionDenied()

        posting_time = timezone.now()

        new_thread = not thread
        if new_thread:
            thread = boards.models.thread.Thread.objects.create(
                bump_time=posting_time, last_edit_time=posting_time,
                monochrome=monochrome, stickerpack=stickerpack)
            self._add_tags(thread, tags)

        pre_text = Parser().preparse(text)

        post = self.create(title=title,
                           text=pre_text,
                           pub_time=posting_time,
                           poster_ip=ip,
                           thread=thread,
                           last_edit_time=posting_time,
                           tripcode=tripcode,
                           opening=new_thread)

        logger = logging.getLogger('boards.post.create')
        logger.info('Created post [{}] with text [{}] by {}'.format(
            post, post.get_text(), post.poster_ip))

        self._add_attachments(post, files=files, images=images,
                              file_urls=file_urls)

        post.set_global_id()

        # Thread needs to be bumped only when the post is already created
        if not new_thread:
            thread.last_edit_time = posting_time
            thread.bump()
            thread.save()

        self._create_stickers(post)

        return post

    def delete_posts_by_ip(self, ip):
        """
        Deletes all posts of the author with same IP
        """
        # Posts are deleted one by one through the model delete() method
        for post in self.filter(poster_ip=ip):
            post.delete()

    @utils.cached_result()
    def get_posts_per_day(self) -> float:
        """
        Gets average count of posts per day for the last
        POSTS_PER_DAY_RANGE days

        The counted period ends at the start of the current day, so the
        posts of the current day are not included.
        """
        day_end = date.today()
        day_start = day_end - timedelta(days=POSTS_PER_DAY_RANGE)

        posts_per_period = float(
            self._count_posts_between(day_start, day_end))

        return posts_per_period / POSTS_PER_DAY_RANGE

    def get_post_per_days(self, days) -> int:
        """
        Gets count of posts published during the last given number of days

        The counted period ends at the start of the next day, so the posts
        of the current day are included.
        """
        day_end = date.today() + timedelta(days=1)
        day_start = day_end - timedelta(days=days)

        return self._count_posts_between(day_start, day_end)

    @transaction.atomic
    def import_post(self, title: str, text: str, pub_time: str, global_id,
                    opening_post=None, tags=None, files=None,
                    file_urls=None, tripcode=None, last_edit_time=None):
        """
        Creates a post with an already known global ID and publication time

        Without an opening post a new thread is created with the given tags
        and the post becomes its opening post; otherwise the post is added
        to the thread of the opening post. The poster IP is always NO_IP and
        the text is saved as is. After the post is saved, the
        post_import_deps signal is sent for every stored post whose text
        references the global ID of the imported post.

        :param pub_time: publication time, also used as the bump and edit
            time of a newly created thread
        :param global_id: global ID saved with the post
        :param opening_post: opening post of the thread to add the post to,
            or None if the imported post opens a new thread
        :param tags: tags added to the thread; used for a new thread only
        :param files: iterable of files to store as attachments
        :param file_urls: iterable of URLs to create attachments from
        :param last_edit_time: edit time of the post; the publication time
            is used when it is not given
        """
        is_opening = opening_post is None
        if is_opening:
            thread = boards.models.thread.Thread.objects.create(
                bump_time=pub_time, last_edit_time=pub_time)
            self._add_tags(thread, tags)
        else:
            thread = opening_post.get_thread()

        post = self.create(title=title,
                           text=text,
                           pub_time=pub_time,
                           poster_ip=NO_IP,
                           last_edit_time=last_edit_time or pub_time,
                           global_id=global_id,
                           opening=is_opening,
                           thread=thread,
                           tripcode=tripcode)

        self._add_attachments(post, files=files, file_urls=file_urls)

        # Notify about the posts that reference the imported one
        url_to_post = '[post]{}[/post]'.format(str(global_id))
        for reply in self.filter(text__contains=url_to_post):
            post_import_deps.send(reply)

    @transaction.atomic
    def update_post(self, post, title: str, text: str, pub_time: str,
                    tags=None, files=None, file_urls=None, tripcode=None):
        """
        Replaces the contents of an existing post

        The title, text, publication time and tripcode are overwritten, the
        post cache is cleared, and both the post attachments and the tags of
        the post thread are replaced with the given ones.
        """
        post.title = title
        post.text = text
        post.pub_time = pub_time
        post.tripcode = tripcode
        post.save()

        post.clear_cache()

        post.attachments.clear()
        self._add_attachments(post, files=files, file_urls=file_urls)

        # NOTE(review): the thread tags are replaced even when the updated
        # post is not the opening one, so updating a reply without tags
        # removes all tags of the thread -- confirm this is intended.
        thread = post.get_thread()
        thread.tags.clear()
        self._add_tags(thread, tags)

    def _add_file_to_post(self, file, post):
        """
        Stores the file as an attachment and adds it to the post
        """
        post.attachments.add(Attachment.objects.create_with_hash(file))

    def _add_attachments(self, post, files=None, images=None,
                         file_urls=None):
        """
        Adds attachments to the post: the files first, then the images
        (added as is), then the attachments created from the URLs
        """
        for file in files or ():
            self._add_file_to_post(file, post)
        for image in images or ():
            post.attachments.add(image)
        for file_url in file_urls or ():
            post.attachments.add(
                Attachment.objects.create_from_url(file_url))

    @staticmethod
    def _add_tags(thread, tags):
        """
        Adds the tags to the thread one by one
        """
        for tag in tags or ():
            thread.tags.add(tag)

    @staticmethod
    def _start_of_day(day):
        """
        Gets the midnight the given date starts with as an aware datetime
        in the current time zone
        """
        return timezone.make_aware(datetime.combine(day, dtime()),
                                   timezone.get_current_timezone())

    def _count_posts_between(self, day_start, day_end) -> int:
        """
        Counts the posts published between the starts of the given dates,
        both bounds included
        """
        day_time_start = self._start_of_day(day_start)
        day_time_end = self._start_of_day(day_end)

        return self.filter(
            pub_time__lte=day_time_end,
            pub_time__gte=day_time_start).count()

    def _create_stickers(self, post):
        """
        Creates or updates sticker data for a post of a stickerpack thread

        The opening post gets a stickerpack named after its title and bound
        to its tripcode. A reply with a title matching REGEX_TAGS and
        exactly one attachment either replaces the attachment of the
        existing sticker of the same name, or creates a new sticker in the
        stickerpack named after the opening post title and bound to the
        reply tripcode. Nothing is done for other threads and posts.
        """
        thread = post.get_thread()
        if not thread.is_stickerpack():
            return

        logger = logging.getLogger('boards.stickers')

        if post.is_opening():
            stickerpack, created = StickerPack.objects.get_or_create(
                name=post.get_title(), tripcode=post.tripcode)
            if created:
                logger.info('Created stickerpack {}'.format(stickerpack))
            return

        # Only a reply with a valid name and a single attachment is a sticker
        if not post.title or not REGEX_TAGS.match(post.title):
            return
        if post.attachments.count() != 1:
            return

        sticker_name = post.get_title()
        attachment = post.attachments.first()

        # NOTE(review): the existing sticker is looked up by its name only,
        # neither the stickerpack nor the tripcode is checked -- confirm
        # that any poster is allowed to replace it.
        existing_sticker = AttachmentSticker.objects.filter(
            name=sticker_name).first()
        if existing_sticker:
            existing_sticker.attachment = attachment
            existing_sticker.save()
            logger.info('Updated sticker {} with new attachment'.format(
                existing_sticker))
            return

        opening_post = thread.get_opening_post()
        try:
            stickerpack = StickerPack.objects.get(
                name=opening_post.get_title(), tripcode=post.tripcode)
        except StickerPack.DoesNotExist:
            # No stickerpack with this name for the tripcode of the post,
            # so the sticker is not created
            logger.debug(
                'No stickerpack found to add sticker {} to'.format(
                    sticker_name))
            return

        sticker = AttachmentSticker.objects.create(
            stickerpack=stickerpack, name=sticker_name,
            attachment=attachment)
        logger.info('Created sticker {}'.format(sticker))