import feedparser
import httplib2
import logging
import calendar
import xml.etree.ElementTree as ET
from time import mktime
from datetime import datetime

from django.db import models, transaction
from django.utils.dateparse import parse_datetime
from django.utils.timezone import utc
from django.utils import timezone
from django.utils.html import strip_tags

from boards.models import Post, GlobalId, KeyPair
from boards.models.post import TITLE_MAX_LENGTH
from boards.models.post.sync import SyncManager, TAG_ID, TAG_UPDATE_TIME
from boards.settings import SECTION_EXTERNAL
from boards.utils import get_tripcode_from_text
from boards import settings

# Separator used when several tags are stored in SyncSource.tags.
DELIMITER_TAGS = ','

SOURCE_TYPE_MAX_LENGTH = 100

SOURCE_TYPE_RSS = 'RSS'

TYPE_CHOICES = (
    (SOURCE_TYPE_RSS, SOURCE_TYPE_RSS),
)


class ThreadSource(models.Model):
    """External content source (currently RSS only) that is periodically
    polled to create new posts in an existing thread.
    """

    class Meta:
        app_label = 'boards'

    name = models.TextField()
    thread = models.ForeignKey('Thread', on_delete=models.CASCADE)
    # Publication time of the newest item already imported; items at or
    # before this moment are skipped on the next fetch.
    timestamp = models.DateTimeField()
    source = models.TextField()
    source_type = models.CharField(max_length=SOURCE_TYPE_MAX_LENGTH,
                                   choices=TYPE_CHOICES)

    def __str__(self):
        return self.name

    @transaction.atomic
    def fetch_latest_posts(self):
        """Creates new posts with the info fetched since the timestamp.

        Skips archived threads entirely, warns on bumplimit ones, and
        advances ``self.timestamp`` to the newest imported item so the
        same items are not re-imported next run.
        """
        logger = logging.getLogger('boards.source')
        if self.thread.is_archived():
            logger.error('The thread {} is archived, please try another one'.format(self.thread))
        else:
            tripcode = get_tripcode_from_text(
                settings.get(SECTION_EXTERNAL, 'SourceFetcherTripcode'))
            start_timestamp = self.timestamp
            last_timestamp = start_timestamp
            logger.info('Start timestamp is {}'.format(start_timestamp))
            if self.thread.is_bumplimit():
                # logger.warn() is a deprecated alias of warning().
                logger.warning('The thread {} has reached its bumplimit, please create a new one'.format(self.thread))
            if self.source_type == SOURCE_TYPE_RSS:
                feed = feedparser.parse(self.source)
                # Oldest first so posts appear in publication order.
                items = sorted(feed.entries,
                               key=lambda entry: entry.published_parsed)
                for item in items:
                    title = self.strip_title(item.title, TITLE_MAX_LENGTH)
                    # published_parsed is a UTC struct_time; timegm keeps it UTC.
                    timestamp = datetime.fromtimestamp(
                        calendar.timegm(item.published_parsed), tz=utc)
                    # NOTE(review): a datetime is always truthy, so this
                    # branch is effectively dead; kept for defensiveness.
                    if not timestamp:
                        logger.error('Invalid timestamp {} for {}'.format(item.published, title))
                    else:
                        if timestamp > last_timestamp:
                            last_timestamp = timestamp
                        if timestamp > start_timestamp:
                            Post.objects.create_post(
                                title=title,
                                text=self.parse_text(item.description),
                                thread=self.thread,
                                file_urls=[item.link],
                                tripcode=tripcode)
                            logger.info('Fetched item {} from {} into thread {}'.format(
                                title, self.name, self.thread))
            logger.info('New timestamp is {}'.format(last_timestamp))
            self.timestamp = last_timestamp
            self.save(update_fields=['timestamp'])

    def parse_text(self, text):
        """Return the item text with all HTML tags removed."""
        return strip_tags(text)

    def strip_title(self, title, max_length):
        """Truncate ``title`` to ``max_length`` chars, ending with an ellipsis."""
        result = title
        if len(title) > max_length:
            result = title[:max_length - 1] + '…'
        return result


class SyncSource(models.Model):
    """Remote board to pull posts from via the sync API (LIST then GET)."""

    class Meta:
        app_label = 'boards'

    name = models.TextField()
    # Time of the last successful sync; None means "fetch everything".
    timestamp = models.DateTimeField(blank=True, null=True)
    url = models.TextField()
    # Comma-separated tag filter (see DELIMITER_TAGS); blank = all tags.
    tags = models.TextField(blank=True)
    # Max ids per GET request; <= 0 means fetch all in one request.
    query_split_limit = models.IntegerField()

    def __str__(self):
        return self.name

    @transaction.atomic
    def run_sync(self):
        """Fetch new/updated posts from the remote source.

        Sends a LIST request for post ids changed since ``timestamp``,
        filters out posts that originate locally or are already up to
        date, then GETs the remainder in chunks of ``query_split_limit``.

        Raises:
            Exception: on a non-200 LIST response or a response whose
                status element is not 'success'.
        """
        logger = logging.getLogger('boards.sync')

        tags = []
        if self.tags:
            tags = self.tags.split(DELIMITER_TAGS)

        timestamp = None
        if self.timestamp:
            timestamp = self.timestamp
        # Captured before the requests so changes made while syncing are
        # picked up by the next run.
        new_timestamp = timezone.now()

        list_url = '{}api/sync/list/'.format(self.url)
        get_url = '{}api/sync/get/'.format(self.url)
        file_url = self.url[:-1]

        xml = SyncManager.generate_request_list(
            tags=tags, timestamp_from=timestamp).encode()
        logger.info('Running LIST request for {}...'.format(self.name))
        h = httplib2.Http()
        response, content = h.request(list_url, method="POST", body=xml)
        if response.status != 200:
            raise Exception('Server returned error {}'.format(response.status))

        logger.info('Processing response...')
        root = ET.fromstring(content)
        status = root.findall('status')[0].text
        if status == 'success':
            ids_to_sync = list()
            # Renamed from 'models' to avoid shadowing django.db.models.
            model_list = root.findall('models')[0]
            for model in model_list:
                self.add_to_sync_list(ids_to_sync, logger, model)

            logger.info('Starting sync...')
            if len(ids_to_sync) > 0:
                if self.query_split_limit > 0:
                    limit = min(self.query_split_limit, len(ids_to_sync))
                else:
                    limit = len(ids_to_sync)
                for offset in range(0, len(ids_to_sync), limit):
                    xml = SyncManager.generate_request_get(
                        ids_to_sync[offset:offset + limit])
                    h = httplib2.Http()
                    logger.info('Running GET request...')
                    response, content = h.request(get_url, method="POST", body=xml)
                    logger.info('Processing response...')
                    SyncManager.parse_response_get(content, file_url)
                logger.info('Sync completed successfully for {}'.format(self.name))
            else:
                logger.info('Nothing to get for {}, everything synced'.format(self.name))
        else:
            raise Exception('Invalid response status')

        self.timestamp = new_timestamp
        self.save(update_fields=['timestamp'])

    def add_to_sync_list(self, ids_to_sync, logger, model):
        """Append ``model``'s global id to ``ids_to_sync`` when it needs syncing.

        A post is skipped when it originates from this board, or when it
        already exists locally and the remote update time is not newer.
        """
        tag_id = model.find(TAG_ID)
        global_id, exists = GlobalId.from_xml_element(tag_id)
        from_this_board = self._is_from_this_board(global_id)
        if from_this_board:
            # If the post originates from this board, no need to process
            # it again, nobody else could modify it
            logger.debug('NO SYNC Processed post {}'.format(global_id))
        else:
            tag_update_time = model.find(TAG_UPDATE_TIME)
            # BUGFIX: an ElementTree Element with no children is falsy,
            # so a truthiness test here discarded existing update times.
            # Compare against None explicitly as the ET docs require.
            if tag_update_time is not None:
                update_time = tag_update_time.text
            else:
                update_time = None
            if not exists or update_time is None or global_id.post.last_edit_time < parse_datetime(
                    update_time):
                logger.debug('SYNC Processed post {}'.format(global_id))
                ids_to_sync.append(global_id)
            else:
                logger.debug('NO SYNC Processed post {}'.format(global_id))

    def _is_from_this_board(self, global_id):
        """Return True if ``global_id`` was signed with one of our key pairs."""
        from_this_board = KeyPair.objects.filter(
            key_type=global_id.key_type, public_key=global_id.key).exists()
        return from_this_board