##// END OF EJS Templates
Sync-import of a single post is working
neko259 -
r1229:3cb2475c decentral
parent child Browse files
Show More
@@ -1,506 +1,527 b''
1 1 from datetime import datetime, timedelta, date
2 2 from datetime import time as dtime
3 3 import logging
4 4 import re
5 5 import uuid
6 6
7 7 from django.core.exceptions import ObjectDoesNotExist
8 8 from django.core.urlresolvers import reverse
9 9 from django.db import models, transaction
10 10 from django.db.models import TextField, QuerySet
11 11 from django.template.loader import render_to_string
12 12 from django.utils import timezone
13 13
14 14 from boards.mdx_neboard import Parser
15 15 from boards.models import KeyPair, GlobalId
16 16 from boards import settings
17 17 from boards.models import PostImage
18 18 from boards.models.base import Viewable
19 19 from boards import utils
20 20 from boards.models.post.export import get_exporter, DIFF_TYPE_JSON
21 21 from boards.models.user import Notification, Ban
22 22 import boards.models.thread
23 23
24 24 WS_NOTIFICATION_TYPE_NEW_POST = 'new_post'
25 25 WS_NOTIFICATION_TYPE = 'notification_type'
26 26
27 27 WS_CHANNEL_THREAD = "thread:"
28 28
29 29 APP_LABEL_BOARDS = 'boards'
30 30
31 31 POSTS_PER_DAY_RANGE = 7
32 32
33 33 BAN_REASON_AUTO = 'Auto'
34 34
35 35 IMAGE_THUMB_SIZE = (200, 150)
36 36
37 37 TITLE_MAX_LENGTH = 200
38 38
39 # TODO This should be removed
40 39 NO_IP = '0.0.0.0'
41 40
42 41 REGEX_REPLY = re.compile(r'\[post\](\d+)\[/post\]')
43 42 REGEX_GLOBAL_REPLY = re.compile(r'\[post\](\w+)::([^:]+)::(\d+)\[/post\]')
44 43 REGEX_URL = re.compile(r'https?\://[a-zA-Z0-9\-\.]+\.[a-zA-Z]{2,3}(/\S*)?')
45 44 REGEX_NOTIFICATION = re.compile(r'\[user\](\w+)\[/user\]')
46 45
47 46 PARAMETER_TRUNCATED = 'truncated'
48 47 PARAMETER_TAG = 'tag'
49 48 PARAMETER_OFFSET = 'offset'
50 49 PARAMETER_DIFF_TYPE = 'type'
51 50 PARAMETER_CSS_CLASS = 'css_class'
52 51 PARAMETER_THREAD = 'thread'
53 52 PARAMETER_IS_OPENING = 'is_opening'
54 53 PARAMETER_MODERATOR = 'moderator'
55 54 PARAMETER_POST = 'post'
56 55 PARAMETER_OP_ID = 'opening_post_id'
57 56 PARAMETER_NEED_OPEN_LINK = 'need_open_link'
58 57 PARAMETER_REPLY_LINK = 'reply_link'
59 58 PARAMETER_NEED_OP_DATA = 'need_op_data'
60 59
61 60 POST_VIEW_PARAMS = (
62 61 'need_op_data',
63 62 'reply_link',
64 63 'moderator',
65 64 'need_open_link',
66 65 'truncated',
67 66 'mode_tree',
68 67 )
69 68
70 69 REFMAP_STR = '<a href="{}">&gt;&gt;{}</a>'
71 70
72 71
73 72 class PostManager(models.Manager):
74 73 @transaction.atomic
75 74 def create_post(self, title: str, text: str, image=None, thread=None,
76 75 ip=NO_IP, tags: list=None, opening_posts: list=None):
77 76 """
78 77 Creates new post
79 78 """
80 79
81 80 is_banned = Ban.objects.filter(ip=ip).exists()
82 81
83 82 # TODO Raise specific exception and catch it in the views
84 83 if is_banned:
85 84 raise Exception("This user is banned")
86 85
87 86 if not tags:
88 87 tags = []
89 88 if not opening_posts:
90 89 opening_posts = []
91 90
92 91 posting_time = timezone.now()
93 92 if not thread:
94 93 thread = boards.models.thread.Thread.objects.create(
95 94 bump_time=posting_time, last_edit_time=posting_time)
96 95 list(map(thread.tags.add, tags))
97 96 new_thread = True
98 97 else:
99 98 new_thread = False
100 99
101 100 pre_text = Parser().preparse(text)
102 101
103 102 post = self.create(title=title,
104 103 text=pre_text,
105 104 pub_time=posting_time,
106 105 poster_ip=ip,
107 106 thread=thread,
108 107 last_edit_time=posting_time)
109 108 post.threads.add(thread)
110 109
111 110 post.set_global_id()
112 111
113 112 logger = logging.getLogger('boards.post.create')
114 113
115 114 logger.info('Created post {} by {}'.format(post, post.poster_ip))
116 115
117 116 if image:
118 117 post.images.add(PostImage.objects.create_with_hash(image))
119 118
120 119 if new_thread:
121 120 boards.models.thread.Thread.objects.process_oldest_threads()
122 121 else:
123 122 thread.last_edit_time = posting_time
124 123 thread.bump()
125 124 thread.save()
126 125
127 126 post.build_url()
128 127 post.connect_replies()
129 128 post.connect_threads(opening_posts)
130 129 post.connect_notifications()
131 130
131 return post
132
133 @transaction.atomic
134 def import_post(self, title: str, text:str, pub_time: str,
135 opening_post=None):
136 if opening_post is None:
137 thread = boards.models.thread.Thread.objects.create(
138 bump_time=pub_time, last_edit_time=pub_time)
139 # list(map(thread.tags.add, tags))
140 new_thread = True
141 else:
142 thread = opening_post.get_thread()
143 new_thread = False
144
145 post = Post.objects.create(title=title, text=text,
146 pub_time=pub_time,
147 poster_ip=NO_IP,
148 last_edit_time=pub_time,
149 thread_id=thread.id)
150
132 151 post.build_url()
152 post.connect_replies()
153 post.connect_notifications()
133 154
134 155 return post
135 156
136 157 def delete_posts_by_ip(self, ip):
137 158 """
138 159 Deletes all posts of the author with same IP
139 160 """
140 161
141 162 posts = self.filter(poster_ip=ip)
142 163 for post in posts:
143 164 post.delete()
144 165
145 166 @utils.cached_result()
146 167 def get_posts_per_day(self) -> float:
147 168 """
148 169 Gets average count of posts per day for the last 7 days
149 170 """
150 171
151 172 day_end = date.today()
152 173 day_start = day_end - timedelta(POSTS_PER_DAY_RANGE)
153 174
154 175 day_time_start = timezone.make_aware(datetime.combine(
155 176 day_start, dtime()), timezone.get_current_timezone())
156 177 day_time_end = timezone.make_aware(datetime.combine(
157 178 day_end, dtime()), timezone.get_current_timezone())
158 179
159 180 posts_per_period = float(self.filter(
160 181 pub_time__lte=day_time_end,
161 182 pub_time__gte=day_time_start).count())
162 183
163 184 ppd = posts_per_period / POSTS_PER_DAY_RANGE
164 185
165 186 return ppd
166 187
167 188
168 189 class Post(models.Model, Viewable):
169 190 """A post is a message."""
170 191
171 192 objects = PostManager()
172 193
173 194 class Meta:
174 195 app_label = APP_LABEL_BOARDS
175 196 ordering = ('id',)
176 197
177 198 title = models.CharField(max_length=TITLE_MAX_LENGTH, null=True, blank=True)
178 199 pub_time = models.DateTimeField()
179 200 text = TextField(blank=True, null=True)
180 201 _text_rendered = TextField(blank=True, null=True, editable=False)
181 202
182 203 images = models.ManyToManyField(PostImage, null=True, blank=True,
183 204 related_name='ip+', db_index=True)
184 205
185 206 poster_ip = models.GenericIPAddressField()
186 207
187 208 # TODO This field can be removed cause UID is used for update now
188 209 last_edit_time = models.DateTimeField()
189 210
190 211 referenced_posts = models.ManyToManyField('Post', symmetrical=False,
191 212 null=True,
192 213 blank=True, related_name='refposts',
193 214 db_index=True)
194 215 refmap = models.TextField(null=True, blank=True)
195 216 threads = models.ManyToManyField('Thread', db_index=True)
196 217 thread = models.ForeignKey('Thread', db_index=True, related_name='pt+')
197 218
198 219 url = models.TextField()
199 220 uid = models.TextField(db_index=True)
200 221
201 222 # Global ID with author key. If the message was downloaded from another
202 223 # server, this indicates the server.
203 224 global_id = models.OneToOneField('GlobalId', null=True, blank=True)
204 225
205 226 # One post can be signed by many nodes that give their trust to it
206 227 signature = models.ManyToManyField('Signature', null=True, blank=True)
207 228
208 229 def __str__(self):
209 230 return 'P#{}/{}'.format(self.id, self.title)
210 231
211 232 def get_referenced_posts(self):
212 233 threads = self.get_threads().all()
213 234 return self.referenced_posts.filter(threads__in=threads)\
214 235 .order_by('pub_time').distinct().all()
215 236
216 237 def get_title(self) -> str:
217 238 """
218 239 Gets original post title or part of its text.
219 240 """
220 241
221 242 title = self.title
222 243 if not title:
223 244 title = self.get_text()
224 245
225 246 return title
226 247
227 248 def build_refmap(self) -> None:
228 249 """
229 250 Builds a replies map string from replies list. This is a cache to stop
230 251 the server from recalculating the map on every post show.
231 252 """
232 253
233 254 post_urls = [REFMAP_STR.format(refpost.get_absolute_url(), refpost.id)
234 255 for refpost in self.referenced_posts.all()]
235 256
236 257 self.refmap = ', '.join(post_urls)
237 258
238 259 def is_referenced(self) -> bool:
239 260 return self.refmap and len(self.refmap) > 0
240 261
241 262 def is_opening(self) -> bool:
242 263 """
243 264 Checks if this is an opening post or just a reply.
244 265 """
245 266
246 267 return self.get_thread().get_opening_post_id() == self.id
247 268
248 269 def get_absolute_url(self):
249 270 if self.url:
250 271 return self.url
251 272 else:
252 273 opening_id = self.get_thread().get_opening_post_id()
253 274 post_url = reverse('thread', kwargs={'post_id': opening_id})
254 275 if self.id != opening_id:
255 276 post_url += '#' + str(self.id)
256 277 return post_url
257 278
258 279 def get_thread(self):
259 280 return self.thread
260 281
261 282 def get_threads(self) -> QuerySet:
262 283 """
263 284 Gets post's thread.
264 285 """
265 286
266 287 return self.threads
267 288
268 289 def get_view(self, *args, **kwargs) -> str:
269 290 """
270 291 Renders post's HTML view. Some of the post params can be passed over
271 292 kwargs for the means of caching (if we view the thread, some params
272 293 are same for every post and don't need to be computed over and over.
273 294 """
274 295
275 296 thread = self.get_thread()
276 297 is_opening = kwargs.get(PARAMETER_IS_OPENING, self.is_opening())
277 298
278 299 if is_opening:
279 300 opening_post_id = self.id
280 301 else:
281 302 opening_post_id = thread.get_opening_post_id()
282 303
283 304 css_class = 'post'
284 305 if thread.archived:
285 306 css_class += ' archive_post'
286 307 elif not thread.can_bump():
287 308 css_class += ' dead_post'
288 309
289 310 params = dict()
290 311 for param in POST_VIEW_PARAMS:
291 312 if param in kwargs:
292 313 params[param] = kwargs[param]
293 314
294 315 params.update({
295 316 PARAMETER_POST: self,
296 317 PARAMETER_IS_OPENING: is_opening,
297 318 PARAMETER_THREAD: thread,
298 319 PARAMETER_CSS_CLASS: css_class,
299 320 PARAMETER_OP_ID: opening_post_id,
300 321 })
301 322
302 323 return render_to_string('boards/post.html', params)
303 324
304 325 def get_search_view(self, *args, **kwargs):
305 326 return self.get_view(need_op_data=True, *args, **kwargs)
306 327
307 328 def get_first_image(self) -> PostImage:
308 329 return self.images.earliest('id')
309 330
310 331 def delete(self, using=None):
311 332 """
312 333 Deletes all post images and the post itself.
313 334 """
314 335
315 336 for image in self.images.all():
316 337 image_refs_count = Post.objects.filter(images__in=[image]).count()
317 338 if image_refs_count == 1:
318 339 image.delete()
319 340
320 341 self.signature.all().delete()
321 342 if self.global_id:
322 343 self.global_id.delete()
323 344
324 345 thread = self.get_thread()
325 346 thread.last_edit_time = timezone.now()
326 347 thread.save()
327 348
328 349 super(Post, self).delete(using)
329 350
330 351 logging.getLogger('boards.post.delete').info(
331 352 'Deleted post {}'.format(self))
332 353
333 354 def set_global_id(self, key_pair=None):
334 355 """
335 356 Sets global id based on the given key pair. If no key pair is given,
336 357 default one is used.
337 358 """
338 359
339 360 if key_pair:
340 361 key = key_pair
341 362 else:
342 363 try:
343 364 key = KeyPair.objects.get(primary=True)
344 365 except KeyPair.DoesNotExist:
345 366 # Do not update the global id because there is no key defined
346 367 return
347 368 global_id = GlobalId(key_type=key.key_type,
348 369 key=key.public_key,
349 local_id = self.id)
370 local_id=self.id)
350 371 global_id.save()
351 372
352 373 self.global_id = global_id
353 374
354 375 self.save(update_fields=['global_id'])
355 376
356 def get_pub_time_epoch(self):
357 return utils.datetime_to_epoch(self.pub_time)
377 def get_pub_time_str(self):
378 return str(self.pub_time)
358 379
359 380 def get_replied_ids(self):
360 381 """
361 382 Gets ID list of the posts that this post replies.
362 383 """
363 384
364 385 raw_text = self.get_raw_text()
365 386
366 387 local_replied = REGEX_REPLY.findall(raw_text)
367 388 global_replied = []
368 389 for match in REGEX_GLOBAL_REPLY.findall(raw_text):
369 390 key_type = match[0]
370 391 key = match[1]
371 392 local_id = match[2]
372 393
373 394 try:
374 395 global_id = GlobalId.objects.get(key_type=key_type,
375 396 key=key, local_id=local_id)
376 397 for post in Post.objects.filter(global_id=global_id).only('id'):
377 398 global_replied.append(post.id)
378 399 except GlobalId.DoesNotExist:
379 400 pass
380 401 return local_replied + global_replied
381 402
382 403 def get_post_data(self, format_type=DIFF_TYPE_JSON, request=None,
383 404 include_last_update=False) -> str:
384 405 """
385 406 Gets post HTML or JSON data that can be rendered on a page or used by
386 407 API.
387 408 """
388 409
389 410 return get_exporter(format_type).export(self, request,
390 411 include_last_update)
391 412
392 413 def notify_clients(self, recursive=True):
393 414 """
394 415 Sends post HTML data to the thread web socket.
395 416 """
396 417
397 418 if not settings.get_bool('External', 'WebsocketsEnabled'):
398 419 return
399 420
400 421 thread_ids = list()
401 422 for thread in self.get_threads().all():
402 423 thread_ids.append(thread.id)
403 424
404 425 thread.notify_clients()
405 426
406 427 if recursive:
407 428 for reply_number in re.finditer(REGEX_REPLY, self.get_raw_text()):
408 429 post_id = reply_number.group(1)
409 430
410 431 try:
411 432 ref_post = Post.objects.get(id=post_id)
412 433
413 434 if ref_post.get_threads().exclude(id__in=thread_ids).exists():
414 435 # If post is in this thread, its thread was already notified.
415 436 # Otherwise, notify its thread separately.
416 437 ref_post.notify_clients(recursive=False)
417 438 except ObjectDoesNotExist:
418 439 pass
419 440
420 441 def build_url(self):
421 442 self.url = self.get_absolute_url()
422 443 self.save(update_fields=['url'])
423 444
424 445 def save(self, force_insert=False, force_update=False, using=None,
425 446 update_fields=None):
426 447 self._text_rendered = Parser().parse(self.get_raw_text())
427 448
428 449 self.uid = str(uuid.uuid4())
429 450 if update_fields is not None and 'uid' not in update_fields:
430 451 update_fields += ['uid']
431 452
432 453 if self.id:
433 454 for thread in self.get_threads().all():
434 455 thread.last_edit_time = self.last_edit_time
435 456
436 457 thread.save(update_fields=['last_edit_time'])
437 458
438 459 super().save(force_insert, force_update, using, update_fields)
439 460
440 461 def get_text(self) -> str:
441 462 return self._text_rendered
442 463
443 464 def get_raw_text(self) -> str:
444 465 return self.text
445 466
446 467 def get_sync_text(self) -> str:
447 468 """
448 469 Returns text applicable for sync. It has absolute post reflinks.
449 470 """
450 471
451 472 replacements = dict()
452 473 for post_id in REGEX_REPLY.findall(self.get_raw_text()):
453 474 absolute_post_id = str(Post.objects.get(id=post_id).global_id)
454 475 replacements[post_id] = absolute_post_id
455 476
456 477 text = self.get_raw_text()
457 478 for key in replacements:
458 479 text = text.replace('[post]{}[/post]'.format(key),
459 480 '[post]{}[/post]'.format(replacements[key]))
460 481
461 482 return text
462 483
463 484 def get_absolute_id(self) -> str:
464 485 """
465 486 If the post has many threads, shows its main thread OP id in the post
466 487 ID.
467 488 """
468 489
469 490 if self.get_threads().count() > 1:
470 491 return '{}/{}'.format(self.get_thread().get_opening_post_id(), self.id)
471 492 else:
472 493 return str(self.id)
473 494
474 495 def connect_notifications(self):
475 496 for reply_number in re.finditer(REGEX_NOTIFICATION, self.get_raw_text()):
476 497 user_name = reply_number.group(1).lower()
477 498 Notification.objects.get_or_create(name=user_name, post=self)
478 499
479 500 def connect_replies(self):
480 501 """
481 502 Connects replies to a post to show them as a reflink map
482 503 """
483 504
484 505 for reply_number in re.finditer(REGEX_REPLY, self.get_raw_text()):
485 506 post_id = reply_number.group(1)
486 507
487 508 try:
488 509 referenced_post = Post.objects.get(id=post_id)
489 510
490 511 referenced_post.referenced_posts.add(self)
491 512 referenced_post.last_edit_time = self.pub_time
492 513 referenced_post.build_refmap()
493 514 referenced_post.save(update_fields=['refmap', 'last_edit_time'])
494 515 except ObjectDoesNotExist:
495 516 pass
496 517
497 518 def connect_threads(self, opening_posts):
498 519 for opening_post in opening_posts:
499 520 threads = opening_post.get_threads().all()
500 521 for thread in threads:
501 522 if thread.can_bump():
502 523 thread.update_bump_status()
503 524
504 525 thread.last_edit_time = self.last_edit_time
505 526 thread.save(update_fields=['last_edit_time', 'bumpable'])
506 527 self.threads.add(opening_post.get_thread())
@@ -1,116 +1,125 b''
1 1 import xml.etree.ElementTree as et
2 from django.db import transaction
2 3 from boards.models import KeyPair, GlobalId, Signature, Post
3 4
4 5 ENCODING_UNICODE = 'unicode'
5 6
6 7 TAG_MODEL = 'model'
7 8 TAG_REQUEST = 'request'
8 9 TAG_RESPONSE = 'response'
9 10 TAG_ID = 'id'
10 11 TAG_STATUS = 'status'
11 12 TAG_MODELS = 'models'
12 13 TAG_TITLE = 'title'
13 14 TAG_TEXT = 'text'
14 15 TAG_THREAD = 'thread'
15 16 TAG_PUB_TIME = 'pub-time'
16 17 TAG_SIGNATURES = 'signatures'
17 18 TAG_SIGNATURE = 'signature'
18 19 TAG_CONTENT = 'content'
19 20 TAG_ATTACHMENTS = 'attachments'
20 21 TAG_ATTACHMENT = 'attachment'
21 22
22 23 TYPE_GET = 'get'
23 24
24 25 ATTR_VERSION = 'version'
25 26 ATTR_TYPE = 'type'
26 27 ATTR_NAME = 'name'
27 28 ATTR_VALUE = 'value'
28 29 ATTR_MIMETYPE = 'mimetype'
29 30
30 31 STATUS_SUCCESS = 'success'
31 32
32 33
33 34 # TODO Make this fully static
34 35 class SyncManager:
35 36 def generate_response_get(self, model_list: list):
36 37 response = et.Element(TAG_RESPONSE)
37 38
38 39 status = et.SubElement(response, TAG_STATUS)
39 40 status.text = STATUS_SUCCESS
40 41
41 42 models = et.SubElement(response, TAG_MODELS)
42 43
43 44 for post in model_list:
44 45 model = et.SubElement(models, TAG_MODEL)
45 46 model.set(ATTR_NAME, 'post')
46 47
47 48 content_tag = et.SubElement(model, TAG_CONTENT)
48 49
49 50 tag_id = et.SubElement(content_tag, TAG_ID)
50 51 post.global_id.to_xml_element(tag_id)
51 52
52 53 title = et.SubElement(content_tag, TAG_TITLE)
53 54 title.text = post.title
54 55
55 56 text = et.SubElement(content_tag, TAG_TEXT)
56 57 text.text = post.get_sync_text()
57 58
58 59 if not post.is_opening():
59 60 thread = et.SubElement(content_tag, TAG_THREAD)
60 61 thread_id = et.SubElement(thread, TAG_ID)
61 62 post.get_thread().get_opening_post().global_id.to_xml_element(thread_id)
62 63 else:
63 64 # TODO Output tags here
64 65 pass
65 66
66 67 pub_time = et.SubElement(content_tag, TAG_PUB_TIME)
67 pub_time.text = str(post.get_pub_time_epoch())
68 pub_time.text = str(post.get_pub_time_str())
68 69
69 70 signatures_tag = et.SubElement(model, TAG_SIGNATURES)
70 71 post_signatures = post.signature.all()
71 72 if post_signatures:
72 73 signatures = post.signatures
73 74 else:
74 75 # TODO Maybe the signature can be computed only once after
75 76 # the post is added? Need to add some on_save signal queue
76 77 # and add this there.
77 78 key = KeyPair.objects.get(public_key=post.global_id.key)
78 79 signatures = [Signature(
79 80 key_type=key.key_type,
80 81 key=key.public_key,
81 82 signature=key.sign(et.tostring(model, ENCODING_UNICODE)),
82 83 )]
83 84 for signature in signatures:
84 85 signature_tag = et.SubElement(signatures_tag, TAG_SIGNATURE)
85 86 signature_tag.set(ATTR_TYPE, signature.key_type)
86 87 signature_tag.set(ATTR_VALUE, signature.signature)
87 88
88 89 return et.tostring(response, ENCODING_UNICODE)
89 90
91 @transaction.atomic
90 92 def parse_response_get(self, response_xml):
91 93 tag_root = et.fromstring(response_xml)
92 94 tag_status = tag_root.find(TAG_STATUS)
93 95 if STATUS_SUCCESS == tag_status.text:
94 96 tag_models = tag_root.find(TAG_MODELS)
95 97 for tag_model in tag_models:
96 98 tag_content = tag_model.find(TAG_CONTENT)
97 99 tag_id = tag_content.find(TAG_ID)
98 100 try:
99 101 GlobalId.from_xml_element(tag_id, existing=True)
100 102 print('Post with same ID already exists')
101 103 except GlobalId.DoesNotExist:
102 104 global_id = GlobalId.from_xml_element(tag_id)
105 global_id.save()
103 106
104 107 title = tag_content.find(TAG_TITLE).text
105 108 text = tag_content.find(TAG_TEXT).text
109 pub_time = tag_content.find(TAG_PUB_TIME).text
106 110 # TODO Check that the replied posts are already present
107 111 # before adding new ones
108 112
109 113 # TODO Pub time, thread, tags
110 114
115 # FIXME This prints are for testing purposes only, they must
116 # be removed after sync is implemented
111 117 print(title)
112 118 print(text)
113 # post = Post.objects.create(title=title, text=text)
119
120 post = Post.objects.import_post(title=title, text=text,
121 pub_time=pub_time)
122 post.global_id = global_id
114 123 else:
115 124 # TODO Throw an exception?
116 125 pass
@@ -1,87 +1,87 b''
1 1 from base64 import b64encode
2 2 import logging
3 3
4 4 from django.test import TestCase
5 5 from boards.models import KeyPair, GlobalId, Post
6 6 from boards.models.post.sync import SyncManager
7 7
8 8 logger = logging.getLogger(__name__)
9 9
10 10
11 11 class KeyTest(TestCase):
12 12 def test_create_key(self):
13 13 key = KeyPair.objects.generate_key('ecdsa')
14 14
15 15 self.assertIsNotNone(key, 'The key was not created.')
16 16
17 17 def test_validation(self):
18 18 key = KeyPair.objects.generate_key(key_type='ecdsa')
19 19 message = 'msg'
20 20 signature = key.sign(message)
21 21 valid = KeyPair.objects.verify(key.public_key, message, signature,
22 22 key_type='ecdsa')
23 23
24 24 self.assertTrue(valid, 'Message verification failed.')
25 25
26 26 def test_primary_constraint(self):
27 27 KeyPair.objects.generate_key(key_type='ecdsa', primary=True)
28 28
29 29 with self.assertRaises(Exception):
30 30 KeyPair.objects.generate_key(key_type='ecdsa', primary=True)
31 31
32 32 def test_model_id_save(self):
33 33 model_id = GlobalId(key_type='test', key='test key', local_id='1')
34 34 model_id.save()
35 35
36 36 def test_request_get(self):
37 37 post = self._create_post_with_key()
38 38
39 39 request = GlobalId.objects.generate_request_get([post.global_id])
40 40 logger.debug(request)
41 41
42 42 key = KeyPair.objects.get(primary=True)
43 43 self.assertTrue('<request type="get" version="1.0">'
44 44 '<model name="post" version="1.0">'
45 45 '<id key="%s" local-id="1" type="%s" />'
46 46 '</model>'
47 47 '</request>' % (
48 48 key.public_key,
49 49 key.key_type,
50 50 ) in request,
51 51 'Wrong XML generated for the GET request.')
52 52
53 53 def test_response_get(self):
54 54 post = self._create_post_with_key()
55 55 reply_post = Post.objects.create_post(title='test_title',
56 56 text='[post]%d[/post]' % post.id,
57 57 thread=post.get_thread())
58 58
59 59 response = SyncManager().generate_response_get([reply_post])
60 60 logger.debug(response)
61 61
62 62 key = KeyPair.objects.get(primary=True)
63 63 self.assertTrue('<status>success</status>'
64 64 '<models>'
65 65 '<model name="post">'
66 66 '<content>'
67 67 '<id key="%s" local-id="%d" type="%s" />'
68 68 '<title>test_title</title>'
69 69 '<text>[post]%s[/post]</text>'
70 70 '<thread><id key="%s" local-id="%d" type="%s" /></thread>'
71 71 '<pub-time>%s</pub-time>'
72 72 '</content>' % (
73 73 key.public_key,
74 74 reply_post.id,
75 75 key.key_type,
76 76 str(post.global_id),
77 77 key.public_key,
78 78 post.id,
79 79 key.key_type,
80 str(reply_post.get_pub_time_epoch()),
80 str(reply_post.get_pub_time_str()),
81 81 ) in response,
82 82 'Wrong XML generated for the GET response.')
83 83
84 84 def _create_post_with_key(self):
85 85 KeyPair.objects.generate_key(primary=True)
86 86
87 87 return Post.objects.create_post(title='test_title', text='test_text')
@@ -1,48 +1,56 b''
1 1 from boards.models import KeyPair, Post
2 from boards.models.post.sync import SyncManager
2 3 from boards.tests.mocks import MockRequest
3 4 from boards.views.sync import response_get
4 5
5 6 __author__ = 'neko259'
6 7
7 8
8 9 from django.test import TestCase
9 10
10 11
11 12 class SyncTest(TestCase):
12 13 def test_get(self):
13 14 """
14 15 Forms a GET request of a post and checks the response.
15 16 """
16 17
17 18 KeyPair.objects.generate_key(primary=True)
18 19 post = Post.objects.create_post(title='test_title', text='test_text')
19 20
20 21 request = MockRequest()
21 22 request.body = (
22 23 '<request type="get" version="1.0">'
23 24 '<model name="post" version="1.0">'
24 25 '<id key="%s" local-id="%d" type="%s" />'
25 26 '</model>'
26 27 '</request>' % (post.global_id.key,
27 28 post.id,
28 29 post.global_id.key_type)
29 30 )
30 31
32 response = response_get(request).content.decode()
31 33 self.assertTrue(
32 34 '<status>success</status>'
33 35 '<models>'
34 36 '<model name="post">'
35 37 '<content>'
36 38 '<id key="%s" local-id="%d" type="%s" />'
37 39 '<title>%s</title>'
38 40 '<text>%s</text>'
39 '<pub-time>%d</pub-time>'
41 '<pub-time>%s</pub-time>'
40 42 '</content>' % (
41 43 post.global_id.key,
42 44 post.id,
43 45 post.global_id.key_type,
44 46 post.title,
45 47 post.get_raw_text(),
46 post.get_pub_time_epoch(),
48 post.get_pub_time_str(),
47 49 ) in response_get(request).content.decode(),
48 50 'Wrong response generated for the GET request.')
51
52 post.delete()
53
54 SyncManager().parse_response_get(response)
55 self.assertEqual(1, Post.objects.count(),
56 'Post was not created from XML response.')
General Comments 0
You need to be logged in to leave comments. Login now