##// END OF EJS Templates
Added rewritten validators module + tests
marcink -
r2466:7010dc12 codereview
parent child Browse files
Show More
This diff has been collapsed as it changes many lines, (592 lines changed) Show them Hide them
@@ -0,0 +1,592 b''
1 """
2 Set of generic validators
3 """
4 import os
5 import re
6 import formencode
7 import logging
8 from pylons.i18n.translation import _
9 from webhelpers.pylonslib.secure_form import authentication_token
10
11 from formencode.validators import (
12 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set
13 )
14
15 from rhodecode.lib.utils import repo_name_slug
16 from rhodecode.model.db import RepoGroup, Repository, UsersGroup, User
17 from rhodecode.lib.auth import authenticate
18 from rhodecode.lib.exceptions import LdapImportError
19 from rhodecode.config.routing import ADMIN_PREFIX
20 # silence warnings and pylint
21 UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set
22
23 log = logging.getLogger(__name__)
24
25
26 class StateObj(object):
27 """
28 this is needed to translate the messages using _() in validators
29 """
30 _ = staticmethod(_)
31
32
33 def M(self, key, state=None, **kwargs):
34 """
35 returns string from self.message based on given key,
36 passed kw params are used to substitute %(named)s params inside
37 translated strings
38
39 :param msg:
40 :param state:
41 """
42 if state is None:
43 state = StateObj()
44 else:
45 state._ = staticmethod(_)
46 #inject validator into state object
47 return self.message(key, state, **kwargs)
48
49
50 def ValidUsername(edit=False, old_data={}):
51 class _validator(formencode.validators.FancyValidator):
52 messages = {
53 'username_exists': _(u'Username "%(username)s" already exists'),
54 'system_invalid_username':
55 _(u'Username "%(username)s" is forbidden'),
56 'invalid_username':
57 _(u'Username may only contain alphanumeric characters '
58 'underscores, periods or dashes and must begin with '
59 'alphanumeric character')
60 }
61
62 def validate_python(self, value, state):
63 if value in ['default', 'new_user']:
64 msg = M(self, 'system_invalid_username', state, username=value)
65 raise formencode.Invalid(msg, value, state)
66 #check if user is unique
67 old_un = None
68 if edit:
69 old_un = User.get(old_data.get('user_id')).username
70
71 if old_un != value or not edit:
72 if User.get_by_username(value, case_insensitive=True):
73 msg = M(self, 'username_exists', state, username=value)
74 raise formencode.Invalid(msg, value, state)
75
76 if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
77 msg = M(self, 'invalid_username', state)
78 raise formencode.Invalid(msg, value, state)
79 return _validator
80
81
82 def ValidRepoUser():
83 class _validator(formencode.validators.FancyValidator):
84 messages = {
85 'invalid_username': _(u'Username %(username)s is not valid')
86 }
87
88 def validate_python(self, value, state):
89 try:
90 User.query().filter(User.active == True)\
91 .filter(User.username == value).one()
92 except Exception:
93 msg = M(self, 'invalid_username', state, username=value)
94 raise formencode.Invalid(msg, value, state,
95 error_dict=dict(username=msg)
96 )
97
98 return _validator
99
100
101 def ValidUsersGroup(edit=False, old_data={}):
102 class _validator(formencode.validators.FancyValidator):
103 messages = {
104 'invalid_group': _(u'Invalid users group name'),
105 'group_exist': _(u'Users group "%(usersgroup)s" already exists'),
106 'invalid_usersgroup_name':
107 _(u'users group name may only contain alphanumeric '
108 'characters underscores, periods or dashes and must begin '
109 'with alphanumeric character')
110 }
111
112 def validate_python(self, value, state):
113 if value in ['default']:
114 msg = M(self, 'invalid_group', state)
115 raise formencode.Invalid(msg, value, state,
116 error_dict=dict(users_group_name=msg)
117 )
118 #check if group is unique
119 old_ugname = None
120 if edit:
121 old_id = old_data.get('users_group_id')
122 old_ugname = UsersGroup.get(old_id).users_group_name
123
124 if old_ugname != value or not edit:
125 is_existing_group = UsersGroup.get_by_group_name(value,
126 case_insensitive=True)
127 if is_existing_group:
128 msg = M(self, 'group_exist', state, usersgroup=value)
129 raise formencode.Invalid(msg, value, state,
130 error_dict=dict(users_group_name=msg)
131 )
132
133 if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
134 msg = M(self, 'invalid_usersgroup_name', state)
135 raise formencode.Invalid(msg, value, state,
136 error_dict=dict(users_group_name=msg)
137 )
138
139 return _validator
140
141
142 def ValidReposGroup(edit=False, old_data={}):
143 class _validator(formencode.validators.FancyValidator):
144 messages = {
145 'group_parent_id': _(u'Cannot assign this group as parent'),
146 'group_exists': _(u'Group "%(group_name)s" already exists'),
147 'repo_exists':
148 _(u'Repository with name "%(group_name)s" already exists')
149 }
150
151 def validate_python(self, value, state):
152 # TODO WRITE VALIDATIONS
153 group_name = value.get('group_name')
154 group_parent_id = value.get('group_parent_id')
155
156 # slugify repo group just in case :)
157 slug = repo_name_slug(group_name)
158
159 # check for parent of self
160 parent_of_self = lambda: (
161 old_data['group_id'] == int(group_parent_id)
162 if group_parent_id else False
163 )
164 if edit and parent_of_self():
165 msg = M(self, 'group_parent_id', state)
166 raise formencode.Invalid(msg, value, state,
167 error_dict=dict(group_parent_id=msg)
168 )
169
170 old_gname = None
171 if edit:
172 old_gname = RepoGroup.get(old_data.get('group_id')).group_name
173
174 if old_gname != group_name or not edit:
175
176 # check group
177 gr = RepoGroup.query()\
178 .filter(RepoGroup.group_name == slug)\
179 .filter(RepoGroup.group_parent_id == group_parent_id)\
180 .scalar()
181
182 if gr:
183 msg = M(self, 'group_exists', state, group_name=slug)
184 raise formencode.Invalid(msg, value, state,
185 error_dict=dict(group_name=msg)
186 )
187
188 # check for same repo
189 repo = Repository.query()\
190 .filter(Repository.repo_name == slug)\
191 .scalar()
192
193 if repo:
194 msg = M(self, 'repo_exists', state, group_name=slug)
195 raise formencode.Invalid(msg, value, state,
196 error_dict=dict(group_name=msg)
197 )
198
199 return _validator
200
201
202 def ValidPassword():
203 class _validator(formencode.validators.FancyValidator):
204 messages = {
205 'invalid_password':
206 _(u'Invalid characters (non-ascii) in password')
207 }
208
209 def validate_python(self, value, state):
210 try:
211 (value or '').decode('ascii')
212 except UnicodeError:
213 msg = M(self, 'invalid_password', state)
214 raise formencode.Invalid(msg, value, state,)
215 return _validator
216
217
218 def ValidPasswordsMatch():
219 class _validator(formencode.validators.FancyValidator):
220 messages = {
221 'password_mismatch': _(u'Passwords do not match'),
222 }
223
224 def validate_python(self, value, state):
225
226 pass_val = value.get('password') or value.get('new_password')
227 if pass_val != value['password_confirmation']:
228 msg = M(self, 'password_mismatch', state)
229 raise formencode.Invalid(msg, value, state,
230 error_dict=dict(password_confirmation=msg)
231 )
232 return _validator
233
234
235 def ValidAuth():
236 class _validator(formencode.validators.FancyValidator):
237 messages = {
238 'invalid_password': _(u'invalid password'),
239 'invalid_username': _(u'invalid user name'),
240 'disabled_account': _(u'Your account is disabled')
241 }
242
243 def validate_python(self, value, state):
244 password = value['password']
245 username = value['username']
246
247 if not authenticate(username, password):
248 user = User.get_by_username(username)
249 if user and user.active is False:
250 log.warning('user %s is disabled' % username)
251 msg = M(self, 'disabled_account', state)
252 raise formencode.Invalid(msg, value, state,
253 error_dict=dict(username=msg)
254 )
255 else:
256 log.warning('user %s failed to authenticate' % username)
257 msg = M(self, 'invalid_username', state)
258 msg2 = M(self, 'invalid_password', state)
259 raise formencode.Invalid(msg, value, state,
260 error_dict=dict(username=msg, password=msg2)
261 )
262 return _validator
263
264
265 def ValidAuthToken():
266 class _validator(formencode.validators.FancyValidator):
267 messages = {
268 'invalid_token': _(u'Token mismatch')
269 }
270
271 def validate_python(self, value, state):
272 if value != authentication_token():
273 msg = M(self, 'invalid_token', state)
274 raise formencode.Invalid(msg, value, state)
275 return _validator
276
277
278 def ValidRepoName(edit=False, old_data={}):
279 class _validator(formencode.validators.FancyValidator):
280 messages = {
281 'invalid_repo_name':
282 _(u'Repository name %(repo)s is disallowed'),
283 'repository_exists':
284 _(u'Repository named %(repo)s already exists'),
285 'repository_in_group_exists': _(u'Repository "%(repo)s" already '
286 'exists in group "%(group)s"'),
287 'same_group_exists': _(u'Repositories group with name "%(repo)s" '
288 'already exists')
289 }
290
291 def _to_python(self, value, state):
292 repo_name = repo_name_slug(value.get('repo_name', ''))
293 repo_group = value.get('repo_group')
294 if repo_group:
295 gr = RepoGroup.get(repo_group)
296 group_path = gr.full_path
297 group_name = gr.group_name
298 # value needs to be aware of group name in order to check
299 # db key This is an actual just the name to store in the
300 # database
301 repo_name_full = group_path + RepoGroup.url_sep() + repo_name
302 else:
303 group_name = group_path = ''
304 repo_name_full = repo_name
305
306 value['repo_name'] = repo_name
307 value['repo_name_full'] = repo_name_full
308 value['group_path'] = group_path
309 value['group_name'] = group_name
310 return value
311
312 def validate_python(self, value, state):
313
314 repo_name = value.get('repo_name')
315 repo_name_full = value.get('repo_name_full')
316 group_path = value.get('group_path')
317 group_name = value.get('group_name')
318
319 if repo_name in [ADMIN_PREFIX, '']:
320 msg = M(self, 'invalid_repo_name', state, repo=repo_name)
321 raise formencode.Invalid(msg, value, state,
322 error_dict=dict(repo_name=msg)
323 )
324
325 rename = old_data.get('repo_name') != repo_name_full
326 create = not edit
327 if rename or create:
328
329 if group_path != '':
330 if Repository.get_by_repo_name(repo_name_full):
331 msg = M(self, 'repository_in_group_exists', state,
332 repo=repo_name, group=group_name)
333 raise formencode.Invalid(msg, value, state,
334 error_dict=dict(repo_name=msg)
335 )
336 elif RepoGroup.get_by_group_name(repo_name_full):
337 msg = M(self, 'same_group_exists', state,
338 repo=repo_name)
339 raise formencode.Invalid(msg, value, state,
340 error_dict=dict(repo_name=msg)
341 )
342
343 elif Repository.get_by_repo_name(repo_name_full):
344 msg = M(self, 'repository_exists', state,
345 repo=repo_name)
346 raise formencode.Invalid(msg, value, state,
347 error_dict=dict(repo_name=msg)
348 )
349 return value
350 return _validator
351
352
353 def ValidForkName(*args, **kwargs):
354 return ValidRepoName(*args, **kwargs)
355
356
357 def SlugifyName():
358 class _validator(formencode.validators.FancyValidator):
359
360 def _to_python(self, value, state):
361 return repo_name_slug(value)
362
363 def validate_python(self, value, state):
364 pass
365
366 return _validator
367
368
369 def ValidCloneUri():
370 from rhodecode.lib.utils import make_ui
371
372 def url_handler(repo_type, url, proto, ui=None):
373 if repo_type == 'hg':
374 from mercurial.httprepo import httprepository, httpsrepository
375 if proto == 'https':
376 httpsrepository(make_ui('db'), url).capabilities
377 elif proto == 'http':
378 httprepository(make_ui('db'), url).capabilities
379 elif repo_type == 'git':
380 #TODO: write a git url validator
381 pass
382
383 class _validator(formencode.validators.FancyValidator):
384 messages = {
385 'clone_uri': _(u'invalid clone url'),
386 'invalid_clone_uri': _(u'Invalid clone url, provide a '
387 'valid clone http\s url')
388 }
389
390 def validate_python(self, value, state):
391 repo_type = value.get('repo_type')
392 url = value.get('clone_uri')
393
394 if not url:
395 pass
396 elif url.startswith('https') or url.startswith('http'):
397 _type = 'https' if url.startswith('https') else 'http'
398 try:
399 url_handler(repo_type, url, _type, make_ui('db'))
400 except Exception:
401 log.exception('Url validation failed')
402 msg = M(self, 'clone_uri')
403 raise formencode.Invalid(msg, value, state,
404 error_dict=dict(clone_uri=msg)
405 )
406 else:
407 msg = M(self, 'invalid_clone_uri', state)
408 raise formencode.Invalid(msg, value, state,
409 error_dict=dict(clone_uri=msg)
410 )
411 return _validator
412
413
414 def ValidForkType(old_data={}):
415 class _validator(formencode.validators.FancyValidator):
416 messages = {
417 'invalid_fork_type': _(u'Fork have to be the same type as parent')
418 }
419
420 def validate_python(self, value, state):
421 if old_data['repo_type'] != value:
422 msg = M(self, 'invalid_fork_type', state)
423 raise formencode.Invalid(msg, value, state,
424 error_dict=dict(repo_type=msg)
425 )
426 return _validator
427
428
429 def ValidPerms(type_='repo'):
430 if type_ == 'group':
431 EMPTY_PERM = 'group.none'
432 elif type_ == 'repo':
433 EMPTY_PERM = 'repository.none'
434
435 class _validator(formencode.validators.FancyValidator):
436 messages = {
437 'perm_new_member_name':
438 _(u'This username or users group name is not valid')
439 }
440
441 def to_python(self, value, state):
442 perms_update = []
443 perms_new = []
444 # build a list of permission to update and new permission to create
445 for k, v in value.items():
446 # means new added member to permissions
447 if k.startswith('perm_new_member'):
448 new_perm = value.get('perm_new_member', False)
449 new_member = value.get('perm_new_member_name', False)
450 new_type = value.get('perm_new_member_type')
451
452 if new_member and new_perm:
453 if (new_member, new_perm, new_type) not in perms_new:
454 perms_new.append((new_member, new_perm, new_type))
455 elif k.startswith('u_perm_') or k.startswith('g_perm_'):
456 member = k[7:]
457 t = {'u': 'user',
458 'g': 'users_group'
459 }[k[0]]
460 if member == 'default':
461 if value.get('private'):
462 # set none for default when updating to
463 # private repo
464 v = EMPTY_PERM
465 perms_update.append((member, v, t))
466
467 value['perms_updates'] = perms_update
468 value['perms_new'] = perms_new
469
470 # update permissions
471 for k, v, t in perms_new:
472 try:
473 if t is 'user':
474 self.user_db = User.query()\
475 .filter(User.active == True)\
476 .filter(User.username == k).one()
477 if t is 'users_group':
478 self.user_db = UsersGroup.query()\
479 .filter(UsersGroup.users_group_active == True)\
480 .filter(UsersGroup.users_group_name == k).one()
481
482 except Exception:
483 log.exception('Updated permission failed')
484 msg = M(self, 'perm_new_member_type', state)
485 raise formencode.Invalid(msg, value, state,
486 error_dict=dict(perm_new_member_name=msg)
487 )
488 return value
489 return _validator
490
491
492 def ValidSettings():
493 class _validator(formencode.validators.FancyValidator):
494 def _to_python(self, value, state):
495 # settings form can't edit user
496 if 'user' in value:
497 del value['user']
498 return value
499
500 def validate_python(self, value, state):
501 pass
502 return _validator
503
504
505 def ValidPath():
506 class _validator(formencode.validators.FancyValidator):
507 messages = {
508 'invalid_path': _(u'This is not a valid path')
509 }
510
511 def validate_python(self, value, state):
512 if not os.path.isdir(value):
513 msg = M(self, 'invalid_path', state)
514 raise formencode.Invalid(msg, value, state,
515 error_dict=dict(paths_root_path=msg)
516 )
517 return _validator
518
519
520 def UniqSystemEmail(old_data={}):
521 class _validator(formencode.validators.FancyValidator):
522 messages = {
523 'email_taken': _(u'This e-mail address is already taken')
524 }
525
526 def _to_python(self, value, state):
527 return value.lower()
528
529 def validate_python(self, value, state):
530 if (old_data.get('email') or '').lower() != value:
531 user = User.get_by_email(value, case_insensitive=True)
532 if user:
533 msg = M(self, 'email_taken', state)
534 raise formencode.Invalid(msg, value, state,
535 error_dict=dict(email=msg)
536 )
537 return _validator
538
539
540 def ValidSystemEmail():
541 class _validator(formencode.validators.FancyValidator):
542 messages = {
543 'non_existing_email': _(u'e-mail "%(email)s" does not exist.')
544 }
545
546 def _to_python(self, value, state):
547 return value.lower()
548
549 def validate_python(self, value, state):
550 user = User.get_by_email(value, case_insensitive=True)
551 if user is None:
552 msg = M(self, 'non_existing_email', state, email=value)
553 raise formencode.Invalid(msg, value, state,
554 error_dict=dict(email=msg)
555 )
556
557 return _validator
558
559
560 def LdapLibValidator():
561 class _validator(formencode.validators.FancyValidator):
562 messages = {
563
564 }
565
566 def validate_python(self, value, state):
567 try:
568 import ldap
569 ldap # pyflakes silence !
570 except ImportError:
571 raise LdapImportError()
572
573 return _validator
574
575
576 def AttrLoginValidator():
577 class _validator(formencode.validators.FancyValidator):
578 messages = {
579 'invalid_cn':
580 _(u'The LDAP Login attribute of the CN must be specified - '
581 'this is the name of the attribute that is equivalent '
582 'to "username"')
583 }
584
585 def validate_python(self, value, state):
586 if not value or not isinstance(value, (str, unicode)):
587 msg = M(self, 'invalid_cn', state)
588 raise formencode.Invalid(msg, value, state,
589 error_dict=dict(ldap_attr_login=msg)
590 )
591
592 return _validator
@@ -0,0 +1,222 b''
1 # -*- coding: utf-8 -*-
2 import unittest
3 import formencode
4
5 from rhodecode.tests import *
6
7 from rhodecode.model import validators as v
8 from rhodecode.model.users_group import UsersGroupModel
9
10 from rhodecode.model.meta import Session
11 from rhodecode.model.repos_group import ReposGroupModel
12 from rhodecode.config.routing import ADMIN_PREFIX
13
14
15 class TestReposGroups(unittest.TestCase):
16
17 def setUp(self):
18 pass
19
20 def tearDown(self):
21 pass
22
23 def test_Message_extractor(self):
24 validator = v.ValidUsername()
25 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
26
27 class StateObj(object):
28 pass
29
30 self.assertRaises(formencode.Invalid,
31 validator.to_python, 'default', StateObj)
32
33 def test_ValidUsername(self):
34 validator = v.ValidUsername()
35
36 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
37 self.assertRaises(formencode.Invalid, validator.to_python, 'new_user')
38 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
39 self.assertRaises(formencode.Invalid, validator.to_python,
40 TEST_USER_ADMIN_LOGIN)
41 self.assertEqual('test', validator.to_python('test'))
42
43 validator = v.ValidUsername(edit=True, old_data={'user_id': 1})
44
45 def test_ValidRepoUser(self):
46 validator = v.ValidRepoUser()
47 self.assertRaises(formencode.Invalid, validator.to_python, 'nouser')
48 self.assertEqual(TEST_USER_ADMIN_LOGIN,
49 validator.to_python(TEST_USER_ADMIN_LOGIN))
50
51 def test_ValidUsersGroup(self):
52 validator = v.ValidUsersGroup()
53 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
54 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
55
56 gr = UsersGroupModel().create('test')
57 gr2 = UsersGroupModel().create('tes2')
58 Session.commit()
59 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
60 assert gr.users_group_id != None
61 validator = v.ValidUsersGroup(edit=True,
62 old_data={'users_group_id':
63 gr2.users_group_id})
64
65 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
66 self.assertRaises(formencode.Invalid, validator.to_python, 'TesT')
67 self.assertRaises(formencode.Invalid, validator.to_python, 'TEST')
68 UsersGroupModel().delete(gr)
69 UsersGroupModel().delete(gr2)
70 Session.commit()
71
72 def test_ValidReposGroup(self):
73 validator = v.ValidReposGroup()
74 model = ReposGroupModel()
75 self.assertRaises(formencode.Invalid, validator.to_python,
76 {'group_name': HG_REPO, })
77 gr = model.create(group_name='test_gr', group_description='desc',
78 parent=None,
79 just_db=True)
80 self.assertRaises(formencode.Invalid,
81 validator.to_python, {'group_name': gr.group_name, })
82
83 validator = v.ValidReposGroup(edit=True,
84 old_data={'group_id': gr.group_id})
85 self.assertRaises(formencode.Invalid,
86 validator.to_python, {
87 'group_name': gr.group_name + 'n',
88 'group_parent_id': gr.group_id
89 })
90 model.delete(gr)
91
92 def test_ValidPassword(self):
93 validator = v.ValidPassword()
94 self.assertEqual('lol', validator.to_python('lol'))
95 self.assertEqual(None, validator.to_python(None))
96 self.assertRaises(formencode.Invalid, validator.to_python, 'Δ…Δ‡ΕΌΕΊ')
97
98 def test_ValidPasswordsMatch(self):
99 validator = v.ValidPasswordsMatch()
100 self.assertRaises(formencode.Invalid,
101 validator.to_python, {'password': 'pass',
102 'password_confirmation': 'pass2'})
103
104 self.assertRaises(formencode.Invalid,
105 validator.to_python, {'new_password': 'pass',
106 'password_confirmation': 'pass2'})
107
108 self.assertEqual({'new_password': 'pass',
109 'password_confirmation': 'pass'},
110 validator.to_python({'new_password': 'pass',
111 'password_confirmation': 'pass'}))
112
113 self.assertEqual({'password': 'pass',
114 'password_confirmation': 'pass'},
115 validator.to_python({'password': 'pass',
116 'password_confirmation': 'pass'}))
117
118 def test_ValidAuth(self):
119 validator = v.ValidAuth()
120 valid_creds = {
121 'username': TEST_USER_REGULAR2_LOGIN,
122 'password': TEST_USER_REGULAR2_PASS,
123 }
124 invalid_creds = {
125 'username': 'err',
126 'password': 'err',
127 }
128 self.assertEqual(valid_creds, validator.to_python(valid_creds))
129 self.assertRaises(formencode.Invalid,
130 validator.to_python, invalid_creds)
131
132 def test_ValidAuthToken(self):
133 validator = v.ValidAuthToken()
134 # this is untestable without a threadlocal
135 # self.assertRaises(formencode.Invalid,
136 # validator.to_python, 'BadToken')
137 validator
138
139 def test_ValidRepoName(self):
140 validator = v.ValidRepoName()
141
142 self.assertRaises(formencode.Invalid,
143 validator.to_python, {'repo_name': ''})
144
145 self.assertRaises(formencode.Invalid,
146 validator.to_python, {'repo_name': HG_REPO})
147
148 gr = ReposGroupModel().create(group_name='group_test',
149 group_description='desc',
150 parent=None,)
151 self.assertRaises(formencode.Invalid,
152 validator.to_python, {'repo_name': gr.group_name})
153
154 #TODO: write an error case for that ie. create a repo withinh a group
155 # self.assertRaises(formencode.Invalid,
156 # validator.to_python, {'repo_name': 'some',
157 # 'repo_group': gr.group_id})
158
159 def test_ValidForkName(self):
160 # this uses ValidRepoName validator
161 assert True
162
163 @parameterized.expand([
164 ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'),
165 ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
166 ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
167 ('/]re po', 're-po')])
168 def test_SlugifyName(self, name, expected):
169 validator = v.SlugifyName()
170 self.assertEqual(expected, validator.to_python(name))
171
172 def test_ValidCloneUri(self):
173 assert False
174
175 def test_ValidForkType(self):
176 validator = v.ValidForkType(old_data={'repo_type': 'hg'})
177 self.assertEqual('hg', validator.to_python('hg'))
178 self.assertRaises(formencode.Invalid, validator.to_python, 'git')
179
180 def test_ValidPerms(self):
181 assert False
182
183 def test_ValidSettings(self):
184 validator = v.ValidSettings()
185 self.assertEqual({'pass': 'pass'},
186 validator.to_python(value={'user': 'test',
187 'pass': 'pass'}))
188
189 self.assertEqual({'user2': 'test', 'pass': 'pass'},
190 validator.to_python(value={'user2': 'test',
191 'pass': 'pass'}))
192
193 def test_ValidPath(self):
194 validator = v.ValidPath()
195 self.assertEqual(TESTS_TMP_PATH,
196 validator.to_python(TESTS_TMP_PATH))
197 self.assertRaises(formencode.Invalid, validator.to_python,
198 '/no_such_dir')
199
200 def test_UniqSystemEmail(self):
201 validator = v.UniqSystemEmail(old_data={})
202
203 self.assertEqual('mail@python.org',
204 validator.to_python('MaiL@Python.org'))
205
206 email = TEST_USER_REGULAR2_EMAIL
207 self.assertRaises(formencode.Invalid, validator.to_python, email)
208
209 def test_ValidSystemEmail(self):
210 validator = v.ValidSystemEmail()
211 email = TEST_USER_REGULAR2_EMAIL
212
213 self.assertEqual(email, validator.to_python(email))
214 self.assertRaises(formencode.Invalid, validator.to_python, 'err')
215
216 def test_LdapLibValidator(self):
217 validator = v.LdapLibValidator()
218 self.assertRaises(v.LdapImportError, validator.to_python, 'err')
219
220 def test_AttrLoginValidator(self):
221 validator = v.AttrLoginValidator()
222 self.assertRaises(formencode.Invalid, validator.to_python, 123)
@@ -1,993 +1,1007 b''
1 1 """Helper functions
2 2
3 3 Consists of functions to typically be used within templates, but also
4 4 available to Controllers. This module is available to both as 'h'.
5 5 """
6 6 import random
7 7 import hashlib
8 8 import StringIO
9 9 import urllib
10 10 import math
11 11 import logging
12 12
13 13 from datetime import datetime
14 14 from pygments.formatters.html import HtmlFormatter
15 15 from pygments import highlight as code_highlight
16 16 from pylons import url, request, config
17 17 from pylons.i18n.translation import _, ungettext
18 18 from hashlib import md5
19 19
20 20 from webhelpers.html import literal, HTML, escape
21 21 from webhelpers.html.tools import *
22 22 from webhelpers.html.builder import make_tag
23 23 from webhelpers.html.tags import auto_discovery_link, checkbox, css_classes, \
24 24 end_form, file, form, hidden, image, javascript_link, link_to, \
25 25 link_to_if, link_to_unless, ol, required_legend, select, stylesheet_link, \
26 26 submit, text, password, textarea, title, ul, xml_declaration, radio
27 27 from webhelpers.html.tools import auto_link, button_to, highlight, \
28 28 js_obfuscate, mail_to, strip_links, strip_tags, tag_re
29 29 from webhelpers.number import format_byte_size, format_bit_size
30 30 from webhelpers.pylonslib import Flash as _Flash
31 31 from webhelpers.pylonslib.secure_form import secure_form
32 32 from webhelpers.text import chop_at, collapse, convert_accented_entities, \
33 33 convert_misc_entities, lchop, plural, rchop, remove_formatting, \
34 34 replace_whitespace, urlify, truncate, wrap_paragraphs
35 35 from webhelpers.date import time_ago_in_words
36 36 from webhelpers.paginate import Page
37 37 from webhelpers.html.tags import _set_input_attrs, _set_id_attr, \
38 38 convert_boolean_attrs, NotGiven, _make_safe_id_component
39 39
40 40 from rhodecode.lib.annotate import annotate_highlight
41 41 from rhodecode.lib.utils import repo_name_slug
42 42 from rhodecode.lib.utils2 import str2bool, safe_unicode, safe_str, \
43 43 get_changeset_safe
44 44 from rhodecode.lib.markup_renderer import MarkupRenderer
45 45 from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
46 46 from rhodecode.lib.vcs.backends.base import BaseChangeset
47 47 from rhodecode.config.conf import DATE_FORMAT, DATETIME_FORMAT
48 48 from rhodecode.model.changeset_status import ChangesetStatusModel
49 49 from rhodecode.model.db import URL_SEP
50 50
51 51 log = logging.getLogger(__name__)
52 52
53 53
54 html_escape_table = {
55 "&": "&",
56 '"': """,
57 "'": "'",
58 ">": ">",
59 "<": "&lt;",
60 }
61
62
63 def html_escape(text):
64 """Produce entities within text."""
65 return "".join(html_escape_table.get(c,c) for c in text)
66
67
54 68 def shorter(text, size=20):
55 69 postfix = '...'
56 70 if len(text) > size:
57 71 return text[:size - len(postfix)] + postfix
58 72 return text
59 73
60 74
61 75 def _reset(name, value=None, id=NotGiven, type="reset", **attrs):
62 76 """
63 77 Reset button
64 78 """
65 79 _set_input_attrs(attrs, type, name, value)
66 80 _set_id_attr(attrs, id, name)
67 81 convert_boolean_attrs(attrs, ["disabled"])
68 82 return HTML.input(**attrs)
69 83
70 84 reset = _reset
71 85 safeid = _make_safe_id_component
72 86
73 87
74 88 def FID(raw_id, path):
75 89 """
76 90 Creates a uniqe ID for filenode based on it's hash of path and revision
77 91 it's safe to use in urls
78 92
79 93 :param raw_id:
80 94 :param path:
81 95 """
82 96
83 97 return 'C-%s-%s' % (short_id(raw_id), md5(safe_str(path)).hexdigest()[:12])
84 98
85 99
86 100 def get_token():
87 101 """Return the current authentication token, creating one if one doesn't
88 102 already exist.
89 103 """
90 104 token_key = "_authentication_token"
91 105 from pylons import session
92 106 if not token_key in session:
93 107 try:
94 108 token = hashlib.sha1(str(random.getrandbits(128))).hexdigest()
95 109 except AttributeError: # Python < 2.4
96 110 token = hashlib.sha1(str(random.randrange(2 ** 128))).hexdigest()
97 111 session[token_key] = token
98 112 if hasattr(session, 'save'):
99 113 session.save()
100 114 return session[token_key]
101 115
102 116
103 117 class _GetError(object):
104 118 """Get error from form_errors, and represent it as span wrapped error
105 119 message
106 120
107 121 :param field_name: field to fetch errors for
108 122 :param form_errors: form errors dict
109 123 """
110 124
111 125 def __call__(self, field_name, form_errors):
112 126 tmpl = """<span class="error_msg">%s</span>"""
113 127 if form_errors and field_name in form_errors:
114 128 return literal(tmpl % form_errors.get(field_name))
115 129
116 130 get_error = _GetError()
117 131
118 132
119 133 class _ToolTip(object):
120 134
121 135 def __call__(self, tooltip_title, trim_at=50):
122 136 """
123 137 Special function just to wrap our text into nice formatted
124 138 autowrapped text
125 139
126 140 :param tooltip_title:
127 141 """
128 142 tooltip_title = escape(tooltip_title)
129 143 tooltip_title = tooltip_title.replace('<', '&lt;').replace('>', '&gt;')
130 144 return tooltip_title
131 145 tooltip = _ToolTip()
132 146
133 147
134 148 class _FilesBreadCrumbs(object):
135 149
136 150 def __call__(self, repo_name, rev, paths):
137 151 if isinstance(paths, str):
138 152 paths = safe_unicode(paths)
139 153 url_l = [link_to(repo_name, url('files_home',
140 154 repo_name=repo_name,
141 155 revision=rev, f_path=''))]
142 156 paths_l = paths.split('/')
143 157 for cnt, p in enumerate(paths_l):
144 158 if p != '':
145 159 url_l.append(link_to(p,
146 160 url('files_home',
147 161 repo_name=repo_name,
148 162 revision=rev,
149 163 f_path='/'.join(paths_l[:cnt + 1])
150 164 )
151 165 )
152 166 )
153 167
154 168 return literal('/'.join(url_l))
155 169
156 170 files_breadcrumbs = _FilesBreadCrumbs()
157 171
158 172
159 173 class CodeHtmlFormatter(HtmlFormatter):
160 174 """
161 175 My code Html Formatter for source codes
162 176 """
163 177
164 178 def wrap(self, source, outfile):
165 179 return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
166 180
167 181 def _wrap_code(self, source):
168 182 for cnt, it in enumerate(source):
169 183 i, t = it
170 184 t = '<div id="L%s">%s</div>' % (cnt + 1, t)
171 185 yield i, t
172 186
173 187 def _wrap_tablelinenos(self, inner):
174 188 dummyoutfile = StringIO.StringIO()
175 189 lncount = 0
176 190 for t, line in inner:
177 191 if t:
178 192 lncount += 1
179 193 dummyoutfile.write(line)
180 194
181 195 fl = self.linenostart
182 196 mw = len(str(lncount + fl - 1))
183 197 sp = self.linenospecial
184 198 st = self.linenostep
185 199 la = self.lineanchors
186 200 aln = self.anchorlinenos
187 201 nocls = self.noclasses
188 202 if sp:
189 203 lines = []
190 204
191 205 for i in range(fl, fl + lncount):
192 206 if i % st == 0:
193 207 if i % sp == 0:
194 208 if aln:
195 209 lines.append('<a href="#%s%d" class="special">%*d</a>' %
196 210 (la, i, mw, i))
197 211 else:
198 212 lines.append('<span class="special">%*d</span>' % (mw, i))
199 213 else:
200 214 if aln:
201 215 lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
202 216 else:
203 217 lines.append('%*d' % (mw, i))
204 218 else:
205 219 lines.append('')
206 220 ls = '\n'.join(lines)
207 221 else:
208 222 lines = []
209 223 for i in range(fl, fl + lncount):
210 224 if i % st == 0:
211 225 if aln:
212 226 lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
213 227 else:
214 228 lines.append('%*d' % (mw, i))
215 229 else:
216 230 lines.append('')
217 231 ls = '\n'.join(lines)
218 232
219 233 # in case you wonder about the seemingly redundant <div> here: since the
220 234 # content in the other cell also is wrapped in a div, some browsers in
221 235 # some configurations seem to mess up the formatting...
222 236 if nocls:
223 237 yield 0, ('<table class="%stable">' % self.cssclass +
224 238 '<tr><td><div class="linenodiv" '
225 239 'style="background-color: #f0f0f0; padding-right: 10px">'
226 240 '<pre style="line-height: 125%">' +
227 241 ls + '</pre></div></td><td id="hlcode" class="code">')
228 242 else:
229 243 yield 0, ('<table class="%stable">' % self.cssclass +
230 244 '<tr><td class="linenos"><div class="linenodiv"><pre>' +
231 245 ls + '</pre></div></td><td id="hlcode" class="code">')
232 246 yield 0, dummyoutfile.getvalue()
233 247 yield 0, '</td></tr></table>'
234 248
235 249
236 250 def pygmentize(filenode, **kwargs):
237 251 """pygmentize function using pygments
238 252
239 253 :param filenode:
240 254 """
241 255
242 256 return literal(code_highlight(filenode.content,
243 257 filenode.lexer, CodeHtmlFormatter(**kwargs)))
244 258
245 259
246 260 def pygmentize_annotation(repo_name, filenode, **kwargs):
247 261 """
248 262 pygmentize function for annotation
249 263
250 264 :param filenode:
251 265 """
252 266
253 267 color_dict = {}
254 268
255 269 def gen_color(n=10000):
256 270 """generator for getting n of evenly distributed colors using
257 271 hsv color and golden ratio. It always return same order of colors
258 272
259 273 :returns: RGB tuple
260 274 """
261 275
262 276 def hsv_to_rgb(h, s, v):
263 277 if s == 0.0:
264 278 return v, v, v
265 279 i = int(h * 6.0) # XXX assume int() truncates!
266 280 f = (h * 6.0) - i
267 281 p = v * (1.0 - s)
268 282 q = v * (1.0 - s * f)
269 283 t = v * (1.0 - s * (1.0 - f))
270 284 i = i % 6
271 285 if i == 0:
272 286 return v, t, p
273 287 if i == 1:
274 288 return q, v, p
275 289 if i == 2:
276 290 return p, v, t
277 291 if i == 3:
278 292 return p, q, v
279 293 if i == 4:
280 294 return t, p, v
281 295 if i == 5:
282 296 return v, p, q
283 297
284 298 golden_ratio = 0.618033988749895
285 299 h = 0.22717784590367374
286 300
287 301 for _ in xrange(n):
288 302 h += golden_ratio
289 303 h %= 1
290 304 HSV_tuple = [h, 0.95, 0.95]
291 305 RGB_tuple = hsv_to_rgb(*HSV_tuple)
292 306 yield map(lambda x: str(int(x * 256)), RGB_tuple)
293 307
294 308 cgenerator = gen_color()
295 309
296 310 def get_color_string(cs):
297 311 if cs in color_dict:
298 312 col = color_dict[cs]
299 313 else:
300 314 col = color_dict[cs] = cgenerator.next()
301 315 return "color: rgb(%s)! important;" % (', '.join(col))
302 316
303 317 def url_func(repo_name):
304 318
305 319 def _url_func(changeset):
306 320 author = changeset.author
307 321 date = changeset.date
308 322 message = tooltip(changeset.message)
309 323
310 324 tooltip_html = ("<div style='font-size:0.8em'><b>Author:</b>"
311 325 " %s<br/><b>Date:</b> %s</b><br/><b>Message:"
312 326 "</b> %s<br/></div>")
313 327
314 328 tooltip_html = tooltip_html % (author, date, message)
315 329 lnk_format = '%5s:%s' % ('r%s' % changeset.revision,
316 330 short_id(changeset.raw_id))
317 331 uri = link_to(
318 332 lnk_format,
319 333 url('changeset_home', repo_name=repo_name,
320 334 revision=changeset.raw_id),
321 335 style=get_color_string(changeset.raw_id),
322 336 class_='tooltip',
323 337 title=tooltip_html
324 338 )
325 339
326 340 uri += '\n'
327 341 return uri
328 342 return _url_func
329 343
330 344 return literal(annotate_highlight(filenode, url_func(repo_name), **kwargs))
331 345
332 346
333 347 def is_following_repo(repo_name, user_id):
334 348 from rhodecode.model.scm import ScmModel
335 349 return ScmModel().is_following_repo(repo_name, user_id)
336 350
337 351 flash = _Flash()
338 352
339 353 #==============================================================================
340 354 # SCM FILTERS available via h.
341 355 #==============================================================================
342 356 from rhodecode.lib.vcs.utils import author_name, author_email
343 357 from rhodecode.lib.utils2 import credentials_filter, age as _age
344 358 from rhodecode.model.db import User, ChangesetStatus
345 359
346 360 age = lambda x: _age(x)
347 361 capitalize = lambda x: x.capitalize()
348 362 email = author_email
349 363 short_id = lambda x: x[:12]
350 364 hide_credentials = lambda x: ''.join(credentials_filter(x))
351 365
352 366
353 367 def fmt_date(date):
354 368 if date:
355 369 _fmt = _(u"%a, %d %b %Y %H:%M:%S").encode('utf8')
356 370 return date.strftime(_fmt).decode('utf8')
357 371
358 372 return ""
359 373
360 374
361 375 def is_git(repository):
362 376 if hasattr(repository, 'alias'):
363 377 _type = repository.alias
364 378 elif hasattr(repository, 'repo_type'):
365 379 _type = repository.repo_type
366 380 else:
367 381 _type = repository
368 382 return _type == 'git'
369 383
370 384
371 385 def is_hg(repository):
372 386 if hasattr(repository, 'alias'):
373 387 _type = repository.alias
374 388 elif hasattr(repository, 'repo_type'):
375 389 _type = repository.repo_type
376 390 else:
377 391 _type = repository
378 392 return _type == 'hg'
379 393
380 394
381 395 def email_or_none(author):
382 396 _email = email(author)
383 397 if _email != '':
384 398 return _email
385 399
386 400 # See if it contains a username we can get an email from
387 401 user = User.get_by_username(author_name(author), case_insensitive=True,
388 402 cache=True)
389 403 if user is not None:
390 404 return user.email
391 405
392 406 # No valid email, not a valid user in the system, none!
393 407 return None
394 408
395 409
396 410 def person(author):
397 411 # attr to return from fetched user
398 412 person_getter = lambda usr: usr.username
399 413
400 414 # Valid email in the attribute passed, see if they're in the system
401 415 _email = email(author)
402 416 if _email != '':
403 417 user = User.get_by_email(_email, case_insensitive=True, cache=True)
404 418 if user is not None:
405 419 return person_getter(user)
406 420 return _email
407 421
408 422 # Maybe it's a username?
409 423 _author = author_name(author)
410 424 user = User.get_by_username(_author, case_insensitive=True,
411 425 cache=True)
412 426 if user is not None:
413 427 return person_getter(user)
414 428
415 429 # Still nothing? Just pass back the author name then
416 430 return _author
417 431
418 432
419 433 def bool2icon(value):
420 434 """Returns True/False values represented as small html image of true/false
421 435 icons
422 436
423 437 :param value: bool value
424 438 """
425 439
426 440 if value is True:
427 441 return HTML.tag('img', src=url("/images/icons/accept.png"),
428 442 alt=_('True'))
429 443
430 444 if value is False:
431 445 return HTML.tag('img', src=url("/images/icons/cancel.png"),
432 446 alt=_('False'))
433 447
434 448 return value
435 449
436 450
437 451 def action_parser(user_log, feed=False):
438 452 """
439 453 This helper will action_map the specified string action into translated
440 454 fancy names with icons and links
441 455
442 456 :param user_log: user log instance
443 457 :param feed: use output for feeds (no html and fancy icons)
444 458 """
445 459
446 460 action = user_log.action
447 461 action_params = ' '
448 462
449 463 x = action.split(':')
450 464
451 465 if len(x) > 1:
452 466 action, action_params = x
453 467
454 468 def get_cs_links():
455 469 revs_limit = 3 # display this amount always
456 470 revs_top_limit = 50 # show upto this amount of changesets hidden
457 471 revs_ids = action_params.split(',')
458 472 deleted = user_log.repository is None
459 473 if deleted:
460 474 return ','.join(revs_ids)
461 475
462 476 repo_name = user_log.repository.repo_name
463 477
464 478 repo = user_log.repository.scm_instance
465 479
466 480 def lnk(rev, repo_name):
467 481
468 482 if isinstance(rev, BaseChangeset):
469 483 lbl = 'r%s:%s' % (rev.revision, rev.short_id)
470 484 _url = url('changeset_home', repo_name=repo_name,
471 485 revision=rev.raw_id)
472 486 title = tooltip(rev.message)
473 487 else:
474 488 lbl = '%s' % rev
475 489 _url = '#'
476 490 title = _('Changeset not found')
477 491
478 492 return link_to(lbl, _url, title=title, class_='tooltip',)
479 493
480 494 revs = []
481 495 if len(filter(lambda v: v != '', revs_ids)) > 0:
482 496 for rev in revs_ids[:revs_top_limit]:
483 497 try:
484 498 rev = repo.get_changeset(rev)
485 499 revs.append(rev)
486 500 except ChangesetDoesNotExistError:
487 501 log.error('cannot find revision %s in this repo' % rev)
488 502 revs.append(rev)
489 503 continue
490 504 cs_links = []
491 505 cs_links.append(" " + ', '.join(
492 506 [lnk(rev, repo_name) for rev in revs[:revs_limit]]
493 507 )
494 508 )
495 509
496 510 compare_view = (
497 511 ' <div class="compare_view tooltip" title="%s">'
498 512 '<a href="%s">%s</a> </div>' % (
499 513 _('Show all combined changesets %s->%s') % (
500 514 revs_ids[0], revs_ids[-1]
501 515 ),
502 516 url('changeset_home', repo_name=repo_name,
503 517 revision='%s...%s' % (revs_ids[0], revs_ids[-1])
504 518 ),
505 519 _('compare view')
506 520 )
507 521 )
508 522
509 523 # if we have exactly one more than normally displayed
510 524 # just display it, takes less space than displaying
511 525 # "and 1 more revisions"
512 526 if len(revs_ids) == revs_limit + 1:
513 527 rev = revs[revs_limit]
514 528 cs_links.append(", " + lnk(rev, repo_name))
515 529
516 530 # hidden-by-default ones
517 531 if len(revs_ids) > revs_limit + 1:
518 532 uniq_id = revs_ids[0]
519 533 html_tmpl = (
520 534 '<span> %s <a class="show_more" id="_%s" '
521 535 'href="#more">%s</a> %s</span>'
522 536 )
523 537 if not feed:
524 538 cs_links.append(html_tmpl % (
525 539 _('and'),
526 540 uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
527 541 _('revisions')
528 542 )
529 543 )
530 544
531 545 if not feed:
532 546 html_tmpl = '<span id="%s" style="display:none">, %s </span>'
533 547 else:
534 548 html_tmpl = '<span id="%s"> %s </span>'
535 549
536 550 morelinks = ', '.join(
537 551 [lnk(rev, repo_name) for rev in revs[revs_limit:]]
538 552 )
539 553
540 554 if len(revs_ids) > revs_top_limit:
541 555 morelinks += ', ...'
542 556
543 557 cs_links.append(html_tmpl % (uniq_id, morelinks))
544 558 if len(revs) > 1:
545 559 cs_links.append(compare_view)
546 560 return ''.join(cs_links)
547 561
548 562 def get_fork_name():
549 563 repo_name = action_params
550 564 return _('fork name ') + str(link_to(action_params, url('summary_home',
551 565 repo_name=repo_name,)))
552 566
553 567 def get_user_name():
554 568 user_name = action_params
555 569 return user_name
556 570
557 571 def get_users_group():
558 572 group_name = action_params
559 573 return group_name
560 574
561 575 # action : translated str, callback(extractor), icon
562 576 action_map = {
563 577 'user_deleted_repo': (_('[deleted] repository'),
564 578 None, 'database_delete.png'),
565 579 'user_created_repo': (_('[created] repository'),
566 580 None, 'database_add.png'),
567 581 'user_created_fork': (_('[created] repository as fork'),
568 582 None, 'arrow_divide.png'),
569 583 'user_forked_repo': (_('[forked] repository'),
570 584 get_fork_name, 'arrow_divide.png'),
571 585 'user_updated_repo': (_('[updated] repository'),
572 586 None, 'database_edit.png'),
573 587 'admin_deleted_repo': (_('[delete] repository'),
574 588 None, 'database_delete.png'),
575 589 'admin_created_repo': (_('[created] repository'),
576 590 None, 'database_add.png'),
577 591 'admin_forked_repo': (_('[forked] repository'),
578 592 None, 'arrow_divide.png'),
579 593 'admin_updated_repo': (_('[updated] repository'),
580 594 None, 'database_edit.png'),
581 595 'admin_created_user': (_('[created] user'),
582 596 get_user_name, 'user_add.png'),
583 597 'admin_updated_user': (_('[updated] user'),
584 598 get_user_name, 'user_edit.png'),
585 599 'admin_created_users_group': (_('[created] users group'),
586 600 get_users_group, 'group_add.png'),
587 601 'admin_updated_users_group': (_('[updated] users group'),
588 602 get_users_group, 'group_edit.png'),
589 603 'user_commented_revision': (_('[commented] on revision in repository'),
590 604 get_cs_links, 'comment_add.png'),
591 605 'push': (_('[pushed] into'),
592 606 get_cs_links, 'script_add.png'),
593 607 'push_local': (_('[committed via RhodeCode] into repository'),
594 608 get_cs_links, 'script_edit.png'),
595 609 'push_remote': (_('[pulled from remote] into repository'),
596 610 get_cs_links, 'connect.png'),
597 611 'pull': (_('[pulled] from'),
598 612 None, 'down_16.png'),
599 613 'started_following_repo': (_('[started following] repository'),
600 614 None, 'heart_add.png'),
601 615 'stopped_following_repo': (_('[stopped following] repository'),
602 616 None, 'heart_delete.png'),
603 617 }
604 618
605 619 action_str = action_map.get(action, action)
606 620 if feed:
607 621 action = action_str[0].replace('[', '').replace(']', '')
608 622 else:
609 623 action = action_str[0]\
610 624 .replace('[', '<span class="journal_highlight">')\
611 625 .replace(']', '</span>')
612 626
613 627 action_params_func = lambda: ""
614 628
615 629 if callable(action_str[1]):
616 630 action_params_func = action_str[1]
617 631
618 632 def action_parser_icon():
619 633 action = user_log.action
620 634 action_params = None
621 635 x = action.split(':')
622 636
623 637 if len(x) > 1:
624 638 action, action_params = x
625 639
626 640 tmpl = """<img src="%s%s" alt="%s"/>"""
627 641 ico = action_map.get(action, ['', '', ''])[2]
628 642 return literal(tmpl % ((url('/images/icons/')), ico, action))
629 643
630 644 # returned callbacks we need to call to get
631 645 return [lambda: literal(action), action_params_func, action_parser_icon]
632 646
633 647
634 648
635 649 #==============================================================================
636 650 # PERMS
637 651 #==============================================================================
638 652 from rhodecode.lib.auth import HasPermissionAny, HasPermissionAll, \
639 653 HasRepoPermissionAny, HasRepoPermissionAll
640 654
641 655
642 656 #==============================================================================
643 657 # GRAVATAR URL
644 658 #==============================================================================
645 659
646 660 def gravatar_url(email_address, size=30):
647 661 if (not str2bool(config['app_conf'].get('use_gravatar')) or
648 662 not email_address or email_address == 'anonymous@rhodecode.org'):
649 663 f = lambda a, l: min(l, key=lambda x: abs(x - a))
650 664 return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))
651 665
652 666 ssl_enabled = 'https' == request.environ.get('wsgi.url_scheme')
653 667 default = 'identicon'
654 668 baseurl_nossl = "http://www.gravatar.com/avatar/"
655 669 baseurl_ssl = "https://secure.gravatar.com/avatar/"
656 670 baseurl = baseurl_ssl if ssl_enabled else baseurl_nossl
657 671
658 672 if isinstance(email_address, unicode):
659 673 #hashlib crashes on unicode items
660 674 email_address = safe_str(email_address)
661 675 # construct the url
662 676 gravatar_url = baseurl + hashlib.md5(email_address.lower()).hexdigest() + "?"
663 677 gravatar_url += urllib.urlencode({'d': default, 's': str(size)})
664 678
665 679 return gravatar_url
666 680
667 681
668 682 #==============================================================================
669 683 # REPO PAGER, PAGER FOR REPOSITORY
670 684 #==============================================================================
671 685 class RepoPage(Page):
672 686
673 687 def __init__(self, collection, page=1, items_per_page=20,
674 688 item_count=None, url=None, **kwargs):
675 689
676 690 """Create a "RepoPage" instance. special pager for paging
677 691 repository
678 692 """
679 693 self._url_generator = url
680 694
681 695 # Safe the kwargs class-wide so they can be used in the pager() method
682 696 self.kwargs = kwargs
683 697
684 698 # Save a reference to the collection
685 699 self.original_collection = collection
686 700
687 701 self.collection = collection
688 702
689 703 # The self.page is the number of the current page.
690 704 # The first page has the number 1!
691 705 try:
692 706 self.page = int(page) # make it int() if we get it as a string
693 707 except (ValueError, TypeError):
694 708 self.page = 1
695 709
696 710 self.items_per_page = items_per_page
697 711
698 712 # Unless the user tells us how many items the collections has
699 713 # we calculate that ourselves.
700 714 if item_count is not None:
701 715 self.item_count = item_count
702 716 else:
703 717 self.item_count = len(self.collection)
704 718
705 719 # Compute the number of the first and last available page
706 720 if self.item_count > 0:
707 721 self.first_page = 1
708 722 self.page_count = int(math.ceil(float(self.item_count) /
709 723 self.items_per_page))
710 724 self.last_page = self.first_page + self.page_count - 1
711 725
712 726 # Make sure that the requested page number is the range of
713 727 # valid pages
714 728 if self.page > self.last_page:
715 729 self.page = self.last_page
716 730 elif self.page < self.first_page:
717 731 self.page = self.first_page
718 732
719 733 # Note: the number of items on this page can be less than
720 734 # items_per_page if the last page is not full
721 735 self.first_item = max(0, (self.item_count) - (self.page *
722 736 items_per_page))
723 737 self.last_item = ((self.item_count - 1) - items_per_page *
724 738 (self.page - 1))
725 739
726 740 self.items = list(self.collection[self.first_item:self.last_item + 1])
727 741
728 742 # Links to previous and next page
729 743 if self.page > self.first_page:
730 744 self.previous_page = self.page - 1
731 745 else:
732 746 self.previous_page = None
733 747
734 748 if self.page < self.last_page:
735 749 self.next_page = self.page + 1
736 750 else:
737 751 self.next_page = None
738 752
739 753 # No items available
740 754 else:
741 755 self.first_page = None
742 756 self.page_count = 0
743 757 self.last_page = None
744 758 self.first_item = None
745 759 self.last_item = None
746 760 self.previous_page = None
747 761 self.next_page = None
748 762 self.items = []
749 763
750 764 # This is a subclass of the 'list' type. Initialise the list now.
751 765 list.__init__(self, reversed(self.items))
752 766
753 767
754 768 def changed_tooltip(nodes):
755 769 """
756 770 Generates a html string for changed nodes in changeset page.
757 771 It limits the output to 30 entries
758 772
759 773 :param nodes: LazyNodesGenerator
760 774 """
761 775 if nodes:
762 776 pref = ': <br/> '
763 777 suf = ''
764 778 if len(nodes) > 30:
765 779 suf = '<br/>' + _(' and %s more') % (len(nodes) - 30)
766 780 return literal(pref + '<br/> '.join([safe_unicode(x.path)
767 781 for x in nodes[:30]]) + suf)
768 782 else:
769 783 return ': ' + _('No Files')
770 784
771 785
772 786 def repo_link(groups_and_repos):
773 787 """
774 788 Makes a breadcrumbs link to repo within a group
775 789 joins &raquo; on each group to create a fancy link
776 790
777 791 ex::
778 792 group >> subgroup >> repo
779 793
780 794 :param groups_and_repos:
781 795 """
782 796 groups, repo_name = groups_and_repos
783 797
784 798 if not groups:
785 799 return repo_name
786 800 else:
787 801 def make_link(group):
788 802 return link_to(group.name, url('repos_group_home',
789 803 group_name=group.group_name))
790 804 return literal(' &raquo; '.join(map(make_link, groups)) + \
791 805 " &raquo; " + repo_name)
792 806
793 807
794 808 def fancy_file_stats(stats):
795 809 """
796 810 Displays a fancy two colored bar for number of added/deleted
797 811 lines of code on file
798 812
799 813 :param stats: two element list of added/deleted lines of code
800 814 """
801 815
802 816 a, d, t = stats[0], stats[1], stats[0] + stats[1]
803 817 width = 100
804 818 unit = float(width) / (t or 1)
805 819
806 820 # needs > 9% of width to be visible or 0 to be hidden
807 821 a_p = max(9, unit * a) if a > 0 else 0
808 822 d_p = max(9, unit * d) if d > 0 else 0
809 823 p_sum = a_p + d_p
810 824
811 825 if p_sum > width:
812 826 #adjust the percentage to be == 100% since we adjusted to 9
813 827 if a_p > d_p:
814 828 a_p = a_p - (p_sum - width)
815 829 else:
816 830 d_p = d_p - (p_sum - width)
817 831
818 832 a_v = a if a > 0 else ''
819 833 d_v = d if d > 0 else ''
820 834
821 835 def cgen(l_type):
822 836 mapping = {'tr': 'top-right-rounded-corner-mid',
823 837 'tl': 'top-left-rounded-corner-mid',
824 838 'br': 'bottom-right-rounded-corner-mid',
825 839 'bl': 'bottom-left-rounded-corner-mid'}
826 840 map_getter = lambda x: mapping[x]
827 841
828 842 if l_type == 'a' and d_v:
829 843 #case when added and deleted are present
830 844 return ' '.join(map(map_getter, ['tl', 'bl']))
831 845
832 846 if l_type == 'a' and not d_v:
833 847 return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl']))
834 848
835 849 if l_type == 'd' and a_v:
836 850 return ' '.join(map(map_getter, ['tr', 'br']))
837 851
838 852 if l_type == 'd' and not a_v:
839 853 return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl']))
840 854
841 855 d_a = '<div class="added %s" style="width:%s%%">%s</div>' % (
842 856 cgen('a'), a_p, a_v
843 857 )
844 858 d_d = '<div class="deleted %s" style="width:%s%%">%s</div>' % (
845 859 cgen('d'), d_p, d_v
846 860 )
847 861 return literal('<div style="width:%spx">%s%s</div>' % (width, d_a, d_d))
848 862
849 863
850 864 def urlify_text(text_):
851 865 import re
852 866
853 867 url_pat = re.compile(r'''(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]'''
854 868 '''|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)''')
855 869
856 870 def url_func(match_obj):
857 871 url_full = match_obj.groups()[0]
858 872 return '<a href="%(url)s">%(url)s</a>' % ({'url': url_full})
859 873
860 874 return literal(url_pat.sub(url_func, text_))
861 875
862 876
863 877 def urlify_changesets(text_, repository):
864 878 """
865 879 Extract revision ids from changeset and make link from them
866 880
867 881 :param text_:
868 882 :param repository:
869 883 """
870 884 import re
871 885 URL_PAT = re.compile(r'([0-9a-fA-F]{12,})')
872 886
873 887 def url_func(match_obj):
874 888 rev = match_obj.groups()[0]
875 889 pref = ''
876 890 if match_obj.group().startswith(' '):
877 891 pref = ' '
878 892 tmpl = (
879 893 '%(pref)s<a class="%(cls)s" href="%(url)s">'
880 894 '%(rev)s'
881 895 '</a>'
882 896 )
883 897 return tmpl % {
884 898 'pref': pref,
885 899 'cls': 'revision-link',
886 900 'url': url('changeset_home', repo_name=repository, revision=rev),
887 901 'rev': rev,
888 902 }
889 903
890 904 newtext = URL_PAT.sub(url_func, text_)
891 905
892 906 return newtext
893 907
894 908
895 909 def urlify_commit(text_, repository=None, link_=None):
896 910 """
897 911 Parses given text message and makes proper links.
898 912 issues are linked to given issue-server, and rest is a changeset link
899 913 if link_ is given, in other case it's a plain text
900 914
901 915 :param text_:
902 916 :param repository:
903 917 :param link_: changeset link
904 918 """
905 919 import re
906 920 import traceback
907 921
908 922 def escaper(string):
909 923 return string.replace('<', '&lt;').replace('>', '&gt;')
910 924
911 925 def linkify_others(t, l):
912 926 urls = re.compile(r'(\<a.*?\<\/a\>)',)
913 927 links = []
914 928 for e in urls.split(t):
915 929 if not urls.match(e):
916 930 links.append('<a class="message-link" href="%s">%s</a>' % (l, e))
917 931 else:
918 932 links.append(e)
919 933
920 934 return ''.join(links)
921 935
922 936 # urlify changesets - extrac revisions and make link out of them
923 937 text_ = urlify_changesets(escaper(text_), repository)
924 938
925 939 try:
926 940 conf = config['app_conf']
927 941
928 942 URL_PAT = re.compile(r'%s' % conf.get('issue_pat'))
929 943
930 944 if URL_PAT:
931 945 ISSUE_SERVER_LNK = conf.get('issue_server_link')
932 946 ISSUE_PREFIX = conf.get('issue_prefix')
933 947
934 948 def url_func(match_obj):
935 949 pref = ''
936 950 if match_obj.group().startswith(' '):
937 951 pref = ' '
938 952
939 953 issue_id = ''.join(match_obj.groups())
940 954 tmpl = (
941 955 '%(pref)s<a class="%(cls)s" href="%(url)s">'
942 956 '%(issue-prefix)s%(id-repr)s'
943 957 '</a>'
944 958 )
945 959 url = ISSUE_SERVER_LNK.replace('{id}', issue_id)
946 960 if repository:
947 961 url = url.replace('{repo}', repository)
948 962 repo_name = repository.split(URL_SEP)[-1]
949 963 url = url.replace('{repo_name}', repo_name)
950 964 return tmpl % {
951 965 'pref': pref,
952 966 'cls': 'issue-tracker-link',
953 967 'url': url,
954 968 'id-repr': issue_id,
955 969 'issue-prefix': ISSUE_PREFIX,
956 970 'serv': ISSUE_SERVER_LNK,
957 971 }
958 972
959 973 newtext = URL_PAT.sub(url_func, text_)
960 974
961 975 if link_:
962 976 # wrap not links into final link => link_
963 977 newtext = linkify_others(newtext, link_)
964 978
965 979 return literal(newtext)
966 980 except:
967 981 log.error(traceback.format_exc())
968 982 pass
969 983
970 984 return text_
971 985
972 986
973 987 def rst(source):
974 988 return literal('<div class="rst-block">%s</div>' %
975 989 MarkupRenderer.rst(source))
976 990
977 991
978 992 def rst_w_mentions(source):
979 993 """
980 994 Wrapped rst renderer with @mention highlighting
981 995
982 996 :param source:
983 997 """
984 998 return literal('<div class="rst-block">%s</div>' %
985 999 MarkupRenderer.rst_with_mentions(source))
986 1000
987 1001
988 1002 def changeset_status(repo, revision):
989 1003 return ChangesetStatusModel().get_status(repo, revision)
990 1004
991 1005
992 1006 def changeset_status_lbl(changeset_status):
993 1007 return dict(ChangesetStatus.STATUSES).get(changeset_status)
@@ -1,158 +1,160 b''
1 1 """Pylons application test package
2 2
3 3 This package assumes the Pylons environment is already loaded, such as
4 4 when this script is imported from the `nosetests --with-pylons=test.ini`
5 5 command.
6 6
7 7 This module initializes the application via ``websetup`` (`paster
8 8 setup-app`) and provides the base testing objects.
9 9 """
10 10 import os
11 11 import time
12 12 import logging
13 13 import datetime
14 14 import hashlib
15 15 import tempfile
16 16 from os.path import join as jn
17 17
18 18 from unittest import TestCase
19 19 from tempfile import _RandomNameSequence
20 20
21 21 from paste.deploy import loadapp
22 22 from paste.script.appinstall import SetupCommand
23 23 from pylons import config, url
24 24 from routes.util import URLGenerator
25 25 from webtest import TestApp
26 26
27 27 from rhodecode import is_windows
28 28 from rhodecode.model.meta import Session
29 29 from rhodecode.model.db import User
30 from rhodecode.tests.nose_parametrized import parameterized
30 31
31 32 import pylons.test
32 33
33 34
34 35 os.environ['TZ'] = 'UTC'
35 36 if not is_windows:
36 37 time.tzset()
37 38
38 39 log = logging.getLogger(__name__)
39 40
40 41 __all__ = [
41 'environ', 'url', 'get_new_dir', 'TestController', 'TESTS_TMP_PATH',
42 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK',
43 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
42 'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
43 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
44 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
44 45 'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
45 46 'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
46 47 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_HG_REPO_CLONE',
47 48 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', 'TEST_GIT_REPO_CLONE',
48 49 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
49 50 ]
50 51
51 52 # Invoke websetup with the current config file
52 53 # SetupCommand('setup-app').run([config_file])
53 54
54 55 ##RUNNING DESIRED TESTS
55 56 # nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
56 57 # nosetests --pdb --pdb-failures
58 # nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
57 59 environ = {}
58 60
59 61 #SOME GLOBALS FOR TESTS
60 62
61 63 TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
62 64 TEST_USER_ADMIN_LOGIN = 'test_admin'
63 65 TEST_USER_ADMIN_PASS = 'test12'
64 66 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
65 67
66 68 TEST_USER_REGULAR_LOGIN = 'test_regular'
67 69 TEST_USER_REGULAR_PASS = 'test12'
68 70 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
69 71
70 72 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
71 73 TEST_USER_REGULAR2_PASS = 'test12'
72 74 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
73 75
74 76 HG_REPO = 'vcs_test_hg'
75 77 GIT_REPO = 'vcs_test_git'
76 78
77 79 NEW_HG_REPO = 'vcs_test_hg_new'
78 80 NEW_GIT_REPO = 'vcs_test_git_new'
79 81
80 82 HG_FORK = 'vcs_test_hg_fork'
81 83 GIT_FORK = 'vcs_test_git_fork'
82 84
83 85 ## VCS
84 86 SCM_TESTS = ['hg', 'git']
85 87 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
86 88
87 89 GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
88 90
89 91 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
90 92 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
91 93 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
92 94
93 95
94 96 HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
95 97
96 98 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
97 99 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
98 100 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
99 101
100 102 TEST_DIR = tempfile.gettempdir()
101 103 TEST_REPO_PREFIX = 'vcs-test'
102 104
103 105 # cached repos if any !
104 106 # comment out to get some other repos from bb or github
105 107 GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
106 108 HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
107 109
108 110
109 111 def get_new_dir(title):
110 112 """
111 113 Returns always new directory path.
112 114 """
113 115 from rhodecode.tests.vcs.utils import get_normalized_path
114 116 name = TEST_REPO_PREFIX
115 117 if title:
116 118 name = '-'.join((name, title))
117 119 hex = hashlib.sha1(str(time.time())).hexdigest()
118 120 name = '-'.join((name, hex))
119 121 path = os.path.join(TEST_DIR, name)
120 122 return get_normalized_path(path)
121 123
122 124
123 125 class TestController(TestCase):
124 126
125 127 def __init__(self, *args, **kwargs):
126 128 wsgiapp = pylons.test.pylonsapp
127 129 config = wsgiapp.config
128 130
129 131 self.app = TestApp(wsgiapp)
130 132 url._push_object(URLGenerator(config['routes.map'], environ))
131 133 self.Session = Session
132 134 self.index_location = config['app_conf']['index_dir']
133 135 TestCase.__init__(self, *args, **kwargs)
134 136
135 137 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
136 138 password=TEST_USER_ADMIN_PASS):
137 139 self._logged_username = username
138 140 response = self.app.post(url(controller='login', action='index'),
139 141 {'username': username,
140 142 'password': password})
141 143
142 144 if 'invalid user name' in response.body:
143 145 self.fail('could not login using %s %s' % (username, password))
144 146
145 147 self.assertEqual(response.status, '302 Found')
146 148 ses = response.session['rhodecode_user']
147 149 self.assertEqual(ses.get('username'), username)
148 150 response = response.follow()
149 151 self.assertEqual(ses.get('is_authenticated'), True)
150 152
151 153 return response.session['rhodecode_user']
152 154
153 155 def _get_logged_user(self):
154 156 return User.get_by_username(self._logged_username)
155 157
156 158 def checkSessionFlash(self, response, msg):
157 159 self.assertTrue('flash' in response.session)
158 160 self.assertTrue(msg in response.session['flash'][0][1])
General Comments 0
You need to be logged in to leave comments. Login now