|
|
"""
|
|
|
This module contains helper functions and helper classes.
|
|
|
"""
|
|
|
import time
|
|
|
import uuid
|
|
|
|
|
|
import hashlib
|
|
|
import magic
|
|
|
import os
|
|
|
from django import forms
|
|
|
from django.core.cache import cache
|
|
|
from django.db.models import Model
|
|
|
from django.template.defaultfilters import filesizeformat
|
|
|
from django.utils import timezone
|
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
|
|
|
import boards
|
|
|
from django.conf import settings
|
|
|
from boards.abstracts.constants import FILE_DIRECTORY
|
|
|
from boards.settings import get_bool, SECTION_FORMS
|
|
|
|
|
|
# Keys of request.META that are consulted to find the client address.
HTTP_FORWARDED = 'HTTP_X_FORWARDED_FOR'
META_REMOTE_ADDR = 'REMOTE_ADDR'

# Settings section and option that switch the anonymous mode.
SETTING_MESSAGES = 'Messages'
SETTING_ANON_MODE = 'AnonymousMode'

# Address reported for every client while the anonymous mode is on.
ANON_IP = '127.0.0.1'

FILE_EXTENSION_DELIMITER = '.'
URL_DELIMITER = '/'

# Template of a keyword argument inside a cache key, and the separator
# that is put between the parts of a cache key.
CACHE_PARAMS = '{}:{}'
CACHE_KEY_DELIMITER = '_'

# MIME type that is used when the detection returns nothing.
DEFAULT_MIMETYPE = 'application/octet-stream'
|
|
|
|
|
|
|
|
|
def is_anonymous_mode():
    """
    Tells whether the anonymous mode is switched on.

    The answer is the value that get_bool returns for the 'AnonymousMode'
    option of the 'Messages' settings section.
    """
    anonymous = get_bool(SETTING_MESSAGES, SETTING_ANON_MODE)
    return anonymous
|
|
|
|
|
|
|
|
|
def get_client_ip(request):
    """
    Determines the address of the client that has sent the request.

    In the anonymous mode the same fixed address is returned for everyone.
    Otherwise the last entry of the X-Forwarded-For header is used when the
    header is present and not empty, and REMOTE_ADDR is used as a fallback
    (it may be None if request.META does not contain it).
    """
    if is_anonymous_mode():
        return ANON_IP

    forwarded_chain = request.META.get(HTTP_FORWARDED)
    if not forwarded_chain:
        return request.META.get(META_REMOTE_ADDR)

    # The header is a comma-separated list, only its last entry is taken.
    return forwarded_chain.split(',')[-1].strip()
|
|
|
|
|
|
|
|
|
# TODO The output format is not epoch because it includes microseconds
def datetime_to_epoch(datetime):
    """
    Converts a datetime to an integer number of microseconds.

    The datetime is converted to the current Django timezone, its time tuple
    is turned into seconds by time.mktime, the seconds are scaled to
    microseconds and the microsecond part of the datetime is added.
    """
    local_datetime = timezone.localtime(
        datetime, timezone.get_current_timezone())
    whole_seconds = time.mktime(local_datetime.timetuple())
    return int(whole_seconds * 1000000 + datetime.microsecond)
|
|
|
|
|
|
|
|
|
# TODO Test this carefully
def cached_result(key_method=None):
    """
    Caches method result in the Django's cache system, persisted by class
    name, method name, args and kwargs if any, model id if the object is a
    Django model, and the values returned by key_method if it is given.

    key_method, when given, is called with the object and has to return an
    iterable of additional values that are included into the cache key.

    A result that is None is never stored, so the decorated method is called
    again every time it returns None.
    """
    from functools import wraps

    def _cached_result(function):
        @wraps(function)
        def inner_func(obj, *args, **kwargs):
            cache_key_params = [obj.__class__.__name__, function.__name__]

            # Every part of the key has to be a string to be joined below.
            cache_key_params += [str(arg) for arg in args]
            # Iterate over items (not over the keys only); sorting makes the
            # key independent of the order the keyword arguments were passed.
            for key, value in sorted(kwargs.items()):
                cache_key_params.append(CACHE_PARAMS.format(key, value))

            if isinstance(obj, Model):
                cache_key_params.append(str(obj.id))

            if key_method is not None:
                cache_key_params += [str(arg) for arg in key_method(obj)]

            cache_key = CACHE_KEY_DELIMITER.join(cache_key_params)

            result = cache.get(cache_key)
            if result is None:
                # Cache miss (or a None result that was not stored).
                result = function(obj, *args, **kwargs)
                if result is not None:
                    cache.set(cache_key, result)

            return result

        return inner_func
    return _cached_result
|
|
|
|
|
|
|
|
|
def get_file_hash(file) -> str:
    """
    Calculates the MD5 hex digest of the file contents.

    The contents are read piece by piece through the chunks() method of the
    file, each piece is fed to the digest as soon as it is received.
    """
    digest = hashlib.md5()
    for piece in file.chunks():
        digest.update(piece)
    return digest.hexdigest()
|
|
|
|
|
|
|
|
|
def validate_file_size(size: int):
    """
    Checks the size against the 'MaxFileSize' option of the forms settings.

    A limit that is zero or negative switches the check off. When a positive
    limit is exceeded, forms.ValidationError is raised with a message that
    contains both the limit and the actual size in a human-readable form.
    """
    limit = boards.settings.get_int(SECTION_FORMS, 'MaxFileSize')

    within_limit = limit <= 0 or size <= limit
    if within_limit:
        return

    message = _('Total file size must be less than %s but is %s.')
    raise forms.ValidationError(
        message % (filesizeformat(limit), filesizeformat(size)))
|
|
|
|
|
|
|
|
|
def get_extension(filename):
    """
    Returns the part of the file name that follows its last dot.

    A name without any dot is returned as a whole, a name that ends with a
    dot gives an empty string.
    """
    _, _, extension = filename.rpartition(FILE_EXTENSION_DELIMITER)
    return extension
|
|
|
|
|
|
|
|
|
def get_upload_filename(model_instance, old_filename):
    """
    Builds a storage path for an uploaded file.

    The file gets a random UUID4 name that keeps the extension of the old
    file name. The model_instance argument is not used by this function.
    """
    random_name = str(uuid.uuid4())
    new_name = '.'.join((random_name, get_extension(old_filename)))

    # Files are spread over 2 levels of directories named after the first
    # and the second character of the new name, because keeping many files
    # in one directory may impact performance
    first_level, second_level = new_name[:2]

    return os.path.join(FILE_DIRECTORY, first_level, second_level, new_name)
|
|
|
|
|
|
|
|
|
def get_file_mimetype(file) -> str:
    """
    Detects the MIME type of the file from its contents.

    All the chunks of the file are joined into one buffer that is passed to
    magic.from_buffer. DEFAULT_MIMETYPE is returned when the detection gives
    None, and a bytes result is decoded to a string.
    """
    # join() builds the buffer in one pass instead of copying the whole
    # accumulated buffer again for every appended chunk.
    buf = b''.join(file.chunks())

    file_type = magic.from_buffer(buf, mime=True)
    if file_type is None:
        return DEFAULT_MIMETYPE
    if isinstance(file_type, bytes):
        # presumably some versions of the magic library return bytes here
        file_type = file_type.decode()
    return file_type
|
|
|
|
|
|
|
|
|
def get_domain(url: str) -> str:
    """
    Gets the domain from a URL with an arbitrary number of domain levels.

    The URL is split by '/' and its third part is returned, which is the
    host for the URLs of the 'scheme://host/path' form. An empty string is
    returned when the URL has fewer than three parts.
    """
    domain_parts = url.split(URL_DELIMITER)

    # Index 2 exists only when there are at least 3 parts; checking for 2
    # parts raised IndexError for the URLs with a single delimiter.
    if len(domain_parts) > 2:
        return domain_parts[2]
    return ''
|
|
|
|
|
|
|
|
|
def get_tripcode_from_text(text: str) -> str:
    """
    Generates a tripcode for the given text.

    The tripcode is the MD5 hex digest of the text followed by the
    SECRET_KEY of the Django settings. An empty text gives an empty
    tripcode.
    """
    if not text:
        return ''

    salted = text + settings.SECRET_KEY
    return hashlib.md5(salted.encode()).hexdigest()
|
|
|
|
|
|
|