##// END OF EJS Templates
Removed strange 'exists' method in post manager, fixed tests in which it was...
neko259 -
r396:0d260248 default
parent child Browse files
Show More
@@ -1,294 +1,272 b''
1 1 import os
2 2 from random import random
3 3 import time
4 4 import math
5 5 import re
6 from django.core.cache import cache
7 6
8 7 from django.db import models
9 8 from django.http import Http404
10 9 from django.utils import timezone
11 10 from markupfield.fields import MarkupField
12 11
13 12 from neboard import settings
14 13 from boards import settings as boards_settings
15 14 from boards import thumbs
16 15
17 16 BAN_REASON_AUTO = 'Auto'
18 17
19 18 IMAGE_THUMB_SIZE = (200, 150)
20 19
21 20 TITLE_MAX_LENGTH = 50
22 21
23 22 DEFAULT_MARKUP_TYPE = 'markdown'
24 23
25 24 NO_PARENT = -1
26 25 NO_IP = '0.0.0.0'
27 26 UNKNOWN_UA = ''
28 27 ALL_PAGES = -1
29 28 IMAGES_DIRECTORY = 'images/'
30 29 FILE_EXTENSION_DELIMITER = '.'
31 30
32 31 SETTING_MODERATE = "moderate"
33 32
34 33 REGEX_REPLY = re.compile('>>(\d+)')
35 34
36 35
37 36 class PostManager(models.Manager):
38 37
39 38 def create_post(self, title, text, image=None, thread=None,
40 39 ip=NO_IP, tags=None, user=None):
41 40 posting_time = timezone.now()
42 41
43 42 post = self.create(title=title,
44 43 text=text,
45 44 pub_time=posting_time,
46 45 thread=thread,
47 46 image=image,
48 47 poster_ip=ip,
49 48 poster_user_agent=UNKNOWN_UA,
50 49 last_edit_time=posting_time,
51 50 bump_time=posting_time,
52 51 user=user)
53 52
54 53 if tags:
55 54 linked_tags = []
56 55 for tag in tags:
57 56 tag_linked_tags = tag.get_linked_tags()
58 57 if len(tag_linked_tags) > 0:
59 58 linked_tags.extend(tag_linked_tags)
60 59
61 60 tags.extend(linked_tags)
62 61 map(post.tags.add, tags)
63 62 for tag in tags:
64 63 tag.threads.add(post)
65 64
66 65 if thread:
67 66 thread.replies.add(post)
68 67 thread.bump()
69 68 thread.last_edit_time = posting_time
70 69 thread.save()
71 70 else:
72 71 self._delete_old_threads()
73 72
74 73 self.connect_replies(post)
75 74
76 75 return post
77 76
78 77 def delete_post(self, post):
79 78 if post.replies.count() > 0:
80 79 map(self.delete_post, post.replies.all())
81 80
82 # Update thread's last edit time (used as cache key)
81 # Update thread's last edit time
83 82 thread = post.thread
84 83 if thread:
85 thread.clear_cache()
86
87 84 thread.last_edit_time = timezone.now()
88 85 thread.save()
89 86
90 post.clear_cache()
91 87 post.delete()
92 88
93 89 def delete_posts_by_ip(self, ip):
94 90 posts = self.filter(poster_ip=ip)
95 91 map(self.delete_post, posts)
96 92
97 93 def get_threads(self, tag=None, page=ALL_PAGES,
98 94 order_by='-bump_time'):
99 95 if tag:
100 96 threads = tag.threads
101 97
102 98 if threads.count() == 0:
103 99 raise Http404
104 100 else:
105 101 threads = self.filter(thread=None)
106 102
107 103 threads = threads.order_by(order_by)
108 104
109 105 if page != ALL_PAGES:
110 106 thread_count = threads.count()
111 107
112 108 if page < self._get_page_count(thread_count):
113 109 start_thread = page * settings.THREADS_PER_PAGE
114 110 end_thread = min(start_thread + settings.THREADS_PER_PAGE,
115 111 thread_count)
116 112 threads = threads[start_thread:end_thread]
117 113
118 114 return threads
119 115
120 116 def get_thread(self, opening_post_id):
121 117 try:
122 118 opening_post = self.get(id=opening_post_id, thread=None)
123 119 except Post.DoesNotExist:
124 120 raise Http404
125 121
126 cache_key = opening_post.get_cache_key()
127 thread = cache.get(cache_key)
128 if thread:
129 return thread
130
131 122 if opening_post.replies:
132 123 thread = [opening_post]
133 124 thread.extend(opening_post.replies.all().order_by('pub_time'))
134 125
135 cache.set(cache_key, thread, boards_settings.CACHE_TIMEOUT)
136
137 126 return thread
138 127
139 128 def get_thread_page_count(self, tag=None):
140 129 if tag:
141 130 threads = self.filter(thread=None, tags=tag)
142 131 else:
143 132 threads = self.filter(thread=None)
144 133
145 134 return self._get_page_count(threads.count())
146 135
147 136 def _delete_old_threads(self):
148 137 """
149 138 Preserves maximum thread count. If there are too many threads,
150 139 delete the old ones.
151 140 """
152 141
153 142 # TODO Move old threads to the archive instead of deleting them.
154 143 # Maybe make some 'old' field in the model to indicate the thread
155 144 # must not be shown and be able for replying.
156 145
157 146 threads = self.get_threads()
158 147 thread_count = threads.count()
159 148
160 149 if thread_count > settings.MAX_THREAD_COUNT:
161 150 num_threads_to_delete = thread_count - settings.MAX_THREAD_COUNT
162 151 old_threads = threads[thread_count - num_threads_to_delete:]
163 152
164 153 map(self.delete_post, old_threads)
165 154
166 155 def connect_replies(self, post):
167 156 """Connect replies to a post to show them as a refmap"""
168 157
169 158 for reply_number in re.finditer(REGEX_REPLY, post.text.raw):
170 159 post_id = reply_number.group(1)
171 160 ref_post = self.filter(id=post_id)
172 161 if ref_post.count() > 0:
173 162 referenced_post = ref_post[0]
174 163 referenced_post.referenced_posts.add(post)
175 164 referenced_post.last_edit_time = post.pub_time
176 165 referenced_post.save()
177 166
178 167 def _get_page_count(self, thread_count):
179 168 return int(math.ceil(thread_count / float(settings.THREADS_PER_PAGE)))
180 169
181 170
182 171 class Post(models.Model):
183 172 """A post is a message."""
184 173
185 174 objects = PostManager()
186 175
187 176 class Meta:
188 177 app_label = 'boards'
189 178
190 179 def _update_image_filename(self, filename):
191 180 """Get unique image filename"""
192 181
193 182 path = IMAGES_DIRECTORY
194 183 new_name = str(int(time.mktime(time.gmtime())))
195 184 new_name += str(int(random() * 1000))
196 185 new_name += FILE_EXTENSION_DELIMITER
197 186 new_name += filename.split(FILE_EXTENSION_DELIMITER)[-1:][0]
198 187
199 188 return os.path.join(path, new_name)
200 189
201 190 title = models.CharField(max_length=TITLE_MAX_LENGTH)
202 191 pub_time = models.DateTimeField()
203 192 text = MarkupField(default_markup_type=DEFAULT_MARKUP_TYPE,
204 193 escape_html=False)
205 194
206 195 image_width = models.IntegerField(default=0)
207 196 image_height = models.IntegerField(default=0)
208 197
209 198 image = thumbs.ImageWithThumbsField(upload_to=_update_image_filename,
210 199 blank=True, sizes=(IMAGE_THUMB_SIZE,),
211 200 width_field='image_width',
212 201 height_field='image_height')
213 202
214 203 poster_ip = models.GenericIPAddressField()
215 204 poster_user_agent = models.TextField()
216 205
217 206 thread = models.ForeignKey('Post', null=True, default=None)
218 207 tags = models.ManyToManyField('Tag')
219 208 last_edit_time = models.DateTimeField()
220 209 bump_time = models.DateTimeField()
221 210 user = models.ForeignKey('User', null=True, default=None)
222 211
223 212 replies = models.ManyToManyField('Post', symmetrical=False, null=True,
224 213 blank=True, related_name='re+')
225 214 referenced_posts = models.ManyToManyField('Post', symmetrical=False,
226 215 null=True,
227 216 blank=True, related_name='rfp+')
228 217
229 218 def __unicode__(self):
230 219 return '#' + str(self.id) + ' ' + self.title + ' (' + \
231 220 self.text.raw[:50] + ')'
232 221
233 222 def get_title(self):
234 223 title = self.title
235 224 if len(title) == 0:
236 225 title = self.text.raw[:20]
237 226
238 227 return title
239 228
240 229 def get_reply_count(self):
241 230 return self.replies.count()
242 231
243 232 def get_images_count(self):
244 233 images_count = 1 if self.image else 0
245 234 images_count += self.replies.filter(image_width__gt=0).count()
246 235
247 236 return images_count
248 237
249 238 def can_bump(self):
250 239 """Check if the thread can be bumped by replying"""
251 240
252 241 post_count = self.get_reply_count()
253 242
254 243 return post_count <= settings.MAX_POSTS_PER_THREAD
255 244
256 def clear_cache(self):
257 """Remove the post from cache"""
258
259 cache_key = self.get_cache_key()
260 cache.delete(cache_key)
261
262 245 def bump(self):
263 246 """Bump (move to up) thread"""
264 247
265 248 if self.can_bump():
266 249 self.bump_time = timezone.now()
267 250
268 self.clear_cache()
269
270 251 def get_last_replies(self):
271 252 if settings.LAST_REPLIES_COUNT > 0:
272 253 reply_count = self.get_reply_count()
273 254
274 255 if reply_count > 0:
275 256 reply_count_to_show = min(settings.LAST_REPLIES_COUNT,
276 257 reply_count)
277 258 last_replies = self.replies.all().order_by('pub_time')[
278 259 reply_count - reply_count_to_show:]
279 260
280 261 return last_replies
281 262
282 263 def get_tags(self):
283 264 """Get a sorted tag list"""
284 265
285 266 return self.tags.order_by('name')
286 267
287 def get_cache_key(self):
288 return 'thread_cache' + str(self.id)
289
290 268 def get_sorted_referenced_posts(self):
291 269 return self.referenced_posts.order_by('id')
292 270
293 271 def is_referenced(self):
294 272 return self.referenced_posts.count() > 0
@@ -1,218 +1,218 b''
1 1 # coding=utf-8
2 2 from django.test import TestCase
3 3 from django.test.client import Client
4 4 import time
5 5
6 6 from boards.models import Post, Tag
7 7 from neboard import settings
8 8
9 9 PAGE_404 = 'boards/404.html'
10 10
11 11 TEST_TEXT = 'test text'
12 12
13 13 NEW_THREAD_PAGE = '/'
14 14 THREAD_PAGE_ONE = '/thread/1/'
15 15 THREAD_PAGE = '/thread/'
16 16 TAG_PAGE = '/tag/'
17 17 HTTP_CODE_REDIRECT = 302
18 18 HTTP_CODE_OK = 200
19 19 HTTP_CODE_NOT_FOUND = 404
20 20
21 21
22 22 class PostTests(TestCase):
23 23
24 24 def _create_post(self):
25 25 return Post.objects.create_post(title='title',
26 26 text='text')
27 27
28 28 def test_post_add(self):
29 29 """Test adding post"""
30 30
31 31 post = self._create_post()
32 32
33 33 self.assertIsNotNone(post)
34 34 self.assertIsNone(post.thread, 'Opening post has a thread')
35 35
36 36 def test_delete_post(self):
37 37 """Test post deletion"""
38 38
39 39 post = self._create_post()
40 40 post_id = post.id
41 41
42 42 Post.objects.delete_post(post)
43 43
44 self.assertFalse(Post.objects.exists(post_id))
44 self.assertFalse(Post.objects.filter(id=post_id).exists())
45 45
46 46 def test_delete_thread(self):
47 47 """Test thread deletion"""
48 48
49 49 thread = self._create_post()
50 50 reply = Post.objects.create_post("", "", thread=thread)
51 51
52 52 Post.objects.delete_post(thread)
53 53
54 self.assertFalse(Post.objects.exists(reply.id))
54 self.assertFalse(Post.objects.filter(id=reply.id).exists())
55 55
56 56 def test_post_to_thread(self):
57 57 """Test adding post to a thread"""
58 58
59 59 op = self._create_post()
60 60 post = Post.objects.create_post("", "", thread=op)
61 61
62 62 self.assertIsNotNone(post, 'Reply to thread wasn\'t created')
63 63 self.assertEqual(op.last_edit_time, post.pub_time,
64 64 'Post\'s create time doesn\'t match thread last edit'
65 65 ' time')
66 66
67 67 def test_delete_posts_by_ip(self):
68 68 """Test deleting posts with the given ip"""
69 69
70 70 post = self._create_post()
71 71 post_id = post.id
72 72
73 73 Post.objects.delete_posts_by_ip('0.0.0.0')
74 74
75 self.assertFalse(Post.objects.exists(post_id))
75 self.assertFalse(Post.objects.filter(id=post_id).exists())
76 76
77 77 def test_get_thread(self):
78 78 """Test getting all posts of a thread"""
79 79
80 80 opening_post = self._create_post()
81 81
82 82 for i in range(0, 2):
83 83 Post.objects.create_post('title', 'text', thread=opening_post)
84 84
85 85 thread = Post.objects.get_thread(opening_post.id)
86 86
87 87 self.assertEqual(3, len(thread))
88 88
89 89 def test_create_post_with_tag(self):
90 90 """Test adding tag to post"""
91 91
92 92 tag = Tag.objects.create(name='test_tag')
93 93 post = Post.objects.create_post(title='title', text='text', tags=[tag])
94 94 self.assertIsNotNone(post)
95 95
96 96 def test_thread_max_count(self):
97 97 """Test deletion of old posts when the max thread count is reached"""
98 98
99 99 for i in range(settings.MAX_THREAD_COUNT + 1):
100 100 self._create_post()
101 101
102 102 self.assertEqual(settings.MAX_THREAD_COUNT,
103 103 len(Post.objects.get_threads()))
104 104
105 105 def test_pages(self):
106 106 """Test that the thread list is properly split into pages"""
107 107
108 108 for i in range(settings.MAX_THREAD_COUNT):
109 109 self._create_post()
110 110
111 111 all_threads = Post.objects.get_threads()
112 112
113 113 posts_in_second_page = Post.objects.get_threads(page=1)
114 114 first_post = posts_in_second_page[0]
115 115
116 116 self.assertEqual(all_threads[settings.THREADS_PER_PAGE].id,
117 117 first_post.id)
118 118
119 119 def test_linked_tag(self):
120 120 """Test adding a linked tag"""
121 121
122 122 linked_tag = Tag.objects.create(name=u'tag1')
123 123 tag = Tag.objects.create(name=u'tag2', linked=linked_tag)
124 124
125 125 post = Post.objects.create_post("", "", tags=[tag])
126 126
127 127 self.assertTrue(linked_tag in post.tags.all(),
128 128 'Linked tag was not added')
129 129
130 130
131 131 class PagesTest(TestCase):
132 132
133 133 def test_404(self):
134 134 """Test receiving error 404 when opening a non-existent page"""
135 135
136 136 tag_name = u'test_tag'
137 137 tag = Tag.objects.create(name=tag_name)
138 138 client = Client()
139 139
140 140 Post.objects.create_post('title', TEST_TEXT, tags=[tag])
141 141
142 142 existing_post_id = Post.objects.all()[0].id
143 143 response_existing = client.get(THREAD_PAGE + str(existing_post_id) +
144 144 '/')
145 145 self.assertEqual(HTTP_CODE_OK, response_existing.status_code,
146 146 u'Cannot open existing thread')
147 147
148 148 response_not_existing = client.get(THREAD_PAGE + str(
149 149 existing_post_id + 1) + '/')
150 150 self.assertEqual(PAGE_404,
151 151 response_not_existing.templates[0].name,
152 152 u'Not existing thread is opened')
153 153
154 154 response_existing = client.get(TAG_PAGE + tag_name + '/')
155 155 self.assertEqual(HTTP_CODE_OK,
156 156 response_existing.status_code,
157 157 u'Cannot open existing tag')
158 158
159 159 response_not_existing = client.get(TAG_PAGE + u'not_tag' + '/')
160 160 self.assertEqual(PAGE_404,
161 161 response_not_existing.templates[0].name,
162 162 u'Not existing tag is opened')
163 163
164 164 reply_id = Post.objects.create_post('', TEST_TEXT,
165 165 thread=Post.objects.all()[0])
166 166 response_not_existing = client.get(THREAD_PAGE + str(
167 167 reply_id) + '/')
168 168 self.assertEqual(PAGE_404,
169 169 response_not_existing.templates[0].name,
170 170 u'Reply is opened as a thread')
171 171
172 172
173 173 class FormTest(TestCase):
174 174 def test_post_validation(self):
175 175 """Test the validation of the post form"""
176 176
177 177 # Disable captcha for the test
178 178 captcha_enabled = settings.ENABLE_CAPTCHA
179 179 settings.ENABLE_CAPTCHA = False
180 180
181 181 client = Client()
182 182
183 183 valid_tags = u'tag1 tag_2 тег_3'
184 184 invalid_tags = u'$%_356 ---'
185 185
186 186 response = client.post(NEW_THREAD_PAGE, {'title': 'test title',
187 187 'text': TEST_TEXT,
188 188 'tags': valid_tags})
189 189 self.assertEqual(response.status_code, HTTP_CODE_REDIRECT,
190 190 msg='Posting new message failed: got code ' +
191 191 str(response.status_code))
192 192
193 193 self.assertEqual(1, Post.objects.count(),
194 194 msg='No posts were created')
195 195
196 196 client.post(NEW_THREAD_PAGE, {'text': TEST_TEXT,
197 197 'tags': invalid_tags})
198 198 self.assertEqual(1, Post.objects.count(), msg='The validation passed '
199 199 'where it should fail')
200 200
201 201 # Change posting delay so we don't have to wait for 30 seconds or more
202 202 old_posting_delay = settings.POSTING_DELAY
203 203 # Wait fot the posting delay or we won't be able to post
204 204 settings.POSTING_DELAY = 1
205 205 time.sleep(settings.POSTING_DELAY + 1)
206 206 response = client.post(THREAD_PAGE_ONE, {'text': TEST_TEXT,
207 207 'tags': valid_tags})
208 208 self.assertEqual(HTTP_CODE_REDIRECT, response.status_code,
209 209 msg=u'Posting new message failed: got code ' +
210 210 str(response.status_code))
211 211 # Restore posting delay
212 212 settings.POSTING_DELAY = old_posting_delay
213 213
214 214 self.assertEqual(2, Post.objects.count(),
215 215 msg=u'No posts were created')
216 216
217 217 # Restore captcha setting
218 218 settings.ENABLE_CAPTCHA = captcha_enabled
General Comments 0
You need to be logged in to leave comments. Login now