utils.py
138 lines
| 3.7 KiB
| text/x-python
|
PythonLexer
/ boards / utils.py
Ilyas
|
r78 | """ | ||
This module contains helper functions and helper classes. | ||||
""" | ||||
neko259
|
r1305 | import hashlib | ||
neko259
|
r1368 | from random import random | ||
neko259
|
r716 | import time | ||
neko259
|
r853 | import hmac | ||
neko259
|
r1106 | |||
neko259
|
r957 | from django.core.cache import cache | ||
from django.db.models import Model | ||||
neko259
|
r1328 | from django import forms | ||
neko259
|
r542 | from django.utils import timezone | ||
neko259
|
r1328 | from django.utils.translation import ugettext_lazy as _ | ||
neko259
|
r1371 | import magic | ||
neko259
|
r1368 | from portage import os | ||
neko259
|
r1362 | |||
neko259
|
r1328 | import boards | ||
neko259
|
r1362 | from boards.settings import get_bool | ||
neko259
|
r81 | from neboard import settings | ||
Ilyas
|
r78 | |||
neko259
|
# Separator placed between the parts of a generated cache key.
CACHE_KEY_DELIMITER = '_'

# Keys looked up in request.META when resolving the client address.
HTTP_FORWARDED = 'HTTP_X_FORWARDED_FOR'
META_REMOTE_ADDR = 'REMOTE_ADDR'

# Arguments handed to get_bool() to read the anonymous mode flag.
SETTING_MESSAGES = 'Messages'
SETTING_ANON_MODE = 'AnonymousMode'

# Address reported for every client while anonymous mode is enabled.
ANON_IP = '127.0.0.1'

# Upload sub-directory for each supported model class name.
UPLOAD_DIRS = {
    'PostImage': 'images/',
    'Attachment': 'files/',
}

# Character separating a file name from its extension.
FILE_EXTENSION_DELIMITER = '.'
neko259
|
r1362 | |||
def is_anonymous_mode():
    """
    Tell whether the anonymous mode setting is switched on.

    :return: the value get_bool() yields for the anonymous mode option.
    """
    anonymous_enabled = get_bool(SETTING_MESSAGES, SETTING_ANON_MODE)
    return anonymous_enabled
neko259
|
def get_client_ip(request):
    """
    Determine the IP address a request is attributed to.

    In anonymous mode every request gets ANON_IP. Otherwise the last entry
    of the X-Forwarded-For header is used when the header is present and
    non-empty, falling back to REMOTE_ADDR (None if that key is missing too).

    :param request: object exposing a META mapping.
    :return: the address string, or None when nothing could be found.
    """
    if is_anonymous_mode():
        return ANON_IP

    forwarded_chain = request.META.get(HTTP_FORWARDED)
    if not forwarded_chain:
        return request.META.get(META_REMOTE_ADDR)

    # The header is a comma-separated list; only its last item is used.
    last_entry = forwarded_chain.split(',')[-1]
    return last_entry.strip()
neko259
|
r990 | # TODO The output format is not epoch because it includes microseconds | ||
neko259
|
def datetime_to_epoch(datetime):
    """
    Convert a datetime to an integer number of microseconds.

    The value is converted to the current Django time zone, turned into
    seconds with time.mktime(), scaled to microseconds, and the microsecond
    field of the passed datetime is added on top.

    :param datetime: the datetime object to convert.
    :return: integer microsecond timestamp.
    """
    current_tz = timezone.get_current_timezone()
    local_moment = timezone.localtime(datetime, current_tz)
    seconds = time.mktime(local_moment.timetuple())
    return int(seconds * 1000000 + datetime.microsecond)
neko259
|
r853 | |||
def get_websocket_token(user_id='', timestamp=''):
    """
    Create token to validate information provided by new connection.

    The token is the hexadecimal HMAC-MD5 digest, keyed with
    settings.CENTRIFUGE_PROJECT_SECRET, of the project id, the user id and
    the timestamp, fed in that order.

    :param user_id: id of the connecting user, as a string.
    :param timestamp: timestamp of the connection, as a string.
    :return: the hex digest string.
    """
    # digestmod is mandatory since Python 3.8. MD5 was the implicit default
    # before that, so naming it explicitly keeps the produced tokens the same.
    sign = hmac.new(settings.CENTRIFUGE_PROJECT_SECRET.encode(),
                    digestmod=hashlib.md5)
    for part in (settings.CENTRIFUGE_PROJECT_ID, user_id, timestamp):
        sign.update(part.encode())

    return sign.hexdigest()
neko259
|
def cached_result(key_method=None):
    """
    Caches method result in the Django's cache system. The cache key is built
    from the object's class name, the method name, the model id if the object
    is a Django model, and any extra values returned by key_method.

    A result of None is never served from the cache: cache.get() also returns
    None on a miss, so such a method is simply called again every time.

    :param key_method: optional callable taking the object and returning an
        iterable of values to append to the cache key.
    :return: a decorator for methods whose first argument is the object.
    """
    # Local import so the decorator does not rely on a module-level import.
    import functools

    def _cached_result(function):
        # wraps() keeps the decorated method's __name__ and __doc__ intact.
        @functools.wraps(function)
        def inner_func(obj, *args, **kwargs):
            # TODO Include method arguments to the cache key
            cache_key_params = [obj.__class__.__name__, function.__name__]
            if isinstance(obj, Model):
                cache_key_params.append(str(obj.id))
            if key_method is not None:
                cache_key_params += [str(arg) for arg in key_method(obj)]
            cache_key = CACHE_KEY_DELIMITER.join(cache_key_params)

            persisted_result = cache.get(cache_key)
            if persisted_result is not None:
                return persisted_result

            result = function(obj, *args, **kwargs)
            cache.set(cache_key, result)
            return result

        return inner_func
    return _cached_result
neko259
|
r1109 | |||
neko259
|
def get_file_hash(file) -> str:
    """
    Compute the MD5 hex digest of a file's content.

    The content is consumed through file.chunks() and fed to the hash one
    chunk at a time.

    :param file: object whose chunks() method yields bytes.
    :return: hexadecimal digest string.
    """
    digest = hashlib.md5()
    for piece in file.chunks():
        digest.update(piece)

    hex_value = digest.hexdigest()
    return hex_value
neko259
|
r1328 | |||
def validate_file_size(size: int):
    """
    Check a file size against the configured maximum.

    The maximum is read with boards.settings.get_int('Forms', 'MaxFileSize').

    :param size: file size to check.
    :raises forms.ValidationError: when size is greater than the maximum.
    """
    limit = boards.settings.get_int('Forms', 'MaxFileSize')
    if size <= limit:
        return

    message = _('File must be less than %s bytes') % str(limit)
    raise forms.ValidationError(message)
neko259
|
r1368 | |||
neko259
|
def get_extension(filename):
    """
    Return the part of a file name that follows the last delimiter.

    A name that does not contain the delimiter is returned unchanged.
    """
    # Splitting once from the right is enough: only the last piece is needed.
    pieces = filename.rsplit(FILE_EXTENSION_DELIMITER, 1)
    return pieces[-1]
neko259
|
def get_upload_filename(model_instance, old_filename):
    """
    Build the storage path for an uploaded file.

    The new name is a timestamp immediately followed by a random integer
    below 1000, then the extension of the old name. It is placed in the
    UPLOAD_DIRS directory registered for the class name of model_instance.

    :param model_instance: object whose class name selects the directory.
    :param old_filename: original file name; only its extension is reused.
    :return: the directory joined with the generated file name.
    """
    # TODO Use something other than random number in file name
    extension = get_extension(old_filename)

    timestamp = int(time.mktime(time.gmtime()))
    random_suffix = int(random() * 1000)
    new_name = '{}{}.{}'.format(timestamp, random_suffix, extension)

    model_name = type(model_instance).__name__
    return os.path.join(UPLOAD_DIRS[model_name], new_name)
neko259
|
r1371 | |||
def get_file_mimetype(file) -> str:
    """
    Detect the MIME type of a file from the beginning of its content.

    Only the first chunk yielded by file.chunks() is inspected. A file that
    yields no chunks is identified from an empty buffer instead of letting
    StopIteration escape to the caller.

    :param file: object whose chunks() method yields bytes.
    :return: the MIME type as a string.
    """
    first_chunk = next(iter(file.chunks()), b'')
    mimetype = magic.from_buffer(first_chunk, mime=True)

    # Depending on the python-magic release the result is bytes or already
    # str; decode only in the former case so both keep working.
    if isinstance(mimetype, bytes):
        mimetype = mimetype.decode()
    return mimetype