##// END OF EJS Templates
Added new random directory for each test to be better sandboxed
marcink -
r1397:dc960653 beta
parent child Browse files
Show More
@@ -1,619 +1,624
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.utils
4 4 ~~~~~~~~~~~~~~~~~~~
5 5
6 6 Utilities library for RhodeCode
7 7
8 8 :created_on: Apr 18, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import os
27 27 import logging
28 28 import datetime
29 29 import traceback
30 30 import paste
31 31 import beaker
32 32 from os.path import dirname as dn, join as jn
33 33
34 34 from paste.script.command import Command, BadCommand
35 35
36 36 from UserDict import DictMixin
37 37
38 38 from mercurial import ui, config, hg
39 39 from mercurial.error import RepoError
40 40
41 41 from webhelpers.text import collapse, remove_formatting, strip_tags
42 42
43 43 from vcs.backends.base import BaseChangeset
44 44 from vcs.utils.lazy import LazyProperty
45 45
46 46 from rhodecode.model import meta
47 47 from rhodecode.model.caching_query import FromCache
48 48 from rhodecode.model.db import Repository, User, RhodeCodeUi, UserLog, Group, \
49 49 RhodeCodeSettings
50 50 from rhodecode.model.repo import RepoModel
51 51 from rhodecode.model.user import UserModel
52 52
53 53 log = logging.getLogger(__name__)
54 54
55 55
56 56 def recursive_replace(str, replace=' '):
57 57 """Recursive replace of given sign to just one instance
58 58
59 59 :param str: given string
60 60 :param replace: char to find and replace multiple instances
61 61
62 62 Examples::
63 63 >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
64 64 'Mighty-Mighty-Bo-sstones'
65 65 """
66 66
67 67 if str.find(replace * 2) == -1:
68 68 return str
69 69 else:
70 70 str = str.replace(replace * 2, replace)
71 71 return recursive_replace(str, replace)
72 72
73 73
74 74 def repo_name_slug(value):
75 75 """Return slug of name of repository
76 76 This function is called on each creation/modification
77 77 of repository to prevent bad names in repo
78 78 """
79 79
80 80 slug = remove_formatting(value)
81 81 slug = strip_tags(slug)
82 82
83 83 for c in """=[]\;'"<>,/~!@#$%^&*()+{}|: """:
84 84 slug = slug.replace(c, '-')
85 85 slug = recursive_replace(slug, '-')
86 86 slug = collapse(slug, '-')
87 87 return slug
88 88
89 89
90 90 def get_repo_slug(request):
91 91 return request.environ['pylons.routes_dict'].get('repo_name')
92 92
93 93
94 94 def action_logger(user, action, repo, ipaddr='', sa=None):
95 95 """
96 96 Action logger for various actions made by users
97 97
98 98 :param user: user that made this action, can be a unique username string or
99 99 object containing user_id attribute
100 100 :param action: action to log, should be on of predefined unique actions for
101 101 easy translations
102 102 :param repo: string name of repository or object containing repo_id,
103 103 that action was made on
104 104 :param ipaddr: optional ip address from what the action was made
105 105 :param sa: optional sqlalchemy session
106 106
107 107 """
108 108
109 109 if not sa:
110 110 sa = meta.Session()
111 111
112 112 try:
113 113 um = UserModel()
114 114 if hasattr(user, 'user_id'):
115 115 user_obj = user
116 116 elif isinstance(user, basestring):
117 117 user_obj = um.get_by_username(user, cache=False)
118 118 else:
119 119 raise Exception('You have to provide user object or username')
120 120
121 121 rm = RepoModel()
122 122 if hasattr(repo, 'repo_id'):
123 123 repo_obj = rm.get(repo.repo_id, cache=False)
124 124 repo_name = repo_obj.repo_name
125 125 elif isinstance(repo, basestring):
126 126 repo_name = repo.lstrip('/')
127 127 repo_obj = rm.get_by_repo_name(repo_name, cache=False)
128 128 else:
129 129 raise Exception('You have to provide repository to action logger')
130 130
131 131 user_log = UserLog()
132 132 user_log.user_id = user_obj.user_id
133 133 user_log.action = action
134 134
135 135 user_log.repository_id = repo_obj.repo_id
136 136 user_log.repository_name = repo_name
137 137
138 138 user_log.action_date = datetime.datetime.now()
139 139 user_log.user_ip = ipaddr
140 140 sa.add(user_log)
141 141 sa.commit()
142 142
143 143 log.info('Adding user %s, action %s on %s', user_obj, action, repo)
144 144 except:
145 145 log.error(traceback.format_exc())
146 146 sa.rollback()
147 147
148 148
149 149 def get_repos(path, recursive=False):
150 150 """
151 151 Scans given path for repos and return (name,(type,path)) tuple
152 152
153 153 :param path: path to scann for repositories
154 154 :param recursive: recursive search and return names with subdirs in front
155 155 """
156 156 from vcs.utils.helpers import get_scm
157 157 from vcs.exceptions import VCSError
158 158
159 159 if path.endswith(os.sep):
160 160 #remove ending slash for better results
161 161 path = path[:-1]
162 162
163 163 def _get_repos(p):
164 164 if not os.access(p, os.W_OK):
165 165 return
166 166 for dirpath in os.listdir(p):
167 167 if os.path.isfile(os.path.join(p, dirpath)):
168 168 continue
169 169 cur_path = os.path.join(p, dirpath)
170 170 try:
171 171 scm_info = get_scm(cur_path)
172 172 yield scm_info[1].split(path)[-1].lstrip(os.sep), scm_info
173 173 except VCSError:
174 174 if not recursive:
175 175 continue
176 176 #check if this dir containts other repos for recursive scan
177 177 rec_path = os.path.join(p, dirpath)
178 178 if os.path.isdir(rec_path):
179 179 for inner_scm in _get_repos(rec_path):
180 180 yield inner_scm
181 181
182 182 return _get_repos(path)
183 183
184 184
185 185 def check_repo_fast(repo_name, base_path):
186 186 """
187 187 Check given path for existence of directory
188 188 :param repo_name:
189 189 :param base_path:
190 190
191 191 :return False: if this directory is present
192 192 """
193 193 if os.path.isdir(os.path.join(base_path, repo_name)):
194 194 return False
195 195 return True
196 196
197 197
198 198 def check_repo(repo_name, base_path, verify=True):
199 199
200 200 repo_path = os.path.join(base_path, repo_name)
201 201
202 202 try:
203 203 if not check_repo_fast(repo_name, base_path):
204 204 return False
205 205 r = hg.repository(ui.ui(), repo_path)
206 206 if verify:
207 207 hg.verify(r)
208 208 #here we hnow that repo exists it was verified
209 209 log.info('%s repo is already created', repo_name)
210 210 return False
211 211 except RepoError:
212 212 #it means that there is no valid repo there...
213 213 log.info('%s repo is free for creation', repo_name)
214 214 return True
215 215
216 216
217 217 def ask_ok(prompt, retries=4, complaint='Yes or no, please!'):
218 218 while True:
219 219 ok = raw_input(prompt)
220 220 if ok in ('y', 'ye', 'yes'):
221 221 return True
222 222 if ok in ('n', 'no', 'nop', 'nope'):
223 223 return False
224 224 retries = retries - 1
225 225 if retries < 0:
226 226 raise IOError
227 227 print complaint
228 228
229 229 #propagated from mercurial documentation
230 230 ui_sections = ['alias', 'auth',
231 231 'decode/encode', 'defaults',
232 232 'diff', 'email',
233 233 'extensions', 'format',
234 234 'merge-patterns', 'merge-tools',
235 235 'hooks', 'http_proxy',
236 236 'smtp', 'patch',
237 237 'paths', 'profiling',
238 238 'server', 'trusted',
239 239 'ui', 'web', ]
240 240
241 241
242 242 def make_ui(read_from='file', path=None, checkpaths=True):
243 243 """A function that will read python rc files or database
244 244 and make an mercurial ui object from read options
245 245
246 246 :param path: path to mercurial config file
247 247 :param checkpaths: check the path
248 248 :param read_from: read from 'file' or 'db'
249 249 """
250 250
251 251 baseui = ui.ui()
252 252
253 253 #clean the baseui object
254 254 baseui._ocfg = config.config()
255 255 baseui._ucfg = config.config()
256 256 baseui._tcfg = config.config()
257 257
258 258 if read_from == 'file':
259 259 if not os.path.isfile(path):
260 260 log.warning('Unable to read config file %s' % path)
261 261 return False
262 262 log.debug('reading hgrc from %s', path)
263 263 cfg = config.config()
264 264 cfg.read(path)
265 265 for section in ui_sections:
266 266 for k, v in cfg.items(section):
267 267 log.debug('settings ui from file[%s]%s:%s', section, k, v)
268 268 baseui.setconfig(section, k, v)
269 269
270 270 elif read_from == 'db':
271 271 sa = meta.Session()
272 272 ret = sa.query(RhodeCodeUi)\
273 273 .options(FromCache("sql_cache_short",
274 274 "get_hg_ui_settings")).all()
275 275
276 276 hg_ui = ret
277 277 for ui_ in hg_ui:
278 278 if ui_.ui_active:
279 279 log.debug('settings ui from db[%s]%s:%s', ui_.ui_section,
280 280 ui_.ui_key, ui_.ui_value)
281 281 baseui.setconfig(ui_.ui_section, ui_.ui_key, ui_.ui_value)
282 282
283 283 meta.Session.remove()
284 284 return baseui
285 285
286 286
287 287 def set_rhodecode_config(config):
288 288 """Updates pylons config with new settings from database
289 289
290 290 :param config:
291 291 """
292 292 hgsettings = RhodeCodeSettings.get_app_settings()
293 293
294 294 for k, v in hgsettings.items():
295 295 config[k] = v
296 296
297 297
298 298 def invalidate_cache(cache_key, *args):
299 299 """Puts cache invalidation task into db for
300 300 further global cache invalidation
301 301 """
302 302
303 303 from rhodecode.model.scm import ScmModel
304 304
305 305 if cache_key.startswith('get_repo_cached_'):
306 306 name = cache_key.split('get_repo_cached_')[-1]
307 307 ScmModel().mark_for_invalidation(name)
308 308
309 309
310 310 class EmptyChangeset(BaseChangeset):
311 311 """
312 312 An dummy empty changeset. It's possible to pass hash when creating
313 313 an EmptyChangeset
314 314 """
315 315
316 316 def __init__(self, cs='0' * 40, repo=None):
317 317 self._empty_cs = cs
318 318 self.revision = -1
319 319 self.message = ''
320 320 self.author = ''
321 321 self.date = ''
322 322 self.repository = repo
323 323
324 324 @LazyProperty
325 325 def raw_id(self):
326 326 """Returns raw string identifying this changeset, useful for web
327 327 representation.
328 328 """
329 329
330 330 return self._empty_cs
331 331
332 332 @LazyProperty
333 333 def short_id(self):
334 334 return self.raw_id[:12]
335 335
336 336 def get_file_changeset(self, path):
337 337 return self
338 338
339 339 def get_file_content(self, path):
340 340 return u''
341 341
342 342 def get_file_size(self, path):
343 343 return 0
344 344
345 345
346 346 def map_groups(groups):
347 347 """Checks for groups existence, and creates groups structures.
348 348 It returns last group in structure
349 349
350 350 :param groups: list of groups structure
351 351 """
352 352 sa = meta.Session()
353 353
354 354 parent = None
355 355 group = None
356 356 for lvl, group_name in enumerate(groups[:-1]):
357 357 group = sa.query(Group).filter(Group.group_name == group_name).scalar()
358 358
359 359 if group is None:
360 360 group = Group(group_name, parent)
361 361 sa.add(group)
362 362 sa.commit()
363 363
364 364 parent = group
365 365
366 366 return group
367 367
368 368
369 369 def repo2db_mapper(initial_repo_list, remove_obsolete=False):
370 370 """maps all repos given in initial_repo_list, non existing repositories
371 371 are created, if remove_obsolete is True it also check for db entries
372 372 that are not in initial_repo_list and removes them.
373 373
374 374 :param initial_repo_list: list of repositories found by scanning methods
375 375 :param remove_obsolete: check for obsolete entries in database
376 376 """
377 377
378 378 sa = meta.Session()
379 379 rm = RepoModel()
380 380 user = sa.query(User).filter(User.admin == True).first()
381 381 added = []
382 382 for name, repo in initial_repo_list.items():
383 383 group = map_groups(name.split('/'))
384 384 if not rm.get_by_repo_name(name, cache=False):
385 385 log.info('repository %s not found creating default', name)
386 386 added.append(name)
387 387 form_data = {
388 388 'repo_name': name,
389 389 'repo_name_full': name,
390 390 'repo_type': repo.alias,
391 391 'description': repo.description \
392 392 if repo.description != 'unknown' else \
393 393 '%s repository' % name,
394 394 'private': False,
395 395 'group_id': getattr(group, 'group_id', None)
396 396 }
397 397 rm.create(form_data, user, just_db=True)
398 398
399 399 removed = []
400 400 if remove_obsolete:
401 401 #remove from database those repositories that are not in the filesystem
402 402 for repo in sa.query(Repository).all():
403 403 if repo.repo_name not in initial_repo_list.keys():
404 404 removed.append(repo.repo_name)
405 405 sa.delete(repo)
406 406 sa.commit()
407 407
408 408 return added, removed
409 409
410 410 #set cache regions for beaker so celery can utilise it
411 411 def add_cache(settings):
412 412 cache_settings = {'regions': None}
413 413 for key in settings.keys():
414 414 for prefix in ['beaker.cache.', 'cache.']:
415 415 if key.startswith(prefix):
416 416 name = key.split(prefix)[1].strip()
417 417 cache_settings[name] = settings[key].strip()
418 418 if cache_settings['regions']:
419 419 for region in cache_settings['regions'].split(','):
420 420 region = region.strip()
421 421 region_settings = {}
422 422 for key, value in cache_settings.items():
423 423 if key.startswith(region):
424 424 region_settings[key.split('.')[1]] = value
425 425 region_settings['expire'] = int(region_settings.get('expire',
426 426 60))
427 427 region_settings.setdefault('lock_dir',
428 428 cache_settings.get('lock_dir'))
429 429 region_settings.setdefault('data_dir',
430 430 cache_settings.get('data_dir'))
431 431
432 432 if 'type' not in region_settings:
433 433 region_settings['type'] = cache_settings.get('type',
434 434 'memory')
435 435 beaker.cache.cache_regions[region] = region_settings
436 436
437 437
438 438 def get_current_revision():
439 439 """Returns tuple of (number, id) from repository containing this package
440 440 or None if repository could not be found.
441 441 """
442 442
443 443 try:
444 444 from vcs import get_repo
445 445 from vcs.utils.helpers import get_scm
446 446 from vcs.exceptions import RepositoryError, VCSError
447 447 repopath = os.path.join(os.path.dirname(__file__), '..', '..')
448 448 scm = get_scm(repopath)[0]
449 449 repo = get_repo(path=repopath, alias=scm)
450 450 tip = repo.get_changeset()
451 451 return (tip.revision, tip.short_id)
452 452 except (ImportError, RepositoryError, VCSError), err:
453 453 logging.debug("Cannot retrieve rhodecode's revision. Original error "
454 454 "was: %s" % err)
455 455 return None
456 456
457 457
458 458 #==============================================================================
459 459 # TEST FUNCTIONS AND CREATORS
460 460 #==============================================================================
461 461 def create_test_index(repo_location, full_index):
462 462 """Makes default test index
463 463 :param repo_location:
464 464 :param full_index:
465 465 """
466 466 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
467 467 from rhodecode.lib.pidlock import DaemonLock, LockHeld
468 468 import shutil
469 469
470 470 index_location = os.path.join(repo_location, 'index')
471 471 if os.path.exists(index_location):
472 472 shutil.rmtree(index_location)
473 473
474 474 try:
475 475 l = DaemonLock(file=jn(dn(index_location), 'make_index.lock'))
476 476 WhooshIndexingDaemon(index_location=index_location,
477 477 repo_location=repo_location)\
478 478 .run(full_index=full_index)
479 479 l.release()
480 480 except LockHeld:
481 481 pass
482 482
483 483
484 484 def create_test_env(repos_test_path, config):
485 485 """Makes a fresh database and
486 486 install test repository into tmp dir
487 487 """
488 488 from rhodecode.lib.db_manage import DbManage
489 489 from rhodecode.tests import HG_REPO, GIT_REPO, NEW_HG_REPO, NEW_GIT_REPO, \
490 490 HG_FORK, GIT_FORK, TESTS_TMP_PATH
491 491 import tarfile
492 492 import shutil
493 493 from os.path import dirname as dn, join as jn, abspath
494 494
495 495 log = logging.getLogger('TestEnvCreator')
496 496 # create logger
497 497 log.setLevel(logging.DEBUG)
498 498 log.propagate = True
499 499 # create console handler and set level to debug
500 500 ch = logging.StreamHandler()
501 501 ch.setLevel(logging.DEBUG)
502 502
503 503 # create formatter
504 504 formatter = logging.Formatter("%(asctime)s - %(name)s -"
505 505 " %(levelname)s - %(message)s")
506 506
507 507 # add formatter to ch
508 508 ch.setFormatter(formatter)
509 509
510 510 # add ch to logger
511 511 log.addHandler(ch)
512 512
513 513 #PART ONE create db
514 514 dbconf = config['sqlalchemy.db1.url']
515 515 log.debug('making test db %s', dbconf)
516 516
517 # create test dir if it doesn't exist
518 if not os.path.isdir(repos_test_path):
519 log.debug('Creating testdir %s' % repos_test_path)
520 os.makedirs(repos_test_path)
521
517 522 dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
518 523 tests=True)
519 524 dbmanage.create_tables(override=True)
520 525 dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
521 526 dbmanage.create_default_user()
522 527 dbmanage.admin_prompt()
523 528 dbmanage.create_permissions()
524 529 dbmanage.populate_default_permissions()
525 530
526 531 #PART TWO make test repo
527 532 log.debug('making test vcs repositories')
528 533
529 534 #remove old one from previos tests
530 535 for r in [HG_REPO, GIT_REPO, NEW_HG_REPO, NEW_GIT_REPO, HG_FORK, GIT_FORK]:
531 536
532 537 if os.path.isdir(jn(TESTS_TMP_PATH, r)):
533 538 log.debug('removing %s', r)
534 539 shutil.rmtree(jn(TESTS_TMP_PATH, r))
535 540
536 541 idx_path = config['app_conf']['index_dir']
537 542 data_path = config['app_conf']['cache_dir']
538 543
539 544 #clean index and data
540 545 if idx_path and os.path.exists(idx_path):
541 546 log.debug('remove %s' % idx_path)
542 547 shutil.rmtree(idx_path)
543 548
544 549 if data_path and os.path.exists(data_path):
545 550 log.debug('remove %s' % data_path)
546 551 shutil.rmtree(data_path)
547 552
548 553 #CREATE DEFAULT HG REPOSITORY
549 554 cur_dir = dn(dn(abspath(__file__)))
550 555 tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz"))
551 556 tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
552 557 tar.close()
553 558
554 559
555 560 #==============================================================================
556 561 # PASTER COMMANDS
557 562 #==============================================================================
558 563 class BasePasterCommand(Command):
559 564 """
560 565 Abstract Base Class for paster commands.
561 566
562 567 The celery commands are somewhat aggressive about loading
563 568 celery.conf, and since our module sets the `CELERY_LOADER`
564 569 environment variable to our loader, we have to bootstrap a bit and
565 570 make sure we've had a chance to load the pylons config off of the
566 571 command line, otherwise everything fails.
567 572 """
568 573 min_args = 1
569 574 min_args_error = "Please provide a paster config file as an argument."
570 575 takes_config_file = 1
571 576 requires_config_file = True
572 577
573 578 def notify_msg(self, msg, log=False):
574 579 """Make a notification to user, additionally if logger is passed
575 580 it logs this action using given logger
576 581
577 582 :param msg: message that will be printed to user
578 583 :param log: logging instance, to use to additionally log this message
579 584
580 585 """
581 586 if log and isinstance(log, logging):
582 587 log(msg)
583 588
584 589 def run(self, args):
585 590 """
586 591 Overrides Command.run
587 592
588 593 Checks for a config file argument and loads it.
589 594 """
590 595 if len(args) < self.min_args:
591 596 raise BadCommand(
592 597 self.min_args_error % {'min_args': self.min_args,
593 598 'actual_args': len(args)})
594 599
595 600 # Decrement because we're going to lob off the first argument.
596 601 # @@ This is hacky
597 602 self.min_args -= 1
598 603 self.bootstrap_config(args[0])
599 604 self.update_parser()
600 605 return super(BasePasterCommand, self).run(args[1:])
601 606
602 607 def update_parser(self):
603 608 """
604 609 Abstract method. Allows for the class's parser to be updated
605 610 before the superclass's `run` method is called. Necessary to
606 611 allow options/arguments to be passed through to the underlying
607 612 celery command.
608 613 """
609 614 raise NotImplementedError("Abstract Method.")
610 615
611 616 def bootstrap_config(self, conf):
612 617 """
613 618 Loads the pylons configuration.
614 619 """
615 620 from pylons import config as pylonsconfig
616 621
617 622 path_to_ini_file = os.path.realpath(conf)
618 623 conf = paste.deploy.appconfig('config:' + path_to_ini_file)
619 624 pylonsconfig.init_app(conf.global_conf, conf.local_conf)
@@ -1,81 +1,81
1 1 """Pylons application test package
2 2
3 3 This package assumes the Pylons environment is already loaded, such as
4 4 when this script is imported from the `nosetests --with-pylons=test.ini`
5 5 command.
6 6
7 7 This module initializes the application via ``websetup`` (`paster
8 8 setup-app`) and provides the base testing objects.
9 9 """
10 10 import os
11 11 from os.path import join as jn
12 12
13 13 from unittest import TestCase
14 14
15 15 from paste.deploy import loadapp
16 16 from paste.script.appinstall import SetupCommand
17 17 from pylons import config, url
18 18 from routes.util import URLGenerator
19 19 from webtest import TestApp
20 20
21 21 from rhodecode.model import meta
22 22 import logging
23 23
24 24
25 25 log = logging.getLogger(__name__)
26 26
27 27 import pylons.test
28 28
29 29 __all__ = ['environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
30 30 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK', ]
31 31
32 32 # Invoke websetup with the current config file
33 33 #SetupCommand('setup-app').run([config_file])
34 34
35 35 ##RUNNING DESIRED TESTS
36 36 #nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
37 37
38 38 environ = {}
39 39
40 40 #SOME GLOBALS FOR TESTS
41 TESTS_TMP_PATH = jn('/', 'tmp')
42
41 from tempfile import _RandomNameSequence
42 TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
43 43 HG_REPO = 'vcs_test_hg'
44 44 GIT_REPO = 'vcs_test_git'
45 45
46 46 NEW_HG_REPO = 'vcs_test_hg_new'
47 47 NEW_GIT_REPO = 'vcs_test_git_new'
48 48
49 49 HG_FORK = 'vcs_test_hg_fork'
50 50 GIT_FORK = 'vcs_test_git_fork'
51 51
52 52 class TestController(TestCase):
53 53
54 54 def __init__(self, *args, **kwargs):
55 55 wsgiapp = pylons.test.pylonsapp
56 56 config = wsgiapp.config
57 57
58 58 self.app = TestApp(wsgiapp)
59 59 url._push_object(URLGenerator(config['routes.map'], environ))
60 60 self.sa = meta.Session
61 61 self.index_location = config['app_conf']['index_dir']
62 62 TestCase.__init__(self, *args, **kwargs)
63 63
64 64 def log_user(self, username='test_admin', password='test12'):
65 65 response = self.app.post(url(controller='login', action='index'),
66 66 {'username':username,
67 67 'password':password})
68 68
69 69 if 'invalid user name' in response.body:
70 70 self.fail('could not login using %s %s' % (username, password))
71 71
72 72 self.assertEqual(response.status, '302 Found')
73 73 self.assertEqual(response.session['rhodecode_user'].username, username)
74 74 return response.follow()
75 75
76 76
77 77
78 78 def checkSessionFlash(self, response, msg):
79 79 self.assertTrue('flash' in response.session)
80 80 self.assertTrue(msg in response.session['flash'][0][1])
81 81
General Comments 0
You need to be logged in to leave comments. Login now