##// END OF EJS Templates
test fixes
marcink -
r1732:8321b3d1 beta
parent child Browse files
Show More
@@ -1,215 +1,215 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.model.notification
4 4 ~~~~~~~~~~~~~~
5 5
6 6 Model for notifications
7 7
8 8
9 9 :created_on: Nov 20, 2011
10 10 :author: marcink
11 11 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
12 12 :license: GPLv3, see COPYING for more details.
13 13 """
14 14 # This program is free software: you can redistribute it and/or modify
15 15 # it under the terms of the GNU General Public License as published by
16 16 # the Free Software Foundation, either version 3 of the License, or
17 17 # (at your option) any later version.
18 18 #
19 19 # This program is distributed in the hope that it will be useful,
20 20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 22 # GNU General Public License for more details.
23 23 #
24 24 # You should have received a copy of the GNU General Public License
25 25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 26
27 27 import os
28 28 import logging
29 29 import traceback
30 30 import datetime
31 31
32 32 from pylons.i18n.translation import _
33 33
34 34 import rhodecode
35 35 from rhodecode.lib import helpers as h
36 36 from rhodecode.model import BaseModel
37 37 from rhodecode.model.db import Notification, User, UserNotification
38 38
39 39 log = logging.getLogger(__name__)
40 40
41 41
42 42 class NotificationModel(BaseModel):
43 43
44 44 def __get_user(self, user):
45 45 if isinstance(user, basestring):
46 46 return User.get_by_username(username=user)
47 47 else:
48 48 return self._get_instance(User, user)
49 49
50 50 def __get_notification(self, notification):
51 51 if isinstance(notification, Notification):
52 52 return notification
53 53 elif isinstance(notification, int):
54 54 return Notification.get(notification)
55 55 else:
56 56 if notification:
57 57 raise Exception('notification must be int or Instance'
58 58 ' of Notification got %s' % type(notification))
59 59
60 60 def create(self, created_by, subject, body, recipients=None,
61 61 type_=Notification.TYPE_MESSAGE, with_email=True,
62 62 email_kwargs={}):
63 63 """
64 64
65 65 Creates notification of given type
66 66
67 67 :param created_by: int, str or User instance. User who created this
68 68 notification
69 69 :param subject:
70 70 :param body:
71 71 :param recipients: list of int, str or User objects, when None
72 72 is given send to all admins
73 73 :param type_: type of notification
74 74 :param with_email: send email with this notification
75 75 :param email_kwargs: additional dict to pass as args to email template
76 76 """
77 77 from rhodecode.lib.celerylib import tasks, run_task
78 78
79 79 if recipients and not getattr(recipients, '__iter__', False):
80 80 raise Exception('recipients must be a list of iterable')
81 81
82 82 created_by_obj = self.__get_user(created_by)
83 83
84 84 if recipients:
85 85 recipients_objs = []
86 86 for u in recipients:
87 87 obj = self.__get_user(u)
88 88 if obj:
89 89 recipients_objs.append(obj)
90 90 recipients_objs = set(recipients_objs)
91 91 else:
92 92 # empty recipients means to all admins
93 93 recipients_objs = User.query().filter(User.admin == True).all()
94 94
95 95 notif = Notification.create(created_by=created_by_obj, subject=subject,
96 96 body=body, recipients=recipients_objs,
97 97 type_=type_)
98 98
99 99 if with_email is False:
100 100 return notif
101 101
102 102 # send email with notification
103 103 for rec in recipients_objs:
104 104 email_subject = NotificationModel().make_description(notif, False)
105 105 type_ = type_
106 106 email_body = body
107 107 kwargs = {'subject':subject, 'body':h.rst(body)}
108 108 kwargs.update(email_kwargs)
109 109 email_body_html = EmailNotificationModel()\
110 110 .get_email_tmpl(type_, **kwargs)
111 111 run_task(tasks.send_email, rec.email, email_subject, email_body,
112 112 email_body_html)
113 113
114 114 return notif
115 115
116 116 def delete(self, user, notification):
117 117 # we don't want to remove actual notification just the assignment
118 118 try:
119 119 notification = self.__get_notification(notification)
120 120 user = self.__get_user(user)
121 121 if notification and user:
122 122 obj = UserNotification.query()\
123 123 .filter(UserNotification.user == user)\
124 124 .filter(UserNotification.notification
125 125 == notification)\
126 126 .one()
127 127 self.sa.delete(obj)
128 128 return True
129 129 except Exception:
130 130 log.error(traceback.format_exc())
131 131 raise
132 132
133 133 def get_for_user(self, user):
134 134 user = self.__get_user(user)
135 135 return user.notifications
136 136
137 137 def get_unread_cnt_for_user(self, user):
138 138 user = self.__get_user(user)
139 139 return UserNotification.query()\
140 140 .filter(UserNotification.read == False)\
141 141 .filter(UserNotification.user == user).count()
142 142
143 143 def get_unread_for_user(self, user):
144 144 user = self.__get_user(user)
145 145 return [x.notification for x in UserNotification.query()\
146 146 .filter(UserNotification.read == False)\
147 147 .filter(UserNotification.user == user).all()]
148 148
149 149 def get_user_notification(self, user, notification):
150 150 user = self.__get_user(user)
151 151 notification = self.__get_notification(notification)
152 152
153 153 return UserNotification.query()\
154 154 .filter(UserNotification.notification == notification)\
155 155 .filter(UserNotification.user == user).scalar()
156 156
157 157 def make_description(self, notification, show_age=True):
158 158 """
159 159 Creates a human readable description based on properties
160 160 of notification object
161 161 """
162 162
163 163 _map = {notification.TYPE_CHANGESET_COMMENT:_('commented on commit'),
164 164 notification.TYPE_MESSAGE:_('sent message'),
165 165 notification.TYPE_MENTION:_('mentioned you'),
166 166 notification.TYPE_REGISTRATION:_('registered in RhodeCode')}
167 167
168 168 DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
169 169
170 170 tmpl = "%(user)s %(action)s %(when)s"
171 171 if show_age:
172 172 when = h.age(notification.created_on)
173 173 else:
174 174 DTF = lambda d: datetime.datetime.strftime(d, DATETIME_FORMAT)
175 175 when = DTF(notification.created_on)
176 176 data = dict(user=notification.created_by_user.username,
177 177 action=_map[notification.type_],
178 178 when=when)
179 179 return tmpl % data
180 180
181 181
182 182 class EmailNotificationModel(BaseModel):
183 183
184 TYPE_CHANGESET_COMMENT = 'changeset_comment'
184 TYPE_CHANGESET_COMMENT = Notification.TYPE_CHANGESET_COMMENT
185 185 TYPE_PASSWORD_RESET = 'passoword_link'
186 TYPE_REGISTRATION = 'registration'
186 TYPE_REGISTRATION = Notification.TYPE_REGISTRATION
187 187 TYPE_DEFAULT = 'default'
188 188
189 189 def __init__(self):
190 190 self._template_root = rhodecode.CONFIG['pylons.paths']['templates'][0]
191 191 self._tmpl_lookup = rhodecode.CONFIG['pylons.app_globals'].mako_lookup
192 192
193 193 self.email_types = {
194 194 self.TYPE_CHANGESET_COMMENT:'email_templates/changeset_comment.html',
195 195 self.TYPE_PASSWORD_RESET:'email_templates/password_reset.html',
196 196 self.TYPE_REGISTRATION:'email_templates/registration.html',
197 197 self.TYPE_DEFAULT:'email_templates/default.html'
198 198 }
199 199
200 200 def get_email_tmpl(self, type_, **kwargs):
201 201 """
202 202 return generated template for email based on given type
203 203
204 204 :param type_:
205 205 """
206 206
207 base = self.email_types.get(type_, self.TYPE_DEFAULT)
207 base = self.email_types.get(type_, self.email_types[self.TYPE_DEFAULT])
208 208 email_template = self._tmpl_lookup.get_template(base)
209 209 # translator inject
210 210 _kwargs = {'_':_}
211 211 _kwargs.update(kwargs)
212 212 log.debug('rendering tmpl %s with kwargs %s' % (base, _kwargs))
213 213 return email_template.render(**_kwargs)
214 214
215 215
@@ -1,83 +1,86 b''
1 1 from rhodecode.tests import *
2 2
3 3 from rhodecode.model.db import Repository
4 4
5 5 class TestForksController(TestController):
6 6
7 7 def test_index(self):
8 8 self.log_user()
9 9 repo_name = HG_REPO
10 10 response = self.app.get(url(controller='forks', action='forks',
11 11 repo_name=repo_name))
12 12
13 13 self.assertTrue("""There are no forks yet""" in response.body)
14 14
15 15
16 16 def test_index_with_fork(self):
17 17 self.log_user()
18 18
19 19 # create a fork
20 20 fork_name = HG_FORK
21 21 description = 'fork of vcs test'
22 22 repo_name = HG_REPO
23 23 org_repo = Repository.get_by_repo_name(repo_name)
24 24 response = self.app.post(url(controller='forks',
25 25 action='fork_create',
26 26 repo_name=repo_name),
27 27 {'repo_name':fork_name,
28 28 'repo_group':'',
29 29 'fork_parent_id':org_repo.repo_id,
30 30 'repo_type':'hg',
31 31 'description':description,
32 32 'private':'False'})
33 33
34 34 response = self.app.get(url(controller='forks', action='forks',
35 35 repo_name=repo_name))
36 36
37 37
38 38 self.assertTrue("""<a href="/%s/summary">"""
39 39 """vcs_test_hg_fork</a>""" % fork_name
40 40 in response.body)
41 41
42 42 #remove this fork
43 43 response = self.app.delete(url('repo', repo_name=fork_name))
44 44
45 45
46 46
47 47
48 48 def test_z_fork_create(self):
49 49 self.log_user()
50 50 fork_name = HG_FORK
51 51 description = 'fork of vcs test'
52 52 repo_name = HG_REPO
53 53 org_repo = Repository.get_by_repo_name(repo_name)
54 54 response = self.app.post(url(controller='forks', action='fork_create',
55 55 repo_name=repo_name),
56 56 {'repo_name':fork_name,
57 57 'repo_group':'',
58 58 'fork_parent_id':org_repo.repo_id,
59 59 'repo_type':'hg',
60 60 'description':description,
61 61 'private':'False'})
62 62
63 63 #test if we have a message that fork is ok
64 64 self.assertTrue('forked %s repository as %s' \
65 65 % (repo_name, fork_name) in response.session['flash'][0])
66 66
67 67 #test if the fork was created in the database
68 68 fork_repo = self.Session().query(Repository)\
69 69 .filter(Repository.repo_name == fork_name).one()
70 70
71 71 self.assertEqual(fork_repo.repo_name, fork_name)
72 72 self.assertEqual(fork_repo.fork.repo_name, repo_name)
73 73
74 74
75 75 #test if fork is visible in the list ?
76 76 response = response.follow()
77 77
78 78
79 #check if fork is marked as fork
79 # check if fork is marked as fork
80 # wait for cache to expire
81 import time
82 time.sleep(10)
80 83 response = self.app.get(url(controller='summary', action='index',
81 84 repo_name=fork_name))
82 85
83 86 self.assertTrue('Fork of %s' % repo_name in response.body)
@@ -1,260 +1,267 b''
1 1 # -*- coding: utf-8 -*-
2 2 from rhodecode.tests import *
3 from rhodecode.model.db import User
3 from rhodecode.model.db import User, Notification
4 4 from rhodecode.lib import generate_api_key
5 5 from rhodecode.lib.auth import check_password
6
6 from rhodecode.model.meta import Session
7 7
8 8 class TestLoginController(TestController):
9 9
10 def tearDown(self):
11 for n in Notification.query().all():
12 Session().delete(n)
13
14 Session().commit()
15 self.assertEqual(Notification.query().all(), [])
16
10 17 def test_index(self):
11 18 response = self.app.get(url(controller='login', action='index'))
12 19 self.assertEqual(response.status, '200 OK')
13 20 # Test response...
14 21
15 22 def test_login_admin_ok(self):
16 23 response = self.app.post(url(controller='login', action='index'),
17 24 {'username':'test_admin',
18 25 'password':'test12'})
19 26 self.assertEqual(response.status, '302 Found')
20 27 self.assertEqual(response.session['rhodecode_user'].get('username') ,
21 28 'test_admin')
22 29 response = response.follow()
23 30 self.assertTrue('%s repository' % HG_REPO in response.body)
24 31
25 32 def test_login_regular_ok(self):
26 33 response = self.app.post(url(controller='login', action='index'),
27 34 {'username':'test_regular',
28 35 'password':'test12'})
29 36
30 37 self.assertEqual(response.status, '302 Found')
31 38 self.assertEqual(response.session['rhodecode_user'].get('username') ,
32 39 'test_regular')
33 40 response = response.follow()
34 41 self.assertTrue('%s repository' % HG_REPO in response.body)
35 42 self.assertTrue('<a title="Admin" href="/_admin">' not in response.body)
36 43
37 44 def test_login_ok_came_from(self):
38 45 test_came_from = '/_admin/users'
39 46 response = self.app.post(url(controller='login', action='index',
40 47 came_from=test_came_from),
41 48 {'username':'test_admin',
42 49 'password':'test12'})
43 50 self.assertEqual(response.status, '302 Found')
44 51 response = response.follow()
45 52
46 53 self.assertEqual(response.status, '200 OK')
47 54 self.assertTrue('Users administration' in response.body)
48 55
49 56
50 57 def test_login_short_password(self):
51 58 response = self.app.post(url(controller='login', action='index'),
52 59 {'username':'test_admin',
53 60 'password':'as'})
54 61 self.assertEqual(response.status, '200 OK')
55 62
56 63 self.assertTrue('Enter 3 characters or more' in response.body)
57 64
58 65 def test_login_wrong_username_password(self):
59 66 response = self.app.post(url(controller='login', action='index'),
60 67 {'username':'error',
61 68 'password':'test12'})
62 69 self.assertEqual(response.status , '200 OK')
63 70
64 71 self.assertTrue('invalid user name' in response.body)
65 72 self.assertTrue('invalid password' in response.body)
66 73
67 74 #==========================================================================
68 75 # REGISTRATIONS
69 76 #==========================================================================
70 77 def test_register(self):
71 78 response = self.app.get(url(controller='login', action='register'))
72 79 self.assertTrue('Sign Up to RhodeCode' in response.body)
73 80
74 81 def test_register_err_same_username(self):
75 82 response = self.app.post(url(controller='login', action='register'),
76 83 {'username':'test_admin',
77 84 'password':'test12',
78 85 'password_confirmation':'test12',
79 86 'email':'goodmail@domain.com',
80 87 'name':'test',
81 88 'lastname':'test'})
82 89
83 90 self.assertEqual(response.status , '200 OK')
84 91 self.assertTrue('This username already exists' in response.body)
85 92
86 93 def test_register_err_same_email(self):
87 94 response = self.app.post(url(controller='login', action='register'),
88 95 {'username':'test_admin_0',
89 96 'password':'test12',
90 97 'password_confirmation':'test12',
91 98 'email':'test_admin@mail.com',
92 99 'name':'test',
93 100 'lastname':'test'})
94 101
95 102 self.assertEqual(response.status , '200 OK')
96 103 assert 'This e-mail address is already taken' in response.body
97 104
98 105 def test_register_err_same_email_case_sensitive(self):
99 106 response = self.app.post(url(controller='login', action='register'),
100 107 {'username':'test_admin_1',
101 108 'password':'test12',
102 109 'password_confirmation':'test12',
103 110 'email':'TesT_Admin@mail.COM',
104 111 'name':'test',
105 112 'lastname':'test'})
106 113 self.assertEqual(response.status , '200 OK')
107 114 assert 'This e-mail address is already taken' in response.body
108 115
109 116 def test_register_err_wrong_data(self):
110 117 response = self.app.post(url(controller='login', action='register'),
111 118 {'username':'xs',
112 119 'password':'test',
113 120 'password_confirmation':'test',
114 121 'email':'goodmailm',
115 122 'name':'test',
116 123 'lastname':'test'})
117 124 self.assertEqual(response.status , '200 OK')
118 125 assert 'An email address must contain a single @' in response.body
119 126 assert 'Enter a value 6 characters long or more' in response.body
120 127
121 128
122 129 def test_register_err_username(self):
123 130 response = self.app.post(url(controller='login', action='register'),
124 131 {'username':'error user',
125 132 'password':'test12',
126 133 'password_confirmation':'test12',
127 134 'email':'goodmailm',
128 135 'name':'test',
129 136 'lastname':'test'})
130 137
131 138 self.assertEqual(response.status , '200 OK')
132 139 assert 'An email address must contain a single @' in response.body
133 140 assert ('Username may only contain '
134 141 'alphanumeric characters underscores, '
135 142 'periods or dashes and must begin with '
136 143 'alphanumeric character') in response.body
137 144
138 145 def test_register_err_case_sensitive(self):
139 146 response = self.app.post(url(controller='login', action='register'),
140 147 {'username':'Test_Admin',
141 148 'password':'test12',
142 149 'password_confirmation':'test12',
143 150 'email':'goodmailm',
144 151 'name':'test',
145 152 'lastname':'test'})
146 153
147 154 self.assertEqual(response.status , '200 OK')
148 155 self.assertTrue('An email address must contain a single @' in response.body)
149 156 self.assertTrue('This username already exists' in response.body)
150 157
151 158
152 159
153 160 def test_register_special_chars(self):
154 161 response = self.app.post(url(controller='login', action='register'),
155 162 {'username':'xxxaxn',
156 163 'password':'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
157 164 'password_confirmation':'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
158 165 'email':'goodmailm@test.plx',
159 166 'name':'test',
160 167 'lastname':'test'})
161 168
162 169 self.assertEqual(response.status , '200 OK')
163 170 self.assertTrue('Invalid characters in password' in response.body)
164 171
165 172
166 173 def test_register_password_mismatch(self):
167 174 response = self.app.post(url(controller='login', action='register'),
168 175 {'username':'xs',
169 176 'password':'123qwe',
170 177 'password_confirmation':'qwe123',
171 178 'email':'goodmailm@test.plxa',
172 179 'name':'test',
173 180 'lastname':'test'})
174 181
175 182 self.assertEqual(response.status , '200 OK')
176 183 assert 'Passwords do not match' in response.body
177 184
178 185 def test_register_ok(self):
179 186 username = 'test_regular4'
180 187 password = 'qweqwe'
181 188 email = 'marcin@test.com'
182 189 name = 'testname'
183 190 lastname = 'testlastname'
184 191
185 192 response = self.app.post(url(controller='login', action='register'),
186 193 {'username':username,
187 194 'password':password,
188 195 'password_confirmation':password,
189 196 'email':email,
190 197 'name':name,
191 198 'lastname':lastname})
192 199 self.assertEqual(response.status , '302 Found')
193 200 assert 'You have successfully registered into rhodecode' in response.session['flash'][0], 'No flash message about user registration'
194 201
195 202 ret = self.Session().query(User).filter(User.username == 'test_regular4').one()
196 203 assert ret.username == username , 'field mismatch %s %s' % (ret.username, username)
197 204 assert check_password(password, ret.password) == True , 'password mismatch'
198 205 assert ret.email == email , 'field mismatch %s %s' % (ret.email, email)
199 206 assert ret.name == name , 'field mismatch %s %s' % (ret.name, name)
200 207 assert ret.lastname == lastname , 'field mismatch %s %s' % (ret.lastname, lastname)
201 208
202 209
203 210 def test_forgot_password_wrong_mail(self):
204 211 response = self.app.post(url(controller='login', action='password_reset'),
205 212 {'email':'marcin@wrongmail.org', })
206 213
207 214 assert "This e-mail address doesn't exist" in response.body, 'Missing error message about wrong email'
208 215
209 216 def test_forgot_password(self):
210 217 response = self.app.get(url(controller='login',
211 218 action='password_reset'))
212 219 self.assertEqual(response.status , '200 OK')
213 220
214 221 username = 'test_password_reset_1'
215 222 password = 'qweqwe'
216 223 email = 'marcin@python-works.com'
217 224 name = 'passwd'
218 225 lastname = 'reset'
219 226
220 227 new = User()
221 228 new.username = username
222 229 new.password = password
223 230 new.email = email
224 231 new.name = name
225 232 new.lastname = lastname
226 233 new.api_key = generate_api_key(username)
227 234 self.Session().add(new)
228 235 self.Session().commit()
229 236
230 237 response = self.app.post(url(controller='login',
231 238 action='password_reset'),
232 239 {'email':email, })
233 240
234 241 self.checkSessionFlash(response, 'Your password reset link was sent')
235 242
236 243 response = response.follow()
237 244
238 245 # BAD KEY
239 246
240 247 key = "bad"
241 248 response = self.app.get(url(controller='login',
242 249 action='password_reset_confirmation',
243 250 key=key))
244 251 self.assertEqual(response.status, '302 Found')
245 252 self.assertTrue(response.location.endswith(url('reset_password')))
246 253
247 254 # GOOD KEY
248 255
249 256 key = User.get_by_username(username).api_key
250 257 response = self.app.get(url(controller='login',
251 258 action='password_reset_confirmation',
252 259 key=key))
253 260 self.assertEqual(response.status, '302 Found')
254 261 self.assertTrue(response.location.endswith(url('login_home')))
255 262
256 263 self.checkSessionFlash(response,
257 264 ('Your password reset was successful, '
258 265 'new password has been sent to your email'))
259 266
260 267 response = response.follow()
General Comments 0
You need to be logged in to leave comments. Login now