##// END OF EJS Templates
testing: cleanup request/config stubs...
marcink -
r2311:321e8b97 default
parent child Browse files
Show More
@@ -1,634 +1,649 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 The base Controller API
23 23 Provides the BaseController class for subclassing. And usage in different
24 24 controllers
25 25 """
26 26
27 27 import logging
28 28 import socket
29 29
30 30 import markupsafe
31 31 import ipaddress
32 32 import pyramid.threadlocal
33 33
34 34 from paste.auth.basic import AuthBasicAuthenticator
35 35 from paste.httpexceptions import HTTPUnauthorized, HTTPForbidden, get_exception
36 36 from paste.httpheaders import WWW_AUTHENTICATE, AUTHORIZATION
37 37
38 38 import rhodecode
39 39 from rhodecode.authentication.base import VCS_TYPE
40 40 from rhodecode.lib import auth, utils2
41 41 from rhodecode.lib import helpers as h
42 42 from rhodecode.lib.auth import AuthUser, CookieStoreWrapper
43 43 from rhodecode.lib.exceptions import UserCreationError
44 44 from rhodecode.lib.utils import (
45 45 get_repo_slug, set_rhodecode_config, password_changed,
46 46 get_enabled_hook_classes)
47 47 from rhodecode.lib.utils2 import (
48 48 str2bool, safe_unicode, AttributeDict, safe_int, md5, aslist, safe_str)
49 49 from rhodecode.model import meta
50 50 from rhodecode.model.db import Repository, User, ChangesetComment
51 51 from rhodecode.model.notification import NotificationModel
52 52 from rhodecode.model.scm import ScmModel
53 53 from rhodecode.model.settings import VcsSettingsModel, SettingsModel
54 54
55 55 # NOTE(marcink): remove after base controller is no longer required
56 56 from pylons.controllers import WSGIController
57 57 from pylons.i18n import translation
58 58
59 59 log = logging.getLogger(__name__)
60 60
61 61
62 62 # hack to make the migration to pyramid easier
63 63 def render(template_name, extra_vars=None, cache_key=None,
64 64 cache_type=None, cache_expire=None):
65 65 """Render a template with Mako
66 66
67 67 Accepts the cache options ``cache_key``, ``cache_type``, and
68 68 ``cache_expire``.
69 69
70 70 """
71 71 from pylons.templating import literal
72 72 from pylons.templating import cached_template, pylons_globals
73 73
74 74 # Create a render callable for the cache function
75 75 def render_template():
76 76 # Pull in extra vars if needed
77 77 globs = extra_vars or {}
78 78
79 79 # Second, get the globals
80 80 globs.update(pylons_globals())
81 81
82 82 globs['_ungettext'] = globs['ungettext']
83 83 # Grab a template reference
84 84 template = globs['app_globals'].mako_lookup.get_template(template_name)
85 85
86 86 return literal(template.render_unicode(**globs))
87 87
88 88 return cached_template(template_name, render_template, cache_key=cache_key,
89 89 cache_type=cache_type, cache_expire=cache_expire)
90 90
91 91 def _filter_proxy(ip):
92 92 """
93 93 Passed in IP addresses in HEADERS can be in a special format of multiple
94 94 ips. Those comma separated IPs are passed from various proxies in the
95 95 chain of request processing. The left-most being the original client.
96 96 We only care about the first IP which came from the org. client.
97 97
98 98 :param ip: ip string from headers
99 99 """
100 100 if ',' in ip:
101 101 _ips = ip.split(',')
102 102 _first_ip = _ips[0].strip()
103 103 log.debug('Got multiple IPs %s, using %s', ','.join(_ips), _first_ip)
104 104 return _first_ip
105 105 return ip
106 106
107 107
108 108 def _filter_port(ip):
109 109 """
110 110 Removes a port from ip, there are 4 main cases to handle here.
111 111 - ipv4 eg. 127.0.0.1
112 112 - ipv6 eg. ::1
113 113 - ipv4+port eg. 127.0.0.1:8080
114 114 - ipv6+port eg. [::1]:8080
115 115
116 116 :param ip:
117 117 """
118 118 def is_ipv6(ip_addr):
119 119 if hasattr(socket, 'inet_pton'):
120 120 try:
121 121 socket.inet_pton(socket.AF_INET6, ip_addr)
122 122 except socket.error:
123 123 return False
124 124 else:
125 125 # fallback to ipaddress
126 126 try:
127 127 ipaddress.IPv6Address(safe_unicode(ip_addr))
128 128 except Exception:
129 129 return False
130 130 return True
131 131
132 132 if ':' not in ip: # must be ipv4 pure ip
133 133 return ip
134 134
135 135 if '[' in ip and ']' in ip: # ipv6 with port
136 136 return ip.split(']')[0][1:].lower()
137 137
138 138 # must be ipv6 or ipv4 with port
139 139 if is_ipv6(ip):
140 140 return ip
141 141 else:
142 142 ip, _port = ip.split(':')[:2] # means ipv4+port
143 143 return ip
144 144
145 145
146 146 def get_ip_addr(environ):
147 147 proxy_key = 'HTTP_X_REAL_IP'
148 148 proxy_key2 = 'HTTP_X_FORWARDED_FOR'
149 149 def_key = 'REMOTE_ADDR'
150 150 _filters = lambda x: _filter_port(_filter_proxy(x))
151 151
152 152 ip = environ.get(proxy_key)
153 153 if ip:
154 154 return _filters(ip)
155 155
156 156 ip = environ.get(proxy_key2)
157 157 if ip:
158 158 return _filters(ip)
159 159
160 160 ip = environ.get(def_key, '0.0.0.0')
161 161 return _filters(ip)
162 162
163 163
164 164 def get_server_ip_addr(environ, log_errors=True):
165 165 hostname = environ.get('SERVER_NAME')
166 166 try:
167 167 return socket.gethostbyname(hostname)
168 168 except Exception as e:
169 169 if log_errors:
170 170 # in some cases this lookup is not possible, and we don't want to
171 171 # make it an exception in logs
172 172 log.exception('Could not retrieve server ip address: %s', e)
173 173 return hostname
174 174
175 175
176 176 def get_server_port(environ):
177 177 return environ.get('SERVER_PORT')
178 178
179 179
180 180 def get_access_path(environ):
181 181 path = environ.get('PATH_INFO')
182 182 org_req = environ.get('pylons.original_request')
183 183 if org_req:
184 184 path = org_req.environ.get('PATH_INFO')
185 185 return path
186 186
187 187
188 188 def get_user_agent(environ):
189 189 return environ.get('HTTP_USER_AGENT')
190 190
191 191
192 192 def vcs_operation_context(
193 193 environ, repo_name, username, action, scm, check_locking=True,
194 194 is_shadow_repo=False):
195 195 """
196 196 Generate the context for a vcs operation, e.g. push or pull.
197 197
198 198 This context is passed over the layers so that hooks triggered by the
199 199 vcs operation know details like the user, the user's IP address etc.
200 200
201 201 :param check_locking: Allows to switch of the computation of the locking
202 202 data. This serves mainly the need of the simplevcs middleware to be
203 203 able to disable this for certain operations.
204 204
205 205 """
206 206 # Tri-state value: False: unlock, None: nothing, True: lock
207 207 make_lock = None
208 208 locked_by = [None, None, None]
209 209 is_anonymous = username == User.DEFAULT_USER
210 210 if not is_anonymous and check_locking:
211 211 log.debug('Checking locking on repository "%s"', repo_name)
212 212 user = User.get_by_username(username)
213 213 repo = Repository.get_by_repo_name(repo_name)
214 214 make_lock, __, locked_by = repo.get_locking_state(
215 215 action, user.user_id)
216 216
217 217 settings_model = VcsSettingsModel(repo=repo_name)
218 218 ui_settings = settings_model.get_ui_settings()
219 219
220 220 extras = {
221 221 'ip': get_ip_addr(environ),
222 222 'username': username,
223 223 'action': action,
224 224 'repository': repo_name,
225 225 'scm': scm,
226 226 'config': rhodecode.CONFIG['__file__'],
227 227 'make_lock': make_lock,
228 228 'locked_by': locked_by,
229 229 'server_url': utils2.get_server_url(environ),
230 230 'user_agent': get_user_agent(environ),
231 231 'hooks': get_enabled_hook_classes(ui_settings),
232 232 'is_shadow_repo': is_shadow_repo,
233 233 }
234 234 return extras
235 235
236 236
237 237 class BasicAuth(AuthBasicAuthenticator):
238 238
239 239 def __init__(self, realm, authfunc, registry, auth_http_code=None,
240 240 initial_call_detection=False, acl_repo_name=None):
241 241 self.realm = realm
242 242 self.initial_call = initial_call_detection
243 243 self.authfunc = authfunc
244 244 self.registry = registry
245 245 self.acl_repo_name = acl_repo_name
246 246 self._rc_auth_http_code = auth_http_code
247 247
248 248 def _get_response_from_code(self, http_code):
249 249 try:
250 250 return get_exception(safe_int(http_code))
251 251 except Exception:
252 252 log.exception('Failed to fetch response for code %s' % http_code)
253 253 return HTTPForbidden
254 254
255 255 def get_rc_realm(self):
256 256 return safe_str(self.registry.rhodecode_settings.get('rhodecode_realm'))
257 257
258 258 def build_authentication(self):
259 259 head = WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
260 260 if self._rc_auth_http_code and not self.initial_call:
261 261 # return alternative HTTP code if alternative http return code
262 262 # is specified in RhodeCode config, but ONLY if it's not the
263 263 # FIRST call
264 264 custom_response_klass = self._get_response_from_code(
265 265 self._rc_auth_http_code)
266 266 return custom_response_klass(headers=head)
267 267 return HTTPUnauthorized(headers=head)
268 268
269 269 def authenticate(self, environ):
270 270 authorization = AUTHORIZATION(environ)
271 271 if not authorization:
272 272 return self.build_authentication()
273 273 (authmeth, auth) = authorization.split(' ', 1)
274 274 if 'basic' != authmeth.lower():
275 275 return self.build_authentication()
276 276 auth = auth.strip().decode('base64')
277 277 _parts = auth.split(':', 1)
278 278 if len(_parts) == 2:
279 279 username, password = _parts
280 280 auth_data = self.authfunc(
281 281 username, password, environ, VCS_TYPE,
282 282 registry=self.registry, acl_repo_name=self.acl_repo_name)
283 283 if auth_data:
284 284 return {'username': username, 'auth_data': auth_data}
285 285 if username and password:
286 286 # we mark that we actually executed authentication once, at
287 287 # that point we can use the alternative auth code
288 288 self.initial_call = False
289 289
290 290 return self.build_authentication()
291 291
292 292 __call__ = authenticate
293 293
294 294
295 295 def calculate_version_hash(config):
296 296 return md5(
297 297 config.get('beaker.session.secret', '') +
298 298 rhodecode.__version__)[:8]
299 299
300 300
301 301 def get_current_lang(request):
302 302 # NOTE(marcink): remove after pyramid move
303 303 try:
304 304 return translation.get_lang()[0]
305 305 except:
306 306 pass
307 307
308 308 return getattr(request, '_LOCALE_', request.locale_name)
309 309
310 310
311 311 def attach_context_attributes(context, request, user_id):
312 312 """
313 313 Attach variables into template context called `c`, please note that
314 314 request could be pylons or pyramid request in here.
315 315 """
316 316 # NOTE(marcink): remove check after pyramid migration
317 317 if hasattr(request, 'registry'):
318 318 config = request.registry.settings
319 319 else:
320 320 from pylons import config
321 321
322 322 rc_config = SettingsModel().get_all_settings(cache=True)
323 323
324 324 context.rhodecode_version = rhodecode.__version__
325 325 context.rhodecode_edition = config.get('rhodecode.edition')
326 326 # unique secret + version does not leak the version but keep consistency
327 327 context.rhodecode_version_hash = calculate_version_hash(config)
328 328
329 329 # Default language set for the incoming request
330 330 context.language = get_current_lang(request)
331 331
332 332 # Visual options
333 333 context.visual = AttributeDict({})
334 334
335 335 # DB stored Visual Items
336 336 context.visual.show_public_icon = str2bool(
337 337 rc_config.get('rhodecode_show_public_icon'))
338 338 context.visual.show_private_icon = str2bool(
339 339 rc_config.get('rhodecode_show_private_icon'))
340 340 context.visual.stylify_metatags = str2bool(
341 341 rc_config.get('rhodecode_stylify_metatags'))
342 342 context.visual.dashboard_items = safe_int(
343 343 rc_config.get('rhodecode_dashboard_items', 100))
344 344 context.visual.admin_grid_items = safe_int(
345 345 rc_config.get('rhodecode_admin_grid_items', 100))
346 346 context.visual.repository_fields = str2bool(
347 347 rc_config.get('rhodecode_repository_fields'))
348 348 context.visual.show_version = str2bool(
349 349 rc_config.get('rhodecode_show_version'))
350 350 context.visual.use_gravatar = str2bool(
351 351 rc_config.get('rhodecode_use_gravatar'))
352 352 context.visual.gravatar_url = rc_config.get('rhodecode_gravatar_url')
353 353 context.visual.default_renderer = rc_config.get(
354 354 'rhodecode_markup_renderer', 'rst')
355 355 context.visual.comment_types = ChangesetComment.COMMENT_TYPES
356 356 context.visual.rhodecode_support_url = \
357 357 rc_config.get('rhodecode_support_url') or h.route_url('rhodecode_support')
358 358
359 359 context.visual.affected_files_cut_off = 60
360 360
361 361 context.pre_code = rc_config.get('rhodecode_pre_code')
362 362 context.post_code = rc_config.get('rhodecode_post_code')
363 363 context.rhodecode_name = rc_config.get('rhodecode_title')
364 364 context.default_encodings = aslist(config.get('default_encoding'), sep=',')
365 365 # if we have specified default_encoding in the request, it has more
366 366 # priority
367 367 if request.GET.get('default_encoding'):
368 368 context.default_encodings.insert(0, request.GET.get('default_encoding'))
369 369 context.clone_uri_tmpl = rc_config.get('rhodecode_clone_uri_tmpl')
370 370
371 371 # INI stored
372 372 context.labs_active = str2bool(
373 373 config.get('labs_settings_active', 'false'))
374 374 context.visual.allow_repo_location_change = str2bool(
375 375 config.get('allow_repo_location_change', True))
376 376 context.visual.allow_custom_hooks_settings = str2bool(
377 377 config.get('allow_custom_hooks_settings', True))
378 378 context.debug_style = str2bool(config.get('debug_style', False))
379 379
380 380 context.rhodecode_instanceid = config.get('instance_id')
381 381
382 382 context.visual.cut_off_limit_diff = safe_int(
383 383 config.get('cut_off_limit_diff'))
384 384 context.visual.cut_off_limit_file = safe_int(
385 385 config.get('cut_off_limit_file'))
386 386
387 387 # AppEnlight
388 388 context.appenlight_enabled = str2bool(config.get('appenlight', 'false'))
389 389 context.appenlight_api_public_key = config.get(
390 390 'appenlight.api_public_key', '')
391 391 context.appenlight_server_url = config.get('appenlight.server_url', '')
392 392
393 393 # JS template context
394 394 context.template_context = {
395 395 'repo_name': None,
396 396 'repo_type': None,
397 397 'repo_landing_commit': None,
398 398 'rhodecode_user': {
399 399 'username': None,
400 400 'email': None,
401 401 'notification_status': False
402 402 },
403 403 'visual': {
404 404 'default_renderer': None
405 405 },
406 406 'commit_data': {
407 407 'commit_id': None
408 408 },
409 409 'pull_request_data': {'pull_request_id': None},
410 410 'timeago': {
411 411 'refresh_time': 120 * 1000,
412 412 'cutoff_limit': 1000 * 60 * 60 * 24 * 7
413 413 },
414 414 'pyramid_dispatch': {
415 415
416 416 },
417 417 'extra': {'plugins': {}}
418 418 }
419 419 # END CONFIG VARS
420 420
421 421 # TODO: This dosn't work when called from pylons compatibility tween.
422 422 # Fix this and remove it from base controller.
423 423 # context.repo_name = get_repo_slug(request) # can be empty
424 424
425 425 diffmode = 'sideside'
426 426 if request.GET.get('diffmode'):
427 427 if request.GET['diffmode'] == 'unified':
428 428 diffmode = 'unified'
429 429 elif request.session.get('diffmode'):
430 430 diffmode = request.session['diffmode']
431 431
432 432 context.diffmode = diffmode
433 433
434 434 if request.session.get('diffmode') != diffmode:
435 435 request.session['diffmode'] = diffmode
436 436
437 437 context.csrf_token = auth.get_csrf_token(session=request.session)
438 438 context.backends = rhodecode.BACKENDS.keys()
439 439 context.backends.sort()
440 440 context.unread_notifications = NotificationModel().get_unread_cnt_for_user(user_id)
441 441
442 442 # NOTE(marcink): when migrated to pyramid we don't need to set this anymore,
443 443 # given request will ALWAYS be pyramid one
444 444 pyramid_request = pyramid.threadlocal.get_current_request()
445 445 context.pyramid_request = pyramid_request
446 446
447 447 # web case
448 448 if hasattr(pyramid_request, 'user'):
449 449 context.auth_user = pyramid_request.user
450 450 context.rhodecode_user = pyramid_request.user
451 451
452 452 # api case
453 453 if hasattr(pyramid_request, 'rpc_user'):
454 454 context.auth_user = pyramid_request.rpc_user
455 455 context.rhodecode_user = pyramid_request.rpc_user
456 456
457 457 # attach the whole call context to the request
458 458 request.call_context = context
459 459
460 460
461 461 def get_auth_user(request):
462 462 environ = request.environ
463 463 session = request.session
464 464
465 465 ip_addr = get_ip_addr(environ)
466 466 # make sure that we update permissions each time we call controller
467 467 _auth_token = (request.GET.get('auth_token', '') or
468 468 request.GET.get('api_key', ''))
469 469
470 470 if _auth_token:
471 471 # when using API_KEY we assume user exists, and
472 472 # doesn't need auth based on cookies.
473 473 auth_user = AuthUser(api_key=_auth_token, ip_addr=ip_addr)
474 474 authenticated = False
475 475 else:
476 476 cookie_store = CookieStoreWrapper(session.get('rhodecode_user'))
477 477 try:
478 478 auth_user = AuthUser(user_id=cookie_store.get('user_id', None),
479 479 ip_addr=ip_addr)
480 480 except UserCreationError as e:
481 481 h.flash(e, 'error')
482 482 # container auth or other auth functions that create users
483 483 # on the fly can throw this exception signaling that there's
484 484 # issue with user creation, explanation should be provided
485 485 # in Exception itself. We then create a simple blank
486 486 # AuthUser
487 487 auth_user = AuthUser(ip_addr=ip_addr)
488 488
489 489 if password_changed(auth_user, session):
490 490 session.invalidate()
491 491 cookie_store = CookieStoreWrapper(session.get('rhodecode_user'))
492 492 auth_user = AuthUser(ip_addr=ip_addr)
493 493
494 494 authenticated = cookie_store.get('is_authenticated')
495 495
496 496 if not auth_user.is_authenticated and auth_user.is_user_object:
497 497 # user is not authenticated and not empty
498 498 auth_user.set_authenticated(authenticated)
499 499
500 500 return auth_user
501 501
502 502
503 503 class BaseController(WSGIController):
504 504
505 505 def __before__(self):
506 506 """
507 507 __before__ is called before controller methods and after __call__
508 508 """
509 509 # on each call propagate settings calls into global settings.
510 510 from pylons import config
511 511 from pylons import tmpl_context as c, request, url
512 512 set_rhodecode_config(config)
513 513 attach_context_attributes(c, request, self._rhodecode_user.user_id)
514 514
515 515 # TODO: Remove this when fixed in attach_context_attributes()
516 516 c.repo_name = get_repo_slug(request) # can be empty
517 517
518 518 self.cut_off_limit_diff = safe_int(config.get('cut_off_limit_diff'))
519 519 self.cut_off_limit_file = safe_int(config.get('cut_off_limit_file'))
520 520 self.sa = meta.Session
521 521 self.scm_model = ScmModel(self.sa)
522 522
523 523 # set user language
524 524 user_lang = getattr(c.pyramid_request, '_LOCALE_', None)
525 525 if user_lang:
526 526 translation.set_lang(user_lang)
527 527 log.debug('set language to %s for user %s',
528 528 user_lang, self._rhodecode_user)
529 529
530 530 def _dispatch_redirect(self, with_url, environ, start_response):
531 531 from webob.exc import HTTPFound
532 532 resp = HTTPFound(with_url)
533 533 environ['SCRIPT_NAME'] = '' # handle prefix middleware
534 534 environ['PATH_INFO'] = with_url
535 535 return resp(environ, start_response)
536 536
537 537 def __call__(self, environ, start_response):
538 538 """Invoke the Controller"""
539 539 # WSGIController.__call__ dispatches to the Controller method
540 540 # the request is routed to. This routing information is
541 541 # available in environ['pylons.routes_dict']
542 542 from rhodecode.lib import helpers as h
543 543 from pylons import tmpl_context as c, request, url
544 544
545 545 # Provide the Pylons context to Pyramid's debugtoolbar if it asks
546 546 if environ.get('debugtoolbar.wants_pylons_context', False):
547 547 environ['debugtoolbar.pylons_context'] = c._current_obj()
548 548
549 549 _route_name = '.'.join([environ['pylons.routes_dict']['controller'],
550 550 environ['pylons.routes_dict']['action']])
551 551
552 552 self.rc_config = SettingsModel().get_all_settings(cache=True)
553 553 self.ip_addr = get_ip_addr(environ)
554 554
555 555 # The rhodecode auth user is looked up and passed through the
556 556 # environ by the pylons compatibility tween in pyramid.
557 557 # So we can just grab it from there.
558 558 auth_user = environ['rc_auth_user']
559 559
560 560 # set globals for auth user
561 561 request.user = auth_user
562 562 self._rhodecode_user = auth_user
563 563
564 564 log.info('IP: %s User: %s accessed %s [%s]' % (
565 565 self.ip_addr, auth_user, safe_unicode(get_access_path(environ)),
566 566 _route_name)
567 567 )
568 568
569 569 user_obj = auth_user.get_instance()
570 570 if user_obj and user_obj.user_data.get('force_password_change'):
571 571 h.flash('You are required to change your password', 'warning',
572 572 ignore_duplicate=True)
573 573 return self._dispatch_redirect(
574 574 url('my_account_password'), environ, start_response)
575 575
576 576 return WSGIController.__call__(self, environ, start_response)
577 577
578 578
579 579 def h_filter(s):
580 580 """
581 581 Custom filter for Mako templates. Mako by standard uses `markupsafe.escape`
582 582 we wrap this with additional functionality that converts None to empty
583 583 strings
584 584 """
585 585 if s is None:
586 586 return markupsafe.Markup()
587 587 return markupsafe.escape(s)
588 588
589 589
590 590 def add_events_routes(config):
591 591 """
592 592 Adds routing that can be used in events. Because some events are triggered
593 593 outside of pyramid context, we need to bootstrap request with some
594 594 routing registered
595 595 """
596
597 from rhodecode.apps._base import ADMIN_PREFIX
598
596 599 config.add_route(name='home', pattern='/')
597 600
601 config.add_route(name='login', pattern=ADMIN_PREFIX + '/login')
602 config.add_route(name='logout', pattern=ADMIN_PREFIX + '/logout')
598 603 config.add_route(name='repo_summary', pattern='/{repo_name}')
599 604 config.add_route(name='repo_summary_explicit', pattern='/{repo_name}/summary')
600 605 config.add_route(name='repo_group_home', pattern='/{repo_group_name}')
601 606
602 607 config.add_route(name='pullrequest_show',
603 608 pattern='/{repo_name}/pull-request/{pull_request_id}')
604 609 config.add_route(name='pull_requests_global',
605 610 pattern='/pull-request/{pull_request_id}')
606
607 611 config.add_route(name='repo_commit',
608 612 pattern='/{repo_name}/changeset/{commit_id}')
613
609 614 config.add_route(name='repo_files',
610 615 pattern='/{repo_name}/files/{commit_id}/{f_path}')
611 616
612 617
618 def bootstrap_config(request):
619 import pyramid.testing
620 registry = pyramid.testing.Registry('RcTestRegistry')
621 config = pyramid.testing.setUp(registry=registry, request=request)
622
623 # allow pyramid lookup in testing
624 config.include('pyramid_mako')
625
626 add_events_routes(config)
627 return config
628
629
613 630 def bootstrap_request(**kwargs):
614 631 import pyramid.testing
615 632
616 633 class TestRequest(pyramid.testing.DummyRequest):
617 634 application_url = kwargs.pop('application_url', 'http://example.com')
618 635 host = kwargs.pop('host', 'example.com:80')
619 636 domain = kwargs.pop('domain', 'example.com')
620 637
621 638 def translate(self, msg):
622 639 return msg
623 640
624 641 class TestDummySession(pyramid.testing.DummySession):
625 642 def save(*arg, **kw):
626 643 pass
627 644
628 645 request = TestRequest(**kwargs)
629 646 request.session = TestDummySession()
630 647
631 config = pyramid.testing.setUp(request=request)
632 add_events_routes(config)
633 648 return request
634 649
@@ -1,1858 +1,1858 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32 import dateutil.tz
33 33
34 34 import mock
35 35 import pyramid.testing
36 36 import pytest
37 37 import colander
38 38 import requests
39 39
40 40 import rhodecode
41 41 from rhodecode.lib.utils2 import AttributeDict
42 42 from rhodecode.model.changeset_status import ChangesetStatusModel
43 43 from rhodecode.model.comment import CommentsModel
44 44 from rhodecode.model.db import (
45 45 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
46 46 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
47 47 from rhodecode.model.meta import Session
48 48 from rhodecode.model.pull_request import PullRequestModel
49 49 from rhodecode.model.repo import RepoModel
50 50 from rhodecode.model.repo_group import RepoGroupModel
51 51 from rhodecode.model.user import UserModel
52 52 from rhodecode.model.settings import VcsSettingsModel
53 53 from rhodecode.model.user_group import UserGroupModel
54 54 from rhodecode.model.integration import IntegrationModel
55 55 from rhodecode.integrations import integration_type_registry
56 56 from rhodecode.integrations.types.base import IntegrationTypeBase
57 57 from rhodecode.lib.utils import repo2db_mapper
58 58 from rhodecode.lib.vcs import create_vcsserver_proxy
59 59 from rhodecode.lib.vcs.backends import get_backend
60 60 from rhodecode.lib.vcs.nodes import FileNode
61 61 from rhodecode.tests import (
62 62 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
63 63 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
64 64 TEST_USER_REGULAR_PASS)
65 from rhodecode.tests.utils import CustomTestApp, set_anonymous_access, add_test_routes
65 from rhodecode.tests.utils import CustomTestApp, set_anonymous_access
66 66 from rhodecode.tests.fixture import Fixture
67 67
68 68
69 69 def _split_comma(value):
70 70 return value.split(',')
71 71
72 72
73 73 def pytest_addoption(parser):
74 74 parser.addoption(
75 75 '--keep-tmp-path', action='store_true',
76 76 help="Keep the test temporary directories")
77 77 parser.addoption(
78 78 '--backends', action='store', type=_split_comma,
79 79 default=['git', 'hg', 'svn'],
80 80 help="Select which backends to test for backend specific tests.")
81 81 parser.addoption(
82 82 '--dbs', action='store', type=_split_comma,
83 83 default=['sqlite'],
84 84 help="Select which database to test for database specific tests. "
85 85 "Possible options are sqlite,postgres,mysql")
86 86 parser.addoption(
87 87 '--appenlight', '--ae', action='store_true',
88 88 help="Track statistics in appenlight.")
89 89 parser.addoption(
90 90 '--appenlight-api-key', '--ae-key',
91 91 help="API key for Appenlight.")
92 92 parser.addoption(
93 93 '--appenlight-url', '--ae-url',
94 94 default="https://ae.rhodecode.com",
95 95 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
96 96 parser.addoption(
97 97 '--sqlite-connection-string', action='store',
98 98 default='', help="Connection string for the dbs tests with SQLite")
99 99 parser.addoption(
100 100 '--postgres-connection-string', action='store',
101 101 default='', help="Connection string for the dbs tests with Postgres")
102 102 parser.addoption(
103 103 '--mysql-connection-string', action='store',
104 104 default='', help="Connection string for the dbs tests with MySQL")
105 105 parser.addoption(
106 106 '--repeat', type=int, default=100,
107 107 help="Number of repetitions in performance tests.")
108 108
109 109
110 110 def pytest_configure(config):
111 111 # Appy the kombu patch early on, needed for test discovery on Python 2.7.11
112 112 from rhodecode.config import patches
113 113 patches.kombu_1_5_1_python_2_7_11()
114 114
115 115
116 116 def pytest_collection_modifyitems(session, config, items):
117 117 # nottest marked, compare nose, used for transition from nose to pytest
118 118 remaining = [
119 119 i for i in items if getattr(i.obj, '__test__', True)]
120 120 items[:] = remaining
121 121
122 122
123 123 def pytest_generate_tests(metafunc):
124 124 # Support test generation based on --backend parameter
125 125 if 'backend_alias' in metafunc.fixturenames:
126 126 backends = get_backends_from_metafunc(metafunc)
127 127 scope = None
128 128 if not backends:
129 129 pytest.skip("Not enabled for any of selected backends")
130 130 metafunc.parametrize('backend_alias', backends, scope=scope)
131 131 elif hasattr(metafunc.function, 'backends'):
132 132 backends = get_backends_from_metafunc(metafunc)
133 133 if not backends:
134 134 pytest.skip("Not enabled for any of selected backends")
135 135
136 136
137 137 def get_backends_from_metafunc(metafunc):
138 138 requested_backends = set(metafunc.config.getoption('--backends'))
139 139 if hasattr(metafunc.function, 'backends'):
140 140 # Supported backends by this test function, created from
141 141 # pytest.mark.backends
142 142 backends = metafunc.function.backends.args
143 143 elif hasattr(metafunc.cls, 'backend_alias'):
144 144 # Support class attribute "backend_alias", this is mainly
145 145 # for legacy reasons for tests not yet using pytest.mark.backends
146 146 backends = [metafunc.cls.backend_alias]
147 147 else:
148 148 backends = metafunc.config.getoption('--backends')
149 149 return requested_backends.intersection(backends)
150 150
151 151
152 152 @pytest.fixture(scope='session', autouse=True)
153 153 def activate_example_rcextensions(request):
154 154 """
155 155 Patch in an example rcextensions module which verifies passed in kwargs.
156 156 """
157 157 from rhodecode.tests.other import example_rcextensions
158 158
159 159 old_extensions = rhodecode.EXTENSIONS
160 160 rhodecode.EXTENSIONS = example_rcextensions
161 161
162 162 @request.addfinalizer
163 163 def cleanup():
164 164 rhodecode.EXTENSIONS = old_extensions
165 165
166 166
167 167 @pytest.fixture
168 168 def capture_rcextensions():
169 169 """
170 170 Returns the recorded calls to entry points in rcextensions.
171 171 """
172 172 calls = rhodecode.EXTENSIONS.calls
173 173 calls.clear()
174 174 # Note: At this moment, it is still the empty dict, but that will
175 175 # be filled during the test run and since it is a reference this
176 176 # is enough to make it work.
177 177 return calls
178 178
179 179
180 180 @pytest.fixture(scope='session')
181 181 def http_environ_session():
182 182 """
183 183 Allow to use "http_environ" in session scope.
184 184 """
185 185 return http_environ(
186 186 http_host_stub=http_host_stub())
187 187
188 188
189 189 @pytest.fixture
190 190 def http_host_stub():
191 191 """
192 192 Value of HTTP_HOST in the test run.
193 193 """
194 194 return 'example.com:80'
195 195
196 196
197 197 @pytest.fixture
198 198 def http_host_only_stub():
199 199 """
200 200 Value of HTTP_HOST in the test run.
201 201 """
202 202 return http_host_stub().split(':')[0]
203 203
204 204
205 205 @pytest.fixture
206 206 def http_environ(http_host_stub):
207 207 """
208 208 HTTP extra environ keys.
209 209
210 210 User by the test application and as well for setting up the pylons
211 211 environment. In the case of the fixture "app" it should be possible
212 212 to override this for a specific test case.
213 213 """
214 214 return {
215 215 'SERVER_NAME': http_host_only_stub(),
216 216 'SERVER_PORT': http_host_stub.split(':')[1],
217 217 'HTTP_HOST': http_host_stub,
218 218 'HTTP_USER_AGENT': 'rc-test-agent',
219 219 'REQUEST_METHOD': 'GET'
220 220 }
221 221
222 222
223 223 @pytest.fixture(scope='function')
224 224 def app(request, config_stub, pylonsapp, http_environ):
225 225 app = CustomTestApp(
226 226 pylonsapp,
227 227 extra_environ=http_environ)
228 228 if request.cls:
229 229 request.cls.app = app
230 230 return app
231 231
232 232
233 233 @pytest.fixture(scope='session')
234 234 def app_settings(pylonsapp, pylons_config):
235 235 """
236 236 Settings dictionary used to create the app.
237 237
238 238 Parses the ini file and passes the result through the sanitize and apply
239 239 defaults mechanism in `rhodecode.config.middleware`.
240 240 """
241 241 from paste.deploy.loadwsgi import loadcontext, APP
242 242 from rhodecode.config.middleware import (
243 243 sanitize_settings_and_apply_defaults)
244 244 context = loadcontext(APP, 'config:' + pylons_config)
245 245 settings = sanitize_settings_and_apply_defaults(context.config())
246 246 return settings
247 247
248 248
249 249 @pytest.fixture(scope='session')
250 250 def db(app_settings):
251 251 """
252 252 Initializes the database connection.
253 253
254 254 It uses the same settings which are used to create the ``pylonsapp`` or
255 255 ``app`` fixtures.
256 256 """
257 257 from rhodecode.config.utils import initialize_database
258 258 initialize_database(app_settings)
259 259
260 260
261 261 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
262 262
263 263
264 264 def _autologin_user(app, *args):
265 265 session = login_user_session(app, *args)
266 266 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
267 267 return LoginData(csrf_token, session['rhodecode_user'])
268 268
269 269
270 270 @pytest.fixture
271 271 def autologin_user(app):
272 272 """
273 273 Utility fixture which makes sure that the admin user is logged in
274 274 """
275 275 return _autologin_user(app)
276 276
277 277
278 278 @pytest.fixture
279 279 def autologin_regular_user(app):
280 280 """
281 281 Utility fixture which makes sure that the regular user is logged in
282 282 """
283 283 return _autologin_user(
284 284 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
285 285
286 286
287 287 @pytest.fixture(scope='function')
288 288 def csrf_token(request, autologin_user):
289 289 return autologin_user.csrf_token
290 290
291 291
292 292 @pytest.fixture(scope='function')
293 293 def xhr_header(request):
294 294 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
295 295
296 296
297 297 @pytest.fixture
298 298 def real_crypto_backend(monkeypatch):
299 299 """
300 300 Switch the production crypto backend on for this test.
301 301
302 302 During the test run the crypto backend is replaced with a faster
303 303 implementation based on the MD5 algorithm.
304 304 """
305 305 monkeypatch.setattr(rhodecode, 'is_test', False)
306 306
307 307
308 308 @pytest.fixture(scope='class')
309 309 def index_location(request, pylonsapp):
310 310 index_location = pylonsapp.config['app_conf']['search.location']
311 311 if request.cls:
312 312 request.cls.index_location = index_location
313 313 return index_location
314 314
315 315
316 316 @pytest.fixture(scope='session', autouse=True)
317 317 def tests_tmp_path(request):
318 318 """
319 319 Create temporary directory to be used during the test session.
320 320 """
321 321 if not os.path.exists(TESTS_TMP_PATH):
322 322 os.makedirs(TESTS_TMP_PATH)
323 323
324 324 if not request.config.getoption('--keep-tmp-path'):
325 325 @request.addfinalizer
326 326 def remove_tmp_path():
327 327 shutil.rmtree(TESTS_TMP_PATH)
328 328
329 329 return TESTS_TMP_PATH
330 330
331 331
332 332 @pytest.fixture
333 333 def test_repo_group(request):
334 334 """
335 335 Create a temporary repository group, and destroy it after
336 336 usage automatically
337 337 """
338 338 fixture = Fixture()
339 339 repogroupid = 'test_repo_group_%s' % str(time.time()).replace('.', '')
340 340 repo_group = fixture.create_repo_group(repogroupid)
341 341
342 342 def _cleanup():
343 343 fixture.destroy_repo_group(repogroupid)
344 344
345 345 request.addfinalizer(_cleanup)
346 346 return repo_group
347 347
348 348
349 349 @pytest.fixture
350 350 def test_user_group(request):
351 351 """
352 352 Create a temporary user group, and destroy it after
353 353 usage automatically
354 354 """
355 355 fixture = Fixture()
356 356 usergroupid = 'test_user_group_%s' % str(time.time()).replace('.', '')
357 357 user_group = fixture.create_user_group(usergroupid)
358 358
359 359 def _cleanup():
360 360 fixture.destroy_user_group(user_group)
361 361
362 362 request.addfinalizer(_cleanup)
363 363 return user_group
364 364
365 365
366 366 @pytest.fixture(scope='session')
367 367 def test_repo(request):
368 368 container = TestRepoContainer()
369 369 request.addfinalizer(container._cleanup)
370 370 return container
371 371
372 372
373 373 class TestRepoContainer(object):
374 374 """
375 375 Container for test repositories which are used read only.
376 376
377 377 Repositories will be created on demand and re-used during the lifetime
378 378 of this object.
379 379
380 380 Usage to get the svn test repository "minimal"::
381 381
382 382 test_repo = TestContainer()
383 383 repo = test_repo('minimal', 'svn')
384 384
385 385 """
386 386
387 387 dump_extractors = {
388 388 'git': utils.extract_git_repo_from_dump,
389 389 'hg': utils.extract_hg_repo_from_dump,
390 390 'svn': utils.extract_svn_repo_from_dump,
391 391 }
392 392
393 393 def __init__(self):
394 394 self._cleanup_repos = []
395 395 self._fixture = Fixture()
396 396 self._repos = {}
397 397
398 398 def __call__(self, dump_name, backend_alias, config=None):
399 399 key = (dump_name, backend_alias)
400 400 if key not in self._repos:
401 401 repo = self._create_repo(dump_name, backend_alias, config)
402 402 self._repos[key] = repo.repo_id
403 403 return Repository.get(self._repos[key])
404 404
405 405 def _create_repo(self, dump_name, backend_alias, config):
406 406 repo_name = '%s-%s' % (backend_alias, dump_name)
407 407 backend_class = get_backend(backend_alias)
408 408 dump_extractor = self.dump_extractors[backend_alias]
409 409 repo_path = dump_extractor(dump_name, repo_name)
410 410
411 411 vcs_repo = backend_class(repo_path, config=config)
412 412 repo2db_mapper({repo_name: vcs_repo})
413 413
414 414 repo = RepoModel().get_by_repo_name(repo_name)
415 415 self._cleanup_repos.append(repo_name)
416 416 return repo
417 417
418 418 def _cleanup(self):
419 419 for repo_name in reversed(self._cleanup_repos):
420 420 self._fixture.destroy_repo(repo_name)
421 421
422 422
423 423 @pytest.fixture
424 424 def backend(request, backend_alias, pylonsapp, test_repo):
425 425 """
426 426 Parametrized fixture which represents a single backend implementation.
427 427
428 428 It respects the option `--backends` to focus the test run on specific
429 429 backend implementations.
430 430
431 431 It also supports `pytest.mark.xfail_backends` to mark tests as failing
432 432 for specific backends. This is intended as a utility for incremental
433 433 development of a new backend implementation.
434 434 """
435 435 if backend_alias not in request.config.getoption('--backends'):
436 436 pytest.skip("Backend %s not selected." % (backend_alias, ))
437 437
438 438 utils.check_xfail_backends(request.node, backend_alias)
439 439 utils.check_skip_backends(request.node, backend_alias)
440 440
441 441 repo_name = 'vcs_test_%s' % (backend_alias, )
442 442 backend = Backend(
443 443 alias=backend_alias,
444 444 repo_name=repo_name,
445 445 test_name=request.node.name,
446 446 test_repo_container=test_repo)
447 447 request.addfinalizer(backend.cleanup)
448 448 return backend
449 449
450 450
451 451 @pytest.fixture
452 452 def backend_git(request, pylonsapp, test_repo):
453 453 return backend(request, 'git', pylonsapp, test_repo)
454 454
455 455
456 456 @pytest.fixture
457 457 def backend_hg(request, pylonsapp, test_repo):
458 458 return backend(request, 'hg', pylonsapp, test_repo)
459 459
460 460
461 461 @pytest.fixture
462 462 def backend_svn(request, pylonsapp, test_repo):
463 463 return backend(request, 'svn', pylonsapp, test_repo)
464 464
465 465
466 466 @pytest.fixture
467 467 def backend_random(backend_git):
468 468 """
469 469 Use this to express that your tests need "a backend.
470 470
471 471 A few of our tests need a backend, so that we can run the code. This
472 472 fixture is intended to be used for such cases. It will pick one of the
473 473 backends and run the tests.
474 474
475 475 The fixture `backend` would run the test multiple times for each
476 476 available backend which is a pure waste of time if the test is
477 477 independent of the backend type.
478 478 """
479 479 # TODO: johbo: Change this to pick a random backend
480 480 return backend_git
481 481
482 482
483 483 @pytest.fixture
484 484 def backend_stub(backend_git):
485 485 """
486 486 Use this to express that your tests need a backend stub
487 487
488 488 TODO: mikhail: Implement a real stub logic instead of returning
489 489 a git backend
490 490 """
491 491 return backend_git
492 492
493 493
494 494 @pytest.fixture
495 495 def repo_stub(backend_stub):
496 496 """
497 497 Use this to express that your tests need a repository stub
498 498 """
499 499 return backend_stub.create_repo()
500 500
501 501
502 502 class Backend(object):
503 503 """
504 504 Represents the test configuration for one supported backend
505 505
506 506 Provides easy access to different test repositories based on
507 507 `__getitem__`. Such repositories will only be created once per test
508 508 session.
509 509 """
510 510
511 511 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
512 512 _master_repo = None
513 513 _commit_ids = {}
514 514
515 515 def __init__(self, alias, repo_name, test_name, test_repo_container):
516 516 self.alias = alias
517 517 self.repo_name = repo_name
518 518 self._cleanup_repos = []
519 519 self._test_name = test_name
520 520 self._test_repo_container = test_repo_container
521 521 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
522 522 # Fixture will survive in the end.
523 523 self._fixture = Fixture()
524 524
525 525 def __getitem__(self, key):
526 526 return self._test_repo_container(key, self.alias)
527 527
528 528 def create_test_repo(self, key, config=None):
529 529 return self._test_repo_container(key, self.alias, config)
530 530
531 531 @property
532 532 def repo(self):
533 533 """
534 534 Returns the "current" repository. This is the vcs_test repo or the
535 535 last repo which has been created with `create_repo`.
536 536 """
537 537 from rhodecode.model.db import Repository
538 538 return Repository.get_by_repo_name(self.repo_name)
539 539
540 540 @property
541 541 def default_branch_name(self):
542 542 VcsRepository = get_backend(self.alias)
543 543 return VcsRepository.DEFAULT_BRANCH_NAME
544 544
545 545 @property
546 546 def default_head_id(self):
547 547 """
548 548 Returns the default head id of the underlying backend.
549 549
550 550 This will be the default branch name in case the backend does have a
551 551 default branch. In the other cases it will point to a valid head
552 552 which can serve as the base to create a new commit on top of it.
553 553 """
554 554 vcsrepo = self.repo.scm_instance()
555 555 head_id = (
556 556 vcsrepo.DEFAULT_BRANCH_NAME or
557 557 vcsrepo.commit_ids[-1])
558 558 return head_id
559 559
560 560 @property
561 561 def commit_ids(self):
562 562 """
563 563 Returns the list of commits for the last created repository
564 564 """
565 565 return self._commit_ids
566 566
567 567 def create_master_repo(self, commits):
568 568 """
569 569 Create a repository and remember it as a template.
570 570
571 571 This allows to easily create derived repositories to construct
572 572 more complex scenarios for diff, compare and pull requests.
573 573
574 574 Returns a commit map which maps from commit message to raw_id.
575 575 """
576 576 self._master_repo = self.create_repo(commits=commits)
577 577 return self._commit_ids
578 578
579 579 def create_repo(
580 580 self, commits=None, number_of_commits=0, heads=None,
581 581 name_suffix=u'', **kwargs):
582 582 """
583 583 Create a repository and record it for later cleanup.
584 584
585 585 :param commits: Optional. A sequence of dict instances.
586 586 Will add a commit per entry to the new repository.
587 587 :param number_of_commits: Optional. If set to a number, this number of
588 588 commits will be added to the new repository.
589 589 :param heads: Optional. Can be set to a sequence of of commit
590 590 names which shall be pulled in from the master repository.
591 591
592 592 """
593 593 self.repo_name = self._next_repo_name() + name_suffix
594 594 repo = self._fixture.create_repo(
595 595 self.repo_name, repo_type=self.alias, **kwargs)
596 596 self._cleanup_repos.append(repo.repo_name)
597 597
598 598 commits = commits or [
599 599 {'message': 'Commit %s of %s' % (x, self.repo_name)}
600 600 for x in xrange(number_of_commits)]
601 601 self._add_commits_to_repo(repo.scm_instance(), commits)
602 602 if heads:
603 603 self.pull_heads(repo, heads)
604 604
605 605 return repo
606 606
607 607 def pull_heads(self, repo, heads):
608 608 """
609 609 Make sure that repo contains all commits mentioned in `heads`
610 610 """
611 611 vcsmaster = self._master_repo.scm_instance()
612 612 vcsrepo = repo.scm_instance()
613 613 vcsrepo.config.clear_section('hooks')
614 614 commit_ids = [self._commit_ids[h] for h in heads]
615 615 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
616 616
617 617 def create_fork(self):
618 618 repo_to_fork = self.repo_name
619 619 self.repo_name = self._next_repo_name()
620 620 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
621 621 self._cleanup_repos.append(self.repo_name)
622 622 return repo
623 623
624 624 def new_repo_name(self, suffix=u''):
625 625 self.repo_name = self._next_repo_name() + suffix
626 626 self._cleanup_repos.append(self.repo_name)
627 627 return self.repo_name
628 628
629 629 def _next_repo_name(self):
630 630 return u"%s_%s" % (
631 631 self.invalid_repo_name.sub(u'_', self._test_name),
632 632 len(self._cleanup_repos))
633 633
634 634 def ensure_file(self, filename, content='Test content\n'):
635 635 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
636 636 commits = [
637 637 {'added': [
638 638 FileNode(filename, content=content),
639 639 ]},
640 640 ]
641 641 self._add_commits_to_repo(self.repo.scm_instance(), commits)
642 642
643 643 def enable_downloads(self):
644 644 repo = self.repo
645 645 repo.enable_downloads = True
646 646 Session().add(repo)
647 647 Session().commit()
648 648
649 649 def cleanup(self):
650 650 for repo_name in reversed(self._cleanup_repos):
651 651 self._fixture.destroy_repo(repo_name)
652 652
653 653 def _add_commits_to_repo(self, repo, commits):
654 654 commit_ids = _add_commits_to_repo(repo, commits)
655 655 if not commit_ids:
656 656 return
657 657 self._commit_ids = commit_ids
658 658
659 659 # Creating refs for Git to allow fetching them from remote repository
660 660 if self.alias == 'git':
661 661 refs = {}
662 662 for message in self._commit_ids:
663 663 # TODO: mikhail: do more special chars replacements
664 664 ref_name = 'refs/test-refs/{}'.format(
665 665 message.replace(' ', ''))
666 666 refs[ref_name] = self._commit_ids[message]
667 667 self._create_refs(repo, refs)
668 668
669 669 def _create_refs(self, repo, refs):
670 670 for ref_name in refs:
671 671 repo.set_refs(ref_name, refs[ref_name])
672 672
673 673
674 674 @pytest.fixture
675 675 def vcsbackend(request, backend_alias, tests_tmp_path, pylonsapp, test_repo):
676 676 """
677 677 Parametrized fixture which represents a single vcs backend implementation.
678 678
679 679 See the fixture `backend` for more details. This one implements the same
680 680 concept, but on vcs level. So it does not provide model instances etc.
681 681
682 682 Parameters are generated dynamically, see :func:`pytest_generate_tests`
683 683 for how this works.
684 684 """
685 685 if backend_alias not in request.config.getoption('--backends'):
686 686 pytest.skip("Backend %s not selected." % (backend_alias, ))
687 687
688 688 utils.check_xfail_backends(request.node, backend_alias)
689 689 utils.check_skip_backends(request.node, backend_alias)
690 690
691 691 repo_name = 'vcs_test_%s' % (backend_alias, )
692 692 repo_path = os.path.join(tests_tmp_path, repo_name)
693 693 backend = VcsBackend(
694 694 alias=backend_alias,
695 695 repo_path=repo_path,
696 696 test_name=request.node.name,
697 697 test_repo_container=test_repo)
698 698 request.addfinalizer(backend.cleanup)
699 699 return backend
700 700
701 701
702 702 @pytest.fixture
703 703 def vcsbackend_git(request, tests_tmp_path, pylonsapp, test_repo):
704 704 return vcsbackend(request, 'git', tests_tmp_path, pylonsapp, test_repo)
705 705
706 706
707 707 @pytest.fixture
708 708 def vcsbackend_hg(request, tests_tmp_path, pylonsapp, test_repo):
709 709 return vcsbackend(request, 'hg', tests_tmp_path, pylonsapp, test_repo)
710 710
711 711
712 712 @pytest.fixture
713 713 def vcsbackend_svn(request, tests_tmp_path, pylonsapp, test_repo):
714 714 return vcsbackend(request, 'svn', tests_tmp_path, pylonsapp, test_repo)
715 715
716 716
717 717 @pytest.fixture
718 718 def vcsbackend_random(vcsbackend_git):
719 719 """
720 720 Use this to express that your tests need "a vcsbackend".
721 721
722 722 The fixture `vcsbackend` would run the test multiple times for each
723 723 available vcs backend which is a pure waste of time if the test is
724 724 independent of the vcs backend type.
725 725 """
726 726 # TODO: johbo: Change this to pick a random backend
727 727 return vcsbackend_git
728 728
729 729
730 730 @pytest.fixture
731 731 def vcsbackend_stub(vcsbackend_git):
732 732 """
733 733 Use this to express that your test just needs a stub of a vcsbackend.
734 734
735 735 Plan is to eventually implement an in-memory stub to speed tests up.
736 736 """
737 737 return vcsbackend_git
738 738
739 739
740 740 class VcsBackend(object):
741 741 """
742 742 Represents the test configuration for one supported vcs backend.
743 743 """
744 744
745 745 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
746 746
747 747 def __init__(self, alias, repo_path, test_name, test_repo_container):
748 748 self.alias = alias
749 749 self._repo_path = repo_path
750 750 self._cleanup_repos = []
751 751 self._test_name = test_name
752 752 self._test_repo_container = test_repo_container
753 753
754 754 def __getitem__(self, key):
755 755 return self._test_repo_container(key, self.alias).scm_instance()
756 756
757 757 @property
758 758 def repo(self):
759 759 """
760 760 Returns the "current" repository. This is the vcs_test repo of the last
761 761 repo which has been created.
762 762 """
763 763 Repository = get_backend(self.alias)
764 764 return Repository(self._repo_path)
765 765
766 766 @property
767 767 def backend(self):
768 768 """
769 769 Returns the backend implementation class.
770 770 """
771 771 return get_backend(self.alias)
772 772
773 773 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
774 774 repo_name = self._next_repo_name()
775 775 self._repo_path = get_new_dir(repo_name)
776 776 repo_class = get_backend(self.alias)
777 777 src_url = None
778 778 if _clone_repo:
779 779 src_url = _clone_repo.path
780 780 repo = repo_class(self._repo_path, create=True, src_url=src_url)
781 781 self._cleanup_repos.append(repo)
782 782
783 783 commits = commits or [
784 784 {'message': 'Commit %s of %s' % (x, repo_name)}
785 785 for x in xrange(number_of_commits)]
786 786 _add_commits_to_repo(repo, commits)
787 787 return repo
788 788
789 789 def clone_repo(self, repo):
790 790 return self.create_repo(_clone_repo=repo)
791 791
792 792 def cleanup(self):
793 793 for repo in self._cleanup_repos:
794 794 shutil.rmtree(repo.path)
795 795
796 796 def new_repo_path(self):
797 797 repo_name = self._next_repo_name()
798 798 self._repo_path = get_new_dir(repo_name)
799 799 return self._repo_path
800 800
801 801 def _next_repo_name(self):
802 802 return "%s_%s" % (
803 803 self.invalid_repo_name.sub('_', self._test_name),
804 804 len(self._cleanup_repos))
805 805
806 806 def add_file(self, repo, filename, content='Test content\n'):
807 807 imc = repo.in_memory_commit
808 808 imc.add(FileNode(filename, content=content))
809 809 imc.commit(
810 810 message=u'Automatic commit from vcsbackend fixture',
811 811 author=u'Automatic')
812 812
813 813 def ensure_file(self, filename, content='Test content\n'):
814 814 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
815 815 self.add_file(self.repo, filename, content)
816 816
817 817
818 818 def _add_commits_to_repo(vcs_repo, commits):
819 819 commit_ids = {}
820 820 if not commits:
821 821 return commit_ids
822 822
823 823 imc = vcs_repo.in_memory_commit
824 824 commit = None
825 825
826 826 for idx, commit in enumerate(commits):
827 827 message = unicode(commit.get('message', 'Commit %s' % idx))
828 828
829 829 for node in commit.get('added', []):
830 830 imc.add(FileNode(node.path, content=node.content))
831 831 for node in commit.get('changed', []):
832 832 imc.change(FileNode(node.path, content=node.content))
833 833 for node in commit.get('removed', []):
834 834 imc.remove(FileNode(node.path))
835 835
836 836 parents = [
837 837 vcs_repo.get_commit(commit_id=commit_ids[p])
838 838 for p in commit.get('parents', [])]
839 839
840 840 operations = ('added', 'changed', 'removed')
841 841 if not any((commit.get(o) for o in operations)):
842 842 imc.add(FileNode('file_%s' % idx, content=message))
843 843
844 844 commit = imc.commit(
845 845 message=message,
846 846 author=unicode(commit.get('author', 'Automatic')),
847 847 date=commit.get('date'),
848 848 branch=commit.get('branch'),
849 849 parents=parents)
850 850
851 851 commit_ids[commit.message] = commit.raw_id
852 852
853 853 return commit_ids
854 854
855 855
856 856 @pytest.fixture
857 857 def reposerver(request):
858 858 """
859 859 Allows to serve a backend repository
860 860 """
861 861
862 862 repo_server = RepoServer()
863 863 request.addfinalizer(repo_server.cleanup)
864 864 return repo_server
865 865
866 866
867 867 class RepoServer(object):
868 868 """
869 869 Utility to serve a local repository for the duration of a test case.
870 870
871 871 Supports only Subversion so far.
872 872 """
873 873
874 874 url = None
875 875
876 876 def __init__(self):
877 877 self._cleanup_servers = []
878 878
879 879 def serve(self, vcsrepo):
880 880 if vcsrepo.alias != 'svn':
881 881 raise TypeError("Backend %s not supported" % vcsrepo.alias)
882 882
883 883 proc = subprocess32.Popen(
884 884 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
885 885 '--root', vcsrepo.path])
886 886 self._cleanup_servers.append(proc)
887 887 self.url = 'svn://localhost'
888 888
889 889 def cleanup(self):
890 890 for proc in self._cleanup_servers:
891 891 proc.terminate()
892 892
893 893
894 894 @pytest.fixture
895 895 def pr_util(backend, request, config_stub):
896 896 """
897 897 Utility for tests of models and for functional tests around pull requests.
898 898
899 899 It gives an instance of :class:`PRTestUtility` which provides various
900 900 utility methods around one pull request.
901 901
902 902 This fixture uses `backend` and inherits its parameterization.
903 903 """
904 904
905 905 util = PRTestUtility(backend)
906 906
907 907 @request.addfinalizer
908 908 def cleanup():
909 909 util.cleanup()
910 910
911 911 return util
912 912
913 913
914 914 class PRTestUtility(object):
915 915
916 916 pull_request = None
917 917 pull_request_id = None
918 918 mergeable_patcher = None
919 919 mergeable_mock = None
920 920 notification_patcher = None
921 921
922 922 def __init__(self, backend):
923 923 self.backend = backend
924 924
925 925 def create_pull_request(
926 926 self, commits=None, target_head=None, source_head=None,
927 927 revisions=None, approved=False, author=None, mergeable=False,
928 928 enable_notifications=True, name_suffix=u'', reviewers=None,
929 929 title=u"Test", description=u"Description"):
930 930 self.set_mergeable(mergeable)
931 931 if not enable_notifications:
932 932 # mock notification side effect
933 933 self.notification_patcher = mock.patch(
934 934 'rhodecode.model.notification.NotificationModel.create')
935 935 self.notification_patcher.start()
936 936
937 937 if not self.pull_request:
938 938 if not commits:
939 939 commits = [
940 940 {'message': 'c1'},
941 941 {'message': 'c2'},
942 942 {'message': 'c3'},
943 943 ]
944 944 target_head = 'c1'
945 945 source_head = 'c2'
946 946 revisions = ['c2']
947 947
948 948 self.commit_ids = self.backend.create_master_repo(commits)
949 949 self.target_repository = self.backend.create_repo(
950 950 heads=[target_head], name_suffix=name_suffix)
951 951 self.source_repository = self.backend.create_repo(
952 952 heads=[source_head], name_suffix=name_suffix)
953 953 self.author = author or UserModel().get_by_username(
954 954 TEST_USER_ADMIN_LOGIN)
955 955
956 956 model = PullRequestModel()
957 957 self.create_parameters = {
958 958 'created_by': self.author,
959 959 'source_repo': self.source_repository.repo_name,
960 960 'source_ref': self._default_branch_reference(source_head),
961 961 'target_repo': self.target_repository.repo_name,
962 962 'target_ref': self._default_branch_reference(target_head),
963 963 'revisions': [self.commit_ids[r] for r in revisions],
964 964 'reviewers': reviewers or self._get_reviewers(),
965 965 'title': title,
966 966 'description': description,
967 967 }
968 968 self.pull_request = model.create(**self.create_parameters)
969 969 assert model.get_versions(self.pull_request) == []
970 970
971 971 self.pull_request_id = self.pull_request.pull_request_id
972 972
973 973 if approved:
974 974 self.approve()
975 975
976 976 Session().add(self.pull_request)
977 977 Session().commit()
978 978
979 979 return self.pull_request
980 980
981 981 def approve(self):
982 982 self.create_status_votes(
983 983 ChangesetStatus.STATUS_APPROVED,
984 984 *self.pull_request.reviewers)
985 985
986 986 def close(self):
987 987 PullRequestModel().close_pull_request(self.pull_request, self.author)
988 988
989 989 def _default_branch_reference(self, commit_message):
990 990 reference = '%s:%s:%s' % (
991 991 'branch',
992 992 self.backend.default_branch_name,
993 993 self.commit_ids[commit_message])
994 994 return reference
995 995
996 996 def _get_reviewers(self):
997 997 return [
998 998 (TEST_USER_REGULAR_LOGIN, ['default1'], False),
999 999 (TEST_USER_REGULAR2_LOGIN, ['default2'], False),
1000 1000 ]
1001 1001
1002 1002 def update_source_repository(self, head=None):
1003 1003 heads = [head or 'c3']
1004 1004 self.backend.pull_heads(self.source_repository, heads=heads)
1005 1005
1006 1006 def add_one_commit(self, head=None):
1007 1007 self.update_source_repository(head=head)
1008 1008 old_commit_ids = set(self.pull_request.revisions)
1009 1009 PullRequestModel().update_commits(self.pull_request)
1010 1010 commit_ids = set(self.pull_request.revisions)
1011 1011 new_commit_ids = commit_ids - old_commit_ids
1012 1012 assert len(new_commit_ids) == 1
1013 1013 return new_commit_ids.pop()
1014 1014
1015 1015 def remove_one_commit(self):
1016 1016 assert len(self.pull_request.revisions) == 2
1017 1017 source_vcs = self.source_repository.scm_instance()
1018 1018 removed_commit_id = source_vcs.commit_ids[-1]
1019 1019
1020 1020 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1021 1021 # remove the if once that's sorted out.
1022 1022 if self.backend.alias == "git":
1023 1023 kwargs = {'branch_name': self.backend.default_branch_name}
1024 1024 else:
1025 1025 kwargs = {}
1026 1026 source_vcs.strip(removed_commit_id, **kwargs)
1027 1027
1028 1028 PullRequestModel().update_commits(self.pull_request)
1029 1029 assert len(self.pull_request.revisions) == 1
1030 1030 return removed_commit_id
1031 1031
1032 1032 def create_comment(self, linked_to=None):
1033 1033 comment = CommentsModel().create(
1034 1034 text=u"Test comment",
1035 1035 repo=self.target_repository.repo_name,
1036 1036 user=self.author,
1037 1037 pull_request=self.pull_request)
1038 1038 assert comment.pull_request_version_id is None
1039 1039
1040 1040 if linked_to:
1041 1041 PullRequestModel()._link_comments_to_version(linked_to)
1042 1042
1043 1043 return comment
1044 1044
1045 1045 def create_inline_comment(
1046 1046 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1047 1047 comment = CommentsModel().create(
1048 1048 text=u"Test comment",
1049 1049 repo=self.target_repository.repo_name,
1050 1050 user=self.author,
1051 1051 line_no=line_no,
1052 1052 f_path=file_path,
1053 1053 pull_request=self.pull_request)
1054 1054 assert comment.pull_request_version_id is None
1055 1055
1056 1056 if linked_to:
1057 1057 PullRequestModel()._link_comments_to_version(linked_to)
1058 1058
1059 1059 return comment
1060 1060
1061 1061 def create_version_of_pull_request(self):
1062 1062 pull_request = self.create_pull_request()
1063 1063 version = PullRequestModel()._create_version_from_snapshot(
1064 1064 pull_request)
1065 1065 return version
1066 1066
1067 1067 def create_status_votes(self, status, *reviewers):
1068 1068 for reviewer in reviewers:
1069 1069 ChangesetStatusModel().set_status(
1070 1070 repo=self.pull_request.target_repo,
1071 1071 status=status,
1072 1072 user=reviewer.user_id,
1073 1073 pull_request=self.pull_request)
1074 1074
1075 1075 def set_mergeable(self, value):
1076 1076 if not self.mergeable_patcher:
1077 1077 self.mergeable_patcher = mock.patch.object(
1078 1078 VcsSettingsModel, 'get_general_settings')
1079 1079 self.mergeable_mock = self.mergeable_patcher.start()
1080 1080 self.mergeable_mock.return_value = {
1081 1081 'rhodecode_pr_merge_enabled': value}
1082 1082
1083 1083 def cleanup(self):
1084 1084 # In case the source repository is already cleaned up, the pull
1085 1085 # request will already be deleted.
1086 1086 pull_request = PullRequest().get(self.pull_request_id)
1087 1087 if pull_request:
1088 1088 PullRequestModel().delete(pull_request, pull_request.author)
1089 1089 Session().commit()
1090 1090
1091 1091 if self.notification_patcher:
1092 1092 self.notification_patcher.stop()
1093 1093
1094 1094 if self.mergeable_patcher:
1095 1095 self.mergeable_patcher.stop()
1096 1096
1097 1097
1098 1098 @pytest.fixture
1099 1099 def user_admin(pylonsapp):
1100 1100 """
1101 1101 Provides the default admin test user as an instance of `db.User`.
1102 1102 """
1103 1103 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1104 1104 return user
1105 1105
1106 1106
1107 1107 @pytest.fixture
1108 1108 def user_regular(pylonsapp):
1109 1109 """
1110 1110 Provides the default regular test user as an instance of `db.User`.
1111 1111 """
1112 1112 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1113 1113 return user
1114 1114
1115 1115
1116 1116 @pytest.fixture
1117 1117 def user_util(request, pylonsapp):
1118 1118 """
1119 1119 Provides a wired instance of `UserUtility` with integrated cleanup.
1120 1120 """
1121 1121 utility = UserUtility(test_name=request.node.name)
1122 1122 request.addfinalizer(utility.cleanup)
1123 1123 return utility
1124 1124
1125 1125
1126 1126 # TODO: johbo: Split this up into utilities per domain or something similar
1127 1127 class UserUtility(object):
1128 1128
1129 1129 def __init__(self, test_name="test"):
1130 1130 self._test_name = self._sanitize_name(test_name)
1131 1131 self.fixture = Fixture()
1132 1132 self.repo_group_ids = []
1133 1133 self.repos_ids = []
1134 1134 self.user_ids = []
1135 1135 self.user_group_ids = []
1136 1136 self.user_repo_permission_ids = []
1137 1137 self.user_group_repo_permission_ids = []
1138 1138 self.user_repo_group_permission_ids = []
1139 1139 self.user_group_repo_group_permission_ids = []
1140 1140 self.user_user_group_permission_ids = []
1141 1141 self.user_group_user_group_permission_ids = []
1142 1142 self.user_permissions = []
1143 1143
1144 1144 def _sanitize_name(self, name):
1145 1145 for char in ['[', ']']:
1146 1146 name = name.replace(char, '_')
1147 1147 return name
1148 1148
1149 1149 def create_repo_group(
1150 1150 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1151 1151 group_name = "{prefix}_repogroup_{count}".format(
1152 1152 prefix=self._test_name,
1153 1153 count=len(self.repo_group_ids))
1154 1154 repo_group = self.fixture.create_repo_group(
1155 1155 group_name, cur_user=owner)
1156 1156 if auto_cleanup:
1157 1157 self.repo_group_ids.append(repo_group.group_id)
1158 1158 return repo_group
1159 1159
1160 1160 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None,
1161 1161 auto_cleanup=True, repo_type='hg'):
1162 1162 repo_name = "{prefix}_repository_{count}".format(
1163 1163 prefix=self._test_name,
1164 1164 count=len(self.repos_ids))
1165 1165
1166 1166 repository = self.fixture.create_repo(
1167 1167 repo_name, cur_user=owner, repo_group=parent, repo_type=repo_type)
1168 1168 if auto_cleanup:
1169 1169 self.repos_ids.append(repository.repo_id)
1170 1170 return repository
1171 1171
1172 1172 def create_user(self, auto_cleanup=True, **kwargs):
1173 1173 user_name = "{prefix}_user_{count}".format(
1174 1174 prefix=self._test_name,
1175 1175 count=len(self.user_ids))
1176 1176 user = self.fixture.create_user(user_name, **kwargs)
1177 1177 if auto_cleanup:
1178 1178 self.user_ids.append(user.user_id)
1179 1179 return user
1180 1180
1181 1181 def create_user_with_group(self):
1182 1182 user = self.create_user()
1183 1183 user_group = self.create_user_group(members=[user])
1184 1184 return user, user_group
1185 1185
1186 1186 def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
1187 1187 auto_cleanup=True, **kwargs):
1188 1188 group_name = "{prefix}_usergroup_{count}".format(
1189 1189 prefix=self._test_name,
1190 1190 count=len(self.user_group_ids))
1191 1191 user_group = self.fixture.create_user_group(
1192 1192 group_name, cur_user=owner, **kwargs)
1193 1193
1194 1194 if auto_cleanup:
1195 1195 self.user_group_ids.append(user_group.users_group_id)
1196 1196 if members:
1197 1197 for user in members:
1198 1198 UserGroupModel().add_user_to_group(user_group, user)
1199 1199 return user_group
1200 1200
1201 1201 def grant_user_permission(self, user_name, permission_name):
1202 1202 self._inherit_default_user_permissions(user_name, False)
1203 1203 self.user_permissions.append((user_name, permission_name))
1204 1204
1205 1205 def grant_user_permission_to_repo_group(
1206 1206 self, repo_group, user, permission_name):
1207 1207 permission = RepoGroupModel().grant_user_permission(
1208 1208 repo_group, user, permission_name)
1209 1209 self.user_repo_group_permission_ids.append(
1210 1210 (repo_group.group_id, user.user_id))
1211 1211 return permission
1212 1212
1213 1213 def grant_user_group_permission_to_repo_group(
1214 1214 self, repo_group, user_group, permission_name):
1215 1215 permission = RepoGroupModel().grant_user_group_permission(
1216 1216 repo_group, user_group, permission_name)
1217 1217 self.user_group_repo_group_permission_ids.append(
1218 1218 (repo_group.group_id, user_group.users_group_id))
1219 1219 return permission
1220 1220
1221 1221 def grant_user_permission_to_repo(
1222 1222 self, repo, user, permission_name):
1223 1223 permission = RepoModel().grant_user_permission(
1224 1224 repo, user, permission_name)
1225 1225 self.user_repo_permission_ids.append(
1226 1226 (repo.repo_id, user.user_id))
1227 1227 return permission
1228 1228
1229 1229 def grant_user_group_permission_to_repo(
1230 1230 self, repo, user_group, permission_name):
1231 1231 permission = RepoModel().grant_user_group_permission(
1232 1232 repo, user_group, permission_name)
1233 1233 self.user_group_repo_permission_ids.append(
1234 1234 (repo.repo_id, user_group.users_group_id))
1235 1235 return permission
1236 1236
1237 1237 def grant_user_permission_to_user_group(
1238 1238 self, target_user_group, user, permission_name):
1239 1239 permission = UserGroupModel().grant_user_permission(
1240 1240 target_user_group, user, permission_name)
1241 1241 self.user_user_group_permission_ids.append(
1242 1242 (target_user_group.users_group_id, user.user_id))
1243 1243 return permission
1244 1244
1245 1245 def grant_user_group_permission_to_user_group(
1246 1246 self, target_user_group, user_group, permission_name):
1247 1247 permission = UserGroupModel().grant_user_group_permission(
1248 1248 target_user_group, user_group, permission_name)
1249 1249 self.user_group_user_group_permission_ids.append(
1250 1250 (target_user_group.users_group_id, user_group.users_group_id))
1251 1251 return permission
1252 1252
1253 1253 def revoke_user_permission(self, user_name, permission_name):
1254 1254 self._inherit_default_user_permissions(user_name, True)
1255 1255 UserModel().revoke_perm(user_name, permission_name)
1256 1256
1257 1257 def _inherit_default_user_permissions(self, user_name, value):
1258 1258 user = UserModel().get_by_username(user_name)
1259 1259 user.inherit_default_permissions = value
1260 1260 Session().add(user)
1261 1261 Session().commit()
1262 1262
1263 1263 def cleanup(self):
1264 1264 self._cleanup_permissions()
1265 1265 self._cleanup_repos()
1266 1266 self._cleanup_repo_groups()
1267 1267 self._cleanup_user_groups()
1268 1268 self._cleanup_users()
1269 1269
1270 1270 def _cleanup_permissions(self):
1271 1271 if self.user_permissions:
1272 1272 for user_name, permission_name in self.user_permissions:
1273 1273 self.revoke_user_permission(user_name, permission_name)
1274 1274
1275 1275 for permission in self.user_repo_permission_ids:
1276 1276 RepoModel().revoke_user_permission(*permission)
1277 1277
1278 1278 for permission in self.user_group_repo_permission_ids:
1279 1279 RepoModel().revoke_user_group_permission(*permission)
1280 1280
1281 1281 for permission in self.user_repo_group_permission_ids:
1282 1282 RepoGroupModel().revoke_user_permission(*permission)
1283 1283
1284 1284 for permission in self.user_group_repo_group_permission_ids:
1285 1285 RepoGroupModel().revoke_user_group_permission(*permission)
1286 1286
1287 1287 for permission in self.user_user_group_permission_ids:
1288 1288 UserGroupModel().revoke_user_permission(*permission)
1289 1289
1290 1290 for permission in self.user_group_user_group_permission_ids:
1291 1291 UserGroupModel().revoke_user_group_permission(*permission)
1292 1292
1293 1293 def _cleanup_repo_groups(self):
1294 1294 def _repo_group_compare(first_group_id, second_group_id):
1295 1295 """
1296 1296 Gives higher priority to the groups with the most complex paths
1297 1297 """
1298 1298 first_group = RepoGroup.get(first_group_id)
1299 1299 second_group = RepoGroup.get(second_group_id)
1300 1300 first_group_parts = (
1301 1301 len(first_group.group_name.split('/')) if first_group else 0)
1302 1302 second_group_parts = (
1303 1303 len(second_group.group_name.split('/')) if second_group else 0)
1304 1304 return cmp(second_group_parts, first_group_parts)
1305 1305
1306 1306 sorted_repo_group_ids = sorted(
1307 1307 self.repo_group_ids, cmp=_repo_group_compare)
1308 1308 for repo_group_id in sorted_repo_group_ids:
1309 1309 self.fixture.destroy_repo_group(repo_group_id)
1310 1310
1311 1311 def _cleanup_repos(self):
1312 1312 sorted_repos_ids = sorted(self.repos_ids)
1313 1313 for repo_id in sorted_repos_ids:
1314 1314 self.fixture.destroy_repo(repo_id)
1315 1315
1316 1316 def _cleanup_user_groups(self):
1317 1317 def _user_group_compare(first_group_id, second_group_id):
1318 1318 """
1319 1319 Gives higher priority to the groups with the most complex paths
1320 1320 """
1321 1321 first_group = UserGroup.get(first_group_id)
1322 1322 second_group = UserGroup.get(second_group_id)
1323 1323 first_group_parts = (
1324 1324 len(first_group.users_group_name.split('/'))
1325 1325 if first_group else 0)
1326 1326 second_group_parts = (
1327 1327 len(second_group.users_group_name.split('/'))
1328 1328 if second_group else 0)
1329 1329 return cmp(second_group_parts, first_group_parts)
1330 1330
1331 1331 sorted_user_group_ids = sorted(
1332 1332 self.user_group_ids, cmp=_user_group_compare)
1333 1333 for user_group_id in sorted_user_group_ids:
1334 1334 self.fixture.destroy_user_group(user_group_id)
1335 1335
1336 1336 def _cleanup_users(self):
1337 1337 for user_id in self.user_ids:
1338 1338 self.fixture.destroy_user(user_id)
1339 1339
1340 1340
1341 1341 # TODO: Think about moving this into a pytest-pyro package and make it a
1342 1342 # pytest plugin
1343 1343 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1344 1344 def pytest_runtest_makereport(item, call):
1345 1345 """
1346 1346 Adding the remote traceback if the exception has this information.
1347 1347
1348 1348 VCSServer attaches this information as the attribute `_vcs_server_traceback`
1349 1349 to the exception instance.
1350 1350 """
1351 1351 outcome = yield
1352 1352 report = outcome.get_result()
1353 1353 if call.excinfo:
1354 1354 _add_vcsserver_remote_traceback(report, call.excinfo.value)
1355 1355
1356 1356
1357 1357 def _add_vcsserver_remote_traceback(report, exc):
1358 1358 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1359 1359
1360 1360 if vcsserver_traceback:
1361 1361 section = 'VCSServer remote traceback ' + report.when
1362 1362 report.sections.append((section, vcsserver_traceback))
1363 1363
1364 1364
1365 1365 @pytest.fixture(scope='session')
1366 1366 def testrun():
1367 1367 return {
1368 1368 'uuid': uuid.uuid4(),
1369 1369 'start': datetime.datetime.utcnow().isoformat(),
1370 1370 'timestamp': int(time.time()),
1371 1371 }
1372 1372
1373 1373
1374 1374 @pytest.fixture(autouse=True)
1375 1375 def collect_appenlight_stats(request, testrun):
1376 1376 """
1377 1377 This fixture reports memory consumtion of single tests.
1378 1378
1379 1379 It gathers data based on `psutil` and sends them to Appenlight. The option
1380 1380 ``--ae`` has te be used to enable this fixture and the API key for your
1381 1381 application has to be provided in ``--ae-key``.
1382 1382 """
1383 1383 try:
1384 1384 # cygwin cannot have yet psutil support.
1385 1385 import psutil
1386 1386 except ImportError:
1387 1387 return
1388 1388
1389 1389 if not request.config.getoption('--appenlight'):
1390 1390 return
1391 1391 else:
1392 1392 # Only request the pylonsapp fixture if appenlight tracking is
1393 1393 # enabled. This will speed up a test run of unit tests by 2 to 3
1394 1394 # seconds if appenlight is not enabled.
1395 1395 pylonsapp = request.getfuncargvalue("pylonsapp")
1396 1396 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1397 1397 client = AppenlightClient(
1398 1398 url=url,
1399 1399 api_key=request.config.getoption('--appenlight-api-key'),
1400 1400 namespace=request.node.nodeid,
1401 1401 request=str(testrun['uuid']),
1402 1402 testrun=testrun)
1403 1403
1404 1404 client.collect({
1405 1405 'message': "Starting",
1406 1406 })
1407 1407
1408 1408 server_and_port = pylonsapp.config['vcs.server']
1409 1409 protocol = pylonsapp.config['vcs.server.protocol']
1410 1410 server = create_vcsserver_proxy(server_and_port, protocol)
1411 1411 with server:
1412 1412 vcs_pid = server.get_pid()
1413 1413 server.run_gc()
1414 1414 vcs_process = psutil.Process(vcs_pid)
1415 1415 mem = vcs_process.memory_info()
1416 1416 client.tag_before('vcsserver.rss', mem.rss)
1417 1417 client.tag_before('vcsserver.vms', mem.vms)
1418 1418
1419 1419 test_process = psutil.Process()
1420 1420 mem = test_process.memory_info()
1421 1421 client.tag_before('test.rss', mem.rss)
1422 1422 client.tag_before('test.vms', mem.vms)
1423 1423
1424 1424 client.tag_before('time', time.time())
1425 1425
1426 1426 @request.addfinalizer
1427 1427 def send_stats():
1428 1428 client.tag_after('time', time.time())
1429 1429 with server:
1430 1430 gc_stats = server.run_gc()
1431 1431 for tag, value in gc_stats.items():
1432 1432 client.tag_after(tag, value)
1433 1433 mem = vcs_process.memory_info()
1434 1434 client.tag_after('vcsserver.rss', mem.rss)
1435 1435 client.tag_after('vcsserver.vms', mem.vms)
1436 1436
1437 1437 mem = test_process.memory_info()
1438 1438 client.tag_after('test.rss', mem.rss)
1439 1439 client.tag_after('test.vms', mem.vms)
1440 1440
1441 1441 client.collect({
1442 1442 'message': "Finished",
1443 1443 })
1444 1444 client.send_stats()
1445 1445
1446 1446 return client
1447 1447
1448 1448
1449 1449 class AppenlightClient():
1450 1450
1451 1451 url_template = '{url}?protocol_version=0.5'
1452 1452
1453 1453 def __init__(
1454 1454 self, url, api_key, add_server=True, add_timestamp=True,
1455 1455 namespace=None, request=None, testrun=None):
1456 1456 self.url = self.url_template.format(url=url)
1457 1457 self.api_key = api_key
1458 1458 self.add_server = add_server
1459 1459 self.add_timestamp = add_timestamp
1460 1460 self.namespace = namespace
1461 1461 self.request = request
1462 1462 self.server = socket.getfqdn(socket.gethostname())
1463 1463 self.tags_before = {}
1464 1464 self.tags_after = {}
1465 1465 self.stats = []
1466 1466 self.testrun = testrun or {}
1467 1467
1468 1468 def tag_before(self, tag, value):
1469 1469 self.tags_before[tag] = value
1470 1470
1471 1471 def tag_after(self, tag, value):
1472 1472 self.tags_after[tag] = value
1473 1473
1474 1474 def collect(self, data):
1475 1475 if self.add_server:
1476 1476 data.setdefault('server', self.server)
1477 1477 if self.add_timestamp:
1478 1478 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1479 1479 if self.namespace:
1480 1480 data.setdefault('namespace', self.namespace)
1481 1481 if self.request:
1482 1482 data.setdefault('request', self.request)
1483 1483 self.stats.append(data)
1484 1484
1485 1485 def send_stats(self):
1486 1486 tags = [
1487 1487 ('testrun', self.request),
1488 1488 ('testrun.start', self.testrun['start']),
1489 1489 ('testrun.timestamp', self.testrun['timestamp']),
1490 1490 ('test', self.namespace),
1491 1491 ]
1492 1492 for key, value in self.tags_before.items():
1493 1493 tags.append((key + '.before', value))
1494 1494 try:
1495 1495 delta = self.tags_after[key] - value
1496 1496 tags.append((key + '.delta', delta))
1497 1497 except Exception:
1498 1498 pass
1499 1499 for key, value in self.tags_after.items():
1500 1500 tags.append((key + '.after', value))
1501 1501 self.collect({
1502 1502 'message': "Collected tags",
1503 1503 'tags': tags,
1504 1504 })
1505 1505
1506 1506 response = requests.post(
1507 1507 self.url,
1508 1508 headers={
1509 1509 'X-appenlight-api-key': self.api_key},
1510 1510 json=self.stats,
1511 1511 )
1512 1512
1513 1513 if not response.status_code == 200:
1514 1514 pprint.pprint(self.stats)
1515 1515 print response.headers
1516 1516 print response.text
1517 1517 raise Exception('Sending to appenlight failed')
1518 1518
1519 1519
1520 1520 @pytest.fixture
1521 1521 def gist_util(request, pylonsapp):
1522 1522 """
1523 1523 Provides a wired instance of `GistUtility` with integrated cleanup.
1524 1524 """
1525 1525 utility = GistUtility()
1526 1526 request.addfinalizer(utility.cleanup)
1527 1527 return utility
1528 1528
1529 1529
1530 1530 class GistUtility(object):
1531 1531 def __init__(self):
1532 1532 self.fixture = Fixture()
1533 1533 self.gist_ids = []
1534 1534
1535 1535 def create_gist(self, **kwargs):
1536 1536 gist = self.fixture.create_gist(**kwargs)
1537 1537 self.gist_ids.append(gist.gist_id)
1538 1538 return gist
1539 1539
1540 1540 def cleanup(self):
1541 1541 for id_ in self.gist_ids:
1542 1542 self.fixture.destroy_gists(str(id_))
1543 1543
1544 1544
1545 1545 @pytest.fixture
1546 1546 def enabled_backends(request):
1547 1547 backends = request.config.option.backends
1548 1548 return backends[:]
1549 1549
1550 1550
1551 1551 @pytest.fixture
1552 1552 def settings_util(request):
1553 1553 """
1554 1554 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1555 1555 """
1556 1556 utility = SettingsUtility()
1557 1557 request.addfinalizer(utility.cleanup)
1558 1558 return utility
1559 1559
1560 1560
1561 1561 class SettingsUtility(object):
1562 1562 def __init__(self):
1563 1563 self.rhodecode_ui_ids = []
1564 1564 self.rhodecode_setting_ids = []
1565 1565 self.repo_rhodecode_ui_ids = []
1566 1566 self.repo_rhodecode_setting_ids = []
1567 1567
1568 1568 def create_repo_rhodecode_ui(
1569 1569 self, repo, section, value, key=None, active=True, cleanup=True):
1570 1570 key = key or hashlib.sha1(
1571 1571 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1572 1572
1573 1573 setting = RepoRhodeCodeUi()
1574 1574 setting.repository_id = repo.repo_id
1575 1575 setting.ui_section = section
1576 1576 setting.ui_value = value
1577 1577 setting.ui_key = key
1578 1578 setting.ui_active = active
1579 1579 Session().add(setting)
1580 1580 Session().commit()
1581 1581
1582 1582 if cleanup:
1583 1583 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1584 1584 return setting
1585 1585
1586 1586 def create_rhodecode_ui(
1587 1587 self, section, value, key=None, active=True, cleanup=True):
1588 1588 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1589 1589
1590 1590 setting = RhodeCodeUi()
1591 1591 setting.ui_section = section
1592 1592 setting.ui_value = value
1593 1593 setting.ui_key = key
1594 1594 setting.ui_active = active
1595 1595 Session().add(setting)
1596 1596 Session().commit()
1597 1597
1598 1598 if cleanup:
1599 1599 self.rhodecode_ui_ids.append(setting.ui_id)
1600 1600 return setting
1601 1601
1602 1602 def create_repo_rhodecode_setting(
1603 1603 self, repo, name, value, type_, cleanup=True):
1604 1604 setting = RepoRhodeCodeSetting(
1605 1605 repo.repo_id, key=name, val=value, type=type_)
1606 1606 Session().add(setting)
1607 1607 Session().commit()
1608 1608
1609 1609 if cleanup:
1610 1610 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1611 1611 return setting
1612 1612
1613 1613 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1614 1614 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1615 1615 Session().add(setting)
1616 1616 Session().commit()
1617 1617
1618 1618 if cleanup:
1619 1619 self.rhodecode_setting_ids.append(setting.app_settings_id)
1620 1620
1621 1621 return setting
1622 1622
1623 1623 def cleanup(self):
1624 1624 for id_ in self.rhodecode_ui_ids:
1625 1625 setting = RhodeCodeUi.get(id_)
1626 1626 Session().delete(setting)
1627 1627
1628 1628 for id_ in self.rhodecode_setting_ids:
1629 1629 setting = RhodeCodeSetting.get(id_)
1630 1630 Session().delete(setting)
1631 1631
1632 1632 for id_ in self.repo_rhodecode_ui_ids:
1633 1633 setting = RepoRhodeCodeUi.get(id_)
1634 1634 Session().delete(setting)
1635 1635
1636 1636 for id_ in self.repo_rhodecode_setting_ids:
1637 1637 setting = RepoRhodeCodeSetting.get(id_)
1638 1638 Session().delete(setting)
1639 1639
1640 1640 Session().commit()
1641 1641
1642 1642
1643 1643 @pytest.fixture
1644 1644 def no_notifications(request):
1645 1645 notification_patcher = mock.patch(
1646 1646 'rhodecode.model.notification.NotificationModel.create')
1647 1647 notification_patcher.start()
1648 1648 request.addfinalizer(notification_patcher.stop)
1649 1649
1650 1650
1651 1651 @pytest.fixture(scope='session')
1652 1652 def repeat(request):
1653 1653 """
1654 1654 The number of repetitions is based on this fixture.
1655 1655
1656 1656 Slower calls may divide it by 10 or 100. It is chosen in a way so that the
1657 1657 tests are not too slow in our default test suite.
1658 1658 """
1659 1659 return request.config.getoption('--repeat')
1660 1660
1661 1661
1662 1662 @pytest.fixture
1663 1663 def rhodecode_fixtures():
1664 1664 return Fixture()
1665 1665
1666 1666
1667 1667 @pytest.fixture
1668 def context_stub():
1669 """
1670 Stub context object.
1671 """
1672 context = pyramid.testing.DummyResource()
1673 return context
1674
1675
1676 @pytest.fixture
1668 1677 def request_stub():
1669 1678 """
1670 1679 Stub request object.
1671 1680 """
1672 1681 from rhodecode.lib.base import bootstrap_request
1673 1682 request = bootstrap_request(scheme='https')
1674 1683 return request
1675 1684
1676 1685
1677 1686 @pytest.fixture
1678 def context_stub():
1679 """
1680 Stub context object.
1681 """
1682 context = pyramid.testing.DummyResource()
1683 return context
1684
1685
1686 @pytest.fixture
1687 1687 def config_stub(request, request_stub):
1688 1688 """
1689 1689 Set up pyramid.testing and return the Configurator.
1690 1690 """
1691 config = pyramid.testing.setUp(request=request_stub)
1692 add_test_routes(config)
1691 from rhodecode.lib.base import bootstrap_config
1692 config = bootstrap_config(request=request_stub)
1693 1693
1694 1694 @request.addfinalizer
1695 1695 def cleanup():
1696 1696 pyramid.testing.tearDown()
1697 1697
1698 1698 return config
1699 1699
1700 1700
1701 1701 @pytest.fixture
1702 1702 def StubIntegrationType():
1703 1703 class _StubIntegrationType(IntegrationTypeBase):
1704 1704 """ Test integration type class """
1705 1705
1706 1706 key = 'test'
1707 1707 display_name = 'Test integration type'
1708 1708 description = 'A test integration type for testing'
1709 1709 icon = 'test_icon_html_image'
1710 1710
1711 1711 def __init__(self, settings):
1712 1712 super(_StubIntegrationType, self).__init__(settings)
1713 1713 self.sent_events = [] # for testing
1714 1714
1715 1715 def send_event(self, event):
1716 1716 self.sent_events.append(event)
1717 1717
1718 1718 def settings_schema(self):
1719 1719 class SettingsSchema(colander.Schema):
1720 1720 test_string_field = colander.SchemaNode(
1721 1721 colander.String(),
1722 1722 missing=colander.required,
1723 1723 title='test string field',
1724 1724 )
1725 1725 test_int_field = colander.SchemaNode(
1726 1726 colander.Int(),
1727 1727 title='some integer setting',
1728 1728 )
1729 1729 return SettingsSchema()
1730 1730
1731 1731
1732 1732 integration_type_registry.register_integration_type(_StubIntegrationType)
1733 1733 return _StubIntegrationType
1734 1734
1735 1735 @pytest.fixture
1736 1736 def stub_integration_settings():
1737 1737 return {
1738 1738 'test_string_field': 'some data',
1739 1739 'test_int_field': 100,
1740 1740 }
1741 1741
1742 1742
1743 1743 @pytest.fixture
1744 1744 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1745 1745 stub_integration_settings):
1746 1746 integration = IntegrationModel().create(
1747 1747 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1748 1748 name='test repo integration',
1749 1749 repo=repo_stub, repo_group=None, child_repos_only=None)
1750 1750
1751 1751 @request.addfinalizer
1752 1752 def cleanup():
1753 1753 IntegrationModel().delete(integration)
1754 1754
1755 1755 return integration
1756 1756
1757 1757
1758 1758 @pytest.fixture
1759 1759 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1760 1760 stub_integration_settings):
1761 1761 integration = IntegrationModel().create(
1762 1762 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1763 1763 name='test repogroup integration',
1764 1764 repo=None, repo_group=test_repo_group, child_repos_only=True)
1765 1765
1766 1766 @request.addfinalizer
1767 1767 def cleanup():
1768 1768 IntegrationModel().delete(integration)
1769 1769
1770 1770 return integration
1771 1771
1772 1772
1773 1773 @pytest.fixture
1774 1774 def repogroup_recursive_integration_stub(request, test_repo_group,
1775 1775 StubIntegrationType, stub_integration_settings):
1776 1776 integration = IntegrationModel().create(
1777 1777 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1778 1778 name='test recursive repogroup integration',
1779 1779 repo=None, repo_group=test_repo_group, child_repos_only=False)
1780 1780
1781 1781 @request.addfinalizer
1782 1782 def cleanup():
1783 1783 IntegrationModel().delete(integration)
1784 1784
1785 1785 return integration
1786 1786
1787 1787
1788 1788 @pytest.fixture
1789 1789 def global_integration_stub(request, StubIntegrationType,
1790 1790 stub_integration_settings):
1791 1791 integration = IntegrationModel().create(
1792 1792 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1793 1793 name='test global integration',
1794 1794 repo=None, repo_group=None, child_repos_only=None)
1795 1795
1796 1796 @request.addfinalizer
1797 1797 def cleanup():
1798 1798 IntegrationModel().delete(integration)
1799 1799
1800 1800 return integration
1801 1801
1802 1802
1803 1803 @pytest.fixture
1804 1804 def root_repos_integration_stub(request, StubIntegrationType,
1805 1805 stub_integration_settings):
1806 1806 integration = IntegrationModel().create(
1807 1807 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1808 1808 name='test global integration',
1809 1809 repo=None, repo_group=None, child_repos_only=True)
1810 1810
1811 1811 @request.addfinalizer
1812 1812 def cleanup():
1813 1813 IntegrationModel().delete(integration)
1814 1814
1815 1815 return integration
1816 1816
1817 1817
1818 1818 @pytest.fixture
1819 1819 def local_dt_to_utc():
1820 1820 def _factory(dt):
1821 1821 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1822 1822 dateutil.tz.tzutc()).replace(tzinfo=None)
1823 1823 return _factory
1824 1824
1825 1825
1826 1826 @pytest.fixture
1827 1827 def disable_anonymous_user(request, pylonsapp):
1828 1828 set_anonymous_access(False)
1829 1829
1830 1830 @request.addfinalizer
1831 1831 def cleanup():
1832 1832 set_anonymous_access(True)
1833 1833
1834 1834
1835 1835 @pytest.fixture
1836 1836 def rc_fixture(request):
1837 1837 return Fixture()
1838 1838
1839 1839
1840 1840 @pytest.fixture
1841 1841 def repo_groups(request):
1842 1842 fixture = Fixture()
1843 1843
1844 1844 session = Session()
1845 1845 zombie_group = fixture.create_repo_group('zombie')
1846 1846 parent_group = fixture.create_repo_group('parent')
1847 1847 child_group = fixture.create_repo_group('parent/child')
1848 1848 groups_in_db = session.query(RepoGroup).all()
1849 1849 assert len(groups_in_db) == 3
1850 1850 assert child_group.group_parent_id == parent_group.group_id
1851 1851
1852 1852 @request.addfinalizer
1853 1853 def cleanup():
1854 1854 fixture.destroy_repo_group(zombie_group)
1855 1855 fixture.destroy_repo_group(child_group)
1856 1856 fixture.destroy_repo_group(parent_group)
1857 1857
1858 1858 return zombie_group, parent_group, child_group
@@ -1,431 +1,406 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import threading
22 22 import time
23 23 import logging
24 24 import os.path
25 25 import subprocess32
26 26 import tempfile
27 27 import urllib2
28 28 from lxml.html import fromstring, tostring
29 29 from lxml.cssselect import CSSSelector
30 30 from urlparse import urlparse, parse_qsl
31 31 from urllib import unquote_plus
32 32 import webob
33 33
34 34 from webtest.app import TestResponse, TestApp, string_types
35 35 from webtest.compat import print_stderr
36 36
37 37 import pytest
38 38 import rc_testdata
39 39
40 40 from rhodecode.model.db import User, Repository
41 41 from rhodecode.model.meta import Session
42 42 from rhodecode.model.scm import ScmModel
43 43 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
44 44 from rhodecode.lib.vcs.backends.base import EmptyCommit
45 45
46 46
47 47 log = logging.getLogger(__name__)
48 48
49 49
50 50 class CustomTestResponse(TestResponse):
51 51 def _save_output(self, out):
52 52 f = tempfile.NamedTemporaryFile(
53 53 delete=False, prefix='rc-test-', suffix='.html')
54 54 f.write(out)
55 55 return f.name
56 56
57 57 def mustcontain(self, *strings, **kw):
58 58 """
59 59 Assert that the response contains all of the strings passed
60 60 in as arguments.
61 61
62 62 Equivalent to::
63 63
64 64 assert string in res
65 65 """
66 66 if 'no' in kw:
67 67 no = kw['no']
68 68 del kw['no']
69 69 if isinstance(no, string_types):
70 70 no = [no]
71 71 else:
72 72 no = []
73 73 if kw:
74 74 raise TypeError(
75 75 "The only keyword argument allowed is 'no'")
76 76
77 77 f = self._save_output(str(self))
78 78
79 79 for s in strings:
80 80 if not s in self:
81 81 print_stderr("Actual response (no %r):" % s)
82 82 print_stderr(str(self))
83 83 raise IndexError(
84 84 "Body does not contain string %r, output saved as %s" % (
85 85 s, f))
86 86
87 87 for no_s in no:
88 88 if no_s in self:
89 89 print_stderr("Actual response (has %r)" % no_s)
90 90 print_stderr(str(self))
91 91 raise IndexError(
92 92 "Body contains bad string %r, output saved as %s" % (
93 93 no_s, f))
94 94
95 95 def assert_response(self):
96 96 return AssertResponse(self)
97 97
98 98 def get_session_from_response(self):
99 99 """
100 100 This returns the session from a response object. Pylons has some magic
101 101 to make the session available as `response.session`. But pyramid
102 102 doesn't expose it.
103 103 """
104 104 return self.request.environ['beaker.session']
105 105
106 106
107 107 class TestRequest(webob.BaseRequest):
108 108
109 109 # for py.test
110 110 disabled = True
111 111 ResponseClass = CustomTestResponse
112 112
113 113
114 114 class CustomTestApp(TestApp):
115 115 """
116 116 Custom app to make mustcontain more usefull
117 117 """
118 118 RequestClass = TestRequest
119 119
120 120
121 121 def set_anonymous_access(enabled):
122 122 """(Dis)allows anonymous access depending on parameter `enabled`"""
123 123 user = User.get_default_user()
124 124 user.active = enabled
125 125 Session().add(user)
126 126 Session().commit()
127 127 time.sleep(1.5) # must sleep for cache (1s to expire)
128 128 log.info('anonymous access is now: %s', enabled)
129 129 assert enabled == User.get_default_user().active, (
130 130 'Cannot set anonymous access')
131 131
132 132
133 133 def check_xfail_backends(node, backend_alias):
134 134 # Using "xfail_backends" here intentionally, since this marks work
135 135 # which is "to be done" soon.
136 136 skip_marker = node.get_marker('xfail_backends')
137 137 if skip_marker and backend_alias in skip_marker.args:
138 138 msg = "Support for backend %s to be developed." % (backend_alias, )
139 139 msg = skip_marker.kwargs.get('reason', msg)
140 140 pytest.xfail(msg)
141 141
142 142
143 143 def check_skip_backends(node, backend_alias):
144 144 # Using "skip_backends" here intentionally, since this marks work which is
145 145 # not supported.
146 146 skip_marker = node.get_marker('skip_backends')
147 147 if skip_marker and backend_alias in skip_marker.args:
148 148 msg = "Feature not supported for backend %s." % (backend_alias, )
149 149 msg = skip_marker.kwargs.get('reason', msg)
150 150 pytest.skip(msg)
151 151
152 152
153 153 def extract_git_repo_from_dump(dump_name, repo_name):
154 154 """Create git repo `repo_name` from dump `dump_name`."""
155 155 repos_path = ScmModel().repos_path
156 156 target_path = os.path.join(repos_path, repo_name)
157 157 rc_testdata.extract_git_dump(dump_name, target_path)
158 158 return target_path
159 159
160 160
161 161 def extract_hg_repo_from_dump(dump_name, repo_name):
162 162 """Create hg repo `repo_name` from dump `dump_name`."""
163 163 repos_path = ScmModel().repos_path
164 164 target_path = os.path.join(repos_path, repo_name)
165 165 rc_testdata.extract_hg_dump(dump_name, target_path)
166 166 return target_path
167 167
168 168
169 169 def extract_svn_repo_from_dump(dump_name, repo_name):
170 170 """Create a svn repo `repo_name` from dump `dump_name`."""
171 171 repos_path = ScmModel().repos_path
172 172 target_path = os.path.join(repos_path, repo_name)
173 173 SubversionRepository(target_path, create=True)
174 174 _load_svn_dump_into_repo(dump_name, target_path)
175 175 return target_path
176 176
177 177
178 178 def assert_message_in_log(log_records, message, levelno, module):
179 179 messages = [
180 180 r.message for r in log_records
181 181 if r.module == module and r.levelno == levelno
182 182 ]
183 183 assert message in messages
184 184
185 185
186 186 def _load_svn_dump_into_repo(dump_name, repo_path):
187 187 """
188 188 Utility to populate a svn repository with a named dump
189 189
190 190 Currently the dumps are in rc_testdata. They might later on be
191 191 integrated with the main repository once they stabilize more.
192 192 """
193 193 dump = rc_testdata.load_svn_dump(dump_name)
194 194 load_dump = subprocess32.Popen(
195 195 ['svnadmin', 'load', repo_path],
196 196 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
197 197 stderr=subprocess32.PIPE)
198 198 out, err = load_dump.communicate(dump)
199 199 if load_dump.returncode != 0:
200 200 log.error("Output of load_dump command: %s", out)
201 201 log.error("Error output of load_dump command: %s", err)
202 202 raise Exception(
203 203 'Failed to load dump "%s" into repository at path "%s".'
204 204 % (dump_name, repo_path))
205 205
206 206
207 207 class AssertResponse(object):
208 208 """
209 209 Utility that helps to assert things about a given HTML response.
210 210 """
211 211
212 212 def __init__(self, response):
213 213 self.response = response
214 214
215 215 def get_imports(self):
216 216 return fromstring, tostring, CSSSelector
217 217
218 218 def one_element_exists(self, css_selector):
219 219 self.get_element(css_selector)
220 220
221 221 def no_element_exists(self, css_selector):
222 222 assert not self._get_elements(css_selector)
223 223
224 224 def element_equals_to(self, css_selector, expected_content):
225 225 element = self.get_element(css_selector)
226 226 element_text = self._element_to_string(element)
227 227 assert expected_content in element_text
228 228
229 229 def element_contains(self, css_selector, expected_content):
230 230 element = self.get_element(css_selector)
231 231 assert expected_content in element.text_content()
232 232
233 233 def element_value_contains(self, css_selector, expected_content):
234 234 element = self.get_element(css_selector)
235 235 assert expected_content in element.value
236 236
237 237 def contains_one_link(self, link_text, href):
238 238 fromstring, tostring, CSSSelector = self.get_imports()
239 239 doc = fromstring(self.response.body)
240 240 sel = CSSSelector('a[href]')
241 241 elements = [
242 242 e for e in sel(doc) if e.text_content().strip() == link_text]
243 243 assert len(elements) == 1, "Did not find link or found multiple links"
244 244 self._ensure_url_equal(elements[0].attrib.get('href'), href)
245 245
246 246 def contains_one_anchor(self, anchor_id):
247 247 fromstring, tostring, CSSSelector = self.get_imports()
248 248 doc = fromstring(self.response.body)
249 249 sel = CSSSelector('#' + anchor_id)
250 250 elements = sel(doc)
251 251 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
252 252
253 253 def _ensure_url_equal(self, found, expected):
254 254 assert _Url(found) == _Url(expected)
255 255
256 256 def get_element(self, css_selector):
257 257 elements = self._get_elements(css_selector)
258 258 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
259 259 return elements[0]
260 260
261 261 def get_elements(self, css_selector):
262 262 return self._get_elements(css_selector)
263 263
264 264 def _get_elements(self, css_selector):
265 265 fromstring, tostring, CSSSelector = self.get_imports()
266 266 doc = fromstring(self.response.body)
267 267 sel = CSSSelector(css_selector)
268 268 elements = sel(doc)
269 269 return elements
270 270
271 271 def _element_to_string(self, element):
272 272 fromstring, tostring, CSSSelector = self.get_imports()
273 273 return tostring(element)
274 274
275 275
276 276 class _Url(object):
277 277 """
278 278 A url object that can be compared with other url orbjects
279 279 without regard to the vagaries of encoding, escaping, and ordering
280 280 of parameters in query strings.
281 281
282 282 Inspired by
283 283 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
284 284 """
285 285
286 286 def __init__(self, url):
287 287 parts = urlparse(url)
288 288 _query = frozenset(parse_qsl(parts.query))
289 289 _path = unquote_plus(parts.path)
290 290 parts = parts._replace(query=_query, path=_path)
291 291 self.parts = parts
292 292
293 293 def __eq__(self, other):
294 294 return self.parts == other.parts
295 295
296 296 def __hash__(self):
297 297 return hash(self.parts)
298 298
299 299
300 300 def run_test_concurrently(times, raise_catched_exc=True):
301 301 """
302 302 Add this decorator to small pieces of code that you want to test
303 303 concurrently
304 304
305 305 ex:
306 306
307 307 @test_concurrently(25)
308 308 def my_test_function():
309 309 ...
310 310 """
311 311 def test_concurrently_decorator(test_func):
312 312 def wrapper(*args, **kwargs):
313 313 exceptions = []
314 314
315 315 def call_test_func():
316 316 try:
317 317 test_func(*args, **kwargs)
318 318 except Exception as e:
319 319 exceptions.append(e)
320 320 if raise_catched_exc:
321 321 raise
322 322 threads = []
323 323 for i in range(times):
324 324 threads.append(threading.Thread(target=call_test_func))
325 325 for t in threads:
326 326 t.start()
327 327 for t in threads:
328 328 t.join()
329 329 if exceptions:
330 330 raise Exception(
331 331 'test_concurrently intercepted %s exceptions: %s' % (
332 332 len(exceptions), exceptions))
333 333 return wrapper
334 334 return test_concurrently_decorator
335 335
336 336
337 337 def wait_for_url(url, timeout=10):
338 338 """
339 339 Wait until URL becomes reachable.
340 340
341 341 It polls the URL until the timeout is reached or it became reachable.
342 342 If will call to `py.test.fail` in case the URL is not reachable.
343 343 """
344 344 timeout = time.time() + timeout
345 345 last = 0
346 346 wait = 0.1
347 347
348 348 while timeout > last:
349 349 last = time.time()
350 350 if is_url_reachable(url):
351 351 break
352 352 elif (last + wait) > time.time():
353 353 # Go to sleep because not enough time has passed since last check.
354 354 time.sleep(wait)
355 355 else:
356 356 pytest.fail("Timeout while waiting for URL {}".format(url))
357 357
358 358
359 359 def is_url_reachable(url):
360 360 try:
361 361 urllib2.urlopen(url)
362 362 except urllib2.URLError:
363 363 return False
364 364 return True
365 365
366 366
367 367 def repo_on_filesystem(repo_name):
368 368 from rhodecode.lib import vcs
369 369 from rhodecode.tests import TESTS_TMP_PATH
370 370 repo = vcs.get_vcs_instance(
371 371 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
372 372 return repo is not None
373 373
374 374
375 375 def commit_change(
376 376 repo, filename, content, message, vcs_type, parent=None, newfile=False):
377 377 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
378 378
379 379 repo = Repository.get_by_repo_name(repo)
380 380 _commit = parent
381 381 if not parent:
382 382 _commit = EmptyCommit(alias=vcs_type)
383 383
384 384 if newfile:
385 385 nodes = {
386 386 filename: {
387 387 'content': content
388 388 }
389 389 }
390 390 commit = ScmModel().create_nodes(
391 391 user=TEST_USER_ADMIN_LOGIN, repo=repo,
392 392 message=message,
393 393 nodes=nodes,
394 394 parent_commit=_commit,
395 395 author=TEST_USER_ADMIN_LOGIN,
396 396 )
397 397 else:
398 398 commit = ScmModel().commit_change(
399 399 repo=repo.scm_instance(), repo_name=repo.repo_name,
400 400 commit=parent, user=TEST_USER_ADMIN_LOGIN,
401 401 author=TEST_USER_ADMIN_LOGIN,
402 402 message=message,
403 403 content=content,
404 404 f_path=filename
405 405 )
406 406 return commit
407
408
409 def add_test_routes(config):
410 """
411 Adds test routing that can be used in different functional tests
412 """
413 from rhodecode.apps._base import ADMIN_PREFIX
414
415 config.add_route(name='home', pattern='/')
416
417 config.add_route(name='login', pattern=ADMIN_PREFIX + '/login')
418 config.add_route(name='logout', pattern=ADMIN_PREFIX + '/logout')
419 config.add_route(name='repo_summary', pattern='/{repo_name}')
420 config.add_route(name='repo_summary_explicit', pattern='/{repo_name}/summary')
421 config.add_route(name='repo_group_home', pattern='/{repo_group_name}')
422
423 config.add_route(name='pullrequest_show',
424 pattern='/{repo_name}/pull-request/{pull_request_id}')
425 config.add_route(name='pull_requests_global',
426 pattern='/pull-request/{pull_request_id}')
427 config.add_route(name='repo_commit',
428 pattern='/{repo_name}/changeset/{commit_id}')
429
430 config.add_route(name='repo_files',
431 pattern='/{repo_name}/files/{commit_id}/{f_path}')
General Comments 0
You need to be logged in to leave comments. Login now