diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py index cbbe588..bef1407 100644 --- a/apiserver/bll/event/event_bll.py +++ b/apiserver/bll/event/event_bll.py @@ -11,7 +11,7 @@ from mongoengine import Q from nested_dict import nested_dict from apiserver.database import utils as dbutils -from apiserver import es_factory +from apiserver.es_factory import es_factory from apiserver.apierrors import errors from apiserver.bll.event.debug_images_iterator import DebugImagesIterator from apiserver.bll.event.event_metrics import EventMetrics, EventType diff --git a/apiserver/bll/queue/queue_bll.py b/apiserver/bll/queue/queue_bll.py index 6d8ffe7..1e6d13c 100644 --- a/apiserver/bll/queue/queue_bll.py +++ b/apiserver/bll/queue/queue_bll.py @@ -5,7 +5,7 @@ from typing import Callable, Sequence, Optional, Tuple from elasticsearch import Elasticsearch from apiserver import database -from apiserver import es_factory +from apiserver.es_factory import es_factory from apiserver.apierrors import errors from apiserver.bll.queue.queue_metrics import QueueMetrics from apiserver.bll.workers import WorkerBLL diff --git a/apiserver/bll/queue/queue_metrics.py b/apiserver/bll/queue/queue_metrics.py index febc1b0..bd9477c 100644 --- a/apiserver/bll/queue/queue_metrics.py +++ b/apiserver/bll/queue/queue_metrics.py @@ -5,7 +5,7 @@ from typing import Sequence import elasticsearch.helpers from elasticsearch import Elasticsearch -from apiserver import es_factory +from apiserver.es_factory import es_factory from apiserver.apierrors.errors import bad_request from apiserver.bll.query import Builder as QueryBuilder from apiserver.config import config diff --git a/apiserver/bll/task/task_bll.py b/apiserver/bll/task/task_bll.py index 847f0f6..c244bc1 100644 --- a/apiserver/bll/task/task_bll.py +++ b/apiserver/bll/task/task_bll.py @@ -12,7 +12,7 @@ from mongoengine import Q from six import string_types import apiserver.database.utils as dbutils -from apiserver import es_factory +from apiserver.es_factory import es_factory from apiserver.apierrors import errors from apiserver.apimodels.tasks import Artifact as ApiArtifact from apiserver.bll.organization import OrgBLL, Tags diff --git a/apiserver/bll/workers/__init__.py b/apiserver/bll/workers/__init__.py index 6e7e50a..6870110 100644 --- a/apiserver/bll/workers/__init__.py +++ b/apiserver/bll/workers/__init__.py @@ -5,7 +5,7 @@ from typing import Sequence, Set, Optional import attr import elasticsearch.helpers -from apiserver import es_factory +from apiserver.es_factory import es_factory from apiserver.apierrors import APIError from apiserver.apierrors.errors import bad_request, server_error from apiserver.apimodels.workers import ( diff --git a/apiserver/elastic/initialize.py b/apiserver/elastic/initialize.py index 23d1f25..2447bb4 100644 --- a/apiserver/elastic/initialize.py +++ b/apiserver/elastic/initialize.py @@ -5,7 +5,7 @@ from typing import Type, Optional, Sequence, Any, Union import urllib3.exceptions from elasticsearch import Elasticsearch, exceptions -from apiserver import es_factory +from apiserver.es_factory import es_factory from apiserver.config import config from apiserver.elastic.apply_mappings import apply_mappings_to_cluster diff --git a/apiserver/es_factory.py b/apiserver/es_factory.py index c53755c..55b2c1f 100644 --- a/apiserver/es_factory.py +++ b/apiserver/es_factory.py @@ -42,80 +42,85 @@ class InvalidClusterConfiguration(Exception): pass -def connect(cluster_name): - """ - Returns the es client for the cluster. - Connects to the cluster if did not connect previously - :param cluster_name: Dot separated cluster path in the configuration file - :return: es client - :raises MissingClusterConfiguration: in case no config section is found for the cluster - :raises InvalidClusterConfiguration: in case cluster config section misses needed properties - """ - if cluster_name not in _instances: - cluster_config = get_cluster_config(cluster_name) - hosts = cluster_config.get("hosts", None) - if not hosts: - raise InvalidClusterConfiguration(cluster_name) +class ESFactory: + @classmethod + def connect(cls, cluster_name): + """ + Returns the es client for the cluster. + Connects to the cluster if did not connect previously + :param cluster_name: Dot separated cluster path in the configuration file + :return: es client + :raises MissingClusterConfiguration: in case no config section is found for the cluster + :raises InvalidClusterConfiguration: in case cluster config section misses needed properties + """ + if cluster_name not in _instances: + cluster_config = cls.get_cluster_config(cluster_name) + hosts = cluster_config.get("hosts", None) + if not hosts: + raise InvalidClusterConfiguration(cluster_name) - args = cluster_config.get("args", {}) - _instances[cluster_name] = Elasticsearch( - hosts=hosts, transport_class=Transport, **args - ) + args = cluster_config.get("args", {}) + _instances[cluster_name] = Elasticsearch( + hosts=hosts, transport_class=Transport, **args + ) - return _instances[cluster_name] + return _instances[cluster_name] + + @classmethod + def get_all_cluster_names(cls): + return list(config.get("hosts.elastic")) + + @classmethod + def get_cluster_config(cls, cluster_name): + """ + Returns cluster config for the specified cluster path + :param cluster_name: Dot separated cluster path in the configuration file + :return: config section for the cluster + :raises MissingClusterConfiguration: in case no config section is found for the cluster + """ + cluster_key = ".".join(("hosts.elastic", cluster_name)) + cluster_config = config.get(cluster_key, None) + if not cluster_config: + raise MissingClusterConfiguration(cluster_name) + + def set_host_prop(key, value): + for host in cluster_config.get("hosts", []): + host[key] = value + + if OVERRIDE_HOST: + set_host_prop("host", OVERRIDE_HOST) + + if OVERRIDE_PORT: + set_host_prop("port", OVERRIDE_PORT) + + return cluster_config + + @classmethod + def connect_all(cls): + clusters = config.get("hosts.elastic").as_plain_ordered_dict() + for name in clusters: + cls.connect(name) + + @classmethod + def instances(cls): + return _instances + + @classmethod + def timestamp_str_to_millis(cls, ts_str): + epoch = datetime.utcfromtimestamp(0) + current_date = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S.%fZ") + return int((current_date - epoch).total_seconds() * 1000.0) + + @classmethod + def get_timestamp_millis(cls): + now = datetime.utcnow() + epoch = datetime.utcfromtimestamp(0) + return int((now - epoch).total_seconds() * 1000.0) + + @classmethod + def get_es_timestamp_str(cls): + now = datetime.utcnow() + return now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond / 1000) + "Z" -def get_all_cluster_names(): - return list(config.get("hosts.elastic")) - - -def get_cluster_config(cluster_name): - """ - Returns cluster config for the specified cluster path - :param cluster_name: Dot separated cluster path in the configuration file - :return: config section for the cluster - :raises MissingClusterConfiguration: in case no config section is found for the cluster - """ - cluster_key = ".".join(("hosts.elastic", cluster_name)) - cluster_config = config.get(cluster_key, None) - if not cluster_config: - raise MissingClusterConfiguration(cluster_name) - - def set_host_prop(key, value): - for host in cluster_config.get("hosts", []): - host[key] = value - - if OVERRIDE_HOST: - set_host_prop("host", OVERRIDE_HOST) - - if OVERRIDE_PORT: - set_host_prop("port", OVERRIDE_PORT) - - return cluster_config - - -def connect_all(): - clusters = config.get("hosts.elastic").as_plain_ordered_dict() - for name in clusters: - connect(name) - - -def instances(): - return _instances - - -def timestamp_str_to_millis(ts_str): - epoch = datetime.utcfromtimestamp(0) - current_date = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S.%fZ") - return int((current_date - epoch).total_seconds() * 1000.0) - - -def get_timestamp_millis(): - now = datetime.utcnow() - epoch = datetime.utcfromtimestamp(0) - return int((now - epoch).total_seconds() * 1000.0) - - -def get_es_timestamp_str(): - now = datetime.utcnow() - return now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond / 1000) + "Z" +es_factory = ESFactory() diff --git a/apiserver/tests/automated/test_task_events.py b/apiserver/tests/automated/test_task_events.py index 9059f86..1e0be39 100644 --- a/apiserver/tests/automated/test_task_events.py +++ b/apiserver/tests/automated/test_task_events.py @@ -10,7 +10,7 @@ from typing import Sequence, Optional, Tuple from boltons.iterutils import first -from apiserver import es_factory +from apiserver.es_factory import es_factory from apiserver.apierrors.errors.bad_request import EventsNotAdded from apiserver.tests.automated import TestService