##// END OF EJS Templates
Closed branch as merged with default
Closed branch as merged with default

File last commit:

r373:2f30e48c thread_autoupdate
r377:c55b31b4 thread_autoupdate
Show More
views.py
563 lines | 16.0 KiB | text/x-python | PythonLexer
import hashlib
import json
import string
import time
import calendar
from datetime import datetime
from django.core import serializers
from django.core.urlresolvers import reverse
from django.http import HttpResponseRedirect
from django.http.response import HttpResponse
from django.template import RequestContext
from django.shortcuts import render, redirect, get_object_or_404
from django.utils import timezone
from django.db import transaction
import math
from boards import forms
import boards
from boards import utils
from boards.forms import ThreadForm, PostForm, SettingsForm, PlainErrorList, \
ThreadCaptchaForm, PostCaptchaForm, LoginForm, ModeratorSettingsForm
from boards.models import Post, Tag, Ban, User, RANK_USER, SETTING_MODERATE, \
REGEX_REPLY
from boards import authors
from boards.utils import get_client_ip
import neboard
import re
# Reason stored on bans that are created automatically by _ban_current_user,
# i.e. when a submitted form reports that the poster is suspected to be a bot.
BAN_REASON_SPAM = 'Autoban: spam bot'
def index(request, page=0):
    """
    Show one page of the thread list together with the thread creation form.

    On POST the submitted form is validated: a valid form creates the thread
    through _new_post, an invalid one that asks for a ban gets the current
    user banned. Otherwise the page is rendered with the (possibly invalid)
    form so that its errors can be shown.
    """

    context = _init_default_context(request)

    # The captcha variant of the form also needs the request object
    form_class = ThreadForm
    form_kwargs = {}
    if utils.need_include_captcha(request):
        form_class = ThreadCaptchaForm
        form_kwargs['request'] = request

    if request.method != 'POST':
        form = form_class(error_class=PlainErrorList, **form_kwargs)
    else:
        form = form_class(request.POST, request.FILES,
                          error_class=PlainErrorList, **form_kwargs)
        form.session = request.session

        if form.is_valid():
            return _new_post(request, form)
        if form.need_to_ban:
            # Ban user because he is suspected to be a bot
            _ban_current_user(request)

    page = int(page)
    threads = [
        {
            'thread': opening_post,
            'bumpable': opening_post.can_bump(),
            'last_replies': opening_post.get_last_replies(),
        }
        for opening_post in Post.objects.get_threads(page=page)
    ]

    # TODO Make this generic for tag and threads list pages
    context['threads'] = threads or None
    context['form'] = form

    page_count = Post.objects.get_thread_page_count()
    context['pages'] = range(page_count)
    if page < page_count - 1:
        context['next_page'] = str(page + 1)
    if page > 0:
        context['prev_page'] = str(page - 1)

    return render(request, 'boards/posting_general.html', context)
@transaction.commit_on_success
def _new_post(request, form, thread_id=boards.models.NO_PARENT):
    """
    Create a post from a validated form and redirect to it.

    When thread_id is NO_PARENT a new thread is opened and the tags from the
    form (plus the tags linked to them) are attached to it. Otherwise the
    post is added as a reply to the thread with the given id. Banned posters
    are redirected to the ban page and nothing is created.
    """

    ip = get_client_ip(request)
    if Ban.objects.filter(ip=ip).exists():
        return redirect(you_are_banned)

    data = form.cleaned_data

    title = data['title']
    text = _remove_invalid_links(data['text'])
    image = data.get('image')

    is_new_thread = thread_id == boards.models.NO_PARENT

    tags = []
    # Tags are only read for new threads
    tag_line = data['tags'] if is_new_thread else None
    if tag_line:
        for raw_name in tag_line.split(' '):
            tag_name = raw_name.strip().lower()
            if len(tag_name) == 0:
                continue

            tag_obj, _created = Tag.objects.get_or_create(name=tag_name)
            tags.append(tag_obj)

            linked_tags = tag_obj.get_linked_tags()
            if len(linked_tags) > 0:
                tags.extend(linked_tags)

    if is_new_thread:
        op = None
    else:
        op = get_object_or_404(Post, id=thread_id)

    post = Post.objects.create_post(title=title, text=text, ip=ip,
                                    thread=op, image=image,
                                    tags=tags, user=_get_user(request))

    if is_new_thread:
        return redirect(thread, post_id=post.id)

    # A reply is shown inside its thread, scrolled to the new post
    thread_url = reverse(thread, kwargs={'post_id': thread_id})
    return redirect(thread_url + '#' + str(post.id))
def tag(request, tag_name, page=0):
    """
    Show one page of the threads that have the given tag.

    Responds with 404 if there is no tag with this name. The page also
    carries the thread creation form, prefilled with the tag name; a POST
    request is handled as an attempt to create a thread. The first page has
    the number 0.
    """

    requested_tag = get_object_or_404(Tag, name=tag_name)

    page = int(page)
    threads = [
        {
            'thread': opening_post,
            'bumpable': opening_post.can_bump(),
            'last_replies': opening_post.get_last_replies(),
        }
        for opening_post in Post.objects.get_threads(tag=requested_tag,
                                                     page=page)
    ]

    if request.method != 'POST':
        form = forms.ThreadForm(initial={'tags': tag_name},
                                error_class=PlainErrorList)
    else:
        form = ThreadForm(request.POST, request.FILES,
                          error_class=PlainErrorList)
        form.session = request.session

        if form.is_valid():
            return _new_post(request, form)
        if form.need_to_ban:
            # Ban user because he is suspected to be a bot
            _ban_current_user(request)

    context = _init_default_context(request)
    context['threads'] = threads or None
    context['tag'] = requested_tag

    page_count = Post.objects.get_thread_page_count(tag=requested_tag)
    context['pages'] = range(page_count)
    if page < page_count - 1:
        context['next_page'] = str(page + 1)
    if page > 0:
        context['prev_page'] = str(page - 1)

    context['form'] = form

    return render(request, 'boards/posting_general.html', context)
def thread(request, post_id):
    """
    Show all posts of a thread together with the reply form.

    A POST request is handled as an attempt to reply to the thread. While
    the thread can still be bumped, the number of posts left before
    MAX_POSTS_PER_THREAD is reached and the same value as a percentage are
    passed to the template.
    """

    # The captcha variant of the form also needs the request object
    form_class = PostForm
    form_kwargs = {}
    if utils.need_include_captcha(request):
        form_class = PostCaptchaForm
        form_kwargs['request'] = request

    if request.method != 'POST':
        form = form_class(error_class=PlainErrorList, **form_kwargs)
    else:
        form = form_class(request.POST, request.FILES,
                          error_class=PlainErrorList, **form_kwargs)
        form.session = request.session

        if form.is_valid():
            return _new_post(request, form, post_id)
        if form.need_to_ban:
            # Ban user because he is suspected to be a bot
            _ban_current_user(request)

    posts = Post.objects.get_thread(post_id)

    context = _init_default_context(request)
    context['posts'] = posts
    context['form'] = form

    bumpable = posts[0].can_bump()
    context['bumpable'] = bumpable
    if bumpable:
        max_posts = neboard.settings.MAX_POSTS_PER_THREAD
        posts_left = max_posts - len(posts)
        context['posts_left'] = posts_left
        context['bumplimit_progress'] = str(
            float(posts_left) / max_posts * 100)

    # Read by the client to request only the changes made after this moment
    context["last_update"] = _datetime_to_epoch(posts[0].last_edit_time)

    return render(request, 'boards/thread.html', context)
def login(request):
    """
    Log in with a user id.

    A valid POSTed form switches the session to the user with the submitted
    id and redirects to the index page. In every other case the login page
    is rendered with the form.
    """

    context = _init_default_context(request)

    if request.method != 'POST':
        form = LoginForm()
    else:
        form = LoginForm(request.POST, request.FILES,
                         error_class=PlainErrorList)
        form.session = request.session

        if form.is_valid():
            entered_id = form.cleaned_data['user_id']
            user = User.objects.get(user_id=entered_id)
            request.session['user_id'] = user.id
            return redirect(index)

    context['form'] = form

    return render(request, 'boards/login.html', context)
def settings(request):
    """
    Show and save the settings of the current user.

    Every user can choose a theme; moderators get an extended form that
    also holds the 'moderate' setting. A valid POST saves the settings
    inside one transaction and redirects back to this page.
    """

    context = _init_default_context(request)
    user = _get_user(request)
    is_moderator = user.is_moderator()

    form_class = ModeratorSettingsForm if is_moderator else SettingsForm

    if request.method == 'POST':
        with transaction.commit_on_success():
            form = form_class(request.POST, error_class=PlainErrorList)

            if form.is_valid():
                user.save_setting('theme', form.cleaned_data['theme'])

                if is_moderator:
                    user.save_setting(SETTING_MODERATE,
                                      form.cleaned_data['moderate'])

                return redirect(settings)
    else:
        # Prefill the form with the values that are currently in effect
        initial = {'theme': _get_theme(request)}
        if is_moderator:
            initial['moderate'] = context['moderator']

        form = form_class(initial=initial, error_class=PlainErrorList)

    context['form'] = form

    return render(request, 'boards/settings.html', context)
def all_tags(request):
    """Show the list of tags returned by Tag.objects.get_not_empty_tags()"""

    context = _init_default_context(request)

    not_empty_tags = Tag.objects.get_not_empty_tags()
    context['all_tags'] = not_empty_tags

    return render(request, 'boards/tags.html', context)
def jump_to_post(request, post_id):
    """
    Redirect to the thread page that contains the requested post.

    A post that has no thread set is opened as a thread itself; any other
    post is opened inside its thread with an anchor that points to it.
    Responds with 404 if the post does not exist.
    """

    post = get_object_or_404(Post, id=post_id)

    if post.thread:
        thread_url = reverse(thread, kwargs={'post_id': post.thread.id})
        return redirect(thread_url + '#' + str(post.id))

    return redirect(thread, post_id=post.id)
def authors(request):
    """Show the page with the list of authors taken from boards.authors"""

    context = _init_default_context(request)

    authors_list = boards.authors.authors
    context['authors'] = authors_list

    return render(request, 'boards/authors.html', context)
@transaction.commit_on_success
def delete(request, post_id):
    """
    Delete a post if the current user is a moderator.

    The redirect is performed whether or not the post was deleted: to the
    'next' page (or the index) for a post that has no thread set, otherwise
    to the thread the post belongs to.
    """

    user = _get_user(request)
    post = get_object_or_404(Post, id=post_id)

    if user.is_moderator():
        # TODO Show confirmation page before deletion
        Post.objects.delete_post(post)

    if post.thread:
        return redirect(thread, post_id=post.thread.id)

    return _redirect_to_next(request)
@transaction.commit_on_success
def ban(request, post_id):
    """
    Ban the IP address that the given post was sent from.

    Only moderators can ban. If the address is already banned, the existing
    ban is left untouched. The user is then returned to the 'next' page.
    """

    user = _get_user(request)
    post = get_object_or_404(Post, id=post_id)

    if user.is_moderator():
        # TODO Show confirmation page before ban
        ban_record, is_new = Ban.objects.get_or_create(ip=post.poster_ip)
        if is_new:
            ban_record.reason = 'Banned for post ' + str(post_id)
            ban_record.save()

    return _redirect_to_next(request)
def you_are_banned(request):
    """
    Show the page that tells the user about the ban and its reason.

    Responds with 404 if there is no ban for the IP address of the client.
    """

    context = _init_default_context(request)

    client_ip = utils.get_client_ip(request)
    ban_record = get_object_or_404(Ban, ip=client_ip)
    context['ban_reason'] = ban_record.reason

    return render(request, 'boards/staticpages/banned.html', context)
def page_404(request):
    """Render the "not found" (404) page with the default context"""

    return render(request, 'boards/404.html',
                  _init_default_context(request))
@transaction.commit_on_success
def tag_subscribe(request, tag_name):
    """
    Add the tag to the favorites of the current user, unless it is already
    there, and return the user to the 'next' page.
    """

    user = _get_user(request)
    chosen_tag = get_object_or_404(Tag, name=tag_name)

    already_favorite = chosen_tag in user.fav_tags.all()
    if not already_favorite:
        user.add_tag(chosen_tag)

    return _redirect_to_next(request)
@transaction.commit_on_success
def tag_unsubscribe(request, tag_name):
    """
    Remove the tag from the favorites of the current user, if it is there,
    and return the user to the 'next' page.
    """

    user = _get_user(request)
    chosen_tag = get_object_or_404(Tag, name=tag_name)

    is_favorite = chosen_tag in user.fav_tags.all()
    if is_favorite:
        user.remove_tag(chosen_tag)

    return _redirect_to_next(request)
def static_page(request, name):
    """
    Show a static page that needs nothing but the default context.

    The template is looked up as boards/staticpages/<name>.html.
    """

    context = _init_default_context(request)
    template_name = 'boards/staticpages/' + name + '.html'

    return render(request, template_name, context)
def api_get_post(request, post_id):
    """
    Get the JSON of a post. This can be used as an API for external
    clients. Only the fields listed below are serialized. Responds with 404
    if the post does not exist.
    """

    exported_fields = (
        "pub_time", "_text_rendered", "title", "text", "image",
        "image_width", "image_height", "replies", "tags"
    )

    post = get_object_or_404(Post, id=post_id)
    post_json = serializers.serialize("json", [post],
                                      fields=exported_fields)

    return HttpResponse(content=post_json)
def api_get_threaddiff(request, thread_id, last_update_time):
    """
    Get the posts of a thread that were added or changed since some moment.

    last_update_time is a number of microseconds since the epoch. The
    response is a JSON object with the rendered HTML of the posts published
    after that moment ('added'), of the posts published before it but
    edited after it ('updated'), and with the last edit time of the thread
    ('last_update') in the same microsecond format.
    """

    opening_post = get_object_or_404(Post, id=thread_id)

    seconds = float(last_update_time) / 1000000
    filter_time = datetime.fromtimestamp(seconds,
                                         timezone.get_current_timezone())

    added_posts = Post.objects.filter(thread=opening_post,
                                      pub_time__gt=filter_time)
    updated_posts = Post.objects.filter(thread=opening_post,
                                        pub_time__lt=filter_time,
                                        last_edit_time__gt=filter_time)

    def render_post(post):
        # Reuse the popup view to get the HTML of a single post
        return get_post(request, post.id).content.strip()

    json_data = {
        'added': [render_post(post) for post in added_posts],
        'updated': [render_post(post) for post in updated_posts],
        'last_update': _datetime_to_epoch(opening_post.last_edit_time),
    }

    return HttpResponse(content=json.dumps(json_data))
def get_post(request, post_id):
    """
    Get the html of a post. Used for popups.

    Responds with 404 if the post does not exist. If the request has a
    'truncated' GET parameter, the template is told to truncate the post.
    """

    post = get_object_or_404(Post, id=post_id)

    # An opening post has no thread set (other views in this module check
    # "not post.thread" for the same reason), so in that case the post
    # itself is the one that decides whether the thread can be bumped.
    opening_post = post.thread
    if not opening_post:
        opening_post = post

    context = RequestContext(request)
    context["post"] = post
    context["can_bump"] = opening_post.can_bump()
    if "truncated" in request.GET:
        context["truncated"] = True

    return render(request, 'boards/post.html', context)
def _get_theme(request, user=None):
    """
    Get the CSS theme of a user.

    If no user is passed, the current user of the request is used. Users
    without a saved theme get neboard.settings.DEFAULT_THEME.
    """

    theme_owner = user if user else _get_user(request)

    saved_theme = theme_owner.get_setting('theme')
    if saved_theme:
        return saved_theme

    return neboard.settings.DEFAULT_THEME
def _init_default_context(request):
    """
    Create a context with the values that are used in most views: the
    current user, the user's sorted favorite tags, the theme with the path
    to its base CSS file, and the flag that shows the moderator panel.
    """

    context = RequestContext(request)

    user = _get_user(request)
    theme = _get_theme(request, user)

    context['user'] = user
    context['tags'] = user.get_sorted_fav_tags()
    context['theme'] = theme
    context['theme_css'] = 'css/' + theme + '/base_page.css'

    # This shows the moderator panel. The setting is compared as a string,
    # and the rank of the user is only checked when the setting is on.
    moderation_enabled = user.get_setting(SETTING_MODERATE) == 'True'
    context['moderator'] = (user.is_moderator() if moderation_enabled
                            else False)

    return context
def _get_user(request):
    """
    Get the current user from the session.

    If the session has no user yet, a new user with the RANK_USER rank is
    created and remembered in the session. The public id of the new user is
    the MD5 hex digest of the session key.
    """

    session = request.session

    if 'user_id' in session:
        return User.objects.get(id=session['user_id'])

    # Save the session first, since its key is used to build the user id
    request.session.save()

    new_id = hashlib.md5(session.session_key).hexdigest()
    user = User.objects.create(user_id=new_id, rank=RANK_USER,
                               registration_time=timezone.now())
    session['user_id'] = user.id

    return user
def _redirect_to_next(request):
    """
    Redirect to the page given in the 'next' GET parameter, or to the index
    page when the parameter is absent. This is used when the user has to
    return to some page after the current view has finished its work.
    """

    # NOTE(review): the target is taken from the query string as is, so it
    # can point to another site - consider validating it before redirecting.
    next_page = request.GET.get('next') if 'next' in request.GET else None
    if 'next' not in request.GET:
        return redirect(index)

    return HttpResponseRedirect(next_page)
@transaction.commit_on_success
def _ban_current_user(request):
    """
    Add the IP address of the client to the ban list.

    A newly created ban gets the BAN_REASON_SPAM reason and has can_read
    switched off. A ban that already exists is left as it is.
    """

    client_ip = utils.get_client_ip(request)
    ban_record, is_new = Ban.objects.get_or_create(ip=client_ip)

    if not is_new:
        return

    ban_record.can_read = False
    ban_record.reason = BAN_REASON_SPAM
    ban_record.save()
def _remove_invalid_links(text):
    """
    Replace invalid links in posts so that they won't be parsed.

    Invalid links are links to non-existent posts. For every match of
    REGEX_REPLY whose first group is not the id of an existing post, the
    '>>' prefix is removed and only the id itself is left in the text.
    Returns the resulting text.
    """

    for reply_number in re.finditer(REGEX_REPLY, text):
        post_id = reply_number.group(1)

        if Post.objects.filter(id=post_id).exists():
            continue

        # The lookahead keeps a longer link that merely starts with the
        # same digits (e.g. '>>12' while '>>1' is being removed) intact.
        dangling_link = re.compile(re.escape('>>' + post_id) + r'(?!\d)')
        # A function is used as the replacement so that the id is inserted
        # literally, without any escape processing.
        text = dangling_link.sub(lambda match: post_id, text)

    return text
def _datetime_to_epoch(datetime):
    """
    Convert an aware datetime to the number of microseconds since the epoch.

    The conversion goes through UTC, so the result does not depend on the
    time zone of the process or on the currently active time zone. It is
    the inverse of the datetime.fromtimestamp(value / 1000000, tz) call in
    api_get_threaddiff. Only integer arithmetic is used, so no precision is
    lost.
    """

    return (calendar.timegm(datetime.utctimetuple()) * 1000000
            + datetime.microsecond)