##// END OF EJS Templates
Minor fixes. Added test for posting to thread
neko259 -
r380:631bd658 default
parent child Browse files
Show More
@@ -1,439 +1,439 b''
1 1 import os
2 2 from random import random
3 3 import time
4 4 import math
5 5 from django.core.cache import cache
6 6
7 7 from django.db import models
8 8 from django.db.models import Count
9 9 from django.http import Http404
10 10 from django.utils import timezone
11 11 from markupfield.fields import MarkupField
12 12 from boards import settings as board_settings
13 13
14 14 from neboard import settings
15 15 import thumbs
16 16
17 17 import re
18 18
19 19 BAN_REASON_MAX_LENGTH = 200
20 20
21 21 BAN_REASON_AUTO = 'Auto'
22 22
23 23 IMAGE_THUMB_SIZE = (200, 150)
24 24
25 25 TITLE_MAX_LENGTH = 50
26 26
27 27 DEFAULT_MARKUP_TYPE = 'markdown'
28 28
29 29 NO_PARENT = -1
30 30 NO_IP = '0.0.0.0'
31 31 UNKNOWN_UA = ''
32 32 ALL_PAGES = -1
33 33 OPENING_POST_POPULARITY_WEIGHT = 2
34 34 IMAGES_DIRECTORY = 'images/'
35 35 FILE_EXTENSION_DELIMITER = '.'
36 36
37 37 RANK_ADMIN = 0
38 38 RANK_MODERATOR = 10
39 39 RANK_USER = 100
40 40
41 41 SETTING_MODERATE = "moderate"
42 42
43 43 REGEX_REPLY = re.compile('>>(\d+)')
44 44
45 45
46 46 class PostManager(models.Manager):
47 47
48 48 def create_post(self, title, text, image=None, thread=None,
49 49 ip=NO_IP, tags=None, user=None):
50 50 posting_time = timezone.now()
51 51
52 52 post = self.create(title=title,
53 53 text=text,
54 54 pub_time=posting_time,
55 55 thread=thread,
56 56 image=image,
57 57 poster_ip=ip,
58 58 poster_user_agent=UNKNOWN_UA,
59 59 last_edit_time=posting_time,
60 60 bump_time=posting_time,
61 61 user=user)
62 62
63 63 if tags:
64 64 map(post.tags.add, tags)
65 65 for tag in tags:
66 66 tag.threads.add(post)
67 67
68 68 if thread:
69 69 thread.replies.add(post)
70 70 thread.bump()
71 71 thread.last_edit_time = posting_time
72 72 thread.save()
73 73
74 74 #cache_key = thread.get_cache_key()
75 75 #cache.delete(cache_key)
76 76
77 77 else:
78 78 self._delete_old_threads()
79 79
80 80 self.connect_replies(post)
81 81
82 82 return post
83 83
84 84 def delete_post(self, post):
85 85 if post.replies.count() > 0:
86 86 map(self.delete_post, post.replies.all())
87 87
88 88 # Update thread's last edit time (used as cache key)
89 89 thread = post.thread
90 90 if thread:
91 91 thread.last_edit_time = timezone.now()
92 92 thread.save()
93 93
94 94 #cache_key = thread.get_cache_key()
95 95 #cache.delete(cache_key)
96 96
97 97 post.delete()
98 98
99 99 def delete_posts_by_ip(self, ip):
100 100 posts = self.filter(poster_ip=ip)
101 101 map(self.delete_post, posts)
102 102
103 103 def get_threads(self, tag=None, page=ALL_PAGES,
104 104 order_by='-bump_time'):
105 105 if tag:
106 106 threads = tag.threads
107 107
108 108 if threads.count() == 0:
109 109 raise Http404
110 110 else:
111 111 threads = self.filter(thread=None)
112 112
113 113 threads = threads.order_by(order_by)
114 114
115 115 if page != ALL_PAGES:
116 116 thread_count = threads.count()
117 117
118 118 if page < self._get_page_count(thread_count):
119 119 start_thread = page * settings.THREADS_PER_PAGE
120 120 end_thread = min(start_thread + settings.THREADS_PER_PAGE,
121 121 thread_count)
122 122 threads = threads[start_thread:end_thread]
123 123
124 124 return threads
125 125
126 126 def get_thread(self, opening_post_id):
127 127 try:
128 128 opening_post = self.get(id=opening_post_id, thread=None)
129 129 except Post.DoesNotExist:
130 130 raise Http404
131 131
132 132 #cache_key = opening_post.get_cache_key()
133 133 #thread = cache.get(cache_key)
134 134 #if thread:
135 135 # return thread
136 136
137 137 if opening_post.replies:
138 138 thread = [opening_post]
139 139 thread.extend(opening_post.replies.all().order_by('pub_time'))
140 140
141 141 #cache.set(cache_key, thread, board_settings.CACHE_TIMEOUT)
142 142
143 143 return thread
144 144
145 145 def exists(self, post_id):
146 146 posts = self.filter(id=post_id)
147 147
148 148 return posts.count() > 0
149 149
150 150 def get_thread_page_count(self, tag=None):
151 151 if tag:
152 152 threads = self.filter(thread=None, tags=tag)
153 153 else:
154 154 threads = self.filter(thread=None)
155 155
156 156 return self._get_page_count(threads.count())
157 157
158 158 def _delete_old_threads(self):
159 159 """
160 160 Preserves maximum thread count. If there are too many threads,
161 161 delete the old ones.
162 162 """
163 163
164 164 # TODO Move old threads to the archive instead of deleting them.
165 165 # Maybe make some 'old' field in the model to indicate the thread
166 166 # must not be shown and be able for replying.
167 167
168 168 threads = self.get_threads()
169 169 thread_count = threads.count()
170 170
171 171 if thread_count > settings.MAX_THREAD_COUNT:
172 172 num_threads_to_delete = thread_count - settings.MAX_THREAD_COUNT
173 173 old_threads = threads[thread_count - num_threads_to_delete:]
174 174
175 175 map(self.delete_post, old_threads)
176 176
177 177 def connect_replies(self, post):
178 178 """Connect replies to a post to show them as a refmap"""
179 179
180 180 for reply_number in re.finditer(REGEX_REPLY, post.text.raw):
181 id = reply_number.group(1)
182 ref_post = self.filter(id=id)
181 post_id = reply_number.group(1)
182 ref_post = self.filter(id=post_id)
183 183 if ref_post.count() > 0:
184 184 referenced_post = ref_post[0]
185 185 referenced_post.referenced_posts.add(post)
186 186 referenced_post.last_edit_time = post.pub_time
187 187 referenced_post.save()
188 188
189 189 def _get_page_count(self, thread_count):
190 190 return int(math.ceil(thread_count / float(settings.THREADS_PER_PAGE)))
191 191
192 192
193 193 class TagManager(models.Manager):
194 194
195 195 def get_not_empty_tags(self):
196 196 tags = self.annotate(Count('threads')) \
197 197 .filter(threads__count__gt=0).order_by('name')
198 198
199 199 return tags
200 200
201 201
202 202 class Tag(models.Model):
203 203 """
204 204 A tag is a text node assigned to the post. The tag serves as a board
205 205 section. There can be multiple tags for each message
206 206 """
207 207
208 208 objects = TagManager()
209 209
210 210 name = models.CharField(max_length=100)
211 211 threads = models.ManyToManyField('Post', null=True,
212 212 blank=True, related_name='tag+')
213 213 linked = models.ForeignKey('Tag', null=True, blank=True)
214 214
215 215 def __unicode__(self):
216 216 return self.name
217 217
218 218 def is_empty(self):
219 219 return self.get_post_count() == 0
220 220
221 221 def get_post_count(self):
222 222 return self.threads.count()
223 223
224 224 def get_popularity(self):
225 225 posts_with_tag = Post.objects.get_threads(tag=self)
226 226 reply_count = 0
227 227 for post in posts_with_tag:
228 228 reply_count += post.get_reply_count()
229 229 reply_count += OPENING_POST_POPULARITY_WEIGHT
230 230
231 231 return reply_count
232 232
233 233 def get_linked_tags(self):
234 234 tag_list = []
235 235 self.get_linked_tags_list(tag_list)
236 236
237 237 return tag_list
238 238
239 239 def get_linked_tags_list(self, tag_list=[]):
240 240 """
241 241 Returns the list of tags linked to current. The list can be got
242 242 through returned value or tag_list parameter
243 243 """
244 244
245 245 linked_tag = self.linked
246 246
247 247 if linked_tag and not (linked_tag in tag_list):
248 248 tag_list.append(linked_tag)
249 249
250 250 linked_tag.get_linked_tags_list(tag_list)
251 251
252 252
253 253 class Post(models.Model):
254 254 """A post is a message."""
255 255
256 256 objects = PostManager()
257 257
258 258 def _update_image_filename(self, filename):
259 259 """Get unique image filename"""
260 260
261 261 path = IMAGES_DIRECTORY
262 262 new_name = str(int(time.mktime(time.gmtime())))
263 263 new_name += str(int(random() * 1000))
264 264 new_name += FILE_EXTENSION_DELIMITER
265 265 new_name += filename.split(FILE_EXTENSION_DELIMITER)[-1:][0]
266 266
267 267 return os.path.join(path, new_name)
268 268
269 269 title = models.CharField(max_length=TITLE_MAX_LENGTH)
270 270 pub_time = models.DateTimeField()
271 271 text = MarkupField(default_markup_type=DEFAULT_MARKUP_TYPE,
272 272 escape_html=False)
273 273
274 274 image_width = models.IntegerField(default=0)
275 275 image_height = models.IntegerField(default=0)
276 276
277 277 image = thumbs.ImageWithThumbsField(upload_to=_update_image_filename,
278 278 blank=True, sizes=(IMAGE_THUMB_SIZE,),
279 279 width_field='image_width',
280 280 height_field='image_height')
281 281
282 282 poster_ip = models.GenericIPAddressField()
283 283 poster_user_agent = models.TextField()
284 284
285 285 thread = models.ForeignKey('Post', null=True, default=None)
286 286 tags = models.ManyToManyField(Tag)
287 287 last_edit_time = models.DateTimeField()
288 288 bump_time = models.DateTimeField()
289 289 user = models.ForeignKey('User', null=True, default=None)
290 290
291 291 replies = models.ManyToManyField('Post', symmetrical=False, null=True,
292 292 blank=True, related_name='re+')
293 293 referenced_posts = models.ManyToManyField('Post', symmetrical=False,
294 294 null=True,
295 295 blank=True, related_name='rfp+')
296 296
297 297 def __unicode__(self):
298 298 return '#' + str(self.id) + ' ' + self.title + ' (' + \
299 299 self.text.raw[:50] + ')'
300 300
301 301 def get_title(self):
302 302 title = self.title
303 303 if len(title) == 0:
304 304 title = self.text.raw[:20]
305 305
306 306 return title
307 307
308 308 def get_reply_count(self):
309 309 return self.replies.count()
310 310
311 311 def get_images_count(self):
312 312 images_count = 1 if self.image else 0
313 313 images_count += self.replies.filter(image_width__gt=0).count()
314 314
315 315 return images_count
316 316
317 317 def can_bump(self):
318 318 """Check if the thread can be bumped by replying"""
319 319
320 320 post_count = self.get_reply_count()
321 321
322 322 return post_count <= settings.MAX_POSTS_PER_THREAD
323 323
324 324 def bump(self):
325 325 """Bump (move to up) thread"""
326 326
327 327 if self.can_bump():
328 328 self.bump_time = timezone.now()
329 329
330 330 def get_last_replies(self):
331 331 if settings.LAST_REPLIES_COUNT > 0:
332 332 reply_count = self.get_reply_count()
333 333
334 334 if reply_count > 0:
335 335 reply_count_to_show = min(settings.LAST_REPLIES_COUNT,
336 336 reply_count)
337 337 last_replies = self.replies.all().order_by('pub_time')[
338 338 reply_count - reply_count_to_show:]
339 339
340 340 return last_replies
341 341
342 342 def get_tags(self):
343 343 """Get a sorted tag list"""
344 344
345 345 return self.tags.order_by('name')
346 346
347 347 def get_cache_key(self):
348 348 return str(self.id) + str(self.last_edit_time.microsecond)
349 349
350 350 def get_sorted_referenced_posts(self):
351 351 return self.referenced_posts.order_by('id')
352 352
353 353 def is_referenced(self):
354 354 return self.referenced_posts.count() > 0
355 355
356 356
357 357 class User(models.Model):
358 358
359 359 user_id = models.CharField(max_length=50)
360 360 rank = models.IntegerField()
361 361
362 362 registration_time = models.DateTimeField()
363 363
364 364 fav_tags = models.ManyToManyField(Tag, null=True, blank=True)
365 365 fav_threads = models.ManyToManyField(Post, related_name='+', null=True,
366 366 blank=True)
367 367
368 368 def save_setting(self, name, value):
369 369 setting, created = Setting.objects.get_or_create(name=name, user=self)
370 370 setting.value = str(value)
371 371 setting.save()
372 372
373 373 return setting
374 374
375 375 def get_setting(self, name):
376 376 if Setting.objects.filter(name=name, user=self).exists():
377 377 setting = Setting.objects.get(name=name, user=self)
378 378 setting_value = setting.value
379 379 else:
380 380 setting_value = None
381 381
382 382 return setting_value
383 383
384 384 def is_moderator(self):
385 385 return RANK_MODERATOR >= self.rank
386 386
387 387 def get_sorted_fav_tags(self):
388 388 cache_key = self._get_tag_cache_key()
389 389 fav_tags = cache.get(cache_key)
390 390 if fav_tags:
391 391 return fav_tags
392 392
393 393 tags = self.fav_tags.annotate(Count('threads'))\
394 394 .filter(threads__count__gt=0).order_by('name')
395 395
396 396 if tags:
397 397 cache.set(cache_key, tags, board_settings.CACHE_TIMEOUT)
398 398
399 399 return tags
400 400
401 401 def get_post_count(self):
402 402 return Post.objects.filter(user=self).count()
403 403
404 404 def __unicode__(self):
405 405 return self.user_id + '(' + str(self.rank) + ')'
406 406
407 407 def get_last_access_time(self):
408 408 posts = Post.objects.filter(user=self)
409 409 if posts.count() > 0:
410 410 return posts.latest('pub_time').pub_time
411 411
412 412 def add_tag(self, tag):
413 413 self.fav_tags.add(tag)
414 414 cache.delete(self._get_tag_cache_key())
415 415
416 416 def remove_tag(self, tag):
417 417 self.fav_tags.remove(tag)
418 418 cache.delete(self._get_tag_cache_key())
419 419
420 420 def _get_tag_cache_key(self):
421 421 return self.user_id + '_tags'
422 422
423 423
424 424 class Setting(models.Model):
425 425
426 426 name = models.CharField(max_length=50)
427 427 value = models.CharField(max_length=50)
428 428 user = models.ForeignKey(User)
429 429
430 430
431 431 class Ban(models.Model):
432 432
433 433 ip = models.GenericIPAddressField()
434 434 reason = models.CharField(default=BAN_REASON_AUTO,
435 435 max_length=BAN_REASON_MAX_LENGTH)
436 436 can_read = models.BooleanField(default=True)
437 437
438 438 def __unicode__(self):
439 439 return self.ip
@@ -1,175 +1,184 b''
1 1 # coding=utf-8
2 2 from django.test import TestCase
3 3 from django.test.client import Client
4 4 import time
5 5
6 6 from boards.models import Post, Tag
7 7 from neboard import settings
8 8
9 9 PAGE_404 = 'boards/404.html'
10 10
11 11 TEST_TEXT = 'test text'
12 12
13 13 NEW_THREAD_PAGE = '/'
14 14 THREAD_PAGE_ONE = '/thread/1/'
15 15 THREAD_PAGE = '/thread/'
16 16 TAG_PAGE = '/tag/'
17 17 HTTP_CODE_REDIRECT = 302
18 18 HTTP_CODE_OK = 200
19 19 HTTP_CODE_NOT_FOUND = 404
20 20
21 21
22 22 class BoardTests(TestCase):
23 23
24 24 def _create_post(self):
25 25 return Post.objects.create_post(title='title',
26 26 text='text')
27 27
28 28 def test_post_add(self):
29 29 post = self._create_post()
30 30
31 31 self.assertIsNotNone(post)
32 32 self.assertIsNone(post.thread, 'Opening post has a thread')
33 33
34 34 def test_delete_post(self):
35 35 post = self._create_post()
36 36 post_id = post.id
37 37
38 38 Post.objects.delete_post(post)
39 39
40 40 self.assertFalse(Post.objects.exists(post_id))
41 41
42 def test_post_to_thread(self):
43 op = self._create_post()
44 post = Post.objects.create_post("", "", thread=op)
45
46 self.assertIsNotNone(post, 'Reply to thread wasn\'t created')
47 self.assertEqual(op.last_edit_time, post.pub_time,
48 'Post\'s create time doesn\'t match thread last edit'
49 ' time')
50
42 51 def test_delete_posts_by_ip(self):
43 52 post = self._create_post()
44 53 post_id = post.id
45 54
46 55 Post.objects.delete_posts_by_ip('0.0.0.0')
47 56
48 57 self.assertFalse(Post.objects.exists(post_id))
49 58
50 59 def test_get_thread(self):
51 60 opening_post = self._create_post()
52 61
53 62 for i in range(0, 2):
54 63 Post.objects.create_post('title', 'text', thread=opening_post)
55 64
56 65 thread = Post.objects.get_thread(opening_post.id)
57 66
58 67 self.assertEqual(3, len(thread))
59 68
60 69 def test_create_post_with_tag(self):
61 70 tag = Tag.objects.create(name='test_tag')
62 71 post = Post.objects.create_post(title='title', text='text', tags=[tag])
63 72 self.assertIsNotNone(post)
64 73
65 74 def test_thread_max_count(self):
66 75 for i in range(settings.MAX_THREAD_COUNT + 1):
67 76 self._create_post()
68 77
69 78 self.assertEqual(settings.MAX_THREAD_COUNT,
70 79 len(Post.objects.get_threads()))
71 80
72 81 def test_pages(self):
73 82 """Test that the thread list is properly split into pages"""
74 83
75 84 for i in range(settings.MAX_THREAD_COUNT):
76 85 self._create_post()
77 86
78 87 all_threads = Post.objects.get_threads()
79 88
80 89 posts_in_second_page = Post.objects.get_threads(page=1)
81 90 first_post = posts_in_second_page[0]
82 91
83 92 self.assertEqual(all_threads[settings.THREADS_PER_PAGE].id,
84 93 first_post.id)
85 94
86 95 def test_post_validation(self):
87 96 """Test the validation of the post form"""
88 97
89 98 # Disable captcha for the test
90 99 captcha_enabled = settings.ENABLE_CAPTCHA
91 100 settings.ENABLE_CAPTCHA = False
92 101
93 102 client = Client()
94 103
95 104 valid_tags = u'tag1 tag_2 Ρ‚Π΅Π³_3'
96 105 invalid_tags = u'$%_356 ---'
97 106
98 107 response = client.post(NEW_THREAD_PAGE, {'title': 'test title',
99 108 'text': TEST_TEXT,
100 109 'tags': valid_tags})
101 110 self.assertEqual(response.status_code, HTTP_CODE_REDIRECT,
102 111 msg='Posting new message failed: got code ' +
103 112 str(response.status_code))
104 113
105 114 self.assertEqual(1, Post.objects.count(),
106 115 msg='No posts were created')
107 116
108 117 client.post(NEW_THREAD_PAGE, {'text': TEST_TEXT,
109 118 'tags': invalid_tags})
110 119 self.assertEqual(1, Post.objects.count(), msg='The validation passed '
111 120 'where it should fail')
112 121
113 122 # Change posting delay so we don't have to wait for 30 seconds or more
114 123 old_posting_delay = settings.POSTING_DELAY
115 124 # Wait fot the posting delay or we won't be able to post
116 125 settings.POSTING_DELAY = 1
117 126 time.sleep(settings.POSTING_DELAY + 1)
118 127 response = client.post(THREAD_PAGE_ONE, {'text': TEST_TEXT,
119 128 'tags': valid_tags})
120 129 self.assertEqual(HTTP_CODE_REDIRECT, response.status_code,
121 130 msg=u'Posting new message failed: got code ' +
122 131 str(response.status_code))
123 132 # Restore posting delay
124 133 settings.POSTING_DELAY = old_posting_delay
125 134
126 135 self.assertEqual(2, Post.objects.count(),
127 136 msg=u'No posts were created')
128 137
129 138 # Restore captcha setting
130 139 settings.ENABLE_CAPTCHA = captcha_enabled
131 140
132 141 def test_404(self):
133 142 """Test receiving error 404 when opening a non-existent page"""
134 143
135 144 tag_name = u'test_tag'
136 145 tag = Tag.objects.create(name=tag_name)
137 146 client = Client()
138 147
139 148 Post.objects.create_post('title', TEST_TEXT, tags=[tag])
140 149
141 150 existing_post_id = Post.objects.all()[0].id
142 151 response_existing = client.get(THREAD_PAGE + str(existing_post_id) +
143 152 '/')
144 153 self.assertEqual(HTTP_CODE_OK, response_existing.status_code,
145 154 u'Cannot open existing thread')
146 155
147 156 response_not_existing = client.get(THREAD_PAGE + str(
148 157 existing_post_id + 1) + '/')
149 158 self.assertEqual(PAGE_404,
150 159 response_not_existing.templates[0].name,
151 160 u'Not existing thread is opened')
152 161
153 162 response_existing = client.get(TAG_PAGE + tag_name + '/')
154 163 self.assertEqual(HTTP_CODE_OK,
155 164 response_existing.status_code,
156 165 u'Cannot open existing tag')
157 166
158 167 response_not_existing = client.get(TAG_PAGE + u'not_tag' + '/')
159 168 self.assertEqual(PAGE_404,
160 169 response_not_existing.templates[0].name,
161 170 u'Not existing tag is opened')
162 171
163 172 reply_id = Post.objects.create_post('', TEST_TEXT,
164 173 thread=Post.objects.all()[0])
165 174 response_not_existing = client.get(THREAD_PAGE + str(
166 175 reply_id) + '/')
167 176 self.assertEqual(PAGE_404,
168 177 response_not_existing.templates[0].name,
169 178 u'Reply is opened as a thread')
170 179
171 180 def test_linked_tag(self):
172 181 tag = Tag.objects.create(name=u'tag1')
173 182 linked_tag = Tag.objects.create(name=u'tag2', linked=tag)
174 183
175 184 # TODO run add post view and check the tag is added No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now