##// END OF EJS Templates
Fixed tag sync and tests
neko259 -
r1895:19684540 default
parent child Browse files
Show More
@@ -1,385 +1,385 b''
1 1 import xml.etree.ElementTree as et
2 2 import logging
3 3 from xml.etree import ElementTree
4 4
5 5 from boards.abstracts.exceptions import SyncException
6 6 from boards.abstracts.sync_filters import ThreadFilter, TagsFilter,\
7 7 TimestampFromFilter
8 8 from boards.models import KeyPair, GlobalId, Signature, Post, Tag
9 9 from boards.models.attachment.downloaders import download
10 10 from boards.models.signature import TAG_REQUEST, ATTR_TYPE, TYPE_GET, \
11 11 ATTR_VERSION, TAG_MODEL, ATTR_NAME, TAG_ID, TYPE_LIST
12 12 from boards.utils import get_file_mimetype, get_file_hash
13 13 from django.db import transaction
14 14
15 15 EXCEPTION_NODE = 'Sync node returned an error: {}.'
16 16 EXCEPTION_DOWNLOAD = 'File was not downloaded.'
17 17 EXCEPTION_HASH = 'File hash does not match attachment hash.'
18 18 EXCEPTION_SIGNATURE = 'Invalid model signature for {}.'
19 19 EXCEPTION_AUTHOR_SIGNATURE = 'Model {} has no author signature.'
20 20 EXCEPTION_THREAD = 'No thread exists for post {}'
21 21 ENCODING_UNICODE = 'unicode'
22 22
23 23 TAG_MODEL = 'model'
24 24 TAG_REQUEST = 'request'
25 25 TAG_RESPONSE = 'response'
26 26 TAG_ID = 'id'
27 27 TAG_STATUS = 'status'
28 28 TAG_MODELS = 'models'
29 29 TAG_TITLE = 'title'
30 30 TAG_TEXT = 'text'
31 31 TAG_THREAD = 'thread'
32 32 TAG_PUB_TIME = 'pub-time'
33 33 TAG_SIGNATURES = 'signatures'
34 34 TAG_SIGNATURE = 'signature'
35 35 TAG_CONTENT = 'content'
36 36 TAG_ATTACHMENTS = 'attachments'
37 37 TAG_ATTACHMENT = 'attachment'
38 38 TAG_TAGS = 'tags'
39 39 TAG_TAG = 'tag'
40 40 TAG_ATTACHMENT_REFS = 'attachment-refs'
41 41 TAG_ATTACHMENT_REF = 'attachment-ref'
42 42 TAG_TRIPCODE = 'tripcode'
43 43 TAG_VERSION = 'version'
44 44
45 45 TYPE_GET = 'get'
46 46
47 47 ATTR_VERSION = 'version'
48 48 ATTR_TYPE = 'type'
49 49 ATTR_NAME = 'name'
50 50 ATTR_VALUE = 'value'
51 51 ATTR_MIMETYPE = 'mimetype'
52 52 ATTR_KEY = 'key'
53 53 ATTR_REF = 'ref'
54 54 ATTR_URL = 'url'
55 55 ATTR_ID_TYPE = 'id-type'
56 56
57 57 ID_TYPE_MD5 = 'md5'
58 58 ID_TYPE_URL = 'url'
59 59
60 60 STATUS_SUCCESS = 'success'
61 61
62 62
63 63 logger = logging.getLogger('boards.sync')
64 64
65 65
66 66 class SyncManager:
67 67 @staticmethod
68 68 def generate_response_get(model_list: list):
69 69 response = et.Element(TAG_RESPONSE)
70 70
71 71 status = et.SubElement(response, TAG_STATUS)
72 72 status.text = STATUS_SUCCESS
73 73
74 74 models = et.SubElement(response, TAG_MODELS)
75 75
76 76 for post in model_list:
77 77 model = et.SubElement(models, TAG_MODEL)
78 78 model.set(ATTR_NAME, 'post')
79 79
80 80 global_id = post.global_id
81 81
82 82 attachments = post.attachments.all()
83 83 if global_id.content:
84 84 model.append(et.fromstring(global_id.content))
85 85 if len(attachments) > 0:
86 86 internal_attachments = False
87 87 for attachment in attachments:
88 88 if attachment.is_internal():
89 89 internal_attachments = True
90 90 break
91 91
92 92 if internal_attachments:
93 93 attachment_refs = et.SubElement(model, TAG_ATTACHMENT_REFS)
94 94 for file in attachments:
95 95 SyncManager._attachment_to_xml(
96 96 None, attachment_refs, file)
97 97 else:
98 98 content_tag = et.SubElement(model, TAG_CONTENT)
99 99
100 100 tag_id = et.SubElement(content_tag, TAG_ID)
101 101 global_id.to_xml_element(tag_id)
102 102
103 103 title = et.SubElement(content_tag, TAG_TITLE)
104 104 title.text = post.title
105 105
106 106 text = et.SubElement(content_tag, TAG_TEXT)
107 107 text.text = post.get_sync_text()
108 108
109 109 thread = post.get_thread()
110 110 if post.is_opening():
111 111 tag_tags = et.SubElement(content_tag, TAG_TAGS)
112 112 for tag in thread.get_tags():
113 113 tag_tag = et.SubElement(tag_tags, TAG_TAG)
114 tag_tag.text = tag.name
114 tag_tag.text = tag.get_name()
115 115 else:
116 116 tag_thread = et.SubElement(content_tag, TAG_THREAD)
117 117 thread_id = et.SubElement(tag_thread, TAG_ID)
118 118 thread.get_opening_post().global_id.to_xml_element(thread_id)
119 119
120 120 pub_time = et.SubElement(content_tag, TAG_PUB_TIME)
121 121 pub_time.text = str(post.get_pub_time_str())
122 122
123 123 if post.tripcode:
124 124 tripcode = et.SubElement(content_tag, TAG_TRIPCODE)
125 125 tripcode.text = post.tripcode
126 126
127 127 if len(attachments) > 0:
128 128 attachments_tag = et.SubElement(content_tag, TAG_ATTACHMENTS)
129 129
130 130 internal_attachments = False
131 131 for attachment in attachments:
132 132 if attachment.is_internal():
133 133 internal_attachments = True
134 134 break
135 135
136 136 if internal_attachments:
137 137 attachment_refs = et.SubElement(model, TAG_ATTACHMENT_REFS)
138 138 else:
139 139 attachment_refs = None
140 140
141 141 for file in attachments:
142 142 SyncManager._attachment_to_xml(
143 143 attachments_tag, attachment_refs, file)
144 144 version_tag = et.SubElement(content_tag, TAG_VERSION)
145 145 version_tag.text = str(post.version)
146 146
147 147 global_id.content = et.tostring(content_tag, ENCODING_UNICODE)
148 148 global_id.save()
149 149
150 150 signatures_tag = et.SubElement(model, TAG_SIGNATURES)
151 151 post_signatures = global_id.signature_set.all()
152 152 if post_signatures:
153 153 signatures = post_signatures
154 154 else:
155 155 key = KeyPair.objects.get(public_key=global_id.key)
156 156 signature = Signature(
157 157 key_type=key.key_type,
158 158 key=key.public_key,
159 159 signature=key.sign(global_id.content),
160 160 global_id=global_id,
161 161 )
162 162 signature.save()
163 163 signatures = [signature]
164 164 for signature in signatures:
165 165 signature_tag = et.SubElement(signatures_tag, TAG_SIGNATURE)
166 166 signature_tag.set(ATTR_TYPE, signature.key_type)
167 167 signature_tag.set(ATTR_VALUE, signature.signature)
168 168 signature_tag.set(ATTR_KEY, signature.key)
169 169
170 170 return et.tostring(response, ENCODING_UNICODE)
171 171
172 172 @staticmethod
173 173 def parse_response_get(response_xml, hostname):
174 174 tag_root = et.fromstring(response_xml)
175 175 tag_status = tag_root.find(TAG_STATUS)
176 176 if STATUS_SUCCESS == tag_status.text:
177 177 tag_models = tag_root.find(TAG_MODELS)
178 178 for tag_model in tag_models:
179 179 SyncManager.parse_post(tag_model, hostname)
180 180 else:
181 181 raise SyncException(EXCEPTION_NODE.format(tag_status.text))
182 182
183 183 @staticmethod
184 184 @transaction.atomic
185 185 def parse_post(tag_model, hostname):
186 186 tag_content = tag_model.find(TAG_CONTENT)
187 187
188 188 content_str = et.tostring(tag_content, ENCODING_UNICODE)
189 189
190 190 tag_id = tag_content.find(TAG_ID)
191 191 global_id, exists = GlobalId.from_xml_element(tag_id)
192 192 signatures = SyncManager._verify_model(global_id, content_str, tag_model)
193 193
194 194 version = int(tag_content.find(TAG_VERSION).text)
195 195 is_old = exists and global_id.post.version < version
196 196 if exists and not is_old:
197 197 logger.debug('Post {} exists and is up to date.'.format(global_id))
198 198 else:
199 199 global_id.content = content_str
200 200 global_id.save()
201 201 for signature in signatures:
202 202 signature.global_id = global_id
203 203 signature.save()
204 204
205 205 title = tag_content.find(TAG_TITLE).text or ''
206 206 text = tag_content.find(TAG_TEXT).text or ''
207 207 pub_time = tag_content.find(TAG_PUB_TIME).text
208 208 tripcode_tag = tag_content.find(TAG_TRIPCODE)
209 209 if tripcode_tag is not None:
210 210 tripcode = tripcode_tag.text or ''
211 211 else:
212 212 tripcode = ''
213 213
214 214 thread = tag_content.find(TAG_THREAD)
215 215 tags = []
216 216 if thread:
217 217 thread_id = thread.find(TAG_ID)
218 218 op_global_id, exists = GlobalId.from_xml_element(thread_id)
219 219 if exists:
220 220 opening_post = Post.objects.get(global_id=op_global_id)
221 221 else:
222 222 raise Exception(EXCEPTION_THREAD.format(global_id))
223 223 else:
224 224 opening_post = None
225 225 tag_tags = tag_content.find(TAG_TAGS)
226 226 for tag_tag in tag_tags:
227 227 tag, created = Tag.objects.get_or_create(
228 name=tag_tag.text)
228 aliases__name=tag_tag.text)
229 229 tags.append(tag)
230 230
231 231 # TODO Check that the replied posts are already present
232 232 # before adding new ones
233 233
234 234 files = []
235 235 urls = []
236 236 tag_attachments = tag_content.find(TAG_ATTACHMENTS) or list()
237 237 tag_refs = tag_model.find(TAG_ATTACHMENT_REFS)
238 238 for attachment in tag_attachments:
239 239 if attachment.get(ATTR_ID_TYPE) == ID_TYPE_URL:
240 240 urls.append(attachment.text)
241 241 else:
242 242 tag_ref = tag_refs.find("{}[@ref='{}']".format(
243 243 TAG_ATTACHMENT_REF, attachment.text))
244 244 url = tag_ref.get(ATTR_URL)
245 245 attached_file = download(hostname + url, validate=False)
246 246 if attached_file is None:
247 247 raise SyncException(EXCEPTION_DOWNLOAD)
248 248
249 249 hash = get_file_hash(attached_file)
250 250 if hash != attachment.text:
251 251 raise SyncException(EXCEPTION_HASH)
252 252
253 253 files.append(attached_file)
254 254
255 255 if is_old:
256 256 post = global_id.post
257 257 Post.objects.update_post(
258 258 post, title=title, text=text, pub_time=pub_time,
259 259 tags=tags, files=files, file_urls=urls,
260 260 tripcode=tripcode, version=version)
261 261 logger.debug('Parsed updated post {}'.format(global_id))
262 262 else:
263 263 Post.objects.import_post(
264 264 title=title, text=text, pub_time=pub_time,
265 265 opening_post=opening_post, tags=tags,
266 266 global_id=global_id, files=files,
267 267 file_urls=urls, tripcode=tripcode,
268 268 version=version)
269 269 logger.debug('Parsed new post {}'.format(global_id))
270 270
271 271 @staticmethod
272 272 def generate_response_list(filters):
273 273 response = et.Element(TAG_RESPONSE)
274 274
275 275 status = et.SubElement(response, TAG_STATUS)
276 276 status.text = STATUS_SUCCESS
277 277
278 278 models = et.SubElement(response, TAG_MODELS)
279 279
280 280 posts = Post.objects.prefetch_related('global_id')
281 281 for post_filter in filters:
282 282 posts = post_filter.filter(posts)
283 283
284 284 for post in posts:
285 285 tag_model = et.SubElement(models, TAG_MODEL)
286 286 tag_id = et.SubElement(tag_model, TAG_ID)
287 287 post.global_id.to_xml_element(tag_id)
288 288 tag_version = et.SubElement(tag_model, TAG_VERSION)
289 289 tag_version.text = str(post.version)
290 290
291 291 return et.tostring(response, ENCODING_UNICODE)
292 292
293 293 @staticmethod
294 294 def _verify_model(global_id, content_str, tag_model):
295 295 """
296 296 Verifies all signatures for a single model.
297 297 """
298 298
299 299 signatures = []
300 300
301 301 tag_signatures = tag_model.find(TAG_SIGNATURES)
302 302 has_author_signature = False
303 303 for tag_signature in tag_signatures:
304 304 signature_type = tag_signature.get(ATTR_TYPE)
305 305 signature_value = tag_signature.get(ATTR_VALUE)
306 306 signature_key = tag_signature.get(ATTR_KEY)
307 307
308 308 if global_id.key_type == signature_type and\
309 309 global_id.key == signature_key:
310 310 has_author_signature = True
311 311
312 312 signature = Signature(key_type=signature_type,
313 313 key=signature_key,
314 314 signature=signature_value)
315 315
316 316 if not KeyPair.objects.verify(signature, content_str):
317 317 raise SyncException(EXCEPTION_SIGNATURE.format(content_str))
318 318
319 319 signatures.append(signature)
320 320 if not has_author_signature:
321 321 raise SyncException(EXCEPTION_AUTHOR_SIGNATURE.format(content_str))
322 322
323 323 return signatures
324 324
325 325 @staticmethod
326 326 def _attachment_to_xml(tag_attachments, tag_refs, attachment):
327 327 if tag_attachments is not None:
328 328 attachment_tag = et.SubElement(tag_attachments, TAG_ATTACHMENT)
329 329 if attachment.is_internal():
330 330 mimetype = get_file_mimetype(attachment.file.file)
331 331 attachment_tag.set(ATTR_MIMETYPE, mimetype)
332 332 attachment_tag.set(ATTR_ID_TYPE, ID_TYPE_MD5)
333 333 attachment_tag.text = attachment.hash
334 334 else:
335 335 attachment_tag.set(ATTR_ID_TYPE, ID_TYPE_URL)
336 336 attachment_tag.text = attachment.url
337 337
338 338 if tag_refs is not None and attachment.is_internal():
339 339 attachment_ref = et.SubElement(tag_refs, TAG_ATTACHMENT_REF)
340 340 attachment_ref.set(ATTR_REF, attachment.hash)
341 341 attachment_ref.set(ATTR_URL, attachment.file.url)
342 342
343 343 @staticmethod
344 344 def generate_request_get(global_id_list: list):
345 345 """
346 346 Form a get request from a list of ModelId objects.
347 347 """
348 348
349 349 request = et.Element(TAG_REQUEST)
350 350 request.set(ATTR_TYPE, TYPE_GET)
351 351 request.set(ATTR_VERSION, '1.0')
352 352
353 353 model = et.SubElement(request, TAG_MODEL)
354 354 model.set(ATTR_VERSION, '1.0')
355 355 model.set(ATTR_NAME, 'post')
356 356
357 357 for global_id in global_id_list:
358 358 tag_id = et.SubElement(model, TAG_ID)
359 359 global_id.to_xml_element(tag_id)
360 360
361 361 return et.tostring(request, 'unicode')
362 362
363 363 @staticmethod
364 364 def generate_request_list(opening_post=None, tags=list(),
365 365 timestamp_from=None):
366 366 """
367 367 Form a pull request from a list of ModelId objects.
368 368 """
369 369
370 370 request = et.Element(TAG_REQUEST)
371 371 request.set(ATTR_TYPE, TYPE_LIST)
372 372 request.set(ATTR_VERSION, '1.0')
373 373
374 374 model = et.SubElement(request, TAG_MODEL)
375 375 model.set(ATTR_VERSION, '1.0')
376 376 model.set(ATTR_NAME, 'post')
377 377
378 378 if opening_post:
379 379 ThreadFilter().add_filter(model, opening_post)
380 380 if tags:
381 381 TagsFilter().add_filter(model, tags)
382 382 if timestamp_from:
383 383 TimestampFromFilter().add_filter(model, timestamp_from)
384 384
385 385 return et.tostring(request, 'unicode')
@@ -1,70 +1,70 b''
1 1 import simplejson
2 2
3 3 from django.test import TestCase
4 4 from boards.views import api
5 5
6 6 from boards.models import Tag, Post
7 7 from boards.tests.mocks import MockRequest
8 8 from boards.utils import datetime_to_epoch
9 9
10 10
11 11 class ApiTest(TestCase):
12 12 def test_thread_diff(self):
13 tag = Tag.objects.create(name='test_tag')
13 tag, created = Tag.objects.get_or_create_with_alias(name='test_tag')
14 14 opening_post = Post.objects.create_post(title='title', text='text',
15 15 tags=[tag])
16 16
17 17 req = MockRequest()
18 18 req.POST['thread'] = opening_post.id
19 19 req.POST['uids'] = opening_post.uid
20 20 # Check the exact timestamp post was added
21 21 empty_response = api.api_get_threaddiff(req)
22 22 diff = simplejson.loads(empty_response.content)
23 23 self.assertEqual(0, len(diff['updated']),
24 24 'There must be no updated posts in the diff.')
25 25
26 26 uids = [opening_post.uid]
27 27
28 28 reply = Post.objects.create_post(title='',
29 29 text='[post]%d[/post]\ntext' % opening_post.id,
30 30 thread=opening_post.get_thread())
31 31 req = MockRequest()
32 32 req.POST['thread'] = opening_post.id
33 33 req.POST['uids'] = ' '.join(uids)
34 34 req.user = None
35 35 # Check the timestamp before post was added
36 36 response = api.api_get_threaddiff(req)
37 37 diff = simplejson.loads(response.content)
38 38 self.assertEqual(2, len(diff['updated']),
39 39 'There must be 2 updated posts in the diff.')
40 40
41 41 # Reload post to get the new UID
42 42 opening_post = Post.objects.get(id=opening_post.id)
43 43 req = MockRequest()
44 44 req.POST['thread'] = opening_post.id
45 45 req.POST['uids'] = ' '.join([opening_post.uid, reply.uid])
46 46 empty_response = api.api_get_threaddiff(req)
47 47 diff = simplejson.loads(empty_response.content)
48 48 self.assertEqual(0, len(diff['updated']),
49 49 'There must be no updated posts in the diff.')
50 50
51 51 def test_get_threads(self):
52 52 # Create 10 threads
53 tag = Tag.objects.create(name='test_tag')
53 tag, created = Tag.objects.get_or_create_with_alias(name='test_tag')
54 54 for i in range(5):
55 55 Post.objects.create_post(title='title', text='text', tags=[tag])
56 56
57 57 # Get all threads
58 58 response = api.api_get_threads(MockRequest(), 5)
59 59 diff = simplejson.loads(response.content)
60 60 self.assertEqual(5, len(diff), 'Invalid thread list response.')
61 61
62 62 # Get less threads then exist
63 63 response = api.api_get_threads(MockRequest(), 3)
64 64 diff = simplejson.loads(response.content)
65 65 self.assertEqual(3, len(diff), 'Invalid thread list response.')
66 66
67 67 # Get more threads then exist
68 68 response = api.api_get_threads(MockRequest(), 10)
69 69 diff = simplejson.loads(response.content)
70 70 self.assertEqual(5, len(diff), 'Invalid thread list response.')
@@ -1,56 +1,56 b''
1 1 from django.test import TestCase, Client
2 2 import time
3 3 from boards import settings
4 4 from boards.models import Post, Tag
5 5 import neboard
6 6
7 7
8 8 TEST_TAG = 'test_tag'
9 9
10 10 PAGE_404 = 'boards/404.html'
11 11
12 12 TEST_TEXT = 'test text'
13 13
14 14 NEW_THREAD_PAGE = '/'
15 15 THREAD_PAGE_ONE = '/thread/1/'
16 16 HTTP_CODE_REDIRECT = 302
17 17
18 18
19 19 class FormTest(TestCase):
20 20 def test_post_validation(self):
21 21 client = Client()
22 22
23 23 valid_tags = 'tag1 tag_2 Ρ‚Π΅Π³_3'
24 24 invalid_tags = '$%_356 ---'
25 Tag.objects.create(name='tag1', required=True)
25 Tag.objects.get_or_create_with_alias(name='tag1', required=True)
26 26
27 27 response = client.post(NEW_THREAD_PAGE, {'title': 'test title',
28 28 'text': TEST_TEXT,
29 29 'tags': valid_tags})
30 30 self.assertEqual(response.status_code, HTTP_CODE_REDIRECT,
31 31 msg='Posting new message failed: got code ' +
32 32 str(response.status_code))
33 33
34 34 self.assertEqual(1, Post.objects.count(),
35 35 msg='No posts were created')
36 36
37 37 client.post(NEW_THREAD_PAGE, {'text': TEST_TEXT,
38 38 'tags': invalid_tags})
39 39 self.assertEqual(1, Post.objects.count(), msg='The validation passed '
40 40 'where it should fail')
41 41
42 42 # Change posting delay so we don't have to wait for 30 seconds or more
43 43 old_posting_delay = neboard.settings.POSTING_DELAY
44 44 # Wait fot the posting delay or we won't be able to post
45 45 neboard.settings.POSTING_DELAY = 1
46 46 time.sleep(neboard.settings.POSTING_DELAY + 1)
47 47 response = client.post(THREAD_PAGE_ONE, {'text': TEST_TEXT,
48 48 'tags': valid_tags})
49 49 self.assertEqual(HTTP_CODE_REDIRECT, response.status_code,
50 50 msg='Posting new message failed: got code ' +
51 51 str(response.status_code))
52 52 # Restore posting delay
53 53 settings.POSTING_DELAY = old_posting_delay
54 54
55 55 self.assertEqual(2, Post.objects.count(),
56 56 msg='No posts were created')
@@ -1,52 +1,52 b''
1 1 from django.test import TestCase, Client
2 2 from boards.models import Tag, Post
3 3
4 4 TEST_TEXT = 'test'
5 5
6 6 NEW_THREAD_PAGE = '/'
7 7 THREAD_PAGE_ONE = '/thread/1/'
8 8 THREAD_PAGE = '/thread/'
9 9 TAG_PAGE = '/tag/'
10 10 HTTP_CODE_REDIRECT = 302
11 11 HTTP_CODE_OK = 200
12 12 HTTP_CODE_NOT_FOUND = 404
13 13
14 14
15 15 class PagesTest(TestCase):
16 16
17 17 def test_404(self):
18 18 """Test receiving error 404 when opening a non-existent page"""
19 19
20 20 tag_name = 'test_tag'
21 tag = Tag.objects.create(name=tag_name)
21 tag, created = Tag.objects.get_or_create_with_alias(name=tag_name)
22 22 client = Client()
23 23
24 24 Post.objects.create_post('title', TEST_TEXT, tags=[tag])
25 25
26 26 existing_post_id = Post.objects.all()[0].id
27 27 response_existing = client.get(THREAD_PAGE + str(existing_post_id) +
28 28 '/')
29 29 self.assertEqual(HTTP_CODE_OK, response_existing.status_code,
30 30 'Cannot open existing thread')
31 31
32 32 response_not_existing = client.get(THREAD_PAGE + str(
33 33 existing_post_id + 1) + '/')
34 34 self.assertEqual(HTTP_CODE_NOT_FOUND, response_not_existing.status_code,
35 35 'Not existing thread is opened')
36 36
37 37 response_existing = client.get(TAG_PAGE + tag_name + '/')
38 38 self.assertEqual(HTTP_CODE_OK,
39 39 response_existing.status_code,
40 40 'Cannot open existing tag')
41 41
42 42 response_not_existing = client.get(TAG_PAGE + 'not_tag' + '/')
43 43 self.assertEqual(HTTP_CODE_NOT_FOUND, response_not_existing.status_code,
44 44 'Not existing tag is opened')
45 45
46 46 reply_id = Post.objects.create_post('', TEST_TEXT,
47 47 thread=Post.objects.all()[0]
48 48 .get_thread()).id
49 49 response_not_existing = client.get(THREAD_PAGE + str(
50 50 reply_id) + '/')
51 51 self.assertEqual(HTTP_CODE_REDIRECT, response_not_existing.status_code,
52 52 'Reply is opened as a thread')
@@ -1,176 +1,176 b''
1 1 from django.core.paginator import Paginator
2 2 from django.test import TestCase
3 3
4 4 from boards import settings
5 5 from boards.models import Tag, Post, Thread, KeyPair
6 6 from boards.models.thread import STATUS_ARCHIVE
7 7
8 8
9 9 class PostTests(TestCase):
10 10
11 11 def _create_post(self):
12 tag, created = Tag.objects.get_or_create(name='test_tag')
12 tag, created = Tag.objects.get_or_create_with_alias(name='test_tag')
13 13 return Post.objects.create_post(title='title', text='text',
14 14 tags=[tag])
15 15
16 16 def test_post_add(self):
17 17 """Test adding post"""
18 18
19 19 post = self._create_post()
20 20
21 21 self.assertIsNotNone(post, 'No post was created.')
22 self.assertEqual('test_tag', post.get_thread().tags.all()[0].name,
22 self.assertEqual('test_tag', post.get_thread().tags.all()[0].get_name(),
23 23 'No tags were added to the post.')
24 24
25 25 def test_delete_post(self):
26 26 """Test post deletion"""
27 27
28 28 post = self._create_post()
29 29 post_id = post.id
30 30
31 31 post.delete()
32 32
33 33 self.assertFalse(Post.objects.filter(id=post_id).exists())
34 34
35 35 def test_delete_thread(self):
36 36 """Test thread deletion"""
37 37
38 38 opening_post = self._create_post()
39 39 thread = opening_post.get_thread()
40 40 reply = Post.objects.create_post("", "", thread=thread)
41 41
42 42 thread.delete()
43 43
44 44 self.assertFalse(Post.objects.filter(id=reply.id).exists(),
45 45 'Reply was not deleted with the thread.')
46 46 self.assertFalse(Post.objects.filter(id=opening_post.id).exists(),
47 47 'Opening post was not deleted with the thread.')
48 48
49 49 def test_post_to_thread(self):
50 50 """Test adding post to a thread"""
51 51
52 52 op = self._create_post()
53 53 post = Post.objects.create_post("", "", thread=op.get_thread())
54 54
55 55 self.assertIsNotNone(post, 'Reply to thread wasn\'t created')
56 56 self.assertEqual(op.get_thread().last_edit_time, post.pub_time,
57 57 'Post\'s create time doesn\'t match thread last edit'
58 58 ' time')
59 59
60 60 def test_delete_posts_by_ip(self):
61 61 """Test deleting posts with the given ip"""
62 62
63 63 post = self._create_post()
64 64 post_id = post.id
65 65
66 66 Post.objects.delete_posts_by_ip('0.0.0.0')
67 67
68 68 self.assertFalse(Post.objects.filter(id=post_id).exists())
69 69
70 70 def test_get_thread(self):
71 71 """Test getting all posts of a thread"""
72 72
73 73 opening_post = self._create_post()
74 74
75 75 for i in range(2):
76 76 Post.objects.create_post('title', 'text',
77 77 thread=opening_post.get_thread())
78 78
79 79 thread = opening_post.get_thread()
80 80
81 81 self.assertEqual(3, thread.get_replies().count())
82 82
83 83 def test_create_post_with_tag(self):
84 84 """Test adding tag to post"""
85 85
86 tag = Tag.objects.create(name='test_tag')
86 tag, created = Tag.objects.get_or_create_with_alias(name='test_tag')
87 87 post = Post.objects.create_post(title='title', text='text', tags=[tag])
88 88
89 89 thread = post.get_thread()
90 90 self.assertIsNotNone(post, 'Post not created')
91 91 self.assertTrue(tag in thread.tags.all(), 'Tag not added to thread')
92 92
93 93 def test_pages(self):
94 94 """Test that the thread list is properly split into pages"""
95 95
96 96 for i in range(settings.get_int('View', 'ThreadsPerPage') * 2):
97 97 self._create_post()
98 98
99 99 all_threads = Thread.objects.exclude(status=STATUS_ARCHIVE)
100 100
101 101 paginator = Paginator(Thread.objects.exclude(status=STATUS_ARCHIVE),
102 102 settings.get_int('View', 'ThreadsPerPage'))
103 103 posts_in_second_page = paginator.page(2).object_list
104 104 first_post = posts_in_second_page[0]
105 105
106 106 self.assertEqual(all_threads[settings.get_int('View', 'ThreadsPerPage')].id,
107 107 first_post.id)
108 108
109 109 def test_reflinks(self):
110 110 """
111 111 Tests that reflinks are parsed within post and connecting replies
112 112 to the replied posts.
113 113
114 114 Local reflink example: [post]123[/post]
115 115 Global reflink example: [post]key_type::key::123[/post]
116 116 """
117 117
118 118 key = KeyPair.objects.generate_key(primary=True)
119 119
120 tag = Tag.objects.create(name='test_tag')
120 tag, created = Tag.objects.get_or_create_with_alias(name='test_tag')
121 121
122 122 post = Post.objects.create_post(title='', text='', tags=[tag])
123 123 post_local_reflink = Post.objects.create_post(title='',
124 124 text='[post]%d[/post]' % post.id, thread=post.get_thread())
125 125
126 126 self.assertTrue(post_local_reflink in post.referenced_posts.all(),
127 127 'Local reflink not connecting posts.')
128 128
129 129
130 130 def test_thread_replies(self):
131 131 """
132 132 Tests that the replies can be queried from a thread in all possible
133 133 ways.
134 134 """
135 135
136 tag = Tag.objects.create(name='test_tag')
136 tag, created = Tag.objects.get_or_create_with_alias(name='test_tag')
137 137 opening_post = Post.objects.create_post(title='title', text='text',
138 138 tags=[tag])
139 139 thread = opening_post.get_thread()
140 140
141 141 Post.objects.create_post(title='title', text='text', thread=thread)
142 142 Post.objects.create_post(title='title', text='text', thread=thread)
143 143
144 144 replies = thread.get_replies()
145 145 self.assertTrue(len(replies) > 0, 'No replies found for thread.')
146 146
147 147 replies = thread.get_replies(view_fields_only=True)
148 148 self.assertTrue(len(replies) > 0,
149 149 'No replies found for thread with view fields only.')
150 150
151 151 def test_bumplimit(self):
152 152 """
153 153 Tests that the thread bumpable status is changed and post uids and
154 154 last update times are updated across all post threads.
155 155 """
156 156
157 157 op1 = Post.objects.create_post(title='title', text='text')
158 158
159 159 thread1 = op1.get_thread()
160 160 thread1.max_posts = 5
161 161 thread1.save()
162 162
163 163 uid_1 = op1.uid
164 164
165 165 # Create multi reply
166 166 Post.objects.create_post(
167 167 title='title', text='text', thread=thread1)
168 168 for i in range(6):
169 169 Post.objects.create_post(title='title', text='text',
170 170 thread=thread1)
171 171
172 172 self.assertFalse(op1.get_thread().can_bump(),
173 173 'Thread is bumpable when it should not be.')
174 174 self.assertNotEqual(
175 175 uid_1, Post.objects.get(id=op1.id).uid,
176 176 'UID of the first OP should be changed but it is not.')
@@ -1,253 +1,253 b''
1 1 from django.test import TestCase
2 2
3 3 from boards.models import KeyPair, Post, Tag
4 4 from boards.models.post.sync import SyncManager
5 5 from boards.tests.mocks import MockRequest
6 6 from boards.views.sync import response_get, response_list
7 7
8 8 __author__ = 'neko259'
9 9
10 10
11 11 class SyncTest(TestCase):
12 12 def test_get(self):
13 13 """
14 14 Forms a GET request of a post and checks the response.
15 15 """
16 16
17 17 key = KeyPair.objects.generate_key(primary=True)
18 tag = Tag.objects.create(name='tag1')
18 tag, created = Tag.objects.get_or_create_with_alias(name='tag1')
19 19 post = Post.objects.create_post(title='test_title',
20 20 text='test_text\rline two',
21 21 tags=[tag])
22 22
23 23 request = MockRequest()
24 24 request.body = (
25 25 '<request type="get" version="1.0">'
26 26 '<model name="post" version="1.0">'
27 27 '<id key="%s" local-id="%d" type="%s" />'
28 28 '</model>'
29 29 '</request>' % (post.global_id.key,
30 30 post.id,
31 31 post.global_id.key_type)
32 32 )
33 33
34 34 response = response_get(request).content.decode()
35 35 self.assertTrue(
36 36 '<status>success</status>'
37 37 '<models>'
38 38 '<model name="post">'
39 39 '<content>'
40 40 '<id key="%s" local-id="%d" type="%s" />'
41 41 '<title>%s</title>'
42 42 '<text>%s</text>'
43 43 '<tags><tag>%s</tag></tags>'
44 44 '<pub-time>%s</pub-time>'
45 45 '<version>%s</version>'
46 46 '</content>' % (
47 47 post.global_id.key,
48 48 post.global_id.local_id,
49 49 post.global_id.key_type,
50 50 post.title,
51 51 post.get_sync_text(),
52 post.get_thread().get_tags().first().name,
52 post.get_thread().get_tags().first().get_name(),
53 53 post.get_pub_time_str(),
54 54 post.version,
55 55 ) in response,
56 56 'Wrong response generated for the GET request.')
57 57
58 58 post.delete()
59 59 key.delete()
60 60
61 61 KeyPair.objects.generate_key(primary=True)
62 62
63 63 SyncManager.parse_response_get(response, None)
64 64 self.assertEqual(1, Post.objects.count(),
65 65 'Post was not created from XML response.')
66 66
67 67 parsed_post = Post.objects.first()
68 68 self.assertEqual('tag1',
69 parsed_post.get_thread().get_tags().first().name,
69 parsed_post.get_thread().get_tags().first().get_name(),
70 70 'Invalid tag was parsed.')
71 71
72 72 SyncManager.parse_response_get(response, None)
73 73 self.assertEqual(1, Post.objects.count(),
74 74 'The same post was imported twice.')
75 75
76 76 self.assertEqual(1, parsed_post.global_id.signature_set.count(),
77 77 'Signature was not saved.')
78 78
79 79 post = parsed_post
80 80
81 81 # Trying to sync the same once more
82 82 response = response_get(request).content.decode()
83 83
84 84 self.assertTrue(
85 85 '<status>success</status>'
86 86 '<models>'
87 87 '<model name="post">'
88 88 '<content>'
89 89 '<id key="%s" local-id="%d" type="%s" />'
90 90 '<title>%s</title>'
91 91 '<text>%s</text>'
92 92 '<tags><tag>%s</tag></tags>'
93 93 '<pub-time>%s</pub-time>'
94 94 '<version>%s</version>'
95 95 '</content>' % (
96 96 post.global_id.key,
97 97 post.global_id.local_id,
98 98 post.global_id.key_type,
99 99 post.title,
100 100 post.get_sync_text(),
101 post.get_thread().get_tags().first().name,
101 post.get_thread().get_tags().first().get_name(),
102 102 post.get_pub_time_str(),
103 103 post.version,
104 104 ) in response,
105 105 'Wrong response generated for the GET request.')
106 106
107 107 def test_list_all(self):
108 108 key = KeyPair.objects.generate_key(primary=True)
109 tag = Tag.objects.create(name='tag1')
109 tag, created = Tag.objects.get_or_create_with_alias(name='tag1')
110 110 post = Post.objects.create_post(title='test_title',
111 111 text='test_text\rline two',
112 112 tags=[tag])
113 113 post2 = Post.objects.create_post(title='test title 2',
114 114 text='test text 2',
115 115 tags=[tag])
116 116
117 117 request_all = MockRequest()
118 118 request_all.body = (
119 119 '<request type="list" version="1.0">'
120 120 '<model name="post" version="1.0">'
121 121 '</model>'
122 122 '</request>'
123 123 )
124 124
125 125 response_all = response_list(request_all).content.decode()
126 126 self.assertTrue(
127 127 '<status>success</status>'
128 128 '<models>'
129 129 '<model>'
130 130 '<id key="{}" local-id="{}" type="{}" />'
131 131 '<version>{}</version>'
132 132 '</model>'
133 133 '<model>'
134 134 '<id key="{}" local-id="{}" type="{}" />'
135 135 '<version>{}</version>'
136 136 '</model>'
137 137 '</models>'.format(
138 138 post.global_id.key,
139 139 post.global_id.local_id,
140 140 post.global_id.key_type,
141 141 post.version,
142 142 post2.global_id.key,
143 143 post2.global_id.local_id,
144 144 post2.global_id.key_type,
145 145 post2.version,
146 146 ) in response_all,
147 147 'Wrong response generated for the LIST request for all posts.')
148 148
149 149 def test_list_existing_thread(self):
150 150 key = KeyPair.objects.generate_key(primary=True)
151 tag = Tag.objects.create(name='tag1')
151 tag, created = Tag.objects.get_or_create_with_alias(name='tag1')
152 152 post = Post.objects.create_post(title='test_title',
153 153 text='test_text\rline two',
154 154 tags=[tag])
155 155 post2 = Post.objects.create_post(title='test title 2',
156 156 text='test text 2',
157 157 tags=[tag])
158 158
159 159 request_thread = MockRequest()
160 160 request_thread.body = (
161 161 '<request type="list" version="1.0">'
162 162 '<model name="post" version="1.0">'
163 163 '<thread>{}</thread>'
164 164 '</model>'
165 165 '</request>'.format(
166 166 post.id,
167 167 )
168 168 )
169 169
170 170 response_thread = response_list(request_thread).content.decode()
171 171 self.assertTrue(
172 172 '<status>success</status>'
173 173 '<models>'
174 174 '<model>'
175 175 '<id key="{}" local-id="{}" type="{}" />'
176 176 '<version>{}</version>'
177 177 '</model>'
178 178 '</models>'.format(
179 179 post.global_id.key,
180 180 post.global_id.local_id,
181 181 post.global_id.key_type,
182 182 post.version,
183 183 ) in response_thread,
184 184 'Wrong response generated for the LIST request for posts of '
185 185 'existing thread.')
186 186
187 187 def test_list_non_existing_thread(self):
188 188 key = KeyPair.objects.generate_key(primary=True)
189 tag = Tag.objects.create(name='tag1')
189 tag, created = Tag.objects.get_or_create_with_alias(name='tag1')
190 190 post = Post.objects.create_post(title='test_title',
191 191 text='test_text\rline two',
192 192 tags=[tag])
193 193 post2 = Post.objects.create_post(title='test title 2',
194 194 text='test text 2',
195 195 tags=[tag])
196 196
197 197 request_thread = MockRequest()
198 198 request_thread.body = (
199 199 '<request type="list" version="1.0">'
200 200 '<model name="post" version="1.0">'
201 201 '<thread>{}</thread>'
202 202 '</model>'
203 203 '</request>'.format(
204 204 0,
205 205 )
206 206 )
207 207
208 208 response_thread = response_list(request_thread).content.decode()
209 209 self.assertTrue(
210 210 '<status>success</status>'
211 211 '<models />'
212 212 in response_thread,
213 213 'Wrong response generated for the LIST request for posts of '
214 214 'non-existing thread.')
215 215
216 216 def test_list_pub_time(self):
217 217 key = KeyPair.objects.generate_key(primary=True)
218 tag = Tag.objects.create(name='tag1')
218 tag, created = Tag.objects.get_or_create_with_alias(name='tag1')
219 219 post = Post.objects.create_post(title='test_title',
220 220 text='test_text\rline two',
221 221 tags=[tag])
222 222 post2 = Post.objects.create_post(title='test title 2',
223 223 text='test text 2',
224 224 tags=[tag])
225 225
226 226 request_thread = MockRequest()
227 227 request_thread.body = (
228 228 '<request type="list" version="1.0">'
229 229 '<model name="post" version="1.0">'
230 230 '<timestamp_from>{}</timestamp_from>'
231 231 '</model>'
232 232 '</request>'.format(
233 233 post.pub_time,
234 234 )
235 235 )
236 236
237 237 response_thread = response_list(request_thread).content.decode()
238 238 self.assertTrue(
239 239 '<status>success</status>'
240 240 '<models>'
241 241 '<model>'
242 242 '<id key="{}" local-id="{}" type="{}" />'
243 243 '<version>{}</version>'
244 244 '</model>'
245 245 '</models>'.format(
246 246 post2.global_id.key,
247 247 post2.global_id.local_id,
248 248 post2.global_id.key_type,
249 249 post2.version,
250 250 ) in response_thread,
251 251 'Wrong response generated for the LIST request for posts of '
252 252 'existing thread.')
253 253
General Comments 0
You need to be logged in to leave comments. Login now