Make sure that legacy templates are deleted and that the empty-DB check is performed on the new index templates

This commit is contained in:
allegroai 2024-03-18 15:40:13 +02:00
parent d3013ac285
commit 6112910768
4 changed files with 62 additions and 21 deletions

View File

@ -8,15 +8,18 @@ import logging
from pathlib import Path from pathlib import Path
from typing import Optional, Sequence, Tuple from typing import Optional, Sequence, Tuple
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch, exceptions
HERE = Path(__file__).resolve().parent HERE = Path(__file__).resolve().parent
logging.getLogger('elasticsearch').setLevel(logging.WARNING) logging.getLogger("elasticsearch").setLevel(logging.WARNING)
logging.getLogger('elastic_transport').setLevel(logging.WARNING) logging.getLogger("elastic_transport").setLevel(logging.WARNING)
def apply_mappings_to_cluster( def apply_mappings_to_cluster(
hosts: Sequence, key: Optional[str] = None, es_args: dict = None, http_auth: Tuple = None hosts: Sequence,
key: Optional[str] = None,
es_args: dict = None,
http_auth: Tuple = None,
): ):
"""Hosts maybe a sequence of strings or dicts in the form {"host": <host>, "port": <port>}""" """Hosts maybe a sequence of strings or dicts in the form {"host": <host>, "port": <port>}"""
@ -34,21 +37,33 @@ def apply_mappings_to_cluster(
res = es.indices.put_index_template(name=template_name, body=body) res = es.indices.put_index_template(name=template_name, body=body)
return {"index_template": template_name, "result": res} return {"index_template": template_name, "result": res}
def _send_template(f): # def _send_legacy_template(f):
with f.open() as json_data: # with f.open() as json_data:
data = json.load(json_data) # data = json.load(json_data)
template_name = f.stem # template_name = f.stem
res = es.indices.put_template(name=template_name, body=data) # res = es.indices.put_template(name=template_name, body=data)
return {"mapping": template_name, "result": res} # return {"mapping": template_name, "result": res}
def _delete_legacy_templates(legacy_folder):
    """Delete legacy index templates named after the JSON files in *legacy_folder*.

    Templates that no longer exist are skipped silently; one result dict is
    returned per template that was actually deleted.
    """
    deleted = []
    for template_file in legacy_folder.glob("*.json"):
        name = template_file.stem
        try:
            # get_template may return an empty mapping instead of raising,
            # so guard both cases before issuing the delete
            if not es.indices.get_template(name=name):
                continue
            result = es.indices.delete_template(name=name)
        except exceptions.NotFoundError:
            continue
        deleted.append({"deleted legacy mapping": name, "result": result})
    return deleted
es = Elasticsearch(hosts=hosts, http_auth=http_auth, **(es_args or {})) es = Elasticsearch(hosts=hosts, http_auth=http_auth, **(es_args or {}))
p = HERE / "index_templates" root = HERE / "index_templates"
if key: if key:
folders = [p / key] folders = [root / key]
else: else:
folders = [ folders = [f for f in root.iterdir() if f.is_dir()]
f for f in p.iterdir() if f.is_dir()
]
ret = [] ret = []
for f in folders: for f in folders:
@ -57,6 +72,13 @@ def apply_mappings_to_cluster(
for it in f.glob("*.json"): for it in f.glob("*.json"):
ret.append(_send_index_template(it)) ret.append(_send_index_template(it))
legacy_root = HERE / "mappings"
for f in folders:
legacy_f = legacy_root / f.stem
if not legacy_f.exists() or not legacy_f.is_dir():
continue
ret.extend(_delete_legacy_templates(legacy_f))
return ret return ret
# p = HERE / "mappings" # p = HERE / "mappings"
# if key: # if key:

View File

@ -1,5 +1,10 @@
{ {
"index_patterns": "queue_metrics_*",
"template": { "template": {
"settings": {
"number_of_replicas": 0,
"number_of_shards": 1
},
"mappings": { "mappings": {
"_source": { "_source": {
"enabled": true "enabled": true

View File

@ -1,5 +1,10 @@
{ {
"index_patterns": "worker_stats_*",
"template": { "template": {
"settings": {
"number_of_replicas": 0,
"number_of_shards": 1
},
"mappings": { "mappings": {
"_source": { "_source": {
"enabled": true "enabled": true

View File

@ -10,8 +10,8 @@ from apiserver.config_repo import config
from apiserver.elastic.apply_mappings import apply_mappings_to_cluster from apiserver.elastic.apply_mappings import apply_mappings_to_cluster
log = config.logger(__file__) log = config.logger(__file__)
logging.getLogger('elasticsearch').setLevel(logging.WARNING) logging.getLogger("elasticsearch").setLevel(logging.WARNING)
logging.getLogger('elastic_transport').setLevel(logging.WARNING) logging.getLogger("elastic_transport").setLevel(logging.WARNING)
class MissingElasticConfiguration(Exception): class MissingElasticConfiguration(Exception):
@ -80,6 +80,18 @@ def check_elastic_empty() -> bool:
err_type=urllib3.exceptions.NewConnectionError, args_prefix=("GET",) err_type=urllib3.exceptions.NewConnectionError, args_prefix=("GET",)
) )
def events_legacy_template():
    """Return the legacy (v1) "events*" templates, or False when the lookup 404s."""
    try:
        result = es.indices.get_template(name="events*")
    except exceptions.NotFoundError:
        return False
    return result
def events_template():
    """Return the composable "events*" index templates, or False when the lookup 404s."""
    try:
        result = es.indices.get_index_template(name="events*")
    except exceptions.NotFoundError:
        return False
    return result
try: try:
es_logger.addFilter(log_filter) es_logger.addFilter(log_filter)
for retry in range(max_retries): for retry in range(max_retries):
@ -89,10 +101,7 @@ def check_elastic_empty() -> bool:
http_auth=es_factory.get_credentials("events", cluster_conf), http_auth=es_factory.get_credentials("events", cluster_conf),
**cluster_conf.get("args", {}), **cluster_conf.get("args", {}),
) )
return not es.indices.get_template(name="events*") return not (events_template() or events_legacy_template())
except exceptions.NotFoundError as ex:
log.error(ex)
return True
except exceptions.ConnectionError as ex: except exceptions.ConnectionError as ex:
if retry >= max_retries - 1: if retry >= max_retries - 1:
raise ElasticConnectionError( raise ElasticConnectionError(