# -*- coding: utf-8 -*-

# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from datetime import datetime

import sqlalchemy as sa
from pyramid.view import view_config
from zope.sqlalchemy import mark_changed

from appenlight.models import DBSession, Datastores
from appenlight.forms import get_partition_deletion_form

log = logging.getLogger(__name__)


def get_partition_stats():
    """Collect partition information from Elasticsearch and PostgreSQL.

    Returns two dicts keyed by ISO date string: one for permanent (monthly)
    partitions and one for daily partitions, each holding the matching
    ``pg`` tables and ``elasticsearch`` indices.
    """
    table_query = """
        SELECT table_name
        FROM information_schema.tables
        GROUP BY table_name
        ORDER BY table_name
    """

    permanent_partitions = {}
    daily_partitions = {}

    def is_int(data):
        try:
            int(data)
            return True
        except Exception:
            return False

    def add_key(key, holder):
        if key not in holder:
            holder[key] = {"pg": [], "elasticsearch": []}

    for partition in list(Datastores.es.indices.get_alias("rcae*")):
        if not partition.startswith("rcae"):
            continue
        split_data = partition.split("_")
        permanent = False
        # if the name has no day component, treat it as a permanent (monthly) partition
        if False in list(map(is_int, split_data[-3:])):
            ix_time = datetime(
                year=int(split_data[-2]), month=int(split_data[-1]), day=1
            ).date()
            permanent = True
        else:
            ix_time = datetime(
                year=int(split_data[-3]),
                month=int(split_data[-2]),
                day=int(split_data[-1]),
            ).date()

        ix_time = str(ix_time)
        if permanent:
            add_key(ix_time, permanent_partitions)
            permanent_partitions[ix_time]["elasticsearch"].append(partition)
        else:
            add_key(ix_time, daily_partitions)
            daily_partitions[ix_time]["elasticsearch"].append(partition)

    for row in DBSession.execute(table_query):
        splitted = row["table_name"].split("_")
        if "p" in splitted:
            # dealing with a partitioned table
            split_data = [int(x) for x in splitted[splitted.index("p") + 1 :]]
            if len(split_data) == 3:
                ix_time = datetime(split_data[0], split_data[1], split_data[2]).date()
                ix_time = str(ix_time)
                add_key(ix_time, daily_partitions)
                daily_partitions[ix_time]["pg"].append(row["table_name"])
            else:
                ix_time = datetime(split_data[0], split_data[1], 1).date()
                ix_time = str(ix_time)
                add_key(ix_time, permanent_partitions)
                permanent_partitions[ix_time]["pg"].append(row["table_name"])

    return permanent_partitions, daily_partitions


@view_config(
    route_name="section_view",
    permission="root_administration",
    match_param=["section=admin_section", "view=partitions"],
    renderer="json",
    request_method="GET",
)
def index(request):
    """Return partition information for the admin partitions panel."""
    permanent_partitions, daily_partitions = get_partition_stats()

    return {
        "permanent_partitions": sorted(
            list(permanent_partitions.items()), key=lambda x: x[0], reverse=True
        ),
        "daily_partitions": sorted(
            list(daily_partitions.items()), key=lambda x: x[0], reverse=True
        ),
    }


@view_config(
    route_name="section_view",
    request_method="POST",
    match_param=["section=admin_section", "view=partitions_remove"],
    renderer="json",
    permission="root_administration",
)
def partitions_remove(request):
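    """Drop the Elasticsearch indices and PostgreSQL partition tables selected
    in the admin UI.

    The submitted index/table names are validated through the form returned by
    ``get_partition_deletion_form``, which is built from the partitions
    reported by ``get_partition_stats``.
    """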
    permanent_partitions, daily_partitions = get_partition_stats()
    pg_partitions = []
    es_partitions = []
    for item in list(permanent_partitions.values()) + list(daily_partitions.values()):
        es_partitions.extend(item["elasticsearch"])
        pg_partitions.extend(item["pg"])

    FormCls = get_partition_deletion_form(es_partitions, pg_partitions)
    form = FormCls(
        es_index=request.unsafe_json_body["es_indices"],
        pg_index=request.unsafe_json_body["pg_indices"],
        confirm=request.unsafe_json_body["confirm"],
        csrf_context=request,
    )
    if form.validate():
        for ix in form.data["es_index"]:
            log.warning("deleting ES partition: {}".format(ix))
            Datastores.es.indices.delete(ix)
        for ix in form.data["pg_index"]:
            log.warning("deleting PG partition: {}".format(ix))
            stmt = sa.text("DROP TABLE %s CASCADE" % sa.text(ix))
            session = DBSession()
            session.connection().execute(stmt)
            mark_changed(session)

    for field, error in form.errors.items():
        msg = "%s: %s" % (field, error[0])
        request.session.flash(msg, "error")

    permanent_partitions, daily_partitions = get_partition_stats()
    return {
        "permanent_partitions": sorted(
            list(permanent_partitions.items()), key=lambda x: x[0], reverse=True
        ),
        "daily_partitions": sorted(
            list(daily_partitions.items()), key=lambda x: x[0], reverse=True
        ),
    }
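# A rough sketch of the JSON payload ``partitions_remove`` expects, inferred
# only from the ``request.unsafe_json_body`` keys read above; the index and
# table names here are hypothetical examples, and the confirmation value
# depends on the form returned by ``get_partition_deletion_form``:
#
#     {
#         "es_indices": ["rcae_l_2017_05_01"],
#         "pg_indices": ["logs_p_2017_05_01"],
#         "confirm": "<confirmation value required by the form>"
#     }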