__init__.py
540 lines
| 18.5 KiB
| text/x-python
|
PythonLexer
neko259
|
r1757 | import logging | ||
import pytz | ||||
neko259
|
r1979 | import re | ||
neko259
|
r1822 | from PIL import Image | ||
neko259
|
r1757 | from django import forms | ||
neko259
|
r1979 | from django.core.files.images import get_image_dimensions | ||
neko259
|
r1761 | from django.core.files.uploadedfile import SimpleUploadedFile, UploadedFile | ||
neko259
|
r1757 | from django.forms.utils import ErrorList | ||
from django.utils.translation import ugettext_lazy as _, ungettext_lazy | ||||
import boards.settings as board_settings | ||||
neko259
|
r1855 | from boards import utils | ||
neko259
|
r1951 | from boards.abstracts.constants import REGEX_TAGS | ||
neko259
|
r2065 | from boards.abstracts.settingsmanager import get_settings_manager, \ | ||
SETTING_CONFIRMED_USER | ||||
neko259
|
r1942 | from boards.abstracts.sticker_factory import get_attachment_by_alias | ||
neko259
|
r1757 | from boards.forms.fields import UrlFileField | ||
neko259
|
r2065 | from boards.forms.validators import TimeValidator, PowValidator | ||
neko259
|
r1757 | from boards.mdx_neboard import formatters | ||
neko259
|
r1855 | from boards.models import Attachment | ||
neko259
|
r1757 | from boards.models import Tag | ||
neko259
|
r1951 | from boards.models.attachment import StickerPack | ||
neko259
|
r1765 | from boards.models.attachment.downloaders import download, REGEX_MAGNET | ||
neko259
|
r1979 | from boards.models.attachment.viewers import FILE_TYPES_IMAGE | ||
neko259
|
r1757 | from boards.models.post import TITLE_MAX_LENGTH | ||
neko259
|
r2065 | from boards.settings import SECTION_FORMS | ||
neko259
|
r1757 | from boards.utils import validate_file_size, get_file_mimetype, \ | ||
neko259
|
r1973 | FILE_EXTENSION_DELIMITER, get_tripcode_from_text | ||
neko259
|
r1757 | |||
neko259
|
r2038 | FORMAT_PANEL_BUTTON = '<span class="mark_btn" ' \ | ||
'onClick="addMarkToMsg(\'{}\', \'{}\')">{}{}{}</span>' | ||||
neko259
|
r1761 | |||
neko259
|
r1757 | |||
REGEX_USERNAMES = re.compile(r'^[\w\s\d,]+$', re.UNICODE) | ||||
neko259
|
r1765 | REGEX_URL = re.compile(r'^(http|https|ftp):\/\/', re.UNICODE) | ||
neko259
|
r1757 | |||
VETERAN_POSTING_DELAY = 5 | ||||
ATTRIBUTE_PLACEHOLDER = 'placeholder' | ||||
ATTRIBUTE_ROWS = 'rows' | ||||
TEXT_PLACEHOLDER = _('Type message here. Use formatting panel for more advanced usage.') | ||||
TAGS_PLACEHOLDER = _('music images i_dont_like_tags') | ||||
LABEL_TITLE = _('Title') | ||||
LABEL_TEXT = _('Text') | ||||
LABEL_TAG = _('Tag') | ||||
LABEL_SEARCH = _('Search') | ||||
neko259
|
r1761 | LABEL_FILE = _('File') | ||
neko259
|
r1855 | LABEL_DUPLICATES = _('Check for duplicates') | ||
neko259
|
r1871 | LABEL_URL = _('Do not download URLs') | ||
neko259
|
r1757 | |||
neko259
|
r1766 | ERROR_MANY_FILES = 'You can post no more than %(files)d file.' | ||
ERROR_MANY_FILES_PLURAL = 'You can post no more than %(files)d files.' | ||||
neko259
|
r1855 | ERROR_DUPLICATES = 'Some files are already present on the board.' | ||
neko259
|
r1757 | |||
TAG_MAX_LENGTH = 20 | ||||
TEXTAREA_ROWS = 4 | ||||
neko259
|
r1923 | TRIPCODE_DELIM = '##' | ||
neko259
|
r1757 | |||
# TODO Maybe this may be converted into the database table? | ||||
MIMETYPE_EXTENSIONS = { | ||||
'image/jpeg': 'jpeg', | ||||
'image/png': 'png', | ||||
'image/gif': 'gif', | ||||
'video/webm': 'webm', | ||||
'application/pdf': 'pdf', | ||||
'x-diff': 'diff', | ||||
'image/svg+xml': 'svg', | ||||
'application/x-shockwave-flash': 'swf', | ||||
'image/x-ms-bmp': 'bmp', | ||||
'image/bmp': 'bmp', | ||||
} | ||||
neko259
|
r1916 | DOWN_MODE_DOWNLOAD = 'DOWNLOAD' | ||
neko259
|
r1959 | DOWN_MODE_DOWNLOAD_UNIQUE = 'DOWNLOAD_UNIQUE' | ||
neko259
|
r1916 | DOWN_MODE_URL = 'URL' | ||
DOWN_MODE_TRY = 'TRY' | ||||
neko259
|
r1757 | |||
logger = logging.getLogger('boards.forms') | ||||
def get_timezones(): | ||||
timezones = [] | ||||
for tz in pytz.common_timezones: | ||||
timezones.append((tz, tz),) | ||||
return timezones | ||||
class FormatPanel(forms.Textarea): | ||||
""" | ||||
Panel for text formatting. Consists of buttons to add different tags to the | ||||
form text area. | ||||
""" | ||||
def render(self, name, value, attrs=None): | ||||
neko259
|
r2038 | output_template = '<div id="mark-panel">{}</div>' | ||
neko259
|
r1757 | |||
neko259
|
r2038 | buttons = [self._get_button(formatter) for formatter in formatters] | ||
output = output_template.format(''.join(buttons)) | ||||
neko259
|
r1757 | output += super(FormatPanel, self).render(name, value, attrs=attrs) | ||
return output | ||||
neko259
|
r2038 | def _get_button(self, formatter): | ||
return FORMAT_PANEL_BUTTON.format( | ||||
formatter.format_left, | ||||
formatter.format_right, | ||||
formatter.preview_left, | ||||
formatter.name, | ||||
formatter.preview_right) | ||||
neko259
|
r1757 | |||
class PlainErrorList(ErrorList): | ||||
def __unicode__(self): | ||||
return self.as_text() | ||||
def as_text(self): | ||||
return ''.join(['(!) %s ' % e for e in self]) | ||||
class NeboardForm(forms.Form): | ||||
""" | ||||
Form with neboard-specific formatting. | ||||
""" | ||||
required_css_class = 'required-field' | ||||
def as_div(self): | ||||
""" | ||||
Returns this form rendered as HTML <as_div>s. | ||||
""" | ||||
return self._html_output( | ||||
# TODO Do not show hidden rows in the list here | ||||
normal_row='<div class="form-row">' | ||||
'<div class="form-label">' | ||||
'%(label)s' | ||||
'</div>' | ||||
'<div class="form-input">' | ||||
'%(field)s' | ||||
'</div>' | ||||
'</div>' | ||||
'<div class="form-row">' | ||||
'%(help_text)s' | ||||
'</div>', | ||||
error_row='<div class="form-row">' | ||||
'<div class="form-label"></div>' | ||||
'<div class="form-errors">%s</div>' | ||||
'</div>', | ||||
row_ender='</div>', | ||||
help_text_html='%s', | ||||
errors_on_separate_row=True) | ||||
def as_json_errors(self): | ||||
errors = [] | ||||
for name, field in list(self.fields.items()): | ||||
if self[name].errors: | ||||
errors.append({ | ||||
'field': name, | ||||
'errors': self[name].errors.as_text(), | ||||
}) | ||||
return errors | ||||
class PostForm(NeboardForm): | ||||
title = forms.CharField(max_length=TITLE_MAX_LENGTH, required=False, | ||||
label=LABEL_TITLE, | ||||
widget=forms.TextInput( | ||||
neko259
|
r1923 | attrs={ATTRIBUTE_PLACEHOLDER: 'Title{}tripcode'.format(TRIPCODE_DELIM)})) | ||
neko259
|
r1757 | text = forms.CharField( | ||
widget=FormatPanel(attrs={ | ||||
ATTRIBUTE_PLACEHOLDER: TEXT_PLACEHOLDER, | ||||
ATTRIBUTE_ROWS: TEXTAREA_ROWS, | ||||
}), | ||||
required=False, label=LABEL_TEXT) | ||||
neko259
|
r1916 | download_mode = forms.ChoiceField( | ||
choices=( | ||||
neko259
|
r1959 | (DOWN_MODE_TRY, _('Download or insert as URLs')), | ||
(DOWN_MODE_DOWNLOAD, _('Download')), | ||||
(DOWN_MODE_DOWNLOAD_UNIQUE, _('Download and check for uniqueness')), | ||||
neko259
|
r1916 | (DOWN_MODE_URL, _('Insert as URLs')), | ||
), | ||||
initial=DOWN_MODE_TRY, | ||||
neko259
|
r1959 | label=_('File process mode')) | ||
neko259
|
r1761 | file = UrlFileField(required=False, label=LABEL_FILE) | ||
neko259
|
r1757 | |||
# This field is for spam prevention only | ||||
email = forms.CharField(max_length=100, required=False, label=_('e-mail'), | ||||
widget=forms.TextInput(attrs={ | ||||
'class': 'form-email'})) | ||||
neko259
|
r2083 | subscribe = forms.BooleanField(required=False, | ||
neko259
|
r2088 | label=_('Subscribe to thread')) | ||
neko259
|
r1757 | |||
guess = forms.CharField(widget=forms.HiddenInput(), required=False) | ||||
timestamp = forms.CharField(widget=forms.HiddenInput(), required=False) | ||||
iteration = forms.CharField(widget=forms.HiddenInput(), required=False) | ||||
need_to_ban = False | ||||
neko259
|
r2069 | def __init__(self, data=None, files=None, auto_id='id_%s', prefix=None, | ||
initial=None, error_class=ErrorList, label_suffix=None, | ||||
empty_permitted=False, field_order=None, | ||||
use_required_attribute=None, renderer=None, session=None): | ||||
super().__init__(data, files, auto_id, prefix, initial, error_class, | ||||
label_suffix, empty_permitted, field_order, | ||||
use_required_attribute, renderer) | ||||
self.session = session | ||||
neko259
|
r1757 | def clean_title(self): | ||
title = self.cleaned_data['title'] | ||||
if title: | ||||
if len(title) > TITLE_MAX_LENGTH: | ||||
raise forms.ValidationError(_('Title must have less than %s ' | ||||
'characters') % | ||||
str(TITLE_MAX_LENGTH)) | ||||
return title | ||||
def clean_text(self): | ||||
text = self.cleaned_data['text'].strip() | ||||
if text: | ||||
neko259
|
r1761 | max_length = board_settings.get_int(SECTION_FORMS, 'MaxTextLength') | ||
neko259
|
r1757 | if len(text) > max_length: | ||
raise forms.ValidationError(_('Text must have less than %s ' | ||||
'characters') % str(max_length)) | ||||
return text | ||||
neko259
|
r1761 | def clean_file(self): | ||
return self._clean_files(self.cleaned_data['file']) | ||||
neko259
|
r1757 | |||
def clean(self): | ||||
cleaned_data = super(PostForm, self).clean() | ||||
if cleaned_data['email']: | ||||
neko259
|
r1761 | if board_settings.get_bool(SECTION_FORMS, 'Autoban'): | ||
neko259
|
r1757 | self.need_to_ban = True | ||
raise forms.ValidationError('A human cannot enter a hidden field') | ||||
if not self.errors: | ||||
self._clean_text_file() | ||||
neko259
|
r1761 | limit_speed = board_settings.get_bool(SECTION_FORMS, 'LimitPostingSpeed') | ||
limit_first = board_settings.get_bool(SECTION_FORMS, 'LimitFirstPosting') | ||||
neko259
|
r1757 | |||
settings_manager = get_settings_manager(self) | ||||
neko259
|
r2065 | if not self.errors and limit_speed or (limit_first and not settings_manager.get_setting(SETTING_CONFIRMED_USER)): | ||
neko259
|
r1761 | pow_difficulty = board_settings.get_int(SECTION_FORMS, 'PowDifficulty') | ||
neko259
|
r1757 | if pow_difficulty > 0: | ||
neko259
|
r2065 | validator = PowValidator( | ||
self.session, cleaned_data['timestamp'], | ||||
cleaned_data['iteration'], cleaned_data['guess'], | ||||
cleaned_data['text']) | ||||
neko259
|
r1757 | else: | ||
neko259
|
r2065 | validator = TimeValidator(self.session) | ||
validator.validate() | ||||
for error in validator.get_errors(): | ||||
self._add_general_error(error) | ||||
settings_manager.set_setting(SETTING_CONFIRMED_USER, True) | ||||
neko259
|
r1959 | if self.cleaned_data['download_mode'] == DOWN_MODE_DOWNLOAD_UNIQUE: | ||
neko259
|
r1855 | self._check_file_duplicates(self.get_files()) | ||
neko259
|
r1757 | |||
return cleaned_data | ||||
def get_files(self): | ||||
""" | ||||
Gets file from form or URL. | ||||
""" | ||||
files = [] | ||||
neko259
|
r1761 | for file in self.cleaned_data['file']: | ||
neko259
|
r1757 | if isinstance(file, UploadedFile): | ||
files.append(file) | ||||
return files | ||||
def get_file_urls(self): | ||||
files = [] | ||||
neko259
|
r1761 | for file in self.cleaned_data['file']: | ||
neko259
|
r1757 | if type(file) == str: | ||
files.append(file) | ||||
return files | ||||
def get_tripcode(self): | ||||
title = self.cleaned_data['title'] | ||||
if title is not None and TRIPCODE_DELIM in title: | ||||
neko259
|
r1973 | tripcode = get_tripcode_from_text(title.split(TRIPCODE_DELIM, maxsplit=1)[1]) | ||
neko259
|
r1757 | else: | ||
tripcode = '' | ||||
return tripcode | ||||
def get_title(self): | ||||
title = self.cleaned_data['title'] | ||||
if title is not None and TRIPCODE_DELIM in title: | ||||
return title.split(TRIPCODE_DELIM, maxsplit=1)[0] | ||||
else: | ||||
return title | ||||
def get_images(self): | ||||
neko259
|
r1917 | return self.cleaned_data.get('stickers', []) | ||
neko259
|
r1757 | |||
def is_subscribe(self): | ||||
return self.cleaned_data['subscribe'] | ||||
neko259
|
r1758 | def _update_file_extension(self, file): | ||
if file: | ||||
mimetype = get_file_mimetype(file) | ||||
extension = MIMETYPE_EXTENSIONS.get(mimetype) | ||||
if extension: | ||||
filename = file.name.split(FILE_EXTENSION_DELIMITER, 1)[0] | ||||
new_filename = filename + FILE_EXTENSION_DELIMITER + extension | ||||
file.name = new_filename | ||||
else: | ||||
logger.info('Unrecognized file mimetype: {}'.format(mimetype)) | ||||
neko259
|
r1761 | def _clean_files(self, inputs): | ||
files = [] | ||||
neko259
|
r1758 | |||
neko259
|
r1761 | max_file_count = board_settings.get_int(SECTION_FORMS, 'MaxFileCount') | ||
neko259
|
r1762 | if len(inputs) > max_file_count: | ||
neko259
|
r1766 | raise forms.ValidationError( | ||
ungettext_lazy(ERROR_MANY_FILES, ERROR_MANY_FILES, | ||||
max_file_count) % {'files': max_file_count}) | ||||
neko259
|
r1983 | |||
size = 0 | ||||
neko259
|
r1763 | for file_input in inputs: | ||
if isinstance(file_input, UploadedFile): | ||||
neko259
|
r1983 | file = self._clean_file_file(file_input) | ||
size += file.size | ||||
files.append(file) | ||||
neko259
|
r1762 | else: | ||
neko259
|
r1763 | files.append(self._clean_file_url(file_input)) | ||
neko259
|
r1761 | |||
neko259
|
r1822 | for file in files: | ||
self._validate_image_dimensions(file) | ||||
neko259
|
r1983 | validate_file_size(size) | ||
neko259
|
r1822 | |||
neko259
|
r1761 | return files | ||
neko259
|
r1758 | |||
neko259
|
r1822 | def _validate_image_dimensions(self, file): | ||
if isinstance(file, UploadedFile): | ||||
mimetype = get_file_mimetype(file) | ||||
if mimetype.split('/')[-1] in FILE_TYPES_IMAGE: | ||||
Image.warnings.simplefilter('error', Image.DecompressionBombWarning) | ||||
try: | ||||
print(get_image_dimensions(file)) | ||||
except Exception: | ||||
raise forms.ValidationError('Possible decompression bomb or large image.') | ||||
neko259
|
r1758 | def _clean_file_file(self, file): | ||
self._update_file_extension(file) | ||||
return file | ||||
def _clean_file_url(self, url): | ||||
file = None | ||||
if url: | ||||
neko259
|
r1916 | mode = self.cleaned_data['download_mode'] | ||
if mode == DOWN_MODE_URL: | ||||
neko259
|
r1871 | return url | ||
neko259
|
r1758 | try: | ||
neko259
|
r1942 | image = get_attachment_by_alias(url, self.session) | ||
neko259
|
r1917 | if image is not None: | ||
neko259
|
r1919 | if 'stickers' not in self.cleaned_data: | ||
self.cleaned_data['stickers'] = [] | ||||
self.cleaned_data['stickers'].append(image) | ||||
neko259
|
r1758 | return | ||
if file is None: | ||||
file = self._get_file_from_url(url) | ||||
if not file: | ||||
raise forms.ValidationError(_('Invalid URL')) | ||||
self._update_file_extension(file) | ||||
except forms.ValidationError as e: | ||||
# Assume we will get the plain URL instead of a file and save it | ||||
neko259
|
r1916 | if mode == DOWN_MODE_TRY and (REGEX_URL.match(url) or REGEX_MAGNET.match(url)): | ||
neko259
|
r1758 | logger.info('Error in forms: {}'.format(e)) | ||
return url | ||||
else: | ||||
raise e | ||||
return file | ||||
neko259
|
r1757 | def _clean_text_file(self): | ||
text = self.cleaned_data.get('text') | ||||
file = self.get_files() | ||||
file_url = self.get_file_urls() | ||||
images = self.get_images() | ||||
if (not text) and (not file) and (not file_url) and len(images) == 0: | ||||
error_message = _('Either text or file must be entered.') | ||||
neko259
|
r1855 | self._add_general_error(error_message) | ||
neko259
|
r1757 | |||
def _get_file_from_url(self, url: str) -> SimpleUploadedFile: | ||||
""" | ||||
Gets an file file from URL. | ||||
""" | ||||
try: | ||||
return download(url) | ||||
except forms.ValidationError as e: | ||||
raise e | ||||
except Exception as e: | ||||
raise forms.ValidationError(e) | ||||
neko259
|
r1855 | def _check_file_duplicates(self, files): | ||
for file in files: | ||||
file_hash = utils.get_file_hash(file) | ||||
if Attachment.objects.get_existing_duplicate(file_hash, file): | ||||
self._add_general_error(_(ERROR_DUPLICATES)) | ||||
def _add_general_error(self, message): | ||||
self.add_error('text', forms.ValidationError(message)) | ||||
neko259
|
r1757 | |||
class ThreadForm(PostForm): | ||||
tags = forms.CharField( | ||||
widget=forms.TextInput(attrs={ATTRIBUTE_PLACEHOLDER: TAGS_PLACEHOLDER}), | ||||
max_length=100, label=_('Tags'), required=True) | ||||
monochrome = forms.BooleanField(label=_('Monochrome'), required=False) | ||||
neko259
|
r1951 | stickerpack = forms.BooleanField(label=_('Sticker Pack'), required=False) | ||
neko259
|
r1757 | |||
def clean_tags(self): | ||||
tags = self.cleaned_data['tags'].strip() | ||||
if not tags or not REGEX_TAGS.match(tags): | ||||
raise forms.ValidationError( | ||||
_('Inappropriate characters in tags.')) | ||||
neko259
|
r1761 | default_tag_name = board_settings.get(SECTION_FORMS, 'DefaultTag')\ | ||
neko259
|
r1757 | .strip().lower() | ||
required_tag_exists = False | ||||
tag_set = set() | ||||
for tag_string in tags.split(): | ||||
neko259
|
r1860 | tag_name = tag_string.strip().lower() | ||
if tag_name == default_tag_name: | ||||
neko259
|
r1757 | required_tag_exists = True | ||
neko259
|
r1874 | tag, created = Tag.objects.get_or_create_with_alias( | ||
neko259
|
r1860 | name=tag_name, required=True) | ||
neko259
|
r1757 | else: | ||
neko259
|
r1874 | tag, created = Tag.objects.get_or_create_with_alias(name=tag_name) | ||
neko259
|
r1757 | tag_set.add(tag) | ||
# If this is a new tag, don't check for its parents because nobody | ||||
# added them yet | ||||
if not created: | ||||
tag_set |= set(tag.get_all_parents()) | ||||
for tag in tag_set: | ||||
if tag.required: | ||||
required_tag_exists = True | ||||
break | ||||
# Use default tag if no section exists | ||||
if not required_tag_exists: | ||||
neko259
|
r1890 | default_tag, created = Tag.objects.get_or_create_with_alias( | ||
neko259
|
r1757 | name=default_tag_name, required=True) | ||
tag_set.add(default_tag) | ||||
return tag_set | ||||
def clean(self): | ||||
cleaned_data = super(ThreadForm, self).clean() | ||||
return cleaned_data | ||||
def is_monochrome(self): | ||||
return self.cleaned_data['monochrome'] | ||||
neko259
|
r1951 | def clean_stickerpack(self): | ||
stickerpack = self.cleaned_data['stickerpack'] | ||||
if stickerpack: | ||||
tripcode = self.get_tripcode() | ||||
if not tripcode: | ||||
raise forms.ValidationError(_( | ||||
'Tripcode should be specified to own a stickerpack.')) | ||||
title = self.get_title() | ||||
if not title: | ||||
raise forms.ValidationError(_( | ||||
'Title should be specified as a stickerpack name.')) | ||||
if not REGEX_TAGS.match(title): | ||||
raise forms.ValidationError(_('Inappropriate sticker pack name.')) | ||||
existing_pack = StickerPack.objects.filter(name=title).first() | ||||
if existing_pack: | ||||
if existing_pack.tripcode != tripcode: | ||||
raise forms.ValidationError(_( | ||||
'A sticker pack with this name already exists and is' | ||||
' owned by another tripcode.')) | ||||
if not existing_pack.tripcode: | ||||
raise forms.ValidationError(_( | ||||
'This sticker pack can only be updated by an ' | ||||
'administrator.')) | ||||
return stickerpack | ||||
def is_stickerpack(self): | ||||
return self.cleaned_data['stickerpack'] | ||||
neko259
|
r1757 | |||
class SettingsForm(NeboardForm): | ||||
neko259
|
r1774 | theme = forms.ChoiceField( | ||
choices=board_settings.get_list_dict('View', 'Themes'), | ||||
label=_('Theme')) | ||||
image_viewer = forms.ChoiceField( | ||||
choices=board_settings.get_list_dict('View', 'ImageViewers'), | ||||
label=_('Image view mode')) | ||||
neko259
|
r1757 | username = forms.CharField(label=_('User name'), required=False) | ||
timezone = forms.ChoiceField(choices=get_timezones(), label=_('Time zone')) | ||||
neko259
|
r2088 | subscribe_by_default = forms.BooleanField( | ||
required=False, label=_('Subscribe to threads by default')) | ||||
neko259
|
r1757 | |||
def clean_username(self): | ||||
username = self.cleaned_data['username'] | ||||
if username and not REGEX_USERNAMES.match(username): | ||||
raise forms.ValidationError(_('Inappropriate characters.')) | ||||
return username | ||||
class SearchForm(NeboardForm): | ||||
query = forms.CharField(max_length=500, label=LABEL_SEARCH, required=False) | ||||