|
|
import logging
|
|
|
import re
|
|
|
from datetime import datetime, timedelta, date
|
|
|
from datetime import time as dtime
|
|
|
|
|
|
from django.core.exceptions import PermissionDenied
|
|
|
from django.db import models, transaction
|
|
|
from django.dispatch import Signal
|
|
|
from django.shortcuts import redirect
|
|
|
from django.utils import timezone
|
|
|
|
|
|
import boards
|
|
|
from boards import utils
|
|
|
from boards.abstracts.exceptions import ArchiveException
|
|
|
from boards.abstracts.constants import REGEX_TAGS, REGEX_REPLY
|
|
|
from boards.mdx_neboard import Parser
|
|
|
from boards.models import Attachment
|
|
|
from boards.models.attachment import StickerPack, AttachmentSticker
|
|
|
from boards.models.user import Ban
|
|
|
|
|
|
__author__ = 'neko259'
|
|
|
|
|
|
POSTS_PER_DAY_RANGE = 7
|
|
|
NO_IP = '0.0.0.0'
|
|
|
|
|
|
|
|
|
# Signal sent by PostManager.import_post once per already stored post whose
# text contains a '[post]<global_id>[/post]' reference to the post that has
# just been imported; the referencing post is passed as the sender.
post_import_deps = Signal()
|
|
|
|
|
|
# Keys read from the posting form's cleaned_data in create_from_form.
FORM_TEXT = 'text'
FORM_TAGS = 'tags'

# Prefix that is stripped from references to non-existent posts in
# _remove_invalid_links.
REFLINK_PREFIX = '>>'
|
|
|
|
|
|
|
|
|
class PostManager(models.Manager):
    """
    Manager that creates, imports, updates and deletes posts together with
    the threads, attachments and stickers that belong to them.
    """

    @transaction.atomic
    def create_post(self, title: str, text: str, files=None, thread=None,
                    ip=NO_IP, tags: list=None,
                    tripcode='', monochrome=False, images=None,
                    file_urls=None, stickerpack=False):
        """
        Creates new post

        When no thread is given a new thread is created first (with the
        given tags, monochrome and stickerpack flags) and the post becomes
        its opening post; otherwise the post is added to the thread and the
        thread is bumped.

        files are stored as new attachments, images are attached as they
        are, and file_urls are turned into attachments created from URLs.

        Raises ArchiveException when the thread is archived and
        PermissionDenied when the IP is banned (bans are only checked when
        anonymous mode is off).
        """

        if thread is not None and thread.is_archived():
            raise ArchiveException('Cannot post into an archived thread')

        # The ban table is consulted only when anonymous mode is disabled
        if (not utils.is_anonymous_mode()
                and Ban.objects.filter(ip=ip).exists()):
            raise PermissionDenied()

        # None (or any empty value) means "nothing to attach / no tags"
        files = files or ()
        images = images or ()
        file_urls = file_urls or ()
        tags = tags or []

        posting_time = timezone.now()
        new_thread = not thread
        if new_thread:
            thread = boards.models.thread.Thread.objects.create(
                bump_time=posting_time, last_edit_time=posting_time,
                monochrome=monochrome, stickerpack=stickerpack)
            for tag in tags:
                thread.tags.add(tag)

        pre_text = Parser().preparse(text)

        post = self.create(title=title,
                           text=pre_text,
                           pub_time=posting_time,
                           poster_ip=ip,
                           thread=thread,
                           last_edit_time=posting_time,
                           tripcode=tripcode,
                           opening=new_thread)

        logger = logging.getLogger('boards.post.create')
        logger.info('Created post [%s] with text [%s] by %s',
                    post, post.get_raw_text(), post.poster_ip)

        for file in files:
            self._add_file_to_post(file, post)
        for image in images:
            post.attachments.add(image)
        for file_url in file_urls:
            post.attachments.add(Attachment.objects.create_from_url(file_url))

        post.set_global_id()

        # Thread needs to be bumped only when the post is already created
        if not new_thread:
            thread.last_edit_time = posting_time
            thread.bump()
            thread.save()

        self._create_stickers(post)

        return post

    def delete_posts_by_ip(self, ip):
        """
        Deletes all posts of the author with same IP
        """

        # Each post is deleted through its own delete() call rather than a
        # single bulk queryset delete.
        for post in self.filter(poster_ip=ip):
            post.delete()

    @staticmethod
    def _start_of_day(day: date) -> datetime:
        """
        Returns the timezone-aware midnight that starts the given day,
        expressed in the current Django timezone.
        """

        return timezone.make_aware(
            datetime.combine(day, dtime()), timezone.get_current_timezone())

    @utils.cached_result()
    def get_posts_per_day(self) -> float:
        """
        Gets average count of posts per day for the last 7 days

        The window ends at the start of the current day, so posts made
        today are not counted.
        """

        day_end = date.today()
        day_start = day_end - timedelta(POSTS_PER_DAY_RANGE)

        posts_per_period = float(self.filter(
            pub_time__lte=self._start_of_day(day_end),
            pub_time__gte=self._start_of_day(day_start)).count())

        return posts_per_period / POSTS_PER_DAY_RANGE

    def get_post_per_days(self, days) -> int:
        """
        Gets the number of posts published during the given number of days
        ending with the current day; the window closes at the start of
        tomorrow, so today is included.
        """

        day_end = date.today() + timedelta(1)
        day_start = day_end - timedelta(days)

        return self.filter(
            pub_time__lte=self._start_of_day(day_end),
            pub_time__gte=self._start_of_day(day_start)).count()

    @transaction.atomic
    def import_post(self, title: str, text: str, pub_time: str, global_id,
                    opening_post=None, tags=None, files=None,
                    file_urls=None, tripcode=None, last_edit_time=None):
        """
        Stores a post that comes with its own global id and timestamps.

        Without an opening_post a new thread is created (tagged with tags)
        and the post opens it; otherwise the post joins the opening post's
        thread. The poster IP is always set to NO_IP. After the post is
        stored, post_import_deps is sent for every stored post whose text
        contains a '[post]<global_id>[/post]' reference to it.
        """

        tags = tags or ()
        files = files or ()
        file_urls = file_urls or ()

        is_opening = opening_post is None
        if is_opening:
            thread = boards.models.thread.Thread.objects.create(
                bump_time=pub_time, last_edit_time=pub_time)
            for tag in tags:
                thread.tags.add(tag)
        else:
            thread = opening_post.get_thread()

        post = self.create(title=title,
                           text=text,
                           pub_time=pub_time,
                           poster_ip=NO_IP,
                           last_edit_time=last_edit_time or pub_time,
                           global_id=global_id,
                           opening=is_opening,
                           thread=thread,
                           tripcode=tripcode)

        for file in files:
            self._add_file_to_post(file, post)
        for file_url in file_urls:
            post.attachments.add(Attachment.objects.create_from_url(file_url))

        # Notify posts that already reference the imported post
        url_to_post = '[post]{}[/post]'.format(str(global_id))
        for reply in self.filter(text__contains=url_to_post):
            post_import_deps.send(reply)

    @transaction.atomic
    def update_post(self, post, title: str, text: str, pub_time: str,
                    tags=None, files=None, file_urls=None, tripcode=None):
        """
        Overwrites the title, text, publication time and tripcode of an
        existing post, then replaces all of its attachments and all tags of
        its thread with the given ones. Omitted attachments or tags leave
        the post without attachments and the thread without tags.
        """

        tags = tags or ()
        files = files or ()
        file_urls = file_urls or ()

        post.title = title
        post.text = text
        post.pub_time = pub_time
        post.tripcode = tripcode
        post.save()

        post.clear_cache()

        post.attachments.clear()
        for file in files:
            self._add_file_to_post(file, post)
        for file_url in file_urls:
            post.attachments.add(Attachment.objects.create_from_url(file_url))

        thread = post.get_thread()
        thread.tags.clear()
        for tag in tags:
            thread.tags.add(tag)

    def create_from_form(self, request, form, opening_post, html_response=True):
        """
        Creates a post from a cleaned posting form.

        With an opening_post the post is a reply in that post's thread and
        the form's tags, monochrome and stickerpack values are ignored;
        without one a new thread is started from them. If the form asks for
        a subscription, the thread's opening post is added to the favorites
        kept by the request's settings manager.

        Returns a redirect to the new post when html_response is true,
        otherwise the post itself.
        """

        ip = utils.get_client_ip(request)

        data = form.cleaned_data

        title = form.get_title()
        text = self._remove_invalid_links(data[FORM_TEXT])
        files = form.get_files()
        file_urls = form.get_file_urls()
        images = form.get_images()

        if opening_post:
            post_thread = opening_post.get_thread()
            monochrome = False
            stickerpack = False
            tags = []
        else:
            post_thread = None
            monochrome = form.is_monochrome()
            stickerpack = form.is_stickerpack()
            tags = data[FORM_TAGS]

        post = self.create_post(title=title, text=text, files=files,
                                thread=post_thread, ip=ip,
                                tripcode=form.get_tripcode(),
                                images=images, file_urls=file_urls,
                                monochrome=monochrome,
                                stickerpack=stickerpack, tags=tags)

        if form.is_subscribe():
            # Imported here rather than at module level, as in the original
            from boards.abstracts.settingsmanager import get_settings_manager
            settings_manager = get_settings_manager(request)
            settings_manager.add_or_read_fav_thread(
                post.get_thread().get_opening_post())

        if html_response:
            return redirect(post.get_absolute_url())
        return post

    def _add_file_to_post(self, file, post):
        """
        Attaches the file to the post through
        Attachment.objects.create_with_hash.
        """

        post.attachments.add(Attachment.objects.create_with_hash(file))

    def _create_stickers(self, post):
        """
        Keeps sticker packs in sync with posts of stickerpack threads.

        An opening post gets (or creates) the pack named after its title
        and tripcode. A reply with a non-empty title matching REGEX_TAGS
        and exactly one attachment defines a sticker with that title: an
        existing sticker of the same name gets the new attachment,
        otherwise a sticker is added to the pack found by the opening
        post's title and the reply's tripcode. Posts in other threads are
        ignored.
        """

        thread = post.get_thread()
        if not thread.is_stickerpack():
            return

        logger = logging.getLogger('boards.stickers')

        if post.is_opening():
            stickerpack, created = StickerPack.objects.get_or_create(
                name=post.get_title(), tripcode=post.tripcode)
            if created:
                logger.info('Created stickerpack %s', stickerpack)
            return

        has_title = len(post.title) > 0
        valid_name = REGEX_TAGS.match(post.title)
        has_one_attachment = post.attachments.count() == 1
        if not (has_title and has_one_attachment and valid_name):
            return

        attachment = post.attachments.first()
        sticker_name = post.get_title()

        existing_sticker = AttachmentSticker.objects.filter(
            name=sticker_name).first()
        if existing_sticker:
            existing_sticker.attachment = attachment
            existing_sticker.save()
            logger.info('Updated sticker %s with new attachment',
                        existing_sticker)
            return

        opening_post = thread.get_opening_post()
        try:
            stickerpack = StickerPack.objects.get(
                name=opening_post.get_title(), tripcode=post.tripcode)
        except StickerPack.DoesNotExist:
            # No pack matches the opening post title together with this
            # reply's tripcode, so the reply does not add a sticker.
            logger.debug('No stickerpack found for post %s, sticker skipped',
                         post)
            return

        sticker = AttachmentSticker.objects.create(
            stickerpack=stickerpack, name=sticker_name,
            attachment=attachment)
        logger.info('Created sticker %s', sticker)

    def _remove_invalid_links(self, text):
        """
        Replace invalid links in posts so that they won't be parsed.
        Invalid links are links to non-existent posts

        Only the exact reference is rewritten: a missing post 12 does not
        affect a link to post 123. Every referenced id is looked up once.
        """

        checked_ids = set()
        for reply_number in re.finditer(REGEX_REPLY, text):
            post_id = reply_number.group(1)
            if post_id in checked_ids:
                continue
            checked_ids.add(post_id)

            if self.filter(id=post_id).exists():
                continue

            # The lookahead stops a short id from matching the beginning of
            # a longer one that shares its leading digits.
            invalid_link = re.escape(REFLINK_PREFIX + post_id) + r'(?!\d)'
            text = re.sub(invalid_link, lambda _match: post_id, text)

        return text
|
|
|
|