import hashlib
import logging
import re
import time
import traceback

import pytz

from PIL import Image

from django import forms
from django.core.files.uploadedfile import SimpleUploadedFile, UploadedFile
from django.forms.utils import ErrorList
from django.utils.translation import ugettext_lazy as _, ungettext_lazy
from django.core.files.images import get_image_dimensions

import boards.settings as board_settings
import neboard
from boards import utils
from boards.abstracts.attachment_alias import get_image_by_alias
from boards.abstracts.settingsmanager import get_settings_manager
from boards.forms.fields import UrlFileField
from boards.mdx_neboard import formatters
from boards.models import Attachment
from boards.models import Tag
from boards.models.attachment.downloaders import download, REGEX_MAGNET
from boards.models.post import TITLE_MAX_LENGTH
from boards.utils import validate_file_size, get_file_mimetype, \
    FILE_EXTENSION_DELIMITER
from boards.models.attachment.viewers import FILE_TYPES_IMAGE
from neboard import settings

# Name of the board settings section the forms in this module read from
SECTION_FORMS = 'Forms'

# Number of leading hex characters of the SHA-256 digest that are compared
# when a proof-of-work guess is validated
POW_HASH_LENGTH = 16
POW_LIFE_MINUTES = 5

# Patterns used to validate user input: tag lists, user names and the
# scheme prefix of a URL that may be stored instead of a downloaded file
REGEX_TAGS = re.compile(r'^[\w\s\d]+$', re.UNICODE)
REGEX_USERNAMES = re.compile(r'^[\w\s\d,]+$', re.UNICODE)
REGEX_URL = re.compile(r'^(http|https|ftp):\/\/', re.UNICODE)

VETERAN_POSTING_DELAY = 5

# HTML attribute names passed to widgets
ATTRIBUTE_PLACEHOLDER = 'placeholder'
ATTRIBUTE_ROWS = 'rows'

# Session keys
LAST_POST_TIME = 'last_post_time'
LAST_LOGIN_TIME = 'last_login_time'

# Placeholder texts shown inside empty inputs
TEXT_PLACEHOLDER = _('Type message here. Use formatting panel for more advanced usage.')
TAGS_PLACEHOLDER = _('music images i_dont_like_tags')

# Translatable field labels
LABEL_TITLE = _('Title')
LABEL_TEXT = _('Text')
LABEL_TAG = _('Tag')
LABEL_SEARCH = _('Search')
LABEL_FILE = _('File')
LABEL_DUPLICATES = _('Check for duplicates')
LABEL_URL = _('Do not download URLs')

# Singular/plural message pairs; they are translated at the point of use
# with ungettext_lazy and formatted with a named placeholder
ERROR_SPEED = 'Please wait %(delay)d second before sending message'
ERROR_SPEED_PLURAL = 'Please wait %(delay)d seconds before sending message'
ERROR_MANY_FILES = 'You can post no more than %(files)d file.'
ERROR_MANY_FILES_PLURAL = 'You can post no more than %(files)d files.'
ERROR_DUPLICATES = 'Some files are already present on the board.'

TAG_MAX_LENGTH = 20

TEXTAREA_ROWS = 4

# Separates the visible title from the tripcode secret in the title field
TRIPCODE_DELIM = '#'

# Maps a detected mimetype to the file extension an upload is renamed to.
# TODO Maybe this may be converted into the database table?
MIMETYPE_EXTENSIONS = {
    'image/jpeg': 'jpeg',
    'image/png': 'png',
    'image/gif': 'gif',
    'video/webm': 'webm',
    'application/pdf': 'pdf',
    'x-diff': 'diff',
    'image/svg+xml': 'svg',
    'application/x-shockwave-flash': 'swf',
    'image/x-ms-bmp': 'bmp',
    'image/bmp': 'bmp',
}


logger = logging.getLogger('boards.forms')


def get_timezones():
    """
    Returns the common pytz timezone names as a list of (value, label)
    pairs suitable for the choices of a ChoiceField.
    """
    return [(tz, tz) for tz in pytz.common_timezones]


class FormatPanel(forms.Textarea):
    """
    Panel for text formatting. Consists of buttons to add different tags to
    the form text area.
    """

    def render(self, name, value, attrs=None):
        """
        Renders one entry per registered formatter (its preview prefix, name
        and preview suffix) followed by the regular textarea markup.
        """
        # NOTE(review): in this copy of the file the string literals below
        # hold only a line break or are empty; the HTML markup that wraps
        # the panel and each button appears to have been stripped. They are
        # kept exactly as found -- restore them from the canonical source.
        output = '\n'
        for formatter in formatters:
            output += '' + \
                formatter.preview_left + formatter.name + \
                formatter.preview_right + ''

        output += '\n'
        output += super(FormatPanel, self).render(name, value, attrs=attrs)

        return output


class PlainErrorList(ErrorList):
    """
    Error list that is rendered as plain text instead of markup.
    """

    def __unicode__(self):
        return self.as_text()

    def as_text(self):
        """
        Returns all errors joined into one string, each prefixed by '(!) '.
        """
        return ''.join(['(!) %s ' % e for e in self])


class NeboardForm(forms.Form):
    """
    Form with neboard-specific formatting.
    """

    required_css_class = 'required-field'

    def as_div(self):
        """
        Returns this form rendered through Django's _html_output with the
        row templates below, errors being put on a separate row.
        """
        # NOTE(review): the row templates contain only line breaks around
        # the placeholders in this copy of the file; the surrounding HTML
        # markup appears to have been stripped. The literals are kept
        # exactly as found -- restore them from the canonical source.
        return self._html_output(
            # TODO Do not show hidden rows in the list here
            normal_row='\n'
                       '\n'
                       '%(label)s'
                       '\n'
                       '\n'
                       '%(field)s'
                       '\n'
                       '\n'
                       '\n'
                       '%(help_text)s'
                       '\n',
            error_row='\n'
                      '\n'
                      '\n%s\n'
                      '\n',
            row_ender='',
            help_text_html='%s',
            errors_on_separate_row=True)

    def as_json_errors(self):
        """
        Returns a list of {'field': name, 'errors': text} dicts, one for
        every field of the form that has validation errors.
        """
        errors = []

        for name in list(self.fields):
            field_errors = self[name].errors
            if field_errors:
                errors.append({
                    'field': name,
                    'errors': field_errors.as_text(),
                })

        return errors


class PostForm(NeboardForm):
    """
    Form for creating a post: title (optionally carrying a tripcode), text,
    attached files or URLs, and the hidden anti-spam fields.
    """

    title = forms.CharField(max_length=TITLE_MAX_LENGTH, required=False,
                            label=LABEL_TITLE,
                            widget=forms.TextInput(
                                attrs={ATTRIBUTE_PLACEHOLDER:
                                       'title#tripcode'}))
    text = forms.CharField(
        widget=FormatPanel(attrs={
            ATTRIBUTE_PLACEHOLDER: TEXT_PLACEHOLDER,
            ATTRIBUTE_ROWS: TEXTAREA_ROWS,
        }),
        required=False, label=LABEL_TEXT)
    # Declared before ``file`` so that its cleaned value is already
    # available when the file field is cleaned
    no_download = forms.BooleanField(required=False, label=LABEL_URL)
    file = UrlFileField(required=False, label=LABEL_FILE)

    # This field is for spam prevention only
    email = forms.CharField(max_length=100, required=False,
                            label=_('e-mail'),
                            widget=forms.TextInput(attrs={
                                'class': 'form-email'}))
    subscribe = forms.BooleanField(required=False,
                                   label=_('Subscribe to thread'))
    check_duplicates = forms.BooleanField(required=False,
                                          label=LABEL_DUPLICATES)

    # Proof-of-work data
    guess = forms.CharField(widget=forms.HiddenInput(), required=False)
    timestamp = forms.CharField(widget=forms.HiddenInput(), required=False)
    iteration = forms.CharField(widget=forms.HiddenInput(), required=False)

    # Session used for posting-speed tracking and image aliases; it is not
    # set anywhere in this module, so the caller has to assign it
    session = None
    # Set when the hidden e-mail field was filled in and Autoban is enabled
    need_to_ban = False
    # Image resolved from an alias given in place of a file URL
    image = None

    def clean_title(self):
        """
        Returns the title, rejecting one longer than TITLE_MAX_LENGTH.
        """
        title = self.cleaned_data['title']
        if title:
            if len(title) > TITLE_MAX_LENGTH:
                raise forms.ValidationError(_('Title must have less than %s '
                                              'characters') %
                                            str(TITLE_MAX_LENGTH))
        return title

    def clean_text(self):
        """
        Returns the stripped text, rejecting one longer than the configured
        'MaxTextLength'.
        """
        text = self.cleaned_data['text'].strip()
        if text:
            max_length = board_settings.get_int(SECTION_FORMS,
                                                'MaxTextLength')
            if len(text) > max_length:
                raise forms.ValidationError(_('Text must have less than %s '
                                              'characters') %
                                            str(max_length))
        return text

    def clean_file(self):
        """
        Validates every uploaded file or URL of the file field.
        """
        return self._clean_files(self.cleaned_data['file'])

    def clean(self):
        """
        Runs the cross-field checks: the hidden-field trap, "text or file"
        presence, proof-of-work or posting-speed limits, and the optional
        duplicate file check.
        """
        cleaned_data = super(PostForm, self).clean()

        # A field that failed its own validation is absent from
        # cleaned_data, hence the .get() lookups throughout this method
        if cleaned_data.get('email'):
            if board_settings.get_bool(SECTION_FORMS, 'Autoban'):
                self.need_to_ban = True
            raise forms.ValidationError('A human cannot enter a hidden field')

        if not self.errors:
            self._clean_text_file()

        limit_speed = board_settings.get_bool(SECTION_FORMS,
                                              'LimitPostingSpeed')
        limit_first = board_settings.get_bool(SECTION_FORMS,
                                              'LimitFirstPosting')

        settings_manager = get_settings_manager(self)
        # NOTE(review): grouping made explicit; this is how the original
        # unparenthesised "a and b or (c)" expression is evaluated. Confirm
        # the first-posting branch is really meant to run when the form
        # already has errors.
        needs_check = (not self.errors and limit_speed) or (
            limit_first
            and not settings_manager.get_setting('confirmed_user'))
        if needs_check:
            pow_difficulty = board_settings.get_int(SECTION_FORMS,
                                                    'PowDifficulty')
            if pow_difficulty > 0:
                # PoW-based
                if cleaned_data.get('timestamp') \
                        and cleaned_data.get('iteration') \
                        and cleaned_data.get('guess') \
                        and not settings_manager.get_setting(
                            'confirmed_user'):
                    self._validate_hash(cleaned_data['timestamp'],
                                        cleaned_data['iteration'],
                                        cleaned_data['guess'],
                                        cleaned_data.get('text') or '')
            else:
                # Time-based
                self._validate_posting_speed()
            settings_manager.set_setting('confirmed_user', True)

        if self.cleaned_data.get('check_duplicates'):
            self._check_file_duplicates(self.get_files())

        return cleaned_data

    def get_files(self):
        """
        Gets the uploaded files of the form, skipping plain URLs.
        """
        # An empty tuple stands in when the file field failed validation
        inputs = self.cleaned_data.get('file') or ()
        return [file for file in inputs if isinstance(file, UploadedFile)]

    def get_file_urls(self):
        """
        Gets the URLs that were kept as plain strings instead of files.
        """
        inputs = self.cleaned_data.get('file') or ()
        return [file for file in inputs if isinstance(file, str)]

    def get_tripcode(self):
        """
        Returns the MD5 hex digest of the title part after the tripcode
        delimiter combined with the secret key, or '' without a delimiter.
        """
        title = self.cleaned_data['title']
        if title is not None and TRIPCODE_DELIM in title:
            code = title.split(TRIPCODE_DELIM, maxsplit=1)[1] \
                + neboard.settings.SECRET_KEY
            tripcode = hashlib.md5(code.encode()).hexdigest()
        else:
            tripcode = ''
        return tripcode

    def get_title(self):
        """
        Returns the title without the tripcode part.
        """
        title = self.cleaned_data['title']
        if title is not None and TRIPCODE_DELIM in title:
            return title.split(TRIPCODE_DELIM, maxsplit=1)[0]
        else:
            return title

    def get_images(self):
        """
        Returns the alias-resolved image as a one-element list, or [].
        """
        if self.image:
            return [self.image]
        else:
            return []

    def is_subscribe(self):
        return self.cleaned_data['subscribe']

    def _update_file_extension(self, file):
        """
        Renames the file so that its extension matches the detected
        mimetype; unknown mimetypes are only logged.
        """
        if file:
            mimetype = get_file_mimetype(file)
            extension = MIMETYPE_EXTENSIONS.get(mimetype)
            if extension:
                filename = file.name.split(FILE_EXTENSION_DELIMITER, 1)[0]
                new_filename = filename + FILE_EXTENSION_DELIMITER + extension

                file.name = new_filename
            else:
                logger.info('Unrecognized file mimetype: {}'.format(mimetype))

    def _clean_files(self, inputs):
        """
        Validates the number of inputs and then every input, either as an
        uploaded file or as a URL. Returns the list of cleaned values.

        Raises ValidationError when more than 'MaxFileCount' inputs are
        given or any single input is invalid.
        """
        files = []

        max_file_count = board_settings.get_int(SECTION_FORMS, 'MaxFileCount')
        if len(inputs) > max_file_count:
            # The plural message has to be the second argument, otherwise
            # the singular text is used for every count
            raise forms.ValidationError(
                ungettext_lazy(ERROR_MANY_FILES, ERROR_MANY_FILES_PLURAL,
                               max_file_count) % {'files': max_file_count})
        for file_input in inputs:
            if isinstance(file_input, UploadedFile):
                files.append(self._clean_file_file(file_input))
            else:
                files.append(self._clean_file_url(file_input))

        for file in files:
            self._validate_image_dimensions(file)

        return files

    def _validate_image_dimensions(self, file):
        """
        Rejects an uploaded image whose dimensions cannot be read, treating
        a decompression bomb warning as a failure.
        """
        if isinstance(file, UploadedFile):
            mimetype = get_file_mimetype(file)
            if mimetype.split('/')[-1] in FILE_TYPES_IMAGE:
                # Turn the warning into an exception so it can be caught
                Image.warnings.simplefilter('error',
                                            Image.DecompressionBombWarning)
                try:
                    get_image_dimensions(file)
                except Exception as e:
                    raise forms.ValidationError(
                        'Possible decompression bomb or large image.') from e

    def _clean_file_file(self, file):
        """
        Validates the size of an uploaded file and fixes its extension.
        """
        validate_file_size(file.size)
        self._update_file_extension(file)

        return file

    def _clean_file_url(self, url):
        """
        Resolves a URL input. Returns the URL itself when downloading is
        disabled or the download failed for a valid http/https/ftp/magnet
        link, None when the input was an image alias (the image is kept in
        self.image) or empty, and the downloaded file otherwise.
        """
        file = None

        if url:
            if self.cleaned_data.get('no_download'):
                return url

            try:
                image = get_image_by_alias(url, self.session)
                if image is not None:
                    # Assigned only on a hit so that a later plain URL does
                    # not reset an image found for an earlier input
                    self.image = image
                    return None

                file = self._get_file_from_url(url)
                if not file:
                    raise forms.ValidationError(_('Invalid URL'))
                else:
                    validate_file_size(file.size)
                    self._update_file_extension(file)
            except forms.ValidationError as e:
                # Assume we will get the plain URL instead of a file and save
                # it
                if REGEX_URL.match(url) or REGEX_MAGNET.match(url):
                    logger.info('Error in forms: {}'.format(e))
                    return url
                else:
                    raise

        return file

    def _clean_text_file(self):
        """
        Adds an error when the post has no text, file, URL or image.
        """
        text = self.cleaned_data.get('text')
        file = self.get_files()
        file_url = self.get_file_urls()
        images = self.get_images()

        if (not text) and (not file) and (not file_url) and len(images) == 0:
            error_message = _('Either text or file must be entered.')
            self._add_general_error(error_message)

    def _validate_posting_speed(self):
        """
        Adds an error when the previous post in this session is more recent
        than 'PostingDelay' seconds; otherwise stores the current time as
        the last post time.
        """
        # Taken up front so that it is bound for the final session update
        # even when the speed limit is disabled
        now = time.time()
        can_post = True

        posting_delay = board_settings.get_int(SECTION_FORMS, 'PostingDelay')

        if board_settings.get_bool(SECTION_FORMS, 'LimitPostingSpeed'):
            current_delay = 0

            if LAST_POST_TIME not in self.session:
                self.session[LAST_POST_TIME] = now

                need_delay = True
            else:
                last_post_time = self.session.get(LAST_POST_TIME)
                current_delay = int(now - last_post_time)

                need_delay = current_delay < posting_delay

            if need_delay:
                delay = posting_delay - current_delay
                error_message = ungettext_lazy(ERROR_SPEED, ERROR_SPEED_PLURAL,
                                               delay) % {'delay': delay}
                self._add_general_error(error_message)

                can_post = False

        if can_post:
            self.session[LAST_POST_TIME] = now

    def _get_file_from_url(self, url: str) -> SimpleUploadedFile:
        """
        Gets a file from URL. Any download failure is reported as a
        ValidationError.
        """
        try:
            return download(url)
        except forms.ValidationError:
            raise
        except Exception as e:
            raise forms.ValidationError(e) from e

    def _validate_hash(self, timestamp: str, iteration: str,
                       guess: str, message: str):
        """
        Adds an error unless the guess equals the first POW_HASH_LENGTH hex
        characters of SHA-256(timestamp + message + iteration) and does not
        exceed the target derived from 'PowDifficulty'.
        """
        payload = timestamp + message.replace('\r\n', '\n')
        difficulty = board_settings.get_int(SECTION_FORMS, 'PowDifficulty')
        # Left-pad with zeros so that the string comparison below works on
        # values of equal length
        target = str(int(2 ** (POW_HASH_LENGTH * 3) / difficulty)) \
            .zfill(POW_HASH_LENGTH)

        computed_guess = hashlib.sha256((payload + iteration).encode())\
            .hexdigest()[0:POW_HASH_LENGTH]
        if guess != computed_guess or guess > target:
            self._add_general_error(_('Invalid PoW.'))

    def _check_file_duplicates(self, files):
        """
        Adds an error when any of the files already exists as an attachment.
        """
        for file in files:
            file_hash = utils.get_file_hash(file)
            if Attachment.objects.get_existing_duplicate(file_hash, file):
                self._add_general_error(_(ERROR_DUPLICATES))
                # One message is enough, no need to hash the other files
                break

    def _add_general_error(self, message):
        """
        Attaches a form-wide problem to the text field.
        """
        self.add_error('text', forms.ValidationError(message))


class ThreadForm(PostForm):
    """
    Post form extended with the tags of a new thread.
    """

    tags = forms.CharField(
        widget=forms.TextInput(attrs={ATTRIBUTE_PLACEHOLDER: TAGS_PLACEHOLDER}),
        max_length=100, label=_('Tags'), required=True)
    monochrome = forms.BooleanField(label=_('Monochrome'), required=False)

    def clean_tags(self):
        """
        Converts the whitespace-separated tag names into a set of tags,
        including the parents of already existing tags, and adds the
        default tag when none of them is a required one.
        """
        tags = self.cleaned_data['tags'].strip()

        if not tags or not REGEX_TAGS.match(tags):
            raise forms.ValidationError(
                _('Inappropriate characters in tags.'))

        default_tag_name = board_settings.get(SECTION_FORMS, 'DefaultTag')\
            .strip().lower()

        required_tag_exists = False
        tag_set = set()
        for tag_string in tags.split():
            tag_name = tag_string.strip().lower()
            if tag_name == default_tag_name:
                required_tag_exists = True
                tag, created = Tag.objects.get_or_create_with_alias(
                    name=tag_name, required=True)
            else:
                tag, created = Tag.objects.get_or_create_with_alias(
                    name=tag_name)
            tag_set.add(tag)

            # If this is a new tag, don't check for its parents because nobody
            # added them yet
            if not created:
                tag_set.update(tag.get_all_parents())

        if not required_tag_exists:
            required_tag_exists = any(tag.required for tag in tag_set)

        # Use default tag if no section exists
        if not required_tag_exists:
            default_tag, created = Tag.objects.get_or_create(
                name=default_tag_name, required=True)
            tag_set.add(default_tag)

        return tag_set

    def clean(self):
        cleaned_data = super(ThreadForm, self).clean()

        return cleaned_data

    def is_monochrome(self):
        return self.cleaned_data['monochrome']


class SettingsForm(NeboardForm):
    """
    Form for the user settings: theme, image viewer, user name, timezone.
    """

    theme = forms.ChoiceField(
        choices=board_settings.get_list_dict('View', 'Themes'),
        label=_('Theme'))
    image_viewer = forms.ChoiceField(
        choices=board_settings.get_list_dict('View', 'ImageViewers'),
        label=_('Image view mode'))
    username = forms.CharField(label=_('User name'), required=False)
    timezone = forms.ChoiceField(choices=get_timezones(), label=_('Time zone'))

    def clean_username(self):
        """
        Returns the user name, rejecting one that does not match
        REGEX_USERNAMES.
        """
        username = self.cleaned_data['username']

        if username and not REGEX_USERNAMES.match(username):
            raise forms.ValidationError(_('Inappropriate characters.'))

        return username


class SearchForm(NeboardForm):
    """
    Form with a single optional search query.
    """

    query = forms.CharField(max_length=500, label=LABEL_SEARCH,
                            required=False)