##// END OF EJS Templates
Fixed issue #501 error on setting set_as_fork to same repo...
marcink -
r2629:d2901d90 beta
parent child Browse files
Show More
@@ -1,446 +1,448 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.controllers.admin.repos
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Repositories controller for RhodeCode
7 7
8 8 :created_on: Apr 7, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import logging
27 27 import traceback
28 28 import formencode
29 29 from formencode import htmlfill
30 30
31 31 from webob.exc import HTTPInternalServerError
32 32 from pylons import request, session, tmpl_context as c, url
33 33 from pylons.controllers.util import redirect
34 34 from pylons.i18n.translation import _
35 35 from sqlalchemy.exc import IntegrityError
36 36
37 37 from rhodecode.lib import helpers as h
38 38 from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator, \
39 39 HasPermissionAnyDecorator, HasRepoPermissionAllDecorator
40 40 from rhodecode.lib.base import BaseController, render
41 41 from rhodecode.lib.utils import invalidate_cache, action_logger, repo_name_slug
42 42 from rhodecode.lib.helpers import get_token
43 43 from rhodecode.model.meta import Session
44 44 from rhodecode.model.db import User, Repository, UserFollowing, RepoGroup
45 45 from rhodecode.model.forms import RepoForm
46 46 from rhodecode.model.scm import ScmModel
47 47 from rhodecode.model.repo import RepoModel
48 48
49 49 log = logging.getLogger(__name__)
50 50
51 51
52 52 class ReposController(BaseController):
53 53 """
54 54 REST Controller styled on the Atom Publishing Protocol"""
55 55 # To properly map this controller, ensure your config/routing.py
56 56 # file has a resource setup:
57 57 # map.resource('repo', 'repos')
58 58
59 59 @LoginRequired()
60 60 @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
61 61 def __before__(self):
62 62 c.admin_user = session.get('admin_user')
63 63 c.admin_username = session.get('admin_username')
64 64 super(ReposController, self).__before__()
65 65
66 66 def __load_defaults(self):
67 67 c.repo_groups = RepoGroup.groups_choices()
68 68 c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
69 69
70 70 repo_model = RepoModel()
71 71 c.users_array = repo_model.get_users_js()
72 72 c.users_groups_array = repo_model.get_users_groups_js()
73 73 choices, c.landing_revs = ScmModel().get_repo_landing_revs()
74 74 c.landing_revs_choices = choices
75 75
76 76 def __load_data(self, repo_name=None):
77 77 """
78 78 Load defaults settings for edit, and update
79 79
80 80 :param repo_name:
81 81 """
82 82 self.__load_defaults()
83 83
84 84 c.repo_info = db_repo = Repository.get_by_repo_name(repo_name)
85 85 repo = db_repo.scm_instance
86 86
87 87 if c.repo_info is None:
88 88 h.flash(_('%s repository is not mapped to db perhaps'
89 89 ' it was created or renamed from the filesystem'
90 90 ' please run the application again'
91 91 ' in order to rescan repositories') % repo_name,
92 92 category='error')
93 93
94 94 return redirect(url('repos'))
95 95
96 96 choices, c.landing_revs = ScmModel().get_repo_landing_revs(c.repo_info)
97 97 c.landing_revs_choices = choices
98 98
99 99 c.default_user_id = User.get_by_username('default').user_id
100 100 c.in_public_journal = UserFollowing.query()\
101 101 .filter(UserFollowing.user_id == c.default_user_id)\
102 102 .filter(UserFollowing.follows_repository == c.repo_info).scalar()
103 103
104 104 if c.repo_info.stats:
105 105 # this is on what revision we ended up so we add +1 for count
106 106 last_rev = c.repo_info.stats.stat_on_revision + 1
107 107 else:
108 108 last_rev = 0
109 109 c.stats_revision = last_rev
110 110
111 111 c.repo_last_rev = repo.count() if repo.revisions else 0
112 112
113 113 if last_rev == 0 or c.repo_last_rev == 0:
114 114 c.stats_percentage = 0
115 115 else:
116 116 c.stats_percentage = '%.2f' % ((float((last_rev)) /
117 117 c.repo_last_rev) * 100)
118 118
119 119 defaults = RepoModel()._get_defaults(repo_name)
120 120
121 121 c.repos_list = [('', _('--REMOVE FORK--'))]
122 122 c.repos_list += [(x.repo_id, x.repo_name) for x in
123 Repository.query().order_by(Repository.repo_name).all()]
123 Repository.query().order_by(Repository.repo_name).all()
124 if x.repo_id != c.repo_info.repo_id]
124 125
126 defaults['id_fork_of'] = db_repo.fork.repo_id if db_repo.fork else ''
125 127 return defaults
126 128
127 129 @HasPermissionAllDecorator('hg.admin')
128 130 def index(self, format='html'):
129 131 """GET /repos: All items in the collection"""
130 132 # url('repos')
131 133
132 134 c.repos_list = ScmModel().get_repos(Repository.query()
133 135 .order_by(Repository.repo_name)
134 136 .all(), sort_key='name_sort')
135 137 return render('admin/repos/repos.html')
136 138
137 139 @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
138 140 def create(self):
139 141 """
140 142 POST /repos: Create a new item"""
141 143 # url('repos')
142 144
143 145 self.__load_defaults()
144 146 form_result = {}
145 147 try:
146 148 form_result = RepoForm(repo_groups=c.repo_groups_choices,
147 149 landing_revs=c.landing_revs_choices)()\
148 150 .to_python(dict(request.POST))
149 151 RepoModel().create(form_result, self.rhodecode_user.user_id)
150 152 if form_result['clone_uri']:
151 153 h.flash(_('created repository %s from %s') \
152 154 % (form_result['repo_name'], form_result['clone_uri']),
153 155 category='success')
154 156 else:
155 157 h.flash(_('created repository %s') % form_result['repo_name'],
156 158 category='success')
157 159
158 160 if request.POST.get('user_created'):
159 161 # created by regular non admin user
160 162 action_logger(self.rhodecode_user, 'user_created_repo',
161 163 form_result['repo_name_full'], self.ip_addr,
162 164 self.sa)
163 165 else:
164 166 action_logger(self.rhodecode_user, 'admin_created_repo',
165 167 form_result['repo_name_full'], self.ip_addr,
166 168 self.sa)
167 169 Session.commit()
168 170 except formencode.Invalid, errors:
169 171
170 172 c.new_repo = errors.value['repo_name']
171 173
172 174 if request.POST.get('user_created'):
173 175 r = render('admin/repos/repo_add_create_repository.html')
174 176 else:
175 177 r = render('admin/repos/repo_add.html')
176 178
177 179 return htmlfill.render(
178 180 r,
179 181 defaults=errors.value,
180 182 errors=errors.error_dict or {},
181 183 prefix_error=False,
182 184 encoding="UTF-8")
183 185
184 186 except Exception:
185 187 log.error(traceback.format_exc())
186 188 msg = _('error occurred during creation of repository %s') \
187 189 % form_result.get('repo_name')
188 190 h.flash(msg, category='error')
189 191 if request.POST.get('user_created'):
190 192 return redirect(url('home'))
191 193 return redirect(url('repos'))
192 194
193 195 @HasPermissionAllDecorator('hg.admin')
194 196 def new(self, format='html'):
195 197 """GET /repos/new: Form to create a new item"""
196 198 new_repo = request.GET.get('repo', '')
197 199 c.new_repo = repo_name_slug(new_repo)
198 200 self.__load_defaults()
199 201 return render('admin/repos/repo_add.html')
200 202
201 203 @HasPermissionAllDecorator('hg.admin')
202 204 def update(self, repo_name):
203 205 """
204 206 PUT /repos/repo_name: Update an existing item"""
205 207 # Forms posted to this method should contain a hidden field:
206 208 # <input type="hidden" name="_method" value="PUT" />
207 209 # Or using helpers:
208 210 # h.form(url('repo', repo_name=ID),
209 211 # method='put')
210 212 # url('repo', repo_name=ID)
211 213 self.__load_defaults()
212 214 repo_model = RepoModel()
213 215 changed_name = repo_name
214 216 #override the choices with extracted revisions !
215 217 choices, c.landing_revs = ScmModel().get_repo_landing_revs(repo_name)
216 218 c.landing_revs_choices = choices
217 219
218 220 _form = RepoForm(edit=True, old_data={'repo_name': repo_name},
219 221 repo_groups=c.repo_groups_choices,
220 222 landing_revs=c.landing_revs_choices)()
221 223 try:
222 224 form_result = _form.to_python(dict(request.POST))
223 225 repo = repo_model.update(repo_name, form_result)
224 226 invalidate_cache('get_repo_cached_%s' % repo_name)
225 227 h.flash(_('Repository %s updated successfully') % repo_name,
226 228 category='success')
227 229 changed_name = repo.repo_name
228 230 action_logger(self.rhodecode_user, 'admin_updated_repo',
229 231 changed_name, self.ip_addr, self.sa)
230 232 Session.commit()
231 233 except formencode.Invalid, errors:
232 234 defaults = self.__load_data(repo_name)
233 235 defaults.update(errors.value)
234 236 return htmlfill.render(
235 237 render('admin/repos/repo_edit.html'),
236 238 defaults=defaults,
237 239 errors=errors.error_dict or {},
238 240 prefix_error=False,
239 241 encoding="UTF-8")
240 242
241 243 except Exception:
242 244 log.error(traceback.format_exc())
243 245 h.flash(_('error occurred during update of repository %s') \
244 246 % repo_name, category='error')
245 247 return redirect(url('edit_repo', repo_name=changed_name))
246 248
247 249 @HasPermissionAllDecorator('hg.admin')
248 250 def delete(self, repo_name):
249 251 """
250 252 DELETE /repos/repo_name: Delete an existing item"""
251 253 # Forms posted to this method should contain a hidden field:
252 254 # <input type="hidden" name="_method" value="DELETE" />
253 255 # Or using helpers:
254 256 # h.form(url('repo', repo_name=ID),
255 257 # method='delete')
256 258 # url('repo', repo_name=ID)
257 259
258 260 repo_model = RepoModel()
259 261 repo = repo_model.get_by_repo_name(repo_name)
260 262 if not repo:
261 263 h.flash(_('%s repository is not mapped to db perhaps'
262 264 ' it was moved or renamed from the filesystem'
263 265 ' please run the application again'
264 266 ' in order to rescan repositories') % repo_name,
265 267 category='error')
266 268
267 269 return redirect(url('repos'))
268 270 try:
269 271 action_logger(self.rhodecode_user, 'admin_deleted_repo',
270 272 repo_name, self.ip_addr, self.sa)
271 273 repo_model.delete(repo)
272 274 invalidate_cache('get_repo_cached_%s' % repo_name)
273 275 h.flash(_('deleted repository %s') % repo_name, category='success')
274 276 Session.commit()
275 277 except IntegrityError, e:
276 278 if e.message.find('repositories_fork_id_fkey') != -1:
277 279 log.error(traceback.format_exc())
278 280 h.flash(_('Cannot delete %s it still contains attached '
279 281 'forks') % repo_name,
280 282 category='warning')
281 283 else:
282 284 log.error(traceback.format_exc())
283 285 h.flash(_('An error occurred during '
284 286 'deletion of %s') % repo_name,
285 287 category='error')
286 288
287 289 except Exception, e:
288 290 log.error(traceback.format_exc())
289 291 h.flash(_('An error occurred during deletion of %s') % repo_name,
290 292 category='error')
291 293
292 294 return redirect(url('repos'))
293 295
294 296 @HasRepoPermissionAllDecorator('repository.admin')
295 297 def delete_perm_user(self, repo_name):
296 298 """
297 299 DELETE an existing repository permission user
298 300
299 301 :param repo_name:
300 302 """
301 303 try:
302 304 RepoModel().revoke_user_permission(repo=repo_name,
303 305 user=request.POST['user_id'])
304 306 Session.commit()
305 307 except Exception:
306 308 log.error(traceback.format_exc())
307 309 h.flash(_('An error occurred during deletion of repository user'),
308 310 category='error')
309 311 raise HTTPInternalServerError()
310 312
311 313 @HasRepoPermissionAllDecorator('repository.admin')
312 314 def delete_perm_users_group(self, repo_name):
313 315 """
314 316 DELETE an existing repository permission users group
315 317
316 318 :param repo_name:
317 319 """
318 320
319 321 try:
320 322 RepoModel().revoke_users_group_permission(
321 323 repo=repo_name, group_name=request.POST['users_group_id']
322 324 )
323 325 Session.commit()
324 326 except Exception:
325 327 log.error(traceback.format_exc())
326 328 h.flash(_('An error occurred during deletion of repository'
327 329 ' users groups'),
328 330 category='error')
329 331 raise HTTPInternalServerError()
330 332
331 333 @HasPermissionAllDecorator('hg.admin')
332 334 def repo_stats(self, repo_name):
333 335 """
334 336 DELETE an existing repository statistics
335 337
336 338 :param repo_name:
337 339 """
338 340
339 341 try:
340 342 RepoModel().delete_stats(repo_name)
341 343 Session.commit()
342 344 except Exception, e:
343 345 h.flash(_('An error occurred during deletion of repository stats'),
344 346 category='error')
345 347 return redirect(url('edit_repo', repo_name=repo_name))
346 348
347 349 @HasPermissionAllDecorator('hg.admin')
348 350 def repo_cache(self, repo_name):
349 351 """
350 352 INVALIDATE existing repository cache
351 353
352 354 :param repo_name:
353 355 """
354 356
355 357 try:
356 358 ScmModel().mark_for_invalidation(repo_name)
357 359 Session.commit()
358 360 except Exception, e:
359 361 h.flash(_('An error occurred during cache invalidation'),
360 362 category='error')
361 363 return redirect(url('edit_repo', repo_name=repo_name))
362 364
363 365 @HasPermissionAllDecorator('hg.admin')
364 366 def repo_public_journal(self, repo_name):
365 367 """
366 368 Set's this repository to be visible in public journal,
367 369 in other words assing default user to follow this repo
368 370
369 371 :param repo_name:
370 372 """
371 373
372 374 cur_token = request.POST.get('auth_token')
373 375 token = get_token()
374 376 if cur_token == token:
375 377 try:
376 378 repo_id = Repository.get_by_repo_name(repo_name).repo_id
377 379 user_id = User.get_by_username('default').user_id
378 380 self.scm_model.toggle_following_repo(repo_id, user_id)
379 381 h.flash(_('Updated repository visibility in public journal'),
380 382 category='success')
381 383 Session.commit()
382 384 except:
383 385 h.flash(_('An error occurred during setting this'
384 386 ' repository in public journal'),
385 387 category='error')
386 388
387 389 else:
388 390 h.flash(_('Token mismatch'), category='error')
389 391 return redirect(url('edit_repo', repo_name=repo_name))
390 392
391 393 @HasPermissionAllDecorator('hg.admin')
392 394 def repo_pull(self, repo_name):
393 395 """
394 396 Runs task to update given repository with remote changes,
395 397 ie. make pull on remote location
396 398
397 399 :param repo_name:
398 400 """
399 401 try:
400 402 ScmModel().pull_changes(repo_name, self.rhodecode_user.username)
401 403 h.flash(_('Pulled from remote location'), category='success')
402 404 except Exception, e:
403 405 h.flash(_('An error occurred during pull from remote location'),
404 406 category='error')
405 407
406 408 return redirect(url('edit_repo', repo_name=repo_name))
407 409
408 410 @HasPermissionAllDecorator('hg.admin')
409 411 def repo_as_fork(self, repo_name):
410 412 """
411 413 Mark given repository as a fork of another
412 414
413 415 :param repo_name:
414 416 """
415 417 try:
416 418 fork_id = request.POST.get('id_fork_of')
417 419 repo = ScmModel().mark_as_fork(repo_name, fork_id,
418 420 self.rhodecode_user.username)
419 421 fork = repo.fork.repo_name if repo.fork else _('Nothing')
420 Session.commit()
421 h.flash(_('Marked repo %s as fork of %s') % (repo_name,fork),
422 Session().commit()
423 h.flash(_('Marked repo %s as fork of %s') % (repo_name, fork),
422 424 category='success')
423 425 except Exception, e:
424 raise
426 log.error(traceback.format_exc())
425 427 h.flash(_('An error occurred during this operation'),
426 428 category='error')
427 429
428 430 return redirect(url('edit_repo', repo_name=repo_name))
429 431
430 432 @HasPermissionAllDecorator('hg.admin')
431 433 def show(self, repo_name, format='html'):
432 434 """GET /repos/repo_name: Show a specific item"""
433 435 # url('repo', repo_name=ID)
434 436
435 437 @HasPermissionAllDecorator('hg.admin')
436 438 def edit(self, repo_name, format='html'):
437 439 """GET /repos/repo_name/edit: Form to edit an existing item"""
438 440 # url('edit_repo', repo_name=ID)
439 441 defaults = self.__load_data(repo_name)
440 442
441 443 return htmlfill.render(
442 444 render('admin/repos/repo_edit.html'),
443 445 defaults=defaults,
444 446 encoding="UTF-8",
445 447 force_defaults=False
446 448 )
@@ -1,598 +1,600 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.model.scm
4 4 ~~~~~~~~~~~~~~~~~~~
5 5
6 6 Scm model for RhodeCode
7 7
8 8 :created_on: Apr 9, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25 from __future__ import with_statement
26 26 import os
27 27 import re
28 28 import time
29 29 import traceback
30 30 import logging
31 31 import cStringIO
32 32 import pkg_resources
33 33 from os.path import dirname as dn, join as jn
34 34
35 35 from sqlalchemy import func
36 36 from pylons.i18n.translation import _
37 37
38 38 import rhodecode
39 39 from rhodecode.lib.vcs import get_backend
40 40 from rhodecode.lib.vcs.exceptions import RepositoryError
41 41 from rhodecode.lib.vcs.utils.lazy import LazyProperty
42 42 from rhodecode.lib.vcs.nodes import FileNode
43 43
44 44 from rhodecode import BACKENDS
45 45 from rhodecode.lib import helpers as h
46 46 from rhodecode.lib.utils2 import safe_str, safe_unicode
47 47 from rhodecode.lib.auth import HasRepoPermissionAny, HasReposGroupPermissionAny
48 48 from rhodecode.lib.utils import get_repos as get_filesystem_repos, make_ui, \
49 49 action_logger, EmptyChangeset, REMOVED_REPO_PAT
50 50 from rhodecode.model import BaseModel
51 51 from rhodecode.model.db import Repository, RhodeCodeUi, CacheInvalidation, \
52 52 UserFollowing, UserLog, User, RepoGroup, PullRequest
53 53
54 54 log = logging.getLogger(__name__)
55 55
56 56
57 57 class UserTemp(object):
58 58 def __init__(self, user_id):
59 59 self.user_id = user_id
60 60
61 61 def __repr__(self):
62 62 return "<%s('id:%s')>" % (self.__class__.__name__, self.user_id)
63 63
64 64
65 65 class RepoTemp(object):
66 66 def __init__(self, repo_id):
67 67 self.repo_id = repo_id
68 68
69 69 def __repr__(self):
70 70 return "<%s('id:%s')>" % (self.__class__.__name__, self.repo_id)
71 71
72 72
73 73 class CachedRepoList(object):
74 74 """
75 75 Cached repo list, uses in-memory cache after initialization, that is
76 76 super fast
77 77 """
78 78
79 79 def __init__(self, db_repo_list, repos_path, order_by=None):
80 80 self.db_repo_list = db_repo_list
81 81 self.repos_path = repos_path
82 82 self.order_by = order_by
83 83 self.reversed = (order_by or '').startswith('-')
84 84
85 85 def __len__(self):
86 86 return len(self.db_repo_list)
87 87
88 88 def __repr__(self):
89 89 return '<%s (%s)>' % (self.__class__.__name__, self.__len__())
90 90
91 91 def __iter__(self):
92 92 # pre-propagated cache_map to save executing select statements
93 93 # for each repo
94 94 cache_map = CacheInvalidation.get_cache_map()
95 95
96 96 for dbr in self.db_repo_list:
97 97 scmr = dbr.scm_instance_cached(cache_map)
98 98 # check permission at this level
99 99 if not HasRepoPermissionAny(
100 100 'repository.read', 'repository.write', 'repository.admin'
101 101 )(dbr.repo_name, 'get repo check'):
102 102 continue
103 103
104 104 if scmr is None:
105 105 log.error(
106 106 '%s this repository is present in database but it '
107 107 'cannot be created as an scm instance' % dbr.repo_name
108 108 )
109 109 continue
110 110
111 111 last_change = scmr.last_change
112 112 tip = h.get_changeset_safe(scmr, 'tip')
113 113
114 114 tmp_d = {}
115 115 tmp_d['name'] = dbr.repo_name
116 116 tmp_d['name_sort'] = tmp_d['name'].lower()
117 117 tmp_d['description'] = dbr.description
118 118 tmp_d['description_sort'] = tmp_d['description'].lower()
119 119 tmp_d['last_change'] = last_change
120 120 tmp_d['last_change_sort'] = time.mktime(last_change.timetuple())
121 121 tmp_d['tip'] = tip.raw_id
122 122 tmp_d['tip_sort'] = tip.revision
123 123 tmp_d['rev'] = tip.revision
124 124 tmp_d['contact'] = dbr.user.full_contact
125 125 tmp_d['contact_sort'] = tmp_d['contact']
126 126 tmp_d['owner_sort'] = tmp_d['contact']
127 127 tmp_d['repo_archives'] = list(scmr._get_archives())
128 128 tmp_d['last_msg'] = tip.message
129 129 tmp_d['author'] = tip.author
130 130 tmp_d['dbrepo'] = dbr.get_dict()
131 131 tmp_d['dbrepo_fork'] = dbr.fork.get_dict() if dbr.fork else {}
132 132 yield tmp_d
133 133
134 134
135 135 class SimpleCachedRepoList(CachedRepoList):
136 136 """
137 137 Lighter version of CachedRepoList without the scm initialisation
138 138 """
139 139
140 140 def __iter__(self):
141 141 for dbr in self.db_repo_list:
142 142 # check permission at this level
143 143 if not HasRepoPermissionAny(
144 144 'repository.read', 'repository.write', 'repository.admin'
145 145 )(dbr.repo_name, 'get repo check'):
146 146 continue
147 147
148 148 tmp_d = {}
149 149 tmp_d['name'] = dbr.repo_name
150 150 tmp_d['name_sort'] = tmp_d['name'].lower()
151 151 tmp_d['description'] = dbr.description
152 152 tmp_d['description_sort'] = tmp_d['description'].lower()
153 153 tmp_d['dbrepo'] = dbr.get_dict()
154 154 tmp_d['dbrepo_fork'] = dbr.fork.get_dict() if dbr.fork else {}
155 155 yield tmp_d
156 156
157 157
158 158 class GroupList(object):
159 159
160 160 def __init__(self, db_repo_group_list):
161 161 self.db_repo_group_list = db_repo_group_list
162 162
163 163 def __len__(self):
164 164 return len(self.db_repo_group_list)
165 165
166 166 def __repr__(self):
167 167 return '<%s (%s)>' % (self.__class__.__name__, self.__len__())
168 168
169 169 def __iter__(self):
170 170 for dbgr in self.db_repo_group_list:
171 171 # check permission at this level
172 172 if not HasReposGroupPermissionAny(
173 173 'group.read', 'group.write', 'group.admin'
174 174 )(dbgr.group_name, 'get group repo check'):
175 175 continue
176 176
177 177 yield dbgr
178 178
179 179
180 180 class ScmModel(BaseModel):
181 181 """
182 182 Generic Scm Model
183 183 """
184 184
185 185 def __get_repo(self, instance):
186 186 cls = Repository
187 187 if isinstance(instance, cls):
188 188 return instance
189 189 elif isinstance(instance, int) or str(instance).isdigit():
190 190 return cls.get(instance)
191 191 elif isinstance(instance, basestring):
192 192 return cls.get_by_repo_name(instance)
193 193 elif instance:
194 194 raise Exception('given object must be int, basestr or Instance'
195 195 ' of %s got %s' % (type(cls), type(instance)))
196 196
197 197 @LazyProperty
198 198 def repos_path(self):
199 199 """
200 200 Get's the repositories root path from database
201 201 """
202 202
203 203 q = self.sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
204 204
205 205 return q.ui_value
206 206
207 207 def repo_scan(self, repos_path=None):
208 208 """
209 209 Listing of repositories in given path. This path should not be a
210 210 repository itself. Return a dictionary of repository objects
211 211
212 212 :param repos_path: path to directory containing repositories
213 213 """
214 214
215 215 if repos_path is None:
216 216 repos_path = self.repos_path
217 217
218 218 log.info('scanning for repositories in %s' % repos_path)
219 219
220 220 baseui = make_ui('db')
221 221 repos = {}
222 222
223 223 for name, path in get_filesystem_repos(repos_path, recursive=True):
224 224 # skip removed repos
225 225 if REMOVED_REPO_PAT.match(name):
226 226 continue
227 227
228 228 # name need to be decomposed and put back together using the /
229 229 # since this is internal storage separator for rhodecode
230 230 name = Repository.url_sep().join(name.split(os.sep))
231 231
232 232 try:
233 233 if name in repos:
234 234 raise RepositoryError('Duplicate repository name %s '
235 235 'found in %s' % (name, path))
236 236 else:
237 237
238 238 klass = get_backend(path[0])
239 239
240 240 if path[0] == 'hg' and path[0] in BACKENDS.keys():
241 241 repos[name] = klass(safe_str(path[1]), baseui=baseui)
242 242
243 243 if path[0] == 'git' and path[0] in BACKENDS.keys():
244 244 repos[name] = klass(path[1])
245 245 except OSError:
246 246 continue
247 247
248 248 return repos
249 249
250 250 def get_repos(self, all_repos=None, sort_key=None, simple=False):
251 251 """
252 252 Get all repos from db and for each repo create it's
253 253 backend instance and fill that backed with information from database
254 254
255 255 :param all_repos: list of repository names as strings
256 256 give specific repositories list, good for filtering
257 257
258 258 :param sort_key: initial sorting of repos
259 259 :param simple: use SimpleCachedList - one without the SCM info
260 260 """
261 261 if all_repos is None:
262 262 all_repos = self.sa.query(Repository)\
263 263 .filter(Repository.group_id == None)\
264 264 .order_by(func.lower(Repository.repo_name)).all()
265 265 if simple:
266 266 repo_iter = SimpleCachedRepoList(all_repos,
267 267 repos_path=self.repos_path,
268 268 order_by=sort_key)
269 269 else:
270 270 repo_iter = CachedRepoList(all_repos,
271 271 repos_path=self.repos_path,
272 272 order_by=sort_key)
273 273
274 274 return repo_iter
275 275
276 276 def get_repos_groups(self, all_groups=None):
277 277 if all_groups is None:
278 278 all_groups = RepoGroup.query()\
279 279 .filter(RepoGroup.group_parent_id == None).all()
280 280 group_iter = GroupList(all_groups)
281 281
282 282 return group_iter
283 283
284 284 def mark_for_invalidation(self, repo_name):
285 285 """
286 286 Puts cache invalidation task into db for
287 287 further global cache invalidation
288 288
289 289 :param repo_name: this repo that should invalidation take place
290 290 """
291 291 CacheInvalidation.set_invalidate(repo_name)
292 292
293 293 def toggle_following_repo(self, follow_repo_id, user_id):
294 294
295 295 f = self.sa.query(UserFollowing)\
296 296 .filter(UserFollowing.follows_repo_id == follow_repo_id)\
297 297 .filter(UserFollowing.user_id == user_id).scalar()
298 298
299 299 if f is not None:
300 300 try:
301 301 self.sa.delete(f)
302 302 action_logger(UserTemp(user_id),
303 303 'stopped_following_repo',
304 304 RepoTemp(follow_repo_id))
305 305 return
306 306 except:
307 307 log.error(traceback.format_exc())
308 308 raise
309 309
310 310 try:
311 311 f = UserFollowing()
312 312 f.user_id = user_id
313 313 f.follows_repo_id = follow_repo_id
314 314 self.sa.add(f)
315 315
316 316 action_logger(UserTemp(user_id),
317 317 'started_following_repo',
318 318 RepoTemp(follow_repo_id))
319 319 except:
320 320 log.error(traceback.format_exc())
321 321 raise
322 322
323 323 def toggle_following_user(self, follow_user_id, user_id):
324 324 f = self.sa.query(UserFollowing)\
325 325 .filter(UserFollowing.follows_user_id == follow_user_id)\
326 326 .filter(UserFollowing.user_id == user_id).scalar()
327 327
328 328 if f is not None:
329 329 try:
330 330 self.sa.delete(f)
331 331 return
332 332 except:
333 333 log.error(traceback.format_exc())
334 334 raise
335 335
336 336 try:
337 337 f = UserFollowing()
338 338 f.user_id = user_id
339 339 f.follows_user_id = follow_user_id
340 340 self.sa.add(f)
341 341 except:
342 342 log.error(traceback.format_exc())
343 343 raise
344 344
345 345 def is_following_repo(self, repo_name, user_id, cache=False):
346 346 r = self.sa.query(Repository)\
347 347 .filter(Repository.repo_name == repo_name).scalar()
348 348
349 349 f = self.sa.query(UserFollowing)\
350 350 .filter(UserFollowing.follows_repository == r)\
351 351 .filter(UserFollowing.user_id == user_id).scalar()
352 352
353 353 return f is not None
354 354
355 355 def is_following_user(self, username, user_id, cache=False):
356 356 u = User.get_by_username(username)
357 357
358 358 f = self.sa.query(UserFollowing)\
359 359 .filter(UserFollowing.follows_user == u)\
360 360 .filter(UserFollowing.user_id == user_id).scalar()
361 361
362 362 return f is not None
363 363
364 364 def get_followers(self, repo):
365 365 repo = self._get_repo(repo)
366 366
367 367 return self.sa.query(UserFollowing)\
368 368 .filter(UserFollowing.follows_repository == repo).count()
369 369
370 370 def get_forks(self, repo):
371 371 repo = self._get_repo(repo)
372 372 return self.sa.query(Repository)\
373 373 .filter(Repository.fork == repo).count()
374 374
375 375 def get_pull_requests(self, repo):
376 376 repo = self._get_repo(repo)
377 377 return self.sa.query(PullRequest)\
378 378 .filter(PullRequest.other_repo == repo).count()
379 379
380 380 def mark_as_fork(self, repo, fork, user):
381 381 repo = self.__get_repo(repo)
382 382 fork = self.__get_repo(fork)
383 if fork and repo.repo_id == fork.repo_id:
384 raise Exception("Cannot set repository as fork of itself")
383 385 repo.fork = fork
384 386 self.sa.add(repo)
385 387 return repo
386 388
387 389 def pull_changes(self, repo, username):
388 390 dbrepo = self.__get_repo(repo)
389 391 clone_uri = dbrepo.clone_uri
390 392 if not clone_uri:
391 393 raise Exception("This repository doesn't have a clone uri")
392 394
393 395 repo = dbrepo.scm_instance
394 396 try:
395 397 extras = {
396 398 'ip': '',
397 399 'username': username,
398 400 'action': 'push_remote',
399 401 'repository': dbrepo.repo_name,
400 402 'scm': repo.alias,
401 403 }
402 404 Repository.inject_ui(repo, extras=extras)
403 405
404 406 if repo.alias == 'git':
405 407 repo.fetch(clone_uri)
406 408 else:
407 409 repo.pull(clone_uri)
408 410 self.mark_for_invalidation(dbrepo.repo_name)
409 411 except:
410 412 log.error(traceback.format_exc())
411 413 raise
412 414
413 415 def commit_change(self, repo, repo_name, cs, user, author, message,
414 416 content, f_path):
415 417
416 418 if repo.alias == 'hg':
417 419 from rhodecode.lib.vcs.backends.hg import \
418 420 MercurialInMemoryChangeset as IMC
419 421 elif repo.alias == 'git':
420 422 from rhodecode.lib.vcs.backends.git import \
421 423 GitInMemoryChangeset as IMC
422 424
423 425 # decoding here will force that we have proper encoded values
424 426 # in any other case this will throw exceptions and deny commit
425 427 content = safe_str(content)
426 428 path = safe_str(f_path)
427 429 # message and author needs to be unicode
428 430 # proper backend should then translate that into required type
429 431 message = safe_unicode(message)
430 432 author = safe_unicode(author)
431 433 m = IMC(repo)
432 434 m.change(FileNode(path, content))
433 435 tip = m.commit(message=message,
434 436 author=author,
435 437 parents=[cs], branch=cs.branch)
436 438
437 439 new_cs = tip.short_id
438 440 action = 'push_local:%s' % new_cs
439 441
440 442 action_logger(user, action, repo_name)
441 443
442 444 self.mark_for_invalidation(repo_name)
443 445
444 446 def create_node(self, repo, repo_name, cs, user, author, message, content,
445 447 f_path):
446 448 if repo.alias == 'hg':
447 449 from rhodecode.lib.vcs.backends.hg import MercurialInMemoryChangeset as IMC
448 450 elif repo.alias == 'git':
449 451 from rhodecode.lib.vcs.backends.git import GitInMemoryChangeset as IMC
450 452 # decoding here will force that we have proper encoded values
451 453 # in any other case this will throw exceptions and deny commit
452 454
453 455 if isinstance(content, (basestring,)):
454 456 content = safe_str(content)
455 457 elif isinstance(content, (file, cStringIO.OutputType,)):
456 458 content = content.read()
457 459 else:
458 460 raise Exception('Content is of unrecognized type %s' % (
459 461 type(content)
460 462 ))
461 463
462 464 message = safe_unicode(message)
463 465 author = safe_unicode(author)
464 466 path = safe_str(f_path)
465 467 m = IMC(repo)
466 468
467 469 if isinstance(cs, EmptyChangeset):
468 470 # EmptyChangeset means we we're editing empty repository
469 471 parents = None
470 472 else:
471 473 parents = [cs]
472 474
473 475 m.add(FileNode(path, content=content))
474 476 tip = m.commit(message=message,
475 477 author=author,
476 478 parents=parents, branch=cs.branch)
477 479 new_cs = tip.short_id
478 480 action = 'push_local:%s' % new_cs
479 481
480 482 action_logger(user, action, repo_name)
481 483
482 484 self.mark_for_invalidation(repo_name)
483 485
484 486 def get_nodes(self, repo_name, revision, root_path='/', flat=True):
485 487 """
486 488 recursive walk in root dir and return a set of all path in that dir
487 489 based on repository walk function
488 490
489 491 :param repo_name: name of repository
490 492 :param revision: revision for which to list nodes
491 493 :param root_path: root path to list
492 494 :param flat: return as a list, if False returns a dict with decription
493 495
494 496 """
495 497 _files = list()
496 498 _dirs = list()
497 499 try:
498 500 _repo = self.__get_repo(repo_name)
499 501 changeset = _repo.scm_instance.get_changeset(revision)
500 502 root_path = root_path.lstrip('/')
501 503 for topnode, dirs, files in changeset.walk(root_path):
502 504 for f in files:
503 505 _files.append(f.path if flat else {"name": f.path,
504 506 "type": "file"})
505 507 for d in dirs:
506 508 _dirs.append(d.path if flat else {"name": d.path,
507 509 "type": "dir"})
508 510 except RepositoryError:
509 511 log.debug(traceback.format_exc())
510 512 raise
511 513
512 514 return _dirs, _files
513 515
514 516 def get_unread_journal(self):
515 517 return self.sa.query(UserLog).count()
516 518
517 519 def get_repo_landing_revs(self, repo=None):
518 520 """
519 521 Generates select option with tags branches and bookmarks (for hg only)
520 522 grouped by type
521 523
522 524 :param repo:
523 525 :type repo:
524 526 """
525 527
526 528 hist_l = []
527 529 choices = []
528 530 repo = self.__get_repo(repo)
529 531 hist_l.append(['tip', _('latest tip')])
530 532 choices.append('tip')
531 533 if not repo:
532 534 return choices, hist_l
533 535
534 536 repo = repo.scm_instance
535 537
536 538 branches_group = ([(k, k) for k, v in
537 539 repo.branches.iteritems()], _("Branches"))
538 540 hist_l.append(branches_group)
539 541 choices.extend([x[0] for x in branches_group[0]])
540 542
541 543 if repo.alias == 'hg':
542 544 bookmarks_group = ([(k, k) for k, v in
543 545 repo.bookmarks.iteritems()], _("Bookmarks"))
544 546 hist_l.append(bookmarks_group)
545 547 choices.extend([x[0] for x in bookmarks_group[0]])
546 548
547 549 tags_group = ([(k, k) for k, v in
548 550 repo.tags.iteritems()], _("Tags"))
549 551 hist_l.append(tags_group)
550 552 choices.extend([x[0] for x in tags_group[0]])
551 553
552 554 return choices, hist_l
553 555
554 556 def install_git_hook(self, repo, force_create=False):
555 557 """
556 558 Creates a rhodecode hook inside a git repository
557 559
558 560 :param repo: Instance of VCS repo
559 561 :param force_create: Create even if same name hook exists
560 562 """
561 563
562 564 loc = jn(repo.path, 'hooks')
563 565 if not repo.bare:
564 566 loc = jn(repo.path, '.git', 'hooks')
565 567 if not os.path.isdir(loc):
566 568 os.makedirs(loc)
567 569
568 570 tmpl = pkg_resources.resource_string(
569 571 'rhodecode', jn('config', 'post_receive_tmpl.py')
570 572 )
571 573
572 574 _hook_file = jn(loc, 'post-receive')
573 575 _rhodecode_hook = False
574 576 log.debug('Installing git hook in repo %s' % repo)
575 577 if os.path.exists(_hook_file):
576 578 # let's take a look at this hook, maybe it's rhodecode ?
577 579 log.debug('hook exists, checking if it is from rhodecode')
578 580 _HOOK_VER_PAT = re.compile(r'^RC_HOOK_VER')
579 581 with open(_hook_file, 'rb') as f:
580 582 data = f.read()
581 583 matches = re.compile(r'(?:%s)\s*=\s*(.*)'
582 584 % 'RC_HOOK_VER').search(data)
583 585 if matches:
584 586 try:
585 587 ver = matches.groups()[0]
586 588 log.debug('got %s it is rhodecode' % (ver))
587 589 _rhodecode_hook = True
588 590 except:
589 591 log.error(traceback.format_exc())
590 592
591 593 if _rhodecode_hook or force_create:
592 594 log.debug('writing hook file !')
593 595 with open(_hook_file, 'wb') as f:
594 596 tmpl = tmpl.replace('_TMPL_', rhodecode.__version__)
595 597 f.write(tmpl)
596 598 os.chmod(_hook_file, 0755)
597 599 else:
598 600 log.debug('skipping writing hook file') No newline at end of file
@@ -1,164 +1,164 b''
1 1 """Pylons application test package
2 2
3 3 This package assumes the Pylons environment is already loaded, such as
4 4 when this script is imported from the `nosetests --with-pylons=test.ini`
5 5 command.
6 6
7 7 This module initializes the application via ``websetup`` (`paster
8 8 setup-app`) and provides the base testing objects.
9 9 """
10 10 import os
11 11 import time
12 12 import logging
13 13 import datetime
14 14 import hashlib
15 15 import tempfile
16 16 from os.path import join as jn
17 17
18 18 from unittest import TestCase
19 19 from tempfile import _RandomNameSequence
20 20
21 21 from paste.deploy import loadapp
22 22 from paste.script.appinstall import SetupCommand
23 23 from pylons import config, url
24 24 from routes.util import URLGenerator
25 25 from webtest import TestApp
26 26
27 27 from rhodecode import is_windows
28 28 from rhodecode.model.meta import Session
29 29 from rhodecode.model.db import User
30 30 from rhodecode.tests.nose_parametrized import parameterized
31 31
32 32 import pylons.test
33 33
34 34
35 35 os.environ['TZ'] = 'UTC'
36 36 if not is_windows:
37 37 time.tzset()
38 38
39 39 log = logging.getLogger(__name__)
40 40
41 41 __all__ = [
42 42 'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
43 43 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
44 44 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
45 45 'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
46 46 'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
47 47 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_HG_REPO_CLONE',
48 48 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', 'TEST_GIT_REPO_CLONE',
49 49 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
50 50 ]
51 51
52 52 # Invoke websetup with the current config file
53 53 # SetupCommand('setup-app').run([config_file])
54 54
55 55 ##RUNNING DESIRED TESTS
56 56 # nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
57 57 # nosetests --pdb --pdb-failures
58 58 # nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
59 59 environ = {}
60 60
61 61 #SOME GLOBALS FOR TESTS
62 62
63 63 TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
64 64 TEST_USER_ADMIN_LOGIN = 'test_admin'
65 65 TEST_USER_ADMIN_PASS = 'test12'
66 66 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
67 67
68 68 TEST_USER_REGULAR_LOGIN = 'test_regular'
69 69 TEST_USER_REGULAR_PASS = 'test12'
70 70 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
71 71
72 72 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
73 73 TEST_USER_REGULAR2_PASS = 'test12'
74 74 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
75 75
76 76 HG_REPO = 'vcs_test_hg'
77 77 GIT_REPO = 'vcs_test_git'
78 78
79 79 NEW_HG_REPO = 'vcs_test_hg_new'
80 80 NEW_GIT_REPO = 'vcs_test_git_new'
81 81
82 82 HG_FORK = 'vcs_test_hg_fork'
83 83 GIT_FORK = 'vcs_test_git_fork'
84 84
85 85 ## VCS
86 86 SCM_TESTS = ['hg', 'git']
87 87 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
88 88
89 89 GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
90 90
91 91 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
92 92 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
93 93 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
94 94
95 95
96 96 HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
97 97
98 98 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
99 99 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
100 100 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
101 101
102 102 TEST_DIR = tempfile.gettempdir()
103 103 TEST_REPO_PREFIX = 'vcs-test'
104 104
105 105 # cached repos if any !
106 106 # comment out to get some other repos from bb or github
107 107 GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
108 108 HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
109 109
110 110
111 111 def get_new_dir(title):
112 112 """
113 113 Returns always new directory path.
114 114 """
115 115 from rhodecode.tests.vcs.utils import get_normalized_path
116 116 name = TEST_REPO_PREFIX
117 117 if title:
118 118 name = '-'.join((name, title))
119 119 hex = hashlib.sha1(str(time.time())).hexdigest()
120 120 name = '-'.join((name, hex))
121 121 path = os.path.join(TEST_DIR, name)
122 122 return get_normalized_path(path)
123 123
124 124
125 125 class TestController(TestCase):
126 126
127 127 def __init__(self, *args, **kwargs):
128 128 wsgiapp = pylons.test.pylonsapp
129 129 config = wsgiapp.config
130 130
131 131 self.app = TestApp(wsgiapp)
132 132 url._push_object(URLGenerator(config['routes.map'], environ))
133 133 self.Session = Session
134 134 self.index_location = config['app_conf']['index_dir']
135 135 TestCase.__init__(self, *args, **kwargs)
136 136
137 137 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
138 138 password=TEST_USER_ADMIN_PASS):
139 139 self._logged_username = username
140 140 response = self.app.post(url(controller='login', action='index'),
141 141 {'username': username,
142 142 'password': password})
143 143
144 144 if 'invalid user name' in response.body:
145 145 self.fail('could not login using %s %s' % (username, password))
146 146
147 147 self.assertEqual(response.status, '302 Found')
148 148 ses = response.session['rhodecode_user']
149 149 self.assertEqual(ses.get('username'), username)
150 150 response = response.follow()
151 151 self.assertEqual(ses.get('is_authenticated'), True)
152 152
153 153 return response.session['rhodecode_user']
154 154
155 155 def _get_logged_user(self):
156 156 return User.get_by_username(self._logged_username)
157 157
158 158 def checkSessionFlash(self, response, msg):
159 159 self.assertTrue('flash' in response.session)
160 160 if not msg in response.session['flash'][0][1]:
161 161 self.fail(
162 162 'msg `%s` not found in session flash: got `%s` instead' % (
163 msg, response.session['flash'][0][1])
163 msg, response.session['flash'])
164 164 )
@@ -1,213 +1,266 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 from rhodecode.lib.auth import get_crypt_password, check_password
4 from rhodecode.model.db import User, RhodeCodeSetting
4 from rhodecode.model.db import User, RhodeCodeSetting, Repository
5 5 from rhodecode.tests import *
6 6 from rhodecode.lib import helpers as h
7 7 from rhodecode.model.user import UserModel
8 from rhodecode.model.scm import ScmModel
8 9
9 10
10 11 class TestAdminSettingsController(TestController):
11 12
12 13 def test_index(self):
13 14 response = self.app.get(url('admin_settings'))
14 15 # Test response...
15 16
16 17 def test_index_as_xml(self):
17 18 response = self.app.get(url('formatted_admin_settings', format='xml'))
18 19
19 20 def test_create(self):
20 21 response = self.app.post(url('admin_settings'))
21 22
22 23 def test_new(self):
23 24 response = self.app.get(url('admin_new_setting'))
24 25
25 26 def test_new_as_xml(self):
26 27 response = self.app.get(url('formatted_admin_new_setting', format='xml'))
27 28
28 29 def test_update(self):
29 30 response = self.app.put(url('admin_setting', setting_id=1))
30 31
31 32 def test_update_browser_fakeout(self):
32 33 response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='put'))
33 34
34 35 def test_delete(self):
35 36 response = self.app.delete(url('admin_setting', setting_id=1))
36 37
37 38 def test_delete_browser_fakeout(self):
38 39 response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='delete'))
39 40
40 41 def test_show(self):
41 42 response = self.app.get(url('admin_setting', setting_id=1))
42 43
43 44 def test_show_as_xml(self):
44 45 response = self.app.get(url('formatted_admin_setting', setting_id=1, format='xml'))
45 46
46 47 def test_edit(self):
47 48 response = self.app.get(url('admin_edit_setting', setting_id=1))
48 49
49 50 def test_edit_as_xml(self):
50 51 response = self.app.get(url('formatted_admin_edit_setting',
51 52 setting_id=1, format='xml'))
52 53
53 54 def test_ga_code_active(self):
54 55 self.log_user()
55 56 old_title = 'RhodeCode'
56 57 old_realm = 'RhodeCode authentication'
57 58 new_ga_code = 'ga-test-123456789'
58 59 response = self.app.post(url('admin_setting', setting_id='global'),
59 60 params=dict(
60 61 _method='put',
61 62 rhodecode_title=old_title,
62 63 rhodecode_realm=old_realm,
63 64 rhodecode_ga_code=new_ga_code
64 65 ))
65 66
66 67 self.checkSessionFlash(response, 'Updated application settings')
67 68
68 69 self.assertEqual(RhodeCodeSetting
69 70 .get_app_settings()['rhodecode_ga_code'], new_ga_code)
70 71
71 72 response = response.follow()
72 73 response.mustcontain("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code)
73 74
74 75 def test_ga_code_inactive(self):
75 76 self.log_user()
76 77 old_title = 'RhodeCode'
77 78 old_realm = 'RhodeCode authentication'
78 79 new_ga_code = ''
79 80 response = self.app.post(url('admin_setting', setting_id='global'),
80 81 params=dict(
81 82 _method='put',
82 83 rhodecode_title=old_title,
83 84 rhodecode_realm=old_realm,
84 85 rhodecode_ga_code=new_ga_code
85 86 ))
86 87
87 88 self.assertTrue('Updated application settings' in
88 89 response.session['flash'][0][1])
89 90 self.assertEqual(RhodeCodeSetting
90 91 .get_app_settings()['rhodecode_ga_code'], new_ga_code)
91 92
92 93 response = response.follow()
93 94 self.assertFalse("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code
94 95 in response.body)
95 96
96 97 def test_title_change(self):
97 98 self.log_user()
98 99 old_title = 'RhodeCode'
99 100 new_title = old_title + '_changed'
100 101 old_realm = 'RhodeCode authentication'
101 102
102 103 for new_title in ['Changed', 'Ε»Γ³Ε‚wik', old_title]:
103 104 response = self.app.post(url('admin_setting', setting_id='global'),
104 105 params=dict(
105 106 _method='put',
106 107 rhodecode_title=new_title,
107 108 rhodecode_realm=old_realm,
108 109 rhodecode_ga_code=''
109 110 ))
110 111
111 112 self.checkSessionFlash(response, 'Updated application settings')
112 113 self.assertEqual(RhodeCodeSetting
113 114 .get_app_settings()['rhodecode_title'],
114 115 new_title.decode('utf-8'))
115 116
116 117 response = response.follow()
117 118 response.mustcontain("""<h1><a href="/">%s</a></h1>""" % new_title)
118 119
119 120 def test_my_account(self):
120 121 self.log_user()
121 122 response = self.app.get(url('admin_settings_my_account'))
122 123
123 124 self.assertTrue('value="test_admin' in response.body)
124 125
125 126 @parameterized.expand([('firstname', 'new_username'),
126 127 ('lastname', 'new_username'),
127 128 ('admin', True),
128 129 ('admin', False),
129 130 ('ldap_dn', 'test'),
130 131 ('ldap_dn', None),
131 132 ('active', False),
132 133 ('active', True),
133 134 ('email', 'some@email.com'),
134 135 ])
135 136 def test_my_account_update(self, name, expected):
136 137 uname = 'testme'
137 138 usr = UserModel().create_or_update(username=uname, password='qweqwe',
138 139 email='testme@rhodecod.org')
139 140 self.Session().commit()
140 141 params = usr.get_api_data()
141 142 user_id = usr.user_id
142 143 self.log_user(username=uname, password='qweqwe')
143 144 params.update({name: expected})
144 145 params.update({'password_confirmation': ''})
145 146 params.update({'new_password': ''})
146 147
147 148 try:
148 149 response = self.app.put(url('admin_settings_my_account_update',
149 150 id=user_id), params)
150 151
151 152 self.checkSessionFlash(response,
152 153 'Your account was updated successfully')
153 154
154 155 updated_user = User.get_by_username(uname)
155 156 updated_params = updated_user.get_api_data()
156 157 updated_params.update({'password_confirmation': ''})
157 158 updated_params.update({'new_password': ''})
158 159
159 160 params['last_login'] = updated_params['last_login']
160 161 if name == 'email':
161 162 params['emails'] = [expected]
162 163 if name == 'ldap_dn':
163 164 #cannot update this via form
164 165 params['ldap_dn'] = None
165 166 if name == 'active':
166 167 #my account cannot deactivate account
167 168 params['active'] = True
168 169 if name == 'admin':
169 170 #my account cannot make you an admin !
170 171 params['admin'] = False
171 172
172 173 self.assertEqual(params, updated_params)
173 174
174 175 finally:
175 176 UserModel().delete('testme')
176 177
177 178 def test_my_account_update_err_email_exists(self):
178 179 self.log_user()
179 180
180 181 new_email = 'test_regular@mail.com' # already exisitn email
181 182 response = self.app.put(url('admin_settings_my_account_update'),
182 183 params=dict(
183 184 username='test_admin',
184 185 new_password='test12',
185 186 password_confirmation='test122',
186 187 firstname='NewName',
187 188 lastname='NewLastname',
188 189 email=new_email,)
189 190 )
190 191
191 192 response.mustcontain('This e-mail address is already taken')
192 193
193 194 def test_my_account_update_err(self):
194 195 self.log_user('test_regular2', 'test12')
195 196
196 197 new_email = 'newmail.pl'
197 198 response = self.app.post(url('admin_settings_my_account_update'),
198 199 params=dict(
199 200 _method='put',
200 201 username='test_admin',
201 202 new_password='test12',
202 203 password_confirmation='test122',
203 204 firstname='NewName',
204 205 lastname='NewLastname',
205 206 email=new_email,)
206 207 )
207 208
208 209 response.mustcontain('An email address must contain a single @')
209 210 from rhodecode.model import validators
210 211 msg = validators.ValidUsername(edit=False,
211 212 old_data={})._messages['username_exists']
212 213 msg = h.html_escape(msg % {'username': 'test_admin'})
213 214 response.mustcontain(u"%s" % msg)
215
216 def test_set_repo_fork_has_no_self_id(self):
217 self.log_user()
218 repo = Repository.get_by_repo_name(HG_REPO)
219 response = self.app.get(url('edit_repo', repo_name=HG_REPO))
220 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
221 assert opt not in response.body
222
223 def test_set_fork_of_repo(self):
224 self.log_user()
225 repo = Repository.get_by_repo_name(HG_REPO)
226 repo2 = Repository.get_by_repo_name(GIT_REPO)
227 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
228 params=dict(
229 id_fork_of=repo2.repo_id
230 ))
231 repo = Repository.get_by_repo_name(HG_REPO)
232 repo2 = Repository.get_by_repo_name(GIT_REPO)
233 self.checkSessionFlash(response,
234 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
235
236 assert repo.fork == repo2
237 response = response.follow()
238 # check if given repo is selected
239
240 opt = """<option value="%s" selected="selected">%s</option>""" % (
241 repo2.repo_id, repo2.repo_name)
242 response.mustcontain(opt)
243
244 # clean session flash
245 #response = self.app.get(url('edit_repo', repo_name=HG_REPO))
246
247 ## mark it as None
248 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
249 params=dict(
250 id_fork_of=None
251 ))
252 repo = Repository.get_by_repo_name(HG_REPO)
253 repo2 = Repository.get_by_repo_name(GIT_REPO)
254 self.checkSessionFlash(response,
255 'Marked repo %s as fork of %s' % (repo.repo_name, "Nothing"))
256 assert repo.fork == None
257
258 def test_set_fork_of_same_repo(self):
259 self.log_user()
260 repo = Repository.get_by_repo_name(HG_REPO)
261 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
262 params=dict(
263 id_fork_of=repo.repo_id
264 ))
265 self.checkSessionFlash(response,
266 'An error occurred during this operation') No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now