forms.py
440 lines
| 14.4 KiB
| text/x-python
|
PythonLexer
/ boards / forms.py
neko259
|
r1299 | import hashlib | ||
neko259
|
r69 | import re | ||
neko259
|
r527 | import time | ||
neko259
|
r1372 | import logging | ||
neko259
|
r1428 | |||
neko259
|
r1372 | import pytz | ||
neko259
|
r1296 | |||
Ilyas
|
r14 | from django import forms | ||
neko259
|
r954 | from django.core.files.uploadedfile import SimpleUploadedFile | ||
neko259
|
r1077 | from django.core.exceptions import ObjectDoesNotExist | ||
neko259
|
r76 | from django.forms.util import ErrorList | ||
neko259
|
r1333 | from django.utils.translation import ugettext_lazy as _, ungettext_lazy | ||
neko259
|
r1428 | from django.utils import timezone | ||
neko259
|
r527 | |||
neko259
|
r438 | from boards.mdx_neboard import formatters | ||
neko259
|
r1328 | from boards.models.attachment.downloaders import Downloader | ||
neko259
|
r386 | from boards.models.post import TITLE_MAX_LENGTH | ||
neko259
|
r1077 | from boards.models import Tag, Post | ||
neko259
|
r1371 | from boards.utils import validate_file_size, get_file_mimetype, \ | ||
FILE_EXTENSION_DELIMITER | ||||
neko259
|
r35 | from neboard import settings | ||
neko259
|
r333 | import boards.settings as board_settings | ||
neko259
|
r1299 | import neboard | ||
neko259
|
r76 | |||
neko259
|
# Proof-of-work: number of leading hex digits of the hash that are compared,
# and how many minutes a client-computed proof stays valid.
POW_HASH_LENGTH = 16
POW_LIFE_MINUTES = 1

# Characters accepted in the tag list and in the user name setting.
REGEX_TAGS = re.compile(r'^[\w\s\d]+$', re.UNICODE)
REGEX_USERNAMES = re.compile(r'^[\w\s\d,]+$', re.UNICODE)

VETERAN_POSTING_DELAY = 5

# HTML attribute names passed to widgets.
ATTRIBUTE_PLACEHOLDER = 'placeholder'
ATTRIBUTE_ROWS = 'rows'

# Keys of the timestamps kept in the user session.
LAST_POST_TIME = 'last_post_time'
LAST_LOGIN_TIME = 'last_login_time'

TEXT_PLACEHOLDER = _('Type message here. Use formatting panel for more advanced usage.')
TAGS_PLACEHOLDER = _('music images i_dont_like_tags')

LABEL_TITLE = _('Title')
LABEL_TEXT = _('Text')
LABEL_TAG = _('Tag')
LABEL_SEARCH = _('Search')

# Singular and plural forms of the posting speed error message.
ERROR_SPEED = 'Please wait %(delay)d second before sending message'
ERROR_SPEED_PLURAL = 'Please wait %(delay)d seconds before sending message'

TAG_MAX_LENGTH = 20

TEXTAREA_ROWS = 4

# Separates the visible title from the tripcode secret in the title field.
TRIPCODE_DELIM = '#'

# Maps a detected mimetype to the file extension an upload is renamed to.
# TODO Consider moving this mapping into a database table
MIMETYPE_EXTENSIONS = {
    'image/jpeg': 'jpeg',
    'image/png': 'png',
    'image/gif': 'gif',
    'video/webm': 'webm',
    'application/pdf': 'pdf',
    'x-diff': 'diff',
    'image/svg+xml': 'svg',
    'application/x-shockwave-flash': 'swf',
}
neko259
|
r153 | |||
neko259
|
def get_timezones():
    """
    Builds the list of choices for a time zone selector: one (value, label)
    pair per entry of pytz.common_timezones, with the zone name used as both.
    """
    return [(zone_name, zone_name) for zone_name in pytz.common_timezones]
neko259
|
class FormatPanel(forms.Textarea):
    """
    Text area preceded by a formatting panel. The panel holds one button per
    available formatter; a button inserts the formatter's tags into the text.
    """

    def render(self, name, value, attrs=None):
        """
        Renders the button panel followed by the regular text area markup.
        """
        buttons = []
        for fmt in formatters:
            # Each button calls addMarkToMsg() with the opening and closing
            # tag of its formatter and shows a preview of the formatting.
            button = '<span class="mark_btn"' \
                ' onClick="addMarkToMsg(\'' + fmt.format_left \
                + '\', \'' + fmt.format_right + '\')">' \
                + fmt.preview_left + fmt.name + fmt.preview_right \
                + '</span>'
            buttons.append(button)

        panel = '<div id="mark-panel">' + ''.join(buttons) + '</div>'
        textarea = super(FormatPanel, self).render(name, value, attrs=attrs)

        return panel + textarea
neko259
|
class PlainErrorList(ErrorList):
    """
    Error list whose text form is a single line in which every message is
    prefixed with "(!) ".
    """

    # NOTE(review): Python 3 never calls __unicode__ implicitly, and this
    # module uses Python 3 only syntax elsewhere. Confirm whether __str__ was
    # intended; mind that as_text() does not escape the messages.
    def __unicode__(self):
        return self.as_text()

    def as_text(self):
        return ''.join('(!) %s ' % message for message in self)
neko259
|
r76 | |||
neko259
|
r16 | |||
neko259
|
class NeboardForm(forms.Form):
    """
    Base form that adds neboard-specific rendering helpers.
    """

    def as_div(self):
        """
        Returns this form rendered as HTML <div>s.
        """

        # TODO Do not show hidden rows in the list here
        field_row = (
            '<div class="form-row">'
            '<div class="form-label">'
            '%(label)s'
            '</div>'
            '<div class="form-input">'
            '%(field)s'
            '</div>'
            '</div>'
            '<div class="form-row">'
            '%(help_text)s'
            '</div>'
        )
        errors_row = (
            '<div class="form-row">'
            '<div class="form-label"></div>'
            '<div class="form-errors">%s</div>'
            '</div>'
        )

        return self._html_output(
            normal_row=field_row,
            error_row=errors_row,
            row_ender='</div>',
            help_text_html='%s',
            errors_on_separate_row=True)

    def as_json_errors(self):
        """
        Returns the errors of the form as a list of dictionaries, one per
        field that has errors. Each dictionary holds the field name and the
        text form of its errors.
        """
        return [
            {
                'field': name,
                'errors': self[name].errors.as_text(),
            }
            for name in self.fields
            if self[name].errors
        ]
neko259
|
r438 | |||
neko259
|
class PostForm(NeboardForm):
    """
    Form for creating a post. Besides the title, text and attachment (an
    uploaded file or a URL to download it from) it carries a hidden "email"
    honeypot field and the hidden proof-of-work fields.

    The view is expected to assign the user session to the "session"
    attribute before validation, because the posting speed check reads and
    writes it.
    """

    title = forms.CharField(max_length=TITLE_MAX_LENGTH, required=False,
                            label=LABEL_TITLE,
                            widget=forms.TextInput(
                                attrs={ATTRIBUTE_PLACEHOLDER:
                                       'test#tripcode'}))
    text = forms.CharField(
        widget=FormatPanel(attrs={
            ATTRIBUTE_PLACEHOLDER: TEXT_PLACEHOLDER,
            ATTRIBUTE_ROWS: TEXTAREA_ROWS,
        }),
        required=False, label=LABEL_TEXT)
    file = forms.FileField(required=False, label=_('File'),
                           widget=forms.ClearableFileInput(
                               attrs={'accept': 'file/*'}))
    file_url = forms.CharField(required=False, label=_('File URL'),
                               widget=forms.TextInput(
                                   attrs={ATTRIBUTE_PLACEHOLDER:
                                          'http://example.com/image.png'}))

    # This field is for spam prevention only
    email = forms.CharField(max_length=100, required=False, label=_('e-mail'),
                            widget=forms.TextInput(attrs={
                                'class': 'form-email'}))
    threads = forms.CharField(required=False, label=_('Additional threads'),
                              widget=forms.TextInput(attrs={ATTRIBUTE_PLACEHOLDER:
                                                            '123 456 789'}))

    # Proof-of-work data computed by the client
    guess = forms.CharField(widget=forms.HiddenInput(), required=False)
    timestamp = forms.CharField(widget=forms.HiddenInput(), required=False)
    iteration = forms.CharField(widget=forms.HiddenInput(), required=False)

    session = None
    # Set to True by clean() when the honeypot field was filled in
    need_to_ban = False

    def _update_file_extension(self, file):
        """
        Renames the file so that its extension matches the detected mimetype.
        Files with an unrecognized mimetype keep their name and are logged.
        """
        if file:
            mimetype = get_file_mimetype(file)
            extension = MIMETYPE_EXTENSIONS.get(mimetype)
            if extension:
                # Everything after the first delimiter is dropped
                filename = file.name.split(FILE_EXTENSION_DELIMITER, 1)[0]
                new_filename = filename + FILE_EXTENSION_DELIMITER + extension

                file.name = new_filename
            else:
                logger = logging.getLogger('boards.forms.extension')
                logger.info('Unrecognized file mimetype: {}'.format(mimetype))

    def clean_title(self):
        """
        Checks that the title does not exceed the maximum length.
        """
        title = self.cleaned_data['title']
        if title:
            if len(title) > TITLE_MAX_LENGTH:
                raise forms.ValidationError(_('Title must have less than %s '
                                              'characters') %
                                            str(TITLE_MAX_LENGTH))
        return title

    def clean_text(self):
        """
        Strips the text and checks it against the configured maximum length.
        """
        text = self.cleaned_data['text'].strip()
        if text:
            max_length = board_settings.get_int('Forms', 'MaxTextLength')
            if len(text) > max_length:
                raise forms.ValidationError(_('Text must have less than %s '
                                              'characters') % str(max_length))
        return text

    def clean_file(self):
        """
        Validates the size of the uploaded file and fixes its extension.
        """
        file = self.cleaned_data['file']

        if file:
            validate_file_size(file.size)
            self._update_file_extension(file)

        return file

    def clean_file_url(self):
        """
        Downloads the file the URL points to, validates its size and fixes
        its extension. Returns the downloaded file, or None if no URL was
        entered. Raises a validation error if the download yields nothing.
        """
        url = self.cleaned_data['file_url']

        file = None
        if url:
            file = self._get_file_from_url(url)

            if not file:
                raise forms.ValidationError(_('Invalid URL'))
            else:
                validate_file_size(file.size)
                self._update_file_extension(file)

        return file

    def clean_threads(self):
        """
        Converts the whitespace-separated list of post ids into a list of
        posts. Every id must belong to an opening post of a thread that is
        not archived, otherwise a validation error is raised. Returns None
        when the field is empty.
        """
        threads_str = self.cleaned_data['threads']

        if len(threads_str) == 0:
            return None

        threads = []
        # split() without arguments tolerates repeated, leading and trailing
        # whitespace, which split(' ') turned into empty (invalid) ids
        for thread_id in threads_str.split():
            try:
                thread = Post.objects.get(id=int(thread_id))
                if not thread.is_opening() or thread.get_thread().is_archived():
                    raise ObjectDoesNotExist()
                threads.append(thread)
            except (ObjectDoesNotExist, ValueError):
                raise forms.ValidationError(_('Invalid additional thread list'))

        return threads

    def clean(self):
        """
        Runs the checks that involve several fields: the honeypot field, the
        presence of either text or a file, and the posting speed limit
        (proof-of-work if the client supplied one, session delay otherwise).
        """
        cleaned_data = super(PostForm, self).clean()

        # The key is absent when the field failed its own validation (e.g.
        # the value is too long), so it must not be accessed by index
        if cleaned_data.get('email'):
            self.need_to_ban = True
            raise forms.ValidationError('A human cannot enter a hidden field')

        if not self.errors:
            self._clean_text_file()

        limit_speed = board_settings.get_bool('Forms', 'LimitPostingSpeed')
        if not self.errors and limit_speed:
            pow_difficulty = board_settings.get_int('Forms', 'PowDifficulty')
            if pow_difficulty > 0 and cleaned_data['timestamp'] \
                    and cleaned_data['iteration'] and cleaned_data['guess']:
                self._validate_hash(cleaned_data['timestamp'],
                                    cleaned_data['iteration'],
                                    cleaned_data['guess'],
                                    cleaned_data['text'])
            else:
                self._validate_posting_speed()

        return cleaned_data

    def get_file(self):
        """
        Gets file from form or URL.
        """
        file = self.cleaned_data['file']
        return file or self.cleaned_data['file_url']

    def get_tripcode(self):
        """
        Gets the tripcode: an MD5 hex digest of the title part after the
        delimiter joined with the secret key of the site. Returns an empty
        string if the title has no delimiter.
        """
        title = self.cleaned_data['title']
        if title is not None and TRIPCODE_DELIM in title:
            code = title.split(TRIPCODE_DELIM, maxsplit=1)[1] + neboard.settings.SECRET_KEY
            tripcode = hashlib.md5(code.encode()).hexdigest()
        else:
            tripcode = ''
        return tripcode

    def get_title(self):
        """
        Gets the title without the tripcode part.
        """
        title = self.cleaned_data['title']
        if title is not None and TRIPCODE_DELIM in title:
            return title.split(TRIPCODE_DELIM, maxsplit=1)[0]
        else:
            return title

    def _clean_text_file(self):
        """
        Adds an error to the text field if neither text nor file is present.
        """
        text = self.cleaned_data.get('text')
        file = self.get_file()

        if (not text) and (not file):
            error_message = _('Either text or file must be entered.')
            self._errors['text'] = self.error_class([error_message])

    def _validate_posting_speed(self):
        """
        Checks the time passed since the last post of this session. If it is
        less than the posting delay, an error with the remaining time is
        added to the text field; otherwise the last post time is updated.
        A session without a recorded post time has to wait the full delay.
        """
        posting_delay = settings.POSTING_DELAY

        if not board_settings.get_bool('Forms', 'LimitPostingSpeed'):
            return

        now = time.time()

        current_delay = 0
        if LAST_POST_TIME not in self.session:
            self.session[LAST_POST_TIME] = now

            need_delay = True
        else:
            last_post_time = self.session.get(LAST_POST_TIME)
            current_delay = int(now - last_post_time)

            need_delay = current_delay < posting_delay

        if need_delay:
            delay = posting_delay - current_delay
            error_message = ungettext_lazy(ERROR_SPEED, ERROR_SPEED_PLURAL,
                                           delay) % {'delay': delay}
            self._errors['text'] = self.error_class([error_message])
        else:
            self.session[LAST_POST_TIME] = now

    def _get_file_from_url(self, url: str) -> SimpleUploadedFile:
        """
        Gets a file from URL using the first downloader that handles the URL,
        or the generic one if none does. Validation errors are passed to the
        caller; any other failure is logged and None is returned.
        """
        try:
            for downloader in Downloader.__subclasses__():
                if downloader.handles(url):
                    return downloader.download(url)
            # If nobody of the specific downloaders handles this, use generic
            # one
            return Downloader.download(url)
        except forms.ValidationError:
            raise
        except Exception:
            # Best effort: the caller reports a missing file as an invalid
            # URL, but the reason must not be lost
            logger = logging.getLogger('boards.forms.download')
            logger.warning('Could not download file from URL: %s', url,
                           exc_info=True)
            return None

    def _validate_hash(self, timestamp: str, iteration: str, guess: str, message: str):
        """
        Validates the proof-of-work sent by the client and adds an error to
        the text field if it is malformed, stale or wrong.

        The expected guess is the first POW_HASH_LENGTH hex digits of the
        SHA-256 of the timestamp, the message (with normalized line breaks)
        and the iteration joined together.
        """
        try:
            # The last 3 characters are dropped before parsing -- presumably
            # the client sends milliseconds; verify against the client code
            post_time = timezone.datetime.fromtimestamp(
                int(timestamp[:-3]), tz=timezone.get_current_timezone())
        except (ValueError, OverflowError, OSError):
            # The timestamp comes from the client and may be anything
            self._errors['text'] = self.error_class([_('Invalid PoW.')])
            return

        # total_seconds() is used because the "seconds" attribute wraps
        # around every 24 hours. Timestamps from the future are rejected too.
        age_seconds = (timezone.now() - post_time).total_seconds()
        if age_seconds < 0 or age_seconds / 60 > POW_LIFE_MINUTES:
            self._errors['text'] = self.error_class([_('Stale PoW.')])

        payload = timestamp + message.replace('\r\n', '\n')
        difficulty = board_settings.get_int('Forms', 'PowDifficulty')
        # NOTE(review): the target is a decimal string while the guess is a
        # hex string, and they are compared as strings -- confirm this
        # matches the client implementation
        target = str(int(2 ** (POW_HASH_LENGTH * 3) / difficulty))
        target = target.zfill(POW_HASH_LENGTH)

        computed_guess = hashlib.sha256((payload + iteration).encode())\
            .hexdigest()[0:POW_HASH_LENGTH]
        if guess != computed_guess or guess > target:
            self._errors['text'] = self.error_class(
                [_('Invalid PoW.')])
neko259
|
r29 | |||
class ThreadForm(PostForm):
    """
    Form for creating a thread: a post form with a mandatory tag list.
    """

    tags = forms.CharField(
        widget=forms.TextInput(attrs={ATTRIBUTE_PLACEHOLDER: TAGS_PLACEHOLDER}),
        max_length=100, label=_('Tags'), required=True)

    def clean_tags(self):
        """
        Converts the whitespace-separated tag names into a set of tags,
        creating the tags that do not exist yet. Parents of already existing
        tags are added to the set. At least one tag of the resulting set has
        to be marked as required.
        """
        raw_tags = self.cleaned_data['tags'].strip()

        if not (raw_tags and REGEX_TAGS.match(raw_tags)):
            raise forms.ValidationError(
                _('Inappropriate characters in tags.'))

        tag_set = set()
        for tag_name in raw_tags.split():
            tag, created = Tag.objects.get_or_create(
                name=tag_name.strip().lower())
            tag_set.add(tag)
            # If this is a new tag, don't check for its parents because nobody
            # added them yet
            if not created:
                tag_set.update(tag.get_all_parents())

        if not any(tag.required for tag in tag_set):
            raise forms.ValidationError(
                _('Need at least one section.'))

        return tag_set

    def clean(self):
        """
        Performs the same form-wide checks as the post form.
        """
        return super(ThreadForm, self).clean()
neko259
|
class SettingsForm(NeboardForm):
    """
    Form with the user settings: theme, image view mode, user name and time
    zone.
    """

    theme = forms.ChoiceField(choices=settings.THEMES, label=_('Theme'))
    image_viewer = forms.ChoiceField(choices=settings.IMAGE_VIEWERS,
                                     label=_('Image view mode'))
    username = forms.CharField(label=_('User name'), required=False)
    timezone = forms.ChoiceField(choices=get_timezones(),
                                 label=_('Time zone'))

    def clean_username(self):
        """
        Accepts an empty user name or one that matches REGEX_USERNAMES.
        """
        username = self.cleaned_data['username']

        if not username or REGEX_USERNAMES.match(username):
            return username

        raise forms.ValidationError(_('Inappropriate characters.'))
neko259
|
r144 | |||
neko259
|
class SearchForm(NeboardForm):
    """
    Form with a single optional search query field.
    """

    query = forms.CharField(
        max_length=500,
        label=LABEL_SEARCH,
        required=False,
    )