diff --git a/rhodecode/apps/ssh_support/lib/backends/__init__.py b/rhodecode/apps/ssh_support/lib/backends/__init__.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/apps/ssh_support/lib/backends/__init__.py
@@ -0,0 +1,202 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2016-2017 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import os
+import re
+import logging
+import datetime
+import ConfigParser
+
+from rhodecode.model.db import Session, User, UserSshKeys
+from rhodecode.model.scm import ScmModel
+
+from .hg import MercurialServer
+from .git import GitServer
+from .svn import SubversionServer
+log = logging.getLogger(__name__)
+
+
+class SshWrapper(object):
+
+ def __init__(self, command, connection_info, mode,
+ user, user_id, key_id, shell, ini_path, env):
+ self.command = command
+ self.connection_info = connection_info
+ self.mode = mode
+ self.user = user
+ self.user_id = user_id
+ self.key_id = key_id
+ self.shell = shell
+ self.ini_path = ini_path
+ self.env = env
+
+ self.config = self.parse_config(ini_path)
+ self.server_impl = None
+
+ def parse_config(self, config_path):
+ parser = ConfigParser.ConfigParser()
+ parser.read(config_path)
+ return parser
+
+ def update_key_access_time(self, key_id):
+ key = UserSshKeys().query().filter(
+ UserSshKeys.ssh_key_id == key_id).scalar()
+ if key:
+ key.accessed_on = datetime.datetime.utcnow()
+ Session().add(key)
+ Session().commit()
+ log.debug('Update key `%s` access time', key_id)
+
+ def get_connection_info(self):
+ """
+ connection_info
+
+ Identifies the client and server ends of the connection.
+ The variable contains four space-separated values: client IP address,
+ client port number, server IP address, and server port number.
+ """
+ conn = dict(
+ client_ip=None,
+ client_port=None,
+ server_ip=None,
+ server_port=None,
+ )
+
+ info = self.connection_info.split(' ')
+ if len(info) == 4:
+ conn['client_ip'] = info[0]
+ conn['client_port'] = info[1]
+ conn['server_ip'] = info[2]
+ conn['server_port'] = info[3]
+
+ return conn
+
+ def get_repo_details(self, mode):
+ vcs_type = mode if mode in ['svn', 'hg', 'git'] else None
+ mode = mode
+ repo_name = None
+
+ hg_pattern = r'^hg\s+\-R\s+(\S+)\s+serve\s+\-\-stdio$'
+ hg_match = re.match(hg_pattern, self.command)
+ if hg_match is not None:
+ vcs_type = 'hg'
+ repo_name = hg_match.group(1).strip('/')
+ return vcs_type, repo_name, mode
+
+ git_pattern = (
+ r'^git-(receive-pack|upload-pack)\s\'[/]?(\S+?)(|\.git)\'$')
+ git_match = re.match(git_pattern, self.command)
+ if git_match is not None:
+ vcs_type = 'git'
+ repo_name = git_match.group(2).strip('/')
+ mode = git_match.group(1)
+ return vcs_type, repo_name, mode
+
+ svn_pattern = r'^svnserve -t'
+ svn_match = re.match(svn_pattern, self.command)
+
+ if svn_match is not None:
+ vcs_type = 'svn'
+ # Repo name should be extracted from the input stream
+ return vcs_type, repo_name, mode
+
+ return vcs_type, repo_name, mode
+
+ def serve(self, vcs, repo, mode, user, permissions):
+ store = ScmModel().repos_path
+
+ log.debug(
+ 'VCS detected:`%s` mode: `%s` repo_name: %s', vcs, mode, repo)
+
+ if vcs == 'hg':
+ server = MercurialServer(
+ store=store, ini_path=self.ini_path,
+ repo_name=repo, user=user,
+ user_permissions=permissions, config=self.config, env=self.env)
+ self.server_impl = server
+ return server.run()
+
+ elif vcs == 'git':
+ server = GitServer(
+ store=store, ini_path=self.ini_path,
+ repo_name=repo, repo_mode=mode, user=user,
+ user_permissions=permissions, config=self.config, env=self.env)
+ self.server_impl = server
+ return server.run()
+
+ elif vcs == 'svn':
+ server = SubversionServer(
+ store=store, ini_path=self.ini_path,
+ repo_name=None, user=user,
+ user_permissions=permissions, config=self.config, env=self.env)
+ self.server_impl = server
+ return server.run()
+
+ else:
+ raise Exception('Unrecognised VCS: {}'.format(vcs))
+
+ def wrap(self):
+ mode = self.mode
+ user = self.user
+ user_id = self.user_id
+ key_id = self.key_id
+ shell = self.shell
+
+ scm_detected, scm_repo, scm_mode = self.get_repo_details(mode)
+
+ log.debug(
+ 'Mode: `%s` User: `%s:%s` Shell: `%s` SSH Command: `\"%s\"` '
+ 'SCM_DETECTED: `%s` SCM Mode: `%s` SCM Repo: `%s`',
+ mode, user, user_id, shell, self.command,
+ scm_detected, scm_mode, scm_repo)
+
+ # update last access time for this key
+ self.update_key_access_time(key_id)
+
+ log.debug('SSH Connection info %s', self.get_connection_info())
+
+ if shell and self.command is None:
+ log.info(
+ 'Dropping to shell, no command given and shell is allowed')
+ os.execl('/bin/bash', '-l')
+ exit_code = 1
+
+ elif scm_detected:
+ user = User.get(user_id)
+ auth_user = user.AuthUser()
+ permissions = auth_user.permissions['repositories']
+
+ try:
+ exit_code, is_updated = self.serve(
+ scm_detected, scm_repo, scm_mode, user, permissions)
+ except Exception:
+ log.exception('Error occurred during execution of SshWrapper')
+ exit_code = -1
+
+ elif self.command is None and shell is False:
+ log.error('No Command given.')
+ exit_code = -1
+
+ else:
+ log.error(
+ 'Unhandled Command: "%s" Aborting.', self.command)
+ exit_code = -1
+
+ return exit_code
diff --git a/rhodecode/apps/ssh_support/lib/backends/base.py b/rhodecode/apps/ssh_support/lib/backends/base.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/apps/ssh_support/lib/backends/base.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2016-2017 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import os
+import sys
+import json
+import logging
+
+from rhodecode.lib.hooks_daemon import prepare_callback_daemon
+from rhodecode.lib import hooks_utils
+from rhodecode.model.scm import ScmModel
+
+log = logging.getLogger(__name__)
+
+
+class VcsServer(object):
+ _path = None # set executable path for hg/git/svn binary
+ backend = None # set in child classes
+ tunnel = None # subprocess handling tunnel
+ write_perms = ['repository.admin', 'repository.write']
+ read_perms = ['repository.read', 'repository.admin', 'repository.write']
+
+ def __init__(self, user, user_permissions, config, env):
+ self.user = user
+ self.user_permissions = user_permissions
+ self.config = config
+ self.env = env
+ self.stdin = sys.stdin
+
+ self.repo_name = None
+ self.repo_mode = None
+ self.store = ''
+ self.ini_path = ''
+
+ def _invalidate_cache(self, repo_name):
+ """
+ Set's cache for this repository for invalidation on next access
+
+ :param repo_name: full repo name, also a cache key
+ """
+ ScmModel().mark_for_invalidation(repo_name)
+
+ def has_write_perm(self):
+ permission = self.user_permissions.get(self.repo_name)
+ if permission in ['repository.write', 'repository.admin']:
+ return True
+
+ return False
+
+ def _check_permissions(self, action):
+ permission = self.user_permissions.get(self.repo_name)
+ log.debug(
+ 'permission for %s on %s are: %s',
+ self.user, self.repo_name, permission)
+
+ if action == 'pull':
+ if permission in self.read_perms:
+ log.info(
+ 'READ Permissions for User "%s" detected to repo "%s"!',
+ self.user, self.repo_name)
+ return 0
+ else:
+ if permission in self.write_perms:
+ log.info(
+ 'WRITE+ Permissions for User "%s" detected to repo "%s"!',
+ self.user, self.repo_name)
+ return 0
+
+ log.error('Cannot properly fetch or allow user permissions. '
+ 'Return value is: %s, req action: %s', permission, action)
+ return -2
+
+ def update_environment(self, action, extras=None):
+
+ scm_data = {
+ 'ip': os.environ['SSH_CLIENT'].split()[0],
+ 'username': self.user.username,
+ 'action': action,
+ 'repository': self.repo_name,
+ 'scm': self.backend,
+ 'config': self.ini_path,
+ 'make_lock': None,
+ 'locked_by': [None, None],
+ 'server_url': None,
+ 'is_shadow_repo': False,
+ 'hooks_module': 'rhodecode.lib.hooks_daemon',
+ 'hooks': ['push', 'pull'],
+ 'SSH': True,
+ 'SSH_PERMISSIONS': self.user_permissions.get(self.repo_name)
+ }
+ if extras:
+ scm_data.update(extras)
+ os.putenv("RC_SCM_DATA", json.dumps(scm_data))
+
+ def get_root_store(self):
+ root_store = self.store
+ if not root_store.endswith('/'):
+ # always append trailing slash
+ root_store = root_store + '/'
+ return root_store
+
+ def _handle_tunnel(self, extras):
+ # pre-auth
+ action = 'pull'
+ exit_code = self._check_permissions(action)
+ if exit_code:
+ return exit_code, False
+
+ req = self.env['request']
+ server_url = req.host_url + req.script_name
+ extras['server_url'] = server_url
+
+ log.debug('Using %s binaries from path %s', self.backend, self._path)
+ exit_code = self.tunnel.run(extras)
+
+ return exit_code, action == "push"
+
+ def run(self):
+ extras = {}
+ HOOKS_PROTOCOL = self.config.get('app:main', 'vcs.hooks.protocol')
+
+ callback_daemon, extras = prepare_callback_daemon(
+ extras, protocol=HOOKS_PROTOCOL,
+ use_direct_calls=False)
+
+ with callback_daemon:
+ try:
+ return self._handle_tunnel(extras)
+ finally:
+ log.debug('Running cleanup with cache invalidation')
+ if self.repo_name:
+ self._invalidate_cache(self.repo_name)
diff --git a/rhodecode/apps/ssh_support/lib/backends/git.py b/rhodecode/apps/ssh_support/lib/backends/git.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/apps/ssh_support/lib/backends/git.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2016-2017 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import os
+import sys
+import logging
+
+from .base import VcsServer
+
+log = logging.getLogger(__name__)
+
+
+class GitTunnelWrapper(object):
+ process = None
+
+ def __init__(self, server):
+ self.server = server
+ self.stdin = sys.stdin
+ self.stdout = sys.stdout
+
+ def create_hooks_env(self):
+ pass
+
+ def command(self):
+ root = self.server.get_root_store()
+ command = "cd {root}; {git_path} {mode} '{root}{repo_name}'".format(
+ root=root, git_path=self.server.git_path,
+ mode=self.server.repo_mode, repo_name=self.server.repo_name)
+ log.debug("Final CMD: %s", command)
+ return command
+
+ def run(self, extras):
+ action = "push" if self.server.repo_mode == "receive-pack" else "pull"
+ exit_code = self.server._check_permissions(action)
+ if exit_code:
+ return exit_code
+
+ self.server.update_environment(action=action, extras=extras)
+ self.create_hooks_env()
+ return os.system(self.command())
+
+
+class GitServer(VcsServer):
+ backend = 'git'
+
+ def __init__(self, store, ini_path, repo_name, repo_mode,
+ user, user_permissions, config, env):
+ super(GitServer, self).\
+ __init__(user, user_permissions, config, env)
+
+ self.store = store
+ self.ini_path = ini_path
+ self.repo_name = repo_name
+ self._path = self.git_path = config.get(
+ 'app:main', 'ssh.executable.git')
+
+ self.repo_mode = repo_mode
+ self.tunnel = GitTunnelWrapper(server=self)
diff --git a/rhodecode/apps/ssh_support/lib/backends/hg.py b/rhodecode/apps/ssh_support/lib/backends/hg.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/apps/ssh_support/lib/backends/hg.py
@@ -0,0 +1,123 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2016-2017 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import os
+import sys
+import shutil
+import logging
+import tempfile
+import textwrap
+
+from .base import VcsServer
+
+log = logging.getLogger(__name__)
+
+
+class MercurialTunnelWrapper(object):
+ process = None
+
+ def __init__(self, server):
+ self.server = server
+ self.stdin = sys.stdin
+ self.stdout = sys.stdout
+ self.svn_conf_fd, self.svn_conf_path = tempfile.mkstemp()
+ self.hooks_env_fd, self.hooks_env_path = tempfile.mkstemp()
+
+ def create_hooks_env(self):
+
+ content = textwrap.dedent(
+ '''
+ # SSH hooks version=1.0.0
+ [hooks]
+ pretxnchangegroup.ssh_auth=python:vcsserver.hooks.pre_push_ssh_auth
+ pretxnchangegroup.ssh=python:vcsserver.hooks.pre_push_ssh
+ changegroup.ssh=python:vcsserver.hooks.post_push_ssh
+
+ preoutgoing.ssh=python:vcsserver.hooks.pre_pull_ssh
+ outgoing.ssh=python:vcsserver.hooks.post_pull_ssh
+
+ '''
+ )
+
+ with os.fdopen(self.hooks_env_fd, 'w') as hooks_env_file:
+ hooks_env_file.write(content)
+ root = self.server.get_root_store()
+
+ hgrc_custom = os.path.join(
+ root, self.server.repo_name, '.hg', 'hgrc_rhodecode')
+ log.debug('Wrote custom hgrc file under %s', hgrc_custom)
+ shutil.move(
+ self.hooks_env_path, hgrc_custom)
+
+ hgrc_main = os.path.join(
+ root, self.server.repo_name, '.hg', 'hgrc')
+ include_marker = '%include hgrc_rhodecode'
+
+ if not os.path.isfile(hgrc_main):
+ os.mknod(hgrc_main)
+
+ with open(hgrc_main, 'rb') as f:
+ data = f.read()
+ has_marker = include_marker in data
+
+ if not has_marker:
+ log.debug('Adding include marker for hooks')
+ with open(hgrc_main, 'wa') as f:
+ f.write(textwrap.dedent('''
+ # added by RhodeCode
+ {}
+ '''.format(include_marker)))
+
+ def command(self):
+ root = self.server.get_root_store()
+
+ command = (
+ "cd {root}; {hg_path} -R {root}{repo_name} "
+ "serve --stdio".format(
+ root=root, hg_path=self.server.hg_path,
+ repo_name=self.server.repo_name))
+ log.debug("Final CMD: %s", command)
+ return command
+
+ def run(self, extras):
+ # at this point we cannot tell, we do further ACL checks
+ # inside the hooks
+ action = '?'
+ # permissions are check via `pre_push_ssh_auth` hook
+ self.server.update_environment(action=action, extras=extras)
+ self.create_hooks_env()
+ return os.system(self.command())
+
+
+class MercurialServer(VcsServer):
+ backend = 'hg'
+
+ def __init__(self, store, ini_path, repo_name,
+ user, user_permissions, config, env):
+ super(MercurialServer, self).\
+ __init__(user, user_permissions, config, env)
+
+ self.store = store
+ self.ini_path = ini_path
+ self.repo_name = repo_name
+ self._path = self.hg_path = config.get(
+ 'app:main', 'ssh.executable.hg')
+
+ self.tunnel = MercurialTunnelWrapper(server=self)
diff --git a/rhodecode/apps/ssh_support/lib/backends/svn.py b/rhodecode/apps/ssh_support/lib/backends/svn.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/apps/ssh_support/lib/backends/svn.py
@@ -0,0 +1,228 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2016-2017 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import os
+import re
+import sys
+import logging
+import signal
+import tempfile
+from subprocess import Popen, PIPE
+import urlparse
+
+from .base import VcsServer
+
+log = logging.getLogger(__name__)
+
+
+class SubversionTunnelWrapper(object):
+ process = None
+
+ def __init__(self, server):
+ self.server = server
+ self.timeout = 30
+ self.stdin = sys.stdin
+ self.stdout = sys.stdout
+ self.svn_conf_fd, self.svn_conf_path = tempfile.mkstemp()
+ self.hooks_env_fd, self.hooks_env_path = tempfile.mkstemp()
+
+ self.read_only = True # flag that we set to make the hooks readonly
+
+ def create_svn_config(self):
+ content = (
+ '[general]\n'
+ 'hooks-env = {}\n').format(self.hooks_env_path)
+ with os.fdopen(self.svn_conf_fd, 'w') as config_file:
+ config_file.write(content)
+
+ def create_hooks_env(self):
+ content = (
+ '[default]\n'
+ 'LANG = en_US.UTF-8\n')
+ if self.read_only:
+ content += 'SSH_READ_ONLY = 1\n'
+ with os.fdopen(self.hooks_env_fd, 'w') as hooks_env_file:
+ hooks_env_file.write(content)
+
+ def remove_configs(self):
+ os.remove(self.svn_conf_path)
+ os.remove(self.hooks_env_path)
+
+ def command(self):
+ root = self.server.get_root_store()
+ command = [
+ self.server.svn_path, '-t',
+ '--config-file', self.svn_conf_path,
+ '-r', root]
+ log.debug("Final CMD: %s", command)
+ return command
+
+ def start(self):
+ command = self.command()
+ self.process = Popen(command, stdin=PIPE)
+
+ def sync(self):
+ while self.process.poll() is None:
+ next_byte = self.stdin.read(1)
+ if not next_byte:
+ break
+ self.process.stdin.write(next_byte)
+ self.remove_configs()
+
+ @property
+ def return_code(self):
+ return self.process.returncode
+
+ def get_first_client_response(self):
+ signal.signal(signal.SIGALRM, self.interrupt)
+ signal.alarm(self.timeout)
+ first_response = self._read_first_client_response()
+ signal.alarm(0)
+ return (
+ self._parse_first_client_response(first_response)
+ if first_response else None)
+
+ def patch_first_client_response(self, response, **kwargs):
+ self.create_hooks_env()
+ data = response.copy()
+ data.update(kwargs)
+ data['url'] = self._svn_string(data['url'])
+ data['ra_client'] = self._svn_string(data['ra_client'])
+ data['client'] = data['client'] or ''
+ buffer_ = (
+ "( {version} ( {capabilities} ) {url}{ra_client}"
+ "( {client}) ) ".format(**data))
+ self.process.stdin.write(buffer_)
+
+ def fail(self, message):
+ print(
+ "( failure ( ( 210005 {message} 0: 0 ) ) )".format(
+ message=self._svn_string(message)))
+ self.remove_configs()
+ self.process.kill()
+
+ def interrupt(self, signum, frame):
+ self.fail("Exited by timeout")
+
+ def _svn_string(self, str_):
+ if not str_:
+ return ''
+ return '{length}:{string} '.format(length=len(str_), string=str_)
+
+ def _read_first_client_response(self):
+ buffer_ = ""
+ brackets_stack = []
+ while True:
+ next_byte = self.stdin.read(1)
+ buffer_ += next_byte
+ if next_byte == "(":
+ brackets_stack.append(next_byte)
+ elif next_byte == ")":
+ brackets_stack.pop()
+ elif next_byte == " " and not brackets_stack:
+ break
+ return buffer_
+
+ def _parse_first_client_response(self, buffer_):
+ """
+ According to the Subversion RA protocol, the first request
+ should look like:
+
+ ( version:number ( cap:word ... ) url:string ? ra-client:string
+ ( ? client:string ) )
+
+ Please check https://svn.apache.org/repos/asf/subversion/trunk/
+ subversion/libsvn_ra_svn/protocol
+ """
+ version_re = r'(?P\d+)'
+ capabilities_re = r'\(\s(?P[\w\d\-\ ]+)\s\)'
+ url_re = r'\d+\:(?P[\W\w]+)'
+ ra_client_re = r'(\d+\:(?P[\W\w]+)\s)'
+ client_re = r'(\d+\:(?P[\W\w]+)\s)*'
+ regex = re.compile(
+ r'^\(\s{version}\s{capabilities}\s{url}\s{ra_client}'
+ r'\(\s{client}\)\s\)\s*$'.format(
+ version=version_re, capabilities=capabilities_re,
+ url=url_re, ra_client=ra_client_re, client=client_re))
+ matcher = regex.match(buffer_)
+ return matcher.groupdict() if matcher else None
+
+ def run(self, extras):
+ action = 'pull'
+ self.create_svn_config()
+ self.start()
+
+ first_response = self.get_first_client_response()
+ if not first_response:
+ self.fail("Repository name cannot be extracted")
+ return 1
+
+ url_parts = urlparse.urlparse(first_response['url'])
+ self.server.repo_name = url_parts.path.strip('/')
+
+ exit_code = self.server._check_permissions(action)
+ if exit_code:
+ return exit_code
+
+ # set the readonly flag to False if we have proper permissions
+ if self.server.has_write_perm():
+ self.read_only = False
+ self.server.update_environment(action=action, extras=extras)
+
+ self.patch_first_client_response(first_response)
+ self.sync()
+ return self.return_code
+
+
+class SubversionServer(VcsServer):
+ backend = 'svn'
+
+ def __init__(self, store, ini_path, repo_name,
+ user, user_permissions, config, env):
+ super(SubversionServer, self)\
+ .__init__(user, user_permissions, config, env)
+ self.store = store
+ self.ini_path = ini_path
+ # this is set in .run() from input stream
+ self.repo_name = repo_name
+ self._path = self.svn_path = config.get(
+ 'app:main', 'ssh.executable.svn')
+
+ self.tunnel = SubversionTunnelWrapper(server=self)
+
+ def _handle_tunnel(self, extras):
+
+ # pre-auth
+ action = 'pull'
+ # Special case for SVN, we extract repo name at later stage
+ # exit_code = self._check_permissions(action)
+ # if exit_code:
+ # return exit_code, False
+
+ req = self.env['request']
+ server_url = req.host_url + req.script_name
+ extras['server_url'] = server_url
+
+ log.debug('Using %s binaries from path %s', self.backend, self._path)
+ exit_code = self.tunnel.run(extras)
+
+ return exit_code, action == "push"
+
+
diff --git a/rhodecode/apps/ssh_support/lib/ssh_wrapper.py b/rhodecode/apps/ssh_support/lib/ssh_wrapper.py
--- a/rhodecode/apps/ssh_support/lib/ssh_wrapper.py
+++ b/rhodecode/apps/ssh_support/lib/ssh_wrapper.py
@@ -67,12 +67,14 @@ def main(ini_path, mode, user, user_id,
'Please make sure this is set and available during execution '
'of this script.')
connection_info = os.environ.get('SSH_CONNECTION', '')
- request = Request.blank('/', base_url='http://rhodecode-ssh-wrapper/')
+
+ # TODO(marcink): configure the running host...
+ request = Request.blank('/', base_url='http://localhost:8080')
with bootstrap(ini_path, request=request) as env:
try:
ssh_wrapper = SshWrapper(
command, connection_info, mode,
- user, user_id, key_id, shell, ini_path)
+ user, user_id, key_id, shell, ini_path, env)
except Exception:
log.exception('Failed to execute SshWrapper')
sys.exit(-5)
diff --git a/rhodecode/apps/ssh_support/tests/conftest.py b/rhodecode/apps/ssh_support/tests/conftest.py
new file mode 100644
--- /dev/null
+++ b/rhodecode/apps/ssh_support/tests/conftest.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2016-2017 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import os
+import pytest
+import ConfigParser
+
+from rhodecode.apps.ssh_support.lib.ssh_wrapper import SshWrapper
+from rhodecode.lib.utils2 import AttributeDict
+
+
+@pytest.fixture
+def dummy_conf_file(tmpdir):
+ conf = ConfigParser.ConfigParser()
+ conf.add_section('app:main')
+ conf.set('app:main', 'ssh.executable.hg', '/usr/bin/hg')
+ conf.set('app:main', 'ssh.executable.git', '/usr/bin/git')
+ conf.set('app:main', 'ssh.executable.svn', '/usr/bin/svnserve')
+
+ f_path = os.path.join(str(tmpdir), 'ssh_wrapper_test.ini')
+ with open(f_path, 'wb') as f:
+ conf.write(f)
+
+ return os.path.join(f_path)
+
+
+@pytest.fixture
+def dummy_env():
+ return {
+ 'request':
+ AttributeDict(host_url='http://localhost', script_name='/')
+ }
+
+
+@pytest.fixture
+def dummy_user():
+ return AttributeDict(username='test_user')
+
+
+@pytest.fixture
+def ssh_wrapper(app, dummy_conf_file, dummy_env):
+ conn_info = '127.0.0.1 22 10.0.0.1 443'
+ return SshWrapper(
+ 'random command', conn_info, 'auto', 'admin', '1', key_id='1',
+ shell=False, ini_path=dummy_conf_file, env=dummy_env)
diff --git a/rhodecode/apps/ssh_support/tests/test_server_git.py b/rhodecode/apps/ssh_support/tests/test_server_git.py
--- a/rhodecode/apps/ssh_support/tests/test_server_git.py
+++ b/rhodecode/apps/ssh_support/tests/test_server_git.py
@@ -19,88 +19,100 @@
# and proprietary license terms, please see https://rhodecode.com/licenses/
import json
-
+import mock
import pytest
-from mock import Mock, patch
from rhodecode.apps.ssh_support.lib.backends.git import GitServer
-
-
-@pytest.fixture
-def git_server():
- return GitServerCreator()
+from rhodecode.apps.ssh_support.tests.conftest import dummy_env, dummy_user
class GitServerCreator(object):
root = '/tmp/repo/path/'
- git_path = '/usr/local/bin/'
+ git_path = '/usr/local/bin/git'
config_data = {
'app:main': {
- 'ssh.executable.git': git_path
+ 'ssh.executable.git': git_path,
+ 'vcs.hooks.protocol': 'http',
}
}
repo_name = 'test_git'
repo_mode = 'receive-pack'
- user = 'vcs'
+ user = dummy_user()
def __init__(self):
def config_get(part, key):
return self.config_data.get(part, {}).get(key)
- self.config_mock = Mock()
- self.config_mock.get = Mock(side_effect=config_get)
+ self.config_mock = mock.Mock()
+ self.config_mock.get = mock.Mock(side_effect=config_get)
def create(self, **kwargs):
parameters = {
- 'store': {'path': self.root},
+ 'store': self.root,
'ini_path': '',
'user': self.user,
'repo_name': self.repo_name,
'repo_mode': self.repo_mode,
'user_permissions': {
- self.repo_name: 'repo_admin'
+ self.repo_name: 'repository.admin'
},
'config': self.config_mock,
+ 'env': dummy_env()
}
parameters.update(kwargs)
server = GitServer(**parameters)
return server
+@pytest.fixture
+def git_server(app):
+ return GitServerCreator()
+
+
class TestGitServer(object):
+
def test_command(self, git_server):
server = git_server.create()
- server.read_only = False
expected_command = (
- 'cd {root}; {git_path}-{repo_mode}'
- ' \'{root}{repo_name}\''.format(
+ 'cd {root}; {git_path} {repo_mode} \'{root}{repo_name}\''.format(
root=git_server.root, git_path=git_server.git_path,
repo_mode=git_server.repo_mode, repo_name=git_server.repo_name)
)
- assert expected_command == server.command
+ assert expected_command == server.tunnel.command()
+
+ @pytest.mark.parametrize('permissions, action, code', [
+ ({}, 'pull', -2),
+ ({'test_git': 'repository.read'}, 'pull', 0),
+ ({'test_git': 'repository.read'}, 'push', -2),
+ ({'test_git': 'repository.write'}, 'push', 0),
+ ({'test_git': 'repository.admin'}, 'push', 0),
- def test_run_returns_exit_code_2_when_no_permissions(self, git_server, caplog):
+ ])
+ def test_permission_checks(self, git_server, permissions, action, code):
+ server = git_server.create(user_permissions=permissions)
+ result = server._check_permissions(action)
+ assert result is code
+
+ @pytest.mark.parametrize('permissions, value', [
+ ({}, False),
+ ({'test_git': 'repository.read'}, False),
+ ({'test_git': 'repository.write'}, True),
+ ({'test_git': 'repository.admin'}, True),
+
+ ])
+ def test_has_write_permissions(self, git_server, permissions, value):
+ server = git_server.create(user_permissions=permissions)
+ result = server.has_write_perm()
+ assert result is value
+
+ def test_run_returns_executes_command(self, git_server):
server = git_server.create()
- with patch.object(server, '_check_permissions') as permissions_mock:
- with patch.object(server, '_update_environment'):
- permissions_mock.return_value = 2
+ from rhodecode.apps.ssh_support.lib.backends.git import GitTunnelWrapper
+ with mock.patch.object(GitTunnelWrapper, 'create_hooks_env') as _patch:
+ _patch.return_value = 0
+ with mock.patch.object(GitTunnelWrapper, 'command', return_value='date'):
exit_code = server.run()
- assert exit_code == (2, False)
-
- def test_run_returns_executes_command(self, git_server, caplog):
- server = git_server.create()
- with patch.object(server, '_check_permissions') as permissions_mock:
- with patch('os.system') as system_mock:
- with patch.object(server, '_update_environment') as (
- update_mock):
- permissions_mock.return_value = 0
- system_mock.return_value = 0
- exit_code = server.run()
-
- system_mock.assert_called_once_with(server.command)
- update_mock.assert_called_once_with()
-
- assert exit_code == (0, True)
+ assert exit_code == (0, False)
@pytest.mark.parametrize(
'repo_mode, action', [
@@ -109,88 +121,25 @@ class TestGitServer(object):
])
def test_update_environment(self, git_server, repo_mode, action):
server = git_server.create(repo_mode=repo_mode)
- with patch('os.environ', {'SSH_CLIENT': '10.10.10.10 b'}):
- with patch('os.putenv') as putenv_mock:
- server._update_environment()
+ with mock.patch('os.environ', {'SSH_CLIENT': '10.10.10.10 b'}):
+ with mock.patch('os.putenv') as putenv_mock:
+ server.update_environment(action)
expected_data = {
- "username": git_server.user,
- "scm": "git",
- "repository": git_server.repo_name,
- "make_lock": None,
- "action": [action],
- "ip": "10.10.10.10",
- "locked_by": [None, None],
- "config": ""
+ 'username': git_server.user.username,
+ 'scm': 'git',
+ 'repository': git_server.repo_name,
+ 'make_lock': None,
+ 'action': action,
+ 'ip': '10.10.10.10',
+ 'locked_by': [None, None],
+ 'config': '',
+ 'server_url': None,
+ 'hooks': ['push', 'pull'],
+ 'is_shadow_repo': False,
+ 'hooks_module': 'rhodecode.lib.hooks_daemon',
+ 'SSH': True,
+ 'SSH_PERMISSIONS': 'repository.admin',
}
args, kwargs = putenv_mock.call_args
assert json.loads(args[1]) == expected_data
-
-
-class TestGitServerCheckPermissions(object):
- def test_returns_2_when_no_permissions_found(self, git_server, caplog):
- user_permissions = {}
- server = git_server.create(user_permissions=user_permissions)
- result = server._check_permissions()
- assert result == 2
-
- log_msg = 'permission for vcs on test_git are: None'
- assert log_msg in [t[2] for t in caplog.record_tuples]
-
- def test_returns_2_when_no_permissions(self, git_server, caplog):
- user_permissions = {git_server.repo_name: 'repository.none'}
- server = git_server.create(user_permissions=user_permissions)
- result = server._check_permissions()
- assert result == 2
-
- log_msg = 'repo not found or no permissions'
- assert log_msg in [t[2] for t in caplog.record_tuples]
-
- @pytest.mark.parametrize(
- 'permission', ['repository.admin', 'repository.write'])
- def test_access_allowed_when_user_has_write_permissions(
- self, git_server, permission, caplog):
- user_permissions = {git_server.repo_name: permission}
- server = git_server.create(user_permissions=user_permissions)
- result = server._check_permissions()
- assert result is None
-
- log_msg = 'Write Permissions for User "%s" granted to repo "%s"!' % (
- git_server.user, git_server.repo_name)
- assert log_msg in [t[2] for t in caplog.record_tuples]
-
- def test_write_access_is_not_allowed_when_user_has_read_permission(
- self, git_server, caplog):
- user_permissions = {git_server.repo_name: 'repository.read'}
- server = git_server.create(
- user_permissions=user_permissions, repo_mode='receive-pack')
- result = server._check_permissions()
- assert result == -3
-
- log_msg = 'Only Read Only access for User "%s" granted to repo "%s"! Failing!' % (
- git_server.user, git_server.repo_name)
- assert log_msg in [t[2] for t in caplog.record_tuples]
-
- def test_read_access_allowed_when_user_has_read_permission(
- self, git_server, caplog):
- user_permissions = {git_server.repo_name: 'repository.read'}
- server = git_server.create(
- user_permissions=user_permissions, repo_mode='upload-pack')
- result = server._check_permissions()
- assert result is None
-
- log_msg = 'Only Read Only access for User "%s" granted to repo "%s"!' % (
- git_server.user, git_server.repo_name)
- assert log_msg in [t[2] for t in caplog.record_tuples]
-
- def test_returns_error_when_permission_not_recognised(
- self, git_server, caplog):
- user_permissions = {git_server.repo_name: 'repository.whatever'}
- server = git_server.create(
- user_permissions=user_permissions, repo_mode='upload-pack')
- result = server._check_permissions()
- assert result == -2
-
- log_msg = 'Cannot properly fetch user permission. ' \
- 'Return value is: repository.whatever'
- assert log_msg in [t[2] for t in caplog.record_tuples]
\ No newline at end of file
diff --git a/rhodecode/apps/ssh_support/tests/test_server_hg.py b/rhodecode/apps/ssh_support/tests/test_server_hg.py
--- a/rhodecode/apps/ssh_support/tests/test_server_hg.py
+++ b/rhodecode/apps/ssh_support/tests/test_server_hg.py
@@ -18,15 +18,11 @@
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
+import mock
import pytest
-from mock import Mock, patch
from rhodecode.apps.ssh_support.lib.backends.hg import MercurialServer
-
-
-@pytest.fixture
-def hg_server():
- return MercurialServerCreator()
+from rhodecode.apps.ssh_support.tests.conftest import dummy_env, dummy_user
class MercurialServerCreator(object):
@@ -35,112 +31,86 @@ class MercurialServerCreator(object):
config_data = {
'app:main': {
- 'ssh.executable.hg': hg_path
+ 'ssh.executable.hg': hg_path,
+ 'vcs.hooks.protocol': 'http',
}
}
repo_name = 'test_hg'
- user = 'vcs'
+ user = dummy_user()
def __init__(self):
def config_get(part, key):
return self.config_data.get(part, {}).get(key)
- self.config_mock = Mock()
- self.config_mock.get = Mock(side_effect=config_get)
+ self.config_mock = mock.Mock()
+ self.config_mock.get = mock.Mock(side_effect=config_get)
def create(self, **kwargs):
parameters = {
- 'store': {'path': self.root},
+ 'store': self.root,
'ini_path': '',
'user': self.user,
'repo_name': self.repo_name,
'user_permissions': {
- 'test_hg': 'repo_admin'
+ 'test_hg': 'repository.admin'
},
'config': self.config_mock,
+ 'env': dummy_env()
}
parameters.update(kwargs)
server = MercurialServer(**parameters)
return server
+@pytest.fixture
+def hg_server(app):
+ return MercurialServerCreator()
+
+
class TestMercurialServer(object):
- def test_read_only_command(self, hg_server):
+
+ def test_command(self, hg_server):
server = hg_server.create()
- server.read_only = True
expected_command = (
- 'cd {root}; {hg_path} -R {root}{repo_name} serve --stdio'
- ' --config hooks.pretxnchangegroup="false"'.format(
- root=hg_server.root, hg_path=hg_server.hg_path,
- repo_name=hg_server.repo_name)
- )
- assert expected_command == server.command
-
- def test_normal_command(self, hg_server):
- server = hg_server.create()
- server.read_only = False
- expected_command = (
- 'cd {root}; {hg_path} -R {root}{repo_name} serve --stdio '.format(
+ 'cd {root}; {hg_path} -R {root}{repo_name} serve --stdio'.format(
root=hg_server.root, hg_path=hg_server.hg_path,
repo_name=hg_server.repo_name)
)
- assert expected_command == server.command
-
- def test_access_rejected_when_permissions_are_not_found(self, hg_server, caplog):
- user_permissions = {}
- server = hg_server.create(user_permissions=user_permissions)
- result = server._check_permissions()
- assert result is False
-
- log_msg = 'repo not found or no permissions'
- assert log_msg in [t[2] for t in caplog.record_tuples]
+ assert expected_command == server.tunnel.command()
- def test_access_rejected_when_no_permissions(self, hg_server, caplog):
- user_permissions = {hg_server.repo_name: 'repository.none'}
- server = hg_server.create(user_permissions=user_permissions)
- result = server._check_permissions()
- assert result is False
-
- log_msg = 'repo not found or no permissions'
- assert log_msg in [t[2] for t in caplog.record_tuples]
+ @pytest.mark.parametrize('permissions, action, code', [
+ ({}, 'pull', -2),
+ ({'test_hg': 'repository.read'}, 'pull', 0),
+ ({'test_hg': 'repository.read'}, 'push', -2),
+ ({'test_hg': 'repository.write'}, 'push', 0),
+ ({'test_hg': 'repository.admin'}, 'push', 0),
- @pytest.mark.parametrize(
- 'permission', ['repository.admin', 'repository.write'])
- def test_access_allowed_when_user_has_write_permissions(
- self, hg_server, permission, caplog):
- user_permissions = {hg_server.repo_name: permission}
- server = hg_server.create(user_permissions=user_permissions)
- result = server._check_permissions()
- assert result is True
+ ])
+ def test_permission_checks(self, hg_server, permissions, action, code):
+ server = hg_server.create(user_permissions=permissions)
+ result = server._check_permissions(action)
+ assert result is code
- assert server.read_only is False
- log_msg = 'Write Permissions for User "vcs" granted to repo "test_hg"!'
- assert log_msg in [t[2] for t in caplog.record_tuples]
-
- def test_access_allowed_when_user_has_read_permissions(self, hg_server, caplog):
- user_permissions = {hg_server.repo_name: 'repository.read'}
- server = hg_server.create(user_permissions=user_permissions)
- result = server._check_permissions()
- assert result is True
+ @pytest.mark.parametrize('permissions, value', [
+ ({}, False),
+ ({'test_hg': 'repository.read'}, False),
+ ({'test_hg': 'repository.write'}, True),
+ ({'test_hg': 'repository.admin'}, True),
- assert server.read_only is True
- log_msg = 'Only Read Only access for User "%s" granted to repo "%s"!' % (
- hg_server.user, hg_server.repo_name)
- assert log_msg in [t[2] for t in caplog.record_tuples]
+ ])
+ def test_has_write_permissions(self, hg_server, permissions, value):
+ server = hg_server.create(user_permissions=permissions)
+ result = server.has_write_perm()
+ assert result is value
- def test_run_returns_exit_code_2_when_no_permissions(self, hg_server, caplog):
+ def test_run_returns_executes_command(self, hg_server):
server = hg_server.create()
- with patch.object(server, '_check_permissions') as permissions_mock:
- permissions_mock.return_value = False
- exit_code = server.run()
- assert exit_code == (2, False)
-
- def test_run_returns_executes_command(self, hg_server, caplog):
- server = hg_server.create()
- with patch.object(server, '_check_permissions') as permissions_mock:
- with patch('os.system') as system_mock:
- permissions_mock.return_value = True
- system_mock.return_value = 0
+ from rhodecode.apps.ssh_support.lib.backends.hg import MercurialTunnelWrapper
+ with mock.patch.object(MercurialTunnelWrapper, 'create_hooks_env') as _patch:
+ _patch.return_value = 0
+ with mock.patch.object(MercurialTunnelWrapper, 'command', return_value='date'):
exit_code = server.run()
- system_mock.assert_called_once_with(server.command)
assert exit_code == (0, False)
+
+
+
diff --git a/rhodecode/apps/ssh_support/tests/test_server_svn.py b/rhodecode/apps/ssh_support/tests/test_server_svn.py
--- a/rhodecode/apps/ssh_support/tests/test_server_svn.py
+++ b/rhodecode/apps/ssh_support/tests/test_server_svn.py
@@ -18,15 +18,11 @@
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
+import mock
import pytest
-from mock import Mock, patch
from rhodecode.apps.ssh_support.lib.backends.svn import SubversionServer
-
-
-@pytest.fixture
-def svn_server():
- return SubversionServerCreator()
+from rhodecode.apps.ssh_support.tests.conftest import dummy_env, dummy_user
class SubversionServerCreator(object):
@@ -34,103 +30,95 @@ class SubversionServerCreator(object):
svn_path = '/usr/local/bin/svnserve'
config_data = {
'app:main': {
- 'ssh.executable.svn': svn_path
+ 'ssh.executable.svn': svn_path,
+ 'vcs.hooks.protocol': 'http',
}
}
repo_name = 'test-svn'
- user = 'vcs'
+ user = dummy_user()
def __init__(self):
def config_get(part, key):
return self.config_data.get(part, {}).get(key)
- self.config_mock = Mock()
- self.config_mock.get = Mock(side_effect=config_get)
+ self.config_mock = mock.Mock()
+ self.config_mock.get = mock.Mock(side_effect=config_get)
def create(self, **kwargs):
parameters = {
- 'store': {'path': self.root},
+ 'store': self.root,
+ 'repo_name': self.repo_name,
'ini_path': '',
'user': self.user,
'user_permissions': {
- self.repo_name: 'repo_admin'
+ self.repo_name: 'repository.admin'
},
'config': self.config_mock,
+ 'env': dummy_env()
}
+
parameters.update(kwargs)
server = SubversionServer(**parameters)
return server
+@pytest.fixture
+def svn_server(app):
+ return SubversionServerCreator()
+
+
class TestSubversionServer(object):
- def test_timeout_returns_value_from_config(self, svn_server):
+ def test_command(self, svn_server):
server = svn_server.create()
- assert server.timeout == 30
+ expected_command = [
+ svn_server.svn_path, '-t', '--config-file',
+ server.tunnel.svn_conf_path, '-r', svn_server.root
+ ]
- @pytest.mark.parametrize(
- 'permission', ['repository.admin', 'repository.write'])
- def test_check_permissions_with_write_permissions(
- self, svn_server, permission):
- user_permissions = {svn_server.repo_name: permission}
- server = svn_server.create(user_permissions=user_permissions)
- server.tunnel = Mock()
- server.repo_name = svn_server.repo_name
- result = server._check_permissions()
- assert result is True
- assert server.tunnel.read_only is False
+ assert expected_command == server.tunnel.command()
- def test_check_permissions_with_read_permissions(self, svn_server):
- user_permissions = {svn_server.repo_name: 'repository.read'}
- server = svn_server.create(user_permissions=user_permissions)
- server.tunnel = Mock()
- server.repo_name = svn_server.repo_name
- result = server._check_permissions()
- assert result is True
- assert server.tunnel.read_only is True
+ @pytest.mark.parametrize('permissions, action, code', [
+ ({}, 'pull', -2),
+ ({'test-svn': 'repository.read'}, 'pull', 0),
+ ({'test-svn': 'repository.read'}, 'push', -2),
+ ({'test-svn': 'repository.write'}, 'push', 0),
+ ({'test-svn': 'repository.admin'}, 'push', 0),
+
+ ])
+ def test_permission_checks(self, svn_server, permissions, action, code):
+ server = svn_server.create(user_permissions=permissions)
+ result = server._check_permissions(action)
+ assert result is code
- def test_check_permissions_with_no_permissions(self, svn_server, caplog):
- tunnel_mock = Mock()
- user_permissions = {}
- server = svn_server.create(user_permissions=user_permissions)
- server.tunnel = tunnel_mock
- server.repo_name = svn_server.repo_name
- result = server._check_permissions()
- assert result is False
- tunnel_mock.fail.assert_called_once_with(
- "Not enough permissions for repository {}".format(
- svn_server.repo_name))
-
- def test_run_returns_1_when_repository_name_cannot_be_extracted(
- self, svn_server):
- server = svn_server.create()
- with patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.SubversionTunnelWrapper') as tunnel_mock:
- tunnel_mock().get_first_client_response.return_value = None
- exit_code = server.run()
- assert exit_code == (1, False)
- tunnel_mock().fail.assert_called_once_with(
- 'Repository name cannot be extracted')
-
- def test_run_returns_tunnel_return_code(self, svn_server, caplog):
+ def test_run_returns_executes_command(self, svn_server):
server = svn_server.create()
- fake_response = {
- 'url': 'ssh+svn://test@example.com/test-svn/'
- }
- with patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.SubversionTunnelWrapper') as tunnel_mock:
- with patch.object(server, '_check_permissions') as (
- permissions_mock):
- permissions_mock.return_value = True
- tunnel = tunnel_mock()
- tunnel.get_first_client_response.return_value = fake_response
- tunnel.return_code = 0
- exit_code = server.run()
- permissions_mock.assert_called_once_with()
+ from rhodecode.apps.ssh_support.lib.backends.svn import SubversionTunnelWrapper
+ with mock.patch.object(
+ SubversionTunnelWrapper, 'get_first_client_response',
+ return_value={'url': 'http://server/test-svn'}):
+ with mock.patch.object(
+ SubversionTunnelWrapper, 'patch_first_client_response',
+ return_value=0):
+ with mock.patch.object(
+ SubversionTunnelWrapper, 'sync',
+ return_value=0):
+ with mock.patch.object(
+ SubversionTunnelWrapper, 'command',
+ return_value='date'):
- expected_log_calls = sorted([
- "Using subversion binaries from '%s'" % svn_server.svn_path
- ])
+ exit_code = server.run()
+ # SVN has this differently configured, and we get in our mock env
+ # None as return code
+ assert exit_code == (None, False)
- assert expected_log_calls == [t[2] for t in caplog.record_tuples]
+ def test_run_returns_executes_command_that_cannot_extract_repo_name(self, svn_server):
+ server = svn_server.create()
+ from rhodecode.apps.ssh_support.lib.backends.svn import SubversionTunnelWrapper
+ with mock.patch.object(
+ SubversionTunnelWrapper, 'command',
+ return_value='date'):
+ with mock.patch.object(
+ SubversionTunnelWrapper, 'get_first_client_response',
+ return_value=None):
+ exit_code = server.run()
- assert exit_code == (0, False)
- tunnel.patch_first_client_response.assert_called_once_with(
- fake_response)
- tunnel.sync.assert_called_once_with()
+ assert exit_code == (1, False)
diff --git a/rhodecode/apps/ssh_support/tests/test_ssh_wrapper.py b/rhodecode/apps/ssh_support/tests/test_ssh_wrapper.py
--- a/rhodecode/apps/ssh_support/tests/test_ssh_wrapper.py
+++ b/rhodecode/apps/ssh_support/tests/test_ssh_wrapper.py
@@ -17,185 +17,38 @@
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
-import os
-import mock
+
import pytest
-import ConfigParser
-
-from rhodecode.apps.ssh_support.lib.ssh_wrapper import SshWrapper
-
-
-@pytest.fixture
-def dummy_conf(tmpdir):
- conf = ConfigParser.ConfigParser()
- conf.add_section('app:main')
- conf.set('app:main', 'ssh.executable.hg', '/usr/bin/hg')
- conf.set('app:main', 'ssh.executable.git', '/usr/bin/git')
- conf.set('app:main', 'ssh.executable.svn', '/usr/bin/svnserve')
-
- f_path = os.path.join(str(tmpdir), 'ssh_wrapper_test.ini')
- with open(f_path, 'wb') as f:
- conf.write(f)
-
- return os.path.join(f_path)
-
-
-class TestGetRepoDetails(object):
- @pytest.mark.parametrize(
- 'command', [
- 'hg -R test-repo serve --stdio',
- 'hg -R test-repo serve --stdio'
- ])
- def test_hg_command_matched(self, command, dummy_conf):
- wrapper = SshWrapper(command, 'auto', 'admin', '3', 'False', dummy_conf)
- type_, name, mode = wrapper.get_repo_details('auto')
- assert type_ == 'hg'
- assert name == 'test-repo'
- assert mode is 'auto'
-
- @pytest.mark.parametrize(
- 'command', [
- 'hg test-repo serve --stdio',
- 'hg -R test-repo serve',
- 'hg serve --stdio',
- 'hg serve -R test-repo'
- ])
- def test_hg_command_not_matched(self, command, dummy_conf):
- wrapper = SshWrapper(command, 'auto', 'admin', '3', 'False', dummy_conf)
- type_, name, mode = wrapper.get_repo_details('auto')
- assert type_ is None
- assert name is None
- assert mode is 'auto'
-
-
-class TestServe(object):
- def test_serve_raises_an_exception_when_vcs_is_not_recognized(self, dummy_conf):
- with mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.get_repo_store'):
- wrapper = SshWrapper('random command', 'auto', 'admin', '3', 'False', dummy_conf)
-
- with pytest.raises(Exception) as exc_info:
- wrapper.serve(
- vcs='microsoft-tfs', repo='test-repo', mode=None, user='test',
- permissions={})
- assert exc_info.value.message == 'Unrecognised VCS: microsoft-tfs'
-class TestServeHg(object):
-
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.invalidate_cache')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.get_user_permissions')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.get_repo_store')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.MercurialServer.run')
- def test_serve_creates_hg_instance(
- self, mercurial_run_mock, get_repo_store_mock, get_user_mock,
- invalidate_cache_mock, dummy_conf):
-
- repo_name = None
- mercurial_run_mock.return_value = 0, True
- get_user_mock.return_value = {repo_name: 'repository.admin'}
- get_repo_store_mock.return_value = {'path': '/tmp'}
-
- wrapper = SshWrapper('date', 'hg', 'admin', '3', 'False',
- dummy_conf)
- exit_code = wrapper.wrap()
- assert exit_code == 0
- assert mercurial_run_mock.called
-
- assert get_repo_store_mock.called
- assert get_user_mock.called
- invalidate_cache_mock.assert_called_once_with(repo_name)
+class TestSSHWrapper(object):
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.invalidate_cache')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.get_user_permissions')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.get_repo_store')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.MercurialServer.run')
- def test_serve_hg_invalidates_cache(
- self, mercurial_run_mock, get_repo_store_mock, get_user_mock,
- invalidate_cache_mock, dummy_conf):
-
- repo_name = None
- mercurial_run_mock.return_value = 0, True
- get_user_mock.return_value = {repo_name: 'repository.admin'}
- get_repo_store_mock.return_value = {'path': '/tmp'}
+ def test_serve_raises_an_exception_when_vcs_is_not_recognized(self, ssh_wrapper):
+ with pytest.raises(Exception) as exc_info:
+ ssh_wrapper.serve(
+ vcs='microsoft-tfs', repo='test-repo', mode=None, user='test',
+ permissions={})
+ assert exc_info.value.message == 'Unrecognised VCS: microsoft-tfs'
- wrapper = SshWrapper('date', 'hg', 'admin', '3', 'False',
- dummy_conf)
- exit_code = wrapper.wrap()
- assert exit_code == 0
- assert mercurial_run_mock.called
-
- assert get_repo_store_mock.called
- assert get_user_mock.called
- invalidate_cache_mock.assert_called_once_with(repo_name)
-
-
-class TestServeGit(object):
+ def test_parse_config(self, ssh_wrapper):
+ config = ssh_wrapper.parse_config(ssh_wrapper.ini_path)
+ assert config
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.invalidate_cache')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.get_user_permissions')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.get_repo_store')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.GitServer.run')
- def test_serve_creates_git_instance(self, git_run_mock, get_repo_store_mock, get_user_mock,
- invalidate_cache_mock, dummy_conf):
- repo_name = None
- git_run_mock.return_value = 0, True
- get_user_mock.return_value = {repo_name: 'repository.admin'}
- get_repo_store_mock.return_value = {'path': '/tmp'}
-
- wrapper = SshWrapper('date', 'git', 'admin', '3', 'False',
- dummy_conf)
-
- exit_code = wrapper.wrap()
- assert exit_code == 0
- assert git_run_mock.called
- assert get_repo_store_mock.called
- assert get_user_mock.called
- invalidate_cache_mock.assert_called_once_with(repo_name)
-
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.invalidate_cache')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.get_user_permissions')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.get_repo_store')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.GitServer.run')
- def test_serve_git_invalidates_cache(
- self, git_run_mock, get_repo_store_mock, get_user_mock,
- invalidate_cache_mock, dummy_conf):
- repo_name = None
- git_run_mock.return_value = 0, True
- get_user_mock.return_value = {repo_name: 'repository.admin'}
- get_repo_store_mock.return_value = {'path': '/tmp'}
+ def test_get_connection_info(self, ssh_wrapper):
+ conn_info = ssh_wrapper.get_connection_info()
+ assert {'client_ip': '127.0.0.1',
+ 'client_port': '22',
+ 'server_ip': '10.0.0.1',
+ 'server_port': '443'} == conn_info
- wrapper = SshWrapper('date', 'git', 'admin', '3', 'False', dummy_conf)
-
- exit_code = wrapper.wrap()
- assert exit_code == 0
- assert git_run_mock.called
-
- assert get_repo_store_mock.called
- assert get_user_mock.called
- invalidate_cache_mock.assert_called_once_with(repo_name)
-
-
-class TestServeSvn(object):
+ @pytest.mark.parametrize('command, vcs', [
+ ('xxx', None),
+ ('svnserve -t', 'svn'),
+ ('hg -R repo serve --stdio', 'hg'),
+ ('git-receive-pack \'repo.git\'', 'git'),
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.invalidate_cache')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.get_user_permissions')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.RhodeCodeApiClient.get_repo_store')
- @mock.patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.SubversionServer.run')
- def test_serve_creates_svn_instance(
- self, svn_run_mock, get_repo_store_mock, get_user_mock,
- invalidate_cache_mock, dummy_conf):
-
- repo_name = None
- svn_run_mock.return_value = 0, True
- get_user_mock.return_value = {repo_name: 'repository.admin'}
- get_repo_store_mock.return_value = {'path': '/tmp'}
-
- wrapper = SshWrapper('date', 'svn', 'admin', '3', 'False', dummy_conf)
-
- exit_code = wrapper.wrap()
- assert exit_code == 0
- assert svn_run_mock.called
-
- assert get_repo_store_mock.called
- assert get_user_mock.called
- invalidate_cache_mock.assert_called_once_with(repo_name)
+ ])
+ def test_get_repo_details(self, ssh_wrapper, command, vcs):
+ ssh_wrapper.command = command
+ vcs_type, repo_name, mode = ssh_wrapper.get_repo_details(mode='auto')
+ assert vcs_type == vcs
diff --git a/rhodecode/apps/ssh_support/tests/test_svn_tunnel_wrapper.py b/rhodecode/apps/ssh_support/tests/test_svn_tunnel_wrapper.py
deleted file mode 100644
--- a/rhodecode/apps/ssh_support/tests/test_svn_tunnel_wrapper.py
+++ /dev/null
@@ -1,285 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2016-2017 RhodeCode GmbH
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License, version 3
-# (only), as published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-#
-# This program is dual-licensed. If you wish to learn more about the
-# RhodeCode Enterprise Edition, including its added features, Support services,
-# and proprietary license terms, please see https://rhodecode.com/licenses/
-
-
-import subprocess
-from io import BytesIO
-from time import sleep
-
-import pytest
-from mock import patch, Mock, MagicMock, call
-
-from rhodecode.apps.ssh_support.lib.backends.svn import SubversionTunnelWrapper
-from rhodecode.tests import no_newline_id_generator
-
-
-class TestSubversionTunnelWrapper(object):
- @pytest.mark.parametrize(
- 'input_string, output_string', [
- [None, ''],
- ['abcde', '5:abcde '],
- ['abcdefghijk', '11:abcdefghijk ']
- ])
- def test_svn_string(self, input_string, output_string):
- wrapper = SubversionTunnelWrapper(timeout=5)
- assert wrapper._svn_string(input_string) == output_string
-
- def test_read_first_client_response(self):
- wrapper = SubversionTunnelWrapper(timeout=5)
- buffer_ = '( abcd ( efg hij ) ) '
- wrapper.stdin = BytesIO(buffer_)
- result = wrapper._read_first_client_response()
- assert result == buffer_
-
- def test_parse_first_client_response_returns_dict(self):
- response = (
- '( 2 ( edit-pipeline svndiff1 absent-entries depth mergeinfo'
- ' log-revprops ) 26:svn+ssh://vcs@vm/hello-svn 38:SVN/1.8.11'
- ' (x86_64-apple-darwin14.1.0) ( ) ) ')
- wrapper = SubversionTunnelWrapper(timeout=5)
- result = wrapper._parse_first_client_response(response)
- assert result['version'] == '2'
- assert (
- result['capabilities'] ==
- 'edit-pipeline svndiff1 absent-entries depth mergeinfo'
- ' log-revprops')
- assert result['url'] == 'svn+ssh://vcs@vm/hello-svn'
- assert result['ra_client'] == 'SVN/1.8.11 (x86_64-apple-darwin14.1.0)'
- assert result['client'] is None
-
- def test_parse_first_client_response_returns_none_when_not_matched(self):
- response = (
- '( 2 ( edit-pipeline svndiff1 absent-entries depth mergeinfo'
- ' log-revprops ) ) ')
- wrapper = SubversionTunnelWrapper(timeout=5)
- result = wrapper._parse_first_client_response(response)
- assert result is None
-
- def test_interrupt(self):
- wrapper = SubversionTunnelWrapper(timeout=5)
- with patch.object(wrapper, 'fail') as fail_mock:
- wrapper.interrupt(1, 'frame')
- fail_mock.assert_called_once_with("Exited by timeout")
-
- def test_fail(self):
- process_mock = Mock()
- wrapper = SubversionTunnelWrapper(timeout=5)
- with patch.object(wrapper, 'remove_configs') as remove_configs_mock:
- with patch('sys.stdout', new_callable=BytesIO) as stdout_mock:
- with patch.object(wrapper, 'process') as process_mock:
- wrapper.fail('test message')
- assert (
- stdout_mock.getvalue() ==
- '( failure ( ( 210005 12:test message 0: 0 ) ) )\n')
- process_mock.kill.assert_called_once_with()
- remove_configs_mock.assert_called_once_with()
-
- @pytest.mark.parametrize(
- 'client, expected_client', [
- ['test ', 'test '],
- ['', ''],
- [None, '']
- ])
- def test_client_in_patch_first_client_response(
- self, client, expected_client):
- response = {
- 'version': 2,
- 'capabilities': 'edit-pipeline svndiff1 absent-entries depth',
- 'url': 'svn+ssh://example.com/svn',
- 'ra_client': 'SVN/1.8.11 (x86_64-apple-darwin14.1.0)',
- 'client': client
- }
- wrapper = SubversionTunnelWrapper(timeout=5)
- stdin = BytesIO()
- with patch.object(wrapper, 'process') as process_mock:
- process_mock.stdin = stdin
- wrapper.patch_first_client_response(response)
- assert (
- stdin.getvalue() ==
- '( 2 ( edit-pipeline svndiff1 absent-entries depth )'
- ' 25:svn+ssh://example.com/svn 38:SVN/1.8.11'
- ' (x86_64-apple-darwin14.1.0) ( {expected_client}) ) '.format(
- expected_client=expected_client))
-
- def test_kwargs_override_data_in_patch_first_client_response(self):
- response = {
- 'version': 2,
- 'capabilities': 'edit-pipeline svndiff1 absent-entries depth',
- 'url': 'svn+ssh://example.com/svn',
- 'ra_client': 'SVN/1.8.11 (x86_64-apple-darwin14.1.0)',
- 'client': 'test'
- }
- wrapper = SubversionTunnelWrapper(timeout=5)
- stdin = BytesIO()
- with patch.object(wrapper, 'process') as process_mock:
- process_mock.stdin = stdin
- wrapper.patch_first_client_response(
- response, version=3, client='abcde ',
- capabilities='absent-entries depth',
- url='svn+ssh://example.org/test',
- ra_client='SVN/1.8.12 (ubuntu 14.04)')
- assert (
- stdin.getvalue() ==
- '( 3 ( absent-entries depth ) 26:svn+ssh://example.org/test'
- ' 25:SVN/1.8.12 (ubuntu 14.04) ( abcde ) ) ')
-
- def test_patch_first_client_response_sets_environment(self):
- response = {
- 'version': 2,
- 'capabilities': 'edit-pipeline svndiff1 absent-entries depth',
- 'url': 'svn+ssh://example.com/svn',
- 'ra_client': 'SVN/1.8.11 (x86_64-apple-darwin14.1.0)',
- 'client': 'test'
- }
- wrapper = SubversionTunnelWrapper(timeout=5)
- stdin = BytesIO()
- with patch.object(wrapper, 'create_hooks_env') as create_hooks_mock:
- with patch.object(wrapper, 'process') as process_mock:
- process_mock.stdin = stdin
- wrapper.patch_first_client_response(response)
- create_hooks_mock.assert_called_once_with()
-
- def test_get_first_client_response_exits_by_signal(self):
- wrapper = SubversionTunnelWrapper(timeout=1)
- read_patch = patch.object(wrapper, '_read_first_client_response')
- parse_patch = patch.object(wrapper, '_parse_first_client_response')
- interrupt_patch = patch.object(wrapper, 'interrupt')
-
- with read_patch as read_mock, parse_patch as parse_mock, \
- interrupt_patch as interrupt_mock:
- read_mock.side_effect = lambda: sleep(3)
- wrapper.get_first_client_response()
-
- assert parse_mock.call_count == 0
- assert interrupt_mock.call_count == 1
-
- def test_get_first_client_response_parses_data(self):
- wrapper = SubversionTunnelWrapper(timeout=5)
- response = (
- '( 2 ( edit-pipeline svndiff1 absent-entries depth mergeinfo'
- ' log-revprops ) 26:svn+ssh://vcs@vm/hello-svn 38:SVN/1.8.11'
- ' (x86_64-apple-darwin14.1.0) ( ) ) ')
- read_patch = patch.object(wrapper, '_read_first_client_response')
- parse_patch = patch.object(wrapper, '_parse_first_client_response')
-
- with read_patch as read_mock, parse_patch as parse_mock:
- read_mock.return_value = response
- wrapper.get_first_client_response()
-
- parse_mock.assert_called_once_with(response)
-
- def test_return_code(self):
- wrapper = SubversionTunnelWrapper(timeout=5)
- with patch.object(wrapper, 'process') as process_mock:
- process_mock.returncode = 1
- assert wrapper.return_code == 1
-
- def test_sync_loop_breaks_when_process_cannot_be_polled(self):
- self.counter = 0
- buffer_ = 'abcdefghij'
-
- wrapper = SubversionTunnelWrapper(timeout=5)
- wrapper.stdin = BytesIO(buffer_)
- with patch.object(wrapper, 'remove_configs') as remove_configs_mock:
- with patch.object(wrapper, 'process') as process_mock:
- process_mock.poll.side_effect = self._poll
- process_mock.stdin = BytesIO()
- wrapper.sync()
- assert process_mock.stdin.getvalue() == 'abcde'
- remove_configs_mock.assert_called_once_with()
-
- def test_sync_loop_breaks_when_nothing_to_read(self):
- self.counter = 0
- buffer_ = 'abcdefghij'
-
- wrapper = SubversionTunnelWrapper(timeout=5)
- wrapper.stdin = BytesIO(buffer_)
- with patch.object(wrapper, 'remove_configs') as remove_configs_mock:
- with patch.object(wrapper, 'process') as process_mock:
- process_mock.poll.return_value = None
- process_mock.stdin = BytesIO()
- wrapper.sync()
- assert process_mock.stdin.getvalue() == buffer_
- remove_configs_mock.assert_called_once_with()
-
- def test_start_without_repositories_root(self):
- svn_path = '/usr/local/bin/svnserve'
- wrapper = SubversionTunnelWrapper(timeout=5, svn_path=svn_path)
- with patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.Popen') as popen_mock:
- wrapper.start()
- expected_command = [
- svn_path, '-t', '--config-file', wrapper.svn_conf_path]
- popen_mock.assert_called_once_with(
- expected_command, stdin=subprocess.PIPE)
- assert wrapper.process == popen_mock()
-
- def test_start_with_repositories_root(self):
- svn_path = '/usr/local/bin/svnserve'
- repositories_root = '/home/repos'
- wrapper = SubversionTunnelWrapper(
- timeout=5, svn_path=svn_path, repositories_root=repositories_root)
- with patch('rhodecode.apps.ssh_support.lib.ssh_wrapper.Popen') as popen_mock:
- wrapper.start()
- expected_command = [
- svn_path, '-t', '--config-file', wrapper.svn_conf_path,
- '-r', repositories_root]
- popen_mock.assert_called_once_with(
- expected_command, stdin=subprocess.PIPE)
- assert wrapper.process == popen_mock()
-
- def test_create_svn_config(self):
- wrapper = SubversionTunnelWrapper(timeout=5)
- file_mock = MagicMock(spec=file)
- with patch('os.fdopen', create=True) as open_mock:
- open_mock.return_value = file_mock
- wrapper.create_svn_config()
- open_mock.assert_called_once_with(wrapper.svn_conf_fd, 'w')
- expected_content = '[general]\nhooks-env = {}\n'.format(
- wrapper.hooks_env_path)
- file_handle = file_mock.__enter__.return_value
- file_handle.write.assert_called_once_with(expected_content)
-
- @pytest.mark.parametrize(
- 'read_only, expected_content', [
- [True, '[default]\nLANG = en_US.UTF-8\nSSH_READ_ONLY = 1\n'],
- [False, '[default]\nLANG = en_US.UTF-8\n']
- ], ids=no_newline_id_generator)
- def test_create_hooks_env(self, read_only, expected_content):
- wrapper = SubversionTunnelWrapper(timeout=5)
- wrapper.read_only = read_only
- file_mock = MagicMock(spec=file)
- with patch('os.fdopen', create=True) as open_mock:
- open_mock.return_value = file_mock
- wrapper.create_hooks_env()
- open_mock.assert_called_once_with(wrapper.hooks_env_fd, 'w')
- file_handle = file_mock.__enter__.return_value
- file_handle.write.assert_called_once_with(expected_content)
-
- def test_remove_configs(self):
- wrapper = SubversionTunnelWrapper(timeout=5)
- with patch('os.remove') as remove_mock:
- wrapper.remove_configs()
- expected_calls = [
- call(wrapper.svn_conf_path), call(wrapper.hooks_env_path)]
- assert sorted(remove_mock.call_args_list) == sorted(expected_calls)
-
- def _poll(self):
- self.counter += 1
- return None if self.counter < 6 else 1
diff --git a/rhodecode/lib/base.py b/rhodecode/lib/base.py
--- a/rhodecode/lib/base.py
+++ b/rhodecode/lib/base.py
@@ -628,7 +628,6 @@ def bootstrap_request(**kwargs):
request = TestRequest(**kwargs)
request.session = TestDummySession()
-
config = pyramid.testing.setUp(request=request)
add_events_routes(config)
return request
diff --git a/rhodecode/lib/hooks_daemon.py b/rhodecode/lib/hooks_daemon.py
--- a/rhodecode/lib/hooks_daemon.py
+++ b/rhodecode/lib/hooks_daemon.py
@@ -209,9 +209,9 @@ class Hooks(object):
def _call_hook(self, hook, extras):
extras = AttributeDict(extras)
+ server_url = extras['server_url']
- extras.request = bootstrap_request(
- application_url=extras['server_url'])
+ extras.request = bootstrap_request(application_url=server_url)
try:
result = hook(extras)
@@ -229,6 +229,7 @@ class Hooks(object):
finally:
meta.Session.remove()
+ log.debug('Got hook call response %s', result)
return {
'status': result.status,
'output': result.output,