##// END OF EJS Templates
Search by attachment urls
Search by attachment urls

File last commit:

r1979:b71d904c default
r1982:2a32eaa4 default
Show More
__init__.py
586 lines | 20.3 KiB | text/x-python | PythonLexer
import logging
import time
import hashlib
import pytz
import re
from PIL import Image
from django import forms
from django.core.cache import cache
from django.core.files.images import get_image_dimensions
from django.core.files.uploadedfile import SimpleUploadedFile, UploadedFile
from django.forms.utils import ErrorList
from django.utils.translation import ugettext_lazy as _, ungettext_lazy
import boards.settings as board_settings
from boards import utils
from boards.abstracts.constants import REGEX_TAGS
from boards.abstracts.settingsmanager import get_settings_manager
from boards.abstracts.sticker_factory import get_attachment_by_alias
from boards.forms.fields import UrlFileField
from boards.mdx_neboard import formatters
from boards.models import Attachment
from boards.models import Tag
from boards.models.attachment import StickerPack
from boards.models.attachment.downloaders import download, REGEX_MAGNET
from boards.models.attachment.viewers import FILE_TYPES_IMAGE
from boards.models.post import TITLE_MAX_LENGTH
from boards.utils import validate_file_size, get_file_mimetype, \
FILE_EXTENSION_DELIMITER, get_tripcode_from_text
# Settings section holding the form-related options that this module reads
# through board_settings.
SECTION_FORMS = 'Forms'
# Number of leading hex characters of the SHA-256 digest that are compared
# with the client's proof-of-work guess (see PostForm._validate_hash).
POW_HASH_LENGTH = 16
# NOTE(review): not referenced in the visible part of this module; presumably
# the lifetime of a proof-of-work timestamp -- confirm against callers.
POW_LIFE_MINUTES = 5
# Accepted user names (checked in SettingsForm.clean_username).
REGEX_USERNAMES = re.compile(r'^[\w\s\d,]+$', re.UNICODE)
# Matches strings starting with an http, https or ftp scheme; used to decide
# whether a failed download may be kept as a plain URL.
REGEX_URL = re.compile(r'^(http|https|ftp):\/\/', re.UNICODE)
# NOTE(review): not referenced in the visible part of this module.
VETERAN_POSTING_DELAY = 5
# Names of the HTML attributes set on the form widgets.
ATTRIBUTE_PLACEHOLDER = 'placeholder'
ATTRIBUTE_ROWS = 'rows'
# Key under which the time of the last post is kept in the session and in the
# per-session cache (see PostForm._validate_posting_speed).
LAST_POST_TIME = 'last_post_time'
# NOTE(review): not referenced in the visible part of this module.
LAST_LOGIN_TIME = 'last_login_time'
# Translatable placeholders and field labels.
TEXT_PLACEHOLDER = _('Type message here. Use formatting panel for more advanced usage.')
TAGS_PLACEHOLDER = _('music images i_dont_like_tags')
LABEL_TITLE = _('Title')
LABEL_TEXT = _('Text')
LABEL_TAG = _('Tag')
LABEL_SEARCH = _('Search')
LABEL_FILE = _('File')
LABEL_DUPLICATES = _('Check for duplicates')
LABEL_URL = _('Do not download URLs')
# Untranslated error messages. They are translated at the point of use; the
# singular/plural pairs are meant for ungettext_lazy together with the count.
ERROR_SPEED = 'Please wait %(delay)d second before sending message'
ERROR_SPEED_PLURAL = 'Please wait %(delay)d seconds before sending message'
ERROR_MANY_FILES = 'You can post no more than %(files)d file.'
ERROR_MANY_FILES_PLURAL = 'You can post no more than %(files)d files.'
ERROR_DUPLICATES = 'Some files are already present on the board.'
# NOTE(review): not referenced in the visible part of this module.
TAG_MAX_LENGTH = 20
# Number of rows of the post text area.
TEXTAREA_ROWS = 4
# Separates the visible title from the tripcode source text inside the title
# field (see PostForm.get_title and PostForm.get_tripcode).
TRIPCODE_DELIM = '##'
# Maps a detected MIME type to the extension that
# PostForm._update_file_extension gives to the file name.
# TODO Maybe this may be converted into the database table?
MIMETYPE_EXTENSIONS = {
'image/jpeg': 'jpeg',
'image/png': 'png',
'image/gif': 'gif',
'video/webm': 'webm',
'application/pdf': 'pdf',
'x-diff': 'diff',
'image/svg+xml': 'svg',
'application/x-shockwave-flash': 'swf',
'image/x-ms-bmp': 'bmp',
'image/bmp': 'bmp',
}
# Values of the PostForm.download_mode field.
DOWN_MODE_DOWNLOAD = 'DOWNLOAD'
DOWN_MODE_DOWNLOAD_UNIQUE = 'DOWNLOAD_UNIQUE'
DOWN_MODE_URL = 'URL'
DOWN_MODE_TRY = 'TRY'
# Logger shared by the forms in this module.
logger = logging.getLogger('boards.forms')
def get_timezones():
    """
    Builds the choices for a time zone selector: one ``(name, name)`` pair
    for every entry of ``pytz.common_timezones``, in the same order.
    """
    return [(zone, zone) for zone in pytz.common_timezones]
class FormatPanel(forms.Textarea):
    """
    Panel for text formatting. Consists of buttons to add different tags to the
    form text area.
    """

    def render(self, name, value, attrs=None, renderer=None):
        """
        Renders the formatting panel followed by the regular text area.

        One button is produced per entry of ``formatters``; clicking it calls
        the client-side ``addMarkToMsg`` with the formatter's left and right
        markers.

        ``renderer`` is accepted because newer Django versions pass it to
        every widget. It is forwarded to the parent only when it was actually
        supplied, so Django versions whose ``Textarea.render`` does not know
        the argument keep working.
        """
        buttons = [
            '<span class="mark_btn"'
            ' onClick="addMarkToMsg(\'' + formatter.format_left
            + '\', \'' + formatter.format_right + '\')">'
            + formatter.preview_left + formatter.name
            + formatter.preview_right + '</span>'
            for formatter in formatters
        ]

        extra = {}
        if renderer is not None:
            extra['renderer'] = renderer
        textarea = super(FormatPanel, self).render(name, value, attrs=attrs,
                                                   **extra)

        return '<div id="mark-panel">' + ''.join(buttons) + '</div>' + textarea
class PlainErrorList(ErrorList):
    """
    Error list whose text form is a flat string with every error prefixed by
    ``(!)``.
    """

    def __unicode__(self):
        # Python 2 text conversion hook; delegates to the plain text form.
        return self.as_text()

    def as_text(self):
        """
        Returns all errors concatenated, each one as ``'(!) <error> '``.
        """
        return ''.join('(!) %s ' % error for error in self)
class NeboardForm(forms.Form):
    """
    Form with neboard-specific formatting.
    """

    required_css_class = 'required-field'

    def as_div(self):
        """
        Returns this form rendered as HTML <div>s.
        """
        # TODO Do not show hidden rows in the list here
        field_row = (
            '<div class="form-row">'
            '<div class="form-label">'
            '%(label)s'
            '</div>'
            '<div class="form-input">'
            '%(field)s'
            '</div>'
            '</div>'
            '<div class="form-row">'
            '%(help_text)s'
            '</div>'
        )
        errors_row = (
            '<div class="form-row">'
            '<div class="form-label"></div>'
            '<div class="form-errors">%s</div>'
            '</div>'
        )

        return self._html_output(
            normal_row=field_row,
            error_row=errors_row,
            row_ender='</div>',
            help_text_html='%s',
            errors_on_separate_row=True)

    def as_json_errors(self):
        """
        Returns a list with one ``{'field': ..., 'errors': ...}`` dict for
        every field that has errors; ``errors`` is the text form of the
        field's error list.
        """
        failures = []

        for field_name in list(self.fields):
            field_errors = self[field_name].errors
            if not field_errors:
                continue
            failures.append({
                'field': field_name,
                'errors': field_errors.as_text(),
            })

        return failures
class PostForm(NeboardForm):
    """
    Form for posting a message.

    Carries the title (which may contain a tripcode source after
    ``TRIPCODE_DELIM``), the text, the attached files or URLs, the way the
    URLs are processed and a few hidden anti-spam fields.

    ``session`` is read during validation (sticker lookup, posting speed
    tracking), so it is expected to be assigned before the form is validated.
    ``need_to_ban`` is set when the hidden ``email`` trap field was filled in
    and the ``Autoban`` setting is enabled.
    """

    title = forms.CharField(max_length=TITLE_MAX_LENGTH, required=False,
                            label=LABEL_TITLE,
                            widget=forms.TextInput(
                                attrs={ATTRIBUTE_PLACEHOLDER: 'Title{}tripcode'.format(TRIPCODE_DELIM)}))
    text = forms.CharField(
        widget=FormatPanel(attrs={
            ATTRIBUTE_PLACEHOLDER: TEXT_PLACEHOLDER,
            ATTRIBUTE_ROWS: TEXTAREA_ROWS,
        }),
        required=False, label=LABEL_TEXT)
    download_mode = forms.ChoiceField(
        choices=(
            (DOWN_MODE_TRY, _('Download or insert as URLs')),
            (DOWN_MODE_DOWNLOAD, _('Download')),
            (DOWN_MODE_DOWNLOAD_UNIQUE, _('Download and check for uniqueness')),
            (DOWN_MODE_URL, _('Insert as URLs')),
        ),
        initial=DOWN_MODE_TRY,
        label=_('File process mode'))
    file = UrlFileField(required=False, label=LABEL_FILE)

    # This field is for spam prevention only
    email = forms.CharField(max_length=100, required=False, label=_('e-mail'),
                            widget=forms.TextInput(attrs={
                                'class': 'form-email'}))
    subscribe = forms.BooleanField(required=False, label=_('Subscribe to thread'))

    # Hidden proof-of-work fields filled in by the client.
    guess = forms.CharField(widget=forms.HiddenInput(), required=False)
    timestamp = forms.CharField(widget=forms.HiddenInput(), required=False)
    iteration = forms.CharField(widget=forms.HiddenInput(), required=False)

    session = None
    need_to_ban = False

    def clean_title(self):
        """
        Rejects titles longer than ``TITLE_MAX_LENGTH`` characters.
        """
        title = self.cleaned_data['title']
        if title and len(title) > TITLE_MAX_LENGTH:
            raise forms.ValidationError(_('Title must have less than %s '
                                          'characters') %
                                        str(TITLE_MAX_LENGTH))
        return title

    def clean_text(self):
        """
        Strips the text and rejects it when it is longer than the
        ``MaxTextLength`` setting.
        """
        text = self.cleaned_data['text'].strip()
        if text:
            max_length = board_settings.get_int(SECTION_FORMS, 'MaxTextLength')
            if len(text) > max_length:
                raise forms.ValidationError(_('Text must have less than %s '
                                              'characters') % str(max_length))
        return text

    def clean_file(self):
        """
        Validates every submitted file or URL (see ``_clean_files``).
        """
        return self._clean_files(self.cleaned_data['file'])

    def clean(self):
        """
        Form-wide validation: spam trap, "text or file" requirement, posting
        limits (proof-of-work or time based) and the optional duplicate check.

        A field that failed its own validation is absent from
        ``cleaned_data``, so the lookups here use ``.get()`` instead of
        indexing to report form errors rather than fail with ``KeyError``.
        """
        cleaned_data = super(PostForm, self).clean()

        if cleaned_data.get('email'):
            if board_settings.get_bool(SECTION_FORMS, 'Autoban'):
                self.need_to_ban = True
            raise forms.ValidationError('A human cannot enter a hidden field')

        if not self.errors:
            self._clean_text_file()

        limit_speed = board_settings.get_bool(SECTION_FORMS, 'LimitPostingSpeed')
        limit_first = board_settings.get_bool(SECTION_FORMS, 'LimitFirstPosting')

        settings_manager = get_settings_manager(self)

        # NOTE(review): the parentheses only spell out how Python already
        # evaluated the original mixed and/or expression. As a consequence
        # the first-posting limit is applied even when the form already has
        # errors -- confirm that this is intended.
        if (not self.errors and limit_speed) or (
                limit_first
                and not settings_manager.get_setting('confirmed_user')):
            pow_difficulty = board_settings.get_int(SECTION_FORMS, 'PowDifficulty')
            if pow_difficulty > 0:
                # PoW-based
                if cleaned_data.get('timestamp') \
                        and cleaned_data.get('iteration') \
                        and cleaned_data.get('guess') \
                        and not settings_manager.get_setting('confirmed_user'):
                    self._validate_hash(cleaned_data['timestamp'],
                                        cleaned_data['iteration'],
                                        cleaned_data['guess'],
                                        cleaned_data.get('text') or '')
            else:
                # Time-based
                self._validate_posting_speed()
            settings_manager.set_setting('confirmed_user', True)

        if cleaned_data.get('download_mode') == DOWN_MODE_DOWNLOAD_UNIQUE:
            self._check_file_duplicates(self.get_files())

        return cleaned_data

    def get_files(self):
        """
        Gets the uploaded or downloaded files (not the plain URLs) of the
        form. Returns an empty list when the file field did not validate.
        """
        entries = self.cleaned_data.get('file') or []
        return [entry for entry in entries if isinstance(entry, UploadedFile)]

    def get_file_urls(self):
        """
        Gets the attachments that are kept as plain URL strings. Returns an
        empty list when the file field did not validate.
        """
        entries = self.cleaned_data.get('file') or []
        return [entry for entry in entries if isinstance(entry, str)]

    def get_tripcode(self):
        """
        Gets the tripcode computed from the part of the title that follows
        the first ``TRIPCODE_DELIM``, or an empty string when there is none.
        """
        title = self.cleaned_data.get('title')
        if title is not None and TRIPCODE_DELIM in title:
            return get_tripcode_from_text(
                title.split(TRIPCODE_DELIM, maxsplit=1)[1])
        return ''

    def get_title(self):
        """
        Gets the title without the tripcode part.
        """
        title = self.cleaned_data.get('title')
        if title is not None and TRIPCODE_DELIM in title:
            return title.split(TRIPCODE_DELIM, maxsplit=1)[0]
        return title

    def get_images(self):
        """
        Gets the attachments that were resolved from sticker aliases while
        the URLs were cleaned.
        """
        return self.cleaned_data.get('stickers', [])

    def is_subscribe(self):
        return self.cleaned_data['subscribe']

    def _update_file_extension(self, file):
        """
        Replaces everything after the first extension delimiter of the file
        name with the extension mapped to the file's detected MIME type.
        Files of an unknown MIME type keep their name.
        """
        if not file:
            return

        mimetype = get_file_mimetype(file)
        extension = MIMETYPE_EXTENSIONS.get(mimetype)
        if extension:
            filename = file.name.split(FILE_EXTENSION_DELIMITER, 1)[0]
            file.name = filename + FILE_EXTENSION_DELIMITER + extension
        else:
            logger.info('Unrecognized file mimetype: {}'.format(mimetype))

    def _clean_files(self, inputs):
        """
        Validates the submitted files and URLs.

        Raises ``ValidationError`` when there are more inputs than the
        ``MaxFileCount`` setting allows or when any single input is invalid.
        Returns the cleaned entries in input order: a file object, a URL
        string, or ``None`` for an input that produced no file (for example
        a sticker alias).
        """
        max_file_count = board_settings.get_int(SECTION_FORMS, 'MaxFileCount')
        if len(inputs) > max_file_count:
            # The plural form must be passed as the second argument,
            # otherwise the singular wording is used for every count.
            raise forms.ValidationError(
                ungettext_lazy(ERROR_MANY_FILES, ERROR_MANY_FILES_PLURAL,
                               max_file_count) % {'files': max_file_count})

        files = []
        for file_input in inputs:
            if isinstance(file_input, UploadedFile):
                files.append(self._clean_file_file(file_input))
            else:
                files.append(self._clean_file_url(file_input))

        for file in files:
            self._validate_image_dimensions(file)

        return files

    def _validate_image_dimensions(self, file):
        """
        Rejects image files whose dimensions cannot be read safely.

        Decompression bomb warnings are turned into errors so that reading
        the dimensions of a suspiciously large image fails.
        """
        if not isinstance(file, UploadedFile):
            return

        mimetype = get_file_mimetype(file)
        if mimetype.split('/')[-1] in FILE_TYPES_IMAGE:
            Image.warnings.simplefilter('error', Image.DecompressionBombWarning)
            try:
                # Only the side effect matters: any failure while reading the
                # dimensions rejects the file.
                get_image_dimensions(file)
            except Exception:
                raise forms.ValidationError('Possible decompression bomb or large image.')

    def _clean_file_file(self, file):
        """
        Validates the size of an uploaded file and normalizes its extension.
        """
        validate_file_size(file.size)
        self._update_file_extension(file)
        return file

    def _clean_file_url(self, url):
        """
        Processes one URL input according to the download mode.

        Returns the URL itself (URL mode, or a failed download in TRY mode
        when the input looks like a URL or a magnet link), the downloaded
        file, or ``None`` when the input is empty or was resolved to a
        sticker (which is collected under ``cleaned_data['stickers']``).
        Other failures are raised as ``ValidationError``.
        """
        if not url:
            return None

        # download_mode is missing from cleaned_data when it failed its own
        # validation; in that case no mode-specific shortcut applies.
        mode = self.cleaned_data.get('download_mode')
        if mode == DOWN_MODE_URL:
            return url

        try:
            image = get_attachment_by_alias(url, self.session)
            if image is not None:
                self.cleaned_data.setdefault('stickers', []).append(image)
                return None

            file = self._get_file_from_url(url)
            if not file:
                raise forms.ValidationError(_('Invalid URL'))
            validate_file_size(file.size)
            self._update_file_extension(file)
        except forms.ValidationError as e:
            # Assume we will get the plain URL instead of a file and save it
            if mode == DOWN_MODE_TRY and (REGEX_URL.match(url) or REGEX_MAGNET.match(url)):
                logger.info('Error in forms: {}'.format(e))
                return url
            raise

        return file

    def _clean_text_file(self):
        """
        Adds an error when the post has no text, file, URL or sticker.
        """
        text = self.cleaned_data.get('text')
        has_attachment = (self.get_files() or self.get_file_urls()
                          or self.get_images())

        if not text and not has_attachment:
            error_message = _('Either text or file must be entered.')
            self._add_general_error(error_message)

    def _get_cache_key(self, key):
        return '{}_{}'.format(self.session.session_key, key)

    def _set_session_cache(self, key, value):
        cache.set(self._get_cache_key(key), value)

    def _get_session_cache(self, key):
        return cache.get(self._get_cache_key(key))

    def _get_last_post_time(self):
        """
        Gets the last post time from the per-session cache, falling back to
        the session itself.
        """
        last = self._get_session_cache(LAST_POST_TIME)
        if last is None:
            last = self.session.get(LAST_POST_TIME)
        return last

    def _validate_posting_speed(self):
        """
        Adds an error when the session posts again before ``PostingDelay``
        seconds have passed. A session without a recorded post time always
        has to wait the full delay.

        Does nothing when ``LimitPostingSpeed`` is disabled.
        """
        if not board_settings.get_bool(SECTION_FORMS, 'LimitPostingSpeed'):
            return

        posting_delay = board_settings.get_int(SECTION_FORMS, 'PostingDelay')
        now = time.time()
        current_delay = 0

        if LAST_POST_TIME not in self.session:
            self.session[LAST_POST_TIME] = now
            need_delay = True
        else:
            last_post_time = self._get_last_post_time()
            current_delay = int(now - last_post_time)
            need_delay = current_delay < posting_delay

        self._set_session_cache(LAST_POST_TIME, now)

        if need_delay:
            delay = posting_delay - current_delay
            error_message = ungettext_lazy(ERROR_SPEED, ERROR_SPEED_PLURAL,
                                           delay) % {'delay': delay}
            self._add_general_error(error_message)

            # Reset the time since posting failed
            self._set_session_cache(LAST_POST_TIME, self.session[LAST_POST_TIME])
        else:
            self.session[LAST_POST_TIME] = now

    def _get_file_from_url(self, url: str) -> SimpleUploadedFile:
        """
        Gets a file from URL. Any download failure is reported as a
        ``ValidationError``.
        """
        try:
            return download(url)
        except forms.ValidationError:
            raise
        except Exception as e:
            raise forms.ValidationError(e) from e

    def _validate_hash(self, timestamp: str, iteration: str, guess: str, message: str):
        """
        Checks the client's proof-of-work.

        The expected guess is the first ``POW_HASH_LENGTH`` hex characters of
        the SHA-256 of timestamp + message (with CRLF normalized to LF) +
        iteration. An error is added when the submitted guess differs from
        it or compares greater than the difficulty-derived target string.
        """
        payload = timestamp + message.replace('\r\n', '\n')

        difficulty = board_settings.get_int(SECTION_FORMS, 'PowDifficulty')
        target = str(int(2 ** (POW_HASH_LENGTH * 3) / difficulty))
        if len(target) < POW_HASH_LENGTH:
            # Left-pad with zeros up to the length of the guess.
            target = '0' * (POW_HASH_LENGTH - len(target)) + target

        computed_guess = hashlib.sha256((payload + iteration).encode())\
            .hexdigest()[0:POW_HASH_LENGTH]
        if guess != computed_guess or guess > target:
            self._add_general_error(_('Invalid PoW.'))

    def _check_file_duplicates(self, files):
        """
        Adds an error for every file that already exists on the board.
        """
        for file in files:
            file_hash = utils.get_file_hash(file)
            if Attachment.objects.get_existing_duplicate(file_hash, file):
                self._add_general_error(_(ERROR_DUPLICATES))

    def _add_general_error(self, message):
        # General errors are attached to the text field.
        self.add_error('text', forms.ValidationError(message))
class ThreadForm(PostForm):
    """
    Form for creating a thread: a post plus tags and the optional monochrome
    and sticker pack flags.
    """

    tags = forms.CharField(
        widget=forms.TextInput(attrs={ATTRIBUTE_PLACEHOLDER: TAGS_PLACEHOLDER}),
        max_length=100, label=_('Tags'), required=True)
    monochrome = forms.BooleanField(label=_('Monochrome'), required=False)
    stickerpack = forms.BooleanField(label=_('Sticker Pack'), required=False)

    def clean_tags(self):
        """
        Converts the whitespace-separated tag names into a set of tags.

        Tags are looked up (or created) by lowercased name. The parents of
        every already existing tag are added as well. When none of the
        resulting tags is required, the tag named by the ``DefaultTag``
        setting is added.

        Raises ``ValidationError`` when the input is empty or does not match
        ``REGEX_TAGS``.
        """
        tags = self.cleaned_data['tags'].strip()

        if not tags or not REGEX_TAGS.match(tags):
            raise forms.ValidationError(
                _('Inappropriate characters in tags.'))

        default_tag_name = board_settings.get(SECTION_FORMS, 'DefaultTag')\
            .strip().lower()

        required_tag_exists = False
        tag_set = set()
        for tag_string in tags.split():
            tag_name = tag_string.strip().lower()
            if tag_name == default_tag_name:
                required_tag_exists = True
                tag, created = Tag.objects.get_or_create_with_alias(
                    name=tag_name, required=True)
            else:
                tag, created = Tag.objects.get_or_create_with_alias(name=tag_name)
            tag_set.add(tag)

            # If this is a new tag, don't check for its parents because nobody
            # added them yet
            if not created:
                tag_set |= set(tag.get_all_parents())

        if not required_tag_exists:
            required_tag_exists = any(tag.required for tag in tag_set)

        # Use default tag if no section exists
        if not required_tag_exists:
            default_tag, created = Tag.objects.get_or_create_with_alias(
                name=default_tag_name, required=True)
            tag_set.add(default_tag)

        return tag_set

    def clean(self):
        return super(ThreadForm, self).clean()

    def is_monochrome(self):
        return self.cleaned_data['monochrome']

    def clean_stickerpack(self):
        """
        Validates the request to turn the thread into a sticker pack.

        A sticker pack needs a tripcode (its owner) and a title matching
        ``REGEX_TAGS`` (its name). An existing pack of the same name can only
        be updated with the tripcode it is owned by; a pack without a
        tripcode cannot be updated through the form at all.
        """
        stickerpack = self.cleaned_data['stickerpack']
        if not stickerpack:
            return stickerpack

        # The title has already failed its own validation and that error is
        # reported; nothing more can be checked without it.
        if 'title' not in self.cleaned_data:
            return stickerpack

        tripcode = self.get_tripcode()
        if not tripcode:
            raise forms.ValidationError(_(
                'Tripcode should be specified to own a stickerpack.'))

        title = self.get_title()
        if not title:
            raise forms.ValidationError(_(
                'Title should be specified as a stickerpack name.'))
        if not REGEX_TAGS.match(title):
            raise forms.ValidationError(_('Inappropriate sticker pack name.'))

        existing_pack = StickerPack.objects.filter(name=title).first()
        if existing_pack:
            # The ownerless case has to be checked first: an empty pack
            # tripcode never equals the (non-empty) submitted one, so the
            # mismatch check below would otherwise always win.
            if not existing_pack.tripcode:
                raise forms.ValidationError(_(
                    'This sticker pack can only be updated by an '
                    'administrator.'))
            if existing_pack.tripcode != tripcode:
                raise forms.ValidationError(_(
                    'A sticker pack with this name already exists and is'
                    ' owned by another tripcode.'))

        return stickerpack

    def is_stickerpack(self):
        return self.cleaned_data['stickerpack']
class SettingsForm(NeboardForm):
    """
    Form with the user's display settings: theme, image view mode, user name
    and time zone.
    """

    theme = forms.ChoiceField(
        label=_('Theme'),
        choices=board_settings.get_list_dict('View', 'Themes'))
    image_viewer = forms.ChoiceField(
        label=_('Image view mode'),
        choices=board_settings.get_list_dict('View', 'ImageViewers'))
    username = forms.CharField(required=False, label=_('User name'))
    timezone = forms.ChoiceField(label=_('Time zone'), choices=get_timezones())

    def clean_username(self):
        """
        Accepts an empty user name or one matching ``REGEX_USERNAMES``.
        """
        username = self.cleaned_data['username']

        if not username or REGEX_USERNAMES.match(username):
            return username

        raise forms.ValidationError(_('Inappropriate characters.'))
class SearchForm(NeboardForm):
    """
    Form with a single optional search query of up to 500 characters.
    """

    query = forms.CharField(
        required=False,
        label=LABEL_SEARCH,
        max_length=500,
    )