##// END OF EJS Templates
Changed sync processor output a bit
Changed sync processor output a bit

File last commit:

r2068:24da88f2 default
r2121:b378f45d default
Show More
utils.py
159 lines | 4.2 KiB | text/x-python | PythonLexer
"""
This module contains helper functions and helper classes.
"""
import time
import uuid
import hashlib
import magic
import os
from django import forms
from django.core.cache import cache
from django.db.models import Model
from django.template.defaultfilters import filesizeformat
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
import boards
from django.conf import settings
from boards.abstracts.constants import FILE_DIRECTORY
from boards.settings import get_bool, SECTION_FORMS
HTTP_FORWARDED = 'HTTP_X_FORWARDED_FOR'
META_REMOTE_ADDR = 'REMOTE_ADDR'
SETTING_MESSAGES = 'Messages'
SETTING_ANON_MODE = 'AnonymousMode'
ANON_IP = '127.0.0.1'
FILE_EXTENSION_DELIMITER = '.'
URL_DELIMITER = '/'
CACHE_PARAMS = '{}:{}'
CACHE_KEY_DELIMITER = '_'
DEFAULT_MIMETYPE = 'application/octet-stream'
def is_anonymous_mode():
return get_bool(SETTING_MESSAGES, SETTING_ANON_MODE)
def get_client_ip(request):
if is_anonymous_mode():
ip = ANON_IP
else:
x_forwarded_for = request.META.get(HTTP_FORWARDED)
if x_forwarded_for:
ip = x_forwarded_for.split(',')[-1].strip()
else:
ip = request.META.get(META_REMOTE_ADDR)
return ip
# TODO The output format is not epoch because it includes microseconds
def datetime_to_epoch(datetime):
return int(time.mktime(timezone.localtime(
datetime,timezone.get_current_timezone()).timetuple())
* 1000000 + datetime.microsecond)
# TODO Test this carefully
def cached_result(key_method=None):
"""
Caches method result in the Django's cache system, persisted by object name,
object name, model id if object is a Django model, args and kwargs if any.
"""
def _cached_result(function):
def inner_func(obj, *args, **kwargs):
cache_key_params = [obj.__class__.__name__, function.__name__]
cache_key_params += args
for key, value in kwargs:
cache_key_params.append(CACHE_PARAMS.format(key, value))
if isinstance(obj, Model):
cache_key_params.append(str(obj.id))
if key_method is not None:
cache_key_params += [str(arg) for arg in key_method(obj)]
cache_key = CACHE_KEY_DELIMITER.join(cache_key_params)
result = cache.get(cache_key)
if result is None:
result = function(obj, *args, **kwargs)
if result is not None:
cache.set(cache_key, result)
return result
return inner_func
return _cached_result
def get_file_hash(file) -> str:
md5 = hashlib.md5()
for chunk in file.chunks():
md5.update(chunk)
return md5.hexdigest()
def validate_file_size(size: int):
max_size = boards.settings.get_int(SECTION_FORMS, 'MaxFileSize')
if 0 < max_size < size:
raise forms.ValidationError(
_('Total file size must be less than %s but is %s.')
% (filesizeformat(max_size), filesizeformat(size)))
def get_extension(filename):
return filename.split(FILE_EXTENSION_DELIMITER)[-1:][0]
def get_upload_filename(model_instance, old_filename):
extension = get_extension(old_filename)
new_name = '{}.{}'.format(uuid.uuid4(), extension)
# Create 2 directories to split the files because holding many files in
# one directory may impact performance
dir1 = new_name[0]
dir2 = new_name[1]
return os.path.join(FILE_DIRECTORY, dir1, dir2, new_name)
def get_file_mimetype(file) -> str:
buf = b''
for chunk in file.chunks():
buf += chunk
file_type = magic.from_buffer(buf, mime=True)
if file_type is None:
file_type = DEFAULT_MIMETYPE
elif type(file_type) == bytes:
file_type = file_type.decode()
return file_type
def get_domain(url: str) -> str:
"""
Gets domain from an URL with random number of domain levels.
"""
domain_parts = url.split(URL_DELIMITER)
if len(domain_parts) >= 2:
full_domain = domain_parts[2]
else:
full_domain = ''
return full_domain
def get_tripcode_from_text(text: str) -> str:
tripcode = ''
if text:
code = text + settings.SECRET_KEY
tripcode = hashlib.md5(code.encode()).hexdigest()
return tripcode