|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
|
|
|
#
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
# You may obtain a copy of the License at
|
|
|
#
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
#
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
# See the License for the specific language governing permissions and
|
|
|
# limitations under the License.
|
|
|
|
|
|
import argparse
|
|
|
import getpass
|
|
|
import logging
|
|
|
|
|
|
from pyramid.paster import setup_logging, bootstrap
|
|
|
from pyramid.threadlocal import get_current_request
|
|
|
from ziggurat_foundations.models.services.user import UserService
|
|
|
|
|
|
|
|
|
from appenlight.forms import UserRegisterForm
|
|
|
from appenlight.lib.ext_json import json
|
|
|
from appenlight.models import (
|
|
|
DBSession,
|
|
|
Group,
|
|
|
GroupPermission,
|
|
|
User,
|
|
|
AuthToken
|
|
|
)
|
|
|
from appenlight.models.services.group import GroupService
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
_ = str
|
|
|
|
|
|
|
|
|
def is_yes(input_data):
|
|
|
return input_data in ['y', 'yes']
|
|
|
|
|
|
|
|
|
def is_no(input_data):
|
|
|
return input_data in ['n', 'no']
|
|
|
|
|
|
|
|
|
def main():
|
|
|
parser = argparse.ArgumentParser(
|
|
|
description='Populate AppEnlight database',
|
|
|
add_help=False)
|
|
|
parser.add_argument('-c', '--config', required=True,
|
|
|
help='Configuration ini file of application')
|
|
|
parser.add_argument('--username', default=None,
|
|
|
help='User to create')
|
|
|
parser.add_argument('--password', default=None,
|
|
|
help='Password for created user')
|
|
|
parser.add_argument('--email', default=None,
|
|
|
help='Email for created user')
|
|
|
parser.add_argument('--auth-token', default=None,
|
|
|
help='Auth token for created user')
|
|
|
args = parser.parse_args()
|
|
|
config_uri = args.config
|
|
|
|
|
|
setup_logging(config_uri)
|
|
|
env = bootstrap(config_uri)
|
|
|
request = env['request']
|
|
|
with get_current_request().tm:
|
|
|
group = GroupService.by_id(1)
|
|
|
if not group:
|
|
|
group = Group(id=1, group_name='Administrators',
|
|
|
description="Top level permission owners")
|
|
|
DBSession.add(group)
|
|
|
permission = GroupPermission(perm_name='root_administration')
|
|
|
group.permissions.append(permission)
|
|
|
|
|
|
create_user = True if args.username else None
|
|
|
while create_user is None:
|
|
|
response = input(
|
|
|
'Do you want to create a new admin? (n)\n').lower()
|
|
|
|
|
|
if is_yes(response or 'n'):
|
|
|
create_user = True
|
|
|
elif is_no(response or 'n'):
|
|
|
create_user = False
|
|
|
|
|
|
if create_user:
|
|
|
csrf_token = request.session.get_csrf_token()
|
|
|
user_name = args.username
|
|
|
print('*********************************************************')
|
|
|
while user_name is None:
|
|
|
response = input('What is the username of new admin?\n')
|
|
|
form = UserRegisterForm(
|
|
|
user_name=response, csrf_token=csrf_token,
|
|
|
csrf_context=request)
|
|
|
form.validate()
|
|
|
if form.user_name.errors:
|
|
|
print(form.user_name.errors[0])
|
|
|
else:
|
|
|
user_name = response
|
|
|
print('The admin username is "{}"\n'.format(user_name))
|
|
|
print('*********************************************************')
|
|
|
email = args.email
|
|
|
while email is None:
|
|
|
response = input('What is the email of admin account?\n')
|
|
|
form = UserRegisterForm(
|
|
|
email=response, csrf_token=csrf_token,
|
|
|
csrf_context=request)
|
|
|
form.validate()
|
|
|
if form.email.errors:
|
|
|
print(form.email.errors[0])
|
|
|
else:
|
|
|
email = response
|
|
|
print('The admin email is "{}"\n'.format(email))
|
|
|
print('*********************************************************')
|
|
|
user_password = args.password
|
|
|
confirmed_password = args.password
|
|
|
while user_password is None or confirmed_password is None:
|
|
|
response = getpass.getpass(
|
|
|
'What is the password for admin account?\n')
|
|
|
form = UserRegisterForm(
|
|
|
user_password=response, csrf_token=csrf_token,
|
|
|
csrf_context=request)
|
|
|
form.validate()
|
|
|
if form.user_password.errors:
|
|
|
print(form.user_password.errors[0])
|
|
|
else:
|
|
|
user_password = response
|
|
|
|
|
|
response = getpass.getpass('Please confirm the password.\n')
|
|
|
if user_password == response:
|
|
|
confirmed_password = response
|
|
|
else:
|
|
|
print('Passwords do not match. Please try again')
|
|
|
print('*********************************************************')
|
|
|
|
|
|
with get_current_request().tm:
|
|
|
if create_user:
|
|
|
group = GroupService.by_id(1)
|
|
|
user = User(user_name=user_name, email=email, status=1)
|
|
|
UserService.regenerate_security_code(user)
|
|
|
UserService.set_password(user, user_password)
|
|
|
DBSession.add(user)
|
|
|
token = AuthToken(description="Uptime monitoring token")
|
|
|
if args.auth_token:
|
|
|
token.token = args.auth_token
|
|
|
user.auth_tokens.append(token)
|
|
|
group.users.append(user)
|
|
|
print('USER CREATED')
|
|
|
print(json.dumps(user.get_dict()))
|
|
|
print('*********************************************************')
|
|
|
print('AUTH TOKEN')
|
|
|
print(json.dumps(user.auth_tokens[0].get_dict()))
|
|
|
|