diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py index 1fa4810..66e07ae 100644 --- a/apiserver/bll/event/event_bll.py +++ b/apiserver/bll/event/event_bll.py @@ -44,7 +44,6 @@ from apiserver.database.errors import translate_errors_context from apiserver.database.model.task.task import Task, TaskStatus from apiserver.redis_manager import redman from apiserver.service_repo.auth import Identity -from apiserver.tools import safe_get from apiserver.utilities.dicts import nested_get from apiserver.utilities.json import loads @@ -661,8 +660,8 @@ class EventBLL(object): Return events and next scroll id from the scrolled query Release the scroll once it is exhausted """ - total_events = safe_get(es_res, "hits/total/value", default=0) - events = [doc["_source"] for doc in safe_get(es_res, "hits/hits", default=[])] + total_events = nested_get(es_res, ("hits", "total", "value"), default=0) + events = [doc["_source"] for doc in nested_get(es_res, ("hits", "hits"), default=[])] next_scroll_id = es_res.get("_scroll_id") if next_scroll_id and not events: self.clear_scroll(next_scroll_id) diff --git a/apiserver/bll/event/event_common.py b/apiserver/bll/event/event_common.py index b49943f..9b9903a 100644 --- a/apiserver/bll/event/event_common.py +++ b/apiserver/bll/event/event_common.py @@ -9,7 +9,7 @@ from elasticsearch import Elasticsearch from apiserver.config_repo import config from apiserver.database.errors import translate_errors_context from apiserver.database.model.task.task import Task -from apiserver.tools import safe_get +from apiserver.utilities.dicts import nested_get class EventType(Enum): @@ -123,8 +123,8 @@ def get_max_metric_and_variant_counts( es, company_id=company_id, event_type=event_type, body=es_req, **kwargs, ) - metrics_count = safe_get( - es_res, "aggregations/metrics_count/value", max_metrics_count + metrics_count = nested_get( + es_res, ("aggregations", "metrics_count", "value"), max_metrics_count ) if not metrics_count: return max_metrics_count, max_variants_count diff --git a/apiserver/bll/event/event_metrics.py b/apiserver/bll/event/event_metrics.py index 70234df..c38b4c5 100644 --- a/apiserver/bll/event/event_metrics.py +++ b/apiserver/bll/event/event_metrics.py @@ -24,7 +24,7 @@ from apiserver.bll.event.scalar_key import ScalarKey, ScalarKeyEnum from apiserver.bll.query import Builder as QueryBuilder from apiserver.config_repo import config from apiserver.database.errors import translate_errors_context -from apiserver.tools import safe_get +from apiserver.utilities.dicts import nested_get log = config.logger(__file__) @@ -342,12 +342,12 @@ class EventMetrics: total amount of intervals does not exceeds the samples Return the interval and resulting amount of intervals """ - count = safe_get(data, "count/value", default=0) + count = nested_get(data, ("count", "value"), default=0) if count < samples: return metric, variant, 1, count - min_index = safe_get(data, "min_index/value", default=0) - max_index = safe_get(data, "max_index/value", default=min_index) + min_index = nested_get(data, ("min_index", "value"), default=0) + max_index = nested_get(data, ("max_index", "value"), default=min_index) index_range = max_index - min_index + 1 interval = max(1, math.ceil(float(index_range) / samples)) max_samples = math.ceil(float(index_range) / interval) @@ -592,5 +592,5 @@ class EventMetrics: return [ metric["key"] - for metric in safe_get(es_res, "aggregations/metrics/buckets", default=[]) + for metric in nested_get(es_res, ("aggregations", "metrics", "buckets"), default=[]) ] diff --git a/apiserver/bll/event/metric_events_iterator.py b/apiserver/bll/event/metric_events_iterator.py index a8272a4..0c5e61f 100644 --- a/apiserver/bll/event/metric_events_iterator.py +++ b/apiserver/bll/event/metric_events_iterator.py @@ -6,7 +6,6 @@ from operator import itemgetter from typing import Sequence, Tuple, Optional, Mapping, Callable import attr -import dpath from boltons.iterutils import first from elasticsearch import Elasticsearch from jsonmodels.fields import StringField, ListField, IntField @@ -27,6 +26,7 @@ from apiserver.config_repo import config from apiserver.database.errors import translate_errors_context from apiserver.database.model.task.metrics import MetricEventStats from apiserver.database.model.task.task import Task +from apiserver.utilities.dicts import nested_get class VariantState(Base): @@ -305,13 +305,13 @@ class MetricEventsIterator: return [ MetricState( metric=metric["key"], - timestamp=dpath.get(metric, "last_event_timestamp/value"), + timestamp=nested_get(metric, ("last_event_timestamp", "value")), variants=[ init_variant_state(variant) - for variant in dpath.get(metric, "variants/buckets") + for variant in nested_get(metric, ("variants", "buckets")) ], ) - for metric in dpath.get(es_res, "aggregations/metrics/buckets") + for metric in nested_get(es_res, ("aggregations", "metrics", "buckets")) ] @abc.abstractmethod @@ -430,14 +430,14 @@ class MetricEventsIterator: def get_iteration_events(it_: dict) -> Sequence: return [ self._process_event(ev["_source"]) - for m in dpath.get(it_, "metrics/buckets") - for v in dpath.get(m, "variants/buckets") - for ev in dpath.get(v, "events/hits/hits") + for m in nested_get(it_, ("metrics", "buckets")) + for v in nested_get(m, ("variants", "buckets")) + for ev in nested_get(v, ("events", "hits", "hits")) if is_valid_event(ev["_source"]) ] iterations = [] - for it in dpath.get(es_res, "aggregations/iters/buckets"): + for it in nested_get(es_res, ("aggregations", "iters", "buckets")): events = get_iteration_events(it) if events: iterations.append({"iter": it["key"], "events": events}) diff --git a/apiserver/bll/statistics/stats_reporter.py b/apiserver/bll/statistics/stats_reporter.py index 77fd083..ca59e64 100644 --- a/apiserver/bll/statistics/stats_reporter.py +++ b/apiserver/bll/statistics/stats_reporter.py @@ -18,7 +18,7 @@ from apiserver.config.info import get_deployment_type from apiserver.database.model import Company, User from apiserver.database.model.queue import Queue from apiserver.database.model.task.task import Task -from apiserver.tools import safe_get +from apiserver.utilities.dicts import nested_get from apiserver.utilities.json import dumps from apiserver.version import __version__ as current_version from .resource_monitor import ResourceMonitor, stat_threads @@ -162,7 +162,7 @@ class StatisticsReporter: def _get_cardinality_fields(categories: Sequence[dict]) -> dict: names = {"cpu": "num_cores"} return { - names[c["key"]]: safe_get(c, "count/value") + names[c["key"]]: nested_get(c, ("count", "value")) for c in categories if c["key"] in names } @@ -175,21 +175,21 @@ class StatisticsReporter: } return { names[m["key"]]: { - "min": safe_get(m, "min/value"), - "max": safe_get(m, "max/value"), - "avg": safe_get(m, "avg/value"), + "min": nested_get(m, ("min", "value")), + "max": nested_get(m, ("max", "value")), + "avg": nested_get(m, ("avg", "value")), } for m in metrics if m["key"] in names } - buckets = safe_get(res, "aggregations/workers/buckets", default=[]) + buckets = nested_get(res, ("aggregations", "workers", "buckets"), default=[]) return { b["key"]: { key: { "interval_sec": agent_resource_threshold_sec, - **_get_cardinality_fields(safe_get(b, "categories/buckets", [])), - **_get_metric_fields(safe_get(b, "metrics/buckets", [])), + **_get_cardinality_fields(nested_get(b, ("categories", "buckets"), [])), + **_get_metric_fields(nested_get(b, ("metrics", "buckets"), [])), } } for b in buckets @@ -227,7 +227,7 @@ class StatisticsReporter: }, } res = cls._run_worker_stats_query(company_id, es_req) - buckets = safe_get(res, "aggregations/workers/buckets", default=[]) + buckets = nested_get(res, ("aggregations", "workers", "buckets"), default=[]) return { b["key"]: {"last_activity_time": b["last_activity_time"]["value"]} for b in buckets diff --git a/apiserver/bll/workers/__init__.py b/apiserver/bll/workers/__init__.py index 25963b2..0d04aa8 100644 --- a/apiserver/bll/workers/__init__.py +++ b/apiserver/bll/workers/__init__.py @@ -27,10 +27,9 @@ from apiserver.database.model.project import Project from apiserver.database.model.queue import Queue from apiserver.database.model.task.task import Task from apiserver.redis_manager import redman -from apiserver.tools import safe_get +from apiserver.utilities.dicts import nested_get from .stats import WorkerStats - log = config.logger(__file__) @@ -287,7 +286,7 @@ class WorkerBLL: filter( None, ( - safe_get(info, "next_entry/task") + nested_get(info, ("next_entry", "task")) for info in queues_info.values() ), ) @@ -311,7 +310,7 @@ class WorkerBLL: continue entry.name = info.get("name", None) entry.num_tasks = info.get("num_entries", 0) - task_id = safe_get(info, "next_entry/task") + task_id = nested_get(info, ("next_entry", "task")) if task_id: task = tasks_info.get(task_id, None) entry.next_task = IdNameEntry( diff --git a/apiserver/config/default/hosts.conf b/apiserver/config/default/hosts.conf index 8321682..29b4547 100644 --- a/apiserver/config/default/hosts.conf +++ b/apiserver/config/default/hosts.conf @@ -2,10 +2,9 @@ fileserver = "http://localhost:8081" elastic { events { - hosts: [{host: "127.0.0.1", port: 9200}] + hosts: [{host: "127.0.0.1", port: 9200, scheme: http}] args { timeout: 60 - dead_timeout: 10 max_retries: 3 retry_on_timeout: true } @@ -13,10 +12,9 @@ elastic { } workers { - hosts: [{host:"127.0.0.1", port:9200}] + hosts: [{host:"127.0.0.1", port:9200, scheme: http}] args { timeout: 60 - dead_timeout: 10 max_retries: 3 retry_on_timeout: true } diff --git a/apiserver/database/errors.py b/apiserver/database/errors.py index 1d62f47..e7c3310 100644 --- a/apiserver/database/errors.py +++ b/apiserver/database/errors.py @@ -5,7 +5,7 @@ from textwrap import shorten import dpath from dpath.exceptions import InvalidKeyName -from elasticsearch import ElasticsearchException +from elastic_transport import TransportError, ApiError from elasticsearch.helpers import BulkIndexError from jsonmodels.errors import ValidationError as JsonschemaValidationError from mongoengine.errors import ( @@ -210,9 +210,9 @@ def translate_errors_context(message=None, **kwargs): raise errors.bad_request.ValidationError(e.args[0]) except BulkIndexError as e: ElasticErrorsHandler.bulk_error(e, message, **kwargs) - except ElasticsearchException as e: + except (TransportError, ApiError) as e: raise errors.server_error.DataError(e, message, **kwargs) except InvalidKeyName: raise errors.server_error.DataError("invalid empty key encountered in data") - except Exception as ex: + except Exception: raise diff --git a/apiserver/elastic/apply_mappings.py b/apiserver/elastic/apply_mappings.py index c6b5c2f..a43f28b 100755 --- a/apiserver/elastic/apply_mappings.py +++ b/apiserver/elastic/apply_mappings.py @@ -4,12 +4,15 @@ Apply elasticsearch mappings to given hosts. """ import argparse import json +import logging from pathlib import Path from typing import Optional, Sequence, Tuple from elasticsearch import Elasticsearch HERE = Path(__file__).resolve().parent +logging.getLogger('elasticsearch').setLevel(logging.WARNING) +logging.getLogger('elastic_transport').setLevel(logging.WARNING) def apply_mappings_to_cluster( @@ -17,6 +20,20 @@ def apply_mappings_to_cluster( ): """Hosts maybe a sequence of strings or dicts in the form {"host": , "port": }""" + def _send_component_template(ct_file): + with ct_file.open() as json_data: + body = json.load(json_data) + template_name = f"{ct_file.stem}" + res = es.cluster.put_component_template(name=template_name, body=body) + return {"component_template": template_name, "result": res} + + def _send_index_template(it_file): + with it_file.open() as json_data: + body = json.load(json_data) + template_name = f"{it_file.stem}" + res = es.indices.put_index_template(name=template_name, body=body) + return {"index_template": template_name, "result": res} + def _send_template(f): with f.open() as json_data: data = json.load(json_data) @@ -24,14 +41,30 @@ def apply_mappings_to_cluster( res = es.indices.put_template(name=template_name, body=data) return {"mapping": template_name, "result": res} - p = HERE / "mappings" - if key: - files = (p / key).glob("*.json") - else: - files = p.glob("**/*.json") - es = Elasticsearch(hosts=hosts, http_auth=http_auth, **(es_args or {})) - return [_send_template(f) for f in files] + p = HERE / "index_templates" + if key: + folders = [p / key] + else: + folders = [ + f for f in p.iterdir() if f.is_dir() + ] + + ret = [] + for f in folders: + for ct in (f / "component_templates").glob("*.json"): + ret.append(_send_component_template(ct)) + for it in f.glob("*.json"): + ret.append(_send_index_template(it)) + + return ret + # p = HERE / "mappings" + # if key: + # files = (p / key).glob("*.json") + # else: + # files = p.glob("**/*.json") + # + # return [_send_template(f) for f in files] def parse_args(): diff --git a/apiserver/elastic/index_templates/events/component_templates/events_common.json b/apiserver/elastic/index_templates/events/component_templates/events_common.json new file mode 100644 index 0000000..dc3c19f --- /dev/null +++ b/apiserver/elastic/index_templates/events/component_templates/events_common.json @@ -0,0 +1,48 @@ +{ + "template": { + "settings": { + "number_of_replicas": 0, + "number_of_shards": 1 + }, + "mappings": { + "_source": { + "enabled": true + }, + "properties": { + "@timestamp": { + "type": "date" + }, + "task": { + "type": "keyword" + }, + "type": { + "type": "keyword" + }, + "worker": { + "type": "keyword" + }, + "timestamp": { + "type": "date" + }, + "iter": { + "type": "long" + }, + "metric": { + "type": "keyword" + }, + "variant": { + "type": "keyword" + }, + "value": { + "type": "float" + }, + "company_id": { + "type": "keyword" + }, + "model_event": { + "type": "boolean" + } + } + } + } +} \ No newline at end of file diff --git a/apiserver/elastic/index_templates/events/events_log.json b/apiserver/elastic/index_templates/events/events_log.json new file mode 100644 index 0000000..956e582 --- /dev/null +++ b/apiserver/elastic/index_templates/events/events_log.json @@ -0,0 +1,18 @@ +{ + "index_patterns": "events-log-*", + "template": { + "mappings": { + "properties": { + "msg": { + "type": "text", + "index": false + }, + "level": { + "type": "keyword" + } + } + } + }, + "priority": 500, + "composed_of": ["events_common"] +} \ No newline at end of file diff --git a/apiserver/elastic/index_templates/events/events_plot.json b/apiserver/elastic/index_templates/events/events_plot.json new file mode 100644 index 0000000..d363f0b --- /dev/null +++ b/apiserver/elastic/index_templates/events/events_plot.json @@ -0,0 +1,18 @@ +{ + "index_patterns": "events-plot-*", + "template": { + "mappings": { + "properties": { + "plot_str": { + "type": "text", + "index": false + }, + "plot_data": { + "type": "binary" + } + } + } + }, + "priority": 500, + "composed_of": ["events_common"] +} \ No newline at end of file diff --git a/apiserver/elastic/index_templates/events/events_training_debug_image.json b/apiserver/elastic/index_templates/events/events_training_debug_image.json new file mode 100644 index 0000000..99925c7 --- /dev/null +++ b/apiserver/elastic/index_templates/events/events_training_debug_image.json @@ -0,0 +1,17 @@ +{ + "index_patterns": "events-training_debug_image-*", + "template": { + "mappings": { + "properties": { + "key": { + "type": "keyword" + }, + "url": { + "type": "keyword" + } + } + } + }, + "priority": 500, + "composed_of": ["events_common"] +} \ No newline at end of file diff --git a/apiserver/elastic/index_templates/workers/component_templates/queue_metrics.json b/apiserver/elastic/index_templates/workers/component_templates/queue_metrics.json new file mode 100644 index 0000000..46a86b3 --- /dev/null +++ b/apiserver/elastic/index_templates/workers/component_templates/queue_metrics.json @@ -0,0 +1,26 @@ +{ + "template": { + "mappings": { + "_source": { + "enabled": true + }, + "properties": { + "timestamp": { + "type": "date" + }, + "queue": { + "type": "keyword" + }, + "average_waiting_time": { + "type": "float" + }, + "queue_length": { + "type": "integer" + }, + "company_id": { + "type": "keyword" + } + } + } + } +} \ No newline at end of file diff --git a/apiserver/elastic/index_templates/workers/component_templates/worker_stats.json b/apiserver/elastic/index_templates/workers/component_templates/worker_stats.json new file mode 100644 index 0000000..61a13e2 --- /dev/null +++ b/apiserver/elastic/index_templates/workers/component_templates/worker_stats.json @@ -0,0 +1,38 @@ +{ + "template": { + "mappings": { + "_source": { + "enabled": true + }, + "properties": { + "timestamp": { + "type": "date" + }, + "worker": { + "type": "keyword" + }, + "category": { + "type": "keyword" + }, + "metric": { + "type": "keyword" + }, + "variant": { + "type": "keyword" + }, + "value": { + "type": "float" + }, + "unit": { + "type": "keyword" + }, + "task": { + "type": "keyword" + }, + "company_id": { + "type": "keyword" + } + } + } + } +} \ No newline at end of file diff --git a/apiserver/elastic/initialize.py b/apiserver/elastic/initialize.py index 8bed2bc..4551c71 100644 --- a/apiserver/elastic/initialize.py +++ b/apiserver/elastic/initialize.py @@ -10,6 +10,8 @@ from apiserver.config_repo import config from apiserver.elastic.apply_mappings import apply_mappings_to_cluster log = config.logger(__file__) +logging.getLogger('elasticsearch').setLevel(logging.WARNING) +logging.getLogger('elastic_transport').setLevel(logging.WARNING) class MissingElasticConfiguration(Exception): diff --git a/apiserver/es_factory.py b/apiserver/es_factory.py index d18a992..52b6c07 100644 --- a/apiserver/es_factory.py +++ b/apiserver/es_factory.py @@ -1,3 +1,4 @@ +import logging from datetime import datetime from functools import lru_cache from os import getenv @@ -9,6 +10,8 @@ from elasticsearch import Elasticsearch from apiserver.config_repo import config log = config.logger(__file__) +logging.getLogger('elasticsearch').setLevel(logging.WARNING) +logging.getLogger('elastic_transport').setLevel(logging.WARNING) OVERRIDE_HOST_ENV_KEY = ( "CLEARML_ELASTIC_SERVICE_HOST", @@ -32,6 +35,7 @@ if OVERRIDE_HOST: OVERRIDE_PORT = first(filter(None, map(getenv, OVERRIDE_PORT_ENV_KEY))) if OVERRIDE_PORT: + OVERRIDE_PORT = int(OVERRIDE_PORT) log.info(f"Using override elastic port {OVERRIDE_PORT}") OVERRIDE_USERNAME = first(filter(None, map(getenv, OVERRIDE_USERNAME_ENV_KEY))) diff --git a/apiserver/requirements.txt b/apiserver/requirements.txt index c78eb23..c911783 100644 --- a/apiserver/requirements.txt +++ b/apiserver/requirements.txt @@ -6,7 +6,7 @@ boto3>=1.26 boto3-stubs[s3]>=1.26 clearml>=1.10.3 dpath>=1.4.2,<2.0 -elasticsearch==7.17.9 +elasticsearch==8.12.0 fastjsonschema>=2.8 flask-compress>=1.4.0 flask-cors>=3.0.5 diff --git a/docker/docker-compose-win10.yml b/docker/docker-compose-win10.yml index 4d46f13..d1f36dd 100644 --- a/docker/docker-compose-win10.yml +++ b/docker/docker-compose-win10.yml @@ -49,13 +49,10 @@ services: cluster.routing.allocation.disk.watermark.low: 500mb cluster.routing.allocation.disk.watermark.high: 500mb cluster.routing.allocation.disk.watermark.flood_stage: 500mb - discovery.zen.minimum_master_nodes: "1" discovery.type: "single-node" http.compression_level: "7" - node.ingest: "true" node.name: clearml - reindex.remote.whitelist: '*.*' - xpack.monitoring.enabled: "false" + reindex.remote.whitelist: "'*.*'" xpack.security.enabled: "false" ulimits: memlock: diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index e6a912f..1b82d1f 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -49,13 +49,10 @@ services: cluster.routing.allocation.disk.watermark.low: 500mb cluster.routing.allocation.disk.watermark.high: 500mb cluster.routing.allocation.disk.watermark.flood_stage: 500mb - discovery.zen.minimum_master_nodes: "1" discovery.type: "single-node" http.compression_level: "7" - node.ingest: "true" node.name: clearml - reindex.remote.whitelist: '*.*' - xpack.monitoring.enabled: "false" + reindex.remote.whitelist: "'*.*'" xpack.security.enabled: "false" ulimits: memlock: