|
|
"""
|
|
|
This module contains helper functions and helper classes.
|
|
|
"""
|
|
|
import hashlib
|
|
|
from random import random
|
|
|
import time
|
|
|
import hmac
|
|
|
|
|
|
from django.core.cache import cache
|
|
|
from django.db.models import Model
|
|
|
from django import forms
|
|
|
from django.template.defaultfilters import filesizeformat
|
|
|
from django.utils import timezone
|
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
import magic
|
|
|
from portage import os
|
|
|
|
|
|
import boards
|
|
|
from boards.settings import get_bool
|
|
|
from neboard import settings
|
|
|
|
|
|
CACHE_KEY_DELIMITER = '_'
|
|
|
|
|
|
HTTP_FORWARDED = 'HTTP_X_FORWARDED_FOR'
|
|
|
META_REMOTE_ADDR = 'REMOTE_ADDR'
|
|
|
|
|
|
SETTING_MESSAGES = 'Messages'
|
|
|
SETTING_ANON_MODE = 'AnonymousMode'
|
|
|
|
|
|
ANON_IP = '127.0.0.1'
|
|
|
|
|
|
UPLOAD_DIRS ={
|
|
|
'PostImage': 'images/',
|
|
|
'Attachment': 'files/',
|
|
|
}
|
|
|
FILE_EXTENSION_DELIMITER = '.'
|
|
|
|
|
|
|
|
|
def is_anonymous_mode():
|
|
|
return get_bool(SETTING_MESSAGES, SETTING_ANON_MODE)
|
|
|
|
|
|
|
|
|
def get_client_ip(request):
|
|
|
if is_anonymous_mode():
|
|
|
ip = ANON_IP
|
|
|
else:
|
|
|
x_forwarded_for = request.META.get(HTTP_FORWARDED)
|
|
|
if x_forwarded_for:
|
|
|
ip = x_forwarded_for.split(',')[-1].strip()
|
|
|
else:
|
|
|
ip = request.META.get(META_REMOTE_ADDR)
|
|
|
return ip
|
|
|
|
|
|
|
|
|
# TODO The output format is not epoch because it includes microseconds
|
|
|
def datetime_to_epoch(datetime):
|
|
|
return int(time.mktime(timezone.localtime(
|
|
|
datetime,timezone.get_current_timezone()).timetuple())
|
|
|
* 1000000 + datetime.microsecond)
|
|
|
|
|
|
|
|
|
def get_websocket_token(user_id='', timestamp=''):
|
|
|
"""
|
|
|
Create token to validate information provided by new connection.
|
|
|
"""
|
|
|
|
|
|
sign = hmac.new(settings.CENTRIFUGE_PROJECT_SECRET.encode())
|
|
|
sign.update(settings.CENTRIFUGE_PROJECT_ID.encode())
|
|
|
sign.update(user_id.encode())
|
|
|
sign.update(timestamp.encode())
|
|
|
token = sign.hexdigest()
|
|
|
|
|
|
return token
|
|
|
|
|
|
|
|
|
# TODO Test this carefully
|
|
|
def cached_result(key_method=None):
|
|
|
"""
|
|
|
Caches method result in the Django's cache system, persisted by object name,
|
|
|
object name, model id if object is a Django model, args and kwargs if any.
|
|
|
"""
|
|
|
def _cached_result(function):
|
|
|
def inner_func(obj, *args, **kwargs):
|
|
|
cache_key_params = [obj.__class__.__name__, function.__name__]
|
|
|
|
|
|
cache_key_params += args
|
|
|
for key, value in kwargs:
|
|
|
cache_key_params.append(key + ':' + value)
|
|
|
|
|
|
if isinstance(obj, Model):
|
|
|
cache_key_params.append(str(obj.id))
|
|
|
|
|
|
if key_method is not None:
|
|
|
cache_key_params += [str(arg) for arg in key_method(obj)]
|
|
|
|
|
|
cache_key = CACHE_KEY_DELIMITER.join(cache_key_params)
|
|
|
|
|
|
persisted_result = cache.get(cache_key)
|
|
|
if persisted_result is not None:
|
|
|
result = persisted_result
|
|
|
else:
|
|
|
result = function(obj, *args, **kwargs)
|
|
|
cache.set(cache_key, result)
|
|
|
|
|
|
return result
|
|
|
|
|
|
return inner_func
|
|
|
return _cached_result
|
|
|
|
|
|
|
|
|
def get_file_hash(file) -> str:
|
|
|
md5 = hashlib.md5()
|
|
|
for chunk in file.chunks():
|
|
|
md5.update(chunk)
|
|
|
return md5.hexdigest()
|
|
|
|
|
|
|
|
|
def validate_file_size(size: int):
|
|
|
max_size = boards.settings.get_int('Forms', 'MaxFileSize')
|
|
|
if size > max_size:
|
|
|
raise forms.ValidationError(
|
|
|
_('File must be less than %s but is %s.')
|
|
|
% (filesizeformat(max_size), filesizeformat(size)))
|
|
|
|
|
|
|
|
|
def get_extension(filename):
|
|
|
return filename.split(FILE_EXTENSION_DELIMITER)[-1:][0]
|
|
|
|
|
|
|
|
|
def get_upload_filename(model_instance, old_filename):
|
|
|
# TODO Use something other than random number in file name
|
|
|
extension = get_extension(old_filename)
|
|
|
new_name = '{}{}.{}'.format(
|
|
|
str(int(time.mktime(time.gmtime()))),
|
|
|
str(int(random() * 1000)),
|
|
|
extension)
|
|
|
|
|
|
directory = UPLOAD_DIRS[type(model_instance).__name__]
|
|
|
|
|
|
return os.path.join(directory, new_name)
|
|
|
|
|
|
|
|
|
def get_file_mimetype(file) -> str:
|
|
|
return magic.from_buffer(file.chunks().__next__(), mime=True).decode()
|
|
|
|