From 3bcbc38c4c4835aea0ffe28de2e5ef57e3e08e37 Mon Sep 17 00:00:00 2001 From: clearml <> Date: Thu, 5 Dec 2024 22:21:12 +0200 Subject: [PATCH] Add storage service support --- apiserver/apimodels/storage.py | 60 +++++ apiserver/bll/organization/tags_cache.py | 1 - apiserver/bll/storage/__init__.py | 233 +++++++++++++++++- apiserver/database/model/storage_settings.py | 76 ++++++ apiserver/schema/services/storage.conf | 243 +++++++++++++++++++ apiserver/services/storage.py | 22 ++ apiserver/tests/automated/test_serving.py | 106 ++++---- 7 files changed, 688 insertions(+), 53 deletions(-) create mode 100644 apiserver/apimodels/storage.py create mode 100644 apiserver/database/model/storage_settings.py create mode 100644 apiserver/schema/services/storage.conf create mode 100644 apiserver/services/storage.py diff --git a/apiserver/apimodels/storage.py b/apiserver/apimodels/storage.py new file mode 100644 index 0000000..b0c244a --- /dev/null +++ b/apiserver/apimodels/storage.py @@ -0,0 +1,60 @@ +from jsonmodels.fields import StringField, BoolField, ListField, EmbeddedField +from jsonmodels.models import Base +from jsonmodels.validators import Enum + + +class AWSBucketSettings(Base): + bucket = StringField() + subdir = StringField() + host = StringField() + key = StringField() + secret = StringField() + token = StringField() + multipart = BoolField(default=True) + acl = StringField() + secure = BoolField(default=True) + region = StringField() + verify = BoolField(default=True) + use_credentials_chain = BoolField(default=False) + + +class AWSSettings(Base): + key = StringField() + secret = StringField() + region = StringField() + token = StringField() + use_credentials_chain = BoolField(default=False) + buckets = ListField(items_types=[AWSBucketSettings]) + + +class GoogleBucketSettings(Base): + bucket = StringField() + subdir = StringField() + project = StringField() + credentials_json = StringField() + + +class GoogleSettings(Base): + project = StringField() + 
credentials_json = StringField() + buckets = ListField(items_types=[GoogleBucketSettings]) + + +class AzureContainerSettings(Base): + account_name = StringField() + account_key = StringField() + container_name = StringField() + + +class AzureSettings(Base): + containers = ListField(items_types=[AzureContainerSettings]) + + +class SetSettingsRequest(Base): + aws = EmbeddedField(AWSSettings) + google = EmbeddedField(GoogleSettings) + azure = EmbeddedField(AzureSettings) + + +class ResetSettingsRequest(Base): + keys = ListField([str], item_validators=[Enum("aws", "google", "azure")]) diff --git a/apiserver/bll/organization/tags_cache.py b/apiserver/bll/organization/tags_cache.py index 3fd6abe..0cf9d91 100644 --- a/apiserver/bll/organization/tags_cache.py +++ b/apiserver/bll/organization/tags_cache.py @@ -6,7 +6,6 @@ from redis import Redis from apiserver.config_repo import config from apiserver.bll.project import project_ids_with_children -from apiserver.database.model import EntityVisibility from apiserver.database.model.base import GetMixin from apiserver.database.model.model import Model from apiserver.database.model.task.task import Task diff --git a/apiserver/bll/storage/__init__.py b/apiserver/bll/storage/__init__.py index 7c88689..0957700 100644 --- a/apiserver/bll/storage/__init__.py +++ b/apiserver/bll/storage/__init__.py @@ -1,14 +1,32 @@ +import json +import os +import tempfile from copy import copy +from datetime import datetime +from typing import Optional, Sequence +import attr from boltons.cacheutils import cachedproperty from clearml.backend_config.bucket_config import ( S3BucketConfigurations, AzureContainerConfigurations, GSBucketConfigurations, + AzureContainerConfig, + GSBucketConfig, + S3BucketConfig, ) +from apiserver.apierrors import errors +from apiserver.apimodels.storage import SetSettingsRequest from apiserver.config_repo import config - +from apiserver.database.model.storage_settings import ( + StorageSettings, + GoogleBucketSettings, + 
AWSSettings, + AzureStorageSettings, + GoogleStorageSettings, +) +from apiserver.database.utils import id as db_id log = config.logger(__file__) @@ -32,17 +50,224 @@ class StorageBLL: def get_azure_settings_for_company( self, company_id: str, + db_settings: StorageSettings = None, + query_db: bool = True, ) -> AzureContainerConfigurations: - return copy(self._default_azure_configs) + if not db_settings and query_db: + db_settings = ( + StorageSettings.objects(company=company_id).only("azure").first() + ) + + if not db_settings or not db_settings.azure: + return copy(self._default_azure_configs) + + azure = db_settings.azure + return AzureContainerConfigurations( + container_configs=[ + AzureContainerConfig(**entry.to_proper_dict()) + for entry in (azure.containers or []) + ] + ) def get_gs_settings_for_company( self, company_id: str, + db_settings: StorageSettings = None, + query_db: bool = True, + json_string: bool = False, ) -> GSBucketConfigurations: - return copy(self._default_gs_configs) + if not db_settings and query_db: + db_settings = ( + StorageSettings.objects(company=company_id).only("google").first() + ) + + if not db_settings or not db_settings.google: + if not json_string: + return copy(self._default_gs_configs) + + if self._default_gs_configs._buckets: + buckets = [ + attr.evolve( + b, + credentials_json=self._assure_json_string(b.credentials_json), + ) + for b in self._default_gs_configs._buckets + ] + else: + buckets = self._default_gs_configs._buckets + + return GSBucketConfigurations( + buckets=buckets, + default_project=self._default_gs_configs._default_project, + default_credentials=self._assure_json_string( + self._default_gs_configs._default_credentials + ), + ) + + def get_bucket_config(bc: GoogleBucketSettings) -> GSBucketConfig: + data = bc.to_proper_dict() + if not json_string and bc.credentials_json: + data["credentials_json"] = self._assure_json_file(bc.credentials_json) + return GSBucketConfig(**data) + + google = db_settings.google + 
buckets_configs = [get_bucket_config(b) for b in (google.buckets or [])] + return GSBucketConfigurations( + buckets=buckets_configs, + default_project=google.project, + default_credentials=google.credentials_json + if json_string + else self._assure_json_file(google.credentials_json), + ) def get_aws_settings_for_company( self, company_id: str, + db_settings: StorageSettings = None, + query_db: bool = True, ) -> S3BucketConfigurations: - return copy(self._default_aws_configs) + if not db_settings and query_db: + db_settings = ( + StorageSettings.objects(company=company_id).only("aws").first() + ) + if not db_settings or not db_settings.aws: + return copy(self._default_aws_configs) + + aws = db_settings.aws + buckets_configs = S3BucketConfig.from_list( + [b.to_proper_dict() for b in (aws.buckets or [])] + ) + return S3BucketConfigurations( + buckets=buckets_configs, + default_key=aws.key, + default_secret=aws.secret, + default_region=aws.region, + default_use_credentials_chain=aws.use_credentials_chain, + default_token=aws.token, + default_extra_args={}, + ) + + def _assure_json_file(self, name_or_content: str) -> str: + if not name_or_content: + return name_or_content + + if name_or_content.endswith(".json") or os.path.exists(name_or_content): + return name_or_content + + try: + json.loads(name_or_content) + except Exception: + return name_or_content + + with tempfile.NamedTemporaryFile( + mode="wt", delete=False, suffix=".json" + ) as tmp: + tmp.write(name_or_content) + + return tmp.name + + def _assure_json_string(self, name_or_content: str) -> Optional[str]: + if not name_or_content: + return name_or_content + + try: + json.loads(name_or_content) + return name_or_content + except Exception: + pass + + try: + with open(name_or_content) as fp: + return fp.read() + except Exception: + return None + + def get_company_settings(self, company_id: str) -> dict: + db_settings = StorageSettings.objects(company=company_id).first() + aws = 
self.get_aws_settings_for_company(company_id, db_settings, query_db=False) + aws_dict = { + "key": aws._default_key, + "secret": aws._default_secret, + "token": aws._default_token, + "region": aws._default_region, + "use_credentials_chain": aws._default_use_credentials_chain, + "buckets": [attr.asdict(b) for b in aws._buckets], + } + + gs = self.get_gs_settings_for_company( + company_id, db_settings, query_db=False, json_string=True + ) + gs_dict = { + "project": gs._default_project, + "credentials_json": gs._default_credentials, + "buckets": [attr.asdict(b) for b in gs._buckets], + } + + azure = self.get_azure_settings_for_company(company_id, db_settings, query_db=False) + azure_dict = { + "containers": [attr.asdict(ac) for ac in azure._container_configs], + } + + return { + "aws": aws_dict, + "google": gs_dict, + "azure": azure_dict, + "last_update": db_settings.last_update if db_settings else None, + } + + def set_company_settings( + self, company_id: str, settings: SetSettingsRequest + ) -> int: + update_dict = {} + if settings.aws: + update_dict["aws"] = { + **{ + k: v + for k, v in settings.aws.to_struct().items() + if k in AWSSettings.get_fields() + } + } + + if settings.azure: + update_dict["azure"] = { + **{ + k: v + for k, v in settings.azure.to_struct().items() + if k in AzureStorageSettings.get_fields() + } + } + + if settings.google: + update_dict["google"] = { + **{ + k: v + for k, v in settings.google.to_struct().items() + if k in GoogleStorageSettings.get_fields() + } + } + cred_json = update_dict["google"].get("credentials_json") + if cred_json: + try: + json.loads(cred_json) + except Exception as ex: + raise errors.bad_request.ValidationError( + f"Invalid json credentials: {str(ex)}" + ) + + if not update_dict: + raise errors.bad_request.ValidationError("No settings were provided") + + settings = StorageSettings.objects(company=company_id).only("id").first() + settings_id = settings.id if settings else db_id() + return 
StorageSettings.objects(id=settings_id).update( + upsert=True, + id=settings_id, + company=company_id, + last_update=datetime.utcnow(), + **update_dict, + ) + + def reset_company_settings(self, company_id: str, keys: Sequence[str]) -> int: + return StorageSettings.objects(company=company_id).update( + last_update=datetime.utcnow(), **{f"unset__{k}": 1 for k in keys} + ) diff --git a/apiserver/database/model/storage_settings.py b/apiserver/database/model/storage_settings.py new file mode 100644 index 0000000..7ab7dd3 --- /dev/null +++ b/apiserver/database/model/storage_settings.py @@ -0,0 +1,76 @@ +from mongoengine import ( + Document, + EmbeddedDocument, + StringField, + DateTimeField, + EmbeddedDocumentListField, + EmbeddedDocumentField, + BooleanField, +) + +from apiserver.database import Database, strict +from apiserver.database.model import DbModelMixin +from apiserver.database.model.base import ProperDictMixin + +class AWSBucketSettings(EmbeddedDocument, ProperDictMixin): + bucket = StringField() + subdir = StringField() + host = StringField() + key = StringField() + secret = StringField() + token = StringField() + multipart = BooleanField() + acl = StringField() + secure = BooleanField() + region = StringField() + verify = BooleanField() + use_credentials_chain = BooleanField() + + +class AWSSettings(EmbeddedDocument, DbModelMixin): + key = StringField() + secret = StringField() + region = StringField() + token = StringField() + use_credentials_chain = BooleanField() + buckets = EmbeddedDocumentListField(AWSBucketSettings) + + +class GoogleBucketSettings(EmbeddedDocument, ProperDictMixin): + bucket = StringField() + subdir = StringField() + project = StringField() + credentials_json = StringField() + + +class GoogleStorageSettings(EmbeddedDocument, DbModelMixin): + project = StringField() + credentials_json = StringField() + buckets = EmbeddedDocumentListField(GoogleBucketSettings) + + +class AzureStorageContainerSettings(EmbeddedDocument, ProperDictMixin): + 
account_name = StringField(required=True) + account_key = StringField(required=True) + container_name = StringField() + + +class AzureStorageSettings(EmbeddedDocument, DbModelMixin): + containers = EmbeddedDocumentListField(AzureStorageContainerSettings) + + +class StorageSettings(DbModelMixin, Document): + meta = { + "db_alias": Database.backend, + "strict": strict, + "indexes": [ + "company" + ], + } + + id = StringField(primary_key=True) + company = StringField(required=True, unique=True) + last_update = DateTimeField() + aws: AWSSettings = EmbeddedDocumentField(AWSSettings) + google: GoogleStorageSettings = EmbeddedDocumentField(GoogleStorageSettings) + azure: AzureStorageSettings = EmbeddedDocumentField(AzureStorageSettings) diff --git a/apiserver/schema/services/storage.conf b/apiserver/schema/services/storage.conf new file mode 100644 index 0000000..0fc2a32 --- /dev/null +++ b/apiserver/schema/services/storage.conf @@ -0,0 +1,243 @@ +_description: """This service provides storage settings management""" +_default { + internal: true + allow_roles: ["system", "root", "admin"] +} + +_definitions { + include "_common.conf" + aws_bucket { + type: object + description: Settings per S3 bucket + properties { + bucket { + description: The name of the bucket + type: string + } + subdir { + description: The path to match + type: string + } + host { + description: Host address (for minio servers) + type: string + } + key { + description: Access key + type: string + } + secret { + description: Secret key + type: string + } + token { + description: Access token + type: string + } + multipart { + description: Multipart upload + type: boolean + default: true + } + acl { + description: ACL + type: string + } + secure { + description: Use SSL connection + type: boolean + default: true + } + region { + description: AWS Region + type: string + } + verify { + description: Verify server certificate + type: boolean + default: true + } + use_credentials_chain { + description: Use 
host configured credentials + type: boolean + default: false + } + } + } + aws { + type: object + description: AWS S3 storage settings + properties { + key { + description: Access key + type: string + } + secret { + description: Secret key + type: string + } + region { + description: AWS region + type: string + } + token { + description: Access token + type: string + } + use_credentials_chain { + description: If set then use host credentials + type: boolean + default: false + } + buckets { + description: Credential settings per bucket + type: array + items {"$ref": "#/definitions/aws_bucket"} + } + } + } + google_bucket { + type: object + description: Settings per Google storage bucket + properties { + bucket { + description: The name of the bucket + type: string + } + project { + description: The name of the project + type: string + } + subdir { + description: The path to match + type: string + } + credentials_json { + description: The contents of the credentials json file + type: string + } + } + } + google { + type: object + description: Google storage settings + properties { + project { + description: Project name + type: string + } + credentials_json { + description: The contents of the credentials json file + type: string + } + buckets { + description: Credentials per bucket + type: array + items {"$ref": "#/definitions/google_bucket"} + } + } + } + azure_container { + type: object + description: Azure container settings + properties { + account_name { + description: Account name + type: string + } + account_key { + description: Account key + type: string + } + container_name { + description: The name of the container + type: string + } + } + } + azure { + type: object + description: Azure storage settings + properties { + containers { + description: Credentials per container + type: array + items {"$ref": "#/definitions/azure_container"} + } + } + } +} + +set_settings { + "2.31" { + description: Set Storage settings + request { + type: object + properties { 
+ aws {"$ref": "#/definitions/aws"} + google {"$ref": "#/definitions/google"} + azure {"$ref": "#/definitions/azure"} + } + } + response { + type: object + properties { + updated { + description: "Number of settings documents updated (0 or 1)" + type: integer + enum: [0, 1] + } + } + } + } +} +reset_settings { + "2.31" { + description: Reset selected storage settings + request { + type: object + properties { + keys { + description: The names of the settings to delete + type: array + items { + type: string + enum: ["azure", "aws", "google"] + } + } + } + } + response { + type: object + properties { + updated { + description: "Number of settings documents updated (0 or 1)" + type: integer + enum: [0, 1] + } + } + } + } +} +get_settings { + "2.22" { + description: Get storage settings + request { + type: object + additionalProperties: false + } + response { + type: object + properties { + last_update { + description: "Settings last update time (UTC) " + type: string + format: "date-time" + } + aws {"$ref": "#/definitions/aws"} + google {"$ref": "#/definitions/google"} + azure {"$ref": "#/definitions/azure"} + } + } + } +} \ No newline at end of file diff --git a/apiserver/services/storage.py b/apiserver/services/storage.py new file mode 100644 index 0000000..d20b362 --- /dev/null +++ b/apiserver/services/storage.py @@ -0,0 +1,22 @@ +from apiserver.apimodels.storage import ResetSettingsRequest, SetSettingsRequest +from apiserver.bll.storage import StorageBLL +from apiserver.service_repo import endpoint, APICall + +storage_bll = StorageBLL() + + +@endpoint("storage.get_settings") +def get_settings(call: APICall, company: str, _): + call.result.data = {"settings": storage_bll.get_company_settings(company)} + + +@endpoint("storage.set_settings") +def set_settings(call: APICall, company: str, request: SetSettingsRequest): + call.result.data = {"updated": storage_bll.set_company_settings(company, request)} + + +@endpoint("storage.reset_settings") +def reset_settings(call: 
APICall, company: str, request: ResetSettingsRequest): + call.result.data = { + "updated": storage_bll.reset_company_settings(company, request.keys) + } diff --git a/apiserver/tests/automated/test_serving.py b/apiserver/tests/automated/test_serving.py index 76f9dd4..b48ab15 100644 --- a/apiserver/tests/automated/test_serving.py +++ b/apiserver/tests/automated/test_serving.py @@ -26,6 +26,7 @@ class TestServing(TestService): for container_id in (container_id1, container_id2) ] + # registering instances for container_info in container_infos: self.api.serving.register_container( **container_info, @@ -39,63 +40,72 @@ class TestServing(TestService): requests_num=1000 * mul, requests_min=5 * mul, # requests per minute latency_ms=100 * mul, # average latency - machine_stats={ # the same structure here as used by worker status_reports + machine_stats={ # the same structure here as used by worker status_reports "cpu_usage": [10, 20], "memory_used": 50, - } + }, ) - endpoints = self.api.serving.get_endpoints().endpoints - details = self.api.serving.get_endpoint_details(endpoint_url=url) - details = self.api.serving.get_endpoint_details(endpoint_url=url) + # getting endpoints and endpoint details + endpoints = self.api.serving.get_endpoints().endpoints + self.assertTrue(any(e for e in endpoints if e.url == url)) + details = self.api.serving.get_endpoint_details(endpoint_url=url) + self.assertEqual(details.url, url) + self.assertEqual(details.uptime_sec, 2000) + self.assertEqual( + { + inst.id: [ + inst[field] + for field in ( + "uptime_sec", + "requests", + "requests_min", + "latency_ms", + ) + ] + for inst in details.instances + }, + {"container_1": [1000, 1000, 5, 100], "container_2": [2000, 2000, 10, 200]}, + ) + # make sure that the first call did not invalidate anything + new_details = self.api.serving.get_endpoint_details(endpoint_url=url) + self.assertEqual(details, new_details) + + # charts sleep(5) # give time to ES to accomodate data to_date = int(time()) + 40 
from_date = to_date - 100 - res1 = self.api.serving.get_endpoint_metrics_history( - endpoint_url=url, - from_date=from_date, - to_date=to_date, - interval=1, - ) - res2 = self.api.serving.get_endpoint_metrics_history( - endpoint_url=url, - from_date=from_date, - to_date=to_date, - interval=1, - metric_type="requests_min", - ) - res3 = self.api.serving.get_endpoint_metrics_history( - endpoint_url=url, - from_date=from_date, - to_date=to_date, - interval=1, - metric_type="latency_ms", - ) - res4 = self.api.serving.get_endpoint_metrics_history( - endpoint_url=url, - from_date=from_date, - to_date=to_date, - interval=1, - metric_type="cpu_count", - ) - res5 = self.api.serving.get_endpoint_metrics_history( - endpoint_url=url, - from_date=from_date, - to_date=to_date, - interval=1, - metric_type="cpu_util", - ) - res6 = self.api.serving.get_endpoint_metrics_history( - endpoint_url=url, - from_date=from_date, - to_date=to_date, - interval=1, - metric_type="ram_total", - ) + for metric_type, title, value in ( + (None, "Number of Requests", 3000), + ("requests_min", "Requests per Minute", 15), + ("latency_ms", "Average Latency (ms)", 150), + ("cpu_count", "CPU Count", 4), + ("cpu_util", "Average CPU Load (%)", 15), + ("ram_total", "RAM Total (GB)", 100), + ): + res = self.api.serving.get_endpoint_metrics_history( + endpoint_url=url, + from_date=from_date, + to_date=to_date, + interval=1, + **({"metric_type": metric_type} if metric_type else {}), + ) + self.assertEqual(res.computed_interval, 40) + self.assertEqual(res.total.title, title) + length = len(res.total.dates) + self.assertTrue(3>=length>=1) + self.assertEqual(len(res.total["values"]), length) + self.assertIn(value, res.total["values"]) + self.assertEqual(set(res.instances), {container_id1, container_id2}) + for inst in res.instances.values(): + self.assertEqual(inst.dates, res.total.dates) + self.assertEqual(len(inst["values"]), length) + # unregistering containers for container_id in (container_id1, 
container_id2): self.api.serving.unregister_container(container_id=container_id) endpoints = self.api.serving.get_endpoints().endpoints + self.assertFalse(any(e for e in endpoints if e.url == url)) + with self.api.raises(errors.bad_request.NoContainersForUrl): - details = self.api.serving.get_endpoint_details(endpoint_url=url) - pass + self.api.serving.get_endpoint_details(endpoint_url=url)