utils.py
161 lines
| 4.2 KiB
| text/x-python
|
PythonLexer
/ boards / utils.py
Ilyas
|
r78 | """ | ||
This module contains helper functions and helper classes. | ||||
""" | ||||
neko259
|
r1305 | import hashlib | ||
neko259
|
r1767 | import uuid | ||
neko259
|
r1590 | from boards.abstracts.constants import FILE_DIRECTORY | ||
neko259
|
r1368 | from random import random | ||
neko259
|
r716 | import time | ||
neko259
|
r853 | import hmac | ||
neko259
|
r1106 | |||
neko259
|
r957 | from django.core.cache import cache | ||
from django.db.models import Model | ||||
neko259
|
r1328 | from django import forms | ||
neko259
|
r1433 | from django.template.defaultfilters import filesizeformat | ||
neko259
|
r542 | from django.utils import timezone | ||
neko259
|
r1328 | from django.utils.translation import ugettext_lazy as _ | ||
neko259
|
r1371 | import magic | ||
neko259
|
r1497 | import os | ||
neko259
|
r1362 | |||
neko259
|
r1328 | import boards | ||
neko259
|
r1362 | from boards.settings import get_bool | ||
neko259
|
r81 | from neboard import settings | ||
Ilyas
|
r78 | |||
neko259
|
r1106 | CACHE_KEY_DELIMITER = '_' | ||
wnc_21
|
r95 | |||
neko259
|
r1362 | HTTP_FORWARDED = 'HTTP_X_FORWARDED_FOR' | ||
META_REMOTE_ADDR = 'REMOTE_ADDR' | ||||
SETTING_MESSAGES = 'Messages' | ||||
SETTING_ANON_MODE = 'AnonymousMode' | ||||
ANON_IP = '127.0.0.1' | ||||
neko259
|
r1368 | FILE_EXTENSION_DELIMITER = '.' | ||
neko259
|
r1362 | |||
def is_anonymous_mode(): | ||||
return get_bool(SETTING_MESSAGES, SETTING_ANON_MODE) | ||||
neko259
|
r210 | def get_client_ip(request): | ||
neko259
|
r1362 | if is_anonymous_mode(): | ||
ip = ANON_IP | ||||
neko259
|
r210 | else: | ||
neko259
|
r1362 | x_forwarded_for = request.META.get(HTTP_FORWARDED) | ||
if x_forwarded_for: | ||||
ip = x_forwarded_for.split(',')[-1].strip() | ||||
else: | ||||
ip = request.META.get(META_REMOTE_ADDR) | ||||
neko259
|
r542 | return ip | ||
neko259
|
r990 | # TODO The output format is not epoch because it includes microseconds | ||
neko259
|
r542 | def datetime_to_epoch(datetime): | ||
return int(time.mktime(timezone.localtime( | ||||
datetime,timezone.get_current_timezone()).timetuple()) | ||||
neko259
|
r765 | * 1000000 + datetime.microsecond) | ||
neko259
|
r853 | |||
def get_websocket_token(user_id='', timestamp=''): | ||||
""" | ||||
Create token to validate information provided by new connection. | ||||
""" | ||||
sign = hmac.new(settings.CENTRIFUGE_PROJECT_SECRET.encode()) | ||||
sign.update(settings.CENTRIFUGE_PROJECT_ID.encode()) | ||||
sign.update(user_id.encode()) | ||||
sign.update(timestamp.encode()) | ||||
token = sign.hexdigest() | ||||
neko259
|
r957 | return token | ||
neko259
|
r1424 | # TODO Test this carefully | ||
neko259
|
r1106 | def cached_result(key_method=None): | ||
neko259
|
r957 | """ | ||
Caches method result in the Django's cache system, persisted by object name, | ||||
neko259
|
r1424 | object name, model id if object is a Django model, args and kwargs if any. | ||
neko259
|
r957 | """ | ||
neko259
|
r1106 | def _cached_result(function): | ||
def inner_func(obj, *args, **kwargs): | ||||
cache_key_params = [obj.__class__.__name__, function.__name__] | ||||
neko259
|
r1424 | |||
cache_key_params += args | ||||
for key, value in kwargs: | ||||
cache_key_params.append(key + ':' + value) | ||||
neko259
|
r1106 | if isinstance(obj, Model): | ||
cache_key_params.append(str(obj.id)) | ||||
if key_method is not None: | ||||
cache_key_params += [str(arg) for arg in key_method(obj)] | ||||
cache_key = CACHE_KEY_DELIMITER.join(cache_key_params) | ||||
neko259
|
r957 | |||
neko259
|
r1106 | persisted_result = cache.get(cache_key) | ||
if persisted_result is not None: | ||||
result = persisted_result | ||||
else: | ||||
result = function(obj, *args, **kwargs) | ||||
neko259
|
r1443 | if result is not None: | ||
cache.set(cache_key, result) | ||||
neko259
|
r957 | |||
neko259
|
r1106 | return result | ||
neko259
|
r957 | |||
neko259
|
r1106 | return inner_func | ||
return _cached_result | ||||
neko259
|
r1109 | |||
neko259
|
r1305 | def get_file_hash(file) -> str: | ||
md5 = hashlib.md5() | ||||
for chunk in file.chunks(): | ||||
md5.update(chunk) | ||||
return md5.hexdigest() | ||||
neko259
|
r1328 | |||
def validate_file_size(size: int): | ||||
max_size = boards.settings.get_int('Forms', 'MaxFileSize') | ||||
neko259
|
r1842 | if max_size > 0 and size > max_size: | ||
neko259
|
r1328 | raise forms.ValidationError( | ||
neko259
|
r1433 | _('File must be less than %s but is %s.') | ||
% (filesizeformat(max_size), filesizeformat(size))) | ||||
neko259
|
r1368 | |||
neko259
|
r1382 | def get_extension(filename): | ||
return filename.split(FILE_EXTENSION_DELIMITER)[-1:][0] | ||||
neko259
|
r1368 | def get_upload_filename(model_instance, old_filename): | ||
neko259
|
r1382 | extension = get_extension(old_filename) | ||
neko259
|
r1767 | new_name = '{}.{}'.format(uuid.uuid4(), extension) | ||
neko259
|
r1368 | |||
neko259
|
r1590 | return os.path.join(FILE_DIRECTORY, new_name) | ||
neko259
|
r1371 | |||
def get_file_mimetype(file) -> str: | ||||
neko259
|
r1871 | buf = b'' | ||
for chunk in file.chunks(): | ||||
buf += chunk | ||||
file_type = magic.from_buffer(buf, mime=True) | ||||
neko259
|
r1686 | if file_type is None: | ||
file_type = 'application/octet-stream' | ||||
neko259
|
r1688 | elif type(file_type) == bytes: | ||
neko259
|
r1686 | file_type = file_type.decode() | ||
neko259
|
r1688 | return file_type | ||
neko259
|
r1724 | |||
def get_domain(url: str) -> str: | ||||
""" | ||||
Gets domain from an URL with random number of domain levels. | ||||
""" | ||||
neko259
|
r1765 | domain_parts = url.split('/') | ||
if len(domain_parts) >= 2: | ||||
full_domain = domain_parts[2] | ||||
else: | ||||
full_domain = '' | ||||
neko259
|
r1724 | |||
neko259
|
r1772 | return full_domain | ||
neko259
|
r1724 | |||