Compare commits

9 Commits

31 changed files with 193 additions and 49 deletions

View File

@@ -17,6 +17,7 @@ class GetDefaultResp(Base):
class CreateRequest(Base):
name = StringField(required=True)
display_name = StringField()
tags = ListField(items_types=[str])
system_tags = ListField(items_types=[str])
metadata = DictField(value_types=[MetadataItem])
@@ -47,6 +48,7 @@ class DeleteRequest(QueueRequest):
class UpdateRequest(QueueRequest):
name = StringField()
display_name = StringField()
tags = ListField(items_types=[str])
system_tags = ListField(items_types=[str])
metadata = DictField(value_types=[MetadataItem])

View File

@@ -86,6 +86,7 @@ class CurrentTaskEntry(IdNameEntry):
class QueueEntry(IdNameEntry):
display_name = StringField()
next_task = EmbeddedField(IdNameEntry)
num_tasks = IntField()

View File

@@ -201,6 +201,8 @@ class EventBLL(object):
invalid_iteration_error = f"Iteration number should not exceed {MAX_LONG}"
for event in events:
x_axis_label = event.pop("x_axis_label", None)
# remove spaces from event type
event_type = event.get("type")
if event_type is None:
@@ -296,6 +298,7 @@ class EventBLL(object):
self._update_last_scalar_events_for_task(
last_events=task_last_scalar_events[task_or_model_id],
event=event,
x_axis_label=x_axis_label,
)
actions.append(es_action)
@@ -431,7 +434,7 @@ class EventBLL(object):
return False
return True
def _update_last_scalar_events_for_task(self, last_events, event):
def _update_last_scalar_events_for_task(self, last_events, event, x_axis_label=None):
"""
Update last_events structure with the provided event details if this event is more
recent than the currently stored event for its metric/variant combination.
@@ -463,6 +466,8 @@ class EventBLL(object):
last_event["value"] = value
last_event["iter"] = event_iter
last_event["timestamp"] = event_timestamp
if x_axis_label is not None:
last_event["x_axis_label"] = x_axis_label
first_value_iter = last_event.get("first_value_iter")
if first_value_iter is None or event_iter < first_value_iter:

View File

@@ -24,6 +24,8 @@ from apiserver.bll.event.scalar_key import ScalarKey, ScalarKeyEnum
from apiserver.bll.query import Builder as QueryBuilder
from apiserver.config_repo import config
from apiserver.database.errors import translate_errors_context
from apiserver.database.model.model import Model
from apiserver.database.model.task.task import Task
from apiserver.utilities.dicts import nested_get
log = config.logger(__file__)
@@ -43,6 +45,7 @@ class EventMetrics:
samples: int,
key: ScalarKeyEnum,
metric_variants: MetricVariants = None,
model_events: bool = False,
) -> dict:
"""
Get scalar metric histogram per metric and variant
@@ -60,6 +63,7 @@ class EventMetrics:
samples=samples,
key=ScalarKey.resolve(key),
metric_variants=metric_variants,
model_events=model_events,
)
def _get_scalar_average_per_iter_core(
@@ -71,6 +75,7 @@ class EventMetrics:
key: ScalarKey,
run_parallel: bool = True,
metric_variants: MetricVariants = None,
model_events: bool = False,
) -> dict:
intervals = self._get_task_metric_intervals(
company_id=company_id,
@@ -102,7 +107,22 @@ class EventMetrics:
)
ret = defaultdict(dict)
if not metrics:
return ret
last_metrics = {}
cls_ = Model if model_events else Task
task = cls_.objects(id=task_id).only("last_metrics").first()
if task and task.last_metrics:
for m_data in task.last_metrics.values():
for v_data in m_data.values():
last_metrics[(v_data.metric, v_data.variant)] = v_data
for metric_key, metric_values in metrics:
for variant_key, data in metric_values.items():
last_metrics_data = last_metrics.get((metric_key, variant_key))
if last_metrics_data and last_metrics_data.x_axis_label is not None:
data["x_axis_label"] = last_metrics_data.x_axis_label
ret[metric_key].update(metric_values)
return ret
@@ -113,6 +133,7 @@ class EventMetrics:
samples,
key: ScalarKeyEnum,
metric_variants: MetricVariants = None,
model_events: bool = False,
):
"""
Compare scalar metrics for different tasks per metric and variant
@@ -136,6 +157,7 @@ class EventMetrics:
key=ScalarKey.resolve(key),
metric_variants=metric_variants,
run_parallel=False,
model_events=model_events,
)
task_ids, company_ids = zip(
*(
@@ -165,7 +187,7 @@ class EventMetrics:
self,
companies: TaskCompanies,
metric_variants: MetricVariants = None,
) -> Mapping[str, dict]:
) -> Mapping[str, Sequence[dict]]:
"""
For the requested tasks return all the events delivered for the single iteration (-2**31)
"""

View File

@@ -170,6 +170,7 @@ class ProjectBLL:
now = datetime.utcnow()
affected = set()
p: Project
for p in filter(None, (old_parent, new_parent)):
p.update(last_update=now)
affected.update({p.id, *(p.path or [])})
@@ -184,6 +185,7 @@ class ProjectBLL:
new_name = fields.pop("name", None)
if new_name:
# noinspection PyTypeChecker
new_name, new_location = _validate_project_name(new_name)
old_name, old_location = _validate_project_name(project.name)
if new_location != old_location:
@@ -823,7 +825,7 @@ class ProjectBLL:
}
def sum_runtime(
a: Mapping[str, Mapping], b: Mapping[str, Mapping]
a: Mapping[str, dict], b: Mapping[str, dict]
) -> Dict[str, dict]:
return {
section: a.get(section, 0) + b.get(section, 0)
@@ -1059,7 +1061,7 @@ class ProjectBLL:
if not parent_ids:
return []
parents = Task.get_many_with_join(
parents: Sequence[dict] = Task.get_many_with_join(
company_id,
query=Q(id__in=parent_ids),
query_dict={"name": name} if name else None,
@@ -1166,7 +1168,7 @@ class ProjectBLL:
if or_conditions:
if len(or_conditions) == 1:
conditions = next(iter(or_conditions))
conditions.update(next(iter(or_conditions)))
else:
conditions["$and"] = [c for c in or_conditions]

View File

@@ -34,6 +34,7 @@ class QueueBLL(object):
def create(
company_id: str,
name: str,
display_name: str = None,
tags: Optional[Sequence[str]] = None,
system_tags: Optional[Sequence[str]] = None,
metadata: Optional[dict] = None,
@@ -46,6 +47,7 @@ class QueueBLL(object):
company=company_id,
created=now,
name=name,
display_name=display_name,
tags=tags or [],
system_tags=system_tags or [],
metadata=metadata,

View File

@@ -270,15 +270,20 @@ class ServingStats:
}
},
}
hist_params = {}
if metric_type == MetricType.requests:
hist_params["min_doc_count"] = 1
else:
hist_params["extended_bounds"] = {
"min": int(from_date) * 1000,
"max": int(to_date) * 1000,
}
aggs = {
"dates": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": f"{interval}s",
"extended_bounds": {
"min": int(from_date) * 1000,
"max": int(to_date) * 1000,
},
**hist_params,
},
"aggs": aggs,
}

View File

@@ -408,7 +408,7 @@ class TaskBLL:
task's last iteration value.
:param last_iteration_max: Last reported iteration. Use this to conditionally set a value only
if the current task's last iteration value is smaller than the provided value.
:param last_scalar_values: Last reported metrics summary for scalar events (value, metric, variant).
:param last_scalar_events: Last reported metrics summary for scalar events (value, metric, variant).
:param last_events: Last reported metrics summary (value, metric, event type).
:param extra_updates: Extra task updates to include in this update call.
:return:

View File

@@ -395,7 +395,7 @@ def get_last_metric_updates(
is_min=(key == "min_value"),
is_first=(key == "first_value"),
)
elif key in ("metric", "variant", "value"):
elif key in ("metric", "variant", "value", "x_axis_label"):
extra_updates[f"set__{path}__{key}"] = value
count = variant_data.get("count")

View File

@@ -297,6 +297,7 @@ class WorkerBLL:
{
"$project": {
"name": 1,
"display_name": 1,
"next_entry": {"$arrayElemAt": ["$entries", 0]},
"num_entries": {"$size": "$entries"},
}
@@ -330,6 +331,7 @@ class WorkerBLL:
if not info:
continue
entry.name = info.get("name", None)
entry.display_name = info.get("display_name", None)
entry.num_tasks = info.get("num_entries", 0)
task_id = nested_get(info, ("next_entry", "task"))
if task_id:

View File

@@ -37,6 +37,8 @@ OVERRIDE_QUERY_ENV_KEY = "CLEARML_MONGODB_SERVICE_QUERY"
class DatabaseEntry(models.Base):
host = StringField(required=True)
alias = StringField()
name = StringField()
db = StringField()
class DatabaseFactory:
@@ -78,10 +80,13 @@ class DatabaseFactory:
missing.append(key)
continue
entry = cls._create_db_entry(alias=alias, settings=db_entries.get(key))
settings = {**db_entries.get(key)}
if not any(field in settings for field in ("name", "db")):
settings["name"] = key
entry = cls._create_db_entry(alias=alias, settings=settings)
if override_connection_string:
con_str = furl(override_connection_string).add(path=key).url
con_str = override_connection_string
log.info(f"Using override mongodb connection string for {alias}: {con_str}")
entry.host = con_str
else:

View File

@@ -47,6 +47,7 @@ class Queue(DbModelMixin, Document):
name = StrippedStringField(
required=True, unique_with="company", min_length=3, user_set_allowed=True
)
display_name = StringField(user_set_allowed=True)
company = StringField(required=True, reference_field=Company)
created = DateTimeField(required=True)
tags = SafeSortedListField(

View File

@@ -28,6 +28,7 @@ class MetricEvent(EmbeddedDocument):
first_value_iteration = IntField()
count = IntField()
mean_value = FloatField()
x_axis_label = StringField()
class EventStats(EmbeddedDocument):

View File

@@ -47,20 +47,20 @@ def get_last_server_version() -> Version:
def _ensure_mongodb_version():
db: pymongo.database.Database = get_db(Database.backend)
db_version = db.client.server_info()["version"]
if not db_version.startswith("5.0"):
log.warning(f"Database version should be 5.0.x. Instead: {str(db_version)}")
if not db_version.startswith("6.0"):
log.warning(f"Database version should be 6.0.x. Instead: {str(db_version)}")
return
res = db.client.admin.command({"getParameter": 1, "featureCompatibilityVersion": 1})
version = nested_get(res, ("featureCompatibilityVersion", "version"))
if version == "5.0":
if version == "6.0":
return
if version != "4.4":
log.warning(f"Cannot upgrade DB version. Should be 4.4. {str(res)}")
if version != "5.0":
log.warning(f"Cannot upgrade DB version. Should be 5.0. {str(res)}")
return
log.info("Upgrading db version from 4.4 to 5.0")
res = db.client.admin.command({"setFeatureCompatibilityVersion": "5.0"})
log.info("Upgrading db version from 5.0 to 6.0")
res = db.client.admin.command({"setFeatureCompatibilityVersion": "6.0"})
log.info(res)

View File

@@ -1171,7 +1171,7 @@ class PrePopulate:
if isinstance(doc, cls.task_cls):
tasks.append(doc)
cls.event_bll.delete_task_events(company_id, doc.id)
cls.event_bll.delete_task_events(company_id, doc.id, wait_for_delete=True)
if tasks:
return tasks

View File

@@ -6,7 +6,7 @@ boto3>=1.26
boto3-stubs[s3]>=1.26
clearml>=1.10.3
dpath>=1.4.2,<2.0
elasticsearch==8.12.0
elasticsearch==8.17.0
fastjsonschema>=2.8
flask-compress>=1.4.0
flask-cors>=3.0.5
@@ -19,15 +19,15 @@ jinja2
jsonmodels>=2.3
jsonschema>=2.6.0
luqum>=0.10.0
mongoengine==0.27.0
mongoengine==0.29.1
nested_dict>=1.61
packaging==20.3
psutil>=5.6.5
pyhocon>=0.3.35r
pyjwt>=2.4.0
pymongo==4.7.3
pymongo==4.10.1
python-rapidjson>=0.6.3
redis>=4.5.4,<5
redis==5.2.1
requests>=2.13.0
semantic_version>=2.8.3,<3
setuptools>=65.5.1

View File

@@ -299,6 +299,10 @@ last_metrics_event {
description: "The total count of reported values"
type: integer
}
x_axis_label {
description: "The user-defined value for the X-Axis name stored with the event"
type: string
}
}
}
last_metrics_variants {

View File

@@ -27,13 +27,17 @@ _definitions {
type: string
}
variant {
description: "E.g. 'class_1', 'total', 'average"
description: "E.g. 'class_1', 'total', 'average'"
type: string
}
value {
description: ""
type: number
}
x_axis_label {
description: "Custom X-Axis label to be used when displaying the scalars histogram"
type: string
}
}
}
metrics_vector_event {

View File

@@ -50,6 +50,10 @@ _definitions {
description: "Queue name"
type: string
}
display_name {
description: "Display name"
type: string
}
user {
description: "Associated user id"
type: string
@@ -324,7 +328,7 @@ create {
}
}
"2.13": ${create."2.4"} {
metadata {
request.properties.metadata {
description: "Queue metadata"
type: object
additionalProperties {
@@ -332,6 +336,12 @@ create {
}
}
}
"2.31": ${create."2.13"} {
request.properties.display_name {
description: "Display name"
type: string
}
}
}
update {
"2.4" {
@@ -377,7 +387,7 @@ update {
}
}
"2.13": ${update."2.4"} {
metadata {
request.properties.metadata {
description: "Queue metadata"
type: object
additionalProperties {
@@ -385,6 +395,12 @@ update {
}
}
}
"2.31": ${update."2.13"} {
request.properties.display_name {
description: "Display name"
type: string
}
}
}
delete {
"2.4" {

View File

@@ -194,6 +194,10 @@ _definitions {
queue_entry = ${_definitions.id_name_entry} {
properties {
display_name {
description: "Display name for the queue (if defined)"
type: string
}
next_task {
description: "Next task in the queue"
"$ref": "#/definitions/id_name_entry"

View File

@@ -2,6 +2,7 @@ import unicodedata
import urllib.parse
from functools import partial
from boltons.iterutils import first
from flask import request, Response, redirect
from werkzeug.datastructures import ImmutableMultiDict
from werkzeug.exceptions import BadRequest
@@ -22,12 +23,26 @@ log = config.logger(__file__)
class RequestHandlers:
_request_strip_prefix = config.get("apiserver.request.strip_prefix", None)
_server_header = config.get("apiserver.response.headers.server", "clearml")
_basic_cookie_settings = config.get("apiserver.auth.cookies")
_custom_cookie_settings = {
c["name"]: c["settings"]
for c in config.get("apiserver.auth.custom_cookies", {}).values()
if c.get("enabled") and c.get("settings")
}
def _get_cookie_settings(self, cookie_key=None):
    """
    Return a private copy of the settings to use when setting a response cookie.

    The settings registered for ``cookie_key`` in ``_custom_cookie_settings`` are
    used when present (and non-empty); otherwise ``_basic_cookie_settings`` apply.

    When the ``domain`` setting is a list, it is resolved to the first non-empty
    entry that the current request host (port stripped) ends with. If no entry
    matches, ``domain`` is set to ``None``.

    :param cookie_key: Name of the cookie being set, used to look up custom settings.
    :return: A shallow copy of the selected settings with ``domain`` resolved.
    """
    # Copy so that resolving the domain never mutates the shared class-level settings
    settings = (
        self._custom_cookie_settings.get(cookie_key) or self._basic_cookie_settings
    ).copy()

    # Settings blocks are not required to define a domain at all:
    # use .get() so that a missing key does not raise KeyError
    domain = settings.get("domain")
    if isinstance(domain, list):
        host_without_port, _, _ = request.host.partition(":")
        settings["domain"] = first(
            domain,
            # empty/None entries can never match a host
            key=lambda d: host_without_port.endswith(d) if d else False,
        )

    return settings
def before_request(self):
if request.method == "OPTIONS":
return "", 200
@@ -80,10 +95,7 @@ class RequestHandlers:
if call.result.cookies:
for key, value in call.result.cookies.items():
kwargs = (
self._custom_cookie_settings.get(key)
or config.get("apiserver.auth.cookies")
).copy()
kwargs = self._get_cookie_settings(key)
if value is None:
# Removing a cookie
kwargs["max_age"] = 0

View File

@@ -490,6 +490,7 @@ def scalar_metrics_iter_histogram(
samples=request.samples,
key=request.key,
metric_variants=_get_metric_variants_from_request(request.metrics),
model_events=request.model_events,
)
call.result.data = metrics
@@ -540,12 +541,13 @@ def multi_task_scalar_metrics_iter_histogram(
samples=request.samples,
key=request.key,
metric_variants=_get_metric_variants_from_request(request.metrics),
model_events=request.model_events,
)
)
def _get_single_value_metrics_response(
companies: TaskCompanies, value_metrics: Mapping[str, dict]
companies: TaskCompanies, value_metrics: Mapping[str, Sequence[dict]]
) -> Sequence[dict]:
task_names = {
task.id: task.name for task in itertools.chain.from_iterable(companies.values())

View File

@@ -121,6 +121,7 @@ def create(call: APICall, company_id, request: CreateRequest):
queue = queue_bll.create(
company_id=company_id,
name=request.name,
display_name=request.display_name,
tags=tags,
system_tags=system_tags,
metadata=Metadata.metadata_from_api(request.metadata),

View File

@@ -282,6 +282,7 @@ def get_task_data(call: APICall, company_id, request: GetTasksDataRequest):
metric_variants=_get_metric_variants_from_request(
request.scalar_metrics_iter_histogram.metrics
),
model_events=request.model_events,
)
if request.single_value_metrics:

View File

@@ -259,12 +259,19 @@ class TestQueues(TestService):
def test_get_all_ex(self):
queue_name = "TestTempQueue1"
queue_display_name = "Test display name"
queue_tags = ["Test1", "Test2"]
queue = self._temp_queue(queue_name, tags=queue_tags)
queue = self._temp_queue(queue_name, display_name=queue_display_name, tags=queue_tags)
res = self.api.queues.get_all_ex(name="TestTempQueue*").queues
self.assertQueue(
res, queue_id=queue, name=queue_name, tags=queue_tags, tasks=[], workers=[]
res,
queue_id=queue,
display_name=queue_display_name,
name=queue_name,
tags=queue_tags,
tasks=[],
workers=[],
)
tasks = [
@@ -279,6 +286,7 @@ class TestQueues(TestService):
res,
queue_id=queue,
name=queue_name,
display_name=queue_display_name,
tags=queue_tags,
tasks=tasks,
workers=workers,
@@ -306,6 +314,7 @@ class TestQueues(TestService):
queues: Sequence[AttrDict],
queue_id: str,
name: str,
display_name: str,
tags: Sequence[str],
tasks: Sequence[dict],
workers: Sequence[dict],
@@ -314,15 +323,33 @@ class TestQueues(TestService):
assert queue.last_update
self.assertEqualNoOrder(queue.tags, tags)
self.assertEqual(queue.name, name)
self.assertQueueTasks(queue, tasks)
self.assertQueueWorkers(queue, workers)
self.assertEqual(queue.display_name, display_name)
self.assertQueueTasks(queue, tasks, name, display_name)
self.assertQueueWorkers(queue, workers, name, display_name)
def assertTaskTags(self, task, system_tags):
res = self.api.tasks.get_by_id(task=task)
self.assertSequenceEqual(res.task.system_tags, system_tags)
def assertQueueTasks(self, queue: AttrDict, tasks: Sequence):
def assertQueueTasks(
self,
queue: AttrDict,
tasks: Sequence,
queue_name: str = None,
display_queue_name: str = None,
):
self.assertEqual([e.task for e in queue.entries], tasks)
if queue_name:
for task in tasks:
execution = self.api.tasks.get_by_id_ex(
id=[task["id"]],
only_fields=[
"execution.queue.name",
"execution.queue.display_name",
],
).tasks[0].execution
self.assertEqual(execution.queue.name, queue_name)
self.assertEqual(execution.queue.display_name, display_queue_name)
def assertGetNextTasks(self, queue, tasks):
for task_id in tasks:
@@ -330,11 +357,28 @@ class TestQueues(TestService):
self.assertEqual(res.entry.task, task_id)
assert not self.api.queues.get_next_task(queue=queue)
def assertQueueWorkers(self, queue: AttrDict, workers: Sequence[dict]):
def assertQueueWorkers(
self,
queue: AttrDict,
workers: Sequence[dict],
queue_name: str = None,
display_queue_name: str = None,
):
sort_key = itemgetter("name")
self.assertEqual(
sorted(queue.workers, key=sort_key), sorted(workers, key=sort_key)
)
if not workers:
return
res = self.api.workers.get_all()
worker_ids = {w["key"] for w in workers}
found = [w for w in res.workers if w.key in worker_ids]
self.assertEqual(len(found), len(worker_ids))
for worker in found:
for queue in worker.queues:
self.assertEqual(queue.name, queue_name)
self.assertEqual(queue.display_name, display_queue_name)
def _temp_queue(self, queue_name, **kwargs):
return self.create_temp("queues", name=queue_name, **kwargs)

View File

@@ -246,6 +246,7 @@ class TestTaskEvents(TestService):
"variant": f"Variant{variant_idx}",
"value": iteration,
"model_event": True,
"x_axis_label": f"Label_{metric_idx}_{variant_idx}"
}
for iteration in range(2)
for metric_idx in range(5)
@@ -274,6 +275,7 @@ class TestTaskEvents(TestService):
variant_data = metric_data.Variant0
self.assertEqual(variant_data.x, [0, 1])
self.assertEqual(variant_data.y, [0.0, 1.0])
self.assertEqual(variant_data.x_axis_label, "Label_0_0")
model_data = self.api.models.get_all_ex(
id=[model], only_fields=["last_metrics", "last_iteration"]
@@ -285,6 +287,7 @@ class TestTaskEvents(TestService):
self.assertEqual(1, metric_data.max_value_iteration)
self.assertEqual(0, metric_data.min_value)
self.assertEqual(0, metric_data.min_value_iteration)
self.assertEqual("Label_4_4", metric_data.x_axis_label)
self._assert_log_events(task=task, expected_total=1)

View File

@@ -1,4 +1,4 @@
from typing import Sequence, Tuple, Any, Union, Callable, Optional, Mapping
from typing import Sequence, Tuple, Any, Union, Callable, Optional, Protocol
def flatten_nested_items(
@@ -35,8 +35,13 @@ def deep_merge(source: dict, override: dict) -> dict:
return source
class GetItem(Protocol):
    """Structural type for any object that supports subscription (``obj[key]``)."""

    def __getitem__(self, key: Any) -> Any:
        ...
def nested_get(
dictionary: Mapping,
dictionary: GetItem,
path: Sequence[str],
default: Optional[Union[Any, Callable]] = None,
) -> Any:

View File

@@ -1 +1 @@
__version__ = "1.17.0"
__version__ = "2.0.0"

View File

@@ -58,7 +58,7 @@ services:
nofile:
soft: 65536
hard: 65536
image: docker.elastic.co/elasticsearch/elasticsearch:8.15.3
image: elasticsearch:8.17.0
restart: unless-stopped
volumes:
- c:/opt/clearml/data/elastic_7:/usr/share/elasticsearch/data
@@ -87,7 +87,7 @@ services:
networks:
- backend
container_name: clearml-mongo
image: mongo:5.0.26
image: mongo:6.0.19
restart: unless-stopped
command: --setParameter internalQueryMaxBlockingSortMemoryUsageBytes=196100200
volumes:
@@ -98,7 +98,7 @@ services:
networks:
- backend
container_name: clearml-redis
image: redis:6.2
image: redis:7.4.1
restart: unless-stopped
volumes:
- c:/opt/clearml/data/redis:/data

View File

@@ -60,7 +60,7 @@ services:
nofile:
soft: 65536
hard: 65536
image: docker.elastic.co/elasticsearch/elasticsearch:8.15.3
image: elasticsearch:8.17.0
restart: unless-stopped
volumes:
- /opt/clearml/data/elastic_7:/usr/share/elasticsearch/data
@@ -88,7 +88,7 @@ services:
networks:
- backend
container_name: clearml-mongo
image: mongo:5.0.26
image: mongo:6.0.19
restart: unless-stopped
command: --setParameter internalQueryMaxBlockingSortMemoryUsageBytes=196100200
volumes:
@@ -99,7 +99,7 @@ services:
networks:
- backend
container_name: clearml-redis
image: redis:6.2
image: redis:7.4.1
restart: unless-stopped
volumes:
- /opt/clearml/data/redis:/data

View File

@@ -5,7 +5,7 @@ flask-cors>=3.0.5
flask>=2.3.3
gunicorn>=20.1.0
pyhocon>=0.3.35
redis>=4.5.4,<5
redis==5.2.1
setuptools>=65.5.1
urllib3>=1.26.18
werkzeug>=3.0.1