clearml-server/server/elastic/initialize.py

113 lines
3.5 KiB
Python
Raw Normal View History

import logging
from time import sleep
from typing import Type, Optional, Sequence, Any, Union
import urllib3.exceptions
from elasticsearch import Elasticsearch, exceptions
import es_factory
from config import config
from elastic.apply_mappings import apply_mappings_to_cluster
# Module-level logger, obtained from the project config helper using this file's path
log = config.logger(__file__)
class MissingElasticConfiguration(Exception):
    """Raised when the configuration of an Elasticsearch cluster cannot be found in the config files."""
class ElasticConnectionError(Exception):
    """Raised when a connection to Elasticsearch could not be established during initialization."""
class ConnectionErrorFilter(logging.Filter):
    """
    Logging filter that blocks records matching any of the configured criteria.

    A record is blocked when at least one configured criterion matches it:
    its exception type equals ``err_type``, its level number equals ``level``,
    or its args start with ``args_prefix``. Criteria that are not set are ignored.
    The most recently blocked record is kept in ``last_blocked``.
    """

    def __init__(
        self,
        level: Optional[Union[int, str]] = None,
        err_type: Optional[Type] = None,
        args_prefix: Optional[Sequence[Any]] = None,
    ):
        """
        :param level: level to block, given as a number, a numeric string or a level name
        :param err_type: exception class to block (exact match, subclasses do not match)
        :param args_prefix: leading log-call arguments to block
        """
        super(ConnectionErrorFilter, self).__init__()
        if level is None:
            self.level = None
        else:
            try:
                self.level = int(level)
            except ValueError:
                # Not a numeric value: resolve it as a level name such as "WARNING"
                self.level = logging.getLevelName(level)
        self.err_type = err_type
        # An empty prefix carries no criterion; storing it would match every
        # record (or none, depending on the sequence type), so normalize to None
        self.args = tuple(args_prefix) if args_prefix else None
        self.last_blocked = None

    def filter(self, record):
        """
        Decide whether the record should be emitted.

        :param record: the log record being examined
        :return: 1 to let the record through, 0 to block it
        """
        # Records logged without an exception have exc_info set to None,
        # so they can never match the exception type criterion
        exc_info = record.exc_info
        exc_type = exc_info[0] if isinstance(exc_info, tuple) else None

        # record.args may be a mapping (single dict argument) or None;
        # only tuple args can be compared against the prefix
        record_args = record.args
        args_match = (
            self.args is not None
            and isinstance(record_args, tuple)
            and record_args[: len(self.args)] == self.args
        )

        allow = (
            (self.err_type is None or exc_type != self.err_type)
            and (self.level is None or record.levelno != self.level)
            and not args_match
        )
        if not allow:
            self.last_blocked = record
        return int(allow)
def check_elastic_empty() -> bool:
    """
    Check for elasticsearch connection and whether any "events*" index template exists.

    Use probing settings and not the default es cluster ones
    so that we can handle correctly the connection rejects due to ES not fully started yet

    :return: True if no "events*" template is returned (or the lookup raises
        NotFoundError), False if such a template exists
    :raise ElasticConnectionError: if the connection still fails on the last retry
    """
    cluster_conf = es_factory.get_cluster_config("events")
    max_retries = config.get("apiserver.elastic.probing.max_retries", 4)
    timeout = config.get("apiserver.elastic.probing.timeout", 30)

    # While probing, filter connection failure records out of the "elasticsearch" logger
    es_logger = logging.getLogger("elasticsearch")
    log_filter = ConnectionErrorFilter(
        err_type=urllib3.exceptions.NewConnectionError, args_prefix=("GET",)
    )

    try:
        es_logger.addFilter(log_filter)
        for retry in range(max_retries):
            try:
                es = Elasticsearch(hosts=cluster_conf.get("hosts"))
                return not es.indices.get_template(name="events*")
            except exceptions.NotFoundError as ex:
                log.error(ex)
                return True
            except exceptions.ConnectionError as ex:
                if retry >= max_retries - 1:
                    # Out of retries: surface the failure and keep the original cause chained
                    raise ElasticConnectionError(
                        f"Error connecting to Elasticsearch: {str(ex)}"
                    ) from ex

                # Logger.warn is a deprecated alias; use warning()
                log.warning(
                    f"Could not connect to ElasticSearch Service. Retry {retry+1} of {max_retries}. Waiting for {timeout}sec"
                )
                sleep(timeout)
    finally:
        # Always restore the elasticsearch logger to its previous state
        es_logger.removeFilter(log_filter)
def init_es_data():
    """
    Apply the mappings to every Elasticsearch cluster known to es_factory.

    :raise MissingElasticConfiguration: if a cluster configuration has no hosts
    """
    for cluster_name in es_factory.get_all_cluster_names():
        conf = es_factory.get_cluster_config(cluster_name)
        hosts = conf.get("hosts")
        if not hosts:
            raise MissingElasticConfiguration(f"for cluster '{cluster_name}'")

        log.info(f"Applying mappings to ES host: {hosts}")
        result = apply_mappings_to_cluster(
            hosts, cluster_name, es_args=conf.get("args", {})
        )
        log.info(result)