import feedparser
import httplib2
import logging
import calendar
import xml.etree.ElementTree as ET

from time import mktime
from datetime import datetime
from django.db import models, transaction
from django.utils.dateparse import parse_datetime
from django.utils.timezone import utc
from django.utils import timezone
from django.utils.html import strip_tags

from boards.models import Post, GlobalId, KeyPair
from boards.models.post import TITLE_MAX_LENGTH
from boards.models.post.sync import SyncManager, TAG_ID, TAG_UPDATE_TIME
from boards.settings import SECTION_EXTERNAL
from boards.utils import get_tripcode_from_text
from boards import settings


DELIMITER_TAGS = ','

SOURCE_TYPE_MAX_LENGTH = 100
SOURCE_TYPE_RSS = 'RSS'
TYPE_CHOICES = (
    (SOURCE_TYPE_RSS, SOURCE_TYPE_RSS),
)


class ThreadSource(models.Model):
    class Meta:
        app_label = 'boards'

    name = models.TextField()
    thread = models.ForeignKey('Thread', on_delete=models.CASCADE)
    timestamp = models.DateTimeField()
    source = models.TextField()
    source_type = models.CharField(max_length=SOURCE_TYPE_MAX_LENGTH,
                                   choices=TYPE_CHOICES)

    def __str__(self):
        return self.name

    @transaction.atomic
    def fetch_latest_posts(self):
        """Creates new posts with the info fetched since the timestamp."""
        logger = logging.getLogger('boards.source')

        if self.thread.is_archived():
            logger.error('The thread {} is archived, please try another one'
                         .format(self.thread))
        else:
            tripcode = get_tripcode_from_text(
                settings.get(SECTION_EXTERNAL, 'SourceFetcherTripcode'))

            start_timestamp = self.timestamp
            last_timestamp = start_timestamp
            logger.info('Start timestamp is {}'.format(start_timestamp))

            if self.thread.is_bumplimit():
                logger.warning('The thread {} has reached its bumplimit, '
                               'please create a new one'.format(self.thread))

            if self.source_type == SOURCE_TYPE_RSS:
                feed = feedparser.parse(self.source)
                # Process entries oldest first so the timestamp advances
                # monotonically.
                items = sorted(feed.entries,
                               key=lambda entry: entry.published_parsed)
                for item in items:
                    title = self.strip_title(item.title, TITLE_MAX_LENGTH)
                    timestamp = datetime.fromtimestamp(
                        calendar.timegm(item.published_parsed), tz=utc)
                    if not timestamp:
                        logger.error('Invalid timestamp {} for {}'.format(
                            item.published, title))
                    else:
                        if timestamp > last_timestamp:
                            last_timestamp = timestamp
                        # Only entries newer than the stored timestamp become
                        # posts; older ones were fetched on a previous run.
                        if timestamp > start_timestamp:
                            Post.objects.create_post(
                                title=title,
                                text=self.parse_text(item.description),
                                thread=self.thread,
                                file_urls=[item.link],
                                tripcode=tripcode)
                            logger.info('Fetched item {} from {} into thread {}'.format(
                                title, self.name, self.thread))

            logger.info('New timestamp is {}'.format(last_timestamp))
            self.timestamp = last_timestamp
            self.save(update_fields=['timestamp'])

    def parse_text(self, text):
        return strip_tags(text)

    def strip_title(self, title, max_length):
        result = title
        if len(title) > max_length:
            result = title[:max_length - 1] + '…'
        return result


class SyncSource(models.Model):
    class Meta:
        app_label = 'boards'

    name = models.TextField()
    timestamp = models.DateTimeField(blank=True, null=True)
    url = models.TextField()
    tags = models.TextField(blank=True)
    query_split_limit = models.IntegerField()

    def __str__(self):
        return self.name

    @transaction.atomic
    def run_sync(self):
        logger = logging.getLogger('boards.sync')

        tags = []
        if self.tags:
            tags = self.tags.split(DELIMITER_TAGS)

        timestamp = None
        if self.timestamp:
            timestamp = self.timestamp
        new_timestamp = timezone.now()

        list_url = '{}api/sync/list/'.format(self.url)
        get_url = '{}api/sync/get/'.format(self.url)
        file_url = self.url[:-1]

        # Ask the remote node for the list of posts changed since the last
        # successful sync.
        xml = SyncManager.generate_request_list(
            tags=tags, timestamp_from=timestamp).encode()
        logger.info('Running LIST request for {}...'.format(self.name))
        h = httplib2.Http()
        response, content = h.request(list_url, method="POST", body=xml)
        if response.status != 200:
            raise Exception('Server returned error {}'.format(response.status))

        logger.info('Processing response...')
        root = ET.fromstring(content)
        status = root.findall('status')[0].text
        if status == 'success':
            ids_to_sync = list()
            models = root.findall('models')[0]
            for model in models:
                self.add_to_sync_list(ids_to_sync, logger, model)
            logger.info('Starting sync...')

            if len(ids_to_sync) > 0:
                # Split the GET requests into chunks so a single query does
                # not grow too large.
                if self.query_split_limit > 0:
                    limit = min(self.query_split_limit, len(ids_to_sync))
                else:
                    limit = len(ids_to_sync)

                for offset in range(0, len(ids_to_sync), limit):
                    xml = SyncManager.generate_request_get(
                        ids_to_sync[offset:offset + limit])
                    h = httplib2.Http()
                    logger.info('Running GET request...')
                    response, content = h.request(get_url, method="POST",
                                                  body=xml)
                    logger.info('Processing response...')
                    SyncManager.parse_response_get(content, file_url)
                logger.info('Sync completed successfully for {}'.format(self.name))
            else:
                logger.info('Nothing to get for {}, everything synced'.format(self.name))
        else:
            raise Exception('Invalid response status')

        self.timestamp = new_timestamp
        self.save(update_fields=['timestamp'])

    def add_to_sync_list(self, ids_to_sync, logger, model):
        tag_id = model.find(TAG_ID)
        global_id, exists = GlobalId.from_xml_element(tag_id)
        if self._is_from_this_board(global_id):
            # If the post originates from this board, no need to process
            # it again; nobody else could modify it.
            logger.debug('NO SYNC Processed post {}'.format(global_id))
        else:
            tag_update_time = model.find(TAG_UPDATE_TIME)
            if tag_update_time is not None:
                update_time = tag_update_time.text
            else:
                update_time = None
            if (not exists or update_time is None
                    or global_id.post.last_edit_time < parse_datetime(update_time)):
                logger.debug('SYNC Processed post {}'.format(global_id))
                ids_to_sync.append(global_id)
            else:
                logger.debug('NO SYNC Processed post {}'.format(global_id))

    def _is_from_this_board(self, global_id):
        return KeyPair.objects.filter(
            key_type=global_id.key_type,
            public_key=global_id.key).exists()
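

# Usage sketch (illustrative, not part of the original module): both source
# models are meant to be driven periodically, e.g. from a management command
# or a cron job; the project's real entry points are not shown in this file.
# The helper below only demonstrates how the two methods would be invoked.
def _run_all_sources():
    """Fetch every RSS source and run every sync source once (example only)."""
    for thread_source in ThreadSource.objects.all():
        thread_source.fetch_latest_posts()
    for sync_source in SyncSource.objects.all():
        sync_source.run_sync()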