##// END OF EJS Templates
Compute divided range only once, do not expose it outsided of paginator
Compute divided range only once, do not expose it outsided of paginator

File last commit:

r2038:45cac980 default
r2063:5d000252 default
Show More
__init__.py
595 lines | 20.4 KiB | text/x-python | PythonLexer
import logging
import time
import hashlib
import pytz
import re
from PIL import Image
from django import forms
from django.core.cache import cache
from django.core.files.images import get_image_dimensions
from django.core.files.uploadedfile import SimpleUploadedFile, UploadedFile
from django.forms.utils import ErrorList
from django.utils.translation import ugettext_lazy as _, ungettext_lazy
import boards.settings as board_settings
from boards import utils
from boards.abstracts.constants import REGEX_TAGS
from boards.abstracts.settingsmanager import get_settings_manager
from boards.abstracts.sticker_factory import get_attachment_by_alias
from boards.forms.fields import UrlFileField
from boards.mdx_neboard import formatters
from boards.models import Attachment
from boards.models import Tag
from boards.models.attachment import StickerPack
from boards.models.attachment.downloaders import download, REGEX_MAGNET
from boards.models.attachment.viewers import FILE_TYPES_IMAGE
from boards.models.post import TITLE_MAX_LENGTH
from boards.utils import validate_file_size, get_file_mimetype, \
FILE_EXTENSION_DELIMITER, get_tripcode_from_text
from boards.settings import SECTION_FORMS
FORMAT_PANEL_BUTTON = '<span class="mark_btn" ' \
'onClick="addMarkToMsg(\'{}\', \'{}\')">{}{}{}</span>'
POW_HASH_LENGTH = 16
POW_LIFE_MINUTES = 5
REGEX_USERNAMES = re.compile(r'^[\w\s\d,]+$', re.UNICODE)
REGEX_URL = re.compile(r'^(http|https|ftp):\/\/', re.UNICODE)
VETERAN_POSTING_DELAY = 5
ATTRIBUTE_PLACEHOLDER = 'placeholder'
ATTRIBUTE_ROWS = 'rows'
LAST_POST_TIME = 'last_post_time'
LAST_LOGIN_TIME = 'last_login_time'
TEXT_PLACEHOLDER = _('Type message here. Use formatting panel for more advanced usage.')
TAGS_PLACEHOLDER = _('music images i_dont_like_tags')
LABEL_TITLE = _('Title')
LABEL_TEXT = _('Text')
LABEL_TAG = _('Tag')
LABEL_SEARCH = _('Search')
LABEL_FILE = _('File')
LABEL_DUPLICATES = _('Check for duplicates')
LABEL_URL = _('Do not download URLs')
ERROR_SPEED = 'Please wait %(delay)d second before sending message'
ERROR_SPEED_PLURAL = 'Please wait %(delay)d seconds before sending message'
ERROR_MANY_FILES = 'You can post no more than %(files)d file.'
ERROR_MANY_FILES_PLURAL = 'You can post no more than %(files)d files.'
ERROR_DUPLICATES = 'Some files are already present on the board.'
TAG_MAX_LENGTH = 20
TEXTAREA_ROWS = 4
TRIPCODE_DELIM = '##'
# TODO Maybe this may be converted into the database table?
MIMETYPE_EXTENSIONS = {
'image/jpeg': 'jpeg',
'image/png': 'png',
'image/gif': 'gif',
'video/webm': 'webm',
'application/pdf': 'pdf',
'x-diff': 'diff',
'image/svg+xml': 'svg',
'application/x-shockwave-flash': 'swf',
'image/x-ms-bmp': 'bmp',
'image/bmp': 'bmp',
}
DOWN_MODE_DOWNLOAD = 'DOWNLOAD'
DOWN_MODE_DOWNLOAD_UNIQUE = 'DOWNLOAD_UNIQUE'
DOWN_MODE_URL = 'URL'
DOWN_MODE_TRY = 'TRY'
logger = logging.getLogger('boards.forms')
def get_timezones():
timezones = []
for tz in pytz.common_timezones:
timezones.append((tz, tz),)
return timezones
class FormatPanel(forms.Textarea):
"""
Panel for text formatting. Consists of buttons to add different tags to the
form text area.
"""
def render(self, name, value, attrs=None):
output_template = '<div id="mark-panel">{}</div>'
buttons = [self._get_button(formatter) for formatter in formatters]
output = output_template.format(''.join(buttons))
output += super(FormatPanel, self).render(name, value, attrs=attrs)
return output
def _get_button(self, formatter):
return FORMAT_PANEL_BUTTON.format(
formatter.format_left,
formatter.format_right,
formatter.preview_left,
formatter.name,
formatter.preview_right)
class PlainErrorList(ErrorList):
def __unicode__(self):
return self.as_text()
def as_text(self):
return ''.join(['(!) %s ' % e for e in self])
class NeboardForm(forms.Form):
"""
Form with neboard-specific formatting.
"""
required_css_class = 'required-field'
def as_div(self):
"""
Returns this form rendered as HTML <as_div>s.
"""
return self._html_output(
# TODO Do not show hidden rows in the list here
normal_row='<div class="form-row">'
'<div class="form-label">'
'%(label)s'
'</div>'
'<div class="form-input">'
'%(field)s'
'</div>'
'</div>'
'<div class="form-row">'
'%(help_text)s'
'</div>',
error_row='<div class="form-row">'
'<div class="form-label"></div>'
'<div class="form-errors">%s</div>'
'</div>',
row_ender='</div>',
help_text_html='%s',
errors_on_separate_row=True)
def as_json_errors(self):
errors = []
for name, field in list(self.fields.items()):
if self[name].errors:
errors.append({
'field': name,
'errors': self[name].errors.as_text(),
})
return errors
class PostForm(NeboardForm):
title = forms.CharField(max_length=TITLE_MAX_LENGTH, required=False,
label=LABEL_TITLE,
widget=forms.TextInput(
attrs={ATTRIBUTE_PLACEHOLDER: 'Title{}tripcode'.format(TRIPCODE_DELIM)}))
text = forms.CharField(
widget=FormatPanel(attrs={
ATTRIBUTE_PLACEHOLDER: TEXT_PLACEHOLDER,
ATTRIBUTE_ROWS: TEXTAREA_ROWS,
}),
required=False, label=LABEL_TEXT)
download_mode = forms.ChoiceField(
choices=(
(DOWN_MODE_TRY, _('Download or insert as URLs')),
(DOWN_MODE_DOWNLOAD, _('Download')),
(DOWN_MODE_DOWNLOAD_UNIQUE, _('Download and check for uniqueness')),
(DOWN_MODE_URL, _('Insert as URLs')),
),
initial=DOWN_MODE_TRY,
label=_('File process mode'))
file = UrlFileField(required=False, label=LABEL_FILE)
# This field is for spam prevention only
email = forms.CharField(max_length=100, required=False, label=_('e-mail'),
widget=forms.TextInput(attrs={
'class': 'form-email'}))
subscribe = forms.BooleanField(required=False, label=_('Subscribe to thread'))
guess = forms.CharField(widget=forms.HiddenInput(), required=False)
timestamp = forms.CharField(widget=forms.HiddenInput(), required=False)
iteration = forms.CharField(widget=forms.HiddenInput(), required=False)
session = None
need_to_ban = False
def clean_title(self):
title = self.cleaned_data['title']
if title:
if len(title) > TITLE_MAX_LENGTH:
raise forms.ValidationError(_('Title must have less than %s '
'characters') %
str(TITLE_MAX_LENGTH))
return title
def clean_text(self):
text = self.cleaned_data['text'].strip()
if text:
max_length = board_settings.get_int(SECTION_FORMS, 'MaxTextLength')
if len(text) > max_length:
raise forms.ValidationError(_('Text must have less than %s '
'characters') % str(max_length))
return text
def clean_file(self):
return self._clean_files(self.cleaned_data['file'])
def clean(self):
cleaned_data = super(PostForm, self).clean()
if cleaned_data['email']:
if board_settings.get_bool(SECTION_FORMS, 'Autoban'):
self.need_to_ban = True
raise forms.ValidationError('A human cannot enter a hidden field')
if not self.errors:
self._clean_text_file()
limit_speed = board_settings.get_bool(SECTION_FORMS, 'LimitPostingSpeed')
limit_first = board_settings.get_bool(SECTION_FORMS, 'LimitFirstPosting')
settings_manager = get_settings_manager(self)
if not self.errors and limit_speed or (limit_first and not settings_manager.get_setting('confirmed_user')):
pow_difficulty = board_settings.get_int(SECTION_FORMS, 'PowDifficulty')
if pow_difficulty > 0:
# PoW-based
if cleaned_data['timestamp'] \
and cleaned_data['iteration'] and cleaned_data['guess'] \
and not settings_manager.get_setting('confirmed_user'):
self._validate_hash(cleaned_data['timestamp'], cleaned_data['iteration'], cleaned_data['guess'], cleaned_data['text'])
else:
# Time-based
self._validate_posting_speed()
settings_manager.set_setting('confirmed_user', True)
if self.cleaned_data['download_mode'] == DOWN_MODE_DOWNLOAD_UNIQUE:
self._check_file_duplicates(self.get_files())
return cleaned_data
def get_files(self):
"""
Gets file from form or URL.
"""
files = []
for file in self.cleaned_data['file']:
if isinstance(file, UploadedFile):
files.append(file)
return files
def get_file_urls(self):
files = []
for file in self.cleaned_data['file']:
if type(file) == str:
files.append(file)
return files
def get_tripcode(self):
title = self.cleaned_data['title']
if title is not None and TRIPCODE_DELIM in title:
tripcode = get_tripcode_from_text(title.split(TRIPCODE_DELIM, maxsplit=1)[1])
else:
tripcode = ''
return tripcode
def get_title(self):
title = self.cleaned_data['title']
if title is not None and TRIPCODE_DELIM in title:
return title.split(TRIPCODE_DELIM, maxsplit=1)[0]
else:
return title
def get_images(self):
return self.cleaned_data.get('stickers', [])
def is_subscribe(self):
return self.cleaned_data['subscribe']
def _update_file_extension(self, file):
if file:
mimetype = get_file_mimetype(file)
extension = MIMETYPE_EXTENSIONS.get(mimetype)
if extension:
filename = file.name.split(FILE_EXTENSION_DELIMITER, 1)[0]
new_filename = filename + FILE_EXTENSION_DELIMITER + extension
file.name = new_filename
else:
logger.info('Unrecognized file mimetype: {}'.format(mimetype))
def _clean_files(self, inputs):
files = []
max_file_count = board_settings.get_int(SECTION_FORMS, 'MaxFileCount')
if len(inputs) > max_file_count:
raise forms.ValidationError(
ungettext_lazy(ERROR_MANY_FILES, ERROR_MANY_FILES,
max_file_count) % {'files': max_file_count})
size = 0
for file_input in inputs:
if isinstance(file_input, UploadedFile):
file = self._clean_file_file(file_input)
size += file.size
files.append(file)
else:
files.append(self._clean_file_url(file_input))
for file in files:
self._validate_image_dimensions(file)
validate_file_size(size)
return files
def _validate_image_dimensions(self, file):
if isinstance(file, UploadedFile):
mimetype = get_file_mimetype(file)
if mimetype.split('/')[-1] in FILE_TYPES_IMAGE:
Image.warnings.simplefilter('error', Image.DecompressionBombWarning)
try:
print(get_image_dimensions(file))
except Exception:
raise forms.ValidationError('Possible decompression bomb or large image.')
def _clean_file_file(self, file):
self._update_file_extension(file)
return file
def _clean_file_url(self, url):
file = None
if url:
mode = self.cleaned_data['download_mode']
if mode == DOWN_MODE_URL:
return url
try:
image = get_attachment_by_alias(url, self.session)
if image is not None:
if 'stickers' not in self.cleaned_data:
self.cleaned_data['stickers'] = []
self.cleaned_data['stickers'].append(image)
return
if file is None:
file = self._get_file_from_url(url)
if not file:
raise forms.ValidationError(_('Invalid URL'))
self._update_file_extension(file)
except forms.ValidationError as e:
# Assume we will get the plain URL instead of a file and save it
if mode == DOWN_MODE_TRY and (REGEX_URL.match(url) or REGEX_MAGNET.match(url)):
logger.info('Error in forms: {}'.format(e))
return url
else:
raise e
return file
def _clean_text_file(self):
text = self.cleaned_data.get('text')
file = self.get_files()
file_url = self.get_file_urls()
images = self.get_images()
if (not text) and (not file) and (not file_url) and len(images) == 0:
error_message = _('Either text or file must be entered.')
self._add_general_error(error_message)
def _get_cache_key(self, key):
return '{}_{}'.format(self.session.session_key, key)
def _set_session_cache(self, key, value):
cache.set(self._get_cache_key(key), value)
def _get_session_cache(self, key):
return cache.get(self._get_cache_key(key))
def _get_last_post_time(self):
last = self._get_session_cache(LAST_POST_TIME)
if last is None:
last = self.session.get(LAST_POST_TIME)
return last
def _validate_posting_speed(self):
can_post = True
posting_delay = board_settings.get_int(SECTION_FORMS, 'PostingDelay')
if board_settings.get_bool(SECTION_FORMS, 'LimitPostingSpeed'):
now = time.time()
current_delay = 0
if LAST_POST_TIME not in self.session:
self.session[LAST_POST_TIME] = now
need_delay = True
else:
last_post_time = self._get_last_post_time()
current_delay = int(now - last_post_time)
need_delay = current_delay < posting_delay
self._set_session_cache(LAST_POST_TIME, now)
if need_delay:
delay = posting_delay - current_delay
error_message = ungettext_lazy(ERROR_SPEED, ERROR_SPEED_PLURAL,
delay) % {'delay': delay}
self._add_general_error(error_message)
can_post = False
if can_post:
self.session[LAST_POST_TIME] = now
else:
# Reset the time since posting failed
self._set_session_cache(LAST_POST_TIME, self.session[LAST_POST_TIME])
def _get_file_from_url(self, url: str) -> SimpleUploadedFile:
"""
Gets an file file from URL.
"""
try:
return download(url)
except forms.ValidationError as e:
raise e
except Exception as e:
raise forms.ValidationError(e)
def _validate_hash(self, timestamp: str, iteration: str, guess: str, message: str):
payload = timestamp + message.replace('\r\n', '\n')
difficulty = board_settings.get_int(SECTION_FORMS, 'PowDifficulty')
target = str(int(2 ** (POW_HASH_LENGTH * 3) / difficulty))
if len(target) < POW_HASH_LENGTH:
target = '0' * (POW_HASH_LENGTH - len(target)) + target
computed_guess = hashlib.sha256((payload + iteration).encode())\
.hexdigest()[0:POW_HASH_LENGTH]
if guess != computed_guess or guess > target:
self._add_general_error(_('Invalid PoW.'))
def _check_file_duplicates(self, files):
for file in files:
file_hash = utils.get_file_hash(file)
if Attachment.objects.get_existing_duplicate(file_hash, file):
self._add_general_error(_(ERROR_DUPLICATES))
def _add_general_error(self, message):
self.add_error('text', forms.ValidationError(message))
class ThreadForm(PostForm):
tags = forms.CharField(
widget=forms.TextInput(attrs={ATTRIBUTE_PLACEHOLDER: TAGS_PLACEHOLDER}),
max_length=100, label=_('Tags'), required=True)
monochrome = forms.BooleanField(label=_('Monochrome'), required=False)
stickerpack = forms.BooleanField(label=_('Sticker Pack'), required=False)
def clean_tags(self):
tags = self.cleaned_data['tags'].strip()
if not tags or not REGEX_TAGS.match(tags):
raise forms.ValidationError(
_('Inappropriate characters in tags.'))
default_tag_name = board_settings.get(SECTION_FORMS, 'DefaultTag')\
.strip().lower()
required_tag_exists = False
tag_set = set()
for tag_string in tags.split():
tag_name = tag_string.strip().lower()
if tag_name == default_tag_name:
required_tag_exists = True
tag, created = Tag.objects.get_or_create_with_alias(
name=tag_name, required=True)
else:
tag, created = Tag.objects.get_or_create_with_alias(name=tag_name)
tag_set.add(tag)
# If this is a new tag, don't check for its parents because nobody
# added them yet
if not created:
tag_set |= set(tag.get_all_parents())
for tag in tag_set:
if tag.required:
required_tag_exists = True
break
# Use default tag if no section exists
if not required_tag_exists:
default_tag, created = Tag.objects.get_or_create_with_alias(
name=default_tag_name, required=True)
tag_set.add(default_tag)
return tag_set
def clean(self):
cleaned_data = super(ThreadForm, self).clean()
return cleaned_data
def is_monochrome(self):
return self.cleaned_data['monochrome']
def clean_stickerpack(self):
stickerpack = self.cleaned_data['stickerpack']
if stickerpack:
tripcode = self.get_tripcode()
if not tripcode:
raise forms.ValidationError(_(
'Tripcode should be specified to own a stickerpack.'))
title = self.get_title()
if not title:
raise forms.ValidationError(_(
'Title should be specified as a stickerpack name.'))
if not REGEX_TAGS.match(title):
raise forms.ValidationError(_('Inappropriate sticker pack name.'))
existing_pack = StickerPack.objects.filter(name=title).first()
if existing_pack:
if existing_pack.tripcode != tripcode:
raise forms.ValidationError(_(
'A sticker pack with this name already exists and is'
' owned by another tripcode.'))
if not existing_pack.tripcode:
raise forms.ValidationError(_(
'This sticker pack can only be updated by an '
'administrator.'))
return stickerpack
def is_stickerpack(self):
return self.cleaned_data['stickerpack']
class SettingsForm(NeboardForm):
theme = forms.ChoiceField(
choices=board_settings.get_list_dict('View', 'Themes'),
label=_('Theme'))
image_viewer = forms.ChoiceField(
choices=board_settings.get_list_dict('View', 'ImageViewers'),
label=_('Image view mode'))
username = forms.CharField(label=_('User name'), required=False)
timezone = forms.ChoiceField(choices=get_timezones(), label=_('Time zone'))
def clean_username(self):
username = self.cleaned_data['username']
if username and not REGEX_USERNAMES.match(username):
raise forms.ValidationError(_('Inappropriate characters.'))
return username
class SearchForm(NeboardForm):
query = forms.CharField(max_length=500, label=LABEL_SEARCH, required=False)