##// END OF EJS Templates
ini: added new key
ini: added new key

File last commit:

r112:998f0d14
r129:489ce37b
Show More
partitions.py
148 lines | 5.8 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyramid.view import view_config
from appenlight.models import DBSession, Datastores
from appenlight.forms import get_partition_deletion_form
import logging
from zope.sqlalchemy import mark_changed
from datetime import datetime
import sqlalchemy as sa
log = logging.getLogger(__name__)
def get_partition_stats():
table_query = """
SELECT table_name
FROM information_schema.tables
GROUP BY table_name
ORDER BY table_name
"""
permanent_partitions = {}
daily_partitions = {}
def is_int(data):
try:
int(data)
return True
except Exception:
pass
return False
def add_key(key, holder):
if not ix_time in holder:
holder[ix_time] = {'pg': [], 'elasticsearch': []}
for partition in list(Datastores.es.aliases().keys()):
if not partition.startswith('rcae'):
continue
split_data = partition.split('_')
permanent = False
# if we dont have a day then treat it as permanent partion
if False in list(map(is_int, split_data[-3:])):
ix_time = datetime(year=int(split_data[-2]),
month=int(split_data[-1]),
day=1).date()
permanent = True
else:
ix_time = datetime(year=int(split_data[-3]),
month=int(split_data[-2]),
day=int(split_data[-1])).date()
ix_time = str(ix_time)
if permanent:
add_key(ix_time, permanent_partitions)
if ix_time not in permanent_partitions:
permanent_partitions[ix_time]['elasticsearch'] = []
permanent_partitions[ix_time]['elasticsearch'].append(partition)
else:
add_key(ix_time, daily_partitions)
if ix_time not in daily_partitions:
daily_partitions[ix_time]['elasticsearch'] = []
daily_partitions[ix_time]['elasticsearch'].append(partition)
for row in DBSession.execute(table_query):
splitted = row['table_name'].split('_')
if 'p' in splitted:
# dealing with partition
split_data = [int(x) for x in splitted[splitted.index('p') + 1:]]
if len(split_data) == 3:
ix_time = datetime(split_data[0], split_data[1],
split_data[2]).date()
ix_time = str(ix_time)
add_key(ix_time, daily_partitions)
daily_partitions[ix_time]['pg'].append(row['table_name'])
else:
ix_time = datetime(split_data[0], split_data[1], 1).date()
ix_time = str(ix_time)
add_key(ix_time, permanent_partitions)
permanent_partitions[ix_time]['pg'].append(row['table_name'])
return permanent_partitions, daily_partitions
@view_config(route_name='section_view', permission='root_administration',
match_param=['section=admin_section', 'view=partitions'],
renderer='json', request_method='GET')
def index(request):
permanent_partitions, daily_partitions = get_partition_stats()
return {"permanent_partitions": sorted(list(permanent_partitions.items()),
key=lambda x: x[0], reverse=True),
"daily_partitions": sorted(list(daily_partitions.items()),
key=lambda x: x[0], reverse=True)}
@view_config(route_name='section_view', request_method='POST',
match_param=['section=admin_section', 'view=partitions_remove'],
renderer='json', permission='root_administration')
def partitions_remove(request):
permanent_partitions, daily_partitions = get_partition_stats()
pg_partitions = []
es_partitions = []
for item in list(permanent_partitions.values()) + list(daily_partitions.values()):
es_partitions.extend(item['elasticsearch'])
pg_partitions.extend(item['pg'])
FormCls = get_partition_deletion_form(es_partitions, pg_partitions)
form = FormCls(es_index=request.unsafe_json_body['es_indices'],
pg_index=request.unsafe_json_body['pg_indices'],
confirm=request.unsafe_json_body['confirm'],
csrf_context=request)
if form.validate():
for ix in form.data['es_index']:
log.warning('deleting ES partition: {}'.format(ix))
Datastores.es.delete_index(ix)
for ix in form.data['pg_index']:
log.warning('deleting PG partition: {}'.format(ix))
stmt = sa.text('DROP TABLE %s CASCADE' % sa.text(ix))
session = DBSession()
session.connection().execute(stmt)
mark_changed(session)
for field, error in form.errors.items():
msg = '%s: %s' % (field, error[0])
request.session.flash(msg, 'error')
permanent_partitions, daily_partitions = get_partition_stats()
return {
"permanent_partitions": sorted(
list(permanent_partitions.items()), key=lambda x: x[0], reverse=True),
"daily_partitions": sorted(
list(daily_partitions.items()), key=lambda x: x[0], reverse=True)}