##// END OF EJS Templates
ruff: fix multiple statements on one line
Mads Kiilerich -
r8762:0e7dab99 default
parent child Browse files
Show More
@@ -1,545 +1,546 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.lib.utils2
16 16 ~~~~~~~~~~~~~~~~~~~~
17 17
18 18 Some simple helper functions.
19 19 Note: all these functions should be independent of Kallithea classes, i.e.
20 20 models, controllers, etc. to prevent import cycles.
21 21
22 22 This file was forked by the Kallithea project in July 2014.
23 23 Original author and date, and relevant copyright and licensing information is below:
24 24 :created_on: Jan 5, 2011
25 25 :author: marcink
26 26 :copyright: (c) 2013 RhodeCode GmbH, and others.
27 27 :license: GPLv3, see LICENSE.md for more details.
28 28 """
29 29
30 30 import binascii
31 31 import datetime
32 32 import hashlib
33 33 import json
34 34 import logging
35 35 import os
36 36 import re
37 37 import string
38 38 import sys
39 39 import time
40 40 import urllib.parse
41 41 from distutils.version import StrictVersion
42 42
43 43 import bcrypt
44 44 import urlobject
45 45 from sqlalchemy.engine import url as sa_url
46 46 from sqlalchemy.exc import ArgumentError
47 47 from tg import tmpl_context
48 48 from tg.support.converters import asbool, aslist
49 49 from webhelpers2.text import collapse, remove_formatting, strip_tags
50 50
51 51 import kallithea
52 52 from kallithea.lib import webutils
53 53 from kallithea.lib.vcs.backends.base import BaseRepository, EmptyChangeset
54 54 from kallithea.lib.vcs.backends.git.repository import GitRepository
55 55 from kallithea.lib.vcs.conf import settings
56 56 from kallithea.lib.vcs.exceptions import RepositoryError
57 57 from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, safe_bytes, safe_str # re-export
58 58 from kallithea.lib.vcs.utils.lazy import LazyProperty
59 59
60 60
61 61 try:
62 62 import pwd
63 63 except ImportError:
64 64 pass
65 65
66 66
67 67 log = logging.getLogger(__name__)
68 68
69 69
70 70 # mute pyflakes "imported but unused"
71 71 assert asbool
72 72 assert aslist
73 73 assert ascii_bytes
74 74 assert ascii_str
75 75 assert safe_bytes
76 76 assert safe_str
77 77 assert LazyProperty
78 78
79 79
80 80 # get current umask value without changing it
81 81 umask = os.umask(0)
82 82 os.umask(umask)
83 83
84 84
85 85 def convert_line_endings(line, mode):
86 86 """
87 87 Converts a given line "line end" according to given mode
88 88
89 89 Available modes are::
90 90 0 - Unix
91 91 1 - Mac
92 92 2 - DOS
93 93
94 94 :param line: given line to convert
95 95 :param mode: mode to convert to
96 96 :rtype: str
97 97 :return: converted line according to mode
98 98 """
99 99 if mode == 0:
100 100 line = line.replace('\r\n', '\n')
101 101 line = line.replace('\r', '\n')
102 102 elif mode == 1:
103 103 line = line.replace('\r\n', '\r')
104 104 line = line.replace('\n', '\r')
105 105 elif mode == 2:
106 106 line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
107 107 return line
108 108
109 109
110 110 def detect_mode(line, default):
111 111 """
112 112 Detects line break for given line, if line break couldn't be found
113 113 given default value is returned
114 114
115 115 :param line: str line
116 116 :param default: default
117 117 :rtype: int
118 118 :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS
119 119 """
120 120 if line.endswith('\r\n'):
121 121 return 2
122 122 elif line.endswith('\n'):
123 123 return 0
124 124 elif line.endswith('\r'):
125 125 return 1
126 126 else:
127 127 return default
128 128
129 129
130 130 def generate_api_key():
131 131 """
132 132 Generates a random (presumably unique) API key.
133 133
134 134 This value is used in URLs and "Bearer" HTTP Authorization headers,
135 135 which in practice means it should only contain URL-safe characters
136 136 (RFC 3986):
137 137
138 138 unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
139 139 """
140 140 # Hexadecimal certainly qualifies as URL-safe.
141 141 return ascii_str(binascii.hexlify(os.urandom(20)))
142 142
143 143
144 144 def safe_int(val, default=None):
145 145 """
146 146 Returns int() of val if val is not convertable to int use default
147 147 instead
148 148
149 149 :param val:
150 150 :param default:
151 151 """
152 152 try:
153 153 val = int(val)
154 154 except (ValueError, TypeError):
155 155 val = default
156 156 return val
157 157
158 158
159 159 def remove_suffix(s, suffix):
160 160 if s.endswith(suffix):
161 161 s = s[:-1 * len(suffix)]
162 162 return s
163 163
164 164
165 165 def remove_prefix(s, prefix):
166 166 if s.startswith(prefix):
167 167 s = s[len(prefix):]
168 168 return s
169 169
170 170
171 171 def uri_filter(uri):
172 172 """
173 173 Removes user:password from given url string
174 174
175 175 :param uri:
176 176 :rtype: str
177 177 :returns: filtered list of strings
178 178 """
179 179 if not uri:
180 180 return []
181 181
182 182 proto = ''
183 183
184 184 for pat in ('https://', 'http://', 'git://'):
185 185 if uri.startswith(pat):
186 186 uri = uri[len(pat):]
187 187 proto = pat
188 188 break
189 189
190 190 # remove passwords and username
191 191 uri = uri[uri.find('@') + 1:]
192 192
193 193 # get the port
194 194 cred_pos = uri.find(':')
195 195 if cred_pos == -1:
196 196 host, port = uri, None
197 197 else:
198 198 host, port = uri[:cred_pos], uri[cred_pos + 1:]
199 199
200 200 return [_f for _f in [proto, host, port] if _f]
201 201
202 202
203 203 def credentials_filter(uri):
204 204 """
205 205 Returns a url with removed credentials
206 206
207 207 :param uri:
208 208 """
209 209
210 210 uri = uri_filter(uri)
211 211 # check if we have port
212 212 if len(uri) > 2 and uri[2]:
213 213 uri[2] = ':' + uri[2]
214 214
215 215 return ''.join(uri)
216 216
217 217
218 218 def get_clone_url(clone_uri_tmpl, prefix_url, repo_name, repo_id, username=None):
219 219 parsed_url = urlobject.URLObject(prefix_url)
220 220 prefix = urllib.parse.unquote(parsed_url.path.rstrip('/'))
221 221 try:
222 222 system_user = pwd.getpwuid(os.getuid()).pw_name
223 223 except NameError: # TODO: support all systems - especially Windows
224 224 system_user = 'kallithea' # hardcoded default value ...
225 225 args = {
226 226 'scheme': parsed_url.scheme,
227 227 'user': urllib.parse.quote(username or ''),
228 228 'netloc': parsed_url.netloc + prefix, # like "hostname:port/prefix" (with optional ":port" and "/prefix")
229 229 'prefix': prefix, # undocumented, empty or starting with /
230 230 'repo': repo_name,
231 231 'repoid': str(repo_id),
232 232 'system_user': system_user,
233 233 'hostname': parsed_url.hostname,
234 234 }
235 235 url = re.sub('{([^{}]+)}', lambda m: args.get(m.group(1), m.group(0)), clone_uri_tmpl)
236 236
237 237 # remove leading @ sign if it's present. Case of empty user
238 238 url_obj = urlobject.URLObject(url)
239 239 if not url_obj.username:
240 240 url_obj = url_obj.with_username(None)
241 241
242 242 return str(url_obj)
243 243
244 244
245 245 def short_ref_name(ref_type, ref_name):
246 246 """Return short description of PR ref - revs will be truncated"""
247 247 if ref_type == 'rev':
248 248 return ref_name[:12]
249 249 return ref_name
250 250
251 251
252 252 def link_to_ref(repo_name, ref_type, ref_name, rev=None):
253 253 """
254 254 Return full markup for a PR ref to changeset_home for a changeset.
255 255 If ref_type is 'branch', it will link to changelog.
256 256 ref_name is shortened if ref_type is 'rev'.
257 257 if rev is specified, show it too, explicitly linking to that revision.
258 258 """
259 259 txt = short_ref_name(ref_type, ref_name)
260 260 if ref_type == 'branch':
261 261 u = webutils.url('changelog_home', repo_name=repo_name, branch=ref_name)
262 262 else:
263 263 u = webutils.url('changeset_home', repo_name=repo_name, revision=ref_name)
264 264 l = webutils.link_to(repo_name + '#' + txt, u)
265 265 if rev and ref_type != 'rev':
266 266 l = webutils.literal('%s (%s)' % (l, webutils.link_to(rev[:12], webutils.url('changeset_home', repo_name=repo_name, revision=rev))))
267 267 return l
268 268
269 269
270 270 def get_changeset_safe(repo, rev):
271 271 """
272 272 Safe version of get_changeset if this changeset doesn't exists for a
273 273 repo it returns a Dummy one instead
274 274
275 275 :param repo:
276 276 :param rev:
277 277 """
278 278 if not isinstance(repo, BaseRepository):
279 279 raise Exception('You must pass an Repository '
280 280 'object as first argument got %s' % type(repo))
281 281
282 282 try:
283 283 cs = repo.get_changeset(rev)
284 284 except (RepositoryError, LookupError):
285 285 cs = EmptyChangeset(requested_revision=rev)
286 286 return cs
287 287
288 288
289 289 def datetime_to_time(dt):
290 290 if dt:
291 291 return time.mktime(dt.timetuple())
292 292
293 293
294 294 def time_to_datetime(tm):
295 295 if tm:
296 296 if isinstance(tm, str):
297 297 try:
298 298 tm = float(tm)
299 299 except ValueError:
300 300 return
301 301 return datetime.datetime.fromtimestamp(tm)
302 302
303 303
304 304 class AttributeDict(dict):
305 305 def __getattr__(self, attr):
306 306 return self.get(attr, None)
307 307 __setattr__ = dict.__setitem__
308 308 __delattr__ = dict.__delitem__
309 309
310 310
311 311 def obfuscate_url_pw(engine):
312 312 try:
313 313 _url = sa_url.make_url(engine or '')
314 314 except ArgumentError:
315 315 return engine
316 316 if _url.password:
317 317 _url.password = 'XXXXX'
318 318 return str(_url)
319 319
320 320
321 class HookEnvironmentError(Exception): pass
321 class HookEnvironmentError(Exception):
322 pass
322 323
323 324
324 325 def get_hook_environment():
325 326 """
326 327 Get hook context by deserializing the global KALLITHEA_EXTRAS environment
327 328 variable.
328 329
329 330 Called early in Git out-of-process hooks to get .ini config path so the
330 331 basic environment can be configured properly. Also used in all hooks to get
331 332 information about the action that triggered it.
332 333 """
333 334
334 335 try:
335 336 kallithea_extras = os.environ['KALLITHEA_EXTRAS']
336 337 except KeyError:
337 338 raise HookEnvironmentError("Environment variable KALLITHEA_EXTRAS not found")
338 339
339 340 extras = json.loads(kallithea_extras)
340 341 for k in ['username', 'repository', 'scm', 'action', 'ip', 'config']:
341 342 try:
342 343 extras[k]
343 344 except KeyError:
344 345 raise HookEnvironmentError('Missing key %s in KALLITHEA_EXTRAS %s' % (k, extras))
345 346
346 347 return AttributeDict(extras)
347 348
348 349
349 350 def set_hook_environment(username, ip_addr, repo_name, repo_alias, action=None):
350 351 """Prepare global context for running hooks by serializing data in the
351 352 global KALLITHEA_EXTRAS environment variable.
352 353
353 354 Most importantly, this allow Git hooks to do proper logging and updating of
354 355 caches after pushes.
355 356
356 357 Must always be called before anything with hooks are invoked.
357 358 """
358 359 extras = {
359 360 'ip': ip_addr, # used in action_logger
360 361 'username': username,
361 362 'action': action or 'push_local', # used in process_pushed_raw_ids action_logger
362 363 'repository': repo_name,
363 364 'scm': repo_alias,
364 365 'config': kallithea.CONFIG['__file__'], # used by git hook to read config
365 366 }
366 367 os.environ['KALLITHEA_EXTRAS'] = json.dumps(extras)
367 368
368 369
369 370 def get_current_authuser():
370 371 """
371 372 Gets kallithea user from threadlocal tmpl_context variable if it's
372 373 defined, else returns None.
373 374 """
374 375 try:
375 376 return getattr(tmpl_context, 'authuser', None)
376 377 except TypeError: # No object (name: context) has been registered for this thread
377 378 return None
378 379
379 380
380 381 def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
381 382 return _cleanstringsub('_', s).rstrip('_')
382 383
383 384
384 385 def recursive_replace(str_, replace=' '):
385 386 """
386 387 Recursive replace of given sign to just one instance
387 388
388 389 :param str_: given string
389 390 :param replace: char to find and replace multiple instances
390 391
391 392 Examples::
392 393 >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
393 394 'Mighty-Mighty-Bo-sstones'
394 395 """
395 396
396 397 if str_.find(replace * 2) == -1:
397 398 return str_
398 399 else:
399 400 str_ = str_.replace(replace * 2, replace)
400 401 return recursive_replace(str_, replace)
401 402
402 403
403 404 def repo_name_slug(value):
404 405 """
405 406 Return slug of name of repository
406 407 This function is called on each creation/modification
407 408 of repository to prevent bad names in repo
408 409 """
409 410
410 411 slug = remove_formatting(value)
411 412 slug = strip_tags(slug)
412 413
413 414 for c in r"""`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
414 415 slug = slug.replace(c, '-')
415 416 slug = recursive_replace(slug, '-')
416 417 slug = collapse(slug, '-')
417 418 return slug
418 419
419 420
420 421 def ask_ok(prompt, retries=4, complaint='Yes or no please!'):
421 422 while True:
422 423 ok = input(prompt)
423 424 if ok in ('y', 'ye', 'yes'):
424 425 return True
425 426 if ok in ('n', 'no', 'nop', 'nope'):
426 427 return False
427 428 retries = retries - 1
428 429 if retries < 0:
429 430 raise IOError
430 431 print(complaint)
431 432
432 433
433 434 class PasswordGenerator(object):
434 435 """
435 436 This is a simple class for generating password from different sets of
436 437 characters
437 438 usage::
438 439
439 440 passwd_gen = PasswordGenerator()
440 441 #print 8-letter password containing only big and small letters
441 442 of alphabet
442 443 passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
443 444 """
444 445 ALPHABETS_NUM = r'''1234567890'''
445 446 ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
446 447 ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
447 448 ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
448 449 ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
449 450 + ALPHABETS_NUM + ALPHABETS_SPECIAL
450 451 ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
451 452 ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
452 453 ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
453 454 ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
454 455
455 456 def gen_password(self, length, alphabet=ALPHABETS_FULL):
456 457 assert len(alphabet) <= 256, alphabet
457 458 l = []
458 459 while len(l) < length:
459 460 i = ord(os.urandom(1))
460 461 if i < len(alphabet):
461 462 l.append(alphabet[i])
462 463 return ''.join(l)
463 464
464 465
465 466 def get_crypt_password(password):
466 467 """
467 468 Cryptographic function used for bcrypt password hashing.
468 469
469 470 :param password: password to hash
470 471 """
471 472 return ascii_str(bcrypt.hashpw(safe_bytes(password), bcrypt.gensalt(10)))
472 473
473 474
474 475 def check_password(password, hashed):
475 476 """
476 477 Checks password match the hashed value using bcrypt.
477 478 Remains backwards compatible and accept plain sha256 hashes which used to
478 479 be used on Windows.
479 480
480 481 :param password: password
481 482 :param hashed: password in hashed form
482 483 """
483 484 # sha256 hashes will always be 64 hex chars
484 485 # bcrypt hashes will always contain $ (and be shorter)
485 486 if len(hashed) == 64 and all(x in string.hexdigits for x in hashed):
486 487 return hashlib.sha256(password).hexdigest() == hashed
487 488 try:
488 489 return bcrypt.checkpw(safe_bytes(password), ascii_bytes(hashed))
489 490 except ValueError as e:
490 491 # bcrypt will throw ValueError 'Invalid hashed_password salt' on all password errors
491 492 log.error('error from bcrypt checking password: %s', e)
492 493 return False
493 494 log.error('check_password failed - no method found for hash length %s', len(hashed))
494 495 return False
495 496
496 497
497 498 git_req_ver = StrictVersion('1.7.4')
498 499
499 500 def check_git_version():
500 501 """
501 502 Checks what version of git is installed on the system, and raise a system exit
502 503 if it's too old for Kallithea to work properly.
503 504 """
504 505 if 'git' not in kallithea.BACKENDS:
505 506 return None
506 507
507 508 if not settings.GIT_EXECUTABLE_PATH:
508 509 log.warning('No git executable configured - check "git_path" in the ini file.')
509 510 return None
510 511
511 512 try:
512 513 stdout, stderr = GitRepository._run_git_command(['--version'])
513 514 except RepositoryError as e:
514 515 # message will already have been logged as error
515 516 log.warning('No working git executable found - check "git_path" in the ini file.')
516 517 return None
517 518
518 519 if stderr:
519 520 log.warning('Error/stderr from "%s --version":\n%s', settings.GIT_EXECUTABLE_PATH, safe_str(stderr))
520 521
521 522 if not stdout:
522 523 log.warning('No working git executable found - check "git_path" in the ini file.')
523 524 return None
524 525
525 526 output = safe_str(stdout).strip()
526 527 m = re.search(r"\d+.\d+.\d+", output)
527 528 if m:
528 529 ver = StrictVersion(m.group(0))
529 530 log.debug('Git executable: "%s", version %s (parsed from: "%s")',
530 531 settings.GIT_EXECUTABLE_PATH, ver, output)
531 532 if ver < git_req_ver:
532 533 log.error('Kallithea detected %s version %s, which is too old '
533 534 'for the system to function properly. '
534 535 'Please upgrade to version %s or later. '
535 536 'If you strictly need Mercurial repositories, you can '
536 537 'clear the "git_path" setting in the ini file.',
537 538 settings.GIT_EXECUTABLE_PATH, ver, git_req_ver)
538 539 log.error("Terminating ...")
539 540 sys.exit(1)
540 541 else:
541 542 ver = StrictVersion('0.0.0')
542 543 log.warning('Error finding version number in "%s --version" stdout:\n%s',
543 544 settings.GIT_EXECUTABLE_PATH, output)
544 545
545 546 return ver
@@ -1,641 +1,642 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 Test suite for vcs push/pull operations.
16 16
17 17 The tests need Git > 1.8.1.
18 18
19 19 This file was forked by the Kallithea project in July 2014.
20 20 Original author and date, and relevant copyright and licensing information is below:
21 21 :created_on: Dec 30, 2010
22 22 :author: marcink
23 23 :copyright: (c) 2013 RhodeCode GmbH, and others.
24 24 :license: GPLv3, see LICENSE.md for more details.
25 25
26 26 """
27 27
28 28 import json
29 29 import os
30 30 import re
31 31 import tempfile
32 32 import time
33 33 import urllib.request
34 34 from subprocess import PIPE, Popen
35 35 from tempfile import _RandomNameSequence
36 36
37 37 import pytest
38 38
39 39 import kallithea
40 40 from kallithea.lib.utils2 import ascii_bytes, safe_str
41 41 from kallithea.model import db, meta
42 42 from kallithea.model.ssh_key import SshKeyModel
43 43 from kallithea.model.user import UserModel
44 44 from kallithea.tests import base
45 45 from kallithea.tests.fixture import Fixture
46 46
47 47
48 48 DEBUG = True
49 49 HOST = '127.0.0.1:4999' # test host
50 50
51 51 fixture = Fixture()
52 52
53 53
54 54 # Parameterize different kinds of VCS testing - both the kind of VCS and the
55 55 # access method (HTTP/SSH)
56 56
57 57 # Mixin for using HTTP and SSH URLs
58 58 class HttpVcsTest(object):
59 59 @staticmethod
60 60 def repo_url_param(webserver, repo_name, **kwargs):
61 61 return webserver.repo_url(repo_name, **kwargs)
62 62
63 63 class SshVcsTest(object):
64 64 public_keys = {
65 65 base.TEST_USER_REGULAR_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== kallithea@localhost',
66 66 base.TEST_USER_ADMIN_LOGIN: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUq== kallithea@localhost',
67 67 }
68 68
69 69 @classmethod
70 70 def repo_url_param(cls, webserver, repo_name, username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS, client_ip=base.IP_ADDR):
71 71 user = db.User.get_by_username(username)
72 72 if user.ssh_keys:
73 73 ssh_key = user.ssh_keys[0]
74 74 else:
75 75 sshkeymodel = SshKeyModel()
76 76 ssh_key = sshkeymodel.create(user, 'test key', cls.public_keys[user.username])
77 77 meta.Session().commit()
78 78
79 79 return cls._ssh_param(repo_name, user, ssh_key, client_ip)
80 80
81 81 # Mixins for using Mercurial and Git
82 82 class HgVcsTest(object):
83 83 repo_type = 'hg'
84 84 repo_name = base.HG_REPO
85 85
86 86 class GitVcsTest(object):
87 87 repo_type = 'git'
88 88 repo_name = base.GIT_REPO
89 89
90 90 # Combine mixins to give the combinations we want to parameterize tests with
91 91 class HgHttpVcsTest(HgVcsTest, HttpVcsTest):
92 92 pass
93 93
94 94 class GitHttpVcsTest(GitVcsTest, HttpVcsTest):
95 95 pass
96 96
97 97 class HgSshVcsTest(HgVcsTest, SshVcsTest):
98 98 @staticmethod
99 99 def _ssh_param(repo_name, user, ssh_key, client_ip):
100 100 # Specify a custom ssh command on the command line
101 101 return r"""--config ui.ssh="bash -c 'SSH_ORIGINAL_COMMAND=\"\$2\" SSH_CONNECTION=\"%s 1024 127.0.0.1 22\" kallithea-cli ssh-serve -c %s %s %s' --" ssh://someuser@somehost/%s""" % (
102 102 client_ip,
103 103 kallithea.CONFIG['__file__'],
104 104 user.user_id,
105 105 ssh_key.user_ssh_key_id,
106 106 repo_name)
107 107
108 108 class GitSshVcsTest(GitVcsTest, SshVcsTest):
109 109 @staticmethod
110 110 def _ssh_param(repo_name, user, ssh_key, client_ip):
111 111 # Set a custom ssh command in the global environment
112 112 os.environ['GIT_SSH_COMMAND'] = r"""bash -c 'SSH_ORIGINAL_COMMAND="$2" SSH_CONNECTION="%s 1024 127.0.0.1 22" kallithea-cli ssh-serve -c %s %s %s' --""" % (
113 113 client_ip,
114 114 kallithea.CONFIG['__file__'],
115 115 user.user_id,
116 116 ssh_key.user_ssh_key_id)
117 117 return "ssh://someuser@somehost/%s""" % repo_name
118 118
119 119 parametrize_vcs_test = base.parametrize('vt', [
120 120 HgHttpVcsTest,
121 121 GitHttpVcsTest,
122 122 HgSshVcsTest,
123 123 GitSshVcsTest,
124 124 ])
125 125 parametrize_vcs_test_hg = base.parametrize('vt', [
126 126 HgHttpVcsTest,
127 127 HgSshVcsTest,
128 128 ])
129 129 parametrize_vcs_test_http = base.parametrize('vt', [
130 130 HgHttpVcsTest,
131 131 GitHttpVcsTest,
132 132 ])
133 133
134 134 class Command(object):
135 135
136 136 def __init__(self, cwd):
137 137 self.cwd = cwd
138 138
139 139 def execute(self, *args, **environ):
140 140 """
141 141 Runs command on the system with given ``args`` using simple space
142 142 join without safe quoting.
143 143 """
144 144 command = ' '.join(args)
145 145 ignoreReturnCode = environ.pop('ignoreReturnCode', False)
146 146 if DEBUG:
147 147 print('*** CMD %s ***' % command)
148 148 testenv = dict(os.environ)
149 149 testenv['LANG'] = 'en_US.UTF-8'
150 150 testenv['LANGUAGE'] = 'en_US:en'
151 151 testenv['HGPLAIN'] = ''
152 152 testenv['HGRCPATH'] = ''
153 153 testenv.update(environ)
154 154 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
155 155 stdout, stderr = p.communicate()
156 156 if DEBUG:
157 157 if stdout:
158 158 print('stdout:', stdout)
159 159 if stderr:
160 160 print('stderr:', stderr)
161 161 if not ignoreReturnCode:
162 162 assert p.returncode == 0
163 163 return safe_str(stdout), safe_str(stderr)
164 164
165 165
166 166 def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
167 167 return tempfile.mkdtemp(dir=base.TESTS_TMP_PATH, prefix=prefix, suffix=suffix)
168 168
169 169
170 170 def _add_files(vcs, dest_dir, files_no=3):
171 171 """
172 172 Generate some files, add it to dest_dir repo and push back
173 173 vcs is git or hg and defines what VCS we want to make those files for
174 174
175 175 :param vcs:
176 176 :param dest_dir:
177 177 """
178 178 added_file = '%ssetup.py' % next(_RandomNameSequence())
179 179 open(os.path.join(dest_dir, added_file), 'a').close()
180 180 Command(dest_dir).execute(vcs, 'add', added_file)
181 181
182 182 email = 'me@example.com'
183 183 if os.name == 'nt':
184 184 author_str = 'User <%s>' % email
185 185 else:
186 186 author_str = 'User ǝɯɐᴎ <%s>' % email
187 187 for i in range(files_no):
188 188 cmd = """echo "added_line%s" >> %s""" % (i, added_file)
189 189 Command(dest_dir).execute(cmd)
190 190 if vcs == 'hg':
191 191 cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
192 192 i, author_str, added_file
193 193 )
194 194 elif vcs == 'git':
195 195 cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
196 196 i, author_str, added_file
197 197 )
198 198 # git commit needs EMAIL on some machines
199 199 Command(dest_dir).execute(cmd, EMAIL=email)
200 200
201 201 def _add_files_and_push(webserver, vt, dest_dir, clone_url, ignoreReturnCode=False, files_no=3):
202 202 _add_files(vt.repo_type, dest_dir, files_no=files_no)
203 203 # PUSH it back
204 204 stdout = stderr = None
205 205 if vt.repo_type == 'hg':
206 206 stdout, stderr = Command(dest_dir).execute('hg push -f --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
207 207 elif vt.repo_type == 'git':
208 208 stdout, stderr = Command(dest_dir).execute('git push -f --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)
209 209
210 210 return stdout, stderr
211 211
212 212
213 213 def _check_outgoing(vcs, cwd, clone_url):
214 214 if vcs == 'hg':
215 215 # hg removes the password from default URLs, so we have to provide it here via the clone_url
216 216 return Command(cwd).execute('hg -q outgoing', clone_url, ignoreReturnCode=True)
217 217 elif vcs == 'git':
218 218 Command(cwd).execute('git remote update')
219 219 return Command(cwd).execute('git log origin/master..master')
220 220
221 221
222 222 def set_anonymous_access(enable=True):
223 223 user = db.User.get_default_user()
224 224 user.active = enable
225 225 meta.Session().commit()
226 226 if enable != db.User.get_default_user().active:
227 227 raise Exception('Cannot set anonymous access')
228 228
229 229
230 230 #==============================================================================
231 231 # TESTS
232 232 #==============================================================================
233 233
234 234
235 235 def _check_proper_git_push(stdout, stderr):
236 236 assert 'fatal' not in stderr
237 237 assert 'rejected' not in stderr
238 238 assert 'Pushing to' in stderr
239 239 assert 'master -> master' in stderr
240 240
241 241
242 242 @pytest.mark.usefixtures("test_context_fixture")
243 243 class TestVCSOperations(base.TestController):
244 244
245 245 @classmethod
246 246 def setup_class(cls):
247 247 # DISABLE ANONYMOUS ACCESS
248 248 set_anonymous_access(False)
249 249
250 250 @pytest.fixture()
251 251 def testhook_cleanup(self):
252 252 yield
253 253 # remove hook
254 254 for hook in ['prechangegroup', 'pretxnchangegroup', 'preoutgoing', 'changegroup', 'outgoing', 'incoming']:
255 255 entry = db.Ui.get_by_key('hooks', '%s.testhook' % hook)
256 256 if entry:
257 257 meta.Session().delete(entry)
258 258 meta.Session().commit()
259 259
260 260 @pytest.fixture(scope="module")
261 261 def testfork(self):
262 262 # create fork so the repo stays untouched
263 263 git_fork_name = '%s_fork%s' % (base.GIT_REPO, next(_RandomNameSequence()))
264 264 fixture.create_fork(base.GIT_REPO, git_fork_name)
265 265 hg_fork_name = '%s_fork%s' % (base.HG_REPO, next(_RandomNameSequence()))
266 266 fixture.create_fork(base.HG_REPO, hg_fork_name)
267 267 return {'git': git_fork_name, 'hg': hg_fork_name}
268 268
269 269 @parametrize_vcs_test
270 270 def test_clone_repo_by_admin(self, webserver, vt):
271 271 clone_url = vt.repo_url_param(webserver, vt.repo_name)
272 272 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())
273 273
274 274 if vt.repo_type == 'git':
275 275 assert 'Cloning into' in stdout + stderr
276 276 assert stderr == '' or stdout == ''
277 277 elif vt.repo_type == 'hg':
278 278 assert 'requesting all changes' in stdout
279 279 assert 'adding changesets' in stdout
280 280 assert 'adding manifests' in stdout
281 281 assert 'adding file changes' in stdout
282 282 assert stderr == ''
283 283
284 284 @parametrize_vcs_test_http
285 285 def test_clone_wrong_credentials(self, webserver, vt):
286 286 clone_url = vt.repo_url_param(webserver, vt.repo_name, password='bad!')
287 287 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
288 288 if vt.repo_type == 'git':
289 289 assert 'fatal: Authentication failed' in stderr
290 290 elif vt.repo_type == 'hg':
291 291 assert 'abort: authorization failed' in stderr
292 292
293 293 def test_clone_git_dir_as_hg(self, webserver):
294 294 clone_url = HgHttpVcsTest.repo_url_param(webserver, base.GIT_REPO)
295 295 stdout, stderr = Command(base.TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
296 296 assert 'HTTP Error 404: Not Found' in stderr or "not a valid repository" in stdout and 'abort:' in stderr
297 297
298 298 def test_clone_hg_repo_as_git(self, webserver):
299 299 clone_url = GitHttpVcsTest.repo_url_param(webserver, base.HG_REPO)
300 300 stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
301 301 assert 'not found' in stderr
302 302
303 303 @parametrize_vcs_test
304 304 def test_clone_non_existing_path(self, webserver, vt):
305 305 clone_url = vt.repo_url_param(webserver, 'trololo')
306 306 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
307 307 if vt.repo_type == 'git':
308 308 assert 'not found' in stderr or 'abort: Access to %r denied' % 'trololo' in stderr
309 309 elif vt.repo_type == 'hg':
310 310 assert 'HTTP Error 404: Not Found' in stderr or 'abort: no suitable response from remote hg' in stderr and 'remote: abort: Access to %r denied' % 'trololo' in stdout + stderr
311 311
312 312 @parametrize_vcs_test
313 313 def test_push_new_repo(self, webserver, vt):
314 314 # Clear the log so we know what is added
315 315 db.UserLog.query().delete()
316 316 meta.Session().commit()
317 317
318 318 # Create an empty server repo using the API
319 319 repo_name = 'new_%s_%s' % (vt.repo_type, next(_RandomNameSequence()))
320 320 usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
321 321 params = {
322 322 "id": 7,
323 323 "api_key": usr.api_key,
324 324 "method": 'create_repo',
325 325 "args": dict(repo_name=repo_name,
326 326 owner=base.TEST_USER_ADMIN_LOGIN,
327 327 repo_type=vt.repo_type),
328 328 }
329 329 req = urllib.request.Request(
330 330 'http://%s:%s/_admin/api' % webserver.server_address,
331 331 data=ascii_bytes(json.dumps(params)),
332 332 headers={'content-type': 'application/json'})
333 333 response = urllib.request.urlopen(req)
334 334 result = json.loads(response.read())
335 335 # Expect something like:
336 336 # {u'result': {u'msg': u'Created new repository `new_XXX`', u'task': None, u'success': True}, u'id': 7, u'error': None}
337 337 assert result['result']['success']
338 338
339 339 # Create local clone of the empty server repo
340 340 local_clone_dir = _get_tmp_dir()
341 341 clone_url = vt.repo_url_param(webserver, repo_name)
342 342 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, local_clone_dir)
343 343
344 344 # Make 3 commits and push to the empty server repo.
345 345 # The server repo doesn't have any other heads than the
346 346 # refs/heads/master we are pushing, but the `git log` in the push hook
347 347 # should still list the 3 commits.
348 348 stdout, stderr = _add_files_and_push(webserver, vt, local_clone_dir, clone_url=clone_url)
349 349 if vt.repo_type == 'git':
350 350 _check_proper_git_push(stdout, stderr)
351 351 elif vt.repo_type == 'hg':
352 352 assert 'pushing to ' in stdout
353 353 assert 'remote: added ' in stdout
354 354
355 355 # Verify that we got the right events in UserLog. Expect something like:
356 356 # <UserLog('id:new_git_XXX:started_following_repo')>
357 357 # <UserLog('id:new_git_XXX:user_created_repo')>
358 358 # <UserLog('id:new_git_XXX:pull')>
359 359 # <UserLog('id:new_git_XXX:push:aed9d4c1732a1927da3be42c47eb9afdc200d427,d38b083a07af10a9f44193486959a96a23db78da,4841ff9a2b385bec995f4679ef649adb3f437622')>
360 360 meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
361 361 action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
362 362 assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == ([
363 363 ('started_following_repo', 0),
364 364 ('user_created_repo', 0),
365 365 ('pull', 0),
366 366 ('push', 3)]
367 367 if vt.repo_type == 'git' else [
368 368 ('started_following_repo', 0),
369 369 ('user_created_repo', 0),
370 370 # (u'pull', 0), # Mercurial outgoing hook is not called for empty clones
371 371 ('push', 3)])
372 372
373 373 @parametrize_vcs_test
374 374 def test_push_new_file(self, webserver, testfork, vt):
375 375 db.UserLog.query().delete()
376 376 meta.Session().commit()
377 377
378 378 dest_dir = _get_tmp_dir()
379 379 clone_url = vt.repo_url_param(webserver, vt.repo_name)
380 380 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
381 381
382 382 clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type])
383 383 stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url=clone_url)
384 384
385 385 if vt.repo_type == 'git':
386 386 _check_proper_git_push(stdout, stderr)
387 387 elif vt.repo_type == 'hg':
388 388 assert 'pushing to' in stdout
389 389 assert 'Repository size' in stdout
390 390 assert 'Last revision is now' in stdout
391 391
392 392 meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
393 393 action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
394 394 assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \
395 395 [('pull', 0), ('push', 3)]
396 396
397 397 @parametrize_vcs_test
398 398 def test_pull(self, webserver, testfork, vt):
399 399 db.UserLog.query().delete()
400 400 meta.Session().commit()
401 401
402 402 dest_dir = _get_tmp_dir()
403 403 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'init', dest_dir)
404 404
405 405 clone_url = vt.repo_url_param(webserver, vt.repo_name)
406 406 stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url)
407 407 meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
408 408
409 409 if vt.repo_type == 'git':
410 410 assert 'FETCH_HEAD' in stderr
411 411 elif vt.repo_type == 'hg':
412 412 assert 'new changesets' in stdout
413 413
414 414 action_parts = [ul.action for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
415 415 assert action_parts == ['pull']
416 416
417 417 # Test handling of URLs with extra '/' around repo_name
418 418 stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/./%s/' % vt.repo_name), ignoreReturnCode=True)
419 419 if issubclass(vt, HttpVcsTest):
420 420 if vt.repo_type == 'git':
421 421 # NOTE: when pulling from http://hostname/./vcs_test_git/ , the git client will normalize that and issue an HTTP request to /vcs_test_git/info/refs
422 422 assert 'Already up to date.' in stdout
423 423 else:
424 424 assert vt.repo_type == 'hg'
425 425 assert "abort: HTTP Error 404: Not Found" in stderr
426 426 else:
427 427 assert issubclass(vt, SshVcsTest)
428 428 if vt.repo_type == 'git':
429 429 assert "abort: Access to './%s' denied" % vt.repo_name in stderr
430 430 else:
431 431 assert "abort: Access to './%s' denied" % vt.repo_name in stdout + stderr
432 432
433 433 stdout, stderr = Command(dest_dir).execute(vt.repo_type, 'pull', clone_url.replace('/' + vt.repo_name, '/%s/' % vt.repo_name), ignoreReturnCode=True)
434 434 if vt.repo_type == 'git':
435 435 assert 'Already up to date.' in stdout
436 436 else:
437 437 assert vt.repo_type == 'hg'
438 438 assert "no changes found" in stdout
439 439 assert "denied" not in stderr
440 440 assert "denied" not in stdout
441 441 assert "404" not in stdout
442 442
443 443 @parametrize_vcs_test
444 444 def test_push_invalidates_cache(self, webserver, testfork, vt):
445 445 pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])]
446 446
447 447 dest_dir = _get_tmp_dir()
448 448 clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type])
449 449 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
450 450
451 451 stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, files_no=1, clone_url=clone_url)
452 452
453 453 if vt.repo_type == 'git':
454 454 _check_proper_git_push(stdout, stderr)
455 455
456 456 meta.Session.close() # expire session to make sure SA fetches new Repository instances after last_changeset has been updated by server side hook in another process
457 457 post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in db.Repository.query().filter(db.Repository.repo_name == testfork[vt.repo_type])]
458 458 assert pre_cached_tip != post_cached_tip
459 459
460 460 @parametrize_vcs_test_http
461 461 def test_push_wrong_credentials(self, webserver, vt):
462 462 dest_dir = _get_tmp_dir()
463 463 clone_url = vt.repo_url_param(webserver, vt.repo_name)
464 464 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
465 465
466 466 clone_url = webserver.repo_url(vt.repo_name, username='bad', password='name')
467 467 stdout, stderr = _add_files_and_push(webserver, vt, dest_dir,
468 468 clone_url=clone_url, ignoreReturnCode=True)
469 469
470 470 if vt.repo_type == 'git':
471 471 assert 'fatal: Authentication failed' in stderr
472 472 elif vt.repo_type == 'hg':
473 473 assert 'abort: authorization failed' in stderr
474 474
475 475 @parametrize_vcs_test
476 476 def test_push_with_readonly_credentials(self, webserver, vt):
477 477 db.UserLog.query().delete()
478 478 meta.Session().commit()
479 479
480 480 dest_dir = _get_tmp_dir()
481 481 clone_url = vt.repo_url_param(webserver, vt.repo_name, username=base.TEST_USER_REGULAR_LOGIN, password=base.TEST_USER_REGULAR_PASS)
482 482 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
483 483
484 484 stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, ignoreReturnCode=True, clone_url=clone_url)
485 485
486 486 if vt.repo_type == 'git':
487 487 assert 'The requested URL returned error: 403' in stderr or 'abort: Push access to %r denied' % str(vt.repo_name) in stderr
488 488 elif vt.repo_type == 'hg':
489 489 assert 'abort: HTTP Error 403: Forbidden' in stderr or 'abort: push failed on remote' in stderr and 'remote: Push access to %r denied' % str(vt.repo_name) in stdout
490 490
491 491 meta.Session.close() # make sure SA fetches all new log entries (apparently only needed for MariaDB/MySQL ...)
492 492 action_parts = [ul.action.split(':', 1) for ul in db.UserLog.query().order_by(db.UserLog.user_log_id)]
493 493 assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == \
494 494 [('pull', 0)]
495 495
496 496 @parametrize_vcs_test
497 497 def test_push_back_to_wrong_url(self, webserver, vt):
498 498 dest_dir = _get_tmp_dir()
499 499 clone_url = vt.repo_url_param(webserver, vt.repo_name)
500 500 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
501 501
502 502 stdout, stderr = _add_files_and_push(
503 503 webserver, vt, dest_dir, clone_url='http://%s:%s/tmp' % (
504 504 webserver.server_address[0], webserver.server_address[1]),
505 505 ignoreReturnCode=True)
506 506
507 507 if vt.repo_type == 'git':
508 508 assert 'not found' in stderr
509 509 elif vt.repo_type == 'hg':
510 510 assert 'HTTP Error 404: Not Found' in stderr
511 511
512 512 @parametrize_vcs_test
513 513 def test_ip_restriction(self, webserver, vt):
514 514 user_model = UserModel()
515 515 try:
516 516 # Add IP constraint that excludes the test context:
517 517 user_model.add_extra_ip(base.TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
518 518 meta.Session().commit()
519 519 # IP permissions are cached, need to wait for the cache in the server process to expire
520 520 time.sleep(1.5)
521 521 clone_url = vt.repo_url_param(webserver, vt.repo_name)
522 522 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
523 523 if vt.repo_type == 'git':
524 524 # The message apparently changed in Git 1.8.3, so match it loosely.
525 525 assert re.search(r'\b403\b', stderr) or 'abort: User test_admin from 127.0.0.127 cannot be authorized' in stderr
526 526 elif vt.repo_type == 'hg':
527 527 assert 'abort: HTTP Error 403: Forbidden' in stderr or 'remote: abort: User test_admin from 127.0.0.127 cannot be authorized' in stdout + stderr
528 528 finally:
529 529 # release IP restrictions
530 530 for ip in db.UserIpMap.query():
531 531 db.UserIpMap.delete(ip.ip_id)
532 532 meta.Session().commit()
533 533 # IP permissions are cached, need to wait for the cache in the server process to expire
534 534 time.sleep(1.5)
535 535
536 536 clone_url = vt.repo_url_param(webserver, vt.repo_name)
537 537 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())
538 538
539 539 if vt.repo_type == 'git':
540 540 assert 'Cloning into' in stdout + stderr
541 541 assert stderr == '' or stdout == ''
542 542 elif vt.repo_type == 'hg':
543 543 assert 'requesting all changes' in stdout
544 544 assert 'adding changesets' in stdout
545 545 assert 'adding manifests' in stdout
546 546 assert 'adding file changes' in stdout
547 547
548 548 assert stderr == ''
549 549
550 550 @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks
551 551 def test_custom_hooks_preoutgoing(self, testhook_cleanup, webserver, testfork, vt):
552 552 # set prechangegroup to failing hook (returns True)
553 553 db.Ui.create_or_update_hook('preoutgoing.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
554 554 meta.Session().commit()
555 555 # clone repo
556 556 clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS)
557 557 dest_dir = _get_tmp_dir()
558 558 stdout, stderr = Command(base.TESTS_TMP_PATH) \
559 559 .execute(vt.repo_type, 'clone', clone_url, dest_dir, ignoreReturnCode=True)
560 560 if vt.repo_type == 'hg':
561 561 assert 'preoutgoing.testhook hook failed' in stdout + stderr
562 562 elif vt.repo_type == 'git':
563 563 assert 'error: 406' in stderr
564 564
565 565 @parametrize_vcs_test_hg # git hooks doesn't work like hg hooks
566 566 def test_custom_hooks_prechangegroup(self, testhook_cleanup, webserver, testfork, vt):
567 567 # set prechangegroup to failing hook (returns exit code 1)
568 568 db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
569 569 meta.Session().commit()
570 570 # clone repo
571 571 clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type], username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS)
572 572 dest_dir = _get_tmp_dir()
573 573 stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
574 574
575 575 stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, clone_url,
576 576 ignoreReturnCode=True)
577 577 assert 'failing_test_hook failed' in stdout + stderr
578 578 assert 'Traceback' not in stdout + stderr
579 579 assert 'prechangegroup.testhook hook failed' in stdout + stderr
580 580 # there are still outgoing changesets
581 581 stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
582 582 assert stdout != ''
583 583
584 584 # set prechangegroup hook to exception throwing method
585 585 db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.exception_test_hook')
586 586 meta.Session().commit()
587 587 # re-try to push
588 588 stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
589 589 if vt is HgHttpVcsTest:
590 590 # like with 'hg serve...' 'HTTP Error 500: INTERNAL SERVER ERROR' should be returned
591 591 assert 'HTTP Error 500: INTERNAL SERVER ERROR' in stderr
592 592 elif vt is HgSshVcsTest:
593 593 assert 'remote: Exception: exception_test_hook threw an exception' in stdout
594 else: assert False
594 else:
595 assert False
595 596 # there are still outgoing changesets
596 597 stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
597 598 assert stdout != ''
598 599
599 600 # set prechangegroup hook to method that returns False
600 601 db.Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.passing_test_hook')
601 602 meta.Session().commit()
602 603 # re-try to push
603 604 stdout, stderr = Command(dest_dir).execute('%s push' % vt.repo_type, clone_url, ignoreReturnCode=True)
604 605 assert 'passing_test_hook succeeded' in stdout + stderr
605 606 assert 'Traceback' not in stdout + stderr
606 607 assert 'prechangegroup.testhook hook failed' not in stdout + stderr
607 608 # no more outgoing changesets
608 609 stdout, stderr = _check_outgoing(vt.repo_type, dest_dir, clone_url)
609 610 assert stdout == ''
610 611 assert stderr == ''
611 612
612 613 def test_add_submodule_git(self, webserver, testfork):
613 614 dest_dir = _get_tmp_dir()
614 615 clone_url = GitHttpVcsTest.repo_url_param(webserver, base.GIT_REPO)
615 616
616 617 fork_url = GitHttpVcsTest.repo_url_param(webserver, testfork['git'])
617 618
618 619 # add submodule
619 620 stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', fork_url, dest_dir)
620 621 stdout, stderr = Command(dest_dir).execute('git submodule add', clone_url, 'testsubmodule')
621 622 stdout, stderr = Command(dest_dir).execute('git commit -am "added testsubmodule pointing to', clone_url, '"', EMAIL=base.TEST_USER_ADMIN_EMAIL)
622 623 stdout, stderr = Command(dest_dir).execute('git push', fork_url, 'master')
623 624
624 625 # check for testsubmodule link in files page
625 626 self.log_user()
626 627 response = self.app.get(base.url(controller='files', action='index',
627 628 repo_name=testfork['git'],
628 629 revision='tip',
629 630 f_path='/'))
630 631 # check _repo_files_url that will be used to reload as AJAX
631 632 response.mustcontain('var _repo_files_url = ("/%s/files/");' % testfork['git'])
632 633
633 634 response.mustcontain('<a class="submodule-dir" href="%s" target="_blank"><i class="icon-file-submodule"></i><span>testsubmodule @ ' % clone_url)
634 635
635 636 # check that following a submodule link actually works - and redirects
636 637 response = self.app.get(base.url(controller='files', action='index',
637 638 repo_name=testfork['git'],
638 639 revision='tip',
639 640 f_path='/testsubmodule'),
640 641 status=302)
641 642 assert response.location == clone_url
General Comments 0
You need to be logged in to leave comments. Login now