utils.py
159 lines
| 4.2 KiB
| text/x-python
|
PythonLexer
/ boards / utils.py
Ilyas
|
r78 | """ | ||
This module contains helper functions and helper classes. | ||||
""" | ||||
neko259
|
r1948 | import time | ||
neko259
|
r1767 | import uuid | ||
neko259
|
r1948 | import hashlib | ||
import magic | ||||
import os | ||||
from django import forms | ||||
neko259
|
r957 | from django.core.cache import cache | ||
from django.db.models import Model | ||||
neko259
|
r1433 | from django.template.defaultfilters import filesizeformat | ||
neko259
|
r542 | from django.utils import timezone | ||
neko259
|
r1328 | from django.utils.translation import ugettext_lazy as _ | ||
neko259
|
r1362 | |||
neko259
|
r1328 | import boards | ||
neko259
|
r1973 | from neboard import settings | ||
neko259
|
r1948 | from boards.abstracts.constants import FILE_DIRECTORY | ||
neko259
|
r2003 | from boards.settings import get_bool, SECTION_FORMS | ||
Ilyas
|
r78 | |||
neko259
|
r1362 | HTTP_FORWARDED = 'HTTP_X_FORWARDED_FOR' | ||
META_REMOTE_ADDR = 'REMOTE_ADDR' | ||||
SETTING_MESSAGES = 'Messages' | ||||
SETTING_ANON_MODE = 'AnonymousMode' | ||||
ANON_IP = '127.0.0.1' | ||||
neko259
|
r1368 | FILE_EXTENSION_DELIMITER = '.' | ||
neko259
|
r2002 | URL_DELIMITER = '/' | ||
neko259
|
r2013 | |||
CACHE_PARAMS = '{}:{}' | ||||
CACHE_KEY_DELIMITER = '_' | ||||
neko259
|
r2002 | |||
DEFAULT_MIMETYPE = 'application/octet-stream' | ||||
neko259
|
r1368 | |||
neko259
|
r1362 | |||
def is_anonymous_mode(): | ||||
return get_bool(SETTING_MESSAGES, SETTING_ANON_MODE) | ||||
neko259
|
r210 | def get_client_ip(request): | ||
neko259
|
r1362 | if is_anonymous_mode(): | ||
ip = ANON_IP | ||||
neko259
|
r210 | else: | ||
neko259
|
r1362 | x_forwarded_for = request.META.get(HTTP_FORWARDED) | ||
if x_forwarded_for: | ||||
ip = x_forwarded_for.split(',')[-1].strip() | ||||
else: | ||||
ip = request.META.get(META_REMOTE_ADDR) | ||||
neko259
|
r542 | return ip | ||
neko259
|
r990 | # TODO The output format is not epoch because it includes microseconds | ||
neko259
|
r542 | def datetime_to_epoch(datetime): | ||
return int(time.mktime(timezone.localtime( | ||||
datetime,timezone.get_current_timezone()).timetuple()) | ||||
neko259
|
r765 | * 1000000 + datetime.microsecond) | ||
neko259
|
r853 | |||
neko259
|
r1424 | # TODO Test this carefully | ||
neko259
|
r1106 | def cached_result(key_method=None): | ||
neko259
|
r957 | """ | ||
Caches method result in the Django's cache system, persisted by object name, | ||||
neko259
|
r1424 | object name, model id if object is a Django model, args and kwargs if any. | ||
neko259
|
r957 | """ | ||
neko259
|
r1106 | def _cached_result(function): | ||
def inner_func(obj, *args, **kwargs): | ||||
cache_key_params = [obj.__class__.__name__, function.__name__] | ||||
neko259
|
r1424 | |||
cache_key_params += args | ||||
for key, value in kwargs: | ||||
neko259
|
r2013 | cache_key_params.append(CACHE_PARAMS.format(key, value)) | ||
neko259
|
r1424 | |||
neko259
|
r1106 | if isinstance(obj, Model): | ||
cache_key_params.append(str(obj.id)) | ||||
if key_method is not None: | ||||
cache_key_params += [str(arg) for arg in key_method(obj)] | ||||
cache_key = CACHE_KEY_DELIMITER.join(cache_key_params) | ||||
neko259
|
r957 | |||
neko259
|
r2027 | result = cache.get(cache_key) | ||
if result is None: | ||||
neko259
|
r1106 | result = function(obj, *args, **kwargs) | ||
neko259
|
r1443 | if result is not None: | ||
cache.set(cache_key, result) | ||||
neko259
|
r957 | |||
neko259
|
r1106 | return result | ||
neko259
|
r957 | |||
neko259
|
r1106 | return inner_func | ||
return _cached_result | ||||
neko259
|
r1109 | |||
neko259
|
r1305 | def get_file_hash(file) -> str: | ||
md5 = hashlib.md5() | ||||
for chunk in file.chunks(): | ||||
md5.update(chunk) | ||||
return md5.hexdigest() | ||||
neko259
|
r1328 | |||
def validate_file_size(size: int): | ||||
neko259
|
r2003 | max_size = boards.settings.get_int(SECTION_FORMS, 'MaxFileSize') | ||
neko259
|
r1948 | if 0 < max_size < size: | ||
neko259
|
r1328 | raise forms.ValidationError( | ||
neko259
|
r1983 | _('Total file size must be less than %s but is %s.') | ||
neko259
|
r1433 | % (filesizeformat(max_size), filesizeformat(size))) | ||
neko259
|
r1368 | |||
neko259
|
r1382 | def get_extension(filename): | ||
return filename.split(FILE_EXTENSION_DELIMITER)[-1:][0] | ||||
neko259
|
r1368 | def get_upload_filename(model_instance, old_filename): | ||
neko259
|
r1382 | extension = get_extension(old_filename) | ||
neko259
|
r1767 | new_name = '{}.{}'.format(uuid.uuid4(), extension) | ||
neko259
|
r1368 | |||
neko259
|
r1999 | # Create 2 directories to split the files because holding many files in | ||
# one directory may impact performance | ||||
dir1 = new_name[0] | ||||
dir2 = new_name[1] | ||||
return os.path.join(FILE_DIRECTORY, dir1, dir2, new_name) | ||||
neko259
|
r1371 | |||
def get_file_mimetype(file) -> str: | ||||
neko259
|
r1871 | buf = b'' | ||
for chunk in file.chunks(): | ||||
buf += chunk | ||||
file_type = magic.from_buffer(buf, mime=True) | ||||
neko259
|
r1686 | if file_type is None: | ||
neko259
|
r2002 | file_type = DEFAULT_MIMETYPE | ||
neko259
|
r1688 | elif type(file_type) == bytes: | ||
neko259
|
r1686 | file_type = file_type.decode() | ||
neko259
|
r1688 | return file_type | ||
neko259
|
r1724 | |||
def get_domain(url: str) -> str: | ||||
""" | ||||
Gets domain from an URL with random number of domain levels. | ||||
""" | ||||
neko259
|
r2002 | domain_parts = url.split(URL_DELIMITER) | ||
neko259
|
r1765 | if len(domain_parts) >= 2: | ||
full_domain = domain_parts[2] | ||||
else: | ||||
full_domain = '' | ||||
neko259
|
r1724 | |||
neko259
|
r1772 | return full_domain | ||
neko259
|
r1724 | |||
neko259
|
r1973 | |||
def get_tripcode_from_text(text: str) -> str: | ||||
tripcode = '' | ||||
if text: | ||||
code = text + settings.SECRET_KEY | ||||
tripcode = hashlib.md5(code.encode()).hexdigest() | ||||
return tripcode | ||||