post.py
272 lines
| 8.2 KiB
| text/x-python
|
PythonLexer
neko259
|
r384 | import os | ||
from random import random | ||||
import time | ||||
import math | ||||
neko259
|
r386 | import re | ||
neko259
|
r384 | |||
from django.db import models | ||||
from django.http import Http404 | ||||
from django.utils import timezone | ||||
from markupfield.fields import MarkupField | ||||
from neboard import settings | ||||
neko259
|
r394 | from boards import settings as boards_settings | ||
neko259
|
r384 | from boards import thumbs | ||
BAN_REASON_AUTO = 'Auto'

# Single thumbnail size handed to the image field's 'sizes' option.
# NOTE(review): presumably (width, height) -- confirm against boards.thumbs.
IMAGE_THUMB_SIZE = (200, 150)

# Maximum length of the post title field.
TITLE_MAX_LENGTH = 50

# Default markup type of the post text field.
DEFAULT_MARKUP_TYPE = 'markdown'

NO_PARENT = -1

# Default poster IP used by PostManager.create_post when none is passed.
NO_IP = '0.0.0.0'

# User agent value stored for every post created by create_post.
UNKNOWN_UA = ''

# Value of the 'page' argument of get_threads that disables paging.
ALL_PAGES = -1

# Directory prefix and delimiter used when building uploaded image names.
IMAGES_DIRECTORY = 'images/'
FILE_EXTENSION_DELIMITER = '.'

SETTING_MODERATE = "moderate"

# Matches references to other posts in the raw post text (">>123") and
# captures the referenced post id. Raw string: '\d' is not a valid escape
# sequence in a normal string literal.
REGEX_REPLY = re.compile(r'>>(\d+)')
class PostManager(models.Manager):
    """Manager that creates, deletes and lists posts and threads."""

    def create_post(self, title, text, image=None, thread=None,
                    ip=NO_IP, tags=None, user=None):
        """
        Create and save a new post.

        If 'thread' is given, the post is added to the replies of that
        opening post and the thread is bumped. Otherwise the post opens a
        new thread and the threads over the configured limit are deleted.

        'tags' is an optional list of tags. The tags returned by
        get_linked_tags() of each of them are attached too. The list passed
        by the caller is not modified.

        The user agent is always stored as UNKNOWN_UA.

        Returns the created post.
        """
        posting_time = timezone.now()

        post = self.create(title=title,
                           text=text,
                           pub_time=posting_time,
                           thread=thread,
                           image=image,
                           poster_ip=ip,
                           poster_user_agent=UNKNOWN_UA,
                           last_edit_time=posting_time,
                           bump_time=posting_time,
                           user=user)

        if tags:
            # Work on a copy so the caller's list is left untouched.
            all_tags = list(tags)
            for tag in tags:
                all_tags.extend(tag.get_linked_tags())

            # Explicit loops instead of map(): map() is lazy on Python 3
            # and would silently add nothing.
            for tag in all_tags:
                post.tags.add(tag)
            for tag in all_tags:
                tag.threads.add(post)

        if thread:
            thread.replies.add(post)
            thread.bump()
            thread.last_edit_time = posting_time
            thread.save()
        else:
            self._delete_old_threads()

        self.connect_replies(post)

        return post

    def delete_post(self, post):
        """
        Delete a post and, recursively, all of its replies.

        If the post belongs to a thread, the last edit time of the opening
        post is set to the current time before the post is deleted.
        """
        for reply in post.replies.all():
            self.delete_post(reply)

        # Update thread's last edit time
        thread = post.thread
        if thread:
            thread.last_edit_time = timezone.now()
            thread.save()

        post.delete()

    def delete_posts_by_ip(self, ip):
        """Delete every post whose poster IP equals 'ip'."""
        for post in self.filter(poster_ip=ip):
            self.delete_post(post)

    def get_threads(self, tag=None, page=ALL_PAGES,
                    order_by='-bump_time'):
        """
        Get opening posts, optionally restricted to a tag and to one page.

        With 'tag', the threads of that tag are used and Http404 is raised
        if the tag has none. Without it, all posts that have no thread
        (opening posts) are used.

        'page' is a zero-based page index. ALL_PAGES disables paging. A
        page index that is not below the page count also returns the whole
        unsliced result.

        The result is ordered by 'order_by'.
        """
        if tag:
            threads = tag.threads

            if not threads.exists():
                raise Http404
        else:
            threads = self.filter(thread=None)

        threads = threads.order_by(order_by)

        if page != ALL_PAGES:
            thread_count = threads.count()

            if page < self._get_page_count(thread_count):
                start_thread = page * settings.THREADS_PER_PAGE
                end_thread = min(start_thread + settings.THREADS_PER_PAGE,
                                 thread_count)
                threads = threads[start_thread:end_thread]

        return threads

    def get_thread(self, opening_post_id):
        """
        Get a thread as a list: the opening post followed by its replies
        ordered by publication time.

        Raises Http404 if there is no opening post with the given id.
        """
        try:
            opening_post = self.get(id=opening_post_id, thread=None)
        except Post.DoesNotExist:
            raise Http404

        # The replies manager is always truthy, so the list is built
        # unconditionally instead of behind an 'if' that could leave the
        # result unbound.
        thread = [opening_post]
        thread.extend(opening_post.replies.all().order_by('pub_time'))

        return thread

    def get_thread_page_count(self, tag=None):
        """Get the number of thread pages, optionally for a single tag."""
        if tag:
            threads = self.filter(thread=None, tags=tag)
        else:
            threads = self.filter(thread=None)

        return self._get_page_count(threads.count())

    def _delete_old_threads(self):
        """
        Preserves maximum thread count. If there are too many threads,
        delete the old ones.
        """

        # TODO Move old threads to the archive instead of deleting them.
        # Maybe make some 'old' field in the model to indicate the thread
        # must not be shown and be able for replying.

        threads = self.get_threads()
        thread_count = threads.count()

        if thread_count > settings.MAX_THREAD_COUNT:
            # Threads come ordered by '-bump_time', so everything past the
            # limit is the least recently bumped. The slice is evaluated
            # before anything is deleted.
            old_threads = list(threads[settings.MAX_THREAD_COUNT:])
            for old_thread in old_threads:
                self.delete_post(old_thread)

    def connect_replies(self, post):
        """Connect replies to a post to show them as a refmap"""
        for reply_number in REGEX_REPLY.finditer(post.text.raw):
            post_id = reply_number.group(1)

            try:
                referenced_post = self.get(id=post_id)
            except Post.DoesNotExist:
                # References to posts that do not exist are ignored.
                continue

            referenced_post.referenced_posts.add(post)
            referenced_post.last_edit_time = post.pub_time
            referenced_post.save()

    def _get_page_count(self, thread_count):
        """Get the number of pages needed to show 'thread_count' threads."""
        # float() forces true division so a partial page is rounded up.
        return int(math.ceil(thread_count / float(settings.THREADS_PER_PAGE)))
class Post(models.Model):
    """
    A post is a message.

    A post whose 'thread' is empty is an opening post; its 'replies' are
    the other posts of the thread.
    """

    objects = PostManager()

    class Meta:
        app_label = 'boards'

    def _update_image_filename(self, filename):
        """
        Get a new name for an uploaded image.

        Used as the 'upload_to' callable of the image field. The name is
        built from a timestamp and a random number below 1000, followed by
        the part of the original name after its last delimiter. If the
        original name has no delimiter, the whole name is used as that
        part.
        """
        path = IMAGES_DIRECTORY
        new_name = str(int(time.mktime(time.gmtime())))
        new_name += str(int(random() * 1000))
        new_name += FILE_EXTENSION_DELIMITER
        new_name += filename.split(FILE_EXTENSION_DELIMITER)[-1]

        return os.path.join(path, new_name)

    title = models.CharField(max_length=TITLE_MAX_LENGTH)
    pub_time = models.DateTimeField()
    text = MarkupField(default_markup_type=DEFAULT_MARKUP_TYPE,
                       escape_html=False)

    # Filled through the width_field / height_field options of 'image'.
    image_width = models.IntegerField(default=0)
    image_height = models.IntegerField(default=0)

    image = thumbs.ImageWithThumbsField(upload_to=_update_image_filename,
                                        blank=True, sizes=(IMAGE_THUMB_SIZE,),
                                        width_field='image_width',
                                        height_field='image_height')

    poster_ip = models.GenericIPAddressField()
    poster_user_agent = models.TextField()

    # Opening post of the thread; empty for the opening post itself.
    thread = models.ForeignKey('Post', null=True, default=None)
    tags = models.ManyToManyField('Tag')
    last_edit_time = models.DateTimeField()
    bump_time = models.DateTimeField()
    user = models.ForeignKey('User', null=True, default=None)

    # NOTE(review): per the Django field reference, null has no effect on
    # a ManyToManyField; it is kept here so the field definitions stay
    # unchanged.
    replies = models.ManyToManyField('Post', symmetrical=False, null=True,
                                     blank=True, related_name='re+')
    referenced_posts = models.ManyToManyField('Post', symmetrical=False,
                                              null=True,
                                              blank=True, related_name='rfp+')

    def __unicode__(self):
        return '#' + str(self.id) + ' ' + self.title + ' (' + \
               self.text.raw[:50] + ')'

    def get_title(self):
        """
        Get the title to show: the post title or, if it is empty, the
        first 20 characters of the raw text.
        """
        title = self.title
        if not title:
            title = self.text.raw[:20]

        return title

    def get_reply_count(self):
        """Get the number of replies to this post."""
        return self.replies.count()

    def get_images_count(self):
        """
        Get the number of images in the thread: the image of this post, if
        any, plus the replies that have a non-zero image width.
        """
        images_count = 1 if self.image else 0
        images_count += self.replies.filter(image_width__gt=0).count()

        return images_count

    def can_bump(self):
        """Check if the thread can be bumped by replying"""
        post_count = self.get_reply_count()

        return post_count <= settings.MAX_POSTS_PER_THREAD

    def bump(self):
        """Bump (move to up) thread"""
        # Only the field is changed here; saving is left to the caller.
        if self.can_bump():
            self.bump_time = timezone.now()

    def get_last_replies(self):
        """
        Get the last replies of the thread ordered by publication time, at
        most settings.LAST_REPLIES_COUNT of them.

        Returns None if that setting is not positive or if there are no
        replies.
        """
        if settings.LAST_REPLIES_COUNT > 0:
            reply_count = self.get_reply_count()

            if reply_count > 0:
                reply_count_to_show = min(settings.LAST_REPLIES_COUNT,
                                          reply_count)
                # Querysets can not be sliced with negative indexes, so
                # the start offset is computed from the reply count.
                first_shown = reply_count - reply_count_to_show
                replies = self.replies.all().order_by('pub_time')

                return replies[first_shown:]

    def get_tags(self):
        """Get a sorted tag list"""
        return self.tags.order_by('name')

    def get_sorted_referenced_posts(self):
        """Get the referenced posts of this post ordered by id."""
        return self.referenced_posts.order_by('id')

    def is_referenced(self):
        """Check if this post has any referenced posts."""
        return self.referenced_posts.exists()