mirror of
https://github.com/clearml/clearml-server
synced 2025-06-26 23:15:47 +00:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b93591ec32 | ||
|
|
0abfd8da0d | ||
|
|
a9cc4e36c6 | ||
|
|
fe1c963eec | ||
|
|
111d80e88d | ||
|
|
6718862dbe | ||
|
|
0fe1bf8a61 | ||
|
|
10f326eda9 | ||
|
|
cd0d6c1a3d | ||
|
|
3205f2df97 |
@@ -11,6 +11,12 @@
|
||||
|
||||
## :rocket: Trains-Agent Services is now included, for more information see [services](https://github.com/allegroai/trains-server#services)
|
||||
|
||||
## v0.16 Upgrade Notice
|
||||
|
||||
In v0.16, the Elasticsearch subsystem of Trains Server has been upgraded from version 5.6 to version 7.6. This change necessitates the migration of the database contents to accommodate the change in index structure across the different versions.
|
||||
|
||||
Follow [this procedure](https://allegro.ai/docs/deploying_trains/trains_server_es7_migration/) to migrate existing data.
|
||||
|
||||
## Introduction
|
||||
|
||||
The **trains-server** is the backend service infrastructure for [Trains](https://github.com/allegroai/trains).
|
||||
|
||||
@@ -9,6 +9,7 @@ from jsonmodels.validators import Length, Min, Max
|
||||
from apimodels import ListField, IntField, ActualEnumField
|
||||
from bll.event.event_metrics import EventType
|
||||
from bll.event.scalar_key import ScalarKeyEnum
|
||||
from config import config
|
||||
from utilities.stringenum import StringEnum
|
||||
|
||||
|
||||
@@ -23,7 +24,15 @@ class ScalarMetricsIterHistogramRequest(HistogramRequestBase):
|
||||
|
||||
class MultiTaskScalarMetricsIterHistogramRequest(HistogramRequestBase):
|
||||
tasks: Sequence[str] = ListField(
|
||||
items_types=str, validators=[Length(minimum_value=1, maximum_value=10)]
|
||||
items_types=str,
|
||||
validators=[
|
||||
Length(
|
||||
minimum_value=1,
|
||||
maximum_value=config.get(
|
||||
"services.tasks.multi_task_histogram_limit", 10
|
||||
),
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import itertools
|
||||
import math
|
||||
from collections import defaultdict
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from enum import Enum
|
||||
@@ -252,11 +253,14 @@ class EventMetrics:
|
||||
|
||||
min_index = safe_get(data, "min_index/value", default=0)
|
||||
max_index = safe_get(data, "max_index/value", default=min_index)
|
||||
index_range = max_index - min_index + 1
|
||||
interval = max(1, math.ceil(float(index_range) / samples))
|
||||
max_samples = math.ceil(float(index_range) / interval)
|
||||
return (
|
||||
metric,
|
||||
variant,
|
||||
max(1, int(max_index - min_index + 1) // samples),
|
||||
samples,
|
||||
interval,
|
||||
max_samples,
|
||||
)
|
||||
|
||||
MetricData = Tuple[str, dict]
|
||||
|
||||
@@ -93,21 +93,22 @@ class LogEventsIterator:
|
||||
},
|
||||
}
|
||||
es_result = self.es.search(index=es_index, body=es_req)
|
||||
hits = es_result["hits"]["hits"]
|
||||
if not hits or len(hits) < 2:
|
||||
last_second_hits = es_result["hits"]["hits"]
|
||||
if not last_second_hits or len(last_second_hits) < 2:
|
||||
# if only one element is returned for the last timestamp
|
||||
# then it is already present in the events
|
||||
return events, hits_total
|
||||
|
||||
last_events = [hit["_source"] for hit in es_result["hits"]["hits"]]
|
||||
already_present_ids = set(ev["_id"] for ev in events)
|
||||
already_present_ids = set(hit["_id"] for hit in hits)
|
||||
last_second_events = [
|
||||
hit["_source"]
|
||||
for hit in last_second_hits
|
||||
if hit["_id"] not in already_present_ids
|
||||
]
|
||||
|
||||
# return the list merged from original query results +
|
||||
# leftovers from the last timestamp
|
||||
return (
|
||||
[
|
||||
*events,
|
||||
*(ev for ev in last_events if ev["_id"] not in already_present_ids),
|
||||
],
|
||||
[*events, *last_second_events],
|
||||
hits_total,
|
||||
)
|
||||
|
||||
@@ -11,4 +11,6 @@ non_responsive_tasks_watchdog {
|
||||
artifacts {
|
||||
update_attempts: 10
|
||||
update_retry_msec: 500
|
||||
}
|
||||
}
|
||||
|
||||
multi_task_histogram_limit: 100
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import logging
|
||||
from time import sleep
|
||||
from typing import Type, Optional, Sequence, Any, Union
|
||||
|
||||
import urllib3.exceptions
|
||||
from elasticsearch import Elasticsearch, exceptions
|
||||
|
||||
import es_factory
|
||||
@@ -25,6 +28,40 @@ class ElasticConnectionError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ConnectionErrorFilter(logging.Filter):
|
||||
def __init__(
|
||||
self,
|
||||
level: Optional[Union[int, str]] = None,
|
||||
err_type: Optional[Type] = None,
|
||||
args_prefix: Optional[Sequence[Any]] = None,
|
||||
):
|
||||
super(ConnectionErrorFilter, self).__init__()
|
||||
if level is None:
|
||||
self.level = None
|
||||
else:
|
||||
try:
|
||||
self.level = int(level)
|
||||
except ValueError:
|
||||
self.level = logging.getLevelName(level)
|
||||
|
||||
self.err_type = err_type
|
||||
self.args = args_prefix and tuple(args_prefix)
|
||||
self.last_blocked = None
|
||||
|
||||
def filter(self, record):
|
||||
try:
|
||||
allow = (
|
||||
(self.err_type is None or record.exc_info[0] != self.err_type)
|
||||
and (self.level is None or record.levelno != self.level)
|
||||
and (self.args is None or record.args[: len(self.args)] != self.args)
|
||||
)
|
||||
if not allow:
|
||||
self.last_blocked = record
|
||||
return allow
|
||||
except Exception:
|
||||
return True
|
||||
|
||||
|
||||
def check_elastic_empty() -> bool:
|
||||
"""
|
||||
Check for elasticsearch connection
|
||||
@@ -35,20 +72,32 @@ def check_elastic_empty() -> bool:
|
||||
cluster_conf = es_factory.get_cluster_config("events")
|
||||
max_retries = config.get("apiserver.elastic.probing.max_retries", 4)
|
||||
timeout = config.get("apiserver.elastic.probing.timeout", 30)
|
||||
for retry in range(max_retries):
|
||||
try:
|
||||
es = Elasticsearch(hosts=cluster_conf.get("hosts"))
|
||||
return not es.indices.get_template(name="events*")
|
||||
except exceptions.NotFoundError as ex:
|
||||
log.error(ex)
|
||||
return True
|
||||
except exceptions.ConnectionError:
|
||||
if retry >= max_retries - 1:
|
||||
raise ElasticConnectionError()
|
||||
log.warn(
|
||||
f"Could not connect to es server. Retry {retry+1} of {max_retries}. Waiting for {timeout}sec"
|
||||
)
|
||||
sleep(timeout)
|
||||
|
||||
es_logger = logging.getLogger("elasticsearch")
|
||||
log_filter = ConnectionErrorFilter(
|
||||
err_type=urllib3.exceptions.NewConnectionError, args_prefix=("GET",)
|
||||
)
|
||||
|
||||
try:
|
||||
es_logger.addFilter(log_filter)
|
||||
for retry in range(max_retries):
|
||||
try:
|
||||
es = Elasticsearch(hosts=cluster_conf.get("hosts"))
|
||||
return not es.indices.get_template(name="events*")
|
||||
except exceptions.NotFoundError as ex:
|
||||
log.error(ex)
|
||||
return True
|
||||
except exceptions.ConnectionError as ex:
|
||||
if retry >= max_retries - 1:
|
||||
raise ElasticConnectionError(
|
||||
f"Error connecting to Elasticsearch: {str(ex)}"
|
||||
)
|
||||
log.warn(
|
||||
f"Could not connect to ElasticSearch Service. Retry {retry+1} of {max_retries}. Waiting for {timeout}sec"
|
||||
)
|
||||
sleep(timeout)
|
||||
finally:
|
||||
es_logger.removeFilter(log_filter)
|
||||
|
||||
|
||||
def init_es_data():
|
||||
|
||||
@@ -57,6 +57,10 @@ class PrePopulate:
|
||||
metadata_filename = "metadata.json"
|
||||
zip_args = dict(mode="w", compression=ZIP_BZIP2)
|
||||
artifacts_ext = ".artifacts"
|
||||
img_source_regex = re.compile(
|
||||
r"['\"]source['\"]:\s?['\"](https?://(?:localhost:8081|files.*?)/.*?)['\"]",
|
||||
flags=re.IGNORECASE,
|
||||
)
|
||||
|
||||
class JsonLinesWriter:
|
||||
def __init__(self, file: BinaryIO):
|
||||
@@ -360,6 +364,8 @@ class PrePopulate:
|
||||
@classmethod
|
||||
def update_featured_projects_order(cls):
|
||||
featured_order = config.get("services.projects.featured_order", [])
|
||||
if not featured_order:
|
||||
return
|
||||
|
||||
def get_index(p: Project):
|
||||
for index, entry in enumerate(featured_order):
|
||||
@@ -502,11 +508,19 @@ class PrePopulate:
|
||||
break
|
||||
scroll_id = res.next_scroll_id
|
||||
for event in res.events:
|
||||
if event.get("type") == "training_debug_image":
|
||||
event_type = event.get("type")
|
||||
if event_type == "training_debug_image":
|
||||
url = cls._get_fixed_url(event.get("url"))
|
||||
if url:
|
||||
event["url"] = url
|
||||
artifacts.append(url)
|
||||
elif event_type == "plot":
|
||||
plot_str: str = event.get("plot_str", "")
|
||||
for match in cls.img_source_regex.findall(plot_str):
|
||||
url = cls._get_fixed_url(match)
|
||||
if match != url:
|
||||
plot_str = plot_str.replace(match, url)
|
||||
artifacts.append(url)
|
||||
w.write(json.dumps(event))
|
||||
data = f.getvalue()
|
||||
hash_.update(data)
|
||||
|
||||
@@ -59,7 +59,15 @@ def _ensure_backend_user(user_id: str, company_id: str, user_name: str):
|
||||
|
||||
|
||||
def ensure_fixed_user(user: FixedUser, log: Logger):
|
||||
if User.objects(company=user.company, id=user.user_id).first():
|
||||
db_user = User.objects(company=user.company, id=user.user_id).first()
|
||||
if db_user:
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
log.info(f"Updating user name: {user.name}")
|
||||
given_name, _, family_name = user.name.partition(" ")
|
||||
db_user.update(name=user.name, given_name=given_name, family_name=family_name)
|
||||
except Exception:
|
||||
pass
|
||||
return
|
||||
|
||||
data = attr.asdict(user)
|
||||
|
||||
11
server/mongo/migrations/0.16.1.py
Normal file
11
server/mongo/migrations/0.16.1.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from pymongo.database import Database, Collection
|
||||
|
||||
|
||||
def migrate_backend(db: Database):
|
||||
collection: Collection = db["project"]
|
||||
featured = "featured"
|
||||
query = {featured: {"$exists": False}}
|
||||
for doc in collection.find(filter=query, projection=()):
|
||||
collection.update_one(
|
||||
{"_id": doc["_id"]}, {"$set": {featured: 9999}},
|
||||
)
|
||||
@@ -57,21 +57,26 @@ with distributed_lock(key, timeout=config.get("apiserver.db_init_timout", 120)):
|
||||
info.es_connection_error = True
|
||||
|
||||
empty_db = check_mongo_empty()
|
||||
if upgrade_monitoring:
|
||||
if not empty_db and (info.es_connection_error or empty_es):
|
||||
if get_last_server_version() < Version("0.16.0"):
|
||||
log.info(f"ES database seems not migrated")
|
||||
info.missed_es_upgrade = True
|
||||
proceed_with_init = not (info.es_connection_error or info.missed_es_upgrade)
|
||||
else:
|
||||
proceed_with_init = True
|
||||
if (
|
||||
upgrade_monitoring
|
||||
and not empty_db
|
||||
and (info.es_connection_error or empty_es)
|
||||
and get_last_server_version() < Version("0.16.0")
|
||||
):
|
||||
log.info(f"ES database seems not migrated")
|
||||
info.missed_es_upgrade = True
|
||||
|
||||
if proceed_with_init:
|
||||
if info.es_connection_error and not info.missed_es_upgrade:
|
||||
raise Exception(
|
||||
"Error starting server: failed connecting to ElasticSearch service"
|
||||
)
|
||||
|
||||
if not info.missed_es_upgrade:
|
||||
init_es_data()
|
||||
init_mongo_data()
|
||||
|
||||
if (
|
||||
proceed_with_init
|
||||
not info.missed_es_upgrade
|
||||
and empty_db
|
||||
and config.get("apiserver.pre_populate.enabled", False)
|
||||
):
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "0.16.0"
|
||||
__version__ = "0.16.1"
|
||||
|
||||
Reference in New Issue
Block a user