Move apiserver to clearml

allegroai 2021-05-03 17:26:44 +03:00
parent cb3a7c90a8
commit 4b11a6efcd
22 changed files with 152 additions and 151 deletions

View File

@@ -31,3 +31,4 @@ class GetSupportedModesResponse(Base):
server_errors = EmbeddedField(ServerErrors)
sso = DictField([str, type(None)])
sso_providers = ListField([dict])
authenticated = BoolField(default=False)

View File

@@ -4,8 +4,8 @@ Module for polymorphism over different types of X axes in scalar aggregations
from abc import ABC, abstractmethod
from enum import auto
from apiserver.utilities import extract_properties_to_lists
from apiserver.utilities.stringenum import StringEnum
from apiserver.bll.util import extract_properties_to_lists
from apiserver.config_repo import config
log = config.logger(__file__)

View File

@@ -45,7 +45,7 @@ class StatisticsReporter:
def start_reporter(cls):
"""
Periodically send statistics reports for companies who have opted in.
Note: in trains we usually have only a single company
Note: in clearml we usually have only a single company
"""
if not cls.supported:
return

View File

@@ -6,8 +6,9 @@ from functools import reduce
from os import getenv
from os.path import expandvars
from pathlib import Path
from typing import List, Any, TypeVar
from typing import List, Any, TypeVar, Sequence
from boltons.iterutils import first
from pyhocon import ConfigTree, ConfigFactory
from pyparsing import (
ParseFatalException,
@@ -18,8 +19,8 @@ from pyparsing import (
from apiserver.utilities import json
EXTRA_CONFIG_PATHS = ("/opt/trains/config",)
EXTRA_CONFIG_PATH_OVERRIDE_VAR = "TRAINS_CONFIG_DIR"
EXTRA_CONFIG_PATHS = ("/opt/clearml/config",)
DEFAULT_PREFIXES = ("clearml", "trains")
EXTRA_CONFIG_PATH_SEP = ":" if platform.system() != "Windows" else ";"
@@ -30,7 +31,10 @@ class BasicConfig:
default_config_dir = "default"
def __init__(
self, folder: str = None, verbose: bool = True, prefix: str = "trains"
self,
folder: str = None,
verbose: bool = True,
prefix: Sequence[str] = DEFAULT_PREFIXES,
):
folder = (
Path(folder)
@@ -41,8 +45,16 @@ class BasicConfig:
raise ValueError("Invalid configuration folder")
self.verbose = verbose
self.prefix = prefix
self.extra_config_values_env_key_prefix = f"{self.prefix.upper()}__"
self.extra_config_path_override_var = [
f"{p.upper()}_CONFIG_DIR" for p in prefix
]
self.prefix = prefix[0]
self.extra_config_values_env_key_prefix = [
f"{p.upper()}{self.extra_config_values_env_key_sep}"
for p in reversed(prefix)
]
self._paths = [folder, *self._get_paths()]
self._config = self._reload()
@@ -73,24 +85,24 @@ class BasicConfig:
def _read_extra_env_config_values(self) -> ConfigTree:
""" Loads extra configuration from environment-injected values """
result = ConfigTree()
prefix = self.extra_config_values_env_key_prefix
keys = sorted(k for k in os.environ if k.startswith(prefix))
for key in keys:
path = (
key[len(prefix) :]
.replace(self.extra_config_values_env_key_sep, ".")
.lower()
)
result = ConfigTree.merge_configs(
result, ConfigFactory.parse_string(f"{path}: {os.environ[key]}")
)
for prefix in self.extra_config_values_env_key_prefix:
keys = sorted(k for k in os.environ if k.startswith(prefix))
for key in keys:
path = (
key[len(prefix) :]
.replace(self.extra_config_values_env_key_sep, ".")
.lower()
)
result = ConfigTree.merge_configs(
result, ConfigFactory.parse_string(f"{path}: {os.environ[key]}")
)
return result
def _get_paths(self) -> List[Path]:
default_paths = EXTRA_CONFIG_PATH_SEP.join(EXTRA_CONFIG_PATHS)
value = getenv(EXTRA_CONFIG_PATH_OVERRIDE_VAR, default_paths)
value = first(map(getenv, self.extra_config_path_override_var), default_paths)
paths = [
Path(expandvars(v)).expanduser() for v in value.split(EXTRA_CONFIG_PATH_SEP)
@@ -100,7 +112,7 @@ class BasicConfig:
invalid = [path for path in paths if not path.is_dir()]
if invalid:
print(
f"WARNING: Invalid paths in {EXTRA_CONFIG_PATH_OVERRIDE_VAR} env var: {' '.join(map(str, invalid))}"
f"WARNING: Invalid paths in {self.extra_config_path_override_var} env var: {' '.join(map(str, invalid))}"
)
return [path for path in paths if path.is_dir()]
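
Note: a minimal sketch of the precedence the new DEFAULT_PREFIXES logic gives environment-injected values; because the prefixes are iterated in reversed order and merged last-wins, CLEARML__-prefixed variables override TRAINS__-prefixed ones. The config key and values below are illustrative, not from the commit.

import os

from pyhocon import ConfigFactory, ConfigTree

# Illustrative values only - not part of the commit
os.environ["TRAINS__apiserver__default_company"] = "old-co"
os.environ["CLEARML__apiserver__default_company"] = "new-co"

result = ConfigTree()
# reversed(("clearml", "trains")) puts TRAINS__ first, so CLEARML__ merges last and wins
for prefix in ("TRAINS__", "CLEARML__"):
    for key in sorted(k for k in os.environ if k.startswith(prefix)):
        path = key[len(prefix):].replace("__", ".").lower()
        result = ConfigTree.merge_configs(
            result, ConfigFactory.parse_string(f"{path}: {os.environ[key]}")
        )

print(result.get("apiserver.default_company"))  # new-co - the CLEARML__ value wins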

View File

@@ -2,6 +2,8 @@ from functools import lru_cache
from os import getenv
from pathlib import Path
from boltons.iterutils import first
from apiserver.config_repo import config
from apiserver.version import __version__
@@ -9,7 +11,9 @@ root = Path(__file__).parent.parent
def _get(prop_name, env_suffix=None, default=""):
value = getenv(f"TRAINS_SERVER_{env_suffix or prop_name}")
suffix = env_suffix or prop_name
keys = [f"{p}_SERVER_{suffix}" for p in ("CLEARML", "TRAINS")]
value = first(map(getenv, keys))
if value:
return value
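
Note: a quick sketch of the lookup order in the reworked _get helper, assuming both the legacy and the new variable are set; boltons' first returns the first non-empty value, so the CLEARML key wins. The suffix and values are illustrative.

import os

from boltons.iterutils import first

# Illustrative values only
os.environ["TRAINS_SERVER_VERSION"] = "0.16.0"
os.environ["CLEARML_SERVER_VERSION"] = "1.0.0"

keys = [f"{p}_SERVER_VERSION" for p in ("CLEARML", "TRAINS")]
print(first(map(os.getenv, keys)))  # 1.0.0 - the CLEARML key is consulted first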

View File

@@ -17,11 +17,16 @@ log = config.logger("database")
strict = config.get("apiserver.mongo.strict", True)
OVERRIDE_HOST_ENV_KEY = (
"CLEARML_MONGODB_SERVICE_HOST",
"TRAINS_MONGODB_SERVICE_HOST",
"MONGODB_SERVICE_HOST",
"MONGODB_SERVICE_SERVICE_HOST",
)
OVERRIDE_PORT_ENV_KEY = ("TRAINS_MONGODB_SERVICE_PORT", "MONGODB_SERVICE_PORT")
OVERRIDE_PORT_ENV_KEY = (
"CLEARML_MONGODB_SERVICE_PORT",
"TRAINS_MONGODB_SERVICE_PORT",
"MONGODB_SERVICE_PORT",
)
class DatabaseEntry(models.Base):
@@ -70,7 +75,9 @@ class DatabaseFactory:
except ValidationError as ex:
raise Exception("Invalid database entry `%s`: %s" % (key, ex.args[0]))
if missing:
raise ValueError("Missing database configuration for %s" % ", ".join(missing))
raise ValueError(
"Missing database configuration for %s" % ", ".join(missing)
)
@classmethod
def get_entries(cls):

View File

@@ -9,11 +9,16 @@ from apiserver.config_repo import config
log = config.logger(__file__)
OVERRIDE_HOST_ENV_KEY = (
"CLEARML_ELASTIC_SERVICE_HOST",
"TRAINS_ELASTIC_SERVICE_HOST",
"ELASTIC_SERVICE_HOST",
"ELASTIC_SERVICE_SERVICE_HOST",
)
OVERRIDE_PORT_ENV_KEY = ("TRAINS_ELASTIC_SERVICE_PORT", "ELASTIC_SERVICE_PORT")
OVERRIDE_PORT_ENV_KEY = (
"CLEARML_ELASTIC_SERVICE_PORT",
"TRAINS_ELASTIC_SERVICE_PORT",
"ELASTIC_SERVICE_PORT",
)
OVERRIDE_HOST = first(filter(None, map(getenv, OVERRIDE_HOST_ENV_KEY)))
if OVERRIDE_HOST:
@@ -120,7 +125,9 @@ class ESFactory:
@classmethod
def get_es_timestamp_str(cls):
now = datetime.utcnow()
return now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond / 1000) + "Z"
return (
now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond / 1000) + "Z"
)
es_factory = ESFactory()
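
Note: for reference, what the reformatted timestamp helper produces; the exact value depends on the current time.

from datetime import datetime

now = datetime.utcnow()
# "%d" truncates the float from microsecond / 1000 down to whole milliseconds
ts = now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond / 1000) + "Z"
print(ts)  # e.g. 2021-05-03T14:26:44.123Z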

View File

@@ -11,8 +11,16 @@ from apiserver.config_repo import config
log = config.logger(__file__)
OVERRIDE_HOST_ENV_KEY = ("TRAINS_REDIS_SERVICE_HOST", "REDIS_SERVICE_HOST")
OVERRIDE_PORT_ENV_KEY = ("TRAINS_REDIS_SERVICE_PORT", "REDIS_SERVICE_PORT")
OVERRIDE_HOST_ENV_KEY = (
"CLEARML_REDIS_SERVICE_HOST",
"TRAINS_REDIS_SERVICE_HOST",
"REDIS_SERVICE_HOST",
)
OVERRIDE_PORT_ENV_KEY = (
"CLEARML_REDIS_SERVICE_PORT",
"TRAINS_REDIS_SERVICE_PORT",
"REDIS_SERVICE_PORT",
)
OVERRIDE_HOST = first(filter(None, map(getenv, OVERRIDE_HOST_ENV_KEY)))
if OVERRIDE_HOST:
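
Note: Redis, MongoDB and Elasticsearch now share the same host/port resolution pattern; a sketch assuming only the legacy variable is set (the value is illustrative).

import os

from boltons.iterutils import first

OVERRIDE_HOST_ENV_KEY = (
    "CLEARML_REDIS_SERVICE_HOST",
    "TRAINS_REDIS_SERVICE_HOST",
    "REDIS_SERVICE_HOST",
)

os.environ["TRAINS_REDIS_SERVICE_HOST"] = "redis"  # illustrative legacy deployment

# the first key with a non-empty value wins; CLEARML_* takes precedence when both are set
print(first(filter(None, map(os.getenv, OVERRIDE_HOST_ENV_KEY))))  # redis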

View File

@@ -85,6 +85,10 @@ supported_modes {
}
}
}
authenticated {
description: "Is user authenticated"
type: boolean
}
}
}
}

View File

@@ -1,6 +1,6 @@
from typing import Text, Sequence, Callable, Union, Type
from funcsigs import signature
from inspect import signature
from jsonmodels import models
from .apicall import APICall, APICallResult

View File

@@ -5,6 +5,7 @@ from typing import Type, Optional, Union, Tuple
import attr
from jsonmodels import models
from requests.structures import CaseInsensitiveDict
from six import string_types
from apiserver import database
@@ -313,6 +314,13 @@ class APICall(DataContainer):
def HEADER_TRANSACTION(self):
return self._transaction_headers[0]
_client_headers = _get_headers("Client")
""" Client """
@property
def HEADER_CLIENT(self):
return self._client_headers[0]
_worker_headers = _get_headers("Worker")
""" Worker (machine) ID """
@@ -366,7 +374,7 @@ class APICall(DataContainer):
assert isinstance(endpoint_version, PartialVersion), endpoint_version
self._requested_endpoint_version = endpoint_version
self._actual_endpoint_version = None
self._headers = {}
self._headers = CaseInsensitiveDict()
self._kpis = {}
self._log_api = True
if headers:
@@ -420,7 +428,7 @@ class APICall(DataContainer):
:param header: Header name options (more than one supported, all will be cleared)
"""
for value in header if isinstance(header, (tuple, list)) else (header,):
self.headers.pop(value, None)
self._headers.pop(value, None)
def set_header(self, header, value):
"""
@@ -514,7 +522,7 @@ class APICall(DataContainer):
@property
def headers(self):
return self._headers
return dict(self._headers.items())
@property
def kpis(self):
@@ -532,6 +540,10 @@ class APICall(DataContainer):
def trx(self, value):
self.set_header(self._transaction_headers, value)
@property
def client(self):
return self.get_header(self._client_headers)
@property
def worker(self):
return self.get_worker(default="<unknown>")
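
Note: a minimal sketch of what the switch to CaseInsensitiveDict buys, assuming clients send headers with arbitrary casing; the header name is taken from this commit, the values are illustrative.

from requests.structures import CaseInsensitiveDict

headers = CaseInsensitiveDict()
headers["X-ClearML-Async"] = "1"

# lookups now succeed regardless of the casing the client sent
print(headers["x-clearml-async"])       # 1
print(headers.get("X-CLEARML-ASYNC"))   # 1

# the headers property above returns a plain dict copy, so callers cannot
# mutate the call's internal header state through it
snapshot = dict(headers.items())
snapshot.pop("X-ClearML-Async")
print("X-ClearML-Async" in headers)     # True - the original is untouched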

View File

@@ -3,4 +3,4 @@ from apiserver.service_repo import APICall, endpoint
@endpoint("debug.ping")
def ping(call: APICall, _, __):
call.result.data = {"msg": "Because it trains cats and dogs"}
call.result.data = {"msg": "ClearML server"}

View File

@@ -13,10 +13,10 @@ from apiserver.apimodels.queues import (
QueueMetrics,
)
from apiserver.bll.queue import QueueBLL
from apiserver.bll.util import extract_properties_to_lists
from apiserver.bll.workers import WorkerBLL
from apiserver.service_repo import APICall, endpoint
from apiserver.services.utils import conform_tag_fields, conform_output_tags, conform_tags
from apiserver.utilities import extract_properties_to_lists
worker_bll = WorkerBLL()
queue_bll = QueueBLL(worker_bll)

View File

@@ -23,10 +23,10 @@ from apiserver.apimodels.workers import (
GetActivityReportResponse,
ActivityReportSeries,
)
from apiserver.bll.util import extract_properties_to_lists
from apiserver.bll.workers import WorkerBLL
from apiserver.config_repo import config
from apiserver.service_repo import APICall, endpoint
from apiserver.utilities import extract_properties_to_lists
log = config.logger(__file__)

View File

@@ -154,7 +154,7 @@ class APIClient:
self.http_session.mount("https://", adapter)
if impersonated_user_id:
self.http_session.headers["X-Trains-Impersonate-As"] = impersonated_user_id
self.http_session.headers["X-ClearML-Impersonate-As"] = impersonated_user_id
if not self.session_token:
self.login()
@@ -211,7 +211,7 @@ class APIClient:
headers = {"Content-Type": "application/json"}
headers.update(headers_overrides)
if is_async:
headers["X-Trains-Async"] = "1"
headers["X-ClearML-Async"] = "1"
if not isinstance(data, six.string_types):
data = json.dumps(data)
@@ -241,7 +241,7 @@ class APIClient:
call_id = res.meta.call_id
async_res_url = "%s/async.result?id=%s" % (self.base_url, call_id)
async_res_headers = headers.copy()
async_res_headers.pop("X-Trains-Async")
async_res_headers.pop("X-ClearML-Async")
while not got_result:
log.info("Got 202. Checking async result for %s (%s)" % (url, call_id))
http_res = self.http_session.get(
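
Note: a sketch of the async round-trip driven by the renamed header, assuming a server at the illustrative base_url; the meta/call_id response layout is inferred from the client code above, not confirmed.

import json

import requests

base_url = "http://localhost:8008"  # illustrative address
headers = {"Content-Type": "application/json", "X-ClearML-Async": "1"}

res = requests.post(f"{base_url}/debug.ping", data=json.dumps({}), headers=headers)
if res.status_code == 202:
    call_id = res.json()["meta"]["call_id"]  # assumed response layout
    poll_headers = dict(headers)
    poll_headers.pop("X-ClearML-Async")  # poll for the result synchronously
    res = requests.get(f"{base_url}/async.result?id={call_id}", headers=poll_headers)
print(res.json())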

View File

@@ -37,12 +37,12 @@ class CheckUpdatesThread(Thread):
@property
def component_name(self) -> str:
return config.get("apiserver.check_for_updates.component_name", "trains-server")
return config.get("apiserver.check_for_updates.component_name", "clearml-server")
def _check_new_version_available(self) -> Optional[_VersionResponse]:
url = config.get(
"apiserver.check_for_updates.url",
"https://updates.trains.allegro.ai/updates",
"https://updates.clear.ml/updates",
)
uid = Settings.get_by_key("server.uuid")

View File

@@ -1,2 +1,28 @@
from operator import itemgetter
from typing import Sequence, Optional, Callable, Tuple
def strict_map(*args, **kwargs):
return list(map(*args, **kwargs))
def extract_properties_to_lists(
key_names: Sequence[str],
data: Sequence[dict],
extract_func: Optional[Callable[[dict], Tuple]] = None,
) -> dict:
"""
Given a sequence of dictionaries and the names of dictionary keys,
builds a dictionary mapping each requested key to the list of its values.
For an empty input sequence, returns a dictionary of empty lists
:param key_names: names of the keys in the resulting dictionary
:param data: sequence of dictionaries to extract values from
:param extract_func: an optional callable that extracts properties
from a dictionary and puts them in a tuple in the order corresponding to
key_names. If not specified then properties are extracted according to key_names
"""
if not data:
return {k: [] for k in key_names}
value_sequences = zip(*map(extract_func or itemgetter(*key_names), data))
return dict(zip(key_names, map(list, value_sequences)))
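
Note: a usage example for the relocated extract_properties_to_lists helper (the sample data is illustrative).

from apiserver.bll.util import extract_properties_to_lists

rows = [
    {"name": "queue-a", "size": 3},
    {"name": "queue-b", "size": 7},
]
print(extract_properties_to_lists(["name", "size"], rows))
# {'name': ['queue-a', 'queue-b'], 'size': [3, 7]}
print(extract_properties_to_lists(["name", "size"], []))
# {'name': [], 'size': []}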

View File

@@ -1,85 +0,0 @@
version: "3.6"
services:
trainsserver:
command:
- -c
- "echo \"#!/bin/bash\" > /opt/trains/all.sh && echo \"/opt/trains/wrapper.sh webserver&\" >> /opt/trains/all.sh && echo \"/opt/trains/wrapper.sh fileserver&\" >> /opt/trains/all.sh && echo \"/opt/trains/wrapper.sh apiserver\" >> /opt/trains/all.sh && cat /opt/trains/all.sh && chmod +x /opt/trains/all.sh && /opt/trains/all.sh"
entrypoint: /bin/bash
container_name: trains-server
image: allegroai/trains:latest
ports:
- 8008:8008
- 8080:80
- 8081:8081
restart: unless-stopped
volumes:
- /opt/trains/logs:/var/log/trains
- /opt/trains/data/fileserver:/mnt/fileserver
- /opt/trains/config:/opt/trains/config
depends_on:
- redis
- mongo
- elasticsearch
environment:
TRAINS_ELASTIC_SERVICE_HOST: elasticsearch
TRAINS_ELASTIC_SERVICE_PORT: 9200
TRAINS_MONGODB_SERVICE_HOST: mongo
TRAINS_MONGODB_SERVICE_PORT: 27017
TRAINS_REDIS_SERVICE_HOST: redis
TRAINS_REDIS_SERVICE_PORT: 6379
networks:
- backend
elasticsearch:
networks:
- backend
container_name: trains-elastic
environment:
ES_JAVA_OPTS: -Xms2g -Xmx2g
bootstrap.memory_lock: "true"
cluster.name: trains
cluster.routing.allocation.node_initial_primaries_recoveries: "500"
cluster.routing.allocation.disk.watermark.low: 10gb
cluster.routing.allocation.disk.watermark.high: 10gb
cluster.routing.allocation.disk.watermark.flood_stage: 10gb
discovery.zen.minimum_master_nodes: "1"
discovery.type: "single-node"
http.compression_level: "7"
node.ingest: "true"
node.name: trains
reindex.remote.whitelist: '*.*'
xpack.monitoring.enabled: "false"
xpack.security.enabled: "false"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
image: docker.elastic.co/elasticsearch/elasticsearch:7.6.2
restart: unless-stopped
volumes:
- /opt/trains/data/elastic_7:/usr/share/elasticsearch/data
mongo:
networks:
- backend
container_name: trains-mongo
image: mongo:3.6.5
restart: unless-stopped
command: --setParameter internalQueryExecMaxBlockingSortBytes=196100200
volumes:
- /opt/trains/data/mongo/db:/data/db
- /opt/trains/data/mongo/configdb:/data/configdb
redis:
networks:
- backend
container_name: trains-redis
image: redis:5.0
restart: unless-stopped
volumes:
- /opt/trains/data/redis:/data
networks:
backend:
driver: bridge

View File

@@ -39,9 +39,9 @@ services:
bootstrap.memory_lock: "true"
cluster.name: trains
cluster.routing.allocation.node_initial_primaries_recoveries: "500"
cluster.routing.allocation.disk.watermark.low: 10gb
cluster.routing.allocation.disk.watermark.high: 10gb
cluster.routing.allocation.disk.watermark.flood_stage: 10gb
cluster.routing.allocation.disk.watermark.low: 500mb
cluster.routing.allocation.disk.watermark.high: 500mb
cluster.routing.allocation.disk.watermark.flood_stage: 500mb
discovery.zen.minimum_master_nodes: "1"
discovery.type: "single-node"
http.compression_level: "7"

View File

@@ -42,9 +42,9 @@ services:
bootstrap.memory_lock: "true"
cluster.name: trains
cluster.routing.allocation.node_initial_primaries_recoveries: "500"
cluster.routing.allocation.disk.watermark.low: 10gb
cluster.routing.allocation.disk.watermark.high: 10gb
cluster.routing.allocation.disk.watermark.flood_stage: 10gb
cluster.routing.allocation.disk.watermark.low: 500mb
cluster.routing.allocation.disk.watermark.high: 500mb
cluster.routing.allocation.disk.watermark.flood_stage: 500mb
discovery.zen.minimum_master_nodes: "1"
discovery.type: "single-node"
http.compression_level: "7"

View File

@@ -5,6 +5,7 @@ from os import getenv
from os.path import expandvars
from pathlib import Path
from boltons.iterutils import first
from pyhocon import ConfigTree, ConfigFactory
from pyparsing import (
ParseFatalException,
@@ -13,12 +14,10 @@ from pyparsing import (
ParseSyntaxException,
)
DEFAULT_EXTRA_CONFIG_PATH = "/opt/trains/config"
EXTRA_CONFIG_PATH_ENV_KEY = "TRAINS_CONFIG_DIR"
DEFAULT_EXTRA_CONFIG_PATH = "/opt/clearml/config"
PREFIXES = ("CLEARML", "TRAINS")
EXTRA_CONFIG_PATH_SEP = ":"
EXTRA_CONFIG_VALUES_ENV_KEY_SEP = "__"
EXTRA_CONFIG_VALUES_ENV_KEY_PREFIX = f"TRAINS{EXTRA_CONFIG_VALUES_ENV_KEY_SEP}"
class BasicConfig:
@@ -29,7 +28,15 @@ class BasicConfig:
if not self.folder.is_dir():
raise ValueError("Invalid configuration folder")
self.prefix = "trains"
self.extra_config_path_env_key = [
f"{p.upper()}_CONFIG_DIR" for p in PREFIXES
]
self.prefix = PREFIXES[0]
self.extra_config_values_env_key_prefix = [
f"{p.upper()}{EXTRA_CONFIG_VALUES_ENV_KEY_SEP}"
for p in reversed(PREFIXES)
]
self._load()
@@ -50,24 +57,22 @@ class BasicConfig:
path = ".".join((self.prefix, Path(name).stem))
return logging.getLogger(path)
@staticmethod
def _read_extra_env_config_values():
def _read_extra_env_config_values(self):
""" Loads extra configuration from environment-injected values """
result = ConfigTree()
prefix = EXTRA_CONFIG_VALUES_ENV_KEY_PREFIX
keys = sorted(k for k in os.environ if k.startswith(prefix))
for key in keys:
path = key[len(prefix) :].replace(EXTRA_CONFIG_VALUES_ENV_KEY_SEP, ".").lower()
result = ConfigTree.merge_configs(
result, ConfigFactory.parse_string(f"{path}: {os.environ[key]}")
)
for prefix in self.extra_config_values_env_key_prefix:
keys = sorted(k for k in os.environ if k.startswith(prefix))
for key in keys:
path = key[len(prefix) :].replace(EXTRA_CONFIG_VALUES_ENV_KEY_SEP, ".").lower()
result = ConfigTree.merge_configs(
result, ConfigFactory.parse_string(f"{path}: {os.environ[key]}")
)
return result
@staticmethod
def _read_env_paths(key):
value = getenv(EXTRA_CONFIG_PATH_ENV_KEY, DEFAULT_EXTRA_CONFIG_PATH)
def _read_env_paths(self):
value = first(map(getenv, self.extra_config_path_env_key), DEFAULT_EXTRA_CONFIG_PATH)
if value is None:
return
paths = [
@@ -79,11 +84,11 @@ class BasicConfig:
if not path.is_dir() and str(path) != DEFAULT_EXTRA_CONFIG_PATH
]
if invalid:
print(f"WARNING: Invalid paths in {key} env var: {' '.join(invalid)}")
print(f"WARNING: Invalid paths in {self.extra_config_path_env_key} env var: {' '.join(invalid)}")
return [path for path in paths if path.is_dir()]
def _load(self, verbose=True):
extra_config_paths = self._read_env_paths(EXTRA_CONFIG_PATH_ENV_KEY) or []
extra_config_paths = self._read_env_paths() or []
extra_config_values = self._read_extra_env_config_values()
configs = [
self._read_recursive(path, verbose=verbose)

View File

@@ -16,7 +16,7 @@
backupCount: 3
maxBytes: 10240000,
class: "logging.handlers.RotatingFileHandler",
filename: "/var/log/trains/fileserver.log"
filename: "/var/log/clearml/fileserver.log"
}
}
root {