Compare commits

18 Commits

Author | SHA1 | Message | Date
clearml | d998b46cb2 | Update image URLs | 2025-06-04 12:01:32 +03:00
clearml | afcccffab3 | Version bump to v2.1.0 | 2025-06-04 12:00:28 +03:00
clearml | cfe0b4fa55 | API version bump to 2.32 | 2025-06-04 12:00:23 +03:00
clearml | 0a02b7ad63 | Small refactor | 2025-06-04 11:59:39 +03:00
clearml | 34a9f29465 | Bump setuptools version to fix a known security issue | 2025-06-04 11:58:09 +03:00
clearml | 16d0955ae1 | Remove example S3 credentials | 2025-06-04 11:57:40 +03:00
clearml | c3f927d6c1 | Make sure stack trace is not returned by default | 2025-06-04 11:53:05 +03:00
clearml | 06a7aa3126 | Support skipping mongodb version check on startup | 2025-06-04 11:52:34 +03:00
clearml | 98122690df | Fix empty object might be returned to the client for Google JSON credentials | 2025-06-04 11:51:41 +03:00
clearml | f3c67ac3fd | Support chart series per single resource in workers.get_stats | 2025-06-04 11:51:00 +03:00
clearml | 1983b22157 | Report last_update field is now set when changing report name or tags | 2025-06-04 11:49:48 +03:00
clearml | 8a3fcacf5f | Fixed requirements vulnerabilities | 2025-06-04 11:49:15 +03:00
clearml | c0183e4302 | Fix CSV export vulnerability by escaping cell text if it matches a macro or formula prefix | 2025-06-04 11:44:24 +03:00
clearml | a7e340212f | Add data_tool export improvements including 'company' flag, increased batch size for performance, date-time to log strings, more logs, an option to create a separate zip file per root project, an option to translate urls during tool export | 2025-06-04 11:43:31 +03:00
clearml | bf00441146 | Update gunicorn constraints due to CVE-2024-6827 | 2025-03-22 23:00:50 +02:00
pollfly | 473aeb6ce9 | Update README links (#275) | 2025-03-02 12:07:10 +02:00
clearml | c3d305e0e2 | Update GitHub repo | 2025-01-16 12:04:43 +02:00
clearml | 2976ce69cc | Add docker-compose files for upgrade | 2025-01-01 10:54:27 +02:00
31 changed files with 949 additions and 393 deletions
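
Commit c0183e4302 describes escaping CSV cell text that starts with a macro or formula prefix, but its diff is not part of this section. As a rough illustration only, the sketch below follows the commonly recommended approach of prefixing such cells with a single quote. The prefix set and the names `FORMULA_PREFIXES`, `escape_cell` and `rows_to_csv` are assumptions, not taken from the commit.

```python
import csv
import io

# Assumed set of risky leading characters (common spreadsheet-injection guidance);
# the prefixes actually checked by the commit are not shown in this section.
FORMULA_PREFIXES = ("=", "+", "-", "@", "\t", "\r")


def escape_cell(text: str) -> str:
    """Prefix a risky cell with a single quote so spreadsheets treat it as text."""
    if text.startswith(FORMULA_PREFIXES):
        return "'" + text
    return text


def rows_to_csv(rows) -> str:
    """Serialize rows to CSV, escaping every cell on the way out."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    for row in rows:
        writer.writerow([escape_cell(str(cell)) for cell in row])
    return buffer.getvalue()
```

One trade-off of this simple rule is that legitimate values such as negative numbers (`-5`) are escaped as well, so a real implementation may want a narrower match.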

@@ -1,7 +1,7 @@
Server Side Public License
VERSION 1, OCTOBER 16, 2018
Copyright © 2024 ClearML Inc.
Copyright © 2025 ClearML Inc.
Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.

@@ -7,42 +7,15 @@
[![GitHub license](https://img.shields.io/badge/license-SSPL-green.svg)](https://img.shields.io/badge/license-SSPL-green.svg)
[![Python versions](https://img.shields.io/badge/python-3.9-blue.svg)](https://img.shields.io/badge/python-3.9-blue.svg)
[![GitHub version](https://img.shields.io/github/release-pre/allegroai/trains-server.svg)](https://img.shields.io/github/release-pre/allegroai/trains-server.svg)
[![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/allegroai)](https://artifacthub.io/packages/search?repo=allegroai)
[![GitHub version](https://img.shields.io/github/release-pre/clearml/clearml-server.svg)](https://img.shields.io/github/release-pre/clearml/clearml-server.svg)
[![Artifact Hub](https://img.shields.io/endpoint?url=https://artifacthub.io/badge/repository/clearml)](https://artifacthub.io/packages/search?repo=clearml)
</div>
---
<div align="center">
**Note regarding Apache Log4j2 Remote Code Execution (RCE) Vulnerability - CVE-2021-44228 - ESA-2021-31**
</div>
According to [ElasticSearch's latest report](https://discuss.elastic.co/t/apache-log4j2-remote-code-execution-rce-vulnerability-cve-2021-44228-esa-2021-31/291476),
supported versions of Elasticsearch (6.8.9+, 7.8+) used with recent versions of the JDK (JDK9+) **are not susceptible to either remote code execution or information leakage**
due to Elasticsearchs usage of the Java Security Manager.
**As the latest version of ClearML Server uses Elasticsearch 7.10+ with JDK15, it is not affected by these vulnerabilities.**
As a precaution, we've upgraded the ES version to 7.16.2 and added the mitigation recommended by ElasticSearch to our latest [docker-compose.yml](https://github.com/allegroai/clearml-server/blob/cfccbe05c158b75e520581f86e9668291da5c70a/docker/docker-compose.yml#L42) file.
While previous Elasticsearch versions (5.6.11+, 6.4.0+ and 7.0.0+) used by older ClearML Server versions are only susceptible to the information leakage vulnerability
(which in any case **does not permit access to data within the Elasticsearch cluster**),
we still recommend upgrading to the latest version of ClearML Server. Alternatively, you can apply the mitigation as implemented in our latest
[docker-compose.yml](https://github.com/allegroai/clearml-server/blob/cfccbe05c158b75e520581f86e9668291da5c70a/docker/docker-compose.yml#L42) file.
**Update 15 December**: A further vulnerability (CVE-2021-45046) was disclosed on December 14th.
ElasticSearch's guidance for Elasticsearch remains unchanged by this new vulnerability, thus **not affecting ClearML Server**.
**Update 22 December**: To keep with ElasticSearch's recommendations, we've upgraded the ES version to the newly released 7.16.2
---
## ClearML Server
#### *Formerly known as Trains Server*
The **ClearML Server** is the backend service infrastructure for [ClearML](https://github.com/allegroai/clearml).
The **ClearML Server** is the backend service infrastructure for [ClearML](https://github.com/clearml/clearml).
It allows multiple users to collaborate and manage their experiments.
**ClearML** offers a [free hosted service](https://app.clear.ml/), which is maintained by **ClearML** and open to anyone.
In order to host your own server, you will need to launch the **ClearML Server** and point **ClearML** to it.
@@ -124,9 +97,9 @@ In order to set up the **ClearML** client to work with your **ClearML Server**:
it will be inferred from the http/s scheme.
After launching the **ClearML Server** and configuring the **ClearML** client to use the **ClearML Server**,
you can [use](https://github.com/allegroai/clearml) **ClearML** in your experiments and view them in your **ClearML Server** web server,
you can [use](https://github.com/clearml/clearml) **ClearML** in your experiments and view them in your **ClearML Server** web server,
for example http://localhost:8080.
For more information about the ClearML client, see [**ClearML**](https://github.com/allegroai/clearml).
For more information about the ClearML client, see [**ClearML**](https://github.com/clearml/clearml).
## ClearML-Agent Services <a name="services"></a>
@@ -143,7 +116,7 @@ increased data transparency)
ClearML-Agent Services container will spin **any** task enqueued into the dedicated `services` queue.
Every task launched by ClearML-Agent Services will be registered as a new node in the system,
providing tracking and transparency capabilities.
You can also run the ClearML-Agent Services manually, see details in [ClearML-agent services mode](https://github.com/allegroai/clearml-agent#clearml-agent-services-mode-)
You can also run the ClearML-Agent Services manually, see details in [ClearML-agent services mode](https://github.com/clearml/clearml-agent#clearml-agent-services-mode-)
**Note**: It is the user's responsibility to make sure the proper tasks are pushed into the `services` queue.
Do not enqueue training / inference tasks into the `services` queue, as it will put unnecessary load on the server.
@@ -166,7 +139,7 @@ To restart the **ClearML Server**, you must first stop the containers, and then
## Upgrading <a name="upgrade"></a>
**ClearML Server** releases are also reflected in the [docker compose configuration file](https://github.com/allegroai/trains-server/blob/master/docker/docker-compose.yml).
**ClearML Server** releases are also reflected in the [docker compose configuration file](https://github.com/clearml/clearml-server/blob/master/docker/docker-compose.yml).
We strongly encourage you to keep your **ClearML Server** up to date, by keeping up with the current release.
**Note**: The following upgrade instructions use the Linux OS as an example.
@@ -199,7 +172,7 @@ To upgrade your existing **ClearML Server** deployment:
1. Download the latest `docker-compose.yml` file.
```bash
curl https://raw.githubusercontent.com/allegroai/trains-server/master/docker/docker-compose.yml -o docker-compose.yml
curl https://raw.githubusercontent.com/clearml/clearml-server/master/docker/docker-compose.yml -o docker-compose.yml
```
1. Configure the ClearML-Agent Services (not supported on Windows installation).
@@ -227,7 +200,7 @@ To upgrade your existing **ClearML Server** deployment:
If you have any questions, look to the ClearML [FAQ](https://clear.ml/docs/latest/docs/faq), or
tag your questions on [stackoverflow](https://stackoverflow.com/questions/tagged/clearml) with '**clearml**' tag.
For feature requests or bug reports, please use [GitHub issues](https://github.com/allegroai/clearml-server/issues).
For feature requests or bug reports, please use [GitHub issues](https://github.com/clearml/clearml-server/issues).
Additionally, you can always find us at *clearml@allegro.ai*

@@ -1,7 +1,7 @@
Server Side Public License
VERSION 1, OCTOBER 16, 2018
Copyright © 2024 ClearML Inc.
Copyright © 2025 ClearML Inc.
Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.

@@ -110,7 +110,7 @@
# Serving
1050: ["invalid_container_id", "invalid container id"]
1051: ["container_not_registered", "container is not registered"]
1052: ["no_containers_for_url", "no container instances found for serice url"]
1052: ["no_containers_for_url", "no container instances found for service url"]
1104: ["invalid_scroll_id", "Invalid scroll id"]
}

@@ -12,7 +12,7 @@ from jsonmodels.fields import (
)
from jsonmodels.models import Base
from apiserver.apimodels import ListField, EnumField, JsonSerializableMixin
from apiserver.apimodels import ListField, EnumField, JsonSerializableMixin, ActualEnumField
from apiserver.config_repo import config
@@ -130,7 +130,7 @@ class AggregationType(Enum):
class StatItem(Base):
key = StringField(required=True)
aggregation = EnumField(AggregationType, default=AggregationType.avg)
aggregation = ActualEnumField(AggregationType, default=AggregationType.avg)
class GetStatsRequest(StatsReportBase):
@@ -138,17 +138,24 @@ class GetStatsRequest(StatsReportBase):
StatItem, required=True, validators=validators.Length(minimum_value=1)
)
split_by_variant = BoolField(default=False)
split_by_resource = BoolField(default=False)
class MetricResourceSeries(Base):
name = StringField()
values = ListField(float)
class AggregationStats(Base):
aggregation = EnumField(AggregationType)
dates = ListField(int)
values = ListField(float)
resource_series = ListField(MetricResourceSeries)
class MetricStats(Base):
metric = StringField()
variant = StringField()
dates = ListField(int)
stats = ListField(AggregationStats)
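
To make the new `resource_series` field concrete, here is a hypothetical payload for one `AggregationStats` entry when `split_by_resource` is requested. Only the field names come from the models above. The values, the resource names `"0"` and `"1"`, and the helper `is_aligned` are made up for illustration.

```python
# Hypothetical payload: field names follow the models above, values are invented.
aggregation_stats = {
    "aggregation": "avg",
    "dates": [1717491600000, 1717491640000],
    "values": [41.5, 43.0],
    "resource_series": [
        {"name": "0", "values": [40.0, 42.0]},
        {"name": "1", "values": [43.0, 44.0]},
    ],
}


def is_aligned(stats: dict) -> bool:
    """Check that every series carries exactly one value per date bucket."""
    expected = len(stats["dates"])
    series = [stats["values"]]
    series += [item["values"] for item in stats.get("resource_series", [])]
    return all(len(values) == expected for values in series)
```

A chart client could run a check like `is_aligned(aggregation_stats)` before plotting the overall series next to the per-resource ones.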

@@ -199,7 +199,7 @@ class StorageBLL:
)
gs_dict = {
"project": gs._default_project,
"credentials_json": gs._default_credentials,
"credentials_json": gs._default_credentials or None,
"buckets": [attr.asdict(b) for b in gs._buckets],
}
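
The `or None` change above appears to map any falsy credentials value to `None`, so the client receives `null` instead of an empty object. A minimal sketch of that idiom, assuming the stored default can be an empty dict or empty string (the helper name `normalize_credentials` is hypothetical):

```python
import json


def normalize_credentials(value):
    """Collapse falsy values ({}, "", None) into None; keep anything else as is."""
    return value or None


def to_response(credentials) -> str:
    """Serialize the way an API response might, to show what the client sees."""
    return json.dumps({"credentials_json": normalize_credentials(credentials)})
```

For example, `to_response({})` yields `{"credentials_json": null}`, while a populated dict is passed through unchanged.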

@@ -1,8 +1,9 @@
from operator import attrgetter
from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial
from typing import Optional, Sequence
from boltons.iterutils import bucketize
from apiserver.apierrors import errors
from apiserver.apierrors.errors import bad_request
from apiserver.apimodels.workers import AggregationType, GetStatsRequest, StatItem
from apiserver.bll.query import Builder as QueryBuilder
@@ -14,6 +15,7 @@ log = config.logger(__file__)
class WorkerStats:
min_chart_interval = config.get("services.workers.min_chart_interval_sec", 40)
_max_metrics_concurrency = config.get("services.events.events_retrieval.max_metrics_concurrency", 4)
def __init__(self, es):
self.es = es
@@ -23,7 +25,7 @@ class WorkerStats:
"""Returns the es index prefix for the company"""
return f"worker_stats_{company_id.lower()}_"
def _search_company_stats(self, company_id: str, es_req: dict) -> dict:
def search_company_stats(self, company_id: str, es_req: dict) -> dict:
return self.es.search(
index=f"{self.worker_stats_prefix_for_company(company_id)}*",
body=es_req,
@@ -51,7 +53,7 @@ class WorkerStats:
if worker_ids:
es_req["query"] = QueryBuilder.terms("worker", worker_ids)
res = self._search_company_stats(company_id, es_req)
res = self.search_company_stats(company_id, es_req)
if not res["hits"]["total"]["value"]:
raise bad_request.WorkerStatsNotFound(
@@ -65,6 +67,75 @@ class WorkerStats:
for category in res["aggregations"]["categories"]["buckets"]
}
def _get_worker_stats_per_metric(
self,
metric_item: StatItem,
company_id: str,
from_date: float,
to_date: float,
interval: int,
split_by_resource: bool,
worker_ids: Sequence[str],
):
agg_types_to_es = {
AggregationType.avg: "avg",
AggregationType.min: "min",
AggregationType.max: "max",
}
agg = {
metric_item.aggregation.value: {
agg_types_to_es[metric_item.aggregation]: {"field": "value", "missing": 0.0 }
}
}
split_by_resource = split_by_resource and metric_item.key.startswith("gpu_")
if split_by_resource:
split_aggs = {"split": {"terms": {"field": "variant"}, "aggs": agg}}
else:
split_aggs = {}
es_req = {
"size": 0,
"aggs": {
"workers": {
"terms": {"field": "worker"},
"aggs": {
"dates": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": f"{interval}s",
"extended_bounds": {
"min": int(from_date) * 1000,
"max": int(to_date) * 1000,
},
},
"aggs": {
**agg,
**split_aggs,
},
}
},
}
},
}
query_terms = [
QueryBuilder.dates_range(from_date, to_date),
QueryBuilder.term("metric", metric_item.key),
]
if worker_ids:
query_terms.append(QueryBuilder.terms("worker", worker_ids))
es_req["query"] = {"bool": {"must": query_terms}}
with translate_errors_context():
data = self.search_company_stats(company_id, es_req)
cutoff_date = (
to_date - 0.9 * interval
) * 1000 # do not return the point for the incomplete last interval
return self._extract_results(
data, metric_item, split_by_resource, cutoff_date
)
def get_worker_stats(self, company_id: str, request: GetStatsRequest) -> dict:
"""
Get statistics for company workers metrics in the specified time range
@@ -76,123 +147,90 @@ class WorkerStats:
from_date = request.from_date
to_date = request.to_date
if from_date >= to_date:
raise bad_request.FieldsValueError("from_date must be less than to_date")
interval = max(request.interval, self.min_chart_interval)
def get_dates_agg() -> dict:
es_to_agg_types = (
("avg", AggregationType.avg.value),
("min", AggregationType.min.value),
("max", AggregationType.max.value),
raise errors.bad_request.FieldsValueError(
"from_date must be less than to_date"
)
return {
"dates": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": f"{interval}s",
"extended_bounds": {
"min": int(from_date) * 1000,
"max": int(to_date) * 1000,
}
},
"aggs": {
agg_type: {es_agg: {"field": "value"}}
for es_agg, agg_type in es_to_agg_types
},
}
}
interval = max(request.interval, self.min_chart_interval)
with ThreadPoolExecutor(self._max_metrics_concurrency) as pool:
res = list(
pool.map(
partial(
self._get_worker_stats_per_metric,
company_id=company_id,
from_date=from_date,
to_date=to_date,
interval=interval,
split_by_resource=request.split_by_resource,
worker_ids=request.worker_ids,
),
request.items,
)
)
def get_variants_agg() -> dict:
return {
"variants": {"terms": {"field": "variant"}, "aggs": get_dates_agg()}
}
ret = defaultdict(lambda: defaultdict(dict))
for workers in res:
for worker, metrics in workers.items():
for metric, stats in metrics.items():
ret[worker][metric].update(stats)
es_req = {
"size": 0,
"aggs": {
"workers": {
"terms": {"field": "worker"},
"aggs": {
"metrics": {
"terms": {"field": "metric"},
"aggs": get_variants_agg()
if request.split_by_variant
else get_dates_agg(),
}
},
}
},
}
query_terms = [
QueryBuilder.dates_range(from_date, to_date),
QueryBuilder.terms("metric", {item.key for item in request.items}),
]
if request.worker_ids:
query_terms.append(QueryBuilder.terms("worker", request.worker_ids))
es_req["query"] = {"bool": {"must": query_terms}}
with translate_errors_context():
data = self._search_company_stats(company_id, es_req)
cutoff_date = (to_date - 0.9 * interval) * 1000 # do not return the point for the incomplete last interval
return self._extract_results(data, request.items, request.split_by_variant, cutoff_date)
return ret
@staticmethod
def _extract_results(
data: dict, request_items: Sequence[StatItem], split_by_variant: bool, cutoff_date
data: dict,
metric_item: StatItem,
split_by_resource: bool,
cutoff_date,
) -> dict:
"""
Clean results returned from elastic search (remove "aggregations", "buckets" etc.),
leave only aggregation types requested by the user and return a clean dictionary
:param data: aggregation data retrieved from ES
:param request_items: aggs types requested by the user
:param split_by_variant: if False then aggregate by metric type, otherwise metric type + variant
"""
if "aggregations" not in data:
return {}
items_by_key = bucketize(request_items, key=attrgetter("key"))
aggs_per_metric = {
key: [item.aggregation for item in items]
for key, items in items_by_key.items()
}
def extract_metric_results(metric: dict) -> dict:
aggregation = metric_item.aggregation.value
date_buckets = metric["dates"]["buckets"]
length = len(date_buckets)
while length > 0 and date_buckets[length - 1]["key"] >= cutoff_date:
length -= 1
dates = [None] * length
agg_values = [0.0] * length
resource_series = defaultdict(lambda: [0.0] * length)
for idx in range(0, length):
date = date_buckets[idx]
dates[idx] = date["key"]
if aggregation in date:
agg_values[idx] = date[aggregation]["value"] or 0.0
if split_by_resource and "split" in date:
for resource in date["split"]["buckets"]:
series = resource_series[resource["key"]]
if aggregation in resource:
series[idx] = resource[aggregation]["value"] or 0.0
if len(resource_series) == 1:
resource_series = {}
def extract_date_stats(date: dict, metric_key) -> dict:
return {
"date": date["key"],
"count": date["doc_count"],
**{agg: date[agg]["value"] or 0.0 for agg in aggs_per_metric[metric_key]},
}
def extract_metric_results(
metric_or_variant: dict, metric_key: str
) -> Sequence[dict]:
return [
extract_date_stats(date, metric_key)
for date in metric_or_variant["dates"]["buckets"]
if date["key"] <= cutoff_date
]
def extract_variant_results(metric: dict) -> dict:
metric_key = metric["key"]
return {
variant["key"]: extract_metric_results(variant, metric_key)
for variant in metric["variants"]["buckets"]
}
def extract_worker_results(worker: dict) -> dict:
return {
metric["key"]: extract_variant_results(metric)
if split_by_variant
else extract_metric_results(metric, metric["key"])
for metric in worker["metrics"]["buckets"]
"dates": dates,
"values": agg_values,
**(
{"resource_series": resource_series} if resource_series else {}
),
}
return {
worker["key"]: extract_worker_results(worker)
worker["key"]: {
metric_item.key: {
metric_item.aggregation.value: extract_metric_results(worker)
}
}
for worker in data["aggregations"]["workers"]["buckets"]
}
@@ -237,7 +275,7 @@ class WorkerStats:
}
with translate_errors_context():
data = self._search_company_stats(company_id, es_req)
data = self.search_company_stats(company_id, es_req)
if "aggregations" not in data:
return {}
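
The reworked `get_worker_stats` runs one query per requested metric item in a thread pool and then merges the per-metric results into a single worker -> metric -> aggregation mapping. The sketch below reproduces only that fan-out and merge pattern. `fetch_metric` is a stand-in for the Elasticsearch query, and the `(key, aggregation)` tuples are an assumed simplification of `StatItem`.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def fetch_metric(item, workers):
    """Stand-in for one per-metric query: worker -> metric -> aggregation -> series."""
    key, aggregation = item
    return {
        worker: {key: {aggregation: {"dates": [], "values": []}}}
        for worker in workers
    }


def merge(results):
    """Fold per-metric results together without overwriting sibling aggregations."""
    merged = defaultdict(lambda: defaultdict(dict))
    for per_worker in results:
        for worker, metrics in per_worker.items():
            for metric, stats in metrics.items():
                merged[worker][metric].update(stats)
    return merged


def get_stats(items, workers, max_concurrency=4):
    # Each item is passed positionally; shared arguments are bound with partial.
    with ThreadPoolExecutor(max_concurrency) as pool:
        results = list(pool.map(partial(fetch_metric, workers=workers), items))
    return merge(results)
```

Because `update` is applied per metric, requesting both `avg` and `max` for the same key ends up as two entries under that metric rather than one replacing the other.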

@@ -2,8 +2,8 @@
watch: false # Watch for changes (dev only)
debug: false # Debug mode
pretty_json: false # prettify json response
return_stack: true # return stack trace on error
return_stack_to_caller: true # top-level control on whether to return stack trace in an API response
return_stack: false # return stack trace on error
return_stack_to_caller: false # top-level control on whether to return stack trace in an API response
# if 'return_stack' is true and error contains a status code, return stack trace only for these status codes
# valid values are:
@@ -41,6 +41,7 @@
# controls whether FieldDoesNotExist exception will be raised for any extra attribute existing in stored data
# but not declared in a data model
strict: false
ensure_db_version_on_startup: true
}
elastic {

@@ -15,15 +15,15 @@ aws {
# key: "my-access-key"
# secret: "my-secret-key"
# },
{
# This will apply to all buckets in this host (unless key/value is specifically provided for a given bucket)
host: "localhost:9000"
key: "minioadmin"
secret: "minioadmin"
# region: my-server
multipart: false
secure: false
}
// {
// # This will apply to all buckets in this host (unless key/value is specifically provided for a given bucket)
// host: "localhost:9000"
// key: "minioadmin"
// secret: "minioadmin"
// # region: my-server
// multipart: false
// secure: false
// }
]
}
}

@@ -3,3 +3,7 @@ default_cluster_timeout_sec: 600
# The minimal sampling interval for resource dashboard and worker activity charts
min_chart_interval_sec: 40
stats {
max_metrics_concurrency: 4
}

@@ -2,6 +2,8 @@
| Release | ApiVersion |
|---------|------------|
| v2.1 | 2.32 |
| v2.0 | 2.31 |
| v1.17 | 2.31 |
| v1.16 | 2.30 |
| v1.15 | 2.29 |

@@ -73,7 +73,9 @@ def init_mongo_data():
}
internal_user_emails.add(email.lower())
revoke = fixed_mode and credentials.get("revoke_in_fixed_mode", False)
user_id = _ensure_auth_user(user_data, company_id, log=log, revoke=revoke, internal_user=True)
user_id = _ensure_auth_user(
user_data, company_id, log=log, revoke=revoke, internal_user=True
)
if credentials.role == Role.user:
_ensure_backend_user(user_id, company_id, credentials.display_name)

@@ -45,6 +45,10 @@ def get_last_server_version() -> Version:
def _ensure_mongodb_version():
if not config.get("apiserver.mongo.ensure_db_version_on_startup", True):
return
log.info("Checking DB version")
db: pymongo.database.Database = get_db(Database.backend)
db_version = db.client.server_info()["version"]
if not db_version.startswith("6.0"):
@@ -53,6 +57,7 @@ def _ensure_mongodb_version():
res = db.client.admin.command({"getParameter": 1, "featureCompatibilityVersion": 1})
version = nested_get(res, ("featureCompatibilityVersion", "version"))
log.info(f"DB version: {version}")
if version == "6.0":
return
if version != "5.0":

@@ -65,6 +65,14 @@ from apiserver.utilities.parameter_key_escaper import ParameterKeyEscaper
replace_s3_scheme = os.getenv("CLEARML_REPLACE_S3_SCHEME")
def _print(msg: str):
time = datetime.now().isoformat(sep=" ", timespec="seconds")
print(f"{time} {msg}")
UrlTranslation = Tuple[str, str]
class PrePopulate:
module_name_prefix = "apiserver."
event_bll = EventBLL()
@@ -163,7 +171,7 @@ class PrePopulate:
return True, files
except Exception as ex:
print("Error reading map file. " + str(ex))
_print("Error reading map file. " + str(ex))
return True, files
return False, files
@@ -204,7 +212,7 @@ class PrePopulate:
return False
fileserver_links = [a for a in artifacts if is_fileserver_link(a)]
print(
_print(
f"Found {len(fileserver_links)} files on the fileserver from {len(artifacts)} total"
)
@@ -216,81 +224,114 @@ class PrePopulate:
filename: str,
experiments: Sequence[str] = None,
projects: Sequence[str] = None,
company: str = None,
artifacts_path: str = None,
task_statuses: Sequence[str] = None,
tag_exported_entities: bool = False,
metadata: Mapping[str, Any] = None,
export_events: bool = True,
export_users: bool = False,
project_split: bool = False,
url_trans: UrlTranslation = None,
) -> Sequence[str]:
cls._init_entity_types()
if task_statuses and not set(task_statuses).issubset(get_options(TaskStatus)):
raise ValueError("Invalid task statuses")
file = Path(filename)
if not (experiments or projects):
projects = cls.project_cls.objects(parent=None).scalar("id")
entities = cls._resolve_entities(
experiments=experiments, projects=projects, task_statuses=task_statuses
)
hash_ = hashlib.md5()
if metadata:
meta_str = json.dumps(metadata)
hash_.update(meta_str.encode())
metadata_hash = hash_.hexdigest()
else:
meta_str, metadata_hash = "", ""
map_file = file.with_suffix(".map")
updated, old_files = cls._check_for_update(
map_file, entities=entities, metadata_hash=metadata_hash
)
if not updated:
print(f"There are no updates from the last export")
return old_files
for old in old_files:
old_path = Path(old)
if old_path.is_file():
old_path.unlink()
with ZipFile(file, **cls.zip_args) as zfile:
if metadata:
zfile.writestr(cls.metadata_filename, meta_str)
if export_users:
cls._export_users(zfile)
artifacts = cls._export(
zfile,
entities=entities,
hash_=hash_,
tag_entities=tag_exported_entities,
export_events=export_events,
cleanup_users=not export_users,
def export_to_zip_core(file_base_name: Path, projects_: Sequence[str]):
entities = cls._resolve_entities(
experiments=experiments, projects=projects_, task_statuses=task_statuses
)
file_with_hash = file.with_name(f"{file.stem}_{hash_.hexdigest()}{file.suffix}")
file.replace(file_with_hash)
created_files = [str(file_with_hash)]
hash_ = hashlib.md5()
if metadata:
meta_str = json.dumps(metadata)
hash_.update(meta_str.encode())
metadata_hash = hash_.hexdigest()
else:
meta_str, metadata_hash = "", ""
artifacts = cls._filter_artifacts(artifacts)
if artifacts and artifacts_path and os.path.isdir(artifacts_path):
artifacts_file = file_with_hash.with_suffix(cls.artifacts_ext)
with ZipFile(artifacts_file, **cls.zip_args) as zfile:
cls._export_artifacts(zfile, artifacts, artifacts_path)
created_files.append(str(artifacts_file))
map_file = file_base_name.with_suffix(".map")
updated, old_files = cls._check_for_update(
map_file, entities=entities, metadata_hash=metadata_hash
)
if not updated:
_print(f"There are no updates from the last export")
return old_files
cls._write_update_file(
map_file,
entities=entities,
created_files=created_files,
metadata_hash=metadata_hash,
)
for old in old_files:
old_path = Path(old)
if old_path.is_file():
old_path.unlink()
if created_files:
print("Created files:\n" + "\n".join(file for file in created_files))
temp_file = file_base_name.with_suffix(file_base_name.suffix + "$")
try:
with ZipFile(temp_file, **cls.zip_args) as zfile:
if metadata:
zfile.writestr(cls.metadata_filename, meta_str)
if export_users:
cls._export_users(zfile)
artifacts = cls._export(
zfile,
entities=entities,
hash_=hash_,
tag_entities=tag_exported_entities,
export_events=export_events,
cleanup_users=not export_users,
url_trans=url_trans,
)
except:
temp_file.unlink(missing_ok=True)
raise
file_with_hash = file_base_name.with_stem(
f"{file_base_name.stem}_{hash_.hexdigest()}"
)
temp_file.replace(file_with_hash)
files = [str(file_with_hash)]
artifacts = cls._filter_artifacts(artifacts)
if artifacts and artifacts_path and os.path.isdir(artifacts_path):
artifacts_file = file_with_hash.with_suffix(cls.artifacts_ext)
with ZipFile(artifacts_file, **cls.zip_args) as zfile:
cls._export_artifacts(zfile, artifacts, artifacts_path)
files.append(str(artifacts_file))
cls._write_update_file(
map_file,
entities=entities,
created_files=files,
metadata_hash=metadata_hash,
)
if files:
_print("Created files:\n" + "\n".join(file for file in files))
return files
filename = Path(filename)
if not (experiments or projects):
query = dict(parent=None)
if company:
query["company"] = company
projects = list(cls.project_cls.objects(**query).scalar("id"))
# projects.append(None)
if projects and project_split:
created_files = list(
chain.from_iterable(
export_to_zip_core(
file_base_name=filename.with_stem(f"{filename.stem}_{pid}"),
projects_=[pid],
)
for pid in projects
)
)
else:
created_files = export_to_zip_core(
file_base_name=filename, projects_=projects
)
return created_files
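
The file naming in this hunk can be hard to follow from the diff alone. The sketch below mirrors the three `pathlib` expressions used above: the per-project base name, the temporary `$`-suffixed file, and the final hash-stamped name. The helper names and sample values are hypothetical, and `Path.with_stem` needs Python 3.9 or later.

```python
from pathlib import Path


def split_base_names(filename: str, project_ids):
    """One base file per root project: export.zip -> export_<pid>.zip."""
    base = Path(filename)
    return [base.with_stem(f"{base.stem}_{pid}") for pid in project_ids]


def temp_name(base: Path) -> Path:
    """Temporary name written first: export.zip -> export.zip$."""
    return base.with_suffix(base.suffix + "$")


def final_name(base: Path, digest: str) -> Path:
    """Final name once the content hash is known: export.zip -> export_<hash>.zip."""
    return base.with_stem(f"{base.stem}_{digest}")
```

Writing to the temporary name and renaming afterwards means a failed export does not leave a partial archive under the final name.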
@@ -320,8 +361,10 @@ class PrePopulate:
meta_user_id = metadata.get("user_id", "")
meta_user_name = metadata.get("user_name", "")
user_id, user_name = meta_user_id, meta_user_name
except Exception:
pass
except Exception as ex:
_print(
f"Error getting metadata from {cls.metadata_filename}: {str(ex)}"
)
# Make sure we won't end up with an invalid company ID
if company_id is None:
@@ -347,7 +390,7 @@ class PrePopulate:
if artifacts_path and os.path.isdir(artifacts_path):
artifacts_file = Path(filename).with_suffix(cls.artifacts_ext)
if artifacts_file.is_file():
print(f"Unzipping artifacts into {artifacts_path}")
_print(f"Unzipping artifacts into {artifacts_path}")
with ZipFile(artifacts_file) as zfile:
zfile.extractall(artifacts_path)
@@ -370,7 +413,7 @@ class PrePopulate:
base_file_name, _, old_hash = file.stem.rpartition("_")
new_hash = hash_.hexdigest()
if old_hash == new_hash:
print(f"The file {filename} was not updated")
_print(f"The file {filename} was not updated")
temp_file.unlink()
return []
@@ -384,7 +427,7 @@ class PrePopulate:
artifacts_file.replace(new_artifacts)
upadated.append(str(new_artifacts))
print(f"File {str(file)} replaced with {str(new_file)}")
_print(f"File {str(file)} replaced with {str(new_file)}")
file.unlink()
return upadated
@@ -446,12 +489,12 @@ class PrePopulate:
not_found = missing - set(resolved_by_name)
if not_found:
print(f"ERROR: no match for {', '.join(not_found)}")
_print(f"ERROR: no match for {', '.join(not_found)}")
exit(1)
duplicates = [k for k, v in resolved_by_name.items() if len(v) > 1]
if duplicates:
print(f"ERROR: more than one match for {', '.join(duplicates)}")
_print(f"ERROR: more than one match for {', '.join(duplicates)}")
exit(1)
def get_new_items(input_: Iterable) -> list:
@@ -489,20 +532,24 @@ class PrePopulate:
return
prefixes = [
cls.ParentPrefix(prefix=f"{project.name.rpartition('/')[0]}/", path=project.path)
cls.ParentPrefix(
prefix=f"{project.name.rpartition('/')[0]}/", path=project.path
)
for project in orphans
]
prefixes.sort(key=lambda p: len(p.path), reverse=True)
for project in projects:
prefix = first(pref for pref in prefixes if project.path[:len(pref.path)] == pref.path)
prefix = first(
pref for pref in prefixes if project.path[: len(pref.path)] == pref.path
)
if not prefix:
continue
project.path = project.path[len(prefix.path):]
project.path = project.path[len(prefix.path) :]
if not project.path:
project.parent = None
project.name = project.name.removeprefix(prefix.prefix)
# print(
# _print(
# f"ERROR: the following projects are exported without their parents: {orphans}"
# )
# exit(1)
@@ -518,16 +565,20 @@ class PrePopulate:
entities: Dict[Any] = defaultdict(set)
if projects:
print("Reading projects...")
projects = project_ids_with_children(projects)
entities[cls.project_cls].update(
cls._resolve_entity_type(cls.project_cls, projects)
)
print("--> Reading project experiments...")
_print("Reading projects...")
root = None in projects
projects = [p for p in projects if p]
if projects:
projects = project_ids_with_children(projects)
entities[cls.project_cls].update(
cls._resolve_entity_type(cls.project_cls, projects)
)
_print("--> Reading project experiments...")
p_ids = list(set(p.id for p in entities[cls.project_cls]))
if root:
p_ids.append(None)
query = Q(
project__in=list(
set(filter(None, (p.id for p in entities[cls.project_cls])))
),
project__in=p_ids,
system_tags__nin=[EntityVisibility.archived.value],
)
if task_statuses:
@@ -538,9 +589,11 @@ class PrePopulate:
)
if experiments:
print("Reading experiments...")
entities[cls.task_cls].update(cls._resolve_entity_type(cls.task_cls, experiments))
print("--> Reading experiments projects...")
_print("Reading experiments...")
entities[cls.task_cls].update(
cls._resolve_entity_type(cls.task_cls, experiments)
)
_print("--> Reading experiments projects...")
objs = cls.project_cls.objects(
id__in=list(
set(filter(None, (p.project for p in entities[cls.task_cls])))
@@ -560,7 +613,7 @@ class PrePopulate:
)
model_ids = {tm.model for tm in task_models}
if model_ids:
print("Reading models...")
_print("Reading models...")
entities[cls.model_cls] = set(cls.model_cls.objects(id__in=list(model_ids)))
# noinspection PyTypeChecker
@@ -625,22 +678,41 @@ class PrePopulate:
except AttributeError:
pass
@staticmethod
def _translate_url(url_: str, url_trans: UrlTranslation) -> str:
if not (url_ and url_trans):
return url_
source, target = url_trans
if not url_.startswith(source):
return url_
return target + url_[len(source):]
@classmethod
def _export_task_events(
cls, task: Task, base_filename: str, writer: ZipFile, hash_
cls,
task: Task,
base_filename: str,
writer: ZipFile,
hash_,
url_trans: UrlTranslation,
) -> Sequence[str]:
artifacts = []
filename = f"{base_filename}_{task.id}{cls.events_file_suffix}.json"
print(f"Writing task events into {writer.filename}:{filename}")
_print(f"Writing task events into {writer.filename}:{filename}")
with BytesIO() as f:
with cls.JsonLinesWriter(f) as w:
scroll_id = None
events_count = 0
while True:
res = cls.event_bll.get_task_events(
company_id=task.company,
task_id=task.id,
event_type=EventType.all,
scroll_id=scroll_id,
size=10_000,
)
if not res.events:
break
@@ -650,16 +722,22 @@ class PrePopulate:
if event_type == EventType.metrics_image.value:
url = cls._get_fixed_url(event.get("url"))
if url:
event["url"] = url
artifacts.append(url)
event["url"] = cls._translate_url(url, url_trans)
elif event_type == EventType.metrics_plot.value:
plot_str: str = event.get("plot_str", "")
for match in cls.img_source_regex.findall(plot_str):
url = cls._get_fixed_url(match)
if match != url:
plot_str = plot_str.replace(match, url)
artifacts.append(url)
if plot_str:
for match in cls.img_source_regex.findall(plot_str):
url = cls._get_fixed_url(match)
artifacts.append(url)
new_url = cls._translate_url(url, url_trans)
if match != new_url:
plot_str = plot_str.replace(match, new_url)
event["plot_str"] = plot_str
w.write(json.dumps(event))
events_count += 1
_print(f"Got {events_count} events for task {task.id}")
_print(f"Writing {events_count} events for task {task.id}")
data = f.getvalue()
hash_.update(data)
writer.writestr(filename, data)
@@ -677,53 +755,62 @@ class PrePopulate:
fixed.host += ".s3.amazonaws.com"
return fixed.url
except Exception as ex:
print(f"Failed processing link {url}. " + str(ex))
_print(f"Failed processing link {url}. " + str(ex))
return url
@classmethod
def _export_entity_related_data(
cls, entity_cls, entity, base_filename: str, writer: ZipFile, hash_
cls,
entity_cls,
entity,
base_filename: str,
writer: ZipFile,
hash_,
url_trans: UrlTranslation,
):
if entity_cls == cls.task_cls:
return [
*cls._get_task_output_artifacts(entity),
*cls._export_task_events(entity, base_filename, writer, hash_),
*cls._get_task_output_artifacts(entity, url_trans),
*cls._export_task_events(
entity, base_filename, writer, hash_, url_trans
),
]
if entity_cls == cls.model_cls:
entity.uri = cls._get_fixed_url(entity.uri)
return [entity.uri] if entity.uri else []
url = cls._get_fixed_url(entity.uri)
entity.uri = cls._translate_url(url, url_trans)
return [url] if url else []
return []
@classmethod
def _get_task_output_artifacts(cls, task: Task) -> Sequence[str]:
def _get_task_output_artifacts(cls, task: Task, url_trans: UrlTranslation) -> Sequence[str]:
if not task.execution.artifacts:
return []
artifact_urls = []
for a in task.execution.artifacts.values():
if a.mode == ArtifactModes.output:
a.uri = cls._get_fixed_url(a.uri)
url = cls._get_fixed_url(a.uri)
a.uri = cls._translate_url(url, url_trans)
if url and a.mode == ArtifactModes.output:
artifact_urls.append(url)
return [
a.uri
for a in task.execution.artifacts.values()
if a.mode == ArtifactModes.output and a.uri
]
return artifact_urls
@classmethod
def _export_artifacts(
cls, writer: ZipFile, artifacts: Sequence[str], artifacts_path: str
):
unique_paths = set(unquote(str(furl(artifact).path)) for artifact in artifacts)
print(f"Writing {len(unique_paths)} artifacts into {writer.filename}")
_print(f"Writing {len(unique_paths)} artifacts into {writer.filename}")
for path in unique_paths:
path = path.lstrip("/")
full_path = os.path.join(artifacts_path, path)
if os.path.isfile(full_path):
writer.write(full_path, path)
else:
print(f"Artifact {full_path} not found")
_print(f"Artifact {full_path} not found")
@classmethod
def _export_users(cls, writer: ZipFile):
@@ -742,7 +829,7 @@ class PrePopulate:
return
auth_users = {uid: data for uid, data in auth_users.items() if uid in be_users}
print(f"Writing {len(auth_users)} users into {writer.filename}")
_print(f"Writing {len(auth_users)} users into {writer.filename}")
data = {}
for field, users in (("auth", auth_users), ("backend", be_users)):
with BytesIO() as f:
@@ -773,6 +860,7 @@ class PrePopulate:
tag_entities: bool = False,
export_events: bool = True,
cleanup_users: bool = True,
url_trans: UrlTranslation = None,
) -> Sequence[str]:
"""
Export the requested experiments, projects and models and return the list of artifact files
@@ -780,7 +868,7 @@ class PrePopulate:
The projects should be sorted by name so that on import the hierarchy is correctly restored from top to bottom
"""
artifacts = []
now = datetime.utcnow()
now = datetime.now(timezone.utc)
for cls_ in sorted(entities, key=attrgetter("__name__")):
items = sorted(entities[cls_], key=attrgetter("name", "id"))
if not items:
@@ -790,11 +878,11 @@ class PrePopulate:
for item in items:
artifacts.extend(
cls._export_entity_related_data(
cls_, item, base_filename, writer, hash_
cls_, item, base_filename, writer, hash_, url_trans
)
)
filename = base_filename + ".json"
print(f"Writing {len(items)} items into {writer.filename}:{filename}")
_print(f"Writing {len(items)} items into {writer.filename}:{filename}")
with BytesIO() as f:
with cls.JsonLinesWriter(f) as w:
for item in items:
@@ -968,7 +1056,7 @@ class PrePopulate:
for entity_file in entity_files:
with reader.open(entity_file) as f:
full_name = splitext(entity_file.orig_filename)[0]
print(f"Reading {reader.filename}:{full_name}...")
_print(f"Reading {reader.filename}:{full_name}...")
res = cls._import_entity(
f,
full_name=full_name,
@@ -996,7 +1084,7 @@ class PrePopulate:
continue
with reader.open(events_file) as f:
full_name = splitext(events_file.orig_filename)[0]
print(f"Reading {reader.filename}:{full_name}...")
_print(f"Reading {reader.filename}:{full_name}...")
cls._import_events(f, company_id, task.user, task.id)
@classmethod
@@ -1082,14 +1170,16 @@ class PrePopulate:
)
models = task_data.get("models", {})
now = datetime.utcnow()
now = datetime.now(timezone.utc)
for old_field, type_ in (
("execution.model", TaskModelTypes.input),
("output.model", TaskModelTypes.output),
):
old_path = old_field.split(".")
old_model = nested_get(task_data, old_path)
new_models = [m for m in models.get(type_, []) if m.get("model") is not None]
new_models = [
m for m in models.get(type_, []) if m.get("model") is not None
]
name = TaskModelNames[type_]
if old_model and not any(
m
@@ -1127,7 +1217,7 @@ class PrePopulate:
) -> Optional[Sequence[Task]]:
user_mapping = user_mapping or {}
cls_ = cls._get_entity_type(full_name)
print(f"Writing {cls_.__name__.lower()}s into database")
_print(f"Writing {cls_.__name__.lower()}s into database")
tasks = []
override_project_count = 0
data_upgrade_funcs: Mapping[Type, Callable] = {
@@ -1164,21 +1254,23 @@ class PrePopulate:
doc.logo_blob = metadata.get("logo_blob", None)
cls_.objects(company=company_id, name=doc.name, id__ne=doc.id).update(
set__name=f"{doc.name}_{datetime.utcnow().strftime('%Y-%m-%d_%H-%M-%S')}"
set__name=f"{doc.name}_{datetime.now(timezone.utc).strftime('%Y-%m-%d_%H-%M-%S')}"
)
doc.save()
if isinstance(doc, cls.task_cls):
tasks.append(doc)
cls.event_bll.delete_task_events(company_id, doc.id, wait_for_delete=True)
cls.event_bll.delete_task_events(
company_id, doc.id, wait_for_delete=True
)
if tasks:
return tasks
@classmethod
def _import_events(cls, f: IO[bytes], company_id: str, user_id: str, task_id: str):
print(f"Writing events for task {task_id} into database")
_print(f"Writing events for task {task_id} into database")
for events_chunk in chunked_iter(cls.json_lines(f), 1000):
events = [json.loads(item) for item in events_chunk]
for ev in events:
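
The `_translate_url` helper added in this file rewrites a URL prefix on export. A minimal standalone sketch of the same logic follows. It assumes `UrlTranslation` is an optional `(source_prefix, target_prefix)` pair, since its definition is not part of this diff, and the host names are made up for illustration.

```python
from typing import Optional, Tuple

# Assumption: UrlTranslation is a (source_prefix, target_prefix) pair or None.
# The real definition is not shown in this diff.
UrlTranslation = Optional[Tuple[str, str]]


def translate_url(url_: Optional[str], url_trans: UrlTranslation) -> Optional[str]:
    """Replace the source prefix of url_ with the target prefix, if it matches."""
    if not (url_ and url_trans):
        # Nothing to translate: empty URL or no translation requested
        return url_
    source, target = url_trans
    if not url_.startswith(source):
        # URLs outside the source prefix are left untouched
        return url_
    return target + url_[len(source):]


# Hypothetical hosts, for illustration only
trans = ("http://old-files:8081", "https://files.example.com")
print(translate_url("http://old-files:8081/proj/task/a.png", trans))
print(translate_url("s3://bucket/a.png", trans))
print(translate_url(None, trans))
```

Note that in the diff the exported artifact list keeps the untranslated URL (it is used to locate files on disk), while the entity fields written to the archive get the translated one.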

View File

@@ -13,7 +13,7 @@ flask-cors>=3.0.5
flask>=2.3.3
furl>=2.0.0
google-cloud-storage>=2.8.0
gunicorn>=20.1.0
gunicorn>=23.0.0
humanfriendly>=4.17
jinja2
jsonmodels>=2.3
@@ -22,6 +22,7 @@ luqum>=0.10.0
mongoengine==0.29.1
nested_dict>=1.61
packaging==20.3
pillow>=10.3.0 # fix vulnerability derived from clearml 1.18.0
psutil>=5.6.5
pyhocon>=0.3.35r
pyjwt>=2.4.0
@@ -30,7 +31,7 @@ python-rapidjson>=0.6.3
redis==5.2.1
requests>=2.13.0
semantic_version>=2.8.3,<3
setuptools>=65.5.1
setuptools>=78.1.1
six
validators>=0.12.4
urllib3>=1.26.18

View File

@@ -15,6 +15,26 @@ _definitions {
}
}
}
worker_stat_key {
type: string
enum: [
cpu_usage
cpu_temperature
memory_used
memory_free
gpu_usage
gpu_temperature
gpu_fraction
gpu_memory_free
gpu_memory_used
network_tx
network_rx
disk_free_home
disk_free_temp
disk_read
disk_write
]
}
aggregation_type {
type: string
enum: [ avg, min, max ]
@@ -23,8 +43,7 @@ _definitions {
stat_item {
type: object
properties {
key {
type: string
key: ${_definitions.worker_stat_key} {
description: "Name of a metric"
}
category {
@@ -38,6 +57,30 @@ _definitions {
aggregation {
"$ref": "#/definitions/aggregation_type"
}
dates {
type: array
description: "List of timestamps (in seconds from epoch) in ascending order. The timestamps are separated by the requested interval. Timestamps where no worker activity was recorded are omitted."
items { type: integer }
}
values {
type: array
description: "List of values corresponding to the dates in metric statistics"
items { type: number }
}
resource_series {
type: array
description: "Metric data per single resource. Returned only if the split_by_resource request parameter is set to True"
items {"$ref": "#/definitions/metric_resource_series"}
}
}
}
metric_resource_series {
type: object
properties {
name {
type: string
description: Resource name
}
values {
type: array
description: "List of values corresponding to the dates in metric statistics"
@@ -56,11 +99,6 @@ _definitions {
type: string
description: "Name of the metric component. Set only if 'split_by_variant' was set in the request"
}
dates {
type: array
description: "List of timestamps (in seconds from epoch) in ascending order. The timestamps are separated by the requested interval. Timestamps where no worker activity was recorded are omitted."
items { type: integer }
}
stats {
type: array
description: "Statistics data by type"
@@ -482,6 +520,20 @@ get_stats {
}
}
}
"2.32": ${get_stats."2.4"} {
request.properties {
split_by_variant {
description: "Obsolete, please do not use"
type: boolean
default: false
}
split_by_resource {
type: boolean
default: false
description: If set then for GPU related keys return the per GPU charts in addition to the aggregated one
}
}
}
}
get_activity_report {
"2.4" {

View File

@@ -43,6 +43,9 @@ class RequestHandlers:
settings["domain"] = domain
return settings
def _get_identity_from_encoded_token(self, encoded: str):
return Token.decode_identity(encoded)
def before_request(self):
if request.method == "OPTIONS":
return "", 200
@@ -105,7 +108,7 @@ class RequestHandlers:
# Setting a cookie, let's try to figure out the company
# noinspection PyBroadException
try:
company = Token.decode_identity(value).company
company = self._get_identity_from_encoded_token(value).company
except Exception:
pass

View File

@@ -39,7 +39,7 @@ class ServiceRepo(object):
"""If the check is set, parsing will fail for an endpoint request whose version is greater than the current
maximum """
_max_version = PartialVersion("2.31")
_max_version = PartialVersion("2.32")
""" Maximum version number (the highest min_version value across all endpoints) """
_endpoint_exp = (

View File

@@ -225,6 +225,18 @@ def prepare_download_for_get_all(
call.result.data = dict(prepare_id=call.id)
_dangerous_chars = ("=", "+", "-", "@", "\t", "\r")
def _sanitize_csv(value: str) -> str:
"""
Prevent CSV injection:
If the string starts with any of the characters that Excel
interprets as special, prepend a single quote to it
"""
if value and value.startswith(_dangerous_chars):
return f"'{value}"
return value
@endpoint("organization.download_for_get_all")
def download_for_get_all(call: APICall, company, request: DownloadForGetAllRequest):
request_data = redis.get(f"get_all_download_{request.prepare_id}")
@@ -275,7 +287,7 @@ def download_for_get_all(call: APICall, company, request: DownloadForGetAllReque
if values and isinstance(val, Hashable):
val = values.get(val, val)
return str(val)
return _sanitize_csv(str(val))
def get_projected_fields(data: dict) -> Sequence[str]:
return [
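
The `_sanitize_csv` change above can be tried in isolation. The sketch below copies the prefix check from the diff and wraps it in a small, hypothetical `rows_to_csv` helper (not part of the repository) built on the standard `csv` module, to show what ends up in the exported file.

```python
import csv
import io

# Same prefixes as in the diff: cell text starting with one of these
# may be evaluated as a formula or macro by spreadsheet applications.
_dangerous_chars = ("=", "+", "-", "@", "\t", "\r")


def sanitize_csv(value: str) -> str:
    """Prepend a single quote so the cell is treated as plain text."""
    if value and value.startswith(_dangerous_chars):
        return f"'{value}"
    return value


def rows_to_csv(rows) -> str:
    """Hypothetical helper: serialize rows, sanitizing every cell."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    for row in rows:
        writer.writerow([sanitize_csv(str(cell)) for cell in row])
    return buf.getvalue()


print(rows_to_csv([["name", "value"], ["exp-1", "=SUM(A1:A2)"], ["exp-2", -5]]))
```

One side effect worth keeping in mind: legitimate values that start with `-` or `+`, such as negative numbers, are also prefixed with a quote.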

View File

@@ -99,9 +99,11 @@ def update_report(call: APICall, company_id: str, request: UpdateReportRequest):
)
now = datetime.utcnow()
more_updates = {"last_change": now, "last_changed_by": call.identity.user}
if not allowed_for_published:
more_updates["last_update"] = now
more_updates = {
"last_change": now,
"last_changed_by": call.identity.user,
"last_update": now,
}
updated = task.update(upsert=False, **partial_update_dict, **more_updates)
if not updated:

View File

@@ -1,9 +1,3 @@
import itertools
from operator import attrgetter
from typing import Optional, Sequence, Union
from boltons.iterutils import bucketize
from apiserver.apierrors.errors import bad_request
from apiserver.apimodels.workers import (
WorkerRequest,
@@ -23,6 +17,7 @@ from apiserver.apimodels.workers import (
GetActivityReportResponse,
ActivityReportSeries,
GetCountRequest,
MetricResourceSeries,
)
from apiserver.bll.workers import WorkerBLL
from apiserver.config_repo import config
@@ -163,71 +158,47 @@ def get_activity_report(
@endpoint(
"workers.get_stats",
min_version="2.4",
request_data_model=GetStatsRequest,
response_data_model=GetStatsResponse,
validate_schema=True,
)
def get_stats(call: APICall, company_id, request: GetStatsRequest):
ret = worker_bll.stats.get_worker_stats(company_id, request)
def _get_variant_metric_stats(
metric: str,
agg_names: Sequence[str],
stats: Sequence[dict],
variant: Optional[str] = None,
) -> MetricStats:
stat_by_name = extract_properties_to_lists(agg_names, stats)
return MetricStats(
metric=metric,
variant=variant,
dates=stat_by_name["date"],
stats=[
AggregationStats(aggregation=name, values=aggs)
for name, aggs in stat_by_name.items()
if name != "date"
],
)
def _get_metric_stats(
metric: str, stats: Union[dict, Sequence[dict]], agg_types: Sequence[str]
) -> Sequence[MetricStats]:
"""
Return statistics for a certain metric or a list of statistic for
metric variants if break_by_variant was requested
"""
agg_names = ["date"] + list(set(agg_types))
if not isinstance(stats, dict):
# no variants were requested
return [_get_variant_metric_stats(metric, agg_names, stats)]
return [
_get_variant_metric_stats(metric, agg_names, variant_stats, variant)
for variant, variant_stats in stats.items()
]
def _get_worker_metrics(stats: dict) -> Sequence[MetricStats]:
"""
Convert the worker statistics data from the internal format of lists of structs
to a more "compact" format for json transfer (arrays of dates and arrays of values)
"""
# removed metrics that were requested but for some reason
# do not exist in stats data
metrics = [metric for metric in request.items if metric.key in stats]
aggs_by_metric = bucketize(
metrics, key=attrgetter("key"), value_transform=attrgetter("aggregation")
)
return list(
itertools.chain.from_iterable(
_get_metric_stats(metric, metric_stats, aggs_by_metric[metric])
for metric, metric_stats in stats.items()
)
def _get_agg_stats(
aggregation: str,
stats: dict,
) -> AggregationStats:
resource_series = []
if "resource_series" in stats:
for name, values in stats["resource_series"].items():
resource_series.append(
MetricResourceSeries(
name=name,
values=values
)
)
return AggregationStats(
aggregation=aggregation,
dates=stats["dates"],
values=stats["values"],
resource_series=resource_series,
)
return GetStatsResponse(
workers=[
WorkerStatistics(worker=worker, metrics=_get_worker_metrics(stats))
for worker, stats in ret.items()
WorkerStatistics(
worker=worker,
metrics=[
MetricStats(
metric=metric,
stats=[
_get_agg_stats(aggregation, a_stats)
for aggregation, a_stats in m_stats.items()
]
)
for metric, m_stats in w_stats.items()
],
)
for worker, w_stats in ret.items()
]
)
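
To make the new response layout easier to follow, here is a rough sketch of the same conversion using plain dictionaries instead of the `MetricStats`, `AggregationStats` and `MetricResourceSeries` models. The input shape (`worker -> metric -> aggregation -> {dates, values, resource_series}`) is inferred from the loops in the diff and is an assumption; the sample data is invented.

```python
from typing import Any, Dict


def agg_stats(aggregation: str, stats: Dict[str, Any]) -> Dict[str, Any]:
    """Build one aggregation entry; dates now live here, not on the metric."""
    return {
        "aggregation": aggregation,
        "dates": stats["dates"],
        "values": stats["values"],
        # Present only when split_by_resource was requested (assumed input shape)
        "resource_series": [
            {"name": name, "values": values}
            for name, values in stats.get("resource_series", {}).items()
        ],
    }


def build_response(ret: Dict[str, Any]) -> Dict[str, Any]:
    """Convert worker -> metric -> aggregation -> stats into the response layout."""
    return {
        "workers": [
            {
                "worker": worker,
                "metrics": [
                    {
                        "metric": metric,
                        "stats": [agg_stats(a, s) for a, s in m_stats.items()],
                    }
                    for metric, m_stats in w_stats.items()
                ],
            }
            for worker, w_stats in ret.items()
        ]
    }


# Invented sample: one worker with two GPUs
sample = {
    "worker-1": {
        "gpu_usage": {
            "avg": {
                "dates": [1000, 1060],
                "values": [50.0, 50.0],
                "resource_series": {"0": [60, 60], "1": [40, 40]},
            }
        }
    }
}
print(build_response(sample))
```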

View File

@@ -32,8 +32,8 @@ class TestTasksArtifacts(TestService):
# test edit
artifacts = [
dict(key="bb", type="str", uri="test1", mode="output"),
dict(key="aa", type="int", uri="test2", mode="input"),
dict(key="bb", type="str", uri="http://files.clear.ml/test1", mode="output"),
dict(key="aa", type="int", uri="http://files.clear.ml/test2", mode="input"),
]
self.api.tasks.edit(task=task, execution={"artifacts": artifacts})
res = self.api.tasks.get_by_id(task=task).task

View File

@@ -14,6 +14,12 @@ class TestTaskPlots(TestService):
@staticmethod
def _create_task_event(task, iteration, **kwargs):
plot_str = kwargs.get("plot_str")
if plot_str:
if not plot_str.startswith("http"):
plot_str = "http://files.clear.ml/" + plot_str
kwargs["plot_str"] = '{"source": "' + plot_str + '"}'
return {
"worker": "test",
"type": "plot",

View File

@@ -1,3 +1,4 @@
import statistics
import time
from uuid import uuid4
from typing import Sequence
@@ -83,7 +84,7 @@ class TestWorkersService(TestService):
self._check_exists(test_worker, False, tags=["test"])
self._check_exists(test_worker, False, tags=["-application"])
def _simulate_workers(self, start: int) -> Sequence[str]:
def _simulate_workers(self, start: int, with_gpu: bool = False) -> dict:
"""
Two workers writing the same metrics. One for 4 seconds. Another one for 2
The first worker reports a task
@@ -93,20 +94,25 @@ class TestWorkersService(TestService):
task_id = self._create_running_task(task_name="task-1")
workers = [f"test_{uuid4().hex}", f"test_{uuid4().hex}"]
workers_stats = [
if with_gpu:
gpu_usage = [dict(gpu_usage=[60, 70]), dict(gpu_usage=[40])]
else:
gpu_usage = [{}, {}]
worker_stats = [
(
dict(cpu_usage=[10, 20], memory_used=50),
dict(cpu_usage=[5], memory_used=30),
dict(cpu_usage=[10, 20], memory_used=50, **gpu_usage[0]),
dict(cpu_usage=[5], memory_used=30, **gpu_usage[1]),
)
] * 4
workers_activity = [
worker_activity = [
(workers[0], workers[1]),
(workers[0], workers[1]),
(workers[0],),
(workers[0],),
]
timestamp = start * 1000
for ws, stats in zip(workers_activity, workers_stats):
for ws, stats in zip(worker_activity, worker_stats):
for w, s in zip(ws, stats):
data = dict(
worker=w,
@@ -118,7 +124,10 @@ class TestWorkersService(TestService):
self.api.workers.status_report(**data)
timestamp += 60*1000
return workers
return {
w: s
for w, s in zip(workers, worker_stats[0])
}
def _create_running_task(self, task_name):
task_input = dict(name=task_name, type="testing")
@@ -131,7 +140,7 @@ class TestWorkersService(TestService):
def test_get_keys(self):
workers = self._simulate_workers(int(time.time()))
time.sleep(5) # give to es time to refresh
res = self.api.workers.get_metric_keys(worker_ids=workers)
res = self.api.workers.get_metric_keys(worker_ids=list(workers))
assert {"cpu", "memory"} == set(c.name for c in res["categories"])
assert all(
c.metric_keys == ["cpu_usage"] for c in res["categories"] if c.name == "cpu"
@@ -147,7 +156,7 @@ class TestWorkersService(TestService):
def test_get_stats(self):
start = int(time.time())
workers = self._simulate_workers(start)
workers = self._simulate_workers(start, with_gpu=True)
time.sleep(5) # give to ES time to refresh
from_date = start
@@ -157,49 +166,72 @@ class TestWorkersService(TestService):
items=[
dict(key="cpu_usage", aggregation="avg"),
dict(key="cpu_usage", aggregation="max"),
dict(key="gpu_usage", aggregation="avg"),
dict(key="gpu_usage", aggregation="max"),
dict(key="memory_used", aggregation="max"),
dict(key="memory_used", aggregation="min"),
],
from_date=from_date,
to_date=to_date,
# split_by_variant=True,
interval=1,
worker_ids=workers,
worker_ids=list(workers),
)
self.assertWorkersInStats(workers, res.workers)
self.assertWorkersInStats(list(workers), res.workers)
for worker in res.workers:
self.assertEqual(
set(metric.metric for metric in worker.metrics),
{"cpu_usage", "memory_used"},
{"cpu_usage", "gpu_usage", "memory_used"},
)
for worker in res.workers:
worker_id = worker.worker
for metric, metric_stats in zip(
worker.metrics, ({"avg", "max"}, {"max", "min"})
worker.metrics, ({"avg", "max"}, {"avg", "max"}, {"max"})
):
metric_name = metric.metric
self.assertEqual(
set(stat.aggregation for stat in metric.stats), metric_stats
)
self.assertTrue(11 >= len(metric.dates) >= 10)
for stat in metric.stats:
expected = workers[worker_id][metric_name]
self.assertTrue(11 >= len(stat.dates) >= 10)
self.assertFalse(stat.get("resource_series"))
agg = stat.aggregation
if isinstance(expected, list):
if agg == "avg":
val = statistics.mean(expected)
elif agg == "min":
val = min(expected)
else:
val = max(expected)
else:
val = expected
self.assertEqual(set(stat["values"]), {val, 0})
# split by variants
# split by resources
res = self.api.workers.get_stats(
items=[dict(key="cpu_usage", aggregation="avg")],
items=[dict(key="gpu_usage", aggregation="avg")],
from_date=from_date,
to_date=to_date,
split_by_variant=True,
split_by_resource=True,
interval=1,
worker_ids=workers,
worker_ids=list(workers),
)
self.assertWorkersInStats(workers, res.workers)
self.assertWorkersInStats(list(workers), res.workers)
for worker in res.workers:
worker_id = worker.worker
for metric in worker.metrics:
self.assertEqual(
set(metric.variant for metric in worker.metrics),
{"0", "1"} if worker.worker == workers[0] else {"0"},
)
self.assertTrue(11 >= len(metric.dates) >= 10)
metric_name = metric.metric
for stat in metric.stats:
expected = workers[worker_id][metric_name]
if metric_name.startswith("gpu") and len(expected) > 1:
resource_series = stat.get("resource_series")
self.assertEqual(len(resource_series), len(expected))
for rs, value in zip(resource_series, expected):
self.assertEqual(set(rs["values"]), {value, 0})
else:
self.assertEqual(stat.get("resource_series"), [])
res = self.api.workers.get_stats(
items=[dict(key="cpu_usage", aggregation="avg")],

View File

@@ -1 +1 @@
__version__ = "2.0.0"
__version__ = "2.1.0"

View File

@@ -5,7 +5,7 @@ services:
command:
- apiserver
container_name: clearml-apiserver
image: allegroai/clearml:latest
image: clearml/server:latest
restart: unless-stopped
volumes:
- c:/opt/clearml/logs:/var/log/clearml
@@ -71,7 +71,7 @@ services:
command:
- fileserver
container_name: clearml-fileserver
image: allegroai/clearml:latest
image: clearml/server:latest
environment:
CLEARML__fileserver__delete__allow_batch: "true"
restart: unless-stopped
@@ -107,7 +107,7 @@ services:
command:
- webserver
container_name: clearml-webserver
image: allegroai/clearml:latest
image: clearml/server:latest
restart: unless-stopped
volumes:
- c:/clearml/logs:/var/log/clearml
@@ -127,7 +127,7 @@ services:
- elasticsearch
- fileserver
container_name: async_delete
image: allegroai/clearml:latest
image: clearml/server:latest
networks:
- backend
restart: unless-stopped

View File

@@ -5,7 +5,7 @@ services:
command:
- apiserver
container_name: clearml-apiserver
image: allegroai/clearml:latest
image: clearml/server:latest
restart: unless-stopped
volumes:
- /opt/clearml/logs:/var/log/clearml
@@ -73,7 +73,7 @@ services:
command:
- fileserver
container_name: clearml-fileserver
image: allegroai/clearml:latest
image: clearml/server:latest
environment:
CLEARML__fileserver__delete__allow_batch: "true"
restart: unless-stopped
@@ -110,7 +110,7 @@ services:
container_name: clearml-webserver
# environment:
# CLEARML_SERVER_SUB_PATH : clearml-web # Allow Clearml to be served with a URL path prefix.
image: allegroai/clearml:latest
image: clearml/server:latest
restart: unless-stopped
depends_on:
- apiserver
@@ -128,7 +128,7 @@ services:
- elasticsearch
- fileserver
container_name: async_delete
image: allegroai/clearml:latest
image: clearml/server:latest
networks:
- backend
restart: unless-stopped
@@ -155,7 +155,7 @@ services:
networks:
- backend
container_name: clearml-agent-services
image: allegroai/clearml-agent-services:latest
image: clearml/clearml-agent-services:latest
deploy:
restart_policy:
condition: on-failure

View File

@@ -1,7 +1,7 @@
Server Side Public License
VERSION 1, OCTOBER 16, 2018
Copyright © 2024 ClearML Inc.
Copyright © 2025 ClearML Inc.
Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.

View File

@@ -3,9 +3,9 @@ clearml-agent>=1.5.2
flask-compress>=1.4.0
flask-cors>=3.0.5
flask>=2.3.3
gunicorn>=20.1.0
gunicorn>=23.0.0
pyhocon>=0.3.35
redis==5.2.1
setuptools>=65.5.1
setuptools>=78.1.1
urllib3>=1.26.18
werkzeug>=3.0.1

View File

@@ -0,0 +1,158 @@
version: "3.6"
services:
apiserver:
command:
- apiserver
container_name: clearml-apiserver
image: allegroai/clearml:1.17.1-554
restart: unless-stopped
volumes:
- c:/opt/clearml/logs:/var/log/clearml
- c:/opt/clearml/config:/opt/clearml/config
- c:/opt/clearml/data/fileserver:/mnt/fileserver
depends_on:
- redis
- mongo
- elasticsearch
- fileserver
environment:
CLEARML_ELASTIC_SERVICE_HOST: elasticsearch
CLEARML_ELASTIC_SERVICE_PORT: 9200
CLEARML_MONGODB_SERVICE_HOST: mongo
CLEARML_MONGODB_SERVICE_PORT: 27017
CLEARML_REDIS_SERVICE_HOST: redis
CLEARML_REDIS_SERVICE_PORT: 6379
CLEARML_SERVER_DEPLOYMENT_TYPE: win10
CLEARML__apiserver__pre_populate__enabled: "true"
CLEARML__apiserver__pre_populate__zip_files: "/opt/clearml/db-pre-populate"
CLEARML__apiserver__pre_populate__artifacts_path: "/mnt/fileserver"
CLEARML__services__async_urls_delete__enabled: "true"
CLEARML__services__async_urls_delete__fileserver__url_prefixes: "[${CLEARML_FILES_HOST:-}]"
ports:
- "8008:8008"
networks:
- backend
- frontend
elasticsearch:
networks:
- backend
container_name: clearml-elastic
environment:
bootstrap.memory_lock: "true"
cluster.name: clearml
cluster.routing.allocation.node_initial_primaries_recoveries: "500"
cluster.routing.allocation.disk.watermark.low: 500mb
cluster.routing.allocation.disk.watermark.high: 500mb
cluster.routing.allocation.disk.watermark.flood_stage: 500mb
discovery.type: "single-node"
http.compression_level: "7"
node.name: clearml
reindex.remote.whitelist: "'*.*'"
xpack.security.enabled: "false"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
image: docker.elastic.co/elasticsearch/elasticsearch:8.15.3
restart: unless-stopped
volumes:
- c:/opt/clearml/data/elastic_7:/usr/share/elasticsearch/data
- /usr/share/elasticsearch/logs
fileserver:
networks:
- backend
- frontend
command:
- fileserver
container_name: clearml-fileserver
image: allegroai/clearml:1.17.1-554
environment:
CLEARML__fileserver__delete__allow_batch: "true"
restart: unless-stopped
volumes:
- c:/opt/clearml/logs:/var/log/clearml
- c:/opt/clearml/data/fileserver:/mnt/fileserver
- c:/opt/clearml/config:/opt/clearml/config
ports:
- "8081:8081"
mongo:
networks:
- backend
container_name: clearml-mongo
image: mongo:5.0.26
restart: unless-stopped
command: --setParameter internalQueryMaxBlockingSortMemoryUsageBytes=196100200
volumes:
- c:/opt/clearml/data/mongo_4/db:/data/db
- c:/opt/clearml/data/mongo_4/configdb:/data/configdb
redis:
networks:
- backend
container_name: clearml-redis
image: redis:6.2
restart: unless-stopped
volumes:
- c:/opt/clearml/data/redis:/data
webserver:
command:
- webserver
container_name: clearml-webserver
image: allegroai/clearml:1.17.1-554
restart: unless-stopped
volumes:
- c:/clearml/logs:/var/log/clearml
depends_on:
- apiserver
ports:
- "8080:80"
networks:
- backend
- frontend
async_delete:
depends_on:
- apiserver
- redis
- mongo
- elasticsearch
- fileserver
container_name: async_delete
image: allegroai/clearml:1.17.1-554
networks:
- backend
restart: unless-stopped
environment:
CLEARML_ELASTIC_SERVICE_HOST: elasticsearch
CLEARML_ELASTIC_SERVICE_PORT: 9200
CLEARML_MONGODB_SERVICE_HOST: mongo
CLEARML_MONGODB_SERVICE_PORT: 27017
CLEARML_REDIS_SERVICE_HOST: redis
CLEARML_REDIS_SERVICE_PORT: 6379
PYTHONPATH: /opt/clearml/apiserver
CLEARML__services__async_urls_delete__fileserver__url_prefixes: "[${CLEARML_FILES_HOST:-}]"
entrypoint:
- python3
- -m
- jobs.async_urls_delete
- --fileserver-host
- http://fileserver:8081
volumes:
- c:/opt/clearml/logs:/var/log/clearml
- c:/opt/clearml/config:/opt/clearml/config
networks:
backend:
driver: bridge
frontend:
name: frontend
driver: bridge

View File

@@ -0,0 +1,195 @@
version: "3.6"
services:
apiserver:
command:
- apiserver
container_name: clearml-apiserver
image: allegroai/clearml:1.17.1-554
restart: unless-stopped
volumes:
- /opt/clearml/logs:/var/log/clearml
- /opt/clearml/config:/opt/clearml/config
- /opt/clearml/data/fileserver:/mnt/fileserver
depends_on:
- redis
- mongo
- elasticsearch
- fileserver
environment:
CLEARML_ELASTIC_SERVICE_HOST: elasticsearch
CLEARML_ELASTIC_SERVICE_PORT: 9200
CLEARML_MONGODB_SERVICE_HOST: mongo
CLEARML_MONGODB_SERVICE_PORT: 27017
CLEARML_REDIS_SERVICE_HOST: redis
CLEARML_REDIS_SERVICE_PORT: 6379
CLEARML_SERVER_DEPLOYMENT_TYPE: linux
CLEARML__apiserver__pre_populate__enabled: "true"
CLEARML__apiserver__pre_populate__zip_files: "/opt/clearml/db-pre-populate"
CLEARML__apiserver__pre_populate__artifacts_path: "/mnt/fileserver"
CLEARML__services__async_urls_delete__enabled: "true"
CLEARML__services__async_urls_delete__fileserver__url_prefixes: "[${CLEARML_FILES_HOST:-}]"
CLEARML__secure__credentials__services_agent__user_key: ${CLEARML_AGENT_ACCESS_KEY:-}
CLEARML__secure__credentials__services_agent__user_secret: ${CLEARML_AGENT_SECRET_KEY:-}
ports:
- "8008:8008"
networks:
- backend
- frontend
elasticsearch:
networks:
- backend
container_name: clearml-elastic
environment:
bootstrap.memory_lock: "true"
cluster.name: clearml
cluster.routing.allocation.node_initial_primaries_recoveries: "500"
cluster.routing.allocation.disk.watermark.low: 500mb
cluster.routing.allocation.disk.watermark.high: 500mb
cluster.routing.allocation.disk.watermark.flood_stage: 500mb
discovery.type: "single-node"
http.compression_level: "7"
node.name: clearml
reindex.remote.whitelist: "'*.*'"
xpack.security.enabled: "false"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
image: docker.elastic.co/elasticsearch/elasticsearch:8.15.3
restart: unless-stopped
volumes:
- /opt/clearml/data/elastic_7:/usr/share/elasticsearch/data
- /usr/share/elasticsearch/logs
fileserver:
networks:
- backend
- frontend
command:
- fileserver
container_name: clearml-fileserver
image: allegroai/clearml:1.17.1-554
environment:
CLEARML__fileserver__delete__allow_batch: "true"
restart: unless-stopped
volumes:
- /opt/clearml/logs:/var/log/clearml
- /opt/clearml/data/fileserver:/mnt/fileserver
- /opt/clearml/config:/opt/clearml/config
ports:
- "8081:8081"
mongo:
networks:
- backend
container_name: clearml-mongo
image: mongo:5.0.26
restart: unless-stopped
command: --setParameter internalQueryMaxBlockingSortMemoryUsageBytes=196100200
volumes:
- /opt/clearml/data/mongo_4/db:/data/db
- /opt/clearml/data/mongo_4/configdb:/data/configdb
redis:
networks:
- backend
container_name: clearml-redis
image: redis:6.2
restart: unless-stopped
volumes:
- /opt/clearml/data/redis:/data
webserver:
command:
- webserver
container_name: clearml-webserver
# environment:
# CLEARML_SERVER_SUB_PATH : clearml-web # Allow Clearml to be served with a URL path prefix.
image: allegroai/clearml:1.17.1-554
restart: unless-stopped
depends_on:
- apiserver
ports:
- "8080:80"
networks:
- backend
- frontend
async_delete:
depends_on:
- apiserver
- redis
- mongo
- elasticsearch
- fileserver
container_name: async_delete
image: allegroai/clearml:1.17.1-554
networks:
- backend
restart: unless-stopped
environment:
CLEARML_ELASTIC_SERVICE_HOST: elasticsearch
CLEARML_ELASTIC_SERVICE_PORT: 9200
CLEARML_MONGODB_SERVICE_HOST: mongo
CLEARML_MONGODB_SERVICE_PORT: 27017
CLEARML_REDIS_SERVICE_HOST: redis
CLEARML_REDIS_SERVICE_PORT: 6379
PYTHONPATH: /opt/clearml/apiserver
CLEARML__services__async_urls_delete__fileserver__url_prefixes: "[${CLEARML_FILES_HOST:-}]"
entrypoint:
- python3
- -m
- jobs.async_urls_delete
- --fileserver-host
- http://fileserver:8081
volumes:
- /opt/clearml/logs:/var/log/clearml
- /opt/clearml/config:/opt/clearml/config
agent-services:
networks:
- backend
container_name: clearml-agent-services
image: allegroai/clearml-agent-services:latest
deploy:
restart_policy:
condition: on-failure
privileged: true
environment:
CLEARML_HOST_IP: ${CLEARML_HOST_IP}
CLEARML_WEB_HOST: ${CLEARML_WEB_HOST:-}
CLEARML_API_HOST: http://apiserver:8008
CLEARML_FILES_HOST: ${CLEARML_FILES_HOST:-}
CLEARML_API_ACCESS_KEY: ${CLEARML_AGENT_ACCESS_KEY:-$CLEARML_API_ACCESS_KEY}
CLEARML_API_SECRET_KEY: ${CLEARML_AGENT_SECRET_KEY:-$CLEARML_API_SECRET_KEY}
CLEARML_AGENT_GIT_USER: ${CLEARML_AGENT_GIT_USER}
CLEARML_AGENT_GIT_PASS: ${CLEARML_AGENT_GIT_PASS}
CLEARML_AGENT_UPDATE_VERSION: ${CLEARML_AGENT_UPDATE_VERSION:->=0.17.0}
CLEARML_AGENT_DEFAULT_BASE_DOCKER: "ubuntu:18.04"
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION:-}
AZURE_STORAGE_ACCOUNT: ${AZURE_STORAGE_ACCOUNT:-}
AZURE_STORAGE_KEY: ${AZURE_STORAGE_KEY:-}
GOOGLE_APPLICATION_CREDENTIALS: ${GOOGLE_APPLICATION_CREDENTIALS:-}
CLEARML_WORKER_ID: "clearml-services"
CLEARML_AGENT_DOCKER_HOST_MOUNT: "/opt/clearml/agent:/root/.clearml"
SHUTDOWN_IF_NO_ACCESS_KEY: 1
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /opt/clearml/agent:/root/.clearml
depends_on:
- apiserver
entrypoint: >
bash -c "curl --retry 10 --retry-delay 10 --retry-connrefused 'http://apiserver:8008/debug.ping' && /usr/agent/entrypoint.sh"
networks:
backend:
driver: bridge
frontend:
driver: bridge