initialize_db.py
149 lines
| 5.5 KiB
| text/x-python
|
PythonLexer
r0 | # -*- coding: utf-8 -*- | |||
r112 | # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors | |||
r0 | # | |||
r112 | # Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | ||||
# You may obtain a copy of the License at | ||||
r0 | # | |||
r112 | # http://www.apache.org/licenses/LICENSE-2.0 | |||
r0 | # | |||
r112 | # Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | ||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
# See the License for the specific language governing permissions and | ||||
# limitations under the License. | ||||
r0 | ||||
import argparse | ||||
import getpass | ||||
import logging | ||||
from pyramid.paster import setup_logging, bootstrap | ||||
from pyramid.threadlocal import get_current_request | ||||
r135 | from ziggurat_foundations.models.services.user import UserService | |||
r0 | ||||
from appenlight.forms import UserRegisterForm | ||||
from appenlight.lib.ext_json import json | ||||
r153 | from appenlight.models import DBSession, Group, GroupPermission, User, AuthToken | |||
r0 | from appenlight.models.services.group import GroupService | |||
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Binds ``_`` to ``str``; presumably a stand-in for a translation marker
# (it is not referenced in the visible code) -- TODO confirm before removing.
_ = str
def is_yes(input_data):
    """Return True when *input_data* is an affirmative answer.

    Accepts ``"y"`` or ``"yes"``, ignoring surrounding whitespace and letter
    case, so answers such as ``"Yes"`` or ``"y "`` are recognised.  Any
    non-string value (for example ``None``) is treated as "not a yes".
    """
    if not isinstance(input_data, str):
        return False
    return input_data.strip().casefold() in ("y", "yes")
r0 | ||||
def is_no(input_data):
    """Return True when *input_data* is a negative answer.

    Accepts ``"n"`` or ``"no"``, ignoring surrounding whitespace and letter
    case, so answers such as ``"No"`` or ``"n "`` are recognised.  Any
    non-string value (for example ``None``) is treated as "not a no".
    """
    if not isinstance(input_data, str):
        return False
    return input_data.strip().casefold() in ("n", "no")
r0 | ||||
_SEPARATOR = "*********************************************************"


def _ensure_admin_group():
    """Create the "Administrators" group (id 1) with root permission if missing."""
    with get_current_request().tm:
        group = GroupService.by_id(1)
        if not group:
            group = Group(
                id=1,
                group_name="Administrators",
                description="Top level permission owners",
            )
            DBSession.add(group)
            permission = GroupPermission(perm_name="root_administration")
            group.permissions.append(permission)


def _ask_create_admin():
    """Ask until the operator answers yes or no; an empty answer means "n"."""
    while True:
        response = input("Do you want to create a new admin? (n)\n").lower()
        answer = response or "n"
        if is_yes(answer):
            return True
        if is_no(answer):
            return False


def _prompt_validated(field_name, prompt, request, csrf_token, read=input):
    """Prompt until ``UserRegisterForm`` reports no error for *field_name*.

    *read* is the callable used to obtain the answer (``input`` by default,
    ``getpass.getpass`` for secrets).  The first validation error for the
    field is printed before re-prompting.
    """
    while True:
        response = read(prompt)
        form = UserRegisterForm(
            csrf_token=csrf_token, csrf_context=request, **{field_name: response}
        )
        # The form is validated as a whole, but only the errors of the one
        # field being asked for decide whether the answer is accepted.
        form.validate()
        errors = getattr(form, field_name).errors
        if not errors:
            return response
        print(errors[0])


def _prompt_password(request, csrf_token):
    """Prompt for a valid password and its confirmation until both match."""
    while True:
        password = _prompt_validated(
            "user_password",
            "What is the password for admin account?\n",
            request,
            csrf_token,
            read=getpass.getpass,
        )
        confirmation = getpass.getpass("Please confirm the password.\n")
        if password == confirmation:
            return password
        print("Passwords do not match. Please try again")


def _collect_admin_details(args, request):
    """Return ``(user_name, email, password)`` from CLI args or prompts.

    Values given on the command line are used as-is; anything missing is
    asked for interactively and validated through ``UserRegisterForm``.
    """
    csrf_token = request.session.get_csrf_token()

    print(_SEPARATOR)
    # An empty --username is treated as "not given": it already triggers the
    # interactive yes/no question, so it must not be used as the account name.
    user_name = args.username or None
    if user_name is None:
        user_name = _prompt_validated(
            "user_name", "What is the username of new admin?\n", request, csrf_token
        )
        print('The admin username is "{}"\n'.format(user_name))

    print(_SEPARATOR)
    email = args.email
    if email is None:
        email = _prompt_validated(
            "email", "What is the email of admin account?\n", request, csrf_token
        )
        print('The admin email is "{}"\n'.format(email))

    print(_SEPARATOR)
    user_password = args.password
    if user_password is None:
        user_password = _prompt_password(request, csrf_token)
    print(_SEPARATOR)

    return user_name, email, user_password


def _create_admin(user_name, email, user_password, auth_token=None):
    """Create the admin user with one auth token, add it to group 1, and print both."""
    with get_current_request().tm:
        group = GroupService.by_id(1)
        user = User(user_name=user_name, email=email, status=1)
        UserService.regenerate_security_code(user)
        UserService.set_password(user, user_password)
        DBSession.add(user)
        token = AuthToken(description="Uptime monitoring token")
        if auth_token:
            # Only override the token value when one was supplied explicitly.
            token.token = auth_token
        user.auth_tokens.append(token)
        group.users.append(user)
        print("USER CREATED")
        print(json.dumps(user.get_dict()))
        print(_SEPARATOR)
        print("AUTH TOKEN")
        print(json.dumps(user.auth_tokens[0].get_dict()))


def main():
    """Populate the AppEnlight database and optionally create an admin user.

    Command-line options:
      -c/--config   application ini file (required)
      --username    admin user to create; when given, no yes/no question is asked
      --password    password for the created user
      --email       email for the created user
      --auth-token  explicit value for the created user's auth token

    The "Administrators" group is always ensured first.  Missing user details
    are requested interactively.  The bootstrapped Pyramid environment is
    closed on exit, including when an error occurs.
    """
    parser = argparse.ArgumentParser(
        description="Populate AppEnlight database", add_help=False
    )
    parser.add_argument(
        "-c", "--config", required=True, help="Configuration ini file of application"
    )
    parser.add_argument("--username", default=None, help="User to create")
    parser.add_argument("--password", default=None, help="Password for created user")
    parser.add_argument("--email", default=None, help="Email for created user")
    parser.add_argument(
        "--auth-token", default=None, help="Auth token for created user"
    )
    args = parser.parse_args()
    config_uri = args.config
    setup_logging(config_uri)
    env = bootstrap(config_uri)
    try:
        request = env["request"]
        _ensure_admin_group()

        create_user = True if args.username else _ask_create_admin()
        if create_user:
            user_name, email, user_password = _collect_admin_details(args, request)
            _create_admin(user_name, email, user_password, args.auth_token)
    finally:
        # bootstrap() hands back a "closer" callback that tears down the
        # environment it set up; call it even when the script fails.
        env["closer"]()