##// END OF EJS Templates
Delete replies properly when deleting a thread. Delete thread directly, not by...
neko259 -
r950:e663371b default
parent child Browse files
Show More
@@ -1,451 +1,447 b''
1 1 from datetime import datetime, timedelta, date
2 2 from datetime import time as dtime
3 3 import logging
4 4 import re
5 5
6 6 from adjacent import Client
7 7 from django.core.cache import cache
8 8 from django.core.urlresolvers import reverse
9 9 from django.db import models, transaction
10 10 from django.db.models import TextField
11 11 from django.template.loader import render_to_string
12 12 from django.utils import timezone
13 13
14 14 from boards import settings
15 15 from boards.mdx_neboard import bbcode_extended
16 16 from boards.models import PostImage
17 17 from boards.models.base import Viewable
18 18 from boards.models.thread import Thread
19 19 from boards.utils import datetime_to_epoch
20 20
21 21
22 22 WS_NOTIFICATION_TYPE_NEW_POST = 'new_post'
23 23 WS_NOTIFICATION_TYPE = 'notification_type'
24 24
25 25 WS_CHANNEL_THREAD = "thread:"
26 26
27 27 APP_LABEL_BOARDS = 'boards'
28 28
29 29 CACHE_KEY_PPD = 'ppd'
30 30 CACHE_KEY_POST_URL = 'post_url'
31 31
32 32 POSTS_PER_DAY_RANGE = 7
33 33
34 34 BAN_REASON_AUTO = 'Auto'
35 35
36 36 IMAGE_THUMB_SIZE = (200, 150)
37 37
38 38 TITLE_MAX_LENGTH = 200
39 39
40 40 # TODO This should be removed
41 41 NO_IP = '0.0.0.0'
42 42
43 43 # TODO Real user agent should be saved instead of this
44 44 UNKNOWN_UA = ''
45 45
46 46 REGEX_REPLY = re.compile(r'\[post\](\d+)\[/post\]')
47 47
48 48 PARAMETER_TRUNCATED = 'truncated'
49 49 PARAMETER_TAG = 'tag'
50 50 PARAMETER_OFFSET = 'offset'
51 51 PARAMETER_DIFF_TYPE = 'type'
52 52 PARAMETER_BUMPABLE = 'bumpable'
53 53 PARAMETER_THREAD = 'thread'
54 54 PARAMETER_IS_OPENING = 'is_opening'
55 55 PARAMETER_MODERATOR = 'moderator'
56 56 PARAMETER_POST = 'post'
57 57 PARAMETER_OP_ID = 'opening_post_id'
58 58 PARAMETER_NEED_OPEN_LINK = 'need_open_link'
59 59
60 60 DIFF_TYPE_HTML = 'html'
61 61 DIFF_TYPE_JSON = 'json'
62 62
63 63 PREPARSE_PATTERNS = {
64 64 r'>>(\d+)': r'[post]\1[/post]', # Reflink ">>123"
65 65 r'^>([^>].+)': r'[quote]\1[/quote]', # Quote ">text"
66 66 r'^//(.+)': r'[comment]\1[/comment]', # Comment "//text"
67 67 }
68 68
69 69
70 70 class PostManager(models.Manager):
71 71 @transaction.atomic
72 72 def create_post(self, title: str, text: str, image=None, thread=None,
73 73 ip=NO_IP, tags: list=None):
74 74 """
75 75 Creates new post
76 76 """
77 77
78 78 if not tags:
79 79 tags = []
80 80
81 81 posting_time = timezone.now()
82 82 if not thread:
83 83 thread = Thread.objects.create(bump_time=posting_time,
84 84 last_edit_time=posting_time)
85 85 new_thread = True
86 86 else:
87 87 new_thread = False
88 88
89 89 pre_text = self._preparse_text(text)
90 90
91 91 post = self.create(title=title,
92 92 text=pre_text,
93 93 pub_time=posting_time,
94 94 thread_new=thread,
95 95 poster_ip=ip,
96 96 poster_user_agent=UNKNOWN_UA, # TODO Get UA at
97 97 # last!
98 98 last_edit_time=posting_time)
99 99
100 100 logger = logging.getLogger('boards.post.create')
101 101
102 102 logger.info('Created post {} by {}'.format(
103 103 post, post.poster_ip))
104 104
105 105 if image:
106 106 # Try to find existing image. If it exists, assign it to the post
107 107 # instead of createing the new one
108 108 image_hash = PostImage.get_hash(image)
109 109 existing = PostImage.objects.filter(hash=image_hash)
110 110 if len(existing) > 0:
111 111 post_image = existing[0]
112 112 else:
113 113 post_image = PostImage.objects.create(image=image)
114 114 logger.info('Created new image #{} for post #{}'.format(
115 115 post_image.id, post.id))
116 116 post.images.add(post_image)
117 117
118 118 thread.replies.add(post)
119 119 list(map(thread.add_tag, tags))
120 120
121 121 if new_thread:
122 122 Thread.objects.process_oldest_threads()
123 123 else:
124 124 thread.bump()
125 125 thread.last_edit_time = posting_time
126 126 thread.save()
127 127
128 128 self.connect_replies(post)
129 129
130 130 return post
131 131
132 132 def delete_posts_by_ip(self, ip):
133 133 """
134 134 Deletes all posts of the author with same IP
135 135 """
136 136
137 137 posts = self.filter(poster_ip=ip)
138 138 for post in posts:
139 139 post.delete()
140 140
141 141 def connect_replies(self, post):
142 142 """
143 143 Connects replies to a post to show them as a reflink map
144 144 """
145 145
146 146 for reply_number in re.finditer(REGEX_REPLY, post.get_raw_text()):
147 147 post_id = reply_number.group(1)
148 148 ref_post = self.filter(id=post_id)
149 149 if ref_post.count() > 0:
150 150 referenced_post = ref_post[0]
151 151 referenced_post.referenced_posts.add(post)
152 152 referenced_post.last_edit_time = post.pub_time
153 153 referenced_post.build_refmap()
154 154 referenced_post.save(update_fields=['refmap', 'last_edit_time'])
155 155
156 156 referenced_thread = referenced_post.get_thread()
157 157 referenced_thread.last_edit_time = post.pub_time
158 158 referenced_thread.save(update_fields=['last_edit_time'])
159 159
160 160 def get_posts_per_day(self):
161 161 """
162 162 Gets average count of posts per day for the last 7 days
163 163 """
164 164
165 165 day_end = date.today()
166 166 day_start = day_end - timedelta(POSTS_PER_DAY_RANGE)
167 167
168 168 cache_key = CACHE_KEY_PPD + str(day_end)
169 169 ppd = cache.get(cache_key)
170 170 if ppd:
171 171 return ppd
172 172
173 173 day_time_start = timezone.make_aware(datetime.combine(
174 174 day_start, dtime()), timezone.get_current_timezone())
175 175 day_time_end = timezone.make_aware(datetime.combine(
176 176 day_end, dtime()), timezone.get_current_timezone())
177 177
178 178 posts_per_period = float(self.filter(
179 179 pub_time__lte=day_time_end,
180 180 pub_time__gte=day_time_start).count())
181 181
182 182 ppd = posts_per_period / POSTS_PER_DAY_RANGE
183 183
184 184 cache.set(cache_key, ppd)
185 185 return ppd
186 186
187 187 def _preparse_text(self, text):
188 188 """
189 189 Preparses text to change patterns like '>>' to a proper bbcode
190 190 tags.
191 191 """
192 192
193 193 for key, value in PREPARSE_PATTERNS.items():
194 194 text = re.sub(key, value, text, flags=re.MULTILINE)
195 195
196 196 return text
197 197
198 198
199 199 class Post(models.Model, Viewable):
200 200 """A post is a message."""
201 201
202 202 objects = PostManager()
203 203
204 204 class Meta:
205 205 app_label = APP_LABEL_BOARDS
206 206 ordering = ('id',)
207 207
208 208 title = models.CharField(max_length=TITLE_MAX_LENGTH)
209 209 pub_time = models.DateTimeField()
210 210 text = TextField(blank=True, null=True)
211 211 _text_rendered = TextField(blank=True, null=True, editable=False)
212 212
213 213 images = models.ManyToManyField(PostImage, null=True, blank=True,
214 214 related_name='ip+', db_index=True)
215 215
216 216 poster_ip = models.GenericIPAddressField()
217 217 poster_user_agent = models.TextField()
218 218
219 219 thread_new = models.ForeignKey('Thread', null=True, default=None,
220 220 db_index=True)
221 221 last_edit_time = models.DateTimeField()
222 222
223 223 referenced_posts = models.ManyToManyField('Post', symmetrical=False,
224 224 null=True,
225 225 blank=True, related_name='rfp+',
226 226 db_index=True)
227 227 refmap = models.TextField(null=True, blank=True)
228 228
229 229 def __str__(self):
230 230 return 'P#{}/{}'.format(self.id, self.title)
231 231
232 232 def get_title(self) -> str:
233 233 """
234 234 Gets original post title or part of its text.
235 235 """
236 236
237 237 title = self.title
238 238 if not title:
239 239 title = self.get_text()
240 240
241 241 return title
242 242
243 243 def build_refmap(self) -> None:
244 244 """
245 245 Builds a replies map string from replies list. This is a cache to stop
246 246 the server from recalculating the map on every post show.
247 247 """
248 248 map_string = ''
249 249
250 250 first = True
251 251 for refpost in self.referenced_posts.all():
252 252 if not first:
253 253 map_string += ', '
254 254 map_string += '<a href="%s">&gt;&gt;%s</a>' % (refpost.get_url(),
255 255 refpost.id)
256 256 first = False
257 257
258 258 self.refmap = map_string
259 259
260 260 def get_sorted_referenced_posts(self):
261 261 return self.refmap
262 262
263 263 def is_referenced(self) -> bool:
264 264 if not self.refmap:
265 265 return False
266 266 else:
267 267 return len(self.refmap) > 0
268 268
269 269 def is_opening(self) -> bool:
270 270 """
271 271 Checks if this is an opening post or just a reply.
272 272 """
273 273
274 274 return self.get_thread().get_opening_post_id() == self.id
275 275
276 276 @transaction.atomic
277 277 def add_tag(self, tag):
278 278 edit_time = timezone.now()
279 279
280 280 thread = self.get_thread()
281 281 thread.add_tag(tag)
282 282 self.last_edit_time = edit_time
283 283 self.save(update_fields=['last_edit_time'])
284 284
285 285 thread.last_edit_time = edit_time
286 286 thread.save(update_fields=['last_edit_time'])
287 287
288 288 def get_url(self, thread=None):
289 289 """
290 290 Gets full url to the post.
291 291 """
292 292
293 293 cache_key = CACHE_KEY_POST_URL + str(self.id)
294 294 link = cache.get(cache_key)
295 295
296 296 if not link:
297 297 if not thread:
298 298 thread = self.get_thread()
299 299
300 300 opening_id = thread.get_opening_post_id()
301 301
302 302 if self.id != opening_id:
303 303 link = reverse('thread', kwargs={
304 304 'post_id': opening_id}) + '#' + str(self.id)
305 305 else:
306 306 link = reverse('thread', kwargs={'post_id': self.id})
307 307
308 308 cache.set(cache_key, link)
309 309
310 310 return link
311 311
312 312 def get_thread(self) -> Thread:
313 313 """
314 314 Gets post's thread.
315 315 """
316 316
317 317 return self.thread_new
318 318
319 319 def get_referenced_posts(self):
320 320 return self.referenced_posts.only('id', 'thread_new')
321 321
322 322 def get_view(self, moderator=False, need_open_link=False,
323 323 truncated=False, *args, **kwargs):
324 324 """
325 325 Renders post's HTML view. Some of the post params can be passed over
326 326 kwargs for the means of caching (if we view the thread, some params
327 327 are same for every post and don't need to be computed over and over.
328 328 """
329 329
330 330 is_opening = kwargs.get(PARAMETER_IS_OPENING, self.is_opening())
331 331 thread = kwargs.get(PARAMETER_THREAD, self.get_thread())
332 332 can_bump = kwargs.get(PARAMETER_BUMPABLE, thread.can_bump())
333 333
334 334 if is_opening:
335 335 opening_post_id = self.id
336 336 else:
337 337 opening_post_id = thread.get_opening_post_id()
338 338
339 339 return render_to_string('boards/post.html', {
340 340 PARAMETER_POST: self,
341 341 PARAMETER_MODERATOR: moderator,
342 342 PARAMETER_IS_OPENING: is_opening,
343 343 PARAMETER_THREAD: thread,
344 344 PARAMETER_BUMPABLE: can_bump,
345 345 PARAMETER_NEED_OPEN_LINK: need_open_link,
346 346 PARAMETER_TRUNCATED: truncated,
347 347 PARAMETER_OP_ID: opening_post_id,
348 348 })
349 349
350 350 def get_search_view(self, *args, **kwargs):
351 351 return self.get_view(args, kwargs)
352 352
353 353 def get_first_image(self) -> PostImage:
354 354 return self.images.earliest('id')
355 355
356 356 def delete(self, using=None):
357 357 """
358 Deletes all post images and the post itself. If the post is opening,
359 thread with all posts is deleted.
358 Deletes all post images and the post itself.
360 359 """
361 360
362 361 for image in self.images.all():
363 362 image_refs_count = Post.objects.filter(images__in=[image]).count()
364 363 if image_refs_count == 1:
365 364 image.delete()
366 365
367 if self.is_opening():
368 self.get_thread().delete()
369 else:
370 thread = self.get_thread()
371 thread.last_edit_time = timezone.now()
372 thread.save()
366 thread = self.get_thread()
367 thread.last_edit_time = timezone.now()
368 thread.save()
373 369
374 370 super(Post, self).delete(using)
375 371
376 372 logging.getLogger('boards.post.delete').info(
377 373 'Deleted post {}'.format(self))
378 374
379 375 def get_post_data(self, format_type=DIFF_TYPE_JSON, request=None,
380 376 include_last_update=False):
381 377 """
382 378 Gets post HTML or JSON data that can be rendered on a page or used by
383 379 API.
384 380 """
385 381
386 382 if format_type == DIFF_TYPE_HTML:
387 383 params = dict()
388 384 params['post'] = self
389 385 if PARAMETER_TRUNCATED in request.GET:
390 386 params[PARAMETER_TRUNCATED] = True
391 387
392 388 return render_to_string('boards/api_post.html', params)
393 389 elif format_type == DIFF_TYPE_JSON:
394 390 post_json = {
395 391 'id': self.id,
396 392 'title': self.title,
397 393 'text': self._text_rendered,
398 394 }
399 395 if self.images.exists():
400 396 post_image = self.get_first_image()
401 397 post_json['image'] = post_image.image.url
402 398 post_json['image_preview'] = post_image.image.url_200x150
403 399 if include_last_update:
404 400 post_json['bump_time'] = datetime_to_epoch(
405 401 self.thread_new.bump_time)
406 402 return post_json
407 403
408 404 def send_to_websocket(self, request, recursive=True):
409 405 """
410 406 Sends post HTML data to the thread web socket.
411 407 """
412 408
413 409 if not settings.WEBSOCKETS_ENABLED:
414 410 return
415 411
416 412 client = Client()
417 413
418 414 thread = self.get_thread()
419 415 thread_id = thread.id
420 416 channel_name = WS_CHANNEL_THREAD + str(thread.get_opening_post_id())
421 417 client.publish(channel_name, {
422 418 WS_NOTIFICATION_TYPE: WS_NOTIFICATION_TYPE_NEW_POST,
423 419 })
424 420 client.send()
425 421
426 422 logger = logging.getLogger('boards.post.websocket')
427 423
428 424 logger.info('Sent notification from post #{} to channel {}'.format(
429 425 self.id, channel_name))
430 426
431 427 if recursive:
432 428 for reply_number in re.finditer(REGEX_REPLY, self.get_raw_text()):
433 429 post_id = reply_number.group(1)
434 430 ref_post = Post.objects.filter(id=post_id)[0]
435 431
436 432 # If post is in this thread, its thread was already notified.
437 433 # Otherwise, notify its thread separately.
438 434 if ref_post.thread_new_id != thread_id:
439 435 ref_post.send_to_websocket(request, recursive=False)
440 436
441 437 def save(self, force_insert=False, force_update=False, using=None,
442 438 update_fields=None):
443 439 self._text_rendered = bbcode_extended(self.get_raw_text())
444 440
445 441 super().save(force_insert, force_update, using, update_fields)
446 442
447 443 def get_text(self) -> str:
448 444 return self._text_rendered
449 445
450 446 def get_raw_text(self) -> str:
451 447 return self.text
@@ -1,182 +1,190 b''
1 1 import logging
2 2 from django.db.models import Count, Sum
3 3 from django.utils import timezone
4 4 from django.core.cache import cache
5 5 from django.db import models
6 6 from boards import settings
7 7
8 8 __author__ = 'neko259'
9 9
10 10
11 11 logger = logging.getLogger(__name__)
12 12
13 13
14 14 CACHE_KEY_OPENING_POST = 'opening_post_id'
15 15
16 16
17 17 class ThreadManager(models.Manager):
18 18 def process_oldest_threads(self):
19 19 """
20 20 Preserves maximum thread count. If there are too many threads,
21 21 archive or delete the old ones.
22 22 """
23 23
24 24 threads = Thread.objects.filter(archived=False).order_by('-bump_time')
25 25 thread_count = threads.count()
26 26
27 27 if thread_count > settings.MAX_THREAD_COUNT:
28 28 num_threads_to_delete = thread_count - settings.MAX_THREAD_COUNT
29 29 old_threads = threads[thread_count - num_threads_to_delete:]
30 30
31 31 for thread in old_threads:
32 32 if settings.ARCHIVE_THREADS:
33 33 self._archive_thread(thread)
34 34 else:
35 35 thread.delete()
36 36
37 37 logger.info('Processed %d old threads' % num_threads_to_delete)
38 38
39 39 def _archive_thread(self, thread):
40 40 thread.archived = True
41 41 thread.bumpable = False
42 42 thread.last_edit_time = timezone.now()
43 43 thread.save(update_fields=['archived', 'last_edit_time', 'bumpable'])
44 44
45 45
46 46 class Thread(models.Model):
47 47 objects = ThreadManager()
48 48
49 49 class Meta:
50 50 app_label = 'boards'
51 51
52 52 tags = models.ManyToManyField('Tag')
53 53 bump_time = models.DateTimeField()
54 54 last_edit_time = models.DateTimeField()
55 55 replies = models.ManyToManyField('Post', symmetrical=False, null=True,
56 56 blank=True, related_name='tre+')
57 57 archived = models.BooleanField(default=False)
58 58 bumpable = models.BooleanField(default=True)
59 59
60 60 def get_tags(self):
61 61 """
62 62 Gets a sorted tag list.
63 63 """
64 64
65 65 return self.tags.order_by('name')
66 66
67 67 def bump(self):
68 68 """
69 69 Bumps (moves to up) thread if possible.
70 70 """
71 71
72 72 if self.can_bump():
73 73 self.bump_time = timezone.now()
74 74
75 75 if self.get_reply_count() >= settings.MAX_POSTS_PER_THREAD:
76 76 self.bumpable = False
77 77
78 78 logger.info('Bumped thread %d' % self.id)
79 79
80 80 def get_reply_count(self):
81 81 return self.replies.count()
82 82
83 83 def get_images_count(self):
84 84 return self.replies.annotate(images_count=Count(
85 85 'images')).aggregate(Sum('images_count'))['images_count__sum']
86 86
87 87 def can_bump(self):
88 88 """
89 89 Checks if the thread can be bumped by replying to it.
90 90 """
91 91
92 92 return self.bumpable
93 93
94 94 def get_last_replies(self):
95 95 """
96 96 Gets several last replies, not including opening post
97 97 """
98 98
99 99 if settings.LAST_REPLIES_COUNT > 0:
100 100 reply_count = self.get_reply_count()
101 101
102 102 if reply_count > 0:
103 103 reply_count_to_show = min(settings.LAST_REPLIES_COUNT,
104 104 reply_count - 1)
105 105 replies = self.get_replies()
106 106 last_replies = replies[reply_count - reply_count_to_show:]
107 107
108 108 return last_replies
109 109
110 110 def get_skipped_replies_count(self):
111 111 """
112 112 Gets number of posts between opening post and last replies.
113 113 """
114 114 reply_count = self.get_reply_count()
115 115 last_replies_count = min(settings.LAST_REPLIES_COUNT,
116 116 reply_count - 1)
117 117 return reply_count - last_replies_count - 1
118 118
119 119 def get_replies(self, view_fields_only=False):
120 120 """
121 121 Gets sorted thread posts
122 122 """
123 123
124 124 query = self.replies.order_by('pub_time').prefetch_related('images')
125 125 if view_fields_only:
126 126 query = query.defer('poster_user_agent')
127 127 return query.all()
128 128
129 129 def get_replies_with_images(self, view_fields_only=False):
130 """
131 Gets replies that have at least one image attached
132 """
133
130 134 return self.get_replies(view_fields_only).annotate(images_count=Count(
131 135 'images')).filter(images_count__gt=0)
132 136
133 137 def add_tag(self, tag):
134 138 """
135 139 Connects thread to a tag and tag to a thread
136 140 """
137 141
138 142 self.tags.add(tag)
139 143
140 144 def get_opening_post(self, only_id=False):
141 145 """
142 146 Gets the first post of the thread
143 147 """
144 148
145 149 query = self.replies.order_by('pub_time')
146 150 if only_id:
147 151 query = query.only('id')
148 152 opening_post = query.first()
149 153
150 154 return opening_post
151 155
152 156 def get_opening_post_id(self):
153 157 """
154 158 Gets ID of the first thread post.
155 159 """
156 160
157 161 cache_key = CACHE_KEY_OPENING_POST + str(self.id)
158 162 opening_post_id = cache.get(cache_key)
159 163 if not opening_post_id:
160 164 opening_post_id = self.get_opening_post(only_id=True).id
161 165 cache.set(cache_key, opening_post_id)
162 166
163 167 return opening_post_id
164 168
165 169 def __unicode__(self):
166 170 return str(self.id)
167 171
168 172 def get_pub_time(self):
169 173 """
170 174 Gets opening post's pub time because thread does not have its own one.
171 175 """
172 176
173 177 return self.get_opening_post().pub_time
174 178
175 179 def delete(self, using=None):
176 if self.replies.exists():
177 self.replies.all().delete()
180 """
181 Deletes thread with all replies.
182 """
183
184 for reply in self.replies.all():
185 reply.delete()
178 186
179 187 super(Thread, self).delete(using)
180 188
181 189 def __str__(self):
182 return 'T#{}/{}'.format(self.id, self.get_opening_post_id()) No newline at end of file
190 return 'T#{}/{}'.format(self.id, self.get_opening_post_id())
@@ -1,135 +1,135 b''
1 1 from django.core.paginator import Paginator
2 2 from django.test import TestCase
3 3 from boards import settings
4 4 from boards.models import Tag, Post, Thread
5 5
6 6
7 7 class PostTests(TestCase):
8 8
9 9 def _create_post(self):
10 10 tag = Tag.objects.create(name='test_tag')
11 11 return Post.objects.create_post(title='title', text='text',
12 12 tags=[tag])
13 13
14 14 def test_post_add(self):
15 15 """Test adding post"""
16 16
17 17 post = self._create_post()
18 18
19 19 self.assertIsNotNone(post, 'No post was created.')
20 20 self.assertEqual('test_tag', post.get_thread().tags.all()[0].name,
21 21 'No tags were added to the post.')
22 22
23 23 def test_delete_post(self):
24 24 """Test post deletion"""
25 25
26 26 post = self._create_post()
27 27 post_id = post.id
28 28
29 29 post.delete()
30 30
31 31 self.assertFalse(Post.objects.filter(id=post_id).exists())
32 32
33 33 def test_delete_thread(self):
34 34 """Test thread deletion"""
35 35
36 36 opening_post = self._create_post()
37 37 thread = opening_post.get_thread()
38 38 reply = Post.objects.create_post("", "", thread=thread)
39 39
40 opening_post.delete()
40 thread.delete()
41 41
42 42 self.assertFalse(Post.objects.filter(id=reply.id).exists(),
43 43 'Reply was not deleted with the thread.')
44 44 self.assertFalse(Post.objects.filter(id=opening_post.id).exists(),
45 45 'Opening post was not deleted with the thread.')
46 46
47 47 def test_post_to_thread(self):
48 48 """Test adding post to a thread"""
49 49
50 50 op = self._create_post()
51 51 post = Post.objects.create_post("", "", thread=op.get_thread())
52 52
53 53 self.assertIsNotNone(post, 'Reply to thread wasn\'t created')
54 54 self.assertEqual(op.get_thread().last_edit_time, post.pub_time,
55 55 'Post\'s create time doesn\'t match thread last edit'
56 56 ' time')
57 57
58 58 def test_delete_posts_by_ip(self):
59 59 """Test deleting posts with the given ip"""
60 60
61 61 post = self._create_post()
62 62 post_id = post.id
63 63
64 64 Post.objects.delete_posts_by_ip('0.0.0.0')
65 65
66 66 self.assertFalse(Post.objects.filter(id=post_id).exists())
67 67
68 68 def test_get_thread(self):
69 69 """Test getting all posts of a thread"""
70 70
71 71 opening_post = self._create_post()
72 72
73 73 for i in range(2):
74 74 Post.objects.create_post('title', 'text',
75 75 thread=opening_post.get_thread())
76 76
77 77 thread = opening_post.get_thread()
78 78
79 79 self.assertEqual(3, thread.replies.count())
80 80
81 81 def test_create_post_with_tag(self):
82 82 """Test adding tag to post"""
83 83
84 84 tag = Tag.objects.create(name='test_tag')
85 85 post = Post.objects.create_post(title='title', text='text', tags=[tag])
86 86
87 87 thread = post.get_thread()
88 88 self.assertIsNotNone(post, 'Post not created')
89 89 self.assertTrue(tag in thread.tags.all(), 'Tag not added to thread')
90 90
91 91 def test_thread_max_count(self):
92 92 """Test deletion of old posts when the max thread count is reached"""
93 93
94 94 for i in range(settings.MAX_THREAD_COUNT + 1):
95 95 self._create_post()
96 96
97 97 self.assertEqual(settings.MAX_THREAD_COUNT,
98 98 len(Thread.objects.filter(archived=False)))
99 99
100 100 def test_pages(self):
101 101 """Test that the thread list is properly split into pages"""
102 102
103 103 for i in range(settings.MAX_THREAD_COUNT):
104 104 self._create_post()
105 105
106 106 all_threads = Thread.objects.filter(archived=False)
107 107
108 108 paginator = Paginator(Thread.objects.filter(archived=False),
109 109 settings.THREADS_PER_PAGE)
110 110 posts_in_second_page = paginator.page(2).object_list
111 111 first_post = posts_in_second_page[0]
112 112
113 113 self.assertEqual(all_threads[settings.THREADS_PER_PAGE].id,
114 114 first_post.id)
115 115
116 116 def test_thread_replies(self):
117 117 """
118 118 Tests that the replies can be queried from a thread in all possible
119 119 ways.
120 120 """
121 121
122 122 tag = Tag.objects.create(name='test_tag')
123 123 opening_post = Post.objects.create_post(title='title', text='text',
124 124 tags=[tag])
125 125 thread = opening_post.get_thread()
126 126
127 127 reply1 = Post.objects.create_post(title='title', text='text', thread=thread)
128 128 reply2 = Post.objects.create_post(title='title', text='text', thread=thread)
129 129
130 130 replies = thread.get_replies()
131 131 self.assertTrue(len(replies) > 0, 'No replies found for thread.')
132 132
133 133 replies = thread.get_replies(view_fields_only=True)
134 134 self.assertTrue(len(replies) > 0,
135 135 'No replies found for thread with view fields only.')
General Comments 0
You need to be logged in to leave comments. Login now