|
|
import logging
|
|
|
|
|
|
import pytz
|
|
|
import re
|
|
|
from PIL import Image
|
|
|
from django import forms
|
|
|
from django.core.files.images import get_image_dimensions
|
|
|
from django.core.files.uploadedfile import SimpleUploadedFile, UploadedFile
|
|
|
from django.forms.utils import ErrorList
|
|
|
from django.utils.translation import ugettext_lazy as _, ungettext_lazy
|
|
|
|
|
|
import boards.settings as board_settings
|
|
|
from boards import utils
|
|
|
from boards.abstracts.constants import REGEX_TAGS
|
|
|
from boards.abstracts.settingsmanager import get_settings_manager, \
|
|
|
SETTING_CONFIRMED_USER
|
|
|
from boards.abstracts.sticker_factory import get_attachment_by_alias
|
|
|
from boards.forms.fields import UrlFileField
|
|
|
from boards.forms.validators import TimeValidator, PowValidator
|
|
|
from boards.mdx_neboard import formatters
|
|
|
from boards.models import Attachment
|
|
|
from boards.models import Tag
|
|
|
from boards.models.attachment import StickerPack
|
|
|
from boards.models.attachment.downloaders import download, REGEX_MAGNET
|
|
|
from boards.models.attachment.viewers import FILE_TYPES_IMAGE
|
|
|
from boards.models.post import TITLE_MAX_LENGTH
|
|
|
from boards.settings import SECTION_FORMS
|
|
|
from boards.utils import validate_file_size, get_file_mimetype, \
|
|
|
FILE_EXTENSION_DELIMITER, get_tripcode_from_text
|
|
|
|
|
|
# --- Formatting panel -------------------------------------------------------

# HTML for a single formatting button. The six positional fields are filled
# by FormatPanel._get_button().
FORMAT_PANEL_BUTTON = (
    '<span class="mark_btn" '
    'onClick="addMarkToMsg(\'{}\', {}, \'{}\')">{}{}{}</span>'
)

# --- Validation patterns ----------------------------------------------------

REGEX_USERNAMES = re.compile(r'^[\w\s\d,]+$', re.UNICODE)
REGEX_URL = re.compile(r'^(http|https|ftp):\/\/', re.UNICODE)

VETERAN_POSTING_DELAY = 5

# --- Widget attributes and placeholders -------------------------------------

ATTRIBUTE_PLACEHOLDER = 'placeholder'
ATTRIBUTE_ROWS = 'rows'

TEXT_PLACEHOLDER = _(
    'Type message here. Use formatting panel for more advanced usage.')
TAGS_PLACEHOLDER = _('music images i_dont_like_tags')

# --- Field labels -----------------------------------------------------------

LABEL_TITLE = _('Title')
LABEL_TEXT = _('Text')
LABEL_TAG = _('Tag')
LABEL_SEARCH = _('Search')
LABEL_FILE = _('File')
LABEL_DUPLICATES = _('Check for duplicates')
LABEL_URL = _('Do not download URLs')

# --- Error messages (translated at the point of use) ------------------------

ERROR_MANY_FILES = 'You can post no more than %(files)d file.'
ERROR_MANY_FILES_PLURAL = 'You can post no more than %(files)d files.'
ERROR_DUPLICATES = 'Some files are already present on the board.'

# --- Limits and delimiters --------------------------------------------------

TAG_MAX_LENGTH = 20

TEXTAREA_ROWS = 4

# Separates the visible title from the tripcode source in the title field.
TRIPCODE_DELIM = '##'

# Maps a detected mimetype to the extension given to the uploaded file.
# TODO Maybe this could be moved into a database table?
MIMETYPE_EXTENSIONS = {
    'image/jpeg': 'jpeg',
    'image/png': 'png',
    'image/gif': 'gif',
    'video/webm': 'webm',
    'application/pdf': 'pdf',
    'x-diff': 'diff',
    'image/svg+xml': 'svg',
    'application/x-shockwave-flash': 'swf',
    'image/x-ms-bmp': 'bmp',
    'image/bmp': 'bmp',
}

# Values accepted by the "download_mode" field of the post form.
DOWN_MODE_DOWNLOAD = 'DOWNLOAD'
DOWN_MODE_DOWNLOAD_UNIQUE = 'DOWNLOAD_UNIQUE'
DOWN_MODE_URL = 'URL'
DOWN_MODE_TRY = 'TRY'


logger = logging.getLogger('boards.forms')
|
|
|
|
|
|
|
|
|
def get_timezones():
    """
    Builds the choices list for a timezone selector: one (value, label) pair
    per entry of pytz.common_timezones, with the zone name used for both.
    """
    return [(zone, zone) for zone in pytz.common_timezones]
|
|
|
|
|
|
|
|
|
class FormatPanel(forms.Textarea):
    """
    Panel for text formatting. Consists of buttons to add different tags to the
    form text area.
    """

    def render(self, name, value, attrs=None, renderer=None):
        """
        Renders the button panel followed by the regular textarea markup.

        The renderer argument is accepted and forwarded to the parent widget
        because newer Django versions always pass it to Widget.render().
        """
        buttons = ''.join(
            self._get_button(formatter) for formatter in formatters)
        panel = '<div id="mark-panel">{}</div>'.format(buttons)

        textarea = super(FormatPanel, self).render(
            name, value, attrs=attrs, renderer=renderer)

        return panel + textarea

    def _get_button(self, formatter):
        """
        Builds the HTML of one panel button from the formatter's attributes.
        """
        return FORMAT_PANEL_BUTTON.format(
            formatter.tag_name,
            formatter.has_input,
            formatter.input_prompt,
            formatter.preview_left,
            formatter.name,
            formatter.preview_right)
|
|
|
|
|
|
|
|
|
class PlainErrorList(ErrorList):
    """
    Error list whose text form marks every error with a "(!)" prefix.
    """

    def as_text(self):
        """
        Concatenates all errors into one string; each error is prefixed with
        "(!) " and followed by a single space.
        """
        marked_errors = ('(!) %s ' % error for error in self)
        return ''.join(marked_errors)

    def __unicode__(self):
        # NOTE(review): Python 3 does not consult __unicode__ when converting
        # to str, so this looks like a Python 2 leftover -- confirm whether
        # __str__ is expected to delegate to as_text() as well.
        return self.as_text()
|
|
|
|
|
|
|
|
|
class NeboardForm(forms.Form):
    """
    Form with neboard-specific formatting.
    """

    required_css_class = 'required-field'

    def as_div(self):
        """
        Returns this form rendered as HTML <div>s.
        """

        # TODO Do not show hidden rows in the list here
        field_row = (
            '<div class="form-row">'
            '<div class="form-label">'
            '%(label)s'
            '</div>'
            '<div class="form-input">'
            '%(field)s'
            '</div>'
            '</div>'
            '<div class="form-row">'
            '%(help_text)s'
            '</div>'
        )
        errors_row = (
            '<div class="form-row">'
            '<div class="form-label"></div>'
            '<div class="form-errors">%s</div>'
            '</div>'
        )

        return self._html_output(
            normal_row=field_row,
            error_row=errors_row,
            row_ender='</div>',
            help_text_html='%s',
            errors_on_separate_row=True)

    def as_json_errors(self):
        """
        Returns a list with one {'field': ..., 'errors': ...} dict for every
        field that has errors; 'errors' holds the text form of the error list.
        """
        return [
            {
                'field': field_name,
                'errors': self[field_name].errors.as_text(),
            }
            for field_name in list(self.fields)
            if self[field_name].errors
        ]
|
|
|
|
|
|
|
|
|
class PostForm(NeboardForm):
    """
    Form for creating a post: title (optionally carrying a tripcode), text,
    attached files or URLs and the hidden anti-spam / proof-of-work fields.
    """

    title = forms.CharField(max_length=TITLE_MAX_LENGTH, required=False,
                            label=LABEL_TITLE,
                            widget=forms.TextInput(
                                attrs={ATTRIBUTE_PLACEHOLDER: 'Title{}tripcode'.format(TRIPCODE_DELIM)}))
    text = forms.CharField(
        widget=FormatPanel(attrs={
            ATTRIBUTE_PLACEHOLDER: TEXT_PLACEHOLDER,
            ATTRIBUTE_ROWS: TEXTAREA_ROWS,
        }),
        required=False, label=LABEL_TEXT)
    download_mode = forms.ChoiceField(
        choices=(
            (DOWN_MODE_TRY, _('Download or insert as URLs')),
            (DOWN_MODE_DOWNLOAD, _('Download')),
            (DOWN_MODE_DOWNLOAD_UNIQUE, _('Download and check for uniqueness')),
            (DOWN_MODE_URL, _('Insert as URLs')),
        ),
        initial=DOWN_MODE_TRY,
        label=_('File process mode'))
    file = UrlFileField(required=False, label=LABEL_FILE)

    # This field is for spam prevention only
    email = forms.CharField(max_length=100, required=False, label=_('e-mail'),
                            widget=forms.TextInput(attrs={
                                'class': 'form-email'}))
    subscribe = forms.BooleanField(required=False,
                                   label=_('Subscribe to thread'))

    # Hidden values handed over to PowValidator in clean()
    guess = forms.CharField(widget=forms.HiddenInput(), required=False)
    timestamp = forms.CharField(widget=forms.HiddenInput(), required=False)
    iteration = forms.CharField(widget=forms.HiddenInput(), required=False)

    # Set to True by clean() when the spam-trap field was filled in and the
    # 'Autoban' setting is enabled
    need_to_ban = False

    def __init__(self, data=None, files=None, auto_id='id_%s', prefix=None,
                 initial=None, error_class=ErrorList, label_suffix=None,
                 empty_permitted=False, field_order=None,
                 use_required_attribute=None, renderer=None, session=None):
        """
        Same arguments as the base form plus 'session', which is stored and
        later passed to the validators and to the sticker alias lookup.
        """
        super().__init__(data, files, auto_id, prefix, initial, error_class,
                         label_suffix, empty_permitted, field_order,
                         use_required_attribute, renderer)

        self.session = session

    def clean_title(self):
        """
        Rejects titles longer than TITLE_MAX_LENGTH.
        """
        title = self.cleaned_data['title']
        if title:
            if len(title) > TITLE_MAX_LENGTH:
                raise forms.ValidationError(
                    _('Title must have less than %s characters') %
                    str(TITLE_MAX_LENGTH))
        return title

    def clean_text(self):
        """
        Strips the text and rejects it when it exceeds the configured
        'MaxTextLength'.
        """
        text = self.cleaned_data['text'].strip()
        if text:
            max_length = board_settings.get_int(SECTION_FORMS, 'MaxTextLength')
            if len(text) > max_length:
                raise forms.ValidationError(
                    _('Text must have less than %s characters') %
                    str(max_length))
        return text

    def clean_file(self):
        return self._clean_files(self.cleaned_data['file'])

    def clean(self):
        """
        Form-wide validation: spam trap, "text or file" requirement, posting
        speed / proof-of-work check and optional duplicate detection.
        """
        cleaned_data = super(PostForm, self).clean()

        # Fields that failed their own validation are missing from
        # cleaned_data, so optional lookups below use .get()
        if cleaned_data.get('email'):
            if board_settings.get_bool(SECTION_FORMS, 'Autoban'):
                self.need_to_ban = True
            raise forms.ValidationError('A human cannot enter a hidden field')

        if not self.errors:
            self._clean_text_file()

        limit_speed = board_settings.get_bool(SECTION_FORMS, 'LimitPostingSpeed')
        limit_first = board_settings.get_bool(SECTION_FORMS, 'LimitFirstPosting')

        settings_manager = get_settings_manager(self)
        # The validators read cleaned_data entries directly, so they must
        # only run on a form that has no errors so far. The parentheses keep
        # "not self.errors" applied to both limiting conditions.
        if not self.errors and (
                limit_speed
                or (limit_first and not settings_manager.get_setting(SETTING_CONFIRMED_USER))):
            pow_difficulty = board_settings.get_int(SECTION_FORMS, 'PowDifficulty')
            if pow_difficulty > 0:
                validator = PowValidator(
                    self.session, cleaned_data['timestamp'],
                    cleaned_data['iteration'], cleaned_data['guess'],
                    cleaned_data['text'])
            else:
                validator = TimeValidator(self.session)

            validator.validate()
            for error in validator.get_errors():
                self._add_general_error(error)

        settings_manager.set_setting(SETTING_CONFIRMED_USER, True)
        if cleaned_data.get('download_mode') == DOWN_MODE_DOWNLOAD_UNIQUE:
            self._check_file_duplicates(self.get_files())

        return cleaned_data

    def get_files(self):
        """
        Gets the cleaned attachments that are actual files (uploaded or
        downloaded); returns an empty list when the file field is invalid.
        """
        return [file for file in self.cleaned_data.get('file', [])
                if isinstance(file, UploadedFile)]

    def get_file_urls(self):
        """
        Gets the cleaned attachments that were kept as plain URL strings.
        """
        return [file for file in self.cleaned_data.get('file', [])
                if isinstance(file, str)]

    def get_tripcode(self):
        """
        Returns the tripcode computed from the part of the title after the
        first TRIPCODE_DELIM, or an empty string when there is none.
        """
        title = self.cleaned_data.get('title')
        if title is not None and TRIPCODE_DELIM in title:
            return get_tripcode_from_text(
                title.split(TRIPCODE_DELIM, maxsplit=1)[1])
        return ''

    def get_title(self):
        """
        Returns the title without the tripcode part.
        """
        title = self.cleaned_data.get('title')
        if title is not None and TRIPCODE_DELIM in title:
            return title.split(TRIPCODE_DELIM, maxsplit=1)[0]
        return title

    def get_images(self):
        """
        Returns the attachments resolved from sticker aliases, if any.
        """
        return self.cleaned_data.get('stickers', [])

    def is_subscribe(self):
        return self.cleaned_data['subscribe']

    def _update_file_extension(self, file):
        """
        Renames the file so that its extension matches the detected mimetype.
        Files with an unknown mimetype keep their name and are logged.
        """
        if not file:
            return

        mimetype = get_file_mimetype(file)
        extension = MIMETYPE_EXTENSIONS.get(mimetype)
        if extension:
            # Everything after the first delimiter is dropped
            filename = file.name.split(FILE_EXTENSION_DELIMITER, 1)[0]
            file.name = filename + FILE_EXTENSION_DELIMITER + extension
        else:
            logger.info('Unrecognized file mimetype: {}'.format(mimetype))

    def _clean_files(self, inputs):
        """
        Validates the attachment inputs (uploaded files and URLs) and returns
        the list of cleaned results, one per input.

        Raises ValidationError when there are more inputs than 'MaxFileCount'
        allows or when any single input is rejected.
        """
        max_file_count = board_settings.get_int(SECTION_FORMS, 'MaxFileCount')
        if len(inputs) > max_file_count:
            raise forms.ValidationError(
                ungettext_lazy(ERROR_MANY_FILES, ERROR_MANY_FILES_PLURAL,
                               max_file_count) % {'files': max_file_count})

        files = []
        # Only directly uploaded files are counted into the total size
        size = 0
        for file_input in inputs:
            if isinstance(file_input, UploadedFile):
                file = self._clean_file_file(file_input)
                size += file.size
                files.append(file)
            else:
                files.append(self._clean_file_url(file_input))

        for file in files:
            self._validate_image_dimensions(file)
        validate_file_size(size)

        return files

    def _validate_image_dimensions(self, file):
        """
        For image files, reads the dimensions with decompression bomb
        warnings turned into errors; any failure rejects the file.
        """
        if not isinstance(file, UploadedFile):
            return

        mimetype = get_file_mimetype(file)
        if mimetype.split('/')[-1] in FILE_TYPES_IMAGE:
            Image.warnings.simplefilter('error', Image.DecompressionBombWarning)
            try:
                get_image_dimensions(file)
            except Exception:
                raise forms.ValidationError('Possible decompression bomb or large image.')

    def _clean_file_file(self, file):
        self._update_file_extension(file)

        return file

    def _clean_file_url(self, url):
        """
        Processes one URL input according to the selected download mode.

        Returns the URL itself (mode URL, or mode TRY when the download
        failed), the downloaded file, or None when the input is empty or was
        resolved to a sticker (stored under cleaned_data['stickers']).
        """
        if not url:
            return None

        # download_mode is absent when that field failed validation
        mode = self.cleaned_data.get('download_mode')
        if mode == DOWN_MODE_URL:
            return url

        try:
            image = get_attachment_by_alias(url, self.session)
            if image is not None:
                self.cleaned_data.setdefault('stickers', []).append(image)
                return None

            file = self._get_file_from_url(url)
            if not file:
                raise forms.ValidationError(_('Invalid URL'))
            self._update_file_extension(file)
        except forms.ValidationError as e:
            # Assume we will get the plain URL instead of a file and save it
            if mode == DOWN_MODE_TRY and (REGEX_URL.match(url) or REGEX_MAGNET.match(url)):
                logger.info('Error in forms: {}'.format(e))
                return url
            raise

        return file

    def _clean_text_file(self):
        """
        Adds an error when the post has no text, file, URL or sticker.
        """
        text = self.cleaned_data.get('text')
        file = self.get_files()
        file_url = self.get_file_urls()
        images = self.get_images()

        if (not text) and (not file) and (not file_url) and len(images) == 0:
            error_message = _('Either text or file must be entered.')
            self._add_general_error(error_message)

    def _get_file_from_url(self, url: str) -> SimpleUploadedFile:
        """
        Gets a file from the URL. Any failure is reported as ValidationError.
        """

        try:
            return download(url)
        except forms.ValidationError:
            raise
        except Exception as e:
            raise forms.ValidationError(e) from e

    def _check_file_duplicates(self, files):
        """
        Adds an error for every file that already exists among attachments.
        """
        for file in files:
            file_hash = utils.get_file_hash(file)
            if Attachment.objects.get_existing_duplicate(file_hash, file):
                self._add_general_error(_(ERROR_DUPLICATES))

    def _add_general_error(self, message):
        # General errors are attached to the text field
        self.add_error('text', forms.ValidationError(message))
|
|
|
|
|
|
|
|
|
class ThreadForm(PostForm):
    """
    Post form extended with the fields needed to open a thread: tags and the
    monochrome / sticker pack flags.
    """

    tags = forms.CharField(
        widget=forms.TextInput(attrs={ATTRIBUTE_PLACEHOLDER: TAGS_PLACEHOLDER}),
        max_length=100, label=_('Tags'), required=True)
    monochrome = forms.BooleanField(label=_('Monochrome'), required=False)
    stickerpack = forms.BooleanField(label=_('Sticker Pack'), required=False)

    def clean_tags(self):
        """
        Converts the whitespace-separated tag string into a set of Tag
        objects, including the parents of already existing tags. The default
        tag is added when no tag in the set is marked as required.
        """
        tags = self.cleaned_data['tags'].strip()

        if not tags or not REGEX_TAGS.match(tags):
            raise forms.ValidationError(
                _('Inappropriate characters in tags.'))

        default_tag_name = board_settings.get(SECTION_FORMS, 'DefaultTag')\
            .strip().lower()

        required_tag_exists = False
        tag_set = set()
        for tag_string in tags.split():
            tag_name = tag_string.strip().lower()
            if tag_name == default_tag_name:
                required_tag_exists = True
                tag, created = Tag.objects.get_or_create_with_alias(
                    name=tag_name, required=True)
            else:
                tag, created = Tag.objects.get_or_create_with_alias(name=tag_name)
            tag_set.add(tag)

            # If this is a new tag, don't check for its parents because nobody
            # added them yet
            if not created:
                tag_set |= set(tag.get_all_parents())

        for tag in tag_set:
            if tag.required:
                required_tag_exists = True
                break

        # Use default tag if no section exists
        if not required_tag_exists:
            default_tag, created = Tag.objects.get_or_create_with_alias(
                name=default_tag_name, required=True)
            tag_set.add(default_tag)

        return tag_set

    def clean(self):
        # No thread-specific form-wide checks; delegates to PostForm.clean()
        cleaned_data = super(ThreadForm, self).clean()

        return cleaned_data

    def is_monochrome(self):
        return self.cleaned_data['monochrome']

    def clean_stickerpack(self):
        """
        When the sticker pack flag is set, requires a tripcode and a valid
        title (used as the pack name) and checks ownership of an existing
        pack with the same name.
        """
        stickerpack = self.cleaned_data['stickerpack']
        if stickerpack:
            # A title that failed its own validation is missing from
            # cleaned_data and already has an error reported; the checks
            # below cannot be performed without it.
            if 'title' not in self.cleaned_data:
                return stickerpack

            tripcode = self.get_tripcode()
            if not tripcode:
                raise forms.ValidationError(_(
                    'Tripcode should be specified to own a stickerpack.'))
            title = self.get_title()
            if not title:
                raise forms.ValidationError(_(
                    'Title should be specified as a stickerpack name.'))
            if not REGEX_TAGS.match(title):
                raise forms.ValidationError(_('Inappropriate sticker pack name.'))

            existing_pack = StickerPack.objects.filter(name=title).first()
            if existing_pack:
                # This check has to come first: the poster's tripcode is
                # never empty here, so a pack without a tripcode would
                # otherwise always be reported as a tripcode mismatch.
                if not existing_pack.tripcode:
                    raise forms.ValidationError(_(
                        'This sticker pack can only be updated by an '
                        'administrator.'))
                if existing_pack.tripcode != tripcode:
                    raise forms.ValidationError(_(
                        'A sticker pack with this name already exists and is'
                        ' owned by another tripcode.'))

        return stickerpack

    def is_stickerpack(self):
        return self.cleaned_data['stickerpack']
|
|
|
|
|
|
|
|
|
class SettingsForm(NeboardForm):
    """
    User settings: theme, image viewer, user name, time zone and the default
    thread subscription flag.
    """

    theme = forms.ChoiceField(
        choices=board_settings.get_list_dict('View', 'Themes'),
        label=_('Theme'),
    )
    image_viewer = forms.ChoiceField(
        choices=board_settings.get_list_dict('View', 'ImageViewers'),
        label=_('Image view mode'),
    )
    username = forms.CharField(label=_('User name'), required=False)
    timezone = forms.ChoiceField(
        choices=get_timezones(),
        label=_('Time zone'),
    )
    subscribe_by_default = forms.BooleanField(
        required=False,
        label=_('Subscribe to threads by default'),
    )

    def clean_username(self):
        """
        Accepts an empty user name or one that matches REGEX_USERNAMES.
        """
        username = self.cleaned_data['username']

        acceptable = not username or REGEX_USERNAMES.match(username)
        if not acceptable:
            raise forms.ValidationError(_('Inappropriate characters.'))

        return username
|
|
|
|
|
|
|
|
|
class SearchForm(NeboardForm):
    """
    Form with a single optional search query field.
    """

    query = forms.CharField(
        max_length=500,
        label=LABEL_SEARCH,
        required=False,
    )
|
|
|
|