import hashlib
import re
import time
import logging

import pytz

from django import forms
from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.exceptions import ObjectDoesNotExist
from django.forms.utils import ErrorList
from django.utils.translation import ugettext_lazy as _, ungettext_lazy
from django.utils import timezone

from boards.abstracts.settingsmanager import get_settings_manager
from boards.abstracts.attachment_alias import get_image_by_alias
from boards.mdx_neboard import formatters
from boards.models.attachment.downloaders import download
from boards.models.post import TITLE_MAX_LENGTH
from boards.models import Tag, Post
from boards.utils import validate_file_size, get_file_mimetype, \
    FILE_EXTENSION_DELIMITER
from neboard import settings
import boards.settings as board_settings
import neboard

# Number of leading hex characters of the SHA-256 digest that are compared
# when a proof-of-work guess is validated.
POW_HASH_LENGTH = 16
POW_LIFE_MINUTES = 5

# Whole-string patterns for the tag list and the user name list.
REGEX_TAGS = re.compile(r'^[\w\s\d]+$', re.UNICODE)
REGEX_USERNAMES = re.compile(r'^[\w\s\d,]+$', re.UNICODE)

VETERAN_POSTING_DELAY = 5

# Names of HTML attributes passed to widgets.
ATTRIBUTE_PLACEHOLDER = 'placeholder'
ATTRIBUTE_ROWS = 'rows'

# Session keys.
LAST_POST_TIME = 'last_post_time'
LAST_LOGIN_TIME = 'last_login_time'

TEXT_PLACEHOLDER = _('Type message here. Use formatting panel for more advanced usage.')
TAGS_PLACEHOLDER = _('music images i_dont_like_tags')

LABEL_TITLE = _('Title')
LABEL_TEXT = _('Text')
LABEL_TAG = _('Tag')
LABEL_SEARCH = _('Search')

# Singular and plural message ids given to ungettext_lazy when a post is
# rejected for being sent too quickly.
ERROR_SPEED = 'Please wait %(delay)d second before sending message'
ERROR_SPEED_PLURAL = 'Please wait %(delay)d seconds before sending message'

TAG_MAX_LENGTH = 20

TEXTAREA_ROWS = 4

# Separates the visible title from the tripcode secret in the title field.
TRIPCODE_DELIM = '#'

# TODO Maybe this could be converted into a database table?
MIMETYPE_EXTENSIONS = {
    'image/jpeg': 'jpeg',
    'image/png': 'png',
    'image/gif': 'gif',
    'video/webm': 'webm',
    'application/pdf': 'pdf',
    'x-diff': 'diff',
    'image/svg+xml': 'svg',
    'application/x-shockwave-flash': 'swf',
    'image/x-ms-bmp': 'bmp',
    'image/bmp': 'bmp',
}


def get_timezones():
    """
    Builds the choices list for the time zone field: one (value, label)
    pair per entry of pytz.common_timezones, with both set to the zone name.
    """
    return [(tz, tz) for tz in pytz.common_timezones]


class FormatPanel(forms.Textarea):
    """
    Panel for text formatting. Consists of buttons to add different tags to
    the form text area.
    """

    def render(self, name, value, attrs=None):
        """
        Renders one entry per registered formatter (its left preview, name
        and right preview), followed by the regular textarea output.
        """
        # NOTE(review): the literals around the formatter entries are empty
        # or bare newlines; the wrapping markup looks like it was stripped
        # from this file. They are reproduced as found -- confirm against
        # the upstream source.
        buttons = ''.join(
            '' + formatter.preview_left + formatter.name
            + formatter.preview_right + ''
            for formatter in formatters)
        textarea = super().render(name, value, attrs=attrs)

        return '\n' + buttons + '\n' + textarea


class PlainErrorList(ErrorList):
    """
    Error list whose text form is every error prefixed with '(!) '.
    """

    def __unicode__(self):
        return self.as_text()

    def as_text(self):
        return ''.join('(!) %s ' % e for e in self)


class NeboardForm(forms.Form):
    """
    Form with neboard-specific formatting.
    """

    def as_div(self):
        """
        Returns this form rendered through _html_output with the row
        templates below; errors are put on a separate row.
        """
        # NOTE(review): the row templates contain only newlines and
        # placeholders; the surrounding markup looks like it was stripped
        # from this file. They are reproduced as found -- confirm against
        # the upstream source.
        return self._html_output(
            # TODO Do not show hidden rows in the list here
            normal_row='\n'
                       '\n'
                       '%(label)s'
                       '\n'
                       '\n'
                       '%(field)s'
                       '\n'
                       '\n'
                       '\n'
                       '%(help_text)s'
                       '\n',
            error_row='\n'
                      '\n'
                      '\n%s\n'
                      '\n',
            row_ender='',
            help_text_html='%s',
            errors_on_separate_row=True)

    def as_json_errors(self):
        """
        Returns a list with one {'field': ..., 'errors': ...} dict for every
        field that has errors; 'errors' is the text form of the error list.
        """
        return [
            {
                'field': name,
                'errors': self[name].errors.as_text(),
            }
            for name in self.fields
            if self[name].errors
        ]


class PostForm(NeboardForm):
    """
    Form for a post: optional title, text, an uploaded file or a file URL,
    additional threads, a subscription flag and the hidden anti-spam fields.
    """

    title = forms.CharField(max_length=TITLE_MAX_LENGTH, required=False,
                            label=LABEL_TITLE,
                            widget=forms.TextInput(
                                attrs={ATTRIBUTE_PLACEHOLDER:
                                       'test#tripcode'}))
    text = forms.CharField(
        widget=FormatPanel(attrs={
            ATTRIBUTE_PLACEHOLDER: TEXT_PLACEHOLDER,
            ATTRIBUTE_ROWS: TEXTAREA_ROWS,
        }),
        required=False, label=LABEL_TEXT)
    file = forms.FileField(required=False, label=_('File'),
                           widget=forms.ClearableFileInput(
                               attrs={'accept': 'file/*'}))
    file_url = forms.CharField(required=False, label=_('File URL'),
                               widget=forms.TextInput(
                                   attrs={ATTRIBUTE_PLACEHOLDER:
                                          'http://example.com/image.png'}))

    # This field is for spam prevention only
    email = forms.CharField(max_length=100, required=False, label=_('e-mail'),
                            widget=forms.TextInput(attrs={
                                'class': 'form-email'}))
    threads = forms.CharField(required=False, label=_('Additional threads'),
                              widget=forms.TextInput(
                                  attrs={ATTRIBUTE_PLACEHOLDER:
                                         '123 456 789'}))
    subscribe = forms.BooleanField(required=False,
                                   label=_('Subscribe to thread'))

    # Hidden proof-of-work fields.
    guess = forms.CharField(widget=forms.HiddenInput(), required=False)
    timestamp = forms.CharField(widget=forms.HiddenInput(), required=False)
    iteration = forms.CharField(widget=forms.HiddenInput(), required=False)

    # The session is read by the posting speed check and the image alias
    # lookup; nothing in this class assigns it, so the caller must set it.
    session = None
    # Set when the hidden e-mail field was filled in and autoban is enabled.
    need_to_ban = False
    # Image resolved from an alias given in the file URL field, if any.
    image = None

    def _update_file_extension(self, file):
        """
        Renames the file so that its extension matches its detected mimetype.
        Files with an unrecognized mimetype keep their name and are logged.
        """
        if not file:
            return

        mimetype = get_file_mimetype(file)
        extension = MIMETYPE_EXTENSIONS.get(mimetype)
        if extension:
            # NOTE(review): the name is cut at the FIRST delimiter, so
            # everything after it is dropped -- confirm this is intended
            # for names that contain the delimiter more than once.
            filename = file.name.split(FILE_EXTENSION_DELIMITER, 1)[0]
            file.name = filename + FILE_EXTENSION_DELIMITER + extension
        else:
            logger = logging.getLogger('boards.forms.extension')
            logger.info('Unrecognized file mimetype: {}'.format(mimetype))

    def clean_title(self):
        """
        Raises ValidationError if the title exceeds TITLE_MAX_LENGTH.
        """
        title = self.cleaned_data['title']
        if title and len(title) > TITLE_MAX_LENGTH:
            raise forms.ValidationError(_('Title must have less than %s '
                                          'characters') %
                                        str(TITLE_MAX_LENGTH))
        return title

    def clean_text(self):
        """
        Strips the text and raises ValidationError if it is longer than the
        'MaxTextLength' setting.
        """
        text = self.cleaned_data['text'].strip()
        if text:
            max_length = board_settings.get_int('Forms', 'MaxTextLength')
            if len(text) > max_length:
                raise forms.ValidationError(_('Text must have less than %s '
                                              'characters') %
                                            str(max_length))
        return text

    def clean_file(self):
        """
        Validates the uploaded file size and fixes its extension.
        """
        file = self.cleaned_data['file']

        if file:
            validate_file_size(file.size)
            self._update_file_extension(file)

        return file

    def clean_file_url(self):
        """
        Resolves the file URL field. The value is first looked up as an
        image alias; if one is found it is stored in self.image and None is
        returned. Otherwise the URL is downloaded, size-checked and renamed.

        Raises ValidationError if the download yields nothing.
        """
        url = self.cleaned_data['file_url']
        if not url:
            return None

        self.image = get_image_by_alias(url, self.session)
        if self.image is not None:
            # The alias result is exposed through get_images(), not through
            # the cleaned value.
            return None

        file = self._get_file_from_url(url)
        if not file:
            raise forms.ValidationError(_('Invalid URL'))

        validate_file_size(file.size)
        self._update_file_extension(file)

        return file

    def clean_threads(self):
        """
        Parses the whitespace-separated list of additional thread ids into
        a list of opening posts. Returns None when the field is empty.

        Raises ValidationError if an id is not a number, does not exist, is
        not an opening post, or belongs to an archived thread.
        """
        threads_str = self.cleaned_data['threads']
        if not threads_str:
            return None

        threads = []
        # split() without arguments tolerates repeated spaces between ids,
        # which split(' ') turned into empty, unparseable tokens.
        for thread_id in threads_str.split():
            try:
                thread = Post.objects.get(id=int(thread_id))
                if not thread.is_opening() \
                        or thread.get_thread().is_archived():
                    raise ObjectDoesNotExist()
            except (ObjectDoesNotExist, ValueError):
                raise forms.ValidationError(
                    _('Invalid additional thread list'))
            threads.append(thread)

        return threads

    def clean(self):
        """
        Form-wide validation: rejects submissions that filled the hidden
        e-mail field, requires text or a file, and applies the anti-spam
        check (proof of work or posting delay, depending on settings).

        Raises ValidationError if the hidden e-mail field is not empty.
        """
        cleaned_data = super().clean()

        # A field that failed its own validation is missing from
        # cleaned_data, so the keys are read with get().
        if cleaned_data.get('email'):
            if board_settings.get_bool('Forms', 'Autoban'):
                self.need_to_ban = True
            raise forms.ValidationError('A human cannot enter a hidden field')

        if not self.errors:
            self._clean_text_file()

        limit_speed = board_settings.get_bool('Forms', 'LimitPostingSpeed')
        limit_first = board_settings.get_bool('Forms', 'LimitFirstPosting')

        settings_manager = get_settings_manager(self)
        # The parentheses matter: without them "and" binds tighter than
        # "or", and the anti-spam branch would also run (and confirm the
        # user) for a form that already has errors.
        if not self.errors and (
                limit_speed
                or (limit_first
                    and not settings_manager.get_setting('confirmed_user'))):
            pow_difficulty = board_settings.get_int('Forms', 'PowDifficulty')
            if pow_difficulty > 0:
                # PoW-based
                timestamp = cleaned_data.get('timestamp')
                iteration = cleaned_data.get('iteration')
                guess = cleaned_data.get('guess')
                if timestamp and iteration and guess \
                        and not settings_manager.get_setting(
                            'confirmed_user'):
                    self._validate_hash(timestamp, iteration, guess,
                                        cleaned_data.get('text') or '')
            else:
                # Time-based
                self._validate_posting_speed()
            settings_manager.set_setting('confirmed_user', True)

        return cleaned_data

    def get_file(self):
        """
        Gets file from form or URL.
        """
        file = self.cleaned_data['file']
        return file or self.cleaned_data['file_url']

    def get_tripcode(self):
        """
        Returns the MD5 hex digest of the title part after TRIPCODE_DELIM
        joined with the secret key, or '' if the title has no delimiter.
        """
        title = self.cleaned_data['title']
        if title is None or TRIPCODE_DELIM not in title:
            return ''

        code = title.split(TRIPCODE_DELIM, maxsplit=1)[1] \
            + neboard.settings.SECRET_KEY
        return hashlib.md5(code.encode()).hexdigest()

    def get_title(self):
        """
        Returns the title without the tripcode part.
        """
        title = self.cleaned_data['title']
        if title is not None and TRIPCODE_DELIM in title:
            return title.split(TRIPCODE_DELIM, maxsplit=1)[0]
        return title

    def get_images(self):
        """
        Returns a list holding the image resolved from an alias, or an
        empty list if there is none.
        """
        return [self.image] if self.image else []

    def is_subscribe(self):
        return self.cleaned_data['subscribe']

    def _clean_text_file(self):
        """
        Puts an error on the text field when the post has no text, no file
        and no image.
        """
        text = self.cleaned_data.get('text')
        file = self.get_file()
        images = self.get_images()

        if not text and not file and not images:
            error_message = _('Either text or file must be entered.')
            self._errors['text'] = self.error_class([error_message])

    def _validate_posting_speed(self):
        """
        Puts an error on the text field if the 'PostingDelay' number of
        seconds has not passed since the last post time kept in the
        session. Does nothing unless 'LimitPostingSpeed' is enabled.
        """
        if not board_settings.get_bool('Forms', 'LimitPostingSpeed'):
            return

        posting_delay = board_settings.get_int('Forms', 'PostingDelay')
        now = time.time()

        if LAST_POST_TIME not in self.session:
            # No time recorded yet: start the clock now and require the
            # full delay.
            self.session[LAST_POST_TIME] = now
            current_delay = 0
            need_delay = True
        else:
            last_post_time = self.session.get(LAST_POST_TIME)
            current_delay = int(now - last_post_time)
            need_delay = current_delay < posting_delay

        if need_delay:
            delay = posting_delay - current_delay
            error_message = ungettext_lazy(
                ERROR_SPEED, ERROR_SPEED_PLURAL, delay) % {'delay': delay}
            self._errors['text'] = self.error_class([error_message])
        else:
            self.session[LAST_POST_TIME] = now

    def _get_file_from_url(self, url: str) -> SimpleUploadedFile:
        """
        Gets a file from the URL.

        Any failure other than a ValidationError is wrapped into a
        ValidationError so that it is reported as a form error.
        """
        try:
            return download(url)
        except forms.ValidationError:
            raise
        except Exception as e:
            raise forms.ValidationError(e) from e

    def _validate_hash(self, timestamp: str, iteration: str,
                       guess: str, message: str):
        """
        Checks the proof of work sent with the post and puts an
        'Invalid PoW.' error on the text field if it does not hold.

        The guess must equal the first POW_HASH_LENGTH hex characters of
        sha256(timestamp + message + iteration) and must not compare
        greater than the target derived from the 'PowDifficulty' setting.
        """
        invalid_pow = self.error_class([_('Invalid PoW.')])

        # The timestamp comes from a hidden field, so it must parse before
        # it is trusted. Its last three characters are dropped first
        # (presumably milliseconds -- confirm with the client-side code).
        try:
            timezone.datetime.fromtimestamp(
                int(timestamp[:-3]), tz=timezone.get_current_timezone())
        except (ValueError, OverflowError, OSError):
            self._errors['text'] = invalid_pow
            return

        payload = timestamp + message.replace('\r\n', '\n')
        difficulty = board_settings.get_int('Forms', 'PowDifficulty')
        target = str(int(2 ** (POW_HASH_LENGTH * 3) / difficulty)) \
            .zfill(POW_HASH_LENGTH)

        computed_guess = hashlib.sha256((payload + iteration).encode()) \
            .hexdigest()[0:POW_HASH_LENGTH]
        if guess != computed_guess or guess > target:
            self._errors['text'] = invalid_pow


class ThreadForm(PostForm):
    """
    Post form for opening a thread: adds required tags and a monochrome
    flag.
    """

    tags = forms.CharField(
        widget=forms.TextInput(
            attrs={ATTRIBUTE_PLACEHOLDER: TAGS_PLACEHOLDER}),
        max_length=100, label=_('Tags'), required=True)
    monochrome = forms.BooleanField(label=_('Monochrome'), required=False)

    def clean_tags(self):
        """
        Turns the whitespace-separated tag names into a set of Tag objects
        (created on demand) together with the parents of existing tags.

        Raises ValidationError if the names contain inappropriate
        characters or if no tag in the resulting set is required.
        """
        tags = self.cleaned_data['tags'].strip()

        if not tags or not REGEX_TAGS.match(tags):
            raise forms.ValidationError(
                _('Inappropriate characters in tags.'))

        tag_set = set()
        for tag_string in tags.split():
            tag, created = Tag.objects.get_or_create(
                name=tag_string.strip().lower())
            tag_set.add(tag)

            # If this is a new tag, don't check for its parents because
            # nobody added them yet
            if not created:
                tag_set.update(tag.get_all_parents())

        if not any(tag.required for tag in tag_set):
            raise forms.ValidationError(
                _('Need at least one section.'))

        return tag_set

    def clean(self):
        cleaned_data = super().clean()

        return cleaned_data

    def is_monochrome(self):
        return self.cleaned_data['monochrome']


class SettingsForm(NeboardForm):
    """
    Form for user settings: theme, image view mode, user name and time zone.
    """

    theme = forms.ChoiceField(choices=settings.THEMES, label=_('Theme'))
    image_viewer = forms.ChoiceField(choices=settings.IMAGE_VIEWERS,
                                     label=_('Image view mode'))
    username = forms.CharField(label=_('User name'), required=False)
    timezone = forms.ChoiceField(choices=get_timezones(),
                                 label=_('Time zone'))

    def clean_username(self):
        """
        Raises ValidationError if the user name does not match
        REGEX_USERNAMES.
        """
        username = self.cleaned_data['username']

        if username and not REGEX_USERNAMES.match(username):
            raise forms.ValidationError(_('Inappropriate characters.'))

        return username


class SearchForm(NeboardForm):
    """
    Form with a single optional search query field.
    """

    query = forms.CharField(max_length=500, label=LABEL_SEARCH,
                            required=False)