""" This module contains helper functions and helper classes. """ import hashlib import uuid from boards.abstracts.constants import FILE_DIRECTORY from random import random import time import hmac from django.core.cache import cache from django.db.models import Model from django import forms from django.template.defaultfilters import filesizeformat from django.utils import timezone from django.utils.translation import ugettext_lazy as _ import magic import os import boards from boards.settings import get_bool from neboard import settings CACHE_KEY_DELIMITER = '_' HTTP_FORWARDED = 'HTTP_X_FORWARDED_FOR' META_REMOTE_ADDR = 'REMOTE_ADDR' SETTING_MESSAGES = 'Messages' SETTING_ANON_MODE = 'AnonymousMode' ANON_IP = '127.0.0.1' FILE_EXTENSION_DELIMITER = '.' def is_anonymous_mode(): return get_bool(SETTING_MESSAGES, SETTING_ANON_MODE) def get_client_ip(request): if is_anonymous_mode(): ip = ANON_IP else: x_forwarded_for = request.META.get(HTTP_FORWARDED) if x_forwarded_for: ip = x_forwarded_for.split(',')[-1].strip() else: ip = request.META.get(META_REMOTE_ADDR) return ip # TODO The output format is not epoch because it includes microseconds def datetime_to_epoch(datetime): return int(time.mktime(timezone.localtime( datetime,timezone.get_current_timezone()).timetuple()) * 1000000 + datetime.microsecond) def get_websocket_token(user_id='', timestamp=''): """ Create token to validate information provided by new connection. """ sign = hmac.new(settings.CENTRIFUGE_PROJECT_SECRET.encode()) sign.update(settings.CENTRIFUGE_PROJECT_ID.encode()) sign.update(user_id.encode()) sign.update(timestamp.encode()) token = sign.hexdigest() return token # TODO Test this carefully def cached_result(key_method=None): """ Caches method result in the Django's cache system, persisted by object name, object name, model id if object is a Django model, args and kwargs if any. """ def _cached_result(function): def inner_func(obj, *args, **kwargs): cache_key_params = [obj.__class__.__name__, function.__name__] cache_key_params += args for key, value in kwargs: cache_key_params.append(key + ':' + value) if isinstance(obj, Model): cache_key_params.append(str(obj.id)) if key_method is not None: cache_key_params += [str(arg) for arg in key_method(obj)] cache_key = CACHE_KEY_DELIMITER.join(cache_key_params) persisted_result = cache.get(cache_key) if persisted_result is not None: result = persisted_result else: result = function(obj, *args, **kwargs) if result is not None: cache.set(cache_key, result) return result return inner_func return _cached_result def get_file_hash(file) -> str: md5 = hashlib.md5() for chunk in file.chunks(): md5.update(chunk) return md5.hexdigest() def validate_file_size(size: int): max_size = boards.settings.get_int('Forms', 'MaxFileSize') if max_size > 0 and size > max_size: raise forms.ValidationError( _('File must be less than %s but is %s.') % (filesizeformat(max_size), filesizeformat(size))) def get_extension(filename): return filename.split(FILE_EXTENSION_DELIMITER)[-1:][0] def get_upload_filename(model_instance, old_filename): extension = get_extension(old_filename) new_name = '{}.{}'.format(uuid.uuid4(), extension) return os.path.join(FILE_DIRECTORY, new_name) def get_file_mimetype(file) -> str: file_type = magic.from_buffer(file.chunks().__next__(), mime=True) if file_type is None: file_type = 'application/octet-stream' elif type(file_type) == bytes: file_type = file_type.decode() return file_type def get_domain(url: str) -> str: """ Gets domain from an URL with random number of domain levels. """ domain_parts = url.split('/') if len(domain_parts) >= 2: full_domain = domain_parts[2] else: full_domain = '' return full_domain