##// END OF EJS Templates
Unify thread and post creation into one method inside post manager, that can be called from almost anywhere (one step closer to ajax thread creation)
Unify thread and post creation into one method inside post manager, that can be called from almost anywhere (one step closer to ajax thread creation)

File last commit:

r1997:be673d04 default
r1997:be673d04 default
Show More
manager.py
291 lines | 10.2 KiB | text/x-python | PythonLexer
import logging
import re
from datetime import datetime, timedelta, date
from datetime import time as dtime
from django.core.exceptions import PermissionDenied
from django.db import models, transaction
from django.dispatch import Signal
from django.shortcuts import redirect
from django.utils import timezone
import boards
from boards import utils
from boards.abstracts.exceptions import ArchiveException
from boards.abstracts.constants import REGEX_TAGS, REGEX_REPLY
from boards.mdx_neboard import Parser
from boards.models import Attachment
from boards.models.attachment import StickerPack, AttachmentSticker
from boards.models.user import Ban
__author__ = 'neko259'
POSTS_PER_DAY_RANGE = 7
NO_IP = '0.0.0.0'
post_import_deps = Signal()
FORM_TEXT = 'text'
FORM_TAGS = 'tags'
REFLINK_PREFIX = '>>'
class PostManager(models.Manager):
@transaction.atomic
def create_post(self, title: str, text: str, files=[], thread=None,
ip=NO_IP, tags: list=None,
tripcode='', monochrome=False, images=[],
file_urls=[], stickerpack=False):
"""
Creates new post
"""
if thread is not None and thread.is_archived():
raise ArchiveException('Cannot post into an archived thread')
if not utils.is_anonymous_mode():
is_banned = Ban.objects.filter(ip=ip).exists()
else:
is_banned = False
if is_banned:
raise PermissionDenied()
if not tags:
tags = []
posting_time = timezone.now()
new_thread = False
if not thread:
thread = boards.models.thread.Thread.objects.create(
bump_time=posting_time, last_edit_time=posting_time,
monochrome=monochrome, stickerpack=stickerpack)
list(map(thread.tags.add, tags))
new_thread = True
pre_text = Parser().preparse(text)
post = self.create(title=title,
text=pre_text,
pub_time=posting_time,
poster_ip=ip,
thread=thread,
last_edit_time=posting_time,
tripcode=tripcode,
opening=new_thread)
logger = logging.getLogger('boards.post.create')
logger.info('Created post [{}] with text [{}] by {}'.format(post,
post.get_text(),post.poster_ip))
for file in files:
self._add_file_to_post(file, post)
for image in images:
post.attachments.add(image)
for file_url in file_urls:
post.attachments.add(Attachment.objects.create_from_url(file_url))
post.set_global_id()
# Thread needs to be bumped only when the post is already created
if not new_thread:
thread.last_edit_time = posting_time
thread.bump()
thread.save()
self._create_stickers(post)
return post
def delete_posts_by_ip(self, ip):
"""
Deletes all posts of the author with same IP
"""
posts = self.filter(poster_ip=ip)
for post in posts:
post.delete()
@utils.cached_result()
def get_posts_per_day(self) -> float:
"""
Gets average count of posts per day for the last 7 days
"""
day_end = date.today()
day_start = day_end - timedelta(POSTS_PER_DAY_RANGE)
day_time_start = timezone.make_aware(datetime.combine(
day_start, dtime()), timezone.get_current_timezone())
day_time_end = timezone.make_aware(datetime.combine(
day_end, dtime()), timezone.get_current_timezone())
posts_per_period = float(self.filter(
pub_time__lte=day_time_end,
pub_time__gte=day_time_start).count())
ppd = posts_per_period / POSTS_PER_DAY_RANGE
return ppd
def get_post_per_days(self, days) -> int:
day_end = date.today() + timedelta(1)
day_start = day_end - timedelta(days)
day_time_start = timezone.make_aware(datetime.combine(
day_start, dtime()), timezone.get_current_timezone())
day_time_end = timezone.make_aware(datetime.combine(
day_end, dtime()), timezone.get_current_timezone())
return self.filter(
pub_time__lte=day_time_end,
pub_time__gte=day_time_start).count()
@transaction.atomic
def import_post(self, title: str, text: str, pub_time: str, global_id,
opening_post=None, tags=list(), files=list(),
file_urls=list(), tripcode=None, last_edit_time=None):
is_opening = opening_post is None
if is_opening:
thread = boards.models.thread.Thread.objects.create(
bump_time=pub_time, last_edit_time=pub_time)
list(map(thread.tags.add, tags))
else:
thread = opening_post.get_thread()
post = self.create(title=title,
text=text,
pub_time=pub_time,
poster_ip=NO_IP,
last_edit_time=last_edit_time or pub_time,
global_id=global_id,
opening=is_opening,
thread=thread,
tripcode=tripcode)
for file in files:
self._add_file_to_post(file, post)
for file_url in file_urls:
post.attachments.add(Attachment.objects.create_from_url(file_url))
url_to_post = '[post]{}[/post]'.format(str(global_id))
replies = self.filter(text__contains=url_to_post)
for reply in replies:
post_import_deps.send(reply)
@transaction.atomic
def update_post(self, post, title: str, text: str, pub_time: str,
tags=list(), files=list(), file_urls=list(), tripcode=None):
post.title = title
post.text = text
post.pub_time = pub_time
post.tripcode = tripcode
post.save()
post.clear_cache()
post.attachments.clear()
for file in files:
self._add_file_to_post(file, post)
for file_url in file_urls:
post.attachments.add(Attachment.objects.create_from_url(file_url))
thread = post.get_thread()
thread.tags.clear()
list(map(thread.tags.add, tags))
def create_from_form(self, request, form, opening_post, html_response=True):
ip = utils.get_client_ip(request)
data = form.cleaned_data
title = form.get_title()
text = data[FORM_TEXT]
files = form.get_files()
file_urls = form.get_file_urls()
images = form.get_images()
text = self._remove_invalid_links(text)
if opening_post:
post_thread = opening_post.get_thread()
monochrome = False
stickerpack = False
tags = []
else:
tags = data[FORM_TAGS]
monochrome = form.is_monochrome()
stickerpack = form.is_stickerpack()
post_thread = None
post = self.create_post(title=title, text=text, files=files,
thread=post_thread, ip=ip,
tripcode=form.get_tripcode(),
images=images, file_urls=file_urls,
monochrome=monochrome,
stickerpack=stickerpack, tags=tags)
if form.is_subscribe():
from boards.abstracts.settingsmanager import get_settings_manager
settings_manager = get_settings_manager(request)
settings_manager.add_or_read_fav_thread(
post_thread.get_opening_post())
if html_response:
return redirect(post.get_absolute_url())
else:
return post
def _add_file_to_post(self, file, post):
post.attachments.add(Attachment.objects.create_with_hash(file))
def _create_stickers(self, post):
thread = post.get_thread()
stickerpack_thread = thread.is_stickerpack()
if stickerpack_thread:
logger = logging.getLogger('boards.stickers')
if not post.is_opening():
has_title = len(post.title) > 0
has_one_attachment = post.attachments.count() == 1
opening_post = thread.get_opening_post()
valid_name = REGEX_TAGS.match(post.title)
if has_title and has_one_attachment and valid_name:
existing_sticker = AttachmentSticker.objects.filter(
name=post.get_title()).first()
attachment = post.attachments.first()
if existing_sticker:
existing_sticker.attachment = attachment
existing_sticker.save()
logger.info('Updated sticker {} with new attachment'.format(existing_sticker))
else:
try:
stickerpack = StickerPack.objects.get(
name=opening_post.get_title(), tripcode=post.tripcode)
sticker = AttachmentSticker.objects.create(
stickerpack=stickerpack, name=post.get_title(),
attachment=attachment)
logger.info('Created sticker {}'.format(sticker))
except StickerPack.DoesNotExist:
pass
else:
stickerpack, created = StickerPack.objects.get_or_create(
name=post.get_title(), tripcode=post.tripcode)
if created:
logger.info('Created stickerpack {}'.format(stickerpack))
def _remove_invalid_links(self, text):
"""
Replace invalid links in posts so that they won't be parsed.
Invalid links are links to non-existent posts
"""
for reply_number in re.finditer(REGEX_REPLY, text):
post_id = reply_number.group(1)
post = self.filter(id=post_id)
if not post.exists():
text = text.replace(REFLINK_PREFIX + post_id, post_id)
return text