##// END OF EJS Templates
Administration for sync sources. Now sync is available from the admin site instead of manual command line arguments. Still need to invoke sync_with_server by cron/timer
neko259 -
r2123:be532800 default
parent child Browse files
Show More
@@ -0,0 +1,24 b''
1 # Generated by Django 2.0.8 on 2018-08-05 17:55
2
3 from django.db import migrations, models
4
5
6 class Migration(migrations.Migration):
7
8 dependencies = [
9 ('boards', '0070_auto_20171225_1149'),
10 ]
11
12 operations = [
13 migrations.CreateModel(
14 name='SyncSource',
15 fields=[
16 ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
17 ('name', models.TextField()),
18 ('timestamp', models.DateTimeField(blank=True, null=True)),
19 ('url', models.TextField()),
20 ('tags', models.TextField(blank=True)),
21 ('query_split_limit', models.IntegerField()),
22 ],
23 ),
24 ]
@@ -1,193 +1,197 b''
1 1 from django.contrib import admin
2 2 from django.urls import reverse
3 3 from django.utils.translation import ugettext_lazy as _
4 4
5 5 from boards.models import Post, Tag, Ban, Thread, Banner, Attachment, \
6 6 KeyPair, GlobalId, TagAlias, STATUS_ACTIVE
7 7 from boards.models.attachment import FILE_TYPES_IMAGE, AttachmentSticker, \
8 8 StickerPack
9 from boards.models.source import ThreadSource
9 from boards.models.source import ThreadSource, SyncSource
10 10
11 11
12 12 @admin.register(Post)
13 13 class PostAdmin(admin.ModelAdmin):
14 14
15 15 list_display = ('id', 'title', 'text', 'poster_ip', 'linked_images',
16 16 'foreign', 'tags')
17 17 list_filter = ('pub_time',)
18 18 search_fields = ('id', 'title', 'text', 'poster_ip')
19 19 exclude = ('referenced_posts', 'refmap', 'images', 'global_id')
20 20 readonly_fields = ('poster_ip', 'thread', 'linked_images',
21 21 'attachments', 'uid', 'url', 'pub_time', 'opening', 'linked_global_id',
22 22 'foreign', 'tags')
23 23
24 24 def ban_poster(self, request, queryset):
25 25 bans = 0
26 26 for post in queryset:
27 27 poster_ip = post.poster_ip
28 28 ban, created = Ban.objects.get_or_create(ip=poster_ip)
29 29 if created:
30 30 bans += 1
31 31 self.message_user(request, _('{} posters were banned').format(bans))
32 32
33 33 def ban_latter_with_delete(self, request, queryset):
34 34 bans = 0
35 35 hidden = 0
36 36 for post in queryset:
37 37 poster_ip = post.poster_ip
38 38 ban, created = Ban.objects.get_or_create(ip=poster_ip)
39 39 if created:
40 40 bans += 1
41 41 posts = Post.objects.filter(poster_ip=poster_ip, id__gte=post.id)
42 42 hidden += posts.count()
43 43 posts.delete()
44 44 self.message_user(request, _('{} posters were banned, {} messages were removed.').format(bans, hidden))
45 45 ban_latter_with_delete.short_description = _('Ban user and delete posts starting from this one and later')
46 46
47 47 def linked_images(self, obj: Post):
48 48 images = obj.attachments.filter(mimetype__in=FILE_TYPES_IMAGE)
49 49 image_urls = ['<a href="{}"><img src="{}" /></a>'.format(
50 50 reverse('admin:%s_%s_change' % (image._meta.app_label,
51 51 image._meta.model_name),
52 52 args=[image.id]), image.get_thumb_url()) for image in images]
53 53 return ', '.join(image_urls)
54 54 linked_images.allow_tags = True
55 55
56 56 def linked_global_id(self, obj: Post):
57 57 global_id = obj.global_id
58 58 if global_id is not None:
59 59 return '<a href="{}">{}</a>'.format(
60 60 reverse('admin:%s_%s_change' % (global_id._meta.app_label,
61 61 global_id._meta.model_name),
62 62 args=[global_id.id]), str(global_id))
63 63 linked_global_id.allow_tags = True
64 64
65 65 def tags(self, obj: Post):
66 66 return ', '.join([tag.get_name() for tag in obj.get_tags()])
67 67
68 68 def save_model(self, request, obj, form, change):
69 69 obj.save()
70 70 obj.clear_cache()
71 71
72 72 def foreign(self, obj: Post):
73 73 return obj is not None and obj.global_id is not None and\
74 74 not obj.global_id.is_local()
75 75
76 76 actions = ['ban_poster', 'ban_latter_with_delete']
77 77
78 78
79 79 @admin.register(Tag)
80 80 class TagAdmin(admin.ModelAdmin):
81 81 def thread_count(self, obj: Tag) -> int:
82 82 return obj.get_thread_count()
83 83
84 84 def display_children(self, obj: Tag):
85 85 return ', '.join([str(child) for child in obj.get_children().all()])
86 86
87 87 def name(self, obj: Tag):
88 88 return obj.get_name()
89 89
90 90 def save_model(self, request, obj, form, change):
91 91 super().save_model(request, obj, form, change)
92 92 for thread in obj.get_threads().all():
93 93 thread.refresh_tags()
94 94
95 95 list_display = ('name', 'thread_count', 'display_children')
96 96 search_fields = ('id',)
97 97 readonly_fields = ('name',)
98 98
99 99
100 100 @admin.register(TagAlias)
101 101 class TagAliasAdmin(admin.ModelAdmin):
102 102 list_display = ('locale', 'name', 'parent')
103 103 list_filter = ('locale',)
104 104 search_fields = ('name',)
105 105
106 106
107 107 @admin.register(Thread)
108 108 class ThreadAdmin(admin.ModelAdmin):
109 109
110 110 def title(self, obj: Thread) -> str:
111 111 return obj.get_opening_post().get_title()
112 112
113 113 def reply_count(self, obj: Thread) -> int:
114 114 return obj.get_reply_count()
115 115
116 116 def ip(self, obj: Thread):
117 117 return obj.get_opening_post().poster_ip
118 118
119 119 def display_tags(self, obj: Thread):
120 120 return ', '.join([str(tag) for tag in obj.get_tags().all()])
121 121
122 122 def op(self, obj: Thread):
123 123 return obj.get_opening_post_id()
124 124
125 125 # Save parent tags when editing tags
126 126 def save_related(self, request, form, formsets, change):
127 127 super().save_related(request, form, formsets, change)
128 128 form.instance.refresh_tags()
129 129
130 130 def save_model(self, request, obj, form, change):
131 131 op = obj.get_opening_post()
132 132 obj.save()
133 133 op.clear_cache()
134 134
135 135 list_display = ('id', 'op', 'title', 'reply_count', 'status', 'ip',
136 136 'display_tags')
137 137 list_filter = ('bump_time', 'status')
138 138 search_fields = ('id', 'title')
139 139 filter_horizontal = ('tags',)
140 140
141 141
142 142 @admin.register(KeyPair)
143 143 class KeyPairAdmin(admin.ModelAdmin):
144 144 list_display = ('public_key', 'primary')
145 145 list_filter = ('primary',)
146 146 search_fields = ('public_key',)
147 147
148 148
149 149 @admin.register(Ban)
150 150 class BanAdmin(admin.ModelAdmin):
151 151 list_display = ('ip', 'can_read')
152 152 list_filter = ('can_read',)
153 153 search_fields = ('ip',)
154 154
155 155
156 156 @admin.register(Banner)
157 157 class BannerAdmin(admin.ModelAdmin):
158 158 list_display = ('title', 'text')
159 159
160 160
161 161 @admin.register(Attachment)
162 162 class AttachmentAdmin(admin.ModelAdmin):
163 163 list_display = ('__str__', 'mimetype', 'file', 'url')
164 164
165 165
166 166 @admin.register(AttachmentSticker)
167 167 class AttachmentStickerAdmin(admin.ModelAdmin):
168 168 search_fields = ('name',)
169 169
170 170
171 171 @admin.register(StickerPack)
172 172 class StickerPackAdmin(admin.ModelAdmin):
173 173 search_fields = ('name',)
174 174
175 175
176 176 @admin.register(GlobalId)
177 177 class GlobalIdAdmin(admin.ModelAdmin):
178 178 def is_linked(self, obj):
179 179 return Post.objects.filter(global_id=obj).exists()
180 180
181 181 list_display = ('__str__', 'is_linked',)
182 182 readonly_fields = ('content',)
183 183
184 184
185 185 @admin.register(ThreadSource)
186 186 class ThreadSourceAdmin(admin.ModelAdmin):
187 187 search_fields = ('name', 'source')
188 188
189 189 def formfield_for_foreignkey(self, db_field, request, **kwargs):
190 190 if db_field.name == 'thread':
191 191 kwargs['queryset'] = Thread.objects.filter(status=STATUS_ACTIVE)
192 192 return super().formfield_for_foreignkey(db_field, request, **kwargs)
193 193
194
195 @admin.register(SyncSource)
196 class SyncSourceAdmin(admin.ModelAdmin):
197 search_fields = ('name',) No newline at end of file
@@ -1,140 +1,13 b''
1 import re
2 import logging
3 import xml.etree.ElementTree as ET
1 from django.core.management import BaseCommand
4 2
5 import httplib2
6 from django.core.management import BaseCommand
7 from django.utils.dateparse import parse_datetime
8
9 from boards.models import GlobalId, KeyPair
10 from boards.models.post.sync import SyncManager, TAG_ID, TAG_UPDATE_TIME
3 from boards.models.source import SyncSource
11 4
12 5 __author__ = 'neko259'
13 6
14 7
15 REGEX_GLOBAL_ID = re.compile(r'(\w+)::([\w\+/]+)::(\d+)')
16
17
18 8 class Command(BaseCommand):
19 help = 'Send a sync or get request to the server.'
20
21 def add_arguments(self, parser):
22 parser.add_argument('url', type=str, help='Server root url')
23 parser.add_argument('--global-id', type=str, default='',
24 help='Post global ID')
25 parser.add_argument('--split-query', type=int, default=1,
26 help='Split GET query into separate by the given'
27 ' number of posts in one')
28 parser.add_argument('--thread', type=int,
29 help='Get posts of one specific thread')
30 parser.add_argument('--tags', type=str,
31 help='Get posts of the tags, comma-separated')
32 parser.add_argument('--time-from', type=str,
33 help='Get posts from the given timestamp')
9 help = 'Send a sync or get request to the servers.'
34 10
35 11 def handle(self, *args, **options):
36 logger = logging.getLogger('boards.sync')
37
38 url = options.get('url')
39
40 list_url = url + 'api/sync/list/'
41 get_url = url + 'api/sync/get/'
42 file_url = url[:-1]
43
44 global_id_str = options.get('global_id')
45 if global_id_str:
46 match = REGEX_GLOBAL_ID.match(global_id_str)
47 if match:
48 key_type = match.group(1)
49 key = match.group(2)
50 local_id = match.group(3)
51
52 global_id = GlobalId(key_type=key_type, key=key,
53 local_id=local_id)
54
55 xml = SyncManager.generate_request_get([global_id])
56 h = httplib2.Http()
57 response, content = h.request(get_url, method="POST", body=xml)
58
59 SyncManager.parse_response_get(content, file_url)
60 else:
61 raise Exception('Invalid global ID')
62 else:
63 logger.info('Running LIST request...')
64 h = httplib2.Http()
65
66 tags = []
67 tags_str = options.get('tags')
68 if tags_str:
69 tags = tags_str.split(',')
70
71 timestamp_str = options.get('time_from')
72 timestamp = None
73 if timestamp_str:
74 timestamp = parse_datetime(timestamp_str)
75 if not timestamp:
76 raise Exception('Timestamp {} cannot be parsed'.format(
77 timestamp_str))
78
79 xml = SyncManager.generate_request_list(
80 opening_post=options.get('thread'), tags=tags,
81 timestamp_from=timestamp).encode()
82 response, content = h.request(list_url, method="POST", body=xml)
83 if response.status != 200:
84 raise Exception('Server returned error {}'.format(response.status))
85
86 logger.info('Processing response...')
87
88 root = ET.fromstring(content)
89 status = root.findall('status')[0].text
90 if status == 'success':
91 ids_to_sync = list()
92
93 models = root.findall('models')[0]
94 for model in models:
95 self.add_to_sync_list(ids_to_sync, logger, model)
96 logger.info('Starting sync...')
97
98 if len(ids_to_sync) > 0:
99 limit = options.get('split_query', len(ids_to_sync))
100 for offset in range(0, len(ids_to_sync), limit):
101 xml = SyncManager.generate_request_get(ids_to_sync[offset:offset + limit])
102 h = httplib2.Http()
103 logger.info('Running GET request...')
104 response, content = h.request(get_url, method="POST", body=xml)
105 logger.info('Processing response...')
106
107 SyncManager.parse_response_get(content, file_url)
108
109 logger.info('Sync completed successfully')
110 else:
111 logger.info('Nothing to get, everything synced')
112 else:
113 raise Exception('Invalid response status')
114
115 def add_to_sync_list(self, ids_to_sync, logger, model):
116 tag_id = model.find(TAG_ID)
117 global_id, exists = GlobalId.from_xml_element(tag_id)
118 from_this_board = self._is_from_this_board(global_id)
119 if from_this_board:
120 # If the post originates from this board, no need to process
121 # it again, nobody else could modify it
122 logger.debug('NO SYNC Processed post {}'.format(global_id))
123 else:
124 tag_update_time = model.find(TAG_UPDATE_TIME)
125 if tag_update_time:
126 update_time = tag_update_time.text
127 else:
128 update_time = None
129 if not exists or update_time is None or global_id.post.last_edit_time < parse_datetime(
130 update_time):
131 logger.debug('SYNC Processed post {}'.format(global_id))
132 ids_to_sync.append(global_id)
133 else:
134 logger.debug('NO SYNC Processed post {}'.format(global_id))
135
136 def _is_from_this_board(self, global_id):
137 from_this_board = KeyPair.objects.filter(
138 key_type=global_id.key_type,
139 public_key=global_id.key).exists()
140 return from_this_board
12 for source in SyncSource.objects.all():
13 source.run_sync()
@@ -1,85 +1,195 b''
1 1 import feedparser
2 import httplib2
2 3 import logging
3 4 import calendar
5 import xml.etree.ElementTree as ET
4 6
5 7 from time import mktime
6 8 from datetime import datetime
7 9
8 10 from django.db import models, transaction
9 11 from django.utils.dateparse import parse_datetime
10 12 from django.utils.timezone import utc
11 13 from django.utils import timezone
12 14 from django.utils.html import strip_tags
13 15
14 from boards.models import Post
16 from boards.models import Post, GlobalId, KeyPair
15 17 from boards.models.post import TITLE_MAX_LENGTH
18 from boards.models.post.sync import SyncManager, TAG_ID, TAG_UPDATE_TIME
16 19 from boards.settings import SECTION_EXTERNAL
17 20 from boards.utils import get_tripcode_from_text
18 21 from boards import settings
19 22
23 DELIMITER_TAGS = ','
20 24
21 25 SOURCE_TYPE_MAX_LENGTH = 100
22 26 SOURCE_TYPE_RSS = 'RSS'
23 27 TYPE_CHOICES = (
24 28 (SOURCE_TYPE_RSS, SOURCE_TYPE_RSS),
25 29 )
26 30
27 31
28 32 class ThreadSource(models.Model):
29 33 class Meta:
30 34 app_label = 'boards'
31 35
32 36 name = models.TextField()
33 37 thread = models.ForeignKey('Thread', on_delete=models.CASCADE)
34 38 timestamp = models.DateTimeField()
35 39 source = models.TextField()
36 40 source_type = models.CharField(max_length=SOURCE_TYPE_MAX_LENGTH,
37 41 choices=TYPE_CHOICES)
38 42
39 43 def __str__(self):
40 44 return self.name
41 45
42 46 @transaction.atomic
43 47 def fetch_latest_posts(self):
44 48 """Creates new posts with the info fetched since the timestamp."""
45 49 logger = logging.getLogger('boards.source')
46 50
47 51 if self.thread.is_archived():
48 52 logger.error('The thread {} is archived, please try another one'.format(self.thread))
49 53 else:
50 54 tripcode = get_tripcode_from_text(
51 55 settings.get(SECTION_EXTERNAL, 'SourceFetcherTripcode'))
52 56 start_timestamp = self.timestamp
53 57 last_timestamp = start_timestamp
54 58 logger.info('Start timestamp is {}'.format(start_timestamp))
55 59 if self.thread.is_bumplimit():
56 60 logger.warn('The thread {} has reached its bumplimit, please create a new one'.format(self.thread))
57 61 if self.source_type == SOURCE_TYPE_RSS:
58 62 feed = feedparser.parse(self.source)
59 63 items = sorted(feed.entries, key=lambda entry: entry.published_parsed)
60 64 for item in items:
61 65 title = self.strip_title(item.title, TITLE_MAX_LENGTH)
62 66 timestamp = datetime.fromtimestamp(calendar.timegm(item.published_parsed), tz=utc)
63 67 if not timestamp:
64 68 logger.error('Invalid timestamp {} for {}'.format(item.published, title))
65 69 else:
66 70 if timestamp > last_timestamp:
67 71 last_timestamp = timestamp
68 72 if timestamp > start_timestamp:
69 73 Post.objects.create_post(title=title, text=self.parse_text(item.description),
70 74 thread=self.thread, file_urls=[item.link], tripcode=tripcode)
71 75 logger.info('Fetched item {} from {} into thread {}'.format(
72 76 title, self.name, self.thread))
73 77 logger.info('New timestamp is {}'.format(last_timestamp))
74 78 self.timestamp = last_timestamp
75 79 self.save(update_fields=['timestamp'])
76 80
77 81 def parse_text(self, text):
78 82 return strip_tags(text)
79 83
80 84 def strip_title(self, title, max_length):
81 85 result = title
82 86 if len(title) > max_length:
83 87 result = title[:max_length - 1] + '…'
84 88 return result
85 89
90
91 class SyncSource(models.Model):
92 class Meta:
93 app_label = 'boards'
94
95 name = models.TextField()
96 timestamp = models.DateTimeField(blank=True, null=True)
97 url = models.TextField()
98 tags = models.TextField(blank=True)
99 query_split_limit = models.IntegerField()
100
101 def __str__(self):
102 return self.name
103
104 @transaction.atomic
105 def run_sync(self):
106 logger = logging.getLogger('boards.sync')
107
108 tags = []
109 if self.tags:
110 tags = self.tags.split(DELIMITER_TAGS)
111
112 timestamp = None
113 if self.timestamp:
114 timestamp = self.timestamp
115
116 new_timestamp = timezone.now()
117
118 list_url = '{}api/sync/list/'.format(self.url)
119 get_url = '{}api/sync/get/'.format(self.url)
120 file_url = self.url[:-1]
121
122 xml = SyncManager.generate_request_list(
123 tags=tags,
124 timestamp_from=timestamp).encode()
125
126 logger.info('Running LIST request for {}...'.format(self.name))
127 h = httplib2.Http()
128 response, content = h.request(list_url, method="POST", body=xml)
129 if response.status != 200:
130 raise Exception('Server returned error {}'.format(response.status))
131
132 logger.info('Processing response...')
133
134 root = ET.fromstring(content)
135 status = root.findall('status')[0].text
136 if status == 'success':
137 ids_to_sync = list()
138
139 models = root.findall('models')[0]
140 for model in models:
141 self.add_to_sync_list(ids_to_sync, logger, model)
142 logger.info('Starting sync...')
143
144 if len(ids_to_sync) > 0:
145 if self.query_split_limit > 0:
146 limit = min(self.query_split_limit, len(ids_to_sync))
147 else:
148 limit = len(ids_to_sync)
149
150 for offset in range(0, len(ids_to_sync), limit):
151 xml = SyncManager.generate_request_get(
152 ids_to_sync[offset:offset + limit])
153 h = httplib2.Http()
154 logger.info('Running GET request...')
155 response, content = h.request(get_url, method="POST",
156 body=xml)
157 logger.info('Processing response...')
158
159 SyncManager.parse_response_get(content, file_url)
160
161 logger.info('Sync completed successfully for {}'.format(self.name))
162 else:
163 logger.info('Nothing to get for {}, everything synced'.format(self.name))
164 else:
165 raise Exception('Invalid response status')
166
167 self.timestamp = new_timestamp
168 self.save(update_fields=['timestamp'])
169
170 def add_to_sync_list(self, ids_to_sync, logger, model):
171 tag_id = model.find(TAG_ID)
172 global_id, exists = GlobalId.from_xml_element(tag_id)
173 from_this_board = self._is_from_this_board(global_id)
174 if from_this_board:
175 # If the post originates from this board, no need to process
176 # it again, nobody else could modify it
177 logger.debug('NO SYNC Processed post {}'.format(global_id))
178 else:
179 tag_update_time = model.find(TAG_UPDATE_TIME)
180 if tag_update_time:
181 update_time = tag_update_time.text
182 else:
183 update_time = None
184 if not exists or update_time is None or global_id.post.last_edit_time < parse_datetime(
185 update_time):
186 logger.debug('SYNC Processed post {}'.format(global_id))
187 ids_to_sync.append(global_id)
188 else:
189 logger.debug('NO SYNC Processed post {}'.format(global_id))
190
191 def _is_from_this_board(self, global_id):
192 from_this_board = KeyPair.objects.filter(
193 key_type=global_id.key_type,
194 public_key=global_id.key).exists()
195 return from_this_board
General Comments 0
You need to be logged in to leave comments. Login now