##// END OF EJS Templates
my-account: security change, added select filed with email from extra emails while editing user profile, now adding extra emails required type password. Task #5386
Bartłomiej Wołyńczyk -
r2592:0e0508de default
parent child Browse files
Show More
@@ -1,203 +1,204 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 # -*- coding: utf-8 -*-
21 21
22 22 # Copyright (C) 2016-2018 RhodeCode GmbH
23 23 #
24 24 # This program is free software: you can redistribute it and/or modify
25 25 # it under the terms of the GNU Affero General Public License, version 3
26 26 # (only), as published by the Free Software Foundation.
27 27 #
28 28 # This program is distributed in the hope that it will be useful,
29 29 # but WITHOUT ANY WARRANTY; without even the implied warranty of
30 30 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 31 # GNU General Public License for more details.
32 32 #
33 33 # You should have received a copy of the GNU Affero General Public License
34 34 # along with this program. If not, see <http://www.gnu.org/licenses/>.
35 35 #
36 36 # This program is dual-licensed. If you wish to learn more about the
37 37 # RhodeCode Enterprise Edition, including its added features, Support services,
38 38 # and proprietary license terms, please see https://rhodecode.com/licenses/
39 39
40 40 import pytest
41 41
42 42 from rhodecode.model.db import User
43 43 from rhodecode.tests import TestController, assert_session_flash
44 44 from rhodecode.lib import helpers as h
45 45
46 46
47 47 def route_path(name, params=None, **kwargs):
48 48 import urllib
49 49 from rhodecode.apps._base import ADMIN_PREFIX
50 50
51 51 base_url = {
52 52 'my_account_edit': ADMIN_PREFIX + '/my_account/edit',
53 53 'my_account_update': ADMIN_PREFIX + '/my_account/update',
54 54 'my_account_pullrequests': ADMIN_PREFIX + '/my_account/pull_requests',
55 55 'my_account_pullrequests_data': ADMIN_PREFIX + '/my_account/pull_requests/data',
56 56 }[name].format(**kwargs)
57 57
58 58 if params:
59 59 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
60 60 return base_url
61 61
62 62
63 63 class TestMyAccountEdit(TestController):
64 64
65 65 def test_my_account_edit(self):
66 66 self.log_user()
67 67 response = self.app.get(route_path('my_account_edit'))
68 68
69 69 response.mustcontain('value="test_admin')
70 70
71 71 @pytest.mark.backends("git", "hg")
72 72 def test_my_account_my_pullrequests(self, pr_util):
73 73 self.log_user()
74 74 response = self.app.get(route_path('my_account_pullrequests'))
75 75 response.mustcontain('There are currently no open pull '
76 76 'requests requiring your participation.')
77 77
78 78 @pytest.mark.backends("git", "hg")
79 79 def test_my_account_my_pullrequests_data(self, pr_util, xhr_header):
80 80 self.log_user()
81 81 response = self.app.get(route_path('my_account_pullrequests_data'),
82 82 extra_environ=xhr_header)
83 83 assert response.json == {
84 84 u'data': [], u'draw': None,
85 85 u'recordsFiltered': 0, u'recordsTotal': 0}
86 86
87 87 pr = pr_util.create_pull_request(title='TestMyAccountPR')
88 88 expected = {
89 89 'author_raw': 'RhodeCode Admin',
90 90 'name_raw': pr.pull_request_id
91 91 }
92 92 response = self.app.get(route_path('my_account_pullrequests_data'),
93 93 extra_environ=xhr_header)
94 94 assert response.json['recordsTotal'] == 1
95 95 assert response.json['data'][0]['author_raw'] == expected['author_raw']
96 96
97 97 assert response.json['data'][0]['author_raw'] == expected['author_raw']
98 98 assert response.json['data'][0]['name_raw'] == expected['name_raw']
99 99
100 100 @pytest.mark.parametrize(
101 101 "name, attrs", [
102 102 ('firstname', {'firstname': 'new_username'}),
103 103 ('lastname', {'lastname': 'new_username'}),
104 104 ('admin', {'admin': True}),
105 105 ('admin', {'admin': False}),
106 106 ('extern_type', {'extern_type': 'ldap'}),
107 107 ('extern_type', {'extern_type': None}),
108 108 # ('extern_name', {'extern_name': 'test'}),
109 109 # ('extern_name', {'extern_name': None}),
110 110 ('active', {'active': False}),
111 111 ('active', {'active': True}),
112 ('email', {'email': 'some@email.com'}),
112 ('email', {'email': u'some@email.com'}),
113 113 ])
114 114 def test_my_account_update(self, name, attrs, user_util):
115 115 usr = user_util.create_user(password='qweqwe')
116 116 params = usr.get_api_data() # current user data
117 117 user_id = usr.user_id
118 118 self.log_user(
119 119 username=usr.username, password='qweqwe')
120 120
121 121 params.update({'password_confirmation': ''})
122 122 params.update({'new_password': ''})
123 params.update({'extern_type': 'rhodecode'})
124 params.update({'extern_name': 'rhodecode'})
123 params.update({'extern_type': u'rhodecode'})
124 params.update({'extern_name': u'rhodecode'})
125 125 params.update({'csrf_token': self.csrf_token})
126 126
127 127 params.update(attrs)
128 128 # my account page cannot set language param yet, only for admins
129 129 del params['language']
130 if name == 'email':
131 uem = user_util.create_additional_user_email(usr, attrs['email'])
132 email_before = User.get(user_id).email
133
130 134 response = self.app.post(route_path('my_account_update'), params)
131 135
132 136 assert_session_flash(
133 137 response, 'Your account was updated successfully')
134 138
135 139 del params['csrf_token']
136 140
137 141 updated_user = User.get(user_id)
138 142 updated_params = updated_user.get_api_data()
139 143 updated_params.update({'password_confirmation': ''})
140 144 updated_params.update({'new_password': ''})
141 145
142 146 params['last_login'] = updated_params['last_login']
143 147 params['last_activity'] = updated_params['last_activity']
144 148 # my account page cannot set language param yet, only for admins
145 149 # but we get this info from API anyway
146 150 params['language'] = updated_params['language']
147 151
148 152 if name == 'email':
149 params['emails'] = [attrs['email']]
153 params['emails'] = [attrs['email'], email_before]
150 154 if name == 'extern_type':
151 155 # cannot update this via form, expected value is original one
152 156 params['extern_type'] = "rhodecode"
153 157 if name == 'extern_name':
154 158 # cannot update this via form, expected value is original one
155 159 params['extern_name'] = str(user_id)
156 160 if name == 'active':
157 161 # my account cannot deactivate account
158 162 params['active'] = True
159 163 if name == 'admin':
160 164 # my account cannot make you an admin !
161 165 params['admin'] = False
162 166
163 167 assert params == updated_params
164 168
165 def test_my_account_update_err_email_exists(self):
169 def test_my_account_update_err_email_not_exists_in_emails(self):
166 170 self.log_user()
167 171
168 new_email = 'test_regular@mail.com' # already existing email
172 new_email = 'test_regular@mail.com' # not in emails
169 173 params = {
170 174 'username': 'test_admin',
171 175 'new_password': 'test12',
172 176 'password_confirmation': 'test122',
173 177 'firstname': 'NewName',
174 178 'lastname': 'NewLastname',
175 179 'email': new_email,
176 180 'csrf_token': self.csrf_token,
177 181 }
178 182
179 183 response = self.app.post(route_path('my_account_update'),
180 184 params=params)
181 185
182 response.mustcontain('This e-mail address is already taken')
186 response.mustcontain('"test_regular@mail.com" is not one of test_admin@mail.com')
183 187
184 188 def test_my_account_update_bad_email_address(self):
185 189 self.log_user('test_regular2', 'test12')
186 190
187 191 new_email = 'newmail.pl'
188 192 params = {
189 193 'username': 'test_admin',
190 194 'new_password': 'test12',
191 195 'password_confirmation': 'test122',
192 196 'firstname': 'NewName',
193 197 'lastname': 'NewLastname',
194 198 'email': new_email,
195 199 'csrf_token': self.csrf_token,
196 200 }
197 201 response = self.app.post(route_path('my_account_update'),
198 202 params=params)
199 203
200 response.mustcontain('An email address must contain a single @')
201 msg = u'Username "%(username)s" already exists'
202 msg = h.html_escape(msg % {'username': 'test_admin'})
203 response.mustcontain(u"%s" % msg)
204 response.mustcontain('"newmail.pl" is not one of test_regular2@mail.com')
@@ -1,93 +1,77 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.apps._base import ADMIN_PREFIX
24 24 from rhodecode.model.db import User, UserEmailMap
25 25 from rhodecode.tests import (
26 26 TestController, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_EMAIL,
27 assert_session_flash)
27 assert_session_flash, TEST_USER_REGULAR_PASS)
28 28 from rhodecode.tests.fixture import Fixture
29 29
30 30 fixture = Fixture()
31 31
32 32
33 33 def route_path(name, **kwargs):
34 34 return {
35 35 'my_account_emails':
36 36 ADMIN_PREFIX + '/my_account/emails',
37 37 'my_account_emails_add':
38 38 ADMIN_PREFIX + '/my_account/emails/new',
39 39 'my_account_emails_delete':
40 40 ADMIN_PREFIX + '/my_account/emails/delete',
41 41 }[name].format(**kwargs)
42 42
43 43
44 44 class TestMyAccountEmails(TestController):
45 45 def test_my_account_my_emails(self):
46 46 self.log_user()
47 47 response = self.app.get(route_path('my_account_emails'))
48 48 response.mustcontain('No additional emails specified')
49 49
50 def test_my_account_my_emails_add_existing_email(self):
51 self.log_user()
52 response = self.app.get(route_path('my_account_emails'))
53 response.mustcontain('No additional emails specified')
54 response = self.app.post(route_path('my_account_emails_add'),
55 {'new_email': TEST_USER_REGULAR_EMAIL,
56 'csrf_token': self.csrf_token})
57 assert_session_flash(response, 'This e-mail address is already taken')
58
59 def test_my_account_my_emails_add_mising_email_in_form(self):
60 self.log_user()
61 response = self.app.get(route_path('my_account_emails'))
62 response.mustcontain('No additional emails specified')
63 response = self.app.post(route_path('my_account_emails_add'),
64 {'csrf_token': self.csrf_token})
65 assert_session_flash(response, 'Please enter an email address')
66
67 50 def test_my_account_my_emails_add_remove(self):
68 51 self.log_user()
69 52 response = self.app.get(route_path('my_account_emails'))
70 53 response.mustcontain('No additional emails specified')
71 54
72 55 response = self.app.post(route_path('my_account_emails_add'),
73 {'new_email': 'foo@barz.com',
56 {'email': 'foo@barz.com',
57 'current_password': TEST_USER_REGULAR_PASS,
74 58 'csrf_token': self.csrf_token})
75 59
76 60 response = self.app.get(route_path('my_account_emails'))
77 61
78 62 email_id = UserEmailMap.query().filter(
79 63 UserEmailMap.user == User.get_by_username(
80 64 TEST_USER_ADMIN_LOGIN)).filter(
81 65 UserEmailMap.email == 'foo@barz.com').one().email_id
82 66
83 67 response.mustcontain('foo@barz.com')
84 68 response.mustcontain('<input id="del_email_id" name="del_email_id" '
85 69 'type="hidden" value="%s" />' % email_id)
86 70
87 71 response = self.app.post(
88 72 route_path('my_account_emails_delete'), {
89 73 'del_email_id': email_id,
90 74 'csrf_token': self.csrf_token})
91 75 assert_session_flash(response, 'Email successfully deleted')
92 76 response = self.app.get(route_path('my_account_emails'))
93 77 response.mustcontain('No additional emails specified')
@@ -1,599 +1,604 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import logging
22 22 import datetime
23 23
24 24 import formencode
25 25 import formencode.htmlfill
26 26 from pyramid.httpexceptions import HTTPFound
27 27 from pyramid.view import view_config
28 28 from pyramid.renderers import render
29 29 from pyramid.response import Response
30 30
31 31 from rhodecode.apps._base import BaseAppView, DataGridAppView
32 32 from rhodecode import forms
33 33 from rhodecode.lib import helpers as h
34 34 from rhodecode.lib import audit_logger
35 35 from rhodecode.lib.ext_json import json
36 36 from rhodecode.lib.auth import LoginRequired, NotAnonymous, CSRFRequired
37 37 from rhodecode.lib.channelstream import (
38 38 channelstream_request, ChannelstreamException)
39 39 from rhodecode.lib.utils2 import safe_int, md5, str2bool
40 40 from rhodecode.model.auth_token import AuthTokenModel
41 41 from rhodecode.model.comment import CommentsModel
42 42 from rhodecode.model.db import (
43 43 Repository, UserEmailMap, UserApiKeys, UserFollowing, joinedload,
44 44 PullRequest)
45 45 from rhodecode.model.forms import UserForm, UserExtraEmailForm
46 46 from rhodecode.model.meta import Session
47 47 from rhodecode.model.pull_request import PullRequestModel
48 48 from rhodecode.model.scm import RepoList
49 49 from rhodecode.model.user import UserModel
50 50 from rhodecode.model.repo import RepoModel
51 51 from rhodecode.model.user_group import UserGroupModel
52 52 from rhodecode.model.validation_schema.schemas import user_schema
53 53
54 54 log = logging.getLogger(__name__)
55 55
56 56
57 57 class MyAccountView(BaseAppView, DataGridAppView):
58 58 ALLOW_SCOPED_TOKENS = False
59 59 """
60 60 This view has alternative version inside EE, if modified please take a look
61 61 in there as well.
62 62 """
63 63
64 64 def load_default_context(self):
65 65 c = self._get_local_tmpl_context()
66 66 c.user = c.auth_user.get_instance()
67 67 c.allow_scoped_tokens = self.ALLOW_SCOPED_TOKENS
68 68
69 69 return c
70 70
71 71 @LoginRequired()
72 72 @NotAnonymous()
73 73 @view_config(
74 74 route_name='my_account_profile', request_method='GET',
75 75 renderer='rhodecode:templates/admin/my_account/my_account.mako')
76 76 def my_account_profile(self):
77 77 c = self.load_default_context()
78 78 c.active = 'profile'
79 79 return self._get_template_context(c)
80 80
81 81 @LoginRequired()
82 82 @NotAnonymous()
83 83 @view_config(
84 84 route_name='my_account_password', request_method='GET',
85 85 renderer='rhodecode:templates/admin/my_account/my_account.mako')
86 86 def my_account_password(self):
87 87 c = self.load_default_context()
88 88 c.active = 'password'
89 89 c.extern_type = c.user.extern_type
90 90
91 91 schema = user_schema.ChangePasswordSchema().bind(
92 92 username=c.user.username)
93 93
94 94 form = forms.Form(
95 95 schema,
96 96 action=h.route_path('my_account_password_update'),
97 97 buttons=(forms.buttons.save, forms.buttons.reset))
98 98
99 99 c.form = form
100 100 return self._get_template_context(c)
101 101
102 102 @LoginRequired()
103 103 @NotAnonymous()
104 104 @CSRFRequired()
105 105 @view_config(
106 106 route_name='my_account_password_update', request_method='POST',
107 107 renderer='rhodecode:templates/admin/my_account/my_account.mako')
108 108 def my_account_password_update(self):
109 109 _ = self.request.translate
110 110 c = self.load_default_context()
111 111 c.active = 'password'
112 112 c.extern_type = c.user.extern_type
113 113
114 114 schema = user_schema.ChangePasswordSchema().bind(
115 115 username=c.user.username)
116 116
117 117 form = forms.Form(
118 118 schema, buttons=(forms.buttons.save, forms.buttons.reset))
119 119
120 120 if c.extern_type != 'rhodecode':
121 121 raise HTTPFound(self.request.route_path('my_account_password'))
122 122
123 123 controls = self.request.POST.items()
124 124 try:
125 125 valid_data = form.validate(controls)
126 126 UserModel().update_user(c.user.user_id, **valid_data)
127 127 c.user.update_userdata(force_password_change=False)
128 128 Session().commit()
129 129 except forms.ValidationFailure as e:
130 130 c.form = e
131 131 return self._get_template_context(c)
132 132
133 133 except Exception:
134 134 log.exception("Exception updating password")
135 135 h.flash(_('Error occurred during update of user password'),
136 136 category='error')
137 137 else:
138 138 instance = c.auth_user.get_instance()
139 139 self.session.setdefault('rhodecode_user', {}).update(
140 140 {'password': md5(instance.password)})
141 141 self.session.save()
142 142 h.flash(_("Successfully updated password"), category='success')
143 143
144 144 raise HTTPFound(self.request.route_path('my_account_password'))
145 145
146 146 @LoginRequired()
147 147 @NotAnonymous()
148 148 @view_config(
149 149 route_name='my_account_auth_tokens', request_method='GET',
150 150 renderer='rhodecode:templates/admin/my_account/my_account.mako')
151 151 def my_account_auth_tokens(self):
152 152 _ = self.request.translate
153 153
154 154 c = self.load_default_context()
155 155 c.active = 'auth_tokens'
156 156 c.lifetime_values = AuthTokenModel.get_lifetime_values(translator=_)
157 157 c.role_values = [
158 158 (x, AuthTokenModel.cls._get_role_name(x))
159 159 for x in AuthTokenModel.cls.ROLES]
160 160 c.role_options = [(c.role_values, _("Role"))]
161 161 c.user_auth_tokens = AuthTokenModel().get_auth_tokens(
162 162 c.user.user_id, show_expired=True)
163 163 c.role_vcs = AuthTokenModel.cls.ROLE_VCS
164 164 return self._get_template_context(c)
165 165
166 166 def maybe_attach_token_scope(self, token):
167 167 # implemented in EE edition
168 168 pass
169 169
170 170 @LoginRequired()
171 171 @NotAnonymous()
172 172 @CSRFRequired()
173 173 @view_config(
174 174 route_name='my_account_auth_tokens_add', request_method='POST',)
175 175 def my_account_auth_tokens_add(self):
176 176 _ = self.request.translate
177 177 c = self.load_default_context()
178 178
179 179 lifetime = safe_int(self.request.POST.get('lifetime'), -1)
180 180 description = self.request.POST.get('description')
181 181 role = self.request.POST.get('role')
182 182
183 183 token = AuthTokenModel().create(
184 184 c.user.user_id, description, lifetime, role)
185 185 token_data = token.get_api_data()
186 186
187 187 self.maybe_attach_token_scope(token)
188 188 audit_logger.store_web(
189 189 'user.edit.token.add', action_data={
190 190 'data': {'token': token_data, 'user': 'self'}},
191 191 user=self._rhodecode_user, )
192 192 Session().commit()
193 193
194 194 h.flash(_("Auth token successfully created"), category='success')
195 195 return HTTPFound(h.route_path('my_account_auth_tokens'))
196 196
197 197 @LoginRequired()
198 198 @NotAnonymous()
199 199 @CSRFRequired()
200 200 @view_config(
201 201 route_name='my_account_auth_tokens_delete', request_method='POST')
202 202 def my_account_auth_tokens_delete(self):
203 203 _ = self.request.translate
204 204 c = self.load_default_context()
205 205
206 206 del_auth_token = self.request.POST.get('del_auth_token')
207 207
208 208 if del_auth_token:
209 209 token = UserApiKeys.get_or_404(del_auth_token)
210 210 token_data = token.get_api_data()
211 211
212 212 AuthTokenModel().delete(del_auth_token, c.user.user_id)
213 213 audit_logger.store_web(
214 214 'user.edit.token.delete', action_data={
215 215 'data': {'token': token_data, 'user': 'self'}},
216 216 user=self._rhodecode_user,)
217 217 Session().commit()
218 218 h.flash(_("Auth token successfully deleted"), category='success')
219 219
220 220 return HTTPFound(h.route_path('my_account_auth_tokens'))
221 221
222 222 @LoginRequired()
223 223 @NotAnonymous()
224 224 @view_config(
225 225 route_name='my_account_emails', request_method='GET',
226 226 renderer='rhodecode:templates/admin/my_account/my_account.mako')
227 227 def my_account_emails(self):
228 228 _ = self.request.translate
229 229
230 230 c = self.load_default_context()
231 231 c.active = 'emails'
232 232
233 233 c.user_email_map = UserEmailMap.query()\
234 234 .filter(UserEmailMap.user == c.user).all()
235
236 schema = user_schema.AddEmailSchema().bind(
237 username=c.user.username, user_emails=c.user.emails)
238
239 form = forms.RcForm(schema,
240 action=h.route_path('my_account_emails_add'),
241 buttons=(forms.buttons.save, forms.buttons.reset))
242
243 c.form = form
235 244 return self._get_template_context(c)
236 245
237 246 @LoginRequired()
238 247 @NotAnonymous()
239 248 @CSRFRequired()
240 249 @view_config(
241 route_name='my_account_emails_add', request_method='POST')
250 route_name='my_account_emails_add', request_method='POST',
251 renderer='rhodecode:templates/admin/my_account/my_account.mako')
242 252 def my_account_emails_add(self):
243 253 _ = self.request.translate
244 254 c = self.load_default_context()
255 c.active = 'emails'
245 256
246 email = self.request.POST.get('new_email')
257 schema = user_schema.AddEmailSchema().bind(
258 username=c.user.username, user_emails=c.user.emails)
247 259
260 form = forms.RcForm(
261 schema, action=h.route_path('my_account_emails_add'),
262 buttons=(forms.buttons.save, forms.buttons.reset))
263
264 controls = self.request.POST.items()
248 265 try:
249 form = UserExtraEmailForm(self.request.translate)()
250 data = form.to_python({'email': email})
251 email = data['email']
252
253 UserModel().add_extra_email(c.user.user_id, email)
266 valid_data = form.validate(controls)
267 UserModel().add_extra_email(c.user.user_id, valid_data['email'])
254 268 audit_logger.store_web(
255 269 'user.edit.email.add', action_data={
256 'data': {'email': email, 'user': 'self'}},
270 'data': {'email': valid_data['email'], 'user': 'self'}},
257 271 user=self._rhodecode_user,)
258
259 272 Session().commit()
260 h.flash(_("Added new email address `%s` for user account") % email,
261 category='success')
262 273 except formencode.Invalid as error:
263 274 h.flash(h.escape(error.error_dict['email']), category='error')
275 except forms.ValidationFailure as e:
276 c.user_email_map = UserEmailMap.query() \
277 .filter(UserEmailMap.user == c.user).all()
278 c.form = e
279 return self._get_template_context(c)
264 280 except Exception:
265 log.exception("Exception in my_account_emails")
266 h.flash(_('An error occurred during email saving'),
281 log.exception("Exception adding email")
282 h.flash(_('Error occurred during adding email'),
267 283 category='error')
268 return HTTPFound(h.route_path('my_account_emails'))
284 else:
285 h.flash(_("Successfully added email"), category='success')
286
287 raise HTTPFound(self.request.route_path('my_account_emails'))
269 288
270 289 @LoginRequired()
271 290 @NotAnonymous()
272 291 @CSRFRequired()
273 292 @view_config(
274 293 route_name='my_account_emails_delete', request_method='POST')
275 294 def my_account_emails_delete(self):
276 295 _ = self.request.translate
277 296 c = self.load_default_context()
278 297
279 298 del_email_id = self.request.POST.get('del_email_id')
280 299 if del_email_id:
281 300 email = UserEmailMap.get_or_404(del_email_id).email
282 301 UserModel().delete_extra_email(c.user.user_id, del_email_id)
283 302 audit_logger.store_web(
284 303 'user.edit.email.delete', action_data={
285 304 'data': {'email': email, 'user': 'self'}},
286 305 user=self._rhodecode_user,)
287 306 Session().commit()
288 307 h.flash(_("Email successfully deleted"),
289 308 category='success')
290 309 return HTTPFound(h.route_path('my_account_emails'))
291 310
292 311 @LoginRequired()
293 312 @NotAnonymous()
294 313 @CSRFRequired()
295 314 @view_config(
296 315 route_name='my_account_notifications_test_channelstream',
297 316 request_method='POST', renderer='json_ext')
298 317 def my_account_notifications_test_channelstream(self):
299 318 message = 'Test message sent via Channelstream by user: {}, on {}'.format(
300 319 self._rhodecode_user.username, datetime.datetime.now())
301 320 payload = {
302 321 # 'channel': 'broadcast',
303 322 'type': 'message',
304 323 'timestamp': datetime.datetime.utcnow(),
305 324 'user': 'system',
306 325 'pm_users': [self._rhodecode_user.username],
307 326 'message': {
308 327 'message': message,
309 328 'level': 'info',
310 329 'topic': '/notifications'
311 330 }
312 331 }
313 332
314 333 registry = self.request.registry
315 334 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
316 335 channelstream_config = rhodecode_plugins.get('channelstream', {})
317 336
318 337 try:
319 338 channelstream_request(channelstream_config, [payload], '/message')
320 339 except ChannelstreamException as e:
321 340 log.exception('Failed to send channelstream data')
322 341 return {"response": 'ERROR: {}'.format(e.__class__.__name__)}
323 342 return {"response": 'Channelstream data sent. '
324 343 'You should see a new live message now.'}
325 344
326 345 def _load_my_repos_data(self, watched=False):
327 346 if watched:
328 347 admin = False
329 348 follows_repos = Session().query(UserFollowing)\
330 349 .filter(UserFollowing.user_id == self._rhodecode_user.user_id)\
331 350 .options(joinedload(UserFollowing.follows_repository))\
332 351 .all()
333 352 repo_list = [x.follows_repository for x in follows_repos]
334 353 else:
335 354 admin = True
336 355 repo_list = Repository.get_all_repos(
337 356 user_id=self._rhodecode_user.user_id)
338 357 repo_list = RepoList(repo_list, perm_set=[
339 358 'repository.read', 'repository.write', 'repository.admin'])
340 359
341 360 repos_data = RepoModel().get_repos_as_dict(
342 361 repo_list=repo_list, admin=admin)
343 362 # json used to render the grid
344 363 return json.dumps(repos_data)
345 364
346 365 @LoginRequired()
347 366 @NotAnonymous()
348 367 @view_config(
349 368 route_name='my_account_repos', request_method='GET',
350 369 renderer='rhodecode:templates/admin/my_account/my_account.mako')
351 370 def my_account_repos(self):
352 371 c = self.load_default_context()
353 372 c.active = 'repos'
354 373
355 374 # json used to render the grid
356 375 c.data = self._load_my_repos_data()
357 376 return self._get_template_context(c)
358 377
359 378 @LoginRequired()
360 379 @NotAnonymous()
361 380 @view_config(
362 381 route_name='my_account_watched', request_method='GET',
363 382 renderer='rhodecode:templates/admin/my_account/my_account.mako')
364 383 def my_account_watched(self):
365 384 c = self.load_default_context()
366 385 c.active = 'watched'
367 386
368 387 # json used to render the grid
369 388 c.data = self._load_my_repos_data(watched=True)
370 389 return self._get_template_context(c)
371 390
372 391 @LoginRequired()
373 392 @NotAnonymous()
374 393 @view_config(
375 394 route_name='my_account_perms', request_method='GET',
376 395 renderer='rhodecode:templates/admin/my_account/my_account.mako')
377 396 def my_account_perms(self):
378 397 c = self.load_default_context()
379 398 c.active = 'perms'
380 399
381 400 c.perm_user = c.auth_user
382 401 return self._get_template_context(c)
383 402
384 403 @LoginRequired()
385 404 @NotAnonymous()
386 405 @view_config(
387 406 route_name='my_account_notifications', request_method='GET',
388 407 renderer='rhodecode:templates/admin/my_account/my_account.mako')
389 408 def my_notifications(self):
390 409 c = self.load_default_context()
391 410 c.active = 'notifications'
392 411
393 412 return self._get_template_context(c)
394 413
395 414 @LoginRequired()
396 415 @NotAnonymous()
397 416 @CSRFRequired()
398 417 @view_config(
399 418 route_name='my_account_notifications_toggle_visibility',
400 419 request_method='POST', renderer='json_ext')
401 420 def my_notifications_toggle_visibility(self):
402 421 user = self._rhodecode_db_user
403 422 new_status = not user.user_data.get('notification_status', True)
404 423 user.update_userdata(notification_status=new_status)
405 424 Session().commit()
406 425 return user.user_data['notification_status']
407 426
408 427 @LoginRequired()
409 428 @NotAnonymous()
410 429 @view_config(
411 430 route_name='my_account_edit',
412 431 request_method='GET',
413 432 renderer='rhodecode:templates/admin/my_account/my_account.mako')
414 433 def my_account_edit(self):
415 434 c = self.load_default_context()
416 435 c.active = 'profile_edit'
417
418 c.perm_user = c.auth_user
419 436 c.extern_type = c.user.extern_type
420 437 c.extern_name = c.user.extern_name
421 438
422 defaults = c.user.get_dict()
439 schema = user_schema.UserProfileSchema().bind(
440 username=c.user.username, user_emails=c.user.emails)
441 appstruct = {
442 'username': c.user.username,
443 'email': c.user.email,
444 'firstname': c.user.firstname,
445 'lastname': c.user.lastname,
446 }
447 c.form = forms.RcForm(
448 schema, appstruct=appstruct,
449 action=h.route_path('my_account_update'),
450 buttons=(forms.buttons.save, forms.buttons.reset))
423 451
424 data = render('rhodecode:templates/admin/my_account/my_account.mako',
425 self._get_template_context(c), self.request)
426 html = formencode.htmlfill.render(
427 data,
428 defaults=defaults,
429 encoding="UTF-8",
430 force_defaults=False
431 )
432 return Response(html)
452 return self._get_template_context(c)
433 453
434 454 @LoginRequired()
435 455 @NotAnonymous()
436 456 @CSRFRequired()
437 457 @view_config(
438 458 route_name='my_account_update',
439 459 request_method='POST',
440 460 renderer='rhodecode:templates/admin/my_account/my_account.mako')
441 461 def my_account_update(self):
442 462 _ = self.request.translate
443 463 c = self.load_default_context()
444 464 c.active = 'profile_edit'
445
446 465 c.perm_user = c.auth_user
447 466 c.extern_type = c.user.extern_type
448 467 c.extern_name = c.user.extern_name
449 468
450 _form = UserForm(self.request.translate, edit=True,
451 old_data={'user_id': self._rhodecode_user.user_id,
452 'email': self._rhodecode_user.email})()
453 form_result = {}
469 schema = user_schema.UserProfileSchema().bind(
470 username=c.user.username, user_emails=c.user.emails)
471 form = forms.RcForm(
472 schema, buttons=(forms.buttons.save, forms.buttons.reset))
473
474 controls = self.request.POST.items()
454 475 try:
455 post_data = dict(self.request.POST)
456 post_data['new_password'] = ''
457 post_data['password_confirmation'] = ''
458 form_result = _form.to_python(post_data)
459 # skip updating those attrs for my account
476 valid_data = form.validate(controls)
460 477 skip_attrs = ['admin', 'active', 'extern_type', 'extern_name',
461 478 'new_password', 'password_confirmation']
462 # TODO: plugin should define if username can be updated
463 479 if c.extern_type != "rhodecode":
464 480 # forbid updating username for external accounts
465 481 skip_attrs.append('username')
466
482 old_email = c.user.email
467 483 UserModel().update_user(
468 self._rhodecode_user.user_id, skip_attrs=skip_attrs,
469 **form_result)
470 h.flash(_('Your account was updated successfully'),
471 category='success')
484 self._rhodecode_user.user_id, skip_attrs=skip_attrs,
485 **valid_data)
486 if old_email != valid_data['email']:
487 old = UserEmailMap.query() \
488 .filter(UserEmailMap.user == c.user).filter(UserEmailMap.email == valid_data['email']).first()
489 old.email = old_email
490 h.flash(_('Your account was updated successfully'), category='success')
472 491 Session().commit()
473
474 except formencode.Invalid as errors:
475 data = render(
476 'rhodecode:templates/admin/my_account/my_account.mako',
477 self._get_template_context(c), self.request)
478
479 html = formencode.htmlfill.render(
480 data,
481 defaults=errors.value,
482 errors=errors.error_dict or {},
483 prefix_error=False,
484 encoding="UTF-8",
485 force_defaults=False)
486 return Response(html)
487
492 except forms.ValidationFailure as e:
493 c.form = e
494 return self._get_template_context(c)
488 495 except Exception:
489 496 log.exception("Exception updating user")
490 h.flash(_('Error occurred during update of user %s')
491 % form_result.get('username'), category='error')
492 raise HTTPFound(h.route_path('my_account_profile'))
493
497 h.flash(_('Error occurred during update of user'),
498 category='error')
494 499 raise HTTPFound(h.route_path('my_account_profile'))
495 500
496 501 def _get_pull_requests_list(self, statuses):
497 502 draw, start, limit = self._extract_chunk(self.request)
498 503 search_q, order_by, order_dir = self._extract_ordering(self.request)
499 504 _render = self.request.get_partial_renderer(
500 505 'rhodecode:templates/data_table/_dt_elements.mako')
501 506
502 507 pull_requests = PullRequestModel().get_im_participating_in(
503 508 user_id=self._rhodecode_user.user_id,
504 509 statuses=statuses,
505 510 offset=start, length=limit, order_by=order_by,
506 511 order_dir=order_dir)
507 512
508 513 pull_requests_total_count = PullRequestModel().count_im_participating_in(
509 514 user_id=self._rhodecode_user.user_id, statuses=statuses)
510 515
511 516 data = []
512 517 comments_model = CommentsModel()
513 518 for pr in pull_requests:
514 519 repo_id = pr.target_repo_id
515 520 comments = comments_model.get_all_comments(
516 521 repo_id, pull_request=pr)
517 522 owned = pr.user_id == self._rhodecode_user.user_id
518 523
519 524 data.append({
520 525 'target_repo': _render('pullrequest_target_repo',
521 526 pr.target_repo.repo_name),
522 527 'name': _render('pullrequest_name',
523 528 pr.pull_request_id, pr.target_repo.repo_name,
524 529 short=True),
525 530 'name_raw': pr.pull_request_id,
526 531 'status': _render('pullrequest_status',
527 532 pr.calculated_review_status()),
528 533 'title': _render(
529 534 'pullrequest_title', pr.title, pr.description),
530 535 'description': h.escape(pr.description),
531 536 'updated_on': _render('pullrequest_updated_on',
532 537 h.datetime_to_time(pr.updated_on)),
533 538 'updated_on_raw': h.datetime_to_time(pr.updated_on),
534 539 'created_on': _render('pullrequest_updated_on',
535 540 h.datetime_to_time(pr.created_on)),
536 541 'created_on_raw': h.datetime_to_time(pr.created_on),
537 542 'author': _render('pullrequest_author',
538 543 pr.author.full_contact, ),
539 544 'author_raw': pr.author.full_name,
540 545 'comments': _render('pullrequest_comments', len(comments)),
541 546 'comments_raw': len(comments),
542 547 'closed': pr.is_closed(),
543 548 'owned': owned
544 549 })
545 550
546 551 # json used to render the grid
547 552 data = ({
548 553 'draw': draw,
549 554 'data': data,
550 555 'recordsTotal': pull_requests_total_count,
551 556 'recordsFiltered': pull_requests_total_count,
552 557 })
553 558 return data
554 559
555 560 @LoginRequired()
556 561 @NotAnonymous()
557 562 @view_config(
558 563 route_name='my_account_pullrequests',
559 564 request_method='GET',
560 565 renderer='rhodecode:templates/admin/my_account/my_account.mako')
561 566 def my_account_pullrequests(self):
562 567 c = self.load_default_context()
563 568 c.active = 'pullrequests'
564 569 req_get = self.request.GET
565 570
566 571 c.closed = str2bool(req_get.get('pr_show_closed'))
567 572
568 573 return self._get_template_context(c)
569 574
570 575 @LoginRequired()
571 576 @NotAnonymous()
572 577 @view_config(
573 578 route_name='my_account_pullrequests_data',
574 579 request_method='GET', renderer='json_ext')
575 580 def my_account_pullrequests_data(self):
576 581 self.load_default_context()
577 582 req_get = self.request.GET
578 583 closed = str2bool(req_get.get('closed'))
579 584
580 585 statuses = [PullRequest.STATUS_NEW, PullRequest.STATUS_OPEN]
581 586 if closed:
582 587 statuses += [PullRequest.STATUS_CLOSED]
583 588
584 589 data = self._get_pull_requests_list(statuses=statuses)
585 590 return data
586 591
587 592 @LoginRequired()
588 593 @NotAnonymous()
589 594 @view_config(
590 595 route_name='my_account_user_group_membership',
591 596 request_method='GET',
592 597 renderer='rhodecode:templates/admin/my_account/my_account.mako')
593 598 def my_account_user_group_membership(self):
594 599 c = self.load_default_context()
595 600 c.active = 'user_group_membership'
596 601 groups = [UserGroupModel.get_user_groups_as_dict(group.users_group)
597 602 for group in self._rhodecode_db_user.group_member]
598 603 c.user_groups = json.dumps(groups)
599 604 return self._get_template_context(c)
@@ -1,125 +1,188 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import re
22 22 import colander
23 23
24 24 from rhodecode import forms
25 from rhodecode.model.db import User
25 from rhodecode.model.db import User, UserEmailMap
26 26 from rhodecode.model.validation_schema import types, validators
27 27 from rhodecode.translation import _
28 28 from rhodecode.lib.auth import check_password
29 from rhodecode.lib import helpers as h
29 30
30 31
31 32 @colander.deferred
32 33 def deferred_user_password_validator(node, kw):
33 34 username = kw.get('username')
34 35 user = User.get_by_username(username)
35 36
36 37 def _user_password_validator(node, value):
37 38 if not check_password(value, user.password):
38 39 msg = _('Password is incorrect')
39 40 raise colander.Invalid(node, msg)
40 41 return _user_password_validator
41 42
42 43
44
43 45 class ChangePasswordSchema(colander.Schema):
44 46
45 47 current_password = colander.SchemaNode(
46 48 colander.String(),
47 49 missing=colander.required,
48 50 widget=forms.widget.PasswordWidget(redisplay=True),
49 51 validator=deferred_user_password_validator)
50 52
51 53 new_password = colander.SchemaNode(
52 54 colander.String(),
53 55 missing=colander.required,
54 56 widget=forms.widget.CheckedPasswordWidget(redisplay=True),
55 57 validator=colander.Length(min=6))
56 58
57 59 def validator(self, form, values):
58 60 if values['current_password'] == values['new_password']:
59 61 exc = colander.Invalid(form)
60 62 exc['new_password'] = _('New password must be different '
61 63 'to old password')
62 64 raise exc
63 65
64 66
65 67 @colander.deferred
66 68 def deferred_username_validator(node, kw):
67 69
68 70 def name_validator(node, value):
69 71 msg = _(
70 72 u'Username may only contain alphanumeric characters '
71 73 u'underscores, periods or dashes and must begin with '
72 74 u'alphanumeric character or underscore')
73 75
74 76 if not re.match(r'^[\w]{1}[\w\-\.]{0,254}$', value):
75 77 raise colander.Invalid(node, msg)
76 78
77 79 return name_validator
78 80
79 81
80 82 @colander.deferred
81 83 def deferred_email_validator(node, kw):
82 84 # NOTE(marcink): we might provide uniqueness validation later here...
83 85 return colander.Email()
84 86
85 87
86 88 class UserSchema(colander.Schema):
87 89 username = colander.SchemaNode(
88 90 colander.String(),
89 91 validator=deferred_username_validator)
90 92
91 93 email = colander.SchemaNode(
92 94 colander.String(),
93 95 validator=deferred_email_validator)
94 96
95 97 password = colander.SchemaNode(
96 98 colander.String(), missing='')
97 99
98 100 first_name = colander.SchemaNode(
99 101 colander.String(), missing='')
100 102
101 103 last_name = colander.SchemaNode(
102 104 colander.String(), missing='')
103 105
104 106 active = colander.SchemaNode(
105 107 types.StringBooleanType(),
106 108 missing=False)
107 109
108 110 admin = colander.SchemaNode(
109 111 types.StringBooleanType(),
110 112 missing=False)
111 113
112 114 extern_name = colander.SchemaNode(
113 115 colander.String(), missing='')
114 116
115 117 extern_type = colander.SchemaNode(
116 118 colander.String(), missing='')
117 119
118 120 def deserialize(self, cstruct):
119 121 """
120 122 Custom deserialize that allows to chain validation, and verify
121 123 permissions, and as last step uniqueness
122 124 """
123 125
124 126 appstruct = super(UserSchema, self).deserialize(cstruct)
125 127 return appstruct
128
129
130 @colander.deferred
131 def deferred_user_email_in_emails_validator(node, kw):
132 return colander.OneOf(kw.get('user_emails'))
133
134
135 @colander.deferred
136 def deferred_additional_email_validator(node, kw):
137 emails = kw.get('user_emails')
138
139 def name_validator(node, value):
140 if value in emails:
141 msg = _('This e-mail address is already taken')
142 raise colander.Invalid(node, msg)
143 user = User.get_by_email(value, case_insensitive=True)
144 if user:
145 msg = _(u'This e-mail address is already taken')
146 raise colander.Invalid(node, msg)
147 c = colander.Email()
148 return c(node, value)
149 return name_validator
150
151
152 @colander.deferred
153 def deferred_user_email_in_emails_widget(node, kw):
154 import deform.widget
155 emails = [(email, email) for email in kw.get('user_emails')]
156 return deform.widget.Select2Widget(values=emails)
157
158
159 class UserProfileSchema(colander.Schema):
160 username = colander.SchemaNode(
161 colander.String(),
162 validator=deferred_username_validator)
163
164 firstname = colander.SchemaNode(
165 colander.String(), missing='', title='First name')
166
167 lastname = colander.SchemaNode(
168 colander.String(), missing='', title='Last name')
169
170 email = colander.SchemaNode(
171 colander.String(), widget=deferred_user_email_in_emails_widget,
172 validator=deferred_user_email_in_emails_validator,
173 description=h.literal(
174 _('Additional emails can be specified at <a href="{}">extra emails</a> page.').format(
175 '/_admin/my_account/emails')),
176 )
177
178
179 class AddEmailSchema(colander.Schema):
180 current_password = colander.SchemaNode(
181 colander.String(),
182 missing=colander.required,
183 widget=forms.widget.PasswordWidget(redisplay=True),
184 validator=deferred_user_password_validator)
185
186 email = colander.SchemaNode(
187 colander.String(), title='New Email',
188 validator=deferred_additional_email_validator)
@@ -1,72 +1,54 b''
1 1 <%namespace name="base" file="/base/base.mako"/>
2 2
3 3 <div class="panel panel-default">
4 4 <div class="panel-heading">
5 5 <h3 class="panel-title">${_('Account Emails')}</h3>
6 6 </div>
7 7
8 8 <div class="panel-body">
9 9 <div class="emails_wrap">
10 10 <table class="rctable account_emails">
11 11 <tr>
12 12 <td class="td-user">
13 13 ${base.gravatar(c.user.email, 16)}
14 14 <span class="user email">${c.user.email}</span>
15 15 </td>
16 16 <td class="td-tags">
17 17 <span class="tag tag1">${_('Primary')}</span>
18 18 </td>
19 19 </tr>
20 20 %if c.user_email_map:
21 21 %for em in c.user_email_map:
22 22 <tr>
23 23 <td class="td-user">
24 24 ${base.gravatar(em.email, 16)}
25 25 <span class="user email">${em.email}</span>
26 26 </td>
27 27 <td class="td-action">
28 28 ${h.secure_form(h.route_path('my_account_emails_delete'), request=request)}
29 29 ${h.hidden('del_email_id',em.email_id)}
30 30 <button class="btn btn-link btn-danger" type="submit" id="${'remove_email_%s'.format(em.email_id)}"
31 31 onclick="return confirm('${_('Confirm to delete this email: {}').format(em.email)}');">
32 32 ${_('Delete')}
33 33 </button>
34 34 ${h.end_form()}
35 35 </td>
36 36 </tr>
37 37 %endfor
38 38 %else:
39 39 <tr class="noborder">
40 40 <td colspan="3">
41 41 <div class="td-email">
42 42 ${_('No additional emails specified')}
43 43 </div>
44 44 </td>
45 45 </tr>
46 46 %endif
47 47 </table>
48 48 </div>
49 49
50 50 <div>
51 ${h.secure_form(h.route_path('my_account_emails_add'), request=request)}
52 <div class="form">
53 <!-- fields -->
54 <div class="fields">
55 <div class="field">
56 <div class="label">
57 <label for="new_email">${_('New email address')}:</label>
58 </div>
59 <div class="input">
60 ${h.text('new_email', class_='medium')}
61 </div>
62 </div>
63 <div class="buttons">
64 ${h.submit('save',_('Add'),class_="btn")}
65 ${h.reset('reset',_('Reset'),class_="btn")}
66 </div>
67 </div>
68 </div>
69 ${h.end_form()}
51 ${c.form.render() | n}
70 52 </div>
71 53 </div>
72 54 </div>
@@ -1,113 +1,71 b''
1 1 <%namespace name="base" file="/base/base.mako"/>
2 2 <div class="panel panel-default user-profile">
3 3 <div class="panel-heading">
4 4 <h3 class="panel-title">${_('My Profile')}</h3>
5 5 <a href="${h.route_path('my_account_profile')}" class="panel-edit">Close</a>
6 6 </div>
7 7
8 8 <div class="panel-body">
9 ${h.secure_form(h.route_path('my_account_update'), class_='form', request=request)}
10 9 <% readonly = None %>
11 10 <% disabled = "" %>
12 11
13 % if c.extern_type != 'rhodecode':
12 %if c.extern_type != 'rhodecode':
14 13 <% readonly = "readonly" %>
15 14 <% disabled = "disabled" %>
16 15 <div class="infoform">
17 16 <div class="fields">
18 17 <p>${_('Your user account details are managed by an external source. Details cannot be managed here.')}
19 18 <br/>${_('Source type')}: <strong>${c.extern_type}</strong>
20 19 </p>
21 20
22 21 <div class="field">
23 22 <div class="label">
24 23 <label for="username">${_('Username')}:</label>
25 24 </div>
26 25 <div class="input">
27 ${h.text('username', class_='input-valuedisplay', readonly=readonly)}
26 ${c.user.username}
28 27 </div>
29 28 </div>
30 29
31 30 <div class="field">
32 31 <div class="label">
33 32 <label for="name">${_('First Name')}:</label>
34 33 </div>
35 34 <div class="input">
36 ${h.text('firstname', class_='input-valuedisplay', readonly=readonly)}
35 ${c.user.firstname}
37 36 </div>
38 37 </div>
39 38
40 39 <div class="field">
41 40 <div class="label">
42 41 <label for="lastname">${_('Last Name')}:</label>
43 42 </div>
44 43 <div class="input-valuedisplay">
45 ${h.text('lastname', class_='input-valuedisplay', readonly=readonly)}
44 ${c.user.lastname}
46 45 </div>
47 46 </div>
48 47 </div>
49 48 </div>
50 49 % else:
51 50 <div class="form">
52 51 <div class="fields">
53 52 <div class="field">
54 53 <div class="label photo">
55 54 ${_('Photo')}:
56 55 </div>
57 56 <div class="input profile">
58 57 %if c.visual.use_gravatar:
59 58 ${base.gravatar(c.user.email, 100)}
60 59 <p class="help-block">${_('Change your avatar at')} <a href="http://gravatar.com">gravatar.com</a>.</p>
61 60 %else:
62 61 ${base.gravatar(c.user.email, 20)}
63 62 ${_('Avatars are disabled')}
64 63 %endif
65 64 </div>
66 65 </div>
67 <div class="field">
68 <div class="label">
69 <label for="username">${_('Username')}:</label>
70 </div>
71 <div class="input">
72 ${h.text('username', class_='medium%s' % disabled, readonly=readonly)}
73 ${h.hidden('extern_name', c.extern_name)}
74 ${h.hidden('extern_type', c.extern_type)}
75 </div>
76 </div>
77 <div class="field">
78 <div class="label">
79 <label for="name">${_('First Name')}:</label>
80 </div>
81 <div class="input">
82 ${h.text('firstname', class_="medium")}
83 </div>
84 </div>
85
86 <div class="field">
87 <div class="label">
88 <label for="lastname">${_('Last Name')}:</label>
89 </div>
90 <div class="input">
91 ${h.text('lastname', class_="medium")}
92 </div>
93 </div>
94
95 <div class="field">
96 <div class="label">
97 <label for="email">${_('Email')}:</label>
98 </div>
99 <div class="input">
100 ## we should be able to edit email !
101 ${h.text('email', class_="medium")}
102 </div>
103 </div>
104
105 <div class="buttons">
106 ${h.submit('save', _('Save'), class_="btn")}
107 ${h.reset('reset', _('Reset'), class_="btn")}
108 </div>
66 ${c.form.render()| n}
109 67 </div>
110 68 </div>
111 69 % endif
112 70 </div>
113 71 </div> No newline at end of file
@@ -1,342 +1,349 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Helpers for fixture generation
23 23 """
24 24
25 25 import os
26 26 import time
27 27 import tempfile
28 28 import shutil
29 29
30 30 import configobj
31 31
32 32 from rhodecode.tests import *
33 from rhodecode.model.db import Repository, User, RepoGroup, UserGroup, Gist
33 from rhodecode.model.db import Repository, User, RepoGroup, UserGroup, Gist, UserEmailMap
34 34 from rhodecode.model.meta import Session
35 35 from rhodecode.model.repo import RepoModel
36 36 from rhodecode.model.user import UserModel
37 37 from rhodecode.model.repo_group import RepoGroupModel
38 38 from rhodecode.model.user_group import UserGroupModel
39 39 from rhodecode.model.gist import GistModel
40 40 from rhodecode.model.auth_token import AuthTokenModel
41 41
42 42 dn = os.path.dirname
43 43 FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'tests', 'fixtures')
44 44
45 45
46 46 def error_function(*args, **kwargs):
47 47 raise Exception('Total Crash !')
48 48
49 49
50 50 class TestINI(object):
51 51 """
52 52 Allows to create a new test.ini file as a copy of existing one with edited
53 53 data. Example usage::
54 54
55 55 with TestINI('test.ini', [{'section':{'key':val'}]) as new_test_ini_path:
56 56 print 'paster server %s' % new_test_ini
57 57 """
58 58
59 59 def __init__(self, ini_file_path, ini_params, new_file_prefix='DEFAULT',
60 60 destroy=True, dir=None):
61 61 self.ini_file_path = ini_file_path
62 62 self.ini_params = ini_params
63 63 self.new_path = None
64 64 self.new_path_prefix = new_file_prefix
65 65 self._destroy = destroy
66 66 self._dir = dir
67 67
68 68 def __enter__(self):
69 69 return self.create()
70 70
71 71 def __exit__(self, exc_type, exc_val, exc_tb):
72 72 self.destroy()
73 73
74 74 def create(self):
75 75 config = configobj.ConfigObj(
76 76 self.ini_file_path, file_error=True, write_empty_values=True)
77 77
78 78 for data in self.ini_params:
79 79 section, ini_params = data.items()[0]
80 80 for key, val in ini_params.items():
81 81 config[section][key] = val
82 82 with tempfile.NamedTemporaryFile(
83 83 prefix=self.new_path_prefix, suffix='.ini', dir=self._dir,
84 84 delete=False) as new_ini_file:
85 85 config.write(new_ini_file)
86 86 self.new_path = new_ini_file.name
87 87
88 88 return self.new_path
89 89
90 90 def destroy(self):
91 91 if self._destroy:
92 92 os.remove(self.new_path)
93 93
94 94
95 95 class Fixture(object):
96 96
97 97 def anon_access(self, status):
98 98 """
99 99 Context process for disabling anonymous access. use like:
100 100 fixture = Fixture()
101 101 with fixture.anon_access(False):
102 102 #tests
103 103
104 104 after this block anon access will be set to `not status`
105 105 """
106 106
107 107 class context(object):
108 108 def __enter__(self):
109 109 anon = User.get_default_user()
110 110 anon.active = status
111 111 Session().add(anon)
112 112 Session().commit()
113 113 time.sleep(1.5) # must sleep for cache (1s to expire)
114 114
115 115 def __exit__(self, exc_type, exc_val, exc_tb):
116 116 anon = User.get_default_user()
117 117 anon.active = not status
118 118 Session().add(anon)
119 119 Session().commit()
120 120
121 121 return context()
122 122
123 123 def _get_repo_create_params(self, **custom):
124 124 defs = {
125 125 'repo_name': None,
126 126 'repo_type': 'hg',
127 127 'clone_uri': '',
128 128 'push_uri': '',
129 129 'repo_group': '-1',
130 130 'repo_description': 'DESC',
131 131 'repo_private': False,
132 132 'repo_landing_rev': 'rev:tip',
133 133 'repo_copy_permissions': False,
134 134 'repo_state': Repository.STATE_CREATED,
135 135 }
136 136 defs.update(custom)
137 137 if 'repo_name_full' not in custom:
138 138 defs.update({'repo_name_full': defs['repo_name']})
139 139
140 140 # fix the repo name if passed as repo_name_full
141 141 if defs['repo_name']:
142 142 defs['repo_name'] = defs['repo_name'].split('/')[-1]
143 143
144 144 return defs
145 145
146 146 def _get_group_create_params(self, **custom):
147 147 defs = {
148 148 'group_name': None,
149 149 'group_description': 'DESC',
150 150 'perm_updates': [],
151 151 'perm_additions': [],
152 152 'perm_deletions': [],
153 153 'group_parent_id': -1,
154 154 'enable_locking': False,
155 155 'recursive': False,
156 156 }
157 157 defs.update(custom)
158 158
159 159 return defs
160 160
161 161 def _get_user_create_params(self, name, **custom):
162 162 defs = {
163 163 'username': name,
164 164 'password': 'qweqwe',
165 165 'email': '%s+test@rhodecode.org' % name,
166 166 'firstname': 'TestUser',
167 167 'lastname': 'Test',
168 168 'active': True,
169 169 'admin': False,
170 170 'extern_type': 'rhodecode',
171 171 'extern_name': None,
172 172 }
173 173 defs.update(custom)
174 174
175 175 return defs
176 176
177 177 def _get_user_group_create_params(self, name, **custom):
178 178 defs = {
179 179 'users_group_name': name,
180 180 'user_group_description': 'DESC',
181 181 'users_group_active': True,
182 182 'user_group_data': {},
183 183 }
184 184 defs.update(custom)
185 185
186 186 return defs
187 187
188 188 def create_repo(self, name, **kwargs):
189 189 repo_group = kwargs.get('repo_group')
190 190 if isinstance(repo_group, RepoGroup):
191 191 kwargs['repo_group'] = repo_group.group_id
192 192 name = name.split(Repository.NAME_SEP)[-1]
193 193 name = Repository.NAME_SEP.join((repo_group.group_name, name))
194 194
195 195 if 'skip_if_exists' in kwargs:
196 196 del kwargs['skip_if_exists']
197 197 r = Repository.get_by_repo_name(name)
198 198 if r:
199 199 return r
200 200
201 201 form_data = self._get_repo_create_params(repo_name=name, **kwargs)
202 202 cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
203 203 RepoModel().create(form_data, cur_user)
204 204 Session().commit()
205 205 repo = Repository.get_by_repo_name(name)
206 206 assert repo
207 207 return repo
208 208
209 209 def create_fork(self, repo_to_fork, fork_name, **kwargs):
210 210 repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
211 211
212 212 form_data = self._get_repo_create_params(repo_name=fork_name,
213 213 fork_parent_id=repo_to_fork.repo_id,
214 214 repo_type=repo_to_fork.repo_type,
215 215 **kwargs)
216 216 #TODO: fix it !!
217 217 form_data['description'] = form_data['repo_description']
218 218 form_data['private'] = form_data['repo_private']
219 219 form_data['landing_rev'] = form_data['repo_landing_rev']
220 220
221 221 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
222 222 RepoModel().create_fork(form_data, cur_user=owner)
223 223 Session().commit()
224 224 r = Repository.get_by_repo_name(fork_name)
225 225 assert r
226 226 return r
227 227
228 228 def destroy_repo(self, repo_name, **kwargs):
229 229 RepoModel().delete(repo_name, **kwargs)
230 230 Session().commit()
231 231
232 232 def destroy_repo_on_filesystem(self, repo_name):
233 233 rm_path = os.path.join(RepoModel().repos_path, repo_name)
234 234 if os.path.isdir(rm_path):
235 235 shutil.rmtree(rm_path)
236 236
237 237 def create_repo_group(self, name, **kwargs):
238 238 if 'skip_if_exists' in kwargs:
239 239 del kwargs['skip_if_exists']
240 240 gr = RepoGroup.get_by_group_name(group_name=name)
241 241 if gr:
242 242 return gr
243 243 form_data = self._get_group_create_params(group_name=name, **kwargs)
244 244 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
245 245 gr = RepoGroupModel().create(
246 246 group_name=form_data['group_name'],
247 247 group_description=form_data['group_name'],
248 248 owner=owner)
249 249 Session().commit()
250 250 gr = RepoGroup.get_by_group_name(gr.group_name)
251 251 return gr
252 252
253 253 def destroy_repo_group(self, repogroupid):
254 254 RepoGroupModel().delete(repogroupid)
255 255 Session().commit()
256 256
257 257 def create_user(self, name, **kwargs):
258 258 if 'skip_if_exists' in kwargs:
259 259 del kwargs['skip_if_exists']
260 260 user = User.get_by_username(name)
261 261 if user:
262 262 return user
263 263 form_data = self._get_user_create_params(name, **kwargs)
264 264 user = UserModel().create(form_data)
265 265
266 266 # create token for user
267 267 AuthTokenModel().create(
268 268 user=user, description=u'TEST_USER_TOKEN')
269 269
270 270 Session().commit()
271 271 user = User.get_by_username(user.username)
272 272 return user
273 273
274 274 def destroy_user(self, userid):
275 275 UserModel().delete(userid)
276 276 Session().commit()
277 277
278 def create_additional_user_email(self, user, email):
279 uem = UserEmailMap()
280 uem.user = user
281 uem.email = email
282 Session().add(uem)
283 return uem
284
278 285 def destroy_users(self, userid_iter):
279 286 for user_id in userid_iter:
280 287 if User.get_by_username(user_id):
281 288 UserModel().delete(user_id)
282 289 Session().commit()
283 290
284 291 def create_user_group(self, name, **kwargs):
285 292 if 'skip_if_exists' in kwargs:
286 293 del kwargs['skip_if_exists']
287 294 gr = UserGroup.get_by_group_name(group_name=name)
288 295 if gr:
289 296 return gr
290 297 # map active flag to the real attribute. For API consistency of fixtures
291 298 if 'active' in kwargs:
292 299 kwargs['users_group_active'] = kwargs['active']
293 300 del kwargs['active']
294 301 form_data = self._get_user_group_create_params(name, **kwargs)
295 302 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
296 303 user_group = UserGroupModel().create(
297 304 name=form_data['users_group_name'],
298 305 description=form_data['user_group_description'],
299 306 owner=owner, active=form_data['users_group_active'],
300 307 group_data=form_data['user_group_data'])
301 308 Session().commit()
302 309 user_group = UserGroup.get_by_group_name(user_group.users_group_name)
303 310 return user_group
304 311
305 312 def destroy_user_group(self, usergroupid):
306 313 UserGroupModel().delete(user_group=usergroupid, force=True)
307 314 Session().commit()
308 315
309 316 def create_gist(self, **kwargs):
310 317 form_data = {
311 318 'description': 'new-gist',
312 319 'owner': TEST_USER_ADMIN_LOGIN,
313 320 'gist_type': GistModel.cls.GIST_PUBLIC,
314 321 'lifetime': -1,
315 322 'acl_level': Gist.ACL_LEVEL_PUBLIC,
316 323 'gist_mapping': {'filename1.txt': {'content': 'hello world'},}
317 324 }
318 325 form_data.update(kwargs)
319 326 gist = GistModel().create(
320 327 description=form_data['description'], owner=form_data['owner'],
321 328 gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
322 329 lifetime=form_data['lifetime'], gist_acl_level=form_data['acl_level']
323 330 )
324 331 Session().commit()
325 332 return gist
326 333
327 334 def destroy_gists(self, gistid=None):
328 335 for g in GistModel.cls.get_all():
329 336 if gistid:
330 337 if gistid == g.gist_access_id:
331 338 GistModel().delete(g)
332 339 else:
333 340 GistModel().delete(g)
334 341 Session().commit()
335 342
336 343 def load_resource(self, resource_name, strip=False):
337 344 with open(os.path.join(FIXTURES, resource_name)) as f:
338 345 source = f.read()
339 346 if strip:
340 347 source = source.strip()
341 348
342 349 return source
@@ -1,1861 +1,1865 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32 import dateutil.tz
33 33 import functools
34 34
35 35 import mock
36 36 import pyramid.testing
37 37 import pytest
38 38 import colander
39 39 import requests
40 40 import pyramid.paster
41 41
42 42 import rhodecode
43 43 from rhodecode.lib.utils2 import AttributeDict
44 44 from rhodecode.model.changeset_status import ChangesetStatusModel
45 45 from rhodecode.model.comment import CommentsModel
46 46 from rhodecode.model.db import (
47 47 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
48 48 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
49 49 from rhodecode.model.meta import Session
50 50 from rhodecode.model.pull_request import PullRequestModel
51 51 from rhodecode.model.repo import RepoModel
52 52 from rhodecode.model.repo_group import RepoGroupModel
53 53 from rhodecode.model.user import UserModel
54 54 from rhodecode.model.settings import VcsSettingsModel
55 55 from rhodecode.model.user_group import UserGroupModel
56 56 from rhodecode.model.integration import IntegrationModel
57 57 from rhodecode.integrations import integration_type_registry
58 58 from rhodecode.integrations.types.base import IntegrationTypeBase
59 59 from rhodecode.lib.utils import repo2db_mapper
60 60 from rhodecode.lib.vcs import create_vcsserver_proxy
61 61 from rhodecode.lib.vcs.backends import get_backend
62 62 from rhodecode.lib.vcs.nodes import FileNode
63 63 from rhodecode.tests import (
64 64 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
65 65 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
66 66 TEST_USER_REGULAR_PASS)
67 67 from rhodecode.tests.utils import CustomTestApp, set_anonymous_access
68 68 from rhodecode.tests.fixture import Fixture
69 69 from rhodecode.config import utils as config_utils
70 70
71 71 def _split_comma(value):
72 72 return value.split(',')
73 73
74 74
75 75 def pytest_addoption(parser):
76 76 parser.addoption(
77 77 '--keep-tmp-path', action='store_true',
78 78 help="Keep the test temporary directories")
79 79 parser.addoption(
80 80 '--backends', action='store', type=_split_comma,
81 81 default=['git', 'hg', 'svn'],
82 82 help="Select which backends to test for backend specific tests.")
83 83 parser.addoption(
84 84 '--dbs', action='store', type=_split_comma,
85 85 default=['sqlite'],
86 86 help="Select which database to test for database specific tests. "
87 87 "Possible options are sqlite,postgres,mysql")
88 88 parser.addoption(
89 89 '--appenlight', '--ae', action='store_true',
90 90 help="Track statistics in appenlight.")
91 91 parser.addoption(
92 92 '--appenlight-api-key', '--ae-key',
93 93 help="API key for Appenlight.")
94 94 parser.addoption(
95 95 '--appenlight-url', '--ae-url',
96 96 default="https://ae.rhodecode.com",
97 97 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
98 98 parser.addoption(
99 99 '--sqlite-connection-string', action='store',
100 100 default='', help="Connection string for the dbs tests with SQLite")
101 101 parser.addoption(
102 102 '--postgres-connection-string', action='store',
103 103 default='', help="Connection string for the dbs tests with Postgres")
104 104 parser.addoption(
105 105 '--mysql-connection-string', action='store',
106 106 default='', help="Connection string for the dbs tests with MySQL")
107 107 parser.addoption(
108 108 '--repeat', type=int, default=100,
109 109 help="Number of repetitions in performance tests.")
110 110
111 111
112 112 def pytest_configure(config):
113 113 from rhodecode.config import patches
114 114
115 115
116 116 def pytest_collection_modifyitems(session, config, items):
117 117 # nottest marked, compare nose, used for transition from nose to pytest
118 118 remaining = [
119 119 i for i in items if getattr(i.obj, '__test__', True)]
120 120 items[:] = remaining
121 121
122 122
123 123 def pytest_generate_tests(metafunc):
124 124 # Support test generation based on --backend parameter
125 125 if 'backend_alias' in metafunc.fixturenames:
126 126 backends = get_backends_from_metafunc(metafunc)
127 127 scope = None
128 128 if not backends:
129 129 pytest.skip("Not enabled for any of selected backends")
130 130 metafunc.parametrize('backend_alias', backends, scope=scope)
131 131 elif hasattr(metafunc.function, 'backends'):
132 132 backends = get_backends_from_metafunc(metafunc)
133 133 if not backends:
134 134 pytest.skip("Not enabled for any of selected backends")
135 135
136 136
137 137 def get_backends_from_metafunc(metafunc):
138 138 requested_backends = set(metafunc.config.getoption('--backends'))
139 139 if hasattr(metafunc.function, 'backends'):
140 140 # Supported backends by this test function, created from
141 141 # pytest.mark.backends
142 142 backends = metafunc.function.backends.args
143 143 elif hasattr(metafunc.cls, 'backend_alias'):
144 144 # Support class attribute "backend_alias", this is mainly
145 145 # for legacy reasons for tests not yet using pytest.mark.backends
146 146 backends = [metafunc.cls.backend_alias]
147 147 else:
148 148 backends = metafunc.config.getoption('--backends')
149 149 return requested_backends.intersection(backends)
150 150
151 151
152 152 @pytest.fixture(scope='session', autouse=True)
153 153 def activate_example_rcextensions(request):
154 154 """
155 155 Patch in an example rcextensions module which verifies passed in kwargs.
156 156 """
157 157 from rhodecode.tests.other import example_rcextensions
158 158
159 159 old_extensions = rhodecode.EXTENSIONS
160 160 rhodecode.EXTENSIONS = example_rcextensions
161 161
162 162 @request.addfinalizer
163 163 def cleanup():
164 164 rhodecode.EXTENSIONS = old_extensions
165 165
166 166
167 167 @pytest.fixture
168 168 def capture_rcextensions():
169 169 """
170 170 Returns the recorded calls to entry points in rcextensions.
171 171 """
172 172 calls = rhodecode.EXTENSIONS.calls
173 173 calls.clear()
174 174 # Note: At this moment, it is still the empty dict, but that will
175 175 # be filled during the test run and since it is a reference this
176 176 # is enough to make it work.
177 177 return calls
178 178
179 179
180 180 @pytest.fixture(scope='session')
181 181 def http_environ_session():
182 182 """
183 183 Allow to use "http_environ" in session scope.
184 184 """
185 185 return http_environ(
186 186 http_host_stub=http_host_stub())
187 187
188 188
189 189 @pytest.fixture
190 190 def http_host_stub():
191 191 """
192 192 Value of HTTP_HOST in the test run.
193 193 """
194 194 return 'example.com:80'
195 195
196 196
197 197 @pytest.fixture
198 198 def http_host_only_stub():
199 199 """
200 200 Value of HTTP_HOST in the test run.
201 201 """
202 202 return http_host_stub().split(':')[0]
203 203
204 204
205 205 @pytest.fixture
206 206 def http_environ(http_host_stub):
207 207 """
208 208 HTTP extra environ keys.
209 209
210 210 User by the test application and as well for setting up the pylons
211 211 environment. In the case of the fixture "app" it should be possible
212 212 to override this for a specific test case.
213 213 """
214 214 return {
215 215 'SERVER_NAME': http_host_only_stub(),
216 216 'SERVER_PORT': http_host_stub.split(':')[1],
217 217 'HTTP_HOST': http_host_stub,
218 218 'HTTP_USER_AGENT': 'rc-test-agent',
219 219 'REQUEST_METHOD': 'GET'
220 220 }
221 221
222 222
223 223 @pytest.fixture(scope='session')
224 224 def baseapp(ini_config, vcsserver, http_environ_session):
225 225 from rhodecode.lib.pyramid_utils import get_app_config
226 226 from rhodecode.config.middleware import make_pyramid_app
227 227
228 228 print("Using the RhodeCode configuration:{}".format(ini_config))
229 229 pyramid.paster.setup_logging(ini_config)
230 230
231 231 settings = get_app_config(ini_config)
232 232 app = make_pyramid_app({'__file__': ini_config}, **settings)
233 233
234 234 return app
235 235
236 236
237 237 @pytest.fixture(scope='function')
238 238 def app(request, config_stub, baseapp, http_environ):
239 239 app = CustomTestApp(
240 240 baseapp,
241 241 extra_environ=http_environ)
242 242 if request.cls:
243 243 request.cls.app = app
244 244 return app
245 245
246 246
247 247 @pytest.fixture(scope='session')
248 248 def app_settings(baseapp, ini_config):
249 249 """
250 250 Settings dictionary used to create the app.
251 251
252 252 Parses the ini file and passes the result through the sanitize and apply
253 253 defaults mechanism in `rhodecode.config.middleware`.
254 254 """
255 255 return baseapp.config.get_settings()
256 256
257 257
258 258 @pytest.fixture(scope='session')
259 259 def db_connection(ini_settings):
260 260 # Initialize the database connection.
261 261 config_utils.initialize_database(ini_settings)
262 262
263 263
264 264 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
265 265
266 266
267 267 def _autologin_user(app, *args):
268 268 session = login_user_session(app, *args)
269 269 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
270 270 return LoginData(csrf_token, session['rhodecode_user'])
271 271
272 272
273 273 @pytest.fixture
274 274 def autologin_user(app):
275 275 """
276 276 Utility fixture which makes sure that the admin user is logged in
277 277 """
278 278 return _autologin_user(app)
279 279
280 280
281 281 @pytest.fixture
282 282 def autologin_regular_user(app):
283 283 """
284 284 Utility fixture which makes sure that the regular user is logged in
285 285 """
286 286 return _autologin_user(
287 287 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
288 288
289 289
290 290 @pytest.fixture(scope='function')
291 291 def csrf_token(request, autologin_user):
292 292 return autologin_user.csrf_token
293 293
294 294
295 295 @pytest.fixture(scope='function')
296 296 def xhr_header(request):
297 297 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
298 298
299 299
300 300 @pytest.fixture
301 301 def real_crypto_backend(monkeypatch):
302 302 """
303 303 Switch the production crypto backend on for this test.
304 304
305 305 During the test run the crypto backend is replaced with a faster
306 306 implementation based on the MD5 algorithm.
307 307 """
308 308 monkeypatch.setattr(rhodecode, 'is_test', False)
309 309
310 310
311 311 @pytest.fixture(scope='class')
312 312 def index_location(request, baseapp):
313 313 index_location = baseapp.config.get_settings()['search.location']
314 314 if request.cls:
315 315 request.cls.index_location = index_location
316 316 return index_location
317 317
318 318
319 319 @pytest.fixture(scope='session', autouse=True)
320 320 def tests_tmp_path(request):
321 321 """
322 322 Create temporary directory to be used during the test session.
323 323 """
324 324 if not os.path.exists(TESTS_TMP_PATH):
325 325 os.makedirs(TESTS_TMP_PATH)
326 326
327 327 if not request.config.getoption('--keep-tmp-path'):
328 328 @request.addfinalizer
329 329 def remove_tmp_path():
330 330 shutil.rmtree(TESTS_TMP_PATH)
331 331
332 332 return TESTS_TMP_PATH
333 333
334 334
335 335 @pytest.fixture
336 336 def test_repo_group(request):
337 337 """
338 338 Create a temporary repository group, and destroy it after
339 339 usage automatically
340 340 """
341 341 fixture = Fixture()
342 342 repogroupid = 'test_repo_group_%s' % str(time.time()).replace('.', '')
343 343 repo_group = fixture.create_repo_group(repogroupid)
344 344
345 345 def _cleanup():
346 346 fixture.destroy_repo_group(repogroupid)
347 347
348 348 request.addfinalizer(_cleanup)
349 349 return repo_group
350 350
351 351
352 352 @pytest.fixture
353 353 def test_user_group(request):
354 354 """
355 355 Create a temporary user group, and destroy it after
356 356 usage automatically
357 357 """
358 358 fixture = Fixture()
359 359 usergroupid = 'test_user_group_%s' % str(time.time()).replace('.', '')
360 360 user_group = fixture.create_user_group(usergroupid)
361 361
362 362 def _cleanup():
363 363 fixture.destroy_user_group(user_group)
364 364
365 365 request.addfinalizer(_cleanup)
366 366 return user_group
367 367
368 368
369 369 @pytest.fixture(scope='session')
370 370 def test_repo(request):
371 371 container = TestRepoContainer()
372 372 request.addfinalizer(container._cleanup)
373 373 return container
374 374
375 375
376 376 class TestRepoContainer(object):
377 377 """
378 378 Container for test repositories which are used read only.
379 379
380 380 Repositories will be created on demand and re-used during the lifetime
381 381 of this object.
382 382
383 383 Usage to get the svn test repository "minimal"::
384 384
385 385 test_repo = TestContainer()
386 386 repo = test_repo('minimal', 'svn')
387 387
388 388 """
389 389
390 390 dump_extractors = {
391 391 'git': utils.extract_git_repo_from_dump,
392 392 'hg': utils.extract_hg_repo_from_dump,
393 393 'svn': utils.extract_svn_repo_from_dump,
394 394 }
395 395
396 396 def __init__(self):
397 397 self._cleanup_repos = []
398 398 self._fixture = Fixture()
399 399 self._repos = {}
400 400
401 401 def __call__(self, dump_name, backend_alias, config=None):
402 402 key = (dump_name, backend_alias)
403 403 if key not in self._repos:
404 404 repo = self._create_repo(dump_name, backend_alias, config)
405 405 self._repos[key] = repo.repo_id
406 406 return Repository.get(self._repos[key])
407 407
408 408 def _create_repo(self, dump_name, backend_alias, config):
409 409 repo_name = '%s-%s' % (backend_alias, dump_name)
410 410 backend_class = get_backend(backend_alias)
411 411 dump_extractor = self.dump_extractors[backend_alias]
412 412 repo_path = dump_extractor(dump_name, repo_name)
413 413
414 414 vcs_repo = backend_class(repo_path, config=config)
415 415 repo2db_mapper({repo_name: vcs_repo})
416 416
417 417 repo = RepoModel().get_by_repo_name(repo_name)
418 418 self._cleanup_repos.append(repo_name)
419 419 return repo
420 420
421 421 def _cleanup(self):
422 422 for repo_name in reversed(self._cleanup_repos):
423 423 self._fixture.destroy_repo(repo_name)
424 424
425 425
426 426 @pytest.fixture
427 427 def backend(request, backend_alias, baseapp, test_repo):
428 428 """
429 429 Parametrized fixture which represents a single backend implementation.
430 430
431 431 It respects the option `--backends` to focus the test run on specific
432 432 backend implementations.
433 433
434 434 It also supports `pytest.mark.xfail_backends` to mark tests as failing
435 435 for specific backends. This is intended as a utility for incremental
436 436 development of a new backend implementation.
437 437 """
438 438 if backend_alias not in request.config.getoption('--backends'):
439 439 pytest.skip("Backend %s not selected." % (backend_alias, ))
440 440
441 441 utils.check_xfail_backends(request.node, backend_alias)
442 442 utils.check_skip_backends(request.node, backend_alias)
443 443
444 444 repo_name = 'vcs_test_%s' % (backend_alias, )
445 445 backend = Backend(
446 446 alias=backend_alias,
447 447 repo_name=repo_name,
448 448 test_name=request.node.name,
449 449 test_repo_container=test_repo)
450 450 request.addfinalizer(backend.cleanup)
451 451 return backend
452 452
453 453
454 454 @pytest.fixture
455 455 def backend_git(request, baseapp, test_repo):
456 456 return backend(request, 'git', baseapp, test_repo)
457 457
458 458
459 459 @pytest.fixture
460 460 def backend_hg(request, baseapp, test_repo):
461 461 return backend(request, 'hg', baseapp, test_repo)
462 462
463 463
464 464 @pytest.fixture
465 465 def backend_svn(request, baseapp, test_repo):
466 466 return backend(request, 'svn', baseapp, test_repo)
467 467
468 468
469 469 @pytest.fixture
470 470 def backend_random(backend_git):
471 471 """
472 472 Use this to express that your tests need "a backend.
473 473
474 474 A few of our tests need a backend, so that we can run the code. This
475 475 fixture is intended to be used for such cases. It will pick one of the
476 476 backends and run the tests.
477 477
478 478 The fixture `backend` would run the test multiple times for each
479 479 available backend which is a pure waste of time if the test is
480 480 independent of the backend type.
481 481 """
482 482 # TODO: johbo: Change this to pick a random backend
483 483 return backend_git
484 484
485 485
486 486 @pytest.fixture
487 487 def backend_stub(backend_git):
488 488 """
489 489 Use this to express that your tests need a backend stub
490 490
491 491 TODO: mikhail: Implement a real stub logic instead of returning
492 492 a git backend
493 493 """
494 494 return backend_git
495 495
496 496
497 497 @pytest.fixture
498 498 def repo_stub(backend_stub):
499 499 """
500 500 Use this to express that your tests need a repository stub
501 501 """
502 502 return backend_stub.create_repo()
503 503
504 504
505 505 class Backend(object):
506 506 """
507 507 Represents the test configuration for one supported backend
508 508
509 509 Provides easy access to different test repositories based on
510 510 `__getitem__`. Such repositories will only be created once per test
511 511 session.
512 512 """
513 513
514 514 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
515 515 _master_repo = None
516 516 _commit_ids = {}
517 517
518 518 def __init__(self, alias, repo_name, test_name, test_repo_container):
519 519 self.alias = alias
520 520 self.repo_name = repo_name
521 521 self._cleanup_repos = []
522 522 self._test_name = test_name
523 523 self._test_repo_container = test_repo_container
524 524 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
525 525 # Fixture will survive in the end.
526 526 self._fixture = Fixture()
527 527
528 528 def __getitem__(self, key):
529 529 return self._test_repo_container(key, self.alias)
530 530
531 531 def create_test_repo(self, key, config=None):
532 532 return self._test_repo_container(key, self.alias, config)
533 533
534 534 @property
535 535 def repo(self):
536 536 """
537 537 Returns the "current" repository. This is the vcs_test repo or the
538 538 last repo which has been created with `create_repo`.
539 539 """
540 540 from rhodecode.model.db import Repository
541 541 return Repository.get_by_repo_name(self.repo_name)
542 542
543 543 @property
544 544 def default_branch_name(self):
545 545 VcsRepository = get_backend(self.alias)
546 546 return VcsRepository.DEFAULT_BRANCH_NAME
547 547
548 548 @property
549 549 def default_head_id(self):
550 550 """
551 551 Returns the default head id of the underlying backend.
552 552
553 553 This will be the default branch name in case the backend does have a
554 554 default branch. In the other cases it will point to a valid head
555 555 which can serve as the base to create a new commit on top of it.
556 556 """
557 557 vcsrepo = self.repo.scm_instance()
558 558 head_id = (
559 559 vcsrepo.DEFAULT_BRANCH_NAME or
560 560 vcsrepo.commit_ids[-1])
561 561 return head_id
562 562
563 563 @property
564 564 def commit_ids(self):
565 565 """
566 566 Returns the list of commits for the last created repository
567 567 """
568 568 return self._commit_ids
569 569
570 570 def create_master_repo(self, commits):
571 571 """
572 572 Create a repository and remember it as a template.
573 573
574 574 This allows to easily create derived repositories to construct
575 575 more complex scenarios for diff, compare and pull requests.
576 576
577 577 Returns a commit map which maps from commit message to raw_id.
578 578 """
579 579 self._master_repo = self.create_repo(commits=commits)
580 580 return self._commit_ids
581 581
582 582 def create_repo(
583 583 self, commits=None, number_of_commits=0, heads=None,
584 584 name_suffix=u'', **kwargs):
585 585 """
586 586 Create a repository and record it for later cleanup.
587 587
588 588 :param commits: Optional. A sequence of dict instances.
589 589 Will add a commit per entry to the new repository.
590 590 :param number_of_commits: Optional. If set to a number, this number of
591 591 commits will be added to the new repository.
592 592 :param heads: Optional. Can be set to a sequence of of commit
593 593 names which shall be pulled in from the master repository.
594 594
595 595 """
596 596 self.repo_name = self._next_repo_name() + name_suffix
597 597 repo = self._fixture.create_repo(
598 598 self.repo_name, repo_type=self.alias, **kwargs)
599 599 self._cleanup_repos.append(repo.repo_name)
600 600
601 601 commits = commits or [
602 602 {'message': 'Commit %s of %s' % (x, self.repo_name)}
603 603 for x in xrange(number_of_commits)]
604 604 self._add_commits_to_repo(repo.scm_instance(), commits)
605 605 if heads:
606 606 self.pull_heads(repo, heads)
607 607
608 608 return repo
609 609
610 610 def pull_heads(self, repo, heads):
611 611 """
612 612 Make sure that repo contains all commits mentioned in `heads`
613 613 """
614 614 vcsmaster = self._master_repo.scm_instance()
615 615 vcsrepo = repo.scm_instance()
616 616 vcsrepo.config.clear_section('hooks')
617 617 commit_ids = [self._commit_ids[h] for h in heads]
618 618 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
619 619
620 620 def create_fork(self):
621 621 repo_to_fork = self.repo_name
622 622 self.repo_name = self._next_repo_name()
623 623 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
624 624 self._cleanup_repos.append(self.repo_name)
625 625 return repo
626 626
627 627 def new_repo_name(self, suffix=u''):
628 628 self.repo_name = self._next_repo_name() + suffix
629 629 self._cleanup_repos.append(self.repo_name)
630 630 return self.repo_name
631 631
632 632 def _next_repo_name(self):
633 633 return u"%s_%s" % (
634 634 self.invalid_repo_name.sub(u'_', self._test_name),
635 635 len(self._cleanup_repos))
636 636
637 637 def ensure_file(self, filename, content='Test content\n'):
638 638 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
639 639 commits = [
640 640 {'added': [
641 641 FileNode(filename, content=content),
642 642 ]},
643 643 ]
644 644 self._add_commits_to_repo(self.repo.scm_instance(), commits)
645 645
646 646 def enable_downloads(self):
647 647 repo = self.repo
648 648 repo.enable_downloads = True
649 649 Session().add(repo)
650 650 Session().commit()
651 651
652 652 def cleanup(self):
653 653 for repo_name in reversed(self._cleanup_repos):
654 654 self._fixture.destroy_repo(repo_name)
655 655
656 656 def _add_commits_to_repo(self, repo, commits):
657 657 commit_ids = _add_commits_to_repo(repo, commits)
658 658 if not commit_ids:
659 659 return
660 660 self._commit_ids = commit_ids
661 661
662 662 # Creating refs for Git to allow fetching them from remote repository
663 663 if self.alias == 'git':
664 664 refs = {}
665 665 for message in self._commit_ids:
666 666 # TODO: mikhail: do more special chars replacements
667 667 ref_name = 'refs/test-refs/{}'.format(
668 668 message.replace(' ', ''))
669 669 refs[ref_name] = self._commit_ids[message]
670 670 self._create_refs(repo, refs)
671 671
672 672 def _create_refs(self, repo, refs):
673 673 for ref_name in refs:
674 674 repo.set_refs(ref_name, refs[ref_name])
675 675
676 676
677 677 @pytest.fixture
678 678 def vcsbackend(request, backend_alias, tests_tmp_path, baseapp, test_repo):
679 679 """
680 680 Parametrized fixture which represents a single vcs backend implementation.
681 681
682 682 See the fixture `backend` for more details. This one implements the same
683 683 concept, but on vcs level. So it does not provide model instances etc.
684 684
685 685 Parameters are generated dynamically, see :func:`pytest_generate_tests`
686 686 for how this works.
687 687 """
688 688 if backend_alias not in request.config.getoption('--backends'):
689 689 pytest.skip("Backend %s not selected." % (backend_alias, ))
690 690
691 691 utils.check_xfail_backends(request.node, backend_alias)
692 692 utils.check_skip_backends(request.node, backend_alias)
693 693
694 694 repo_name = 'vcs_test_%s' % (backend_alias, )
695 695 repo_path = os.path.join(tests_tmp_path, repo_name)
696 696 backend = VcsBackend(
697 697 alias=backend_alias,
698 698 repo_path=repo_path,
699 699 test_name=request.node.name,
700 700 test_repo_container=test_repo)
701 701 request.addfinalizer(backend.cleanup)
702 702 return backend
703 703
704 704
705 705 @pytest.fixture
706 706 def vcsbackend_git(request, tests_tmp_path, baseapp, test_repo):
707 707 return vcsbackend(request, 'git', tests_tmp_path, baseapp, test_repo)
708 708
709 709
710 710 @pytest.fixture
711 711 def vcsbackend_hg(request, tests_tmp_path, baseapp, test_repo):
712 712 return vcsbackend(request, 'hg', tests_tmp_path, baseapp, test_repo)
713 713
714 714
715 715 @pytest.fixture
716 716 def vcsbackend_svn(request, tests_tmp_path, baseapp, test_repo):
717 717 return vcsbackend(request, 'svn', tests_tmp_path, baseapp, test_repo)
718 718
719 719
720 720 @pytest.fixture
721 721 def vcsbackend_random(vcsbackend_git):
722 722 """
723 723 Use this to express that your tests need "a vcsbackend".
724 724
725 725 The fixture `vcsbackend` would run the test multiple times for each
726 726 available vcs backend which is a pure waste of time if the test is
727 727 independent of the vcs backend type.
728 728 """
729 729 # TODO: johbo: Change this to pick a random backend
730 730 return vcsbackend_git
731 731
732 732
733 733 @pytest.fixture
734 734 def vcsbackend_stub(vcsbackend_git):
735 735 """
736 736 Use this to express that your test just needs a stub of a vcsbackend.
737 737
738 738 Plan is to eventually implement an in-memory stub to speed tests up.
739 739 """
740 740 return vcsbackend_git
741 741
742 742
743 743 class VcsBackend(object):
744 744 """
745 745 Represents the test configuration for one supported vcs backend.
746 746 """
747 747
748 748 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
749 749
750 750 def __init__(self, alias, repo_path, test_name, test_repo_container):
751 751 self.alias = alias
752 752 self._repo_path = repo_path
753 753 self._cleanup_repos = []
754 754 self._test_name = test_name
755 755 self._test_repo_container = test_repo_container
756 756
757 757 def __getitem__(self, key):
758 758 return self._test_repo_container(key, self.alias).scm_instance()
759 759
760 760 @property
761 761 def repo(self):
762 762 """
763 763 Returns the "current" repository. This is the vcs_test repo of the last
764 764 repo which has been created.
765 765 """
766 766 Repository = get_backend(self.alias)
767 767 return Repository(self._repo_path)
768 768
769 769 @property
770 770 def backend(self):
771 771 """
772 772 Returns the backend implementation class.
773 773 """
774 774 return get_backend(self.alias)
775 775
776 776 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
777 777 repo_name = self._next_repo_name()
778 778 self._repo_path = get_new_dir(repo_name)
779 779 repo_class = get_backend(self.alias)
780 780 src_url = None
781 781 if _clone_repo:
782 782 src_url = _clone_repo.path
783 783 repo = repo_class(self._repo_path, create=True, src_url=src_url)
784 784 self._cleanup_repos.append(repo)
785 785
786 786 commits = commits or [
787 787 {'message': 'Commit %s of %s' % (x, repo_name)}
788 788 for x in xrange(number_of_commits)]
789 789 _add_commits_to_repo(repo, commits)
790 790 return repo
791 791
792 792 def clone_repo(self, repo):
793 793 return self.create_repo(_clone_repo=repo)
794 794
795 795 def cleanup(self):
796 796 for repo in self._cleanup_repos:
797 797 shutil.rmtree(repo.path)
798 798
799 799 def new_repo_path(self):
800 800 repo_name = self._next_repo_name()
801 801 self._repo_path = get_new_dir(repo_name)
802 802 return self._repo_path
803 803
804 804 def _next_repo_name(self):
805 805 return "%s_%s" % (
806 806 self.invalid_repo_name.sub('_', self._test_name),
807 807 len(self._cleanup_repos))
808 808
809 809 def add_file(self, repo, filename, content='Test content\n'):
810 810 imc = repo.in_memory_commit
811 811 imc.add(FileNode(filename, content=content))
812 812 imc.commit(
813 813 message=u'Automatic commit from vcsbackend fixture',
814 814 author=u'Automatic')
815 815
816 816 def ensure_file(self, filename, content='Test content\n'):
817 817 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
818 818 self.add_file(self.repo, filename, content)
819 819
820 820
821 821 def _add_commits_to_repo(vcs_repo, commits):
822 822 commit_ids = {}
823 823 if not commits:
824 824 return commit_ids
825 825
826 826 imc = vcs_repo.in_memory_commit
827 827 commit = None
828 828
829 829 for idx, commit in enumerate(commits):
830 830 message = unicode(commit.get('message', 'Commit %s' % idx))
831 831
832 832 for node in commit.get('added', []):
833 833 imc.add(FileNode(node.path, content=node.content))
834 834 for node in commit.get('changed', []):
835 835 imc.change(FileNode(node.path, content=node.content))
836 836 for node in commit.get('removed', []):
837 837 imc.remove(FileNode(node.path))
838 838
839 839 parents = [
840 840 vcs_repo.get_commit(commit_id=commit_ids[p])
841 841 for p in commit.get('parents', [])]
842 842
843 843 operations = ('added', 'changed', 'removed')
844 844 if not any((commit.get(o) for o in operations)):
845 845 imc.add(FileNode('file_%s' % idx, content=message))
846 846
847 847 commit = imc.commit(
848 848 message=message,
849 849 author=unicode(commit.get('author', 'Automatic')),
850 850 date=commit.get('date'),
851 851 branch=commit.get('branch'),
852 852 parents=parents)
853 853
854 854 commit_ids[commit.message] = commit.raw_id
855 855
856 856 return commit_ids
857 857
858 858
859 859 @pytest.fixture
860 860 def reposerver(request):
861 861 """
862 862 Allows to serve a backend repository
863 863 """
864 864
865 865 repo_server = RepoServer()
866 866 request.addfinalizer(repo_server.cleanup)
867 867 return repo_server
868 868
869 869
870 870 class RepoServer(object):
871 871 """
872 872 Utility to serve a local repository for the duration of a test case.
873 873
874 874 Supports only Subversion so far.
875 875 """
876 876
877 877 url = None
878 878
879 879 def __init__(self):
880 880 self._cleanup_servers = []
881 881
882 882 def serve(self, vcsrepo):
883 883 if vcsrepo.alias != 'svn':
884 884 raise TypeError("Backend %s not supported" % vcsrepo.alias)
885 885
886 886 proc = subprocess32.Popen(
887 887 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
888 888 '--root', vcsrepo.path])
889 889 self._cleanup_servers.append(proc)
890 890 self.url = 'svn://localhost'
891 891
892 892 def cleanup(self):
893 893 for proc in self._cleanup_servers:
894 894 proc.terminate()
895 895
896 896
897 897 @pytest.fixture
898 898 def pr_util(backend, request, config_stub):
899 899 """
900 900 Utility for tests of models and for functional tests around pull requests.
901 901
902 902 It gives an instance of :class:`PRTestUtility` which provides various
903 903 utility methods around one pull request.
904 904
905 905 This fixture uses `backend` and inherits its parameterization.
906 906 """
907 907
908 908 util = PRTestUtility(backend)
909 909 request.addfinalizer(util.cleanup)
910 910
911 911 return util
912 912
913 913
914 914 class PRTestUtility(object):
915 915
916 916 pull_request = None
917 917 pull_request_id = None
918 918 mergeable_patcher = None
919 919 mergeable_mock = None
920 920 notification_patcher = None
921 921
922 922 def __init__(self, backend):
923 923 self.backend = backend
924 924
925 925 def create_pull_request(
926 926 self, commits=None, target_head=None, source_head=None,
927 927 revisions=None, approved=False, author=None, mergeable=False,
928 928 enable_notifications=True, name_suffix=u'', reviewers=None,
929 929 title=u"Test", description=u"Description"):
930 930 self.set_mergeable(mergeable)
931 931 if not enable_notifications:
932 932 # mock notification side effect
933 933 self.notification_patcher = mock.patch(
934 934 'rhodecode.model.notification.NotificationModel.create')
935 935 self.notification_patcher.start()
936 936
937 937 if not self.pull_request:
938 938 if not commits:
939 939 commits = [
940 940 {'message': 'c1'},
941 941 {'message': 'c2'},
942 942 {'message': 'c3'},
943 943 ]
944 944 target_head = 'c1'
945 945 source_head = 'c2'
946 946 revisions = ['c2']
947 947
948 948 self.commit_ids = self.backend.create_master_repo(commits)
949 949 self.target_repository = self.backend.create_repo(
950 950 heads=[target_head], name_suffix=name_suffix)
951 951 self.source_repository = self.backend.create_repo(
952 952 heads=[source_head], name_suffix=name_suffix)
953 953 self.author = author or UserModel().get_by_username(
954 954 TEST_USER_ADMIN_LOGIN)
955 955
956 956 model = PullRequestModel()
957 957 self.create_parameters = {
958 958 'created_by': self.author,
959 959 'source_repo': self.source_repository.repo_name,
960 960 'source_ref': self._default_branch_reference(source_head),
961 961 'target_repo': self.target_repository.repo_name,
962 962 'target_ref': self._default_branch_reference(target_head),
963 963 'revisions': [self.commit_ids[r] for r in revisions],
964 964 'reviewers': reviewers or self._get_reviewers(),
965 965 'title': title,
966 966 'description': description,
967 967 }
968 968 self.pull_request = model.create(**self.create_parameters)
969 969 assert model.get_versions(self.pull_request) == []
970 970
971 971 self.pull_request_id = self.pull_request.pull_request_id
972 972
973 973 if approved:
974 974 self.approve()
975 975
976 976 Session().add(self.pull_request)
977 977 Session().commit()
978 978
979 979 return self.pull_request
980 980
981 981 def approve(self):
982 982 self.create_status_votes(
983 983 ChangesetStatus.STATUS_APPROVED,
984 984 *self.pull_request.reviewers)
985 985
986 986 def close(self):
987 987 PullRequestModel().close_pull_request(self.pull_request, self.author)
988 988
989 989 def _default_branch_reference(self, commit_message):
990 990 reference = '%s:%s:%s' % (
991 991 'branch',
992 992 self.backend.default_branch_name,
993 993 self.commit_ids[commit_message])
994 994 return reference
995 995
996 996 def _get_reviewers(self):
997 997 return [
998 998 (TEST_USER_REGULAR_LOGIN, ['default1'], False, []),
999 999 (TEST_USER_REGULAR2_LOGIN, ['default2'], False, []),
1000 1000 ]
1001 1001
1002 1002 def update_source_repository(self, head=None):
1003 1003 heads = [head or 'c3']
1004 1004 self.backend.pull_heads(self.source_repository, heads=heads)
1005 1005
1006 1006 def add_one_commit(self, head=None):
1007 1007 self.update_source_repository(head=head)
1008 1008 old_commit_ids = set(self.pull_request.revisions)
1009 1009 PullRequestModel().update_commits(self.pull_request)
1010 1010 commit_ids = set(self.pull_request.revisions)
1011 1011 new_commit_ids = commit_ids - old_commit_ids
1012 1012 assert len(new_commit_ids) == 1
1013 1013 return new_commit_ids.pop()
1014 1014
1015 1015 def remove_one_commit(self):
1016 1016 assert len(self.pull_request.revisions) == 2
1017 1017 source_vcs = self.source_repository.scm_instance()
1018 1018 removed_commit_id = source_vcs.commit_ids[-1]
1019 1019
1020 1020 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1021 1021 # remove the if once that's sorted out.
1022 1022 if self.backend.alias == "git":
1023 1023 kwargs = {'branch_name': self.backend.default_branch_name}
1024 1024 else:
1025 1025 kwargs = {}
1026 1026 source_vcs.strip(removed_commit_id, **kwargs)
1027 1027
1028 1028 PullRequestModel().update_commits(self.pull_request)
1029 1029 assert len(self.pull_request.revisions) == 1
1030 1030 return removed_commit_id
1031 1031
1032 1032 def create_comment(self, linked_to=None):
1033 1033 comment = CommentsModel().create(
1034 1034 text=u"Test comment",
1035 1035 repo=self.target_repository.repo_name,
1036 1036 user=self.author,
1037 1037 pull_request=self.pull_request)
1038 1038 assert comment.pull_request_version_id is None
1039 1039
1040 1040 if linked_to:
1041 1041 PullRequestModel()._link_comments_to_version(linked_to)
1042 1042
1043 1043 return comment
1044 1044
1045 1045 def create_inline_comment(
1046 1046 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1047 1047 comment = CommentsModel().create(
1048 1048 text=u"Test comment",
1049 1049 repo=self.target_repository.repo_name,
1050 1050 user=self.author,
1051 1051 line_no=line_no,
1052 1052 f_path=file_path,
1053 1053 pull_request=self.pull_request)
1054 1054 assert comment.pull_request_version_id is None
1055 1055
1056 1056 if linked_to:
1057 1057 PullRequestModel()._link_comments_to_version(linked_to)
1058 1058
1059 1059 return comment
1060 1060
1061 1061 def create_version_of_pull_request(self):
1062 1062 pull_request = self.create_pull_request()
1063 1063 version = PullRequestModel()._create_version_from_snapshot(
1064 1064 pull_request)
1065 1065 return version
1066 1066
1067 1067 def create_status_votes(self, status, *reviewers):
1068 1068 for reviewer in reviewers:
1069 1069 ChangesetStatusModel().set_status(
1070 1070 repo=self.pull_request.target_repo,
1071 1071 status=status,
1072 1072 user=reviewer.user_id,
1073 1073 pull_request=self.pull_request)
1074 1074
1075 1075 def set_mergeable(self, value):
1076 1076 if not self.mergeable_patcher:
1077 1077 self.mergeable_patcher = mock.patch.object(
1078 1078 VcsSettingsModel, 'get_general_settings')
1079 1079 self.mergeable_mock = self.mergeable_patcher.start()
1080 1080 self.mergeable_mock.return_value = {
1081 1081 'rhodecode_pr_merge_enabled': value}
1082 1082
1083 1083 def cleanup(self):
1084 1084 # In case the source repository is already cleaned up, the pull
1085 1085 # request will already be deleted.
1086 1086 pull_request = PullRequest().get(self.pull_request_id)
1087 1087 if pull_request:
1088 1088 PullRequestModel().delete(pull_request, pull_request.author)
1089 1089 Session().commit()
1090 1090
1091 1091 if self.notification_patcher:
1092 1092 self.notification_patcher.stop()
1093 1093
1094 1094 if self.mergeable_patcher:
1095 1095 self.mergeable_patcher.stop()
1096 1096
1097 1097
1098 1098 @pytest.fixture
1099 1099 def user_admin(baseapp):
1100 1100 """
1101 1101 Provides the default admin test user as an instance of `db.User`.
1102 1102 """
1103 1103 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1104 1104 return user
1105 1105
1106 1106
1107 1107 @pytest.fixture
1108 1108 def user_regular(baseapp):
1109 1109 """
1110 1110 Provides the default regular test user as an instance of `db.User`.
1111 1111 """
1112 1112 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1113 1113 return user
1114 1114
1115 1115
1116 1116 @pytest.fixture
1117 1117 def user_util(request, db_connection):
1118 1118 """
1119 1119 Provides a wired instance of `UserUtility` with integrated cleanup.
1120 1120 """
1121 1121 utility = UserUtility(test_name=request.node.name)
1122 1122 request.addfinalizer(utility.cleanup)
1123 1123 return utility
1124 1124
1125 1125
1126 1126 # TODO: johbo: Split this up into utilities per domain or something similar
1127 1127 class UserUtility(object):
1128 1128
1129 1129 def __init__(self, test_name="test"):
1130 1130 self._test_name = self._sanitize_name(test_name)
1131 1131 self.fixture = Fixture()
1132 1132 self.repo_group_ids = []
1133 1133 self.repos_ids = []
1134 1134 self.user_ids = []
1135 1135 self.user_group_ids = []
1136 1136 self.user_repo_permission_ids = []
1137 1137 self.user_group_repo_permission_ids = []
1138 1138 self.user_repo_group_permission_ids = []
1139 1139 self.user_group_repo_group_permission_ids = []
1140 1140 self.user_user_group_permission_ids = []
1141 1141 self.user_group_user_group_permission_ids = []
1142 1142 self.user_permissions = []
1143 1143
1144 1144 def _sanitize_name(self, name):
1145 1145 for char in ['[', ']']:
1146 1146 name = name.replace(char, '_')
1147 1147 return name
1148 1148
1149 1149 def create_repo_group(
1150 1150 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1151 1151 group_name = "{prefix}_repogroup_{count}".format(
1152 1152 prefix=self._test_name,
1153 1153 count=len(self.repo_group_ids))
1154 1154 repo_group = self.fixture.create_repo_group(
1155 1155 group_name, cur_user=owner)
1156 1156 if auto_cleanup:
1157 1157 self.repo_group_ids.append(repo_group.group_id)
1158 1158 return repo_group
1159 1159
1160 1160 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None,
1161 1161 auto_cleanup=True, repo_type='hg'):
1162 1162 repo_name = "{prefix}_repository_{count}".format(
1163 1163 prefix=self._test_name,
1164 1164 count=len(self.repos_ids))
1165 1165
1166 1166 repository = self.fixture.create_repo(
1167 1167 repo_name, cur_user=owner, repo_group=parent, repo_type=repo_type)
1168 1168 if auto_cleanup:
1169 1169 self.repos_ids.append(repository.repo_id)
1170 1170 return repository
1171 1171
1172 1172 def create_user(self, auto_cleanup=True, **kwargs):
1173 1173 user_name = "{prefix}_user_{count}".format(
1174 1174 prefix=self._test_name,
1175 1175 count=len(self.user_ids))
1176 1176 user = self.fixture.create_user(user_name, **kwargs)
1177 1177 if auto_cleanup:
1178 1178 self.user_ids.append(user.user_id)
1179 1179 return user
1180 1180
1181 def create_additional_user_email(self, user, email):
1182 uem = self.fixture.create_additional_user_email(user=user, email=email)
1183 return uem
1184
1181 1185 def create_user_with_group(self):
1182 1186 user = self.create_user()
1183 1187 user_group = self.create_user_group(members=[user])
1184 1188 return user, user_group
1185 1189
1186 1190 def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
1187 1191 auto_cleanup=True, **kwargs):
1188 1192 group_name = "{prefix}_usergroup_{count}".format(
1189 1193 prefix=self._test_name,
1190 1194 count=len(self.user_group_ids))
1191 1195 user_group = self.fixture.create_user_group(
1192 1196 group_name, cur_user=owner, **kwargs)
1193 1197
1194 1198 if auto_cleanup:
1195 1199 self.user_group_ids.append(user_group.users_group_id)
1196 1200 if members:
1197 1201 for user in members:
1198 1202 UserGroupModel().add_user_to_group(user_group, user)
1199 1203 return user_group
1200 1204
1201 1205 def grant_user_permission(self, user_name, permission_name):
1202 1206 self._inherit_default_user_permissions(user_name, False)
1203 1207 self.user_permissions.append((user_name, permission_name))
1204 1208
1205 1209 def grant_user_permission_to_repo_group(
1206 1210 self, repo_group, user, permission_name):
1207 1211 permission = RepoGroupModel().grant_user_permission(
1208 1212 repo_group, user, permission_name)
1209 1213 self.user_repo_group_permission_ids.append(
1210 1214 (repo_group.group_id, user.user_id))
1211 1215 return permission
1212 1216
1213 1217 def grant_user_group_permission_to_repo_group(
1214 1218 self, repo_group, user_group, permission_name):
1215 1219 permission = RepoGroupModel().grant_user_group_permission(
1216 1220 repo_group, user_group, permission_name)
1217 1221 self.user_group_repo_group_permission_ids.append(
1218 1222 (repo_group.group_id, user_group.users_group_id))
1219 1223 return permission
1220 1224
1221 1225 def grant_user_permission_to_repo(
1222 1226 self, repo, user, permission_name):
1223 1227 permission = RepoModel().grant_user_permission(
1224 1228 repo, user, permission_name)
1225 1229 self.user_repo_permission_ids.append(
1226 1230 (repo.repo_id, user.user_id))
1227 1231 return permission
1228 1232
1229 1233 def grant_user_group_permission_to_repo(
1230 1234 self, repo, user_group, permission_name):
1231 1235 permission = RepoModel().grant_user_group_permission(
1232 1236 repo, user_group, permission_name)
1233 1237 self.user_group_repo_permission_ids.append(
1234 1238 (repo.repo_id, user_group.users_group_id))
1235 1239 return permission
1236 1240
1237 1241 def grant_user_permission_to_user_group(
1238 1242 self, target_user_group, user, permission_name):
1239 1243 permission = UserGroupModel().grant_user_permission(
1240 1244 target_user_group, user, permission_name)
1241 1245 self.user_user_group_permission_ids.append(
1242 1246 (target_user_group.users_group_id, user.user_id))
1243 1247 return permission
1244 1248
1245 1249 def grant_user_group_permission_to_user_group(
1246 1250 self, target_user_group, user_group, permission_name):
1247 1251 permission = UserGroupModel().grant_user_group_permission(
1248 1252 target_user_group, user_group, permission_name)
1249 1253 self.user_group_user_group_permission_ids.append(
1250 1254 (target_user_group.users_group_id, user_group.users_group_id))
1251 1255 return permission
1252 1256
1253 1257 def revoke_user_permission(self, user_name, permission_name):
1254 1258 self._inherit_default_user_permissions(user_name, True)
1255 1259 UserModel().revoke_perm(user_name, permission_name)
1256 1260
1257 1261 def _inherit_default_user_permissions(self, user_name, value):
1258 1262 user = UserModel().get_by_username(user_name)
1259 1263 user.inherit_default_permissions = value
1260 1264 Session().add(user)
1261 1265 Session().commit()
1262 1266
1263 1267 def cleanup(self):
1264 1268 self._cleanup_permissions()
1265 1269 self._cleanup_repos()
1266 1270 self._cleanup_repo_groups()
1267 1271 self._cleanup_user_groups()
1268 1272 self._cleanup_users()
1269 1273
1270 1274 def _cleanup_permissions(self):
1271 1275 if self.user_permissions:
1272 1276 for user_name, permission_name in self.user_permissions:
1273 1277 self.revoke_user_permission(user_name, permission_name)
1274 1278
1275 1279 for permission in self.user_repo_permission_ids:
1276 1280 RepoModel().revoke_user_permission(*permission)
1277 1281
1278 1282 for permission in self.user_group_repo_permission_ids:
1279 1283 RepoModel().revoke_user_group_permission(*permission)
1280 1284
1281 1285 for permission in self.user_repo_group_permission_ids:
1282 1286 RepoGroupModel().revoke_user_permission(*permission)
1283 1287
1284 1288 for permission in self.user_group_repo_group_permission_ids:
1285 1289 RepoGroupModel().revoke_user_group_permission(*permission)
1286 1290
1287 1291 for permission in self.user_user_group_permission_ids:
1288 1292 UserGroupModel().revoke_user_permission(*permission)
1289 1293
1290 1294 for permission in self.user_group_user_group_permission_ids:
1291 1295 UserGroupModel().revoke_user_group_permission(*permission)
1292 1296
1293 1297 def _cleanup_repo_groups(self):
1294 1298 def _repo_group_compare(first_group_id, second_group_id):
1295 1299 """
1296 1300 Gives higher priority to the groups with the most complex paths
1297 1301 """
1298 1302 first_group = RepoGroup.get(first_group_id)
1299 1303 second_group = RepoGroup.get(second_group_id)
1300 1304 first_group_parts = (
1301 1305 len(first_group.group_name.split('/')) if first_group else 0)
1302 1306 second_group_parts = (
1303 1307 len(second_group.group_name.split('/')) if second_group else 0)
1304 1308 return cmp(second_group_parts, first_group_parts)
1305 1309
1306 1310 sorted_repo_group_ids = sorted(
1307 1311 self.repo_group_ids, cmp=_repo_group_compare)
1308 1312 for repo_group_id in sorted_repo_group_ids:
1309 1313 self.fixture.destroy_repo_group(repo_group_id)
1310 1314
1311 1315 def _cleanup_repos(self):
1312 1316 sorted_repos_ids = sorted(self.repos_ids)
1313 1317 for repo_id in sorted_repos_ids:
1314 1318 self.fixture.destroy_repo(repo_id)
1315 1319
1316 1320 def _cleanup_user_groups(self):
1317 1321 def _user_group_compare(first_group_id, second_group_id):
1318 1322 """
1319 1323 Gives higher priority to the groups with the most complex paths
1320 1324 """
1321 1325 first_group = UserGroup.get(first_group_id)
1322 1326 second_group = UserGroup.get(second_group_id)
1323 1327 first_group_parts = (
1324 1328 len(first_group.users_group_name.split('/'))
1325 1329 if first_group else 0)
1326 1330 second_group_parts = (
1327 1331 len(second_group.users_group_name.split('/'))
1328 1332 if second_group else 0)
1329 1333 return cmp(second_group_parts, first_group_parts)
1330 1334
1331 1335 sorted_user_group_ids = sorted(
1332 1336 self.user_group_ids, cmp=_user_group_compare)
1333 1337 for user_group_id in sorted_user_group_ids:
1334 1338 self.fixture.destroy_user_group(user_group_id)
1335 1339
1336 1340 def _cleanup_users(self):
1337 1341 for user_id in self.user_ids:
1338 1342 self.fixture.destroy_user(user_id)
1339 1343
1340 1344
1341 1345 # TODO: Think about moving this into a pytest-pyro package and make it a
1342 1346 # pytest plugin
1343 1347 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1344 1348 def pytest_runtest_makereport(item, call):
1345 1349 """
1346 1350 Adding the remote traceback if the exception has this information.
1347 1351
1348 1352 VCSServer attaches this information as the attribute `_vcs_server_traceback`
1349 1353 to the exception instance.
1350 1354 """
1351 1355 outcome = yield
1352 1356 report = outcome.get_result()
1353 1357 if call.excinfo:
1354 1358 _add_vcsserver_remote_traceback(report, call.excinfo.value)
1355 1359
1356 1360
1357 1361 def _add_vcsserver_remote_traceback(report, exc):
1358 1362 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1359 1363
1360 1364 if vcsserver_traceback:
1361 1365 section = 'VCSServer remote traceback ' + report.when
1362 1366 report.sections.append((section, vcsserver_traceback))
1363 1367
1364 1368
1365 1369 @pytest.fixture(scope='session')
1366 1370 def testrun():
1367 1371 return {
1368 1372 'uuid': uuid.uuid4(),
1369 1373 'start': datetime.datetime.utcnow().isoformat(),
1370 1374 'timestamp': int(time.time()),
1371 1375 }
1372 1376
1373 1377
1374 1378 @pytest.fixture(autouse=True)
1375 1379 def collect_appenlight_stats(request, testrun):
1376 1380 """
1377 1381 This fixture reports memory consumtion of single tests.
1378 1382
1379 1383 It gathers data based on `psutil` and sends them to Appenlight. The option
1380 1384 ``--ae`` has te be used to enable this fixture and the API key for your
1381 1385 application has to be provided in ``--ae-key``.
1382 1386 """
1383 1387 try:
1384 1388 # cygwin cannot have yet psutil support.
1385 1389 import psutil
1386 1390 except ImportError:
1387 1391 return
1388 1392
1389 1393 if not request.config.getoption('--appenlight'):
1390 1394 return
1391 1395 else:
1392 1396 # Only request the baseapp fixture if appenlight tracking is
1393 1397 # enabled. This will speed up a test run of unit tests by 2 to 3
1394 1398 # seconds if appenlight is not enabled.
1395 1399 baseapp = request.getfuncargvalue("baseapp")
1396 1400 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1397 1401 client = AppenlightClient(
1398 1402 url=url,
1399 1403 api_key=request.config.getoption('--appenlight-api-key'),
1400 1404 namespace=request.node.nodeid,
1401 1405 request=str(testrun['uuid']),
1402 1406 testrun=testrun)
1403 1407
1404 1408 client.collect({
1405 1409 'message': "Starting",
1406 1410 })
1407 1411
1408 1412 server_and_port = baseapp.config.get_settings()['vcs.server']
1409 1413 protocol = baseapp.config.get_settings()['vcs.server.protocol']
1410 1414 server = create_vcsserver_proxy(server_and_port, protocol)
1411 1415 with server:
1412 1416 vcs_pid = server.get_pid()
1413 1417 server.run_gc()
1414 1418 vcs_process = psutil.Process(vcs_pid)
1415 1419 mem = vcs_process.memory_info()
1416 1420 client.tag_before('vcsserver.rss', mem.rss)
1417 1421 client.tag_before('vcsserver.vms', mem.vms)
1418 1422
1419 1423 test_process = psutil.Process()
1420 1424 mem = test_process.memory_info()
1421 1425 client.tag_before('test.rss', mem.rss)
1422 1426 client.tag_before('test.vms', mem.vms)
1423 1427
1424 1428 client.tag_before('time', time.time())
1425 1429
1426 1430 @request.addfinalizer
1427 1431 def send_stats():
1428 1432 client.tag_after('time', time.time())
1429 1433 with server:
1430 1434 gc_stats = server.run_gc()
1431 1435 for tag, value in gc_stats.items():
1432 1436 client.tag_after(tag, value)
1433 1437 mem = vcs_process.memory_info()
1434 1438 client.tag_after('vcsserver.rss', mem.rss)
1435 1439 client.tag_after('vcsserver.vms', mem.vms)
1436 1440
1437 1441 mem = test_process.memory_info()
1438 1442 client.tag_after('test.rss', mem.rss)
1439 1443 client.tag_after('test.vms', mem.vms)
1440 1444
1441 1445 client.collect({
1442 1446 'message': "Finished",
1443 1447 })
1444 1448 client.send_stats()
1445 1449
1446 1450 return client
1447 1451
1448 1452
1449 1453 class AppenlightClient():
1450 1454
1451 1455 url_template = '{url}?protocol_version=0.5'
1452 1456
1453 1457 def __init__(
1454 1458 self, url, api_key, add_server=True, add_timestamp=True,
1455 1459 namespace=None, request=None, testrun=None):
1456 1460 self.url = self.url_template.format(url=url)
1457 1461 self.api_key = api_key
1458 1462 self.add_server = add_server
1459 1463 self.add_timestamp = add_timestamp
1460 1464 self.namespace = namespace
1461 1465 self.request = request
1462 1466 self.server = socket.getfqdn(socket.gethostname())
1463 1467 self.tags_before = {}
1464 1468 self.tags_after = {}
1465 1469 self.stats = []
1466 1470 self.testrun = testrun or {}
1467 1471
1468 1472 def tag_before(self, tag, value):
1469 1473 self.tags_before[tag] = value
1470 1474
1471 1475 def tag_after(self, tag, value):
1472 1476 self.tags_after[tag] = value
1473 1477
1474 1478 def collect(self, data):
1475 1479 if self.add_server:
1476 1480 data.setdefault('server', self.server)
1477 1481 if self.add_timestamp:
1478 1482 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1479 1483 if self.namespace:
1480 1484 data.setdefault('namespace', self.namespace)
1481 1485 if self.request:
1482 1486 data.setdefault('request', self.request)
1483 1487 self.stats.append(data)
1484 1488
1485 1489 def send_stats(self):
1486 1490 tags = [
1487 1491 ('testrun', self.request),
1488 1492 ('testrun.start', self.testrun['start']),
1489 1493 ('testrun.timestamp', self.testrun['timestamp']),
1490 1494 ('test', self.namespace),
1491 1495 ]
1492 1496 for key, value in self.tags_before.items():
1493 1497 tags.append((key + '.before', value))
1494 1498 try:
1495 1499 delta = self.tags_after[key] - value
1496 1500 tags.append((key + '.delta', delta))
1497 1501 except Exception:
1498 1502 pass
1499 1503 for key, value in self.tags_after.items():
1500 1504 tags.append((key + '.after', value))
1501 1505 self.collect({
1502 1506 'message': "Collected tags",
1503 1507 'tags': tags,
1504 1508 })
1505 1509
1506 1510 response = requests.post(
1507 1511 self.url,
1508 1512 headers={
1509 1513 'X-appenlight-api-key': self.api_key},
1510 1514 json=self.stats,
1511 1515 )
1512 1516
1513 1517 if not response.status_code == 200:
1514 1518 pprint.pprint(self.stats)
1515 1519 print(response.headers)
1516 1520 print(response.text)
1517 1521 raise Exception('Sending to appenlight failed')
1518 1522
1519 1523
1520 1524 @pytest.fixture
1521 1525 def gist_util(request, db_connection):
1522 1526 """
1523 1527 Provides a wired instance of `GistUtility` with integrated cleanup.
1524 1528 """
1525 1529 utility = GistUtility()
1526 1530 request.addfinalizer(utility.cleanup)
1527 1531 return utility
1528 1532
1529 1533
1530 1534 class GistUtility(object):
1531 1535 def __init__(self):
1532 1536 self.fixture = Fixture()
1533 1537 self.gist_ids = []
1534 1538
1535 1539 def create_gist(self, **kwargs):
1536 1540 gist = self.fixture.create_gist(**kwargs)
1537 1541 self.gist_ids.append(gist.gist_id)
1538 1542 return gist
1539 1543
1540 1544 def cleanup(self):
1541 1545 for id_ in self.gist_ids:
1542 1546 self.fixture.destroy_gists(str(id_))
1543 1547
1544 1548
1545 1549 @pytest.fixture
1546 1550 def enabled_backends(request):
1547 1551 backends = request.config.option.backends
1548 1552 return backends[:]
1549 1553
1550 1554
1551 1555 @pytest.fixture
1552 1556 def settings_util(request, db_connection):
1553 1557 """
1554 1558 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1555 1559 """
1556 1560 utility = SettingsUtility()
1557 1561 request.addfinalizer(utility.cleanup)
1558 1562 return utility
1559 1563
1560 1564
1561 1565 class SettingsUtility(object):
1562 1566 def __init__(self):
1563 1567 self.rhodecode_ui_ids = []
1564 1568 self.rhodecode_setting_ids = []
1565 1569 self.repo_rhodecode_ui_ids = []
1566 1570 self.repo_rhodecode_setting_ids = []
1567 1571
1568 1572 def create_repo_rhodecode_ui(
1569 1573 self, repo, section, value, key=None, active=True, cleanup=True):
1570 1574 key = key or hashlib.sha1(
1571 1575 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1572 1576
1573 1577 setting = RepoRhodeCodeUi()
1574 1578 setting.repository_id = repo.repo_id
1575 1579 setting.ui_section = section
1576 1580 setting.ui_value = value
1577 1581 setting.ui_key = key
1578 1582 setting.ui_active = active
1579 1583 Session().add(setting)
1580 1584 Session().commit()
1581 1585
1582 1586 if cleanup:
1583 1587 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1584 1588 return setting
1585 1589
1586 1590 def create_rhodecode_ui(
1587 1591 self, section, value, key=None, active=True, cleanup=True):
1588 1592 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1589 1593
1590 1594 setting = RhodeCodeUi()
1591 1595 setting.ui_section = section
1592 1596 setting.ui_value = value
1593 1597 setting.ui_key = key
1594 1598 setting.ui_active = active
1595 1599 Session().add(setting)
1596 1600 Session().commit()
1597 1601
1598 1602 if cleanup:
1599 1603 self.rhodecode_ui_ids.append(setting.ui_id)
1600 1604 return setting
1601 1605
1602 1606 def create_repo_rhodecode_setting(
1603 1607 self, repo, name, value, type_, cleanup=True):
1604 1608 setting = RepoRhodeCodeSetting(
1605 1609 repo.repo_id, key=name, val=value, type=type_)
1606 1610 Session().add(setting)
1607 1611 Session().commit()
1608 1612
1609 1613 if cleanup:
1610 1614 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1611 1615 return setting
1612 1616
1613 1617 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1614 1618 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1615 1619 Session().add(setting)
1616 1620 Session().commit()
1617 1621
1618 1622 if cleanup:
1619 1623 self.rhodecode_setting_ids.append(setting.app_settings_id)
1620 1624
1621 1625 return setting
1622 1626
1623 1627 def cleanup(self):
1624 1628 for id_ in self.rhodecode_ui_ids:
1625 1629 setting = RhodeCodeUi.get(id_)
1626 1630 Session().delete(setting)
1627 1631
1628 1632 for id_ in self.rhodecode_setting_ids:
1629 1633 setting = RhodeCodeSetting.get(id_)
1630 1634 Session().delete(setting)
1631 1635
1632 1636 for id_ in self.repo_rhodecode_ui_ids:
1633 1637 setting = RepoRhodeCodeUi.get(id_)
1634 1638 Session().delete(setting)
1635 1639
1636 1640 for id_ in self.repo_rhodecode_setting_ids:
1637 1641 setting = RepoRhodeCodeSetting.get(id_)
1638 1642 Session().delete(setting)
1639 1643
1640 1644 Session().commit()
1641 1645
1642 1646
1643 1647 @pytest.fixture
1644 1648 def no_notifications(request):
1645 1649 notification_patcher = mock.patch(
1646 1650 'rhodecode.model.notification.NotificationModel.create')
1647 1651 notification_patcher.start()
1648 1652 request.addfinalizer(notification_patcher.stop)
1649 1653
1650 1654
1651 1655 @pytest.fixture(scope='session')
1652 1656 def repeat(request):
1653 1657 """
1654 1658 The number of repetitions is based on this fixture.
1655 1659
1656 1660 Slower calls may divide it by 10 or 100. It is chosen in a way so that the
1657 1661 tests are not too slow in our default test suite.
1658 1662 """
1659 1663 return request.config.getoption('--repeat')
1660 1664
1661 1665
1662 1666 @pytest.fixture
1663 1667 def rhodecode_fixtures():
1664 1668 return Fixture()
1665 1669
1666 1670
1667 1671 @pytest.fixture
1668 1672 def context_stub():
1669 1673 """
1670 1674 Stub context object.
1671 1675 """
1672 1676 context = pyramid.testing.DummyResource()
1673 1677 return context
1674 1678
1675 1679
1676 1680 @pytest.fixture
1677 1681 def request_stub():
1678 1682 """
1679 1683 Stub request object.
1680 1684 """
1681 1685 from rhodecode.lib.base import bootstrap_request
1682 1686 request = bootstrap_request(scheme='https')
1683 1687 return request
1684 1688
1685 1689
1686 1690 @pytest.fixture
1687 1691 def config_stub(request, request_stub):
1688 1692 """
1689 1693 Set up pyramid.testing and return the Configurator.
1690 1694 """
1691 1695 from rhodecode.lib.base import bootstrap_config
1692 1696 config = bootstrap_config(request=request_stub)
1693 1697
1694 1698 @request.addfinalizer
1695 1699 def cleanup():
1696 1700 pyramid.testing.tearDown()
1697 1701
1698 1702 return config
1699 1703
1700 1704
1701 1705 @pytest.fixture
1702 1706 def StubIntegrationType():
1703 1707 class _StubIntegrationType(IntegrationTypeBase):
1704 1708 """ Test integration type class """
1705 1709
1706 1710 key = 'test'
1707 1711 display_name = 'Test integration type'
1708 1712 description = 'A test integration type for testing'
1709 1713
1710 1714 @classmethod
1711 1715 def icon(cls):
1712 1716 return 'test_icon_html_image'
1713 1717
1714 1718 def __init__(self, settings):
1715 1719 super(_StubIntegrationType, self).__init__(settings)
1716 1720 self.sent_events = [] # for testing
1717 1721
1718 1722 def send_event(self, event):
1719 1723 self.sent_events.append(event)
1720 1724
1721 1725 def settings_schema(self):
1722 1726 class SettingsSchema(colander.Schema):
1723 1727 test_string_field = colander.SchemaNode(
1724 1728 colander.String(),
1725 1729 missing=colander.required,
1726 1730 title='test string field',
1727 1731 )
1728 1732 test_int_field = colander.SchemaNode(
1729 1733 colander.Int(),
1730 1734 title='some integer setting',
1731 1735 )
1732 1736 return SettingsSchema()
1733 1737
1734 1738
1735 1739 integration_type_registry.register_integration_type(_StubIntegrationType)
1736 1740 return _StubIntegrationType
1737 1741
1738 1742 @pytest.fixture
1739 1743 def stub_integration_settings():
1740 1744 return {
1741 1745 'test_string_field': 'some data',
1742 1746 'test_int_field': 100,
1743 1747 }
1744 1748
1745 1749
1746 1750 @pytest.fixture
1747 1751 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1748 1752 stub_integration_settings):
1749 1753 integration = IntegrationModel().create(
1750 1754 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1751 1755 name='test repo integration',
1752 1756 repo=repo_stub, repo_group=None, child_repos_only=None)
1753 1757
1754 1758 @request.addfinalizer
1755 1759 def cleanup():
1756 1760 IntegrationModel().delete(integration)
1757 1761
1758 1762 return integration
1759 1763
1760 1764
1761 1765 @pytest.fixture
1762 1766 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1763 1767 stub_integration_settings):
1764 1768 integration = IntegrationModel().create(
1765 1769 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1766 1770 name='test repogroup integration',
1767 1771 repo=None, repo_group=test_repo_group, child_repos_only=True)
1768 1772
1769 1773 @request.addfinalizer
1770 1774 def cleanup():
1771 1775 IntegrationModel().delete(integration)
1772 1776
1773 1777 return integration
1774 1778
1775 1779
1776 1780 @pytest.fixture
1777 1781 def repogroup_recursive_integration_stub(request, test_repo_group,
1778 1782 StubIntegrationType, stub_integration_settings):
1779 1783 integration = IntegrationModel().create(
1780 1784 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1781 1785 name='test recursive repogroup integration',
1782 1786 repo=None, repo_group=test_repo_group, child_repos_only=False)
1783 1787
1784 1788 @request.addfinalizer
1785 1789 def cleanup():
1786 1790 IntegrationModel().delete(integration)
1787 1791
1788 1792 return integration
1789 1793
1790 1794
1791 1795 @pytest.fixture
1792 1796 def global_integration_stub(request, StubIntegrationType,
1793 1797 stub_integration_settings):
1794 1798 integration = IntegrationModel().create(
1795 1799 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1796 1800 name='test global integration',
1797 1801 repo=None, repo_group=None, child_repos_only=None)
1798 1802
1799 1803 @request.addfinalizer
1800 1804 def cleanup():
1801 1805 IntegrationModel().delete(integration)
1802 1806
1803 1807 return integration
1804 1808
1805 1809
1806 1810 @pytest.fixture
1807 1811 def root_repos_integration_stub(request, StubIntegrationType,
1808 1812 stub_integration_settings):
1809 1813 integration = IntegrationModel().create(
1810 1814 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1811 1815 name='test global integration',
1812 1816 repo=None, repo_group=None, child_repos_only=True)
1813 1817
1814 1818 @request.addfinalizer
1815 1819 def cleanup():
1816 1820 IntegrationModel().delete(integration)
1817 1821
1818 1822 return integration
1819 1823
1820 1824
1821 1825 @pytest.fixture
1822 1826 def local_dt_to_utc():
1823 1827 def _factory(dt):
1824 1828 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1825 1829 dateutil.tz.tzutc()).replace(tzinfo=None)
1826 1830 return _factory
1827 1831
1828 1832
1829 1833 @pytest.fixture
1830 1834 def disable_anonymous_user(request, baseapp):
1831 1835 set_anonymous_access(False)
1832 1836
1833 1837 @request.addfinalizer
1834 1838 def cleanup():
1835 1839 set_anonymous_access(True)
1836 1840
1837 1841
1838 1842 @pytest.fixture(scope='module')
1839 1843 def rc_fixture(request):
1840 1844 return Fixture()
1841 1845
1842 1846
1843 1847 @pytest.fixture
1844 1848 def repo_groups(request):
1845 1849 fixture = Fixture()
1846 1850
1847 1851 session = Session()
1848 1852 zombie_group = fixture.create_repo_group('zombie')
1849 1853 parent_group = fixture.create_repo_group('parent')
1850 1854 child_group = fixture.create_repo_group('parent/child')
1851 1855 groups_in_db = session.query(RepoGroup).all()
1852 1856 assert len(groups_in_db) == 3
1853 1857 assert child_group.group_parent_id == parent_group.group_id
1854 1858
1855 1859 @request.addfinalizer
1856 1860 def cleanup():
1857 1861 fixture.destroy_repo_group(zombie_group)
1858 1862 fixture.destroy_repo_group(child_group)
1859 1863 fixture.destroy_repo_group(parent_group)
1860 1864
1861 1865 return zombie_group, parent_group, child_group
General Comments 0
You need to be logged in to leave comments. Login now