diff --git a/apiserver/elastic/apply_mappings.py b/apiserver/elastic/apply_mappings.py index a43f28b..7373647 100755 --- a/apiserver/elastic/apply_mappings.py +++ b/apiserver/elastic/apply_mappings.py @@ -8,15 +8,18 @@ import logging from pathlib import Path from typing import Optional, Sequence, Tuple -from elasticsearch import Elasticsearch +from elasticsearch import Elasticsearch, exceptions HERE = Path(__file__).resolve().parent -logging.getLogger('elasticsearch').setLevel(logging.WARNING) -logging.getLogger('elastic_transport').setLevel(logging.WARNING) +logging.getLogger("elasticsearch").setLevel(logging.WARNING) +logging.getLogger("elastic_transport").setLevel(logging.WARNING) def apply_mappings_to_cluster( - hosts: Sequence, key: Optional[str] = None, es_args: dict = None, http_auth: Tuple = None + hosts: Sequence, + key: Optional[str] = None, + es_args: dict = None, + http_auth: Tuple = None, ): """Hosts maybe a sequence of strings or dicts in the form {"host": , "port": }""" @@ -34,21 +37,33 @@ def apply_mappings_to_cluster( res = es.indices.put_index_template(name=template_name, body=body) return {"index_template": template_name, "result": res} - def _send_template(f): - with f.open() as json_data: - data = json.load(json_data) - template_name = f.stem - res = es.indices.put_template(name=template_name, body=data) - return {"mapping": template_name, "result": res} + # def _send_legacy_template(f): + # with f.open() as json_data: + # data = json.load(json_data) + # template_name = f.stem + # res = es.indices.put_template(name=template_name, body=data) + # return {"mapping": template_name, "result": res} + + def _delete_legacy_templates(legacy_folder): + res_list = [] + for lt in legacy_folder.glob("*.json"): + template_name = lt.stem + try: + if not es.indices.get_template(name=template_name): + continue + res = es.indices.delete_template(name=template_name) + except exceptions.NotFoundError: + continue + res_list.append({"deleted legacy mapping": template_name, "result": res}) + + return res_list es = Elasticsearch(hosts=hosts, http_auth=http_auth, **(es_args or {})) - p = HERE / "index_templates" + root = HERE / "index_templates" if key: - folders = [p / key] + folders = [root / key] else: - folders = [ - f for f in p.iterdir() if f.is_dir() - ] + folders = [f for f in root.iterdir() if f.is_dir()] ret = [] for f in folders: @@ -57,6 +72,13 @@ def apply_mappings_to_cluster( for it in f.glob("*.json"): ret.append(_send_index_template(it)) + legacy_root = HERE / "mappings" + for f in folders: + legacy_f = legacy_root / f.stem + if not legacy_f.exists() or not legacy_f.is_dir(): + continue + ret.extend(_delete_legacy_templates(legacy_f)) + return ret # p = HERE / "mappings" # if key: diff --git a/apiserver/elastic/index_templates/workers/component_templates/queue_metrics.json b/apiserver/elastic/index_templates/workers/queue_metrics.json similarity index 78% rename from apiserver/elastic/index_templates/workers/component_templates/queue_metrics.json rename to apiserver/elastic/index_templates/workers/queue_metrics.json index 46a86b3..348f66d 100644 --- a/apiserver/elastic/index_templates/workers/component_templates/queue_metrics.json +++ b/apiserver/elastic/index_templates/workers/queue_metrics.json @@ -1,5 +1,10 @@ { + "index_patterns": "queue_metrics_*", "template": { + "settings": { + "number_of_replicas": 0, + "number_of_shards": 1 + }, "mappings": { "_source": { "enabled": true diff --git a/apiserver/elastic/index_templates/workers/component_templates/worker_stats.json b/apiserver/elastic/index_templates/workers/worker_stats.json similarity index 84% rename from apiserver/elastic/index_templates/workers/component_templates/worker_stats.json rename to apiserver/elastic/index_templates/workers/worker_stats.json index 61a13e2..9a08cfc 100644 --- a/apiserver/elastic/index_templates/workers/component_templates/worker_stats.json +++ b/apiserver/elastic/index_templates/workers/worker_stats.json @@ -1,5 +1,10 @@ { + "index_patterns": "worker_stats_*", "template": { + "settings": { + "number_of_replicas": 0, + "number_of_shards": 1 + }, "mappings": { "_source": { "enabled": true diff --git a/apiserver/elastic/initialize.py b/apiserver/elastic/initialize.py index 4551c71..cf13e1a 100644 --- a/apiserver/elastic/initialize.py +++ b/apiserver/elastic/initialize.py @@ -10,8 +10,8 @@ from apiserver.config_repo import config from apiserver.elastic.apply_mappings import apply_mappings_to_cluster log = config.logger(__file__) -logging.getLogger('elasticsearch').setLevel(logging.WARNING) -logging.getLogger('elastic_transport').setLevel(logging.WARNING) +logging.getLogger("elasticsearch").setLevel(logging.WARNING) +logging.getLogger("elastic_transport").setLevel(logging.WARNING) class MissingElasticConfiguration(Exception): @@ -80,6 +80,18 @@ def check_elastic_empty() -> bool: err_type=urllib3.exceptions.NewConnectionError, args_prefix=("GET",) ) + def events_legacy_template(): + try: + return es.indices.get_template(name="events*") + except exceptions.NotFoundError: + return False + + def events_template(): + try: + return es.indices.get_index_template(name="events*") + except exceptions.NotFoundError: + return False + try: es_logger.addFilter(log_filter) for retry in range(max_retries): @@ -89,10 +101,7 @@ def check_elastic_empty() -> bool: http_auth=es_factory.get_credentials("events", cluster_conf), **cluster_conf.get("args", {}), ) - return not es.indices.get_template(name="events*") - except exceptions.NotFoundError as ex: - log.error(ex) - return True + return not (events_template() or events_legacy_template()) except exceptions.ConnectionError as ex: if retry >= max_retries - 1: raise ElasticConnectionError(