##// END OF EJS Templates
Fixed thread gallery view
neko259 -
r889:ab1b39f3 default
parent child Browse files
Show More
@@ -1,191 +1,191 b''
1 1 import logging
2 2 from django.db.models import Count
3 3 from django.utils import timezone
4 4 from django.core.cache import cache
5 5 from django.db import models
6 6 from boards import settings
7 7
8 8 __author__ = 'neko259'
9 9
10 10
11 11 logger = logging.getLogger(__name__)
12 12
13 13
14 14 CACHE_KEY_OPENING_POST = 'opening_post_id'
15 15
16 16
17 17 class ThreadManager(models.Manager):
18 18 def process_oldest_threads(self):
19 19 """
20 20 Preserves maximum thread count. If there are too many threads,
21 21 archive or delete the old ones.
22 22 """
23 23
24 24 threads = Thread.objects.filter(archived=False).order_by('-bump_time')
25 25 thread_count = threads.count()
26 26
27 27 if thread_count > settings.MAX_THREAD_COUNT:
28 28 num_threads_to_delete = thread_count - settings.MAX_THREAD_COUNT
29 29 old_threads = threads[thread_count - num_threads_to_delete:]
30 30
31 31 for thread in old_threads:
32 32 if settings.ARCHIVE_THREADS:
33 33 self._archive_thread(thread)
34 34 else:
35 35 thread.delete()
36 36
37 37 logger.info('Processed %d old threads' % num_threads_to_delete)
38 38
39 39 def _archive_thread(self, thread):
40 40 thread.archived = True
41 41 thread.bumpable = False
42 42 thread.last_edit_time = timezone.now()
43 43 thread.save(update_fields=['archived', 'last_edit_time', 'bumpable'])
44 44
45 45
46 46 class Thread(models.Model):
47 47 objects = ThreadManager()
48 48
49 49 class Meta:
50 50 app_label = 'boards'
51 51
52 52 tags = models.ManyToManyField('Tag')
53 53 bump_time = models.DateTimeField()
54 54 last_edit_time = models.DateTimeField()
55 55 replies = models.ManyToManyField('Post', symmetrical=False, null=True,
56 56 blank=True, related_name='tre+')
57 57 archived = models.BooleanField(default=False)
58 58 bumpable = models.BooleanField(default=True)
59 59
60 60 def get_tags(self):
61 61 """
62 62 Gets a sorted tag list.
63 63 """
64 64
65 65 return self.tags.order_by('name')
66 66
67 67 def bump(self):
68 68 """
69 69 Bumps (moves to up) thread if possible.
70 70 """
71 71
72 72 if self.can_bump():
73 73 self.bump_time = timezone.now()
74 74
75 75 if self.get_reply_count() >= settings.MAX_POSTS_PER_THREAD:
76 76 self.bumpable = False
77 77
78 78 logger.info('Bumped thread %d' % self.id)
79 79
80 80 def get_reply_count(self):
81 81 return self.replies.count()
82 82
83 83 def get_images_count(self):
84 84 # TODO Use sum
85 85 total_count = 0
86 86 for post_with_image in self.replies.annotate(images_count=Count(
87 87 'images')):
88 88 total_count += post_with_image.images_count
89 89 return total_count
90 90
91 91 def can_bump(self):
92 92 """
93 93 Checks if the thread can be bumped by replying to it.
94 94 """
95 95
96 96 return self.bumpable
97 97
98 98 def get_last_replies(self):
99 99 """
100 100 Gets several last replies, not including opening post
101 101 """
102 102
103 103 if settings.LAST_REPLIES_COUNT > 0:
104 104 reply_count = self.get_reply_count()
105 105
106 106 if reply_count > 0:
107 107 reply_count_to_show = min(settings.LAST_REPLIES_COUNT,
108 108 reply_count - 1)
109 109 replies = self.get_replies()
110 110 last_replies = replies[reply_count - reply_count_to_show:]
111 111
112 112 return last_replies
113 113
114 114 def get_skipped_replies_count(self):
115 115 """
116 116 Gets number of posts between opening post and last replies.
117 117 """
118 118 reply_count = self.get_reply_count()
119 119 last_replies_count = min(settings.LAST_REPLIES_COUNT,
120 120 reply_count - 1)
121 121 return reply_count - last_replies_count - 1
122 122
123 123 def get_replies(self, view_fields_only=False):
124 124 """
125 125 Gets sorted thread posts
126 126 """
127 127
128 128 query = self.replies.order_by('pub_time').prefetch_related('images')
129 129 if view_fields_only:
130 query = query.defer('poster_user_agent', 'text_markup_type')
130 query = query.defer('poster_user_agent')
131 131 return query.all()
132 132
133 133 def get_replies_with_images(self, view_fields_only=False):
134 134 return self.get_replies(view_fields_only).annotate(images_count=Count(
135 135 'images')).filter(images_count__gt=0)
136 136
137 137 def add_tag(self, tag):
138 138 """
139 139 Connects thread to a tag and tag to a thread
140 140 """
141 141
142 142 self.tags.add(tag)
143 143 tag.threads.add(self)
144 144
145 145 def remove_tag(self, tag):
146 146 self.tags.remove(tag)
147 147 tag.threads.remove(self)
148 148
149 149 def get_opening_post(self, only_id=False):
150 150 """
151 151 Gets the first post of the thread
152 152 """
153 153
154 154 query = self.replies.order_by('pub_time')
155 155 if only_id:
156 156 query = query.only('id')
157 157 opening_post = query.first()
158 158
159 159 return opening_post
160 160
161 161 def get_opening_post_id(self):
162 162 """
163 163 Gets ID of the first thread post.
164 164 """
165 165
166 166 cache_key = CACHE_KEY_OPENING_POST + str(self.id)
167 167 opening_post_id = cache.get(cache_key)
168 168 if not opening_post_id:
169 169 opening_post_id = self.get_opening_post(only_id=True).id
170 170 cache.set(cache_key, opening_post_id)
171 171
172 172 return opening_post_id
173 173
174 174 def __unicode__(self):
175 175 return str(self.id)
176 176
177 177 def get_pub_time(self):
178 178 """
179 179 Gets opening post's pub time because thread does not have its own one.
180 180 """
181 181
182 182 return self.get_opening_post().pub_time
183 183
184 184 def delete(self, using=None):
185 185 if self.replies.exists():
186 186 self.replies.all().delete()
187 187
188 188 super(Thread, self).delete(using)
189 189
190 190 def __str__(self):
191 191 return 'T#{}/{}'.format(self.id, self.get_opening_post_id())
@@ -1,115 +1,136 b''
1 1 from django.core.paginator import Paginator
2 2 from django.test import TestCase
3 3 from boards import settings
4 4 from boards.models import Tag, Post, Thread
5 5
6 6
7 7 class PostTests(TestCase):
8 8
9 9 def _create_post(self):
10 10 tag = Tag.objects.create(name='test_tag')
11 11 return Post.objects.create_post(title='title', text='text',
12 12 tags=[tag])
13 13
14 14 def test_post_add(self):
15 15 """Test adding post"""
16 16
17 17 post = self._create_post()
18 18
19 19 self.assertIsNotNone(post, 'No post was created.')
20 20 self.assertEqual('test_tag', post.get_thread().tags.all()[0].name,
21 21 'No tags were added to the post.')
22 22
23 23 def test_delete_post(self):
24 24 """Test post deletion"""
25 25
26 26 post = self._create_post()
27 27 post_id = post.id
28 28
29 29 post.delete()
30 30
31 31 self.assertFalse(Post.objects.filter(id=post_id).exists())
32 32
33 33 def test_delete_thread(self):
34 34 """Test thread deletion"""
35 35
36 36 opening_post = self._create_post()
37 37 thread = opening_post.get_thread()
38 38 reply = Post.objects.create_post("", "", thread=thread)
39 39
40 40 opening_post.delete()
41 41
42 42 self.assertFalse(Post.objects.filter(id=reply.id).exists(),
43 43 'Reply was not deleted with the thread.')
44 44 self.assertFalse(Post.objects.filter(id=opening_post.id).exists(),
45 45 'Opening post was not deleted with the thread.')
46 46
47 47 def test_post_to_thread(self):
48 48 """Test adding post to a thread"""
49 49
50 50 op = self._create_post()
51 51 post = Post.objects.create_post("", "", thread=op.get_thread())
52 52
53 53 self.assertIsNotNone(post, 'Reply to thread wasn\'t created')
54 54 self.assertEqual(op.get_thread().last_edit_time, post.pub_time,
55 55 'Post\'s create time doesn\'t match thread last edit'
56 56 ' time')
57 57
58 58 def test_delete_posts_by_ip(self):
59 59 """Test deleting posts with the given ip"""
60 60
61 61 post = self._create_post()
62 62 post_id = post.id
63 63
64 64 Post.objects.delete_posts_by_ip('0.0.0.0')
65 65
66 66 self.assertFalse(Post.objects.filter(id=post_id).exists())
67 67
68 68 def test_get_thread(self):
69 69 """Test getting all posts of a thread"""
70 70
71 71 opening_post = self._create_post()
72 72
73 73 for i in range(2):
74 74 Post.objects.create_post('title', 'text',
75 75 thread=opening_post.get_thread())
76 76
77 77 thread = opening_post.get_thread()
78 78
79 79 self.assertEqual(3, thread.replies.count())
80 80
81 81 def test_create_post_with_tag(self):
82 82 """Test adding tag to post"""
83 83
84 84 tag = Tag.objects.create(name='test_tag')
85 85 post = Post.objects.create_post(title='title', text='text', tags=[tag])
86 86
87 87 thread = post.get_thread()
88 88 self.assertIsNotNone(post, 'Post not created')
89 89 self.assertTrue(tag in thread.tags.all(), 'Tag not added to thread')
90 90 self.assertTrue(thread in tag.threads.all(), 'Thread not added to tag')
91 91
92 92 def test_thread_max_count(self):
93 93 """Test deletion of old posts when the max thread count is reached"""
94 94
95 95 for i in range(settings.MAX_THREAD_COUNT + 1):
96 96 self._create_post()
97 97
98 98 self.assertEqual(settings.MAX_THREAD_COUNT,
99 99 len(Thread.objects.filter(archived=False)))
100 100
101 101 def test_pages(self):
102 102 """Test that the thread list is properly split into pages"""
103 103
104 104 for i in range(settings.MAX_THREAD_COUNT):
105 105 self._create_post()
106 106
107 107 all_threads = Thread.objects.filter(archived=False)
108 108
109 109 paginator = Paginator(Thread.objects.filter(archived=False),
110 110 settings.THREADS_PER_PAGE)
111 111 posts_in_second_page = paginator.page(2).object_list
112 112 first_post = posts_in_second_page[0]
113 113
114 114 self.assertEqual(all_threads[settings.THREADS_PER_PAGE].id,
115 first_post.id) No newline at end of file
115 first_post.id)
116
117 def test_thread_replies(self):
118 """
119 Tests that the replies can be queried from a thread in all possible
120 ways.
121 """
122
123 tag = Tag.objects.create(name='test_tag')
124 opening_post = Post.objects.create_post(title='title', text='text',
125 tags=[tag])
126 thread = opening_post.get_thread()
127
128 reply1 = Post.objects.create_post(title='title', text='text', thread=thread)
129 reply2 = Post.objects.create_post(title='title', text='text', thread=thread)
130
131 replies = thread.get_replies()
132 self.assertTrue(len(replies) > 0, 'No replies found for thread.')
133
134 replies = thread.get_replies(view_fields_only=True)
135 self.assertTrue(len(replies) > 0,
136 'No replies found for thread with view fields only.')
General Comments 0
You need to be logged in to leave comments. Login now