##// END OF EJS Templates
Recognize flash mimetype properly. Store extension as mimetype in attachment...
Recognize flash mimetype properly. Store extension as mimetype in attachment object

File last commit:

r1382:8669bd71 default
r1382:8669bd71 default
Show More
utils.py
148 lines | 3.9 KiB | text/x-python | PythonLexer
"""
This module contains helper functions and helper classes.
"""
import hashlib
from random import random
import time
import hmac
from django.core.cache import cache
from django.db.models import Model
from django import forms
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
import magic
from portage import os
import boards
from boards.settings import get_bool
from neboard import settings
CACHE_KEY_DELIMITER = '_'
PERMISSION_MODERATE = 'moderation'
HTTP_FORWARDED = 'HTTP_X_FORWARDED_FOR'
META_REMOTE_ADDR = 'REMOTE_ADDR'
SETTING_MESSAGES = 'Messages'
SETTING_ANON_MODE = 'AnonymousMode'
ANON_IP = '127.0.0.1'
UPLOAD_DIRS ={
'PostImage': 'images/',
'Attachment': 'files/',
}
FILE_EXTENSION_DELIMITER = '.'
def is_anonymous_mode():
return get_bool(SETTING_MESSAGES, SETTING_ANON_MODE)
def get_client_ip(request):
if is_anonymous_mode():
ip = ANON_IP
else:
x_forwarded_for = request.META.get(HTTP_FORWARDED)
if x_forwarded_for:
ip = x_forwarded_for.split(',')[-1].strip()
else:
ip = request.META.get(META_REMOTE_ADDR)
return ip
# TODO The output format is not epoch because it includes microseconds
def datetime_to_epoch(datetime):
return int(time.mktime(timezone.localtime(
datetime,timezone.get_current_timezone()).timetuple())
* 1000000 + datetime.microsecond)
def get_websocket_token(user_id='', timestamp=''):
"""
Create token to validate information provided by new connection.
"""
sign = hmac.new(settings.CENTRIFUGE_PROJECT_SECRET.encode())
sign.update(settings.CENTRIFUGE_PROJECT_ID.encode())
sign.update(user_id.encode())
sign.update(timestamp.encode())
token = sign.hexdigest()
return token
def cached_result(key_method=None):
"""
Caches method result in the Django's cache system, persisted by object name,
object name and model id if object is a Django model.
"""
def _cached_result(function):
def inner_func(obj, *args, **kwargs):
# TODO Include method arguments to the cache key
cache_key_params = [obj.__class__.__name__, function.__name__]
if isinstance(obj, Model):
cache_key_params.append(str(obj.id))
if key_method is not None:
cache_key_params += [str(arg) for arg in key_method(obj)]
cache_key = CACHE_KEY_DELIMITER.join(cache_key_params)
persisted_result = cache.get(cache_key)
if persisted_result is not None:
result = persisted_result
else:
result = function(obj, *args, **kwargs)
cache.set(cache_key, result)
return result
return inner_func
return _cached_result
def is_moderator(request):
try:
moderate = request.user.has_perm(PERMISSION_MODERATE)
except AttributeError:
moderate = False
return moderate
def get_file_hash(file) -> str:
md5 = hashlib.md5()
for chunk in file.chunks():
md5.update(chunk)
return md5.hexdigest()
def validate_file_size(size: int):
max_size = boards.settings.get_int('Forms', 'MaxFileSize')
if size > max_size:
raise forms.ValidationError(
_('File must be less than %s bytes')
% str(max_size))
def get_extension(filename):
return filename.split(FILE_EXTENSION_DELIMITER)[-1:][0]
def get_upload_filename(model_instance, old_filename):
# TODO Use something other than random number in file name
extension = get_extension(old_filename)
new_name = '{}{}.{}'.format(
str(int(time.mktime(time.gmtime()))),
str(int(random() * 1000)),
extension)
directory = UPLOAD_DIRS[type(model_instance).__name__]
return os.path.join(directory, new_name)
def get_file_mimetype(file) -> str:
return magic.from_buffer(file.chunks().__next__(), mime=True).decode()