mirror of
https://github.com/clearml/clearml-server
synced 2025-01-31 10:56:48 +00:00
Add elasticsearch log filtering while trying to connect
This commit is contained in:
parent
10f326eda9
commit
0fe1bf8a61
@ -1,5 +1,8 @@
|
||||
import logging
|
||||
from time import sleep
|
||||
from typing import Type, Optional, Sequence, Any, Union
|
||||
|
||||
import urllib3.exceptions
|
||||
from elasticsearch import Elasticsearch, exceptions
|
||||
|
||||
import es_factory
|
||||
@ -25,6 +28,39 @@ class ElasticConnectionError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ConnectionErrorFilter(logging.Filter):
|
||||
def __init__(
|
||||
self,
|
||||
level: Optional[Union[int, str]] = None,
|
||||
err_type: Optional[Type] = None,
|
||||
args_prefix: Optional[Sequence[Any]] = None,
|
||||
):
|
||||
super(ConnectionErrorFilter, self).__init__()
|
||||
if level is None:
|
||||
self.level = None
|
||||
else:
|
||||
try:
|
||||
self.level = int(level)
|
||||
except ValueError:
|
||||
self.level = logging.getLevelName(level)
|
||||
|
||||
self.err_type = err_type
|
||||
self.args = args_prefix and tuple(args_prefix)
|
||||
self.last_blocked = None
|
||||
|
||||
def filter(self, record):
|
||||
allow = (
|
||||
(self.err_type is None or record.exc_info[0] != self.err_type)
|
||||
and (self.level is None or record.levelno != self.level)
|
||||
and (self.args is None or record.args[: len(self.args)] != self.args)
|
||||
)
|
||||
|
||||
if not allow:
|
||||
self.last_blocked = record
|
||||
|
||||
return int(allow)
|
||||
|
||||
|
||||
def check_elastic_empty() -> bool:
|
||||
"""
|
||||
Check for elasticsearch connection
|
||||
@ -35,20 +71,32 @@ def check_elastic_empty() -> bool:
|
||||
cluster_conf = es_factory.get_cluster_config("events")
|
||||
max_retries = config.get("apiserver.elastic.probing.max_retries", 4)
|
||||
timeout = config.get("apiserver.elastic.probing.timeout", 30)
|
||||
for retry in range(max_retries):
|
||||
try:
|
||||
es = Elasticsearch(hosts=cluster_conf.get("hosts"))
|
||||
return not es.indices.get_template(name="events*")
|
||||
except exceptions.NotFoundError as ex:
|
||||
log.error(ex)
|
||||
return True
|
||||
except exceptions.ConnectionError:
|
||||
if retry >= max_retries - 1:
|
||||
raise ElasticConnectionError()
|
||||
log.warn(
|
||||
f"Could not connect to es server. Retry {retry+1} of {max_retries}. Waiting for {timeout}sec"
|
||||
)
|
||||
sleep(timeout)
|
||||
|
||||
es_logger = logging.getLogger("elasticsearch")
|
||||
log_filter = ConnectionErrorFilter(
|
||||
err_type=urllib3.exceptions.NewConnectionError, args_prefix=("GET",)
|
||||
)
|
||||
|
||||
try:
|
||||
es_logger.addFilter(log_filter)
|
||||
for retry in range(max_retries):
|
||||
try:
|
||||
es = Elasticsearch(hosts=cluster_conf.get("hosts"))
|
||||
return not es.indices.get_template(name="events*")
|
||||
except exceptions.NotFoundError as ex:
|
||||
log.error(ex)
|
||||
return True
|
||||
except exceptions.ConnectionError as ex:
|
||||
if retry >= max_retries - 1:
|
||||
raise ElasticConnectionError(
|
||||
f"Error connecting to Elasticsearch: {str(ex)}"
|
||||
)
|
||||
log.warn(
|
||||
f"Could not connect to ElasticSearch Service. Retry {retry+1} of {max_retries}. Waiting for {timeout}sec"
|
||||
)
|
||||
sleep(timeout)
|
||||
finally:
|
||||
es_logger.removeFilter(log_filter)
|
||||
|
||||
|
||||
def init_es_data():
|
||||
|
Loading…
Reference in New Issue
Block a user