import time import hashlib from django.core.cache import cache from django.utils.translation import ugettext_lazy as _, ungettext_lazy from boards import settings as board_settings from boards.settings import SECTION_FORMS LAST_POST_TIME = 'last_post_time' LAST_LOGIN_TIME = 'last_login_time' POW_HASH_LENGTH = 16 POW_LIFE_MINUTES = 5 ERROR_SPEED = 'Please wait %(delay)d second before sending message' ERROR_SPEED_PLURAL = 'Please wait %(delay)d seconds before sending message' ERROR_INVALID_POW = 'Invalid PoW.' class Validator: def __init__(self, session): self.errors = [] self.session = session def validate(self) -> None: """ Runs validation. If something fails, should add an error to the list. """ pass def get_errors(self): return self.errors def get_session(self): return self.session def add_error(self, error): self.errors.append(error) class TimeValidator(Validator): """ Validates the posting speed. New post must be sent no less than after some specific amount of time defined in settings. """ def validate(self): can_post = True posting_delay = board_settings.get_int(SECTION_FORMS, 'PostingDelay') if board_settings.get_bool(SECTION_FORMS, 'LimitPostingSpeed'): now = time.time() current_delay = 0 if LAST_POST_TIME not in self.get_session(): self.get_session()[LAST_POST_TIME] = now need_delay = True else: last_post_time = self._get_last_post_time() current_delay = int(now - last_post_time) need_delay = current_delay < posting_delay self._set_session_cache(LAST_POST_TIME, now) if need_delay: delay = posting_delay - current_delay error_message = ungettext_lazy(ERROR_SPEED, ERROR_SPEED_PLURAL, delay) % {'delay': delay} self.add_error(error_message) can_post = False if can_post: self.get_session()[LAST_POST_TIME] = now else: # Reset the time since posting failed self._set_session_cache(LAST_POST_TIME, self.get_session()[LAST_POST_TIME]) def _get_cache_key(self, key): return '{}_{}'.format(self.get_session().session_key, key) def _set_session_cache(self, key, value): cache.set(self._get_cache_key(key), value) def _get_session_cache(self, key): return cache.get(self._get_cache_key(key)) def _get_last_post_time(self): last = self._get_session_cache(LAST_POST_TIME) if last is None: last = self.get_session().get(LAST_POST_TIME) return last class PowValidator(Validator): """ Validates prof of work. A hash is computed on the client side and validated here, all data must match. PoW difficulty is set in the settings. """ def __init__(self, session, timestamp, iteration, guess, text): super().__init__(session) self.timestamp = timestamp self.iteration = iteration self.guess = guess self.text = text def validate(self): if self.timestamp and self.iteration and self.guess: self._validate_hash(self.timestamp, self.iteration, self.guess, self.text) else: self.add_error(_(ERROR_INVALID_POW)) def _validate_hash(self, timestamp: str, iteration: str, guess: str, message: str): payload = timestamp + message.replace('\r\n', '\n') difficulty = board_settings.get_int(SECTION_FORMS, 'PowDifficulty') target = str(int(2 ** (POW_HASH_LENGTH * 3) / difficulty)) if len(target) < POW_HASH_LENGTH: target = '0' * (POW_HASH_LENGTH - len(target)) + target computed_guess = hashlib.sha256((payload + iteration).encode()) \ .hexdigest()[0:POW_HASH_LENGTH] if guess != computed_guess or guess > target: self.add_error(_('Invalid PoW.'))