Compare commits

31 Commits

Author SHA1 Message Date
allegroai
452f606889 Version bump to v1.11 2023-05-25 19:40:07 +03:00
allegroai
fc47ccbf09 Add default services agent user 2023-05-25 19:39:53 +03:00
allegroai
0206811342 Improve empty database check during startup 2023-05-25 19:39:17 +03:00
allegroai
a3ac1049a3 Update ClearML SDK dependency 2023-05-25 19:38:48 +03:00
allegroai
8488f63a3a Add fileserver URL prefixes for async deletion 2023-05-25 19:38:07 +03:00
allegroai
9206a7c57d Schedule external file URLs for deletion on models deletion 2023-05-25 19:36:28 +03:00
allegroai
0c37ced2a1 Fix model Id handling when deleting models for tasks 2023-05-25 19:35:18 +03:00
allegroai
b22f26129e Update requirements 2023-05-25 19:34:19 +03:00
allegroai
d8b998ebd8 Bump API version to 2.25 2023-05-25 19:33:37 +03:00
allegroai
741fa84b52 Fix projects own_tasks does not take task state filter into account 2023-05-25 19:32:52 +03:00
allegroai
d9579891c8 Return only reports from the .reports projects in reports.get_all_ex 2023-05-25 19:31:05 +03:00
allegroai
900414d0de Add option to echo ping payload 2023-05-25 19:30:13 +03:00
allegroai
5449b332d2 Support reports from the root project in reports.get_all_ex 2023-05-25 19:29:46 +03:00
allegroai
875f4b9536 Fix task dequeue changes status for un-queued/running tasks 2023-05-25 19:28:49 +03:00
allegroai
95b8f22899 Add CLEARML_FILES_HOST to async_delete in windows 2023-05-25 19:27:40 +03:00
allegroai
4058fb9ce5 Migrate to python 3.9 bullseye docker images
Update Mongo driver version
2023-05-25 19:27:14 +03:00
allegroai
cf8e847ed3 Switch to new redis version 2023-05-25 19:22:39 +03:00
allegroai
755cc803d9 Add remove_from_all_queues parameter to tasks.dequeue/dequeue_many endpoints 2023-05-25 19:22:10 +03:00
allegroai
3729afe014 Optimize queues.get_next_task to retrieve required task fields only 2023-05-25 19:21:24 +03:00
allegroai
dff2ed34e8 Support receiving mixed events for both locked and unlocked tasks and models in events.add_batch 2023-05-25 19:20:35 +03:00
allegroai
de9651d761 Allow mixing Model and task events in the same events batch 2023-05-25 19:19:45 +03:00
allegroai
818496236b Support filtering by children tags in projects.get_all_ex 2023-05-25 19:19:10 +03:00
allegroai
e99817b28b Task reports can now return single value metrics 2023-05-25 19:18:24 +03:00
allegroai
58465fbc17 Model events are fully supported 2023-05-25 19:17:40 +03:00
allegroai
2e4e060a82 Task move forward/backwards in queue is now atomic 2023-05-25 19:16:33 +03:00
allegroai
5c5d9b6434 Fix numeric hyperparam values are not sorted lexicographically with descending sort order 2023-05-25 19:15:59 +03:00
allegroai
4291ad682a Support filtering by task name in projects.get_task_parent 2023-05-25 19:15:26 +03:00
allegroai
4c22757002 Fix task that is not in queue but has 'queued' status can't be dequeued 2023-05-25 19:14:25 +03:00
allegroai
6e777e80b8 Cleaned up unit tests 2023-05-25 19:13:10 +03:00
allegroai
c8e4d9eeac Fix Dockerfile uses deprecated base image 2023-04-18 10:50:13 +03:00
dependabot[bot]
b51aa5c29b Bump redis from 3.5.3 to 4.4.4 in /apiserver (#190)
Bumps [redis](https://github.com/redis/redis-py) from 3.5.3 to 4.4.4.
- [Release notes](https://github.com/redis/redis-py/releases)
- [Changelog](https://github.com/redis/redis-py/blob/master/CHANGES)
- [Commits](https://github.com/redis/redis-py/compare/3.5.3...v4.4.4)

---
updated-dependencies:
- dependency-name: redis
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-04-01 08:59:02 +03:00
73 changed files with 1282 additions and 614 deletions

View File

@@ -44,10 +44,12 @@ class ModelRequest(models.Base):
class DeleteModelRequest(ModelRequest):
force = fields.BoolField(default=False)
delete_external_artifacts = fields.BoolField(default=True)
class ModelsDeleteManyRequest(BatchRequest):
force = fields.BoolField(default=False)
delete_external_artifacts = fields.BoolField(default=True)
class PublishModelRequest(ModelRequest):

View File

@@ -29,6 +29,10 @@ class ProjectOrNoneRequest(models.Base):
include_subprojects = fields.BoolField(default=True)
class GetUniqueMetricsRequest(ProjectOrNoneRequest):
model_metrics = fields.BoolField(default=False)
class GetParamsRequest(ProjectOrNoneRequest):
page = fields.IntField(default=0)
page_size = fields.IntField(default=500)
@@ -45,6 +49,7 @@ class MultiProjectRequest(models.Base):
class ProjectTaskParentsRequest(MultiProjectRequest):
tasks_state = ActualEnumField(EntityVisibility)
task_name = fields.StringField()
class ProjectHyperparamValuesRequest(MultiProjectRequest):
@@ -77,3 +82,4 @@ class ProjectsGetRequest(models.Base):
search_hidden = fields.BoolField(default=False)
allow_public = fields.BoolField(default=True)
children_type = ActualEnumField(ProjectChildrenType)
children_tags = fields.ListField(str)

View File

@@ -61,11 +61,19 @@ class ScalarMetricsIterHistogram(HistogramRequestBase):
metrics: Sequence[MetricVariants] = ListField(items_types=MetricVariants)
class SingleValueMetrics(Base):
pass
class GetTasksDataRequest(Base):
debug_images: EventsRequest = EmbeddedField(EventsRequest)
plots: EventsRequest = EmbeddedField(EventsRequest)
scalar_metrics_iter_histogram: ScalarMetricsIterHistogram = EmbeddedField(ScalarMetricsIterHistogram)
scalar_metrics_iter_histogram: ScalarMetricsIterHistogram = EmbeddedField(
ScalarMetricsIterHistogram
)
single_value_metrics: SingleValueMetrics = EmbeddedField(SingleValueMetrics)
allow_public = BoolField(default=True)
model_events: bool = BoolField(default=False)
class GetAllRequest(Base):

View File

@@ -96,6 +96,10 @@ class UpdateRequest(TaskUpdateRequest):
status_message = StringField(default="")
class DequeueRequest(UpdateRequest):
remove_from_all_queues = BoolField(default=False)
class EnqueueRequest(UpdateRequest):
queue = StringField()
queue_name = StringField()
@@ -274,6 +278,10 @@ class StopManyRequest(TaskBatchRequest):
force = BoolField(default=False)
class DequeueManyRequest(TaskBatchRequest):
remove_from_all_queues = BoolField(default=False)
class EnqueueManyRequest(TaskBatchRequest):
queue = StringField()
queue_name = StringField()

View File

@@ -30,6 +30,7 @@ from apiserver.bll.event.history_debug_image_iterator import HistoryDebugImageIt
from apiserver.bll.event.history_plots_iterator import HistoryPlotsIterator
from apiserver.bll.event.metric_debug_images_iterator import MetricDebugImagesIterator
from apiserver.bll.event.metric_plots_iterator import MetricPlotsIterator
from apiserver.bll.model import ModelBLL
from apiserver.bll.util import parallel_chunked_decorator
from apiserver.database import utils as dbutils
from apiserver.database.model.model import Model
@@ -102,47 +103,72 @@ class EventBLL(object):
return self._metrics
@staticmethod
def _get_valid_tasks(company_id, task_ids: Set, allow_locked_tasks=False) -> Set:
"""Verify that task exists and can be updated"""
if not task_ids:
def _get_valid_entities(company_id, ids: Mapping[str, bool], model=False) -> Set:
"""Verify that task or model exists and can be updated"""
if not ids:
return set()
with translate_errors_context():
query = Q(id__in=task_ids, company=company_id)
if not allow_locked_tasks:
query &= Q(status__nin=LOCKED_TASK_STATUSES)
res = Task.objects(query).only("id")
return {r.id for r in res}
allow_locked = {id_ for id_, allowed in ids.items() if allowed}
not_locked = {id_ for id_, allowed in ids.items() if not allowed}
res = set()
allow_locked_q = Q()
not_locked_q = (
Q(ready__ne=True) if model else Q(status__nin=LOCKED_TASK_STATUSES)
)
for requested_ids, locked_q in (
(allow_locked, allow_locked_q),
(not_locked, not_locked_q),
):
if not requested_ids:
continue
query = Q(id__in=requested_ids, company=company_id)
res.update(
(Model if model else Task).objects(query & locked_q).scalar("id")
)
@staticmethod
def _get_valid_models(company_id, model_ids: Set, allow_locked_models=False) -> Set:
"""Verify that task exists and can be updated"""
if not model_ids:
return set()
with translate_errors_context():
query = Q(id__in=model_ids, company=company_id)
if not allow_locked_models:
query &= Q(ready__ne=True)
res = Model.objects(query).only("id")
return {r.id for r in res}
return res
def add_events(
self, company_id, events, worker, allow_locked=False
self, company_id, events, worker
) -> Tuple[int, int, dict]:
model_events = events[0].get("model_event", False)
task_ids = {}
model_ids = {}
for event in events:
if event.get("model_event", model_events) != model_events:
if event.get("model_event", False):
model = event.pop("model", None)
if model is not None:
event["task"] = model
entity_ids = model_ids
else:
event["model_event"] = False
entity_ids = task_ids
id_ = event.get("task")
allow_locked = event.pop("allow_locked", False)
if not id_:
continue
allowed_for_entity = entity_ids.get(id_)
if allowed_for_entity is None:
entity_ids[id_] = allow_locked
elif allowed_for_entity != allow_locked:
raise errors.bad_request.ValidationError(
"Inconsistent model_event setting in the passed events"
)
if event.pop("allow_locked", allow_locked) != allow_locked:
raise errors.bad_request.ValidationError(
"Inconsistent allow_locked setting in the passed events"
f"Inconsistent allow_locked setting in the passed events for {id_}"
)
found_in_both = set(task_ids).intersection(set(model_ids))
if found_in_both:
raise errors.bad_request.ValidationError(
"Inconsistent model_event setting in the passed events",
tasks=found_in_both,
)
valid_models = self._get_valid_entities(company_id, ids=model_ids, model=True)
valid_tasks = self._get_valid_entities(company_id, ids=task_ids)
actions: List[dict] = []
task_or_model_ids = set()
used_task_ids = set()
used_model_ids = set()
task_iteration = defaultdict(lambda: 0)
task_last_scalar_events = nested_dict(
3, dict
@@ -152,28 +178,6 @@ class EventBLL(object):
) # task_id -> metric_hash -> event_type -> MetricEvent
errors_per_type = defaultdict(int)
invalid_iteration_error = f"Iteration number should not exceed {MAX_LONG}"
if model_events:
for event in events:
model = event.pop("model", None)
if model is not None:
event["task"] = model
valid_entities = self._get_valid_models(
company_id,
model_ids={
event["task"] for event in events if event.get("task") is not None
},
allow_locked_models=allow_locked,
)
entity_name = "model"
else:
valid_entities = self._get_valid_tasks(
company_id,
task_ids={
event["task"] for event in events if event.get("task") is not None
},
allow_locked_tasks=allow_locked,
)
entity_name = "task"
for event in events:
# remove spaces from event type
@@ -187,7 +191,8 @@ class EventBLL(object):
errors_per_type[f"Invalid event type {event_type}"] += 1
continue
if model_events and event_type == EventType.task_log.value:
model_event = event["model_event"]
if model_event and event_type == EventType.task_log.value:
errors_per_type[f"Task log events are not supported for models"] += 1
continue
@@ -196,8 +201,12 @@ class EventBLL(object):
errors_per_type["Event must have a 'task' field"] += 1
continue
if task_or_model_id not in valid_entities:
errors_per_type[f"Invalid {entity_name} id {task_or_model_id}"] += 1
if (model_event and task_or_model_id not in valid_models) or (
not model_event and task_or_model_id not in valid_tasks
):
errors_per_type[
f"Invalid {'model' if model_event else 'task'} id {task_or_model_id}"
] += 1
continue
event["type"] = event_type
@@ -232,7 +241,6 @@ class EventBLL(object):
event["metric"] = event.get("metric") or ""
event["variant"] = event.get("variant") or ""
event["model_event"] = model_events
index_name = get_index_name(company_id, event_type)
es_action = {
@@ -241,31 +249,31 @@ class EventBLL(object):
"_source": event,
}
# for "log" events, don't assing custom _id - whatever is sent, is written (not overwritten)
# for "log" events, don't assign custom _id - whatever is sent, is written (not overwritten)
if event_type != EventType.task_log.value:
es_action["_id"] = self._get_event_id(event)
else:
es_action["_id"] = dbutils.id()
task_or_model_ids.add(task_or_model_id)
if (
iter is not None
and not model_events
and event.get("metric") not in self._skip_iteration_for_metric
):
task_iteration[task_or_model_id] = max(
iter, task_iteration[task_or_model_id]
)
if not model_events:
if model_event:
used_model_ids.add(task_or_model_id)
else:
used_task_ids.add(task_or_model_id)
self._update_last_metric_events_for_task(
last_events=task_last_events[task_or_model_id], event=event,
)
if event_type == EventType.metrics_scalar.value:
self._update_last_scalar_events_for_task(
last_events=task_last_scalar_events[task_or_model_id],
event=event,
)
if event_type == EventType.metrics_scalar.value:
self._update_last_scalar_events_for_task(
last_events=task_last_scalar_events[task_or_model_id], event=event,
)
actions.append(es_action)
@@ -303,31 +311,32 @@ class EventBLL(object):
else:
errors_per_type["Error when indexing events batch"] += 1
if not model_events:
remaining_tasks = set()
now = datetime.utcnow()
for task_or_model_id in task_or_model_ids:
# Update related tasks. For reasons of performance, we prefer to update
# all of them and not only those whose events were successful
updated = self._update_task(
company_id=company_id,
task_id=task_or_model_id,
now=now,
iter_max=task_iteration.get(task_or_model_id),
last_scalar_events=task_last_scalar_events.get(
task_or_model_id
),
last_events=task_last_events.get(task_or_model_id),
)
for model_id in used_model_ids:
ModelBLL.update_statistics(
company_id=company_id,
model_id=model_id,
last_iteration_max=task_iteration.get(model_id),
last_scalar_events=task_last_scalar_events.get(model_id),
)
remaining_tasks = set()
now = datetime.utcnow()
for task_id in used_task_ids:
# Update related tasks. For reasons of performance, we prefer to update
# all of them and not only those whose events were successful
updated = self._update_task(
company_id=company_id,
task_id=task_id,
now=now,
iter_max=task_iteration.get(task_id),
last_scalar_events=task_last_scalar_events.get(task_id),
last_events=task_last_events.get(task_id),
)
if not updated:
remaining_tasks.add(task_id)
continue
if not updated:
remaining_tasks.add(task_or_model_id)
continue
if remaining_tasks:
TaskBLL.set_last_update(
remaining_tasks, company_id, last_update=now
)
if remaining_tasks:
TaskBLL.set_last_update(remaining_tasks, company_id, last_update=now)
# this is for backwards compatibility with streaming bulk throwing exception on those
invalid_iterations_count = errors_per_type.get(invalid_iteration_error)
@@ -484,7 +493,9 @@ class EventBLL(object):
)
def _get_event_id(self, event):
id_values = (str(event[field]) for field in self.event_id_fields if field in event)
id_values = (
str(event[field]) for field in self.event_id_fields if field in event
)
return hashlib.md5("-".join(id_values).encode()).hexdigest()
def scroll_task_events(
@@ -556,9 +567,7 @@ class EventBLL(object):
must.append(get_metric_variants_condition(metric_variants))
query = {"bool": {"must": must}}
search_args = dict(
es=self.es, company_id=company_id, event_type=event_type,
)
search_args = dict(es=self.es, company_id=company_id, event_type=event_type)
max_metrics, max_variants = get_max_metric_and_variant_counts(
query=query, **search_args,
)
@@ -586,7 +595,7 @@ class EventBLL(object):
"events": {
"top_hits": {
"sort": {"iter": {"order": "desc"}},
"size": last_iterations_per_plot
"size": last_iterations_per_plot,
}
}
},
@@ -597,11 +606,7 @@ class EventBLL(object):
}
with translate_errors_context():
es_response = search_company_events(
body=es_req,
ignore=404,
**search_args,
)
es_response = search_company_events(body=es_req, ignore=404, **search_args)
aggs_result = es_response.get("aggregations")
if not aggs_result:
@@ -614,9 +619,7 @@ class EventBLL(object):
for hit in variants_bucket["events"]["hits"]["hits"]
]
self.uncompress_plots(events)
return TaskEventsResult(
events=events, total_events=len(events)
)
return TaskEventsResult(events=events, total_events=len(events))
def _get_events_from_es_res(self, es_res: dict) -> Tuple[list, int, Optional[str]]:
"""
@@ -731,12 +734,7 @@ class EventBLL(object):
if not company_ids:
return TaskEventsResult()
task_ids = (
[task_id]
if isinstance(task_id, str)
else task_id
)
task_ids = [task_id] if isinstance(task_id, str) else task_id
must = []
if metrics:
@@ -967,7 +965,7 @@ class EventBLL(object):
event_type: EventType,
task_id: Union[str, Sequence[str]],
iters: int,
metrics: MetricVariants = None
metrics: MetricVariants = None,
) -> Mapping[str, Sequence]:
company_ids = [company_id] if isinstance(company_id, str) else company_id
company_ids = [

View File

@@ -5,7 +5,8 @@ from mongoengine import Q
from apiserver.apierrors import errors
from apiserver.apimodels.models import ModelTaskPublishResponse
from apiserver.bll.task.utils import deleted_prefix
from apiserver.bll.task.utils import deleted_prefix, get_last_metric_updates
from apiserver.config_repo import config
from apiserver.database.model import EntityVisibility
from apiserver.database.model.model import Model
from apiserver.database.model.task.task import Task, TaskStatus
@@ -28,11 +29,7 @@ class ModelBLL:
@staticmethod
def assert_exists(
company_id,
model_ids,
only=None,
allow_public=False,
return_models=True,
company_id, model_ids, only=None, allow_public=False, return_models=True,
) -> Optional[Sequence[Model]]:
model_ids = [model_ids] if isinstance(model_ids, str) else model_ids
ids = set(model_ids)
@@ -86,7 +83,7 @@ class ModelBLL:
@classmethod
def delete_model(
cls, model_id: str, company_id: str, force: bool
cls, model_id: str, company_id: str, user_id: str, force: bool, delete_external_artifacts: bool = True,
) -> Tuple[int, Model]:
model = cls.get_company_model_by_id(
company_id=company_id,
@@ -112,25 +109,52 @@ class ModelBLL:
if model.task:
task = Task.objects(id=model.task).first()
if task and task.status == TaskStatus.published:
if not force:
raise errors.bad_request.ModelCreatingTaskExists(
"and published, use force=True to delete", task=model.task
)
if task.models.output and model_id in task.models.output:
now = datetime.utcnow()
if task:
now = datetime.utcnow()
if task.status == TaskStatus.published:
if not force:
raise errors.bad_request.ModelCreatingTaskExists(
"and published, use force=True to delete", task=model.task
)
Task._get_collection().update_one(
filter={"_id": model.task, "models.output.model": model_id},
update={
"$set": {
"models.output.$[elem].model": deleted_model_id,
"output.error": f"model deleted on {now.isoformat()}",
"last_change": now,
},
"last_change": now,
},
array_filters=[{"elem.model": model_id}],
upsert=False,
)
else:
task.update(
pull__models__output__model=model_id, set__last_change=now
)
delete_external_artifacts = delete_external_artifacts and config.get(
"services.async_urls_delete.enabled", True
)
if delete_external_artifacts:
from apiserver.bll.task.task_cleanup import (
collect_debug_image_urls,
collect_plot_image_urls,
_schedule_for_delete,
)
urls = set()
urls.update(collect_debug_image_urls(company_id, model_id))
urls.update(collect_plot_image_urls(company_id, model_id))
if model.uri:
urls.add(model.uri)
if urls:
_schedule_for_delete(
task_id=model_id,
company=company_id,
user=user_id,
urls=urls,
can_delete_folders=False,
)
del_count = Model.objects(id=model_id, company=company_id).delete()
return del_count, model
@@ -179,12 +203,36 @@ class ModelBLL:
"labels_count": {"$size": {"$objectToArray": "$labels"}}
}
},
{
"$project": {"labels_count": 1},
},
{"$project": {"labels_count": 1}},
]
)
return {
r.pop("_id"): r
for r in result
}
return {r.pop("_id"): r for r in result}
@staticmethod
def update_statistics(
company_id: str,
model_id: str,
last_iteration_max: int = None,
last_scalar_events: Dict[str, Dict[str, dict]] = None,
):
updates = {"last_update": datetime.utcnow()}
if last_iteration_max is not None:
updates.update(max__last_iteration=last_iteration_max)
raw_updates = {}
if last_scalar_events is not None:
raw_updates = {}
if last_scalar_events is not None:
get_last_metric_updates(
task_id=model_id,
last_scalar_events=last_scalar_events,
raw_updates=raw_updates,
extra_updates=updates,
model_events=True,
)
ret = Model.objects(id=model_id).update_one(**updates)
if ret and raw_updates:
Model.objects(id=model_id).update_one(__raw__=[{"$set": raw_updates}])
return ret

View File

@@ -16,7 +16,6 @@ from typing import (
Any,
)
from boltons.iterutils import partition
from mongoengine import Q, Document
from apiserver import database
@@ -58,7 +57,7 @@ class ProjectBLL:
@classmethod
def merge_project(
cls, company, source_id: str, destination_id: str
cls, company: str, source_id: str, destination_id: str
) -> Tuple[int, int, Set[str]]:
"""
Move all the tasks and sub projects from the source project to the destination
@@ -901,6 +900,7 @@ class ProjectBLL:
project_ids: Optional[Sequence[str]] = None,
allow_public: bool = True,
children_type: ProjectChildrenType = None,
children_tags: Sequence[str] = None,
) -> Tuple[Sequence[str], Sequence[str]]:
"""
Get the projects ids matching children_condition (if passed) or where the passed user created any tasks
@@ -921,15 +921,20 @@ class ProjectBLL:
query &= Q(user__in=users)
project_query = None
child_query = (
query & GetMixin.get_list_field_query("tags", children_tags)
if children_tags
else query
)
if children_type == ProjectChildrenType.dataset:
child_queries = {
Project: query
Project: child_query
& Q(system_tags__in=[dataset_tag], basename__ne=datasets_project_name)
}
elif children_type == ProjectChildrenType.pipeline:
child_queries = {Task: query & Q(system_tags__in=[pipeline_tag])}
child_queries = {Task: child_query & Q(system_tags__in=[pipeline_tag])}
elif children_type == ProjectChildrenType.report:
child_queries = {Task: query & Q(system_tags__in=[reports_tag])}
child_queries = {Task: child_query & Q(system_tags__in=[reports_tag])}
else:
project_query = query
child_queries = {entity_cls: query for entity_cls in cls.child_classes}
@@ -946,9 +951,7 @@ class ProjectBLL:
)
res = (
{p.id for p in Project.objects(project_query).only("id")}
if project_query
else set()
set(Project.objects(project_query).scalar("id")) if project_query else set()
)
for cls_, query_ in child_queries.items():
res |= set(
@@ -977,6 +980,7 @@ class ProjectBLL:
projects: Sequence[str],
include_subprojects: bool,
state: Optional[EntityVisibility] = None,
name: str = None,
) -> Sequence[dict]:
"""
Get list of unique parent tasks sorted by task name for the passed company projects
@@ -1003,9 +1007,11 @@ class ProjectBLL:
parents = Task.get_many_with_join(
company_id,
query=Q(id__in=parent_ids),
query_dict={"name": name} if name else None,
allow_public=True,
override_projection=("id", "name", "project.name"),
)
return sorted(parents, key=itemgetter("name"))
@classmethod
@@ -1062,10 +1068,11 @@ class ProjectBLL:
raise errors.bad_request.ValidationError(
f"List of strings expected for the field: {field}"
)
exclude, include = partition(field_filter, lambda x: x.startswith("-"))
helper = GetMixin.ListFieldBucketHelper(field, legacy=True)
actions = helper.get_actions(field_filter)
conditions[field] = {
**({"$in": include} if include else {}),
**({"$nin": [e[1:] for e in exclude]} if exclude else {}),
f"${action}": list(set(actions[action]))
for action in filter(None, actions)
}
return conditions
@@ -1139,6 +1146,7 @@ class ProjectBLL:
company: str,
project_ids: Sequence[str],
filter_: Mapping[str, Any] = None,
specific_state: Optional[EntityVisibility] = None,
users: Sequence[str] = None,
) -> Dict[str, dict]:
"""
@@ -1149,6 +1157,20 @@ class ProjectBLL:
if not project_ids:
return {}
if specific_state:
filter_ = filter_ or {}
system_tags_filter = filter_.get("system_tags", [])
archived = EntityVisibility.archived.value
non_archived = f"-{EntityVisibility.archived.value}"
if not any(t in system_tags_filter for t in (archived, non_archived)):
filter_ = {k: v for k, v in filter_.items()}
filter_["system_tags"] = [
archived
if specific_state == EntityVisibility.archived
else non_archived,
*system_tags_filter,
]
pipeline = [
{
"$match": cls.get_match_conditions(

View File

@@ -1,4 +1,5 @@
from collections import defaultdict
from datetime import datetime
from typing import Tuple, Set, Sequence
import attr
@@ -15,7 +16,7 @@ from apiserver.config_repo import config
from apiserver.database.model import EntityVisibility
from apiserver.database.model.model import Model
from apiserver.database.model.project import Project
from apiserver.database.model.task.task import Task, ArtifactModes, TaskType
from apiserver.database.model.task.task import Task, ArtifactModes, TaskType, TaskStatus
from .project_bll import ProjectBLL
from .sub_projects import _ids_with_children
@@ -76,7 +77,7 @@ def delete_project(
raise errors.bad_request.InvalidProjectId(id=project_id)
delete_external_artifacts = delete_external_artifacts and config.get(
"services.async_urls_delete.enabled", False
"services.async_urls_delete.enabled", True
)
is_pipeline = "pipeline" in (project.system_tags or [])
project_ids = _ids_with_children([project_id])
@@ -185,29 +186,43 @@ def _delete_models(
return 0, set(), set()
model_ids = list({m.id for m in models})
deleted = "__DELETED__"
Task._get_collection().update_many(
filter={
"project": {"$nin": projects},
"models.input.model": {"$in": model_ids},
},
update={"$set": {"models.input.$[elem].model": None}},
update={"$set": {"models.input.$[elem].model": deleted}},
array_filters=[{"elem.model": {"$in": model_ids}}],
upsert=False,
)
model_tasks = list({m.task for m in models if m.task})
if model_tasks:
now = datetime.utcnow()
# update published tasks
Task._get_collection().update_many(
filter={
"_id": {"$in": model_tasks},
"project": {"$nin": projects},
"models.output.model": {"$in": model_ids},
"status": TaskStatus.published,
},
update={
"$set": {
"models.output.$[elem].model": deleted,
"last_change": now,
}
},
update={"$set": {"models.output.$[elem].model": None}},
array_filters=[{"elem.model": {"$in": model_ids}}],
upsert=False,
)
# update unpublished tasks
Task.objects(
id__in=model_tasks,
project__nin=projects,
status__ne=TaskStatus.published,
).update(pull__models__output__model__in=model_ids, set__last_change=now)
event_urls, model_urls = set(), set()
for m in models:

View File

@@ -209,7 +209,11 @@ class ProjectQueries:
@classmethod
def get_unique_metric_variants(
cls, company_id, project_ids: Sequence[str], include_subprojects: bool
cls,
company_id,
project_ids: Sequence[str],
include_subprojects: bool,
model_metrics: bool = False,
):
pipeline = [
{
@@ -246,7 +250,8 @@ class ProjectQueries:
{"$sort": OrderedDict({"_id.metric": 1, "_id.variant": 1})},
]
result = Task.aggregate(pipeline)
entity_cls = Model if model_metrics else Task
result = entity_cls.aggregate(pipeline)
return [r["metrics"][0] for r in result]
@classmethod

View File

@@ -144,8 +144,8 @@ def _ids_with_children(project_ids: Sequence[str]) -> Sequence[str]:
"""
Return project ids with the ids of all the subprojects
"""
subprojects = Project.objects(path__in=project_ids).only("id")
return list({*project_ids, *(child.id for child in subprojects)})
children_ids = Project.objects(path__in=project_ids).scalar("id")
return list({*project_ids, *children_ids})
def _update_subproject_names(

View File

@@ -1,6 +1,6 @@
from collections import defaultdict
from datetime import datetime
from typing import Callable, Sequence, Optional, Tuple
from typing import Sequence, Optional, Tuple, Union
from elasticsearch import Elasticsearch
from mongoengine import Q
@@ -16,6 +16,8 @@ from apiserver.database.errors import translate_errors_context
from apiserver.database.model.queue import Queue, Entry
log = config.logger(__file__)
MOVE_FIRST = "first"
MOVE_LAST = "last"
class QueueBLL(object):
@@ -323,42 +325,130 @@ class QueueBLL(object):
company_id: str,
queue_id: str,
task_id: str,
pos_func: Callable[[int], int],
move_count: Union[int, str],
) -> int:
"""
Moves the task in the queue by move_count positions, or to the first/last position when move_count is MOVE_FIRST/MOVE_LAST
Returns the updated task position in the queue
"""
with translate_errors_context():
queue = self.get_queue_with_task(
def get_queue_and_task_position():
q = self.get_queue_with_task(
company_id=company_id, queue_id=queue_id, task_id=task_id
)
return q, next(i for i, e in enumerate(q.entries) if e.task == task_id)
position = next(i for i, e in enumerate(queue.entries) if e.task == task_id)
new_position = pos_func(position)
with translate_errors_context():
queue, position = get_queue_and_task_position()
if move_count == MOVE_FIRST:
new_position = 0
elif move_count == MOVE_LAST:
new_position = len(queue.entries) - 1
else:
new_position = position + move_count
if new_position == position:
return new_position
if new_position != position:
entry = queue.entries[position]
query = dict(id=queue_id, company=company_id)
updated = Queue.objects(entries__task=task_id, **query).update_one(
pull__entries=entry, last_update=datetime.utcnow()
)
if not updated:
raise errors.bad_request.RemovedDuringReposition(
task=task_id, **query
)
inst = {"$push": {"entries": {"$each": [entry.to_proper_dict()]}}}
if new_position >= 0:
inst["$push"]["entries"]["$position"] = new_position
res = Queue.objects(entries__task__ne=task_id, **query).update_one(
__raw__=inst
)
if not res:
raise errors.bad_request.FailedAddingDuringReposition(
task=task_id, **query
)
without_entry = {
"$filter": {
"input": "$entries",
"as": "entry",
"cond": {"$ne": ["$$entry.task", task_id]},
}
}
task_entry = {
"$filter": {
"input": "$entries",
"as": "entry",
"cond": {"$eq": ["$$entry.task", task_id]},
}
}
if move_count == MOVE_FIRST:
operations = [
{
"$set": {
"entries": {"$concatArrays": [task_entry, without_entry]}
}
}
]
elif move_count == MOVE_LAST:
operations = [
{
"$set": {
"entries": {"$concatArrays": [without_entry, task_entry]}
}
}
]
else:
operations = [
{
"$set": {
"new_pos": {
"$add": [
{"$indexOfArray": ["$entries.task", task_id]},
move_count,
]
},
"without_entry": without_entry,
"task_entry": task_entry,
}
},
{
"$set": {
"entries": {
"$switch": {
"branches": [
{
"case": {"$lte": ["$new_pos", 0]},
"then": {
"$concatArrays": [
"$task_entry",
"$without_entry",
]
},
},
{
"case": {
"$gte": [
"$new_pos",
{"$size": "$without_entry"},
]
},
"then": {
"$concatArrays": [
"$without_entry",
"$task_entry",
]
},
},
],
"default": {
"$concatArrays": [
{"$slice": ["$without_entry", "$new_pos"]},
"$task_entry",
{
"$slice": [
"$without_entry",
"$new_pos",
{"$size": "$without_entry"},
]
},
]
},
}
}
}
},
{"$unset": ["new_pos", "without_entry", "task_entry"]},
]
return new_position
updated = Queue.objects(
id=queue_id, company=company_id, entries__task=task_id
).update_one(__raw__=operations)
if not updated:
raise errors.bad_request.FailedAddingDuringReposition(task=task_id)
return get_queue_and_task_position()[1]
def count_entries(self, company: str, queue_id: str) -> Optional[int]:
res = next(

View File

@@ -7,7 +7,7 @@ from redis import StrictRedis
from six import string_types
import apiserver.database.utils as dbutils
from apiserver.apierrors import errors
from apiserver.apierrors import errors, APIError
from apiserver.apimodels.tasks import TaskInputModel
from apiserver.bll.queue import QueueBLL
from apiserver.bll.organization import OrgBLL, Tags
@@ -30,6 +30,7 @@ from apiserver.database.model.task.task import (
TaskModelTypes,
)
from apiserver.database.model import EntityVisibility
from apiserver.database.model.queue import Queue
from apiserver.database.utils import get_company_or_none_constraint, id as create_id
from apiserver.es_factory import es_factory
from apiserver.redis_manager import redman
@@ -40,6 +41,7 @@ from .utils import (
ChangeStatusRequest,
update_project_time,
deleted_prefix,
get_last_metric_updates,
)
log = config.logger(__file__)
@@ -412,81 +414,12 @@ class TaskBLL:
raw_updates = {}
if last_scalar_events is not None:
max_values = config.get("services.tasks.max_last_metrics", 2000)
total_metrics = set()
if max_values:
query = dict(id=task_id)
to_add = sum(len(v) for m, v in last_scalar_events.items())
if to_add <= max_values:
query[f"unique_metrics__{max_values-to_add}__exists"] = True
task = Task.objects(**query).only("unique_metrics").first()
if task and task.unique_metrics:
total_metrics = set(task.unique_metrics)
new_metrics = []
def add_last_metric_conditional_update(
metric_path: str, metric_value, iter_value: int, is_min: bool
):
"""
Build an aggregation for an atomic update of the min or max value and the corresponding iteration
"""
if is_min:
field_prefix = "min"
op = "$gt"
else:
field_prefix = "max"
op = "$lt"
value_field = f"{metric_path}__{field_prefix}_value".replace("__", ".")
condition = {
"$or": [
{"$lte": [f"${value_field}", None]},
{op: [f"${value_field}", metric_value]},
]
}
raw_updates[value_field] = {
"$cond": [condition, metric_value, f"${value_field}"]
}
value_iteration_field = f"{metric_path}__{field_prefix}_value_iteration".replace(
"__", "."
)
raw_updates[value_iteration_field] = {
"$cond": [
condition,
iter_value,
f"${value_iteration_field}",
]
}
for metric_key, metric_data in last_scalar_events.items():
for variant_key, variant_data in metric_data.items():
metric = (
f"{variant_data.get('metric')}/{variant_data.get('variant')}"
)
if max_values:
if (
len(total_metrics) >= max_values
and metric not in total_metrics
):
continue
total_metrics.add(metric)
new_metrics.append(metric)
path = f"last_metrics__{metric_key}__{variant_key}"
for key, value in variant_data.items():
if key in ("min_value", "max_value"):
add_last_metric_conditional_update(
metric_path=path,
metric_value=value,
iter_value=variant_data.get(f"{key}_iter", 0),
is_min=(key == "min_value"),
)
elif key in ("metric", "variant", "value"):
extra_updates[f"set__{path}__{key}"] = value
if new_metrics:
extra_updates["add_to_set__unique_metrics"] = new_metrics
get_last_metric_updates(
task_id=task_id,
last_scalar_events=last_scalar_events,
raw_updates=raw_updates,
extra_updates=extra_updates,
)
if last_events is not None:
@@ -515,6 +448,12 @@ class TaskBLL:
return ret
@staticmethod
def remove_task_from_all_queues(company_id: str, task: Task) -> int:
    """
    Pull the entries referencing the task out of every queue of the company
    that currently holds it and stamp those queues with a fresh last_update.

    :param company_id: company whose queues are searched
    :param task: the task whose id is looked up in the queue entries
    :return: the result of the queryset update() call
    """
    queues_with_task = Queue.objects(company=company_id, entries__task=task.id)
    return queues_with_task.update(
        pull__entries__task=task.id, last_update=datetime.utcnow()
    )
@classmethod
def dequeue_and_change_status(
cls,
@@ -523,13 +462,20 @@ class TaskBLL:
user_id: str,
status_message: str,
status_reason: str,
remove_from_all_queues=False,
):
try:
cls.dequeue(task, company_id)
except errors.bad_request.InvalidQueueOrTaskNotQueued:
cls.dequeue(task, company_id, silent_fail=True)
except APIError:
# dequeue may fail if the queue was deleted
pass
if remove_from_all_queues:
cls.remove_task_from_all_queues(company_id=company_id, task=task)
if task.status not in [TaskStatus.queued, TaskStatus.in_progress]:
return {"updated": 0}
return ChangeStatusRequest(
task=task,
new_status=task.enqueue_status or TaskStatus.created,

View File

@@ -105,13 +105,21 @@ def collect_debug_image_urls(company: str, task_or_model: str) -> Set[str]:
supported_storage_types = {
"https://": StorageType.fileserver,
"http://": StorageType.fileserver,
"s3://": StorageType.s3,
"azure://": StorageType.azure,
"gs://": StorageType.gs,
}
supported_storage_types.update(
{
p: StorageType.fileserver
for p in config.get(
"services.async_urls_delete.fileserver.url_prefixes",
["https://", "http://"],
)
}
)
def _schedule_for_delete(
company: str, user: str, task_id: str, urls: Set[str], can_delete_folders: bool,
@@ -196,7 +204,7 @@ def cleanup_task(
task, force
)
delete_external_artifacts = delete_external_artifacts and config.get(
"services.async_urls_delete.enabled", False
"services.async_urls_delete.enabled", True
)
event_urls, artifact_urls, model_urls = set(), set(), set()
if return_file_urls or delete_external_artifacts:

View File

@@ -61,6 +61,7 @@ def archive_task(
user_id=user_id,
status_message=status_message,
status_reason=status_reason,
remove_from_all_queues=True,
)
except APIError:
# dequeue may fail if the task was not enqueued
@@ -99,6 +100,7 @@ def dequeue_task(
user_id: str,
status_message: str,
status_reason: str,
remove_from_all_queues: bool = False,
) -> Tuple[int, dict]:
query = dict(id=task_id, company=company_id)
task = Task.get_for_writing(**query)
@@ -111,6 +113,7 @@ def dequeue_task(
user_id=user_id,
status_message=status_message,
status_reason=status_reason,
remove_from_all_queues=remove_from_all_queues,
)
return 1, res
@@ -244,6 +247,7 @@ def delete_task(
user_id=user_id,
status_message=status_message,
status_reason=status_reason,
remove_from_all_queues=True,
)
except APIError:
# dequeue may fail if the task was not enqueued
@@ -262,6 +266,7 @@ def delete_task(
if move_to_trash:
# make sure that whatever changes were done to the task are saved
# the task itself will be deleted later in the move_tasks_to_trash operation
task.last_update = datetime.utcnow()
task.save()
else:
task.delete()
@@ -296,6 +301,8 @@ def reset_task(
# dequeue may fail if the task was not enqueued
pass
TaskBLL.remove_task_from_all_queues(company_id=company_id, task=task)
cleaned_up = cleanup_task(
company=company_id,
user=user_id,

View File

@@ -5,7 +5,9 @@ import attr
import six
from apiserver.apierrors import errors
from apiserver.config_repo import config
from apiserver.database.errors import translate_errors_context
from apiserver.database.model.model import Model
from apiserver.database.model.project import Project
from apiserver.database.model.task.task import Task, TaskStatus, TaskSystemTags
from apiserver.database.utils import get_options
@@ -167,7 +169,7 @@ def update_project_time(project_ids: Union[str, Sequence[str]]):
def get_task_for_update(
company_id: str, task_id: str, allow_all_statuses: bool = False, force: bool = False
company_id: str, task_id: str, allow_all_statuses: bool = False, force: bool = False
) -> Task:
"""
Loads only task id and return the task only if it is updatable (status == 'created')
@@ -189,9 +191,88 @@ def get_task_for_update(
return task
def update_task(task: Task, user_id: str, update_cmds: dict, set_last_update: bool = True):
def update_task(
task: Task, user_id: str, update_cmds: dict, set_last_update: bool = True
):
now = datetime.utcnow()
last_updates = dict(last_change=now, last_changed_by=user_id)
if set_last_update:
last_updates.update(last_update=now)
return task.update(**update_cmds, **last_updates)
def get_last_metric_updates(
    task_id: str,
    last_scalar_events: dict,
    raw_updates: dict,
    extra_updates: dict,
    model_events: bool = False,
):
    """
    Translate the last reported scalar events into database update commands.

    Nothing is returned; the commands are written into the passed dictionaries:

    * raw_updates receives, per min/max value, "$cond" expressions (keyed by the
      dotted field path) that replace the stored value and its iteration only
      when the stored value is null/absent or the new value is more extreme.
    * extra_updates receives "set__..." commands for the metric, variant and
      value fields, and an "add_to_set__unique_metrics" command listing the
      metrics that were processed.

    The amount of distinct metrics kept per document is limited by the
    "services.tasks.max_last_metrics" setting (a falsy value disables the limit).
    Once the limit is reached, metrics that are not already known are skipped.

    :param task_id: id of the task (or of the model when model_events is set)
    :param last_scalar_events: metric key -> variant key -> last event data
    :param raw_updates: output dictionary for the conditional min/max updates
    :param extra_updates: output dictionary for the regular update commands
    :param model_events: if set then the limits are checked against Model, otherwise Task
    """
    max_values = config.get("services.tasks.max_last_metrics", 2000)
    known_metrics = set()
    if max_values:
        lookup = dict(id=task_id)
        incoming_count = sum(
            len(variants) for variants in last_scalar_events.values()
        )
        if incoming_count <= max_values:
            # NOTE(review): presumably matches only documents whose unique_metrics
            # list is long enough for the incoming metrics to possibly exceed the
            # limit, so that shorter lists are not loaded at all - confirm
            lookup[f"unique_metrics__{max_values - incoming_count}__exists"] = True
        document_cls = Model if model_events else Task
        stored = document_cls.objects(**lookup).only("unique_metrics").first()
        if stored and stored.unique_metrics:
            known_metrics = set(stored.unique_metrics)

    processed_metrics = []

    def conditional_extreme_update(variant_path: str, kind: str, new_value, new_iter: int):
        """
        Build an aggregation for an atomic update of the min or max value and the corresponding iteration
        """
        # the stored min is replaced when it is greater than the new value,
        # the stored max is replaced when it is lower than the new value
        compare_op = "$gt" if kind == "min" else "$lt"
        value_field = f"{variant_path}__{kind}_value".replace("__", ".")
        iteration_field = f"{variant_path}__{kind}_value_iteration".replace(
            "__", "."
        )
        should_replace = {
            "$or": [
                {"$lte": [f"${value_field}", None]},
                {compare_op: [f"${value_field}", new_value]},
            ]
        }
        raw_updates[value_field] = {
            "$cond": [should_replace, new_value, f"${value_field}"]
        }
        raw_updates[iteration_field] = {
            "$cond": [should_replace, new_iter, f"${iteration_field}"]
        }

    for metric_key, variants in last_scalar_events.items():
        for variant_key, event in variants.items():
            metric_name = f"{event.get('metric')}/{event.get('variant')}"
            if max_values:
                limit_reached = len(known_metrics) >= max_values
                if limit_reached and metric_name not in known_metrics:
                    continue
                known_metrics.add(metric_name)

            processed_metrics.append(metric_name)
            variant_path = f"last_metrics__{metric_key}__{variant_key}"
            for field, field_value in event.items():
                if field in ("min_value", "max_value"):
                    conditional_extreme_update(
                        variant_path,
                        kind=field.partition("_")[0],
                        new_value=field_value,
                        new_iter=event.get(f"{field}_iter", 0),
                    )
                elif field in ("metric", "variant", "value"):
                    extra_updates[f"set__{variant_path}__{field}"] = field_value

    if processed_metrics:
        extra_updates["add_to_set__unique_metrics"] = processed_metrics

View File

@@ -23,6 +23,11 @@
user_secret: "yfc8KQo*GMXb*9p((qcYC7ByFIpF7I&4VH3BfUYXH%o9vX1ZUZQEEw1Inc)S"
revoke_in_fixed_mode: true
}
services_agent {
role: "admin"
user_key: "P4BMJA7RK3TKBXGSY8OAA1FA8TOD11"
user_secret: "9LsgSfa0SYz0zli1_c500ZcLqanre2xkWOpepyt1w-BKK3_DKPHrtoj3JSHvyy8bIi0"
}
tests {
role: "user"
display_name: "Default User"

View File

@@ -1,4 +1,4 @@
# if set to True then on task delete/reset external file urls for know storage types are scheduled for async delete
# if set to true then on task delete/reset external file urls for known storage types are scheduled for async delete
# otherwise they are returned to a client for the client side delete
enabled: true
max_retries: 3

View File

@@ -16,7 +16,7 @@ from mongoengine.errors import (
LookUpError,
InvalidQueryError,
)
from pymongo.errors import PyMongoError, NotMasterError
from pymongo.errors import PyMongoError, NotPrimaryError
from apiserver.apierrors import errors
@@ -198,7 +198,7 @@ def translate_errors_context(message=None, **kwargs):
MongoEngineErrorsHandler.invalid_query_error(e, message, **kwargs)
except PyMongoError as e:
raise errors.server_error.InternalError(message, err=str(e))
except NotMasterError as e:
except NotPrimaryError as e:
raise errors.server_error.InternalError(message, err=str(e))
except MakeGetAllQueryError as e:
raise errors.bad_request.ValidationError(e.error, field=e.field)

View File

@@ -754,7 +754,9 @@ class GetMixin(PropsMixin):
@classmethod
def _get_collation_override(cls, field: str) -> Optional[dict]:
return first(
v for k, v in cls._field_collation_overrides.items() if field.startswith(k)
v
for k, v in cls._field_collation_overrides.items()
if field.startswith(k) or field.startswith(f"-{k}")
)
@classmethod

View File

@@ -3,6 +3,8 @@ from mongoengine import (
DateTimeField,
BooleanField,
EmbeddedDocumentField,
IntField,
ListField,
)
from apiserver.database import Database, strict
@@ -17,12 +19,14 @@ from apiserver.database.model.base import GetMixin
from apiserver.database.model.metadata import MetadataItem
from apiserver.database.model.model_labels import ModelLabels
from apiserver.database.model.project import Project
from apiserver.database.model.task.metrics import MetricEvent
from apiserver.database.model.task.task import Task
class Model(AttributedDocument):
_field_collation_overrides = {
"metadata.": AttributedDocument._numeric_locale,
"last_metrics.": AttributedDocument._numeric_locale,
}
meta = {
@@ -67,6 +71,7 @@ class Model(AttributedDocument):
"parent",
"metadata.*",
),
range_fields=("last_metrics.*", "last_iteration"),
datetime_fields=("last_update",),
)
@@ -92,6 +97,9 @@ class Model(AttributedDocument):
metadata = SafeMapField(
field=EmbeddedDocumentField(MetadataItem), user_set_allowed=True
)
last_iteration = IntField(default=0)
last_metrics = SafeMapField(field=SafeMapField(EmbeddedDocumentField(MetricEvent)))
unique_metrics = ListField(StringField(required=True), exclude_by_default=True)
def get_index_company(self) -> str:
return self.company or self.company_origin or ""

View File

@@ -18,9 +18,15 @@ _migration_dir = _parent_dir / _migrations
def check_mongo_empty() -> bool:
return not all(
get_db(alias).collection_names() for alias in utils.get_options(Database)
)
for alias in utils.get_options(Database):
collection_names = get_db(alias).list_collection_names()
if collection_names and any(
name in collection_names
for name in ["company", "user", "versions"]
):
return False
return True
def get_last_server_version() -> Version:

View File

@@ -967,6 +967,7 @@ class PrePopulate:
for ev in events:
ev["task"] = task_id
ev["company_id"] = company_id
ev["allow_locked"] = True
cls.event_bll.add_events(
company_id, events=events, worker="", allow_locked=True
company_id, events=events, worker=""
)

View File

@@ -2,7 +2,7 @@ from os import getenv
from boltons.iterutils import first
from redis import StrictRedis
from rediscluster import RedisCluster
from redis.cluster import RedisCluster
from apiserver.apierrors.errors.server_error import ConfigError, GeneralError
from apiserver.config_repo import config
@@ -83,7 +83,7 @@ class RedisManager(object):
def host(self, alias):
r = self.connection(alias)
if isinstance(r, RedisCluster):
connections = first(r.connection_pool._available_connections.values())
connections = r.get_default_node().redis_connection.connection_pool._available_connections
else:
connections = r.connection_pool._available_connections

View File

@@ -1,38 +1,36 @@
attrs>=22.1.0
attrs>=22.1.0,<23
azure-storage-blob>=12.13.1
bcrypt>=3.1.4
boltons>=19.1.0
boto3==1.14.13
boto3-stubs[s3]>=1.24.35
clearml>=1.6.0,<1.7.0
clearml>=1.10.3
dpath>=1.4.2,<2.0
elasticsearch==7.13.3
fastjsonschema>=2.8
flask-compress>=1.4.0
flask-cors>=3.0.5
flask>=0.12.2
funcsigs==1.0.2
furl>=2.0.0
google-cloud-storage==2.0.0
protobuf==3.19.5
gunicorn>=19.7.1
humanfriendly==4.18
jinja2==2.11.3
google-cloud-storage>=2.8.0
gunicorn>=20.1.0
humanfriendly>=4.17
jinja2<3.0
jsonmodels>=2.3
jsonschema>=2.6.0
luqum>=0.10.0
markupsafe==2.0.1
mongoengine==0.24.2
nested_dict>=1.61
packaging==20.3
psutil>=5.6.5
pyhocon>=0.3.35
pyhocon>=0.3.35r
pyjwt>=2.4.0
pymongo[srv]==3.12.0
pymongo[srv]==4.1.1
python-rapidjson>=0.6.3
redis==3.5.3
redis-py-cluster>=2.1.3
redis>=4.5.4,<5
requests>=2.13.0
semantic_version>=2.8.3,<3
setuptools>=65.5.1
six
tqdm
validators>=0.12.4

View File

@@ -103,4 +103,35 @@ plots_response {
items {"$ref": "#/definitions/plots_response_task_metrics"}
}
}
}
single_value_task_metrics {
type: object
properties {
task {
type: string
description: Task ID
}
values {
type: array
items {
type: object
properties {
metric { type: string }
variant { type: string}
value { type: number }
timestamp { type: number }
}
}
}
}
}
single_value_metrics_response {
type: object
properties {
tasks {
description: Single value metrics grouped by task
type: array
items {"$ref": "#/definitions/single_value_task_metrics"}
}
}
}

View File

@@ -1353,36 +1353,7 @@ get_task_single_value_metrics {
}
}
}
response {
type: object
properties {
tasks {
description: Single value metrics grouped by task
type: array
items {
type: object
properties {
task {
type: string
description: Task ID
}
values {
type: array
items {
type: object
properties {
metric { type: string }
variant { type: string}
value { type: number }
timestamp { type: number }
}
}
}
}
}
}
}
}
response {"$ref": "#/definitions/single_value_metrics_response"}
}
"2.22": ${get_task_single_value_metrics."2.20"} {
request.properties.model_events {

View File

@@ -1,6 +1,6 @@
_description: """This service provides a management interface for models (results of training tasks) stored in the system."""
_definitions {
include "_common.conf"
include "_tasks_common.conf"
multi_field_pattern_data {
type: object
properties {
@@ -104,6 +104,17 @@ _definitions {
"$ref": "#/definitions/metadata_item"
}
}
last_iteration {
description: "Last iteration reported for this model"
type: integer
}
last_metrics {
description: "Last metric variants (hash to events), one for each metric hash"
type: object
additionalProperties {
"$ref": "#/definitions/last_metrics_variants"
}
}
stats {
description: "Model statistics"
type: object

View File

@@ -653,6 +653,13 @@ get_all_ex {
enum: [pipeline, report, dataset]
}
}
"2.25": ${get_all_ex."2.24"} {
request.properties.children_tags {
description: "The list of tag values to filter children by. Takes effect only if children_type is set. Use 'null' value to specify empty tags. Use '__$not' value to specify that the following value should be excluded"
type: array
items {type: string}
}
}
}
update {
"2.1" {
@@ -898,6 +905,13 @@ get_unique_metric_variants {
}
}
}
"2.25": ${get_unique_metric_variants."2.13"} {
request.properties.model_metrics {
description: If set to true then bring unique metric and variant names from the project models otherwise from the project tasks
type: boolean
default: false
}
}
}
get_hyperparam_values {
"2.13" {
@@ -1227,4 +1241,10 @@ get_task_parents {
}
}
}
"2.25": ${get_task_parents."2.13"} {
request.properties.task_name {
description: Task name pattern for the returned parent tasks
type: string
}
}
}

View File

@@ -568,6 +568,25 @@ get_task_data {
}
}
}
"2.25": ${get_task_data."2.23"} {
request.properties {
model_events {
type: boolean
description: If set then retrieve model events. Otherwise retrieve task events
default: false
}
# Presence flag: passing this object requests the task single value metrics in the response
single_value_metrics {
    type: object
    description: If passed then task single value metrics are returned
    # the schema keyword is "additionalProperties"; the misspelled key was presumably ignored by validation
    additionalProperties: false
}
}
response.properties.single_value_metrics {
type: array
description: Single value metrics grouped by task
items {"$ref": "#/definitions/single_value_task_metrics"}
}
}
}
get_all_ex {
"2.23" {

View File

@@ -1507,6 +1507,13 @@ dequeue {
}
}
}
"2.25": ${dequeue."1.5"} {
request.properties.remove_from_all_queues {
type: boolean
description: If set to 'true' then the task is searched and removed from all the queues. Otherwise only from the queue stored in the task execution parameters
default: false
}
}
}
dequeue_many {
"2.13": ${_definitions.change_many_request} {
@@ -1525,6 +1532,13 @@ dequeue_many {
}
}
}
"2.25": ${dequeue_many."2.13"} {
request.properties.remove_from_all_queues {
type: boolean
description: If set to 'true' then the tasks are searched and removed from all the queues. Otherwise only from the queue stored in the task execution parameters
default: false
}
}
}
set_requirements {
"2.1" {

View File

@@ -39,7 +39,7 @@ class ServiceRepo(object):
"""If the check is set, parsing will fail for endpoint request with the version that is grater than the current
maximum """
_max_version = PartialVersion("2.24")
_max_version = PartialVersion("2.25")
""" Maximum version number (the highest min_version value across all endpoints) """
_endpoint_exp = (

View File

@@ -3,4 +3,7 @@ from apiserver.service_repo import APICall, endpoint
@endpoint("debug.ping")
def ping(call: APICall, _, __):
call.result.data = {"msg": "ClearML server"}
res = {"msg": "ClearML server"}
if call.data:
res.update(call.data)
call.result.data = res

View File

@@ -70,9 +70,8 @@ def _assert_task_or_model_exists(
@endpoint("events.add")
def add(call: APICall, company_id, _):
data = call.data.copy()
allow_locked = data.pop("allow_locked", False)
added, err_count, err_info = event_bll.add_events(
company_id, [data], call.worker, allow_locked=allow_locked
company_id, [data], call.worker
)
call.result.data = dict(added=added, errors=err_count, errors_info=err_info)
@@ -87,7 +86,6 @@ def add_batch(call: APICall, company_id, _):
company_id,
events,
call.worker,
allow_locked=events[0].get("allow_locked", False),
)
call.result.data = dict(added=added, errors=err_count, errors_info=err_info)
@@ -508,6 +506,12 @@ def multi_task_scalar_metrics_iter_histogram(
)
def _get_single_value_metrics_response(
value_metrics: Mapping[str, dict]
) -> Sequence[dict]:
return [{"task": task, "values": values} for task, values in value_metrics.items()]
@endpoint("events.get_task_single_value_metrics")
def get_task_single_value_metrics(
call, company_id: str, request: SingleValueMetricsRequest
@@ -517,9 +521,7 @@ def get_task_single_value_metrics(
company_id, request.tasks, request.model_events
),
)
call.result.data = dict(
tasks=[{"task": task, "values": values} for task, values in res.items()]
)
call.result.data = dict(tasks=_get_single_value_metrics_response(res))
@endpoint("events.get_multi_task_plots", required_fields=["tasks"])
@@ -561,7 +563,6 @@ def _get_multitask_plots(
metrics: MetricVariants = None,
scroll_id=None,
no_scroll=True,
model_events=False,
) -> Tuple[dict, int, str]:
task_names = {
t.id: t.name for t in itertools.chain.from_iterable(companies.values())
@@ -598,7 +599,6 @@ def get_multi_task_plots(call, company_id, _):
last_iters=iters,
scroll_id=scroll_id,
no_scroll=no_scroll,
model_events=model_events,
)
call.result.data = dict(
plots=return_events,

View File

@@ -1,6 +1,6 @@
from datetime import datetime
from functools import partial
from typing import Sequence
from typing import Sequence, Union
from mongoengine import Q, EmbeddedDocument
@@ -59,6 +59,11 @@ org_bll = OrgBLL()
project_bll = ProjectBLL()
def conform_model_data(call: APICall, model_data: Union[Sequence[dict], dict]):
    """
    Prepare model data (a single dictionary or a sequence of them) for the API response
    by running the output tags conversion followed by the metadata unescaping on it.
    The helpers results are not used, so presumably they modify model_data in place.
    """
    for conform in (conform_output_tags, unescape_metadata):
        conform(call, model_data)
@endpoint("models.get_by_id", required_fields=["model"])
def get_by_id(call: APICall, company_id, _):
model_id = call.data["model"]
@@ -74,8 +79,7 @@ def get_by_id(call: APICall, company_id, _):
raise errors.bad_request.InvalidModelId(
"no such public or company model", id=model_id, company=company_id,
)
conform_output_tags(call, models[0])
unescape_metadata(call, models[0])
conform_model_data(call, models[0])
call.result.data = {"model": models[0]}
@@ -102,8 +106,7 @@ def get_by_task_id(call: APICall, company_id, _):
"no such public or company model", id=model_id, company=company_id,
)
model_dict = model.to_proper_dict()
conform_output_tags(call, model_dict)
unescape_metadata(call, model_dict)
conform_model_data(call, model_dict)
call.result.data = {"model": model_dict}
@@ -119,8 +122,7 @@ def get_all_ex(call: APICall, company_id, request: ModelsGetRequest):
allow_public=request.allow_public,
ret_params=ret_params,
)
conform_output_tags(call, models)
unescape_metadata(call, models)
conform_model_data(call, models)
if not request.include_stats:
call.result.data = {"models": models, **ret_params}
@@ -142,8 +144,7 @@ def get_by_id_ex(call: APICall, company_id, _):
models = Model.get_many_with_join(
company=company_id, query_dict=call.data, allow_public=True
)
conform_output_tags(call, models)
unescape_metadata(call, models)
conform_model_data(call, models)
call.result.data = {"models": models}
@@ -159,8 +160,7 @@ def get_all(call: APICall, company_id, _):
allow_public=True,
ret_params=ret_params,
)
conform_output_tags(call, models)
unescape_metadata(call, models)
conform_model_data(call, models)
call.result.data = {"models": models, **ret_params}
@@ -428,8 +428,7 @@ def edit(call: APICall, company_id, _):
_reset_cached_tags(company_id, projects=[new_project, model.project])
else:
_update_cached_tags(company_id, project=model.project, fields=fields)
conform_output_tags(call, fields)
unescape_metadata(call, fields)
conform_model_data(call, fields)
call.result.data_model = UpdateResponse(updated=updated, fields=fields)
else:
call.result.data_model = UpdateResponse(updated=0)
@@ -461,8 +460,7 @@ def _update_model(call: APICall, company_id, model_id=None):
_update_cached_tags(
company_id, project=model.project, fields=updated_fields
)
conform_output_tags(call, updated_fields)
unescape_metadata(call, updated_fields)
conform_model_data(call, updated_fields)
return UpdateResponse(updated=updated_count, fields=updated_fields)
@@ -524,7 +522,11 @@ def publish_many(call: APICall, company_id, request: ModelsPublishManyRequest):
@endpoint("models.delete", request_data_model=DeleteModelRequest)
def delete(call: APICall, company_id, request: DeleteModelRequest):
del_count, model = ModelBLL.delete_model(
model_id=request.model, company_id=company_id, force=request.force
model_id=request.model,
company_id=company_id,
user_id=call.identity.user,
force=request.force,
delete_external_artifacts=request.delete_external_artifacts,
)
if del_count:
_reset_cached_tags(
@@ -541,7 +543,13 @@ def delete(call: APICall, company_id, request: DeleteModelRequest):
)
def delete(call: APICall, company_id, request: ModelsDeleteManyRequest):
results, failures = run_batch_operation(
func=partial(ModelBLL.delete_model, company_id=company_id, force=request.force),
func=partial(
ModelBLL.delete_model,
company_id=company_id,
user_id=call.identity.user,
force=request.force,
delete_external_artifacts=request.delete_external_artifacts,
),
ids=request.ids,
)

View File

@@ -7,18 +7,18 @@ from apiserver.apierrors import errors
from apiserver.apierrors.errors.bad_request import InvalidProjectId
from apiserver.apimodels.base import UpdateResponse, MakePublicRequest, IdResponse
from apiserver.apimodels.projects import (
DeleteRequest,
GetParamsRequest,
ProjectTagsRequest,
ProjectTaskParentsRequest,
ProjectHyperparamValuesRequest,
ProjectsGetRequest,
DeleteRequest,
MoveRequest,
MergeRequest,
ProjectOrNoneRequest,
ProjectRequest,
ProjectModelMetadataValuesRequest,
ProjectChildrenType,
GetUniqueMetricsRequest,
)
from apiserver.bll.organization import OrgBLL, Tags
from apiserver.bll.project import ProjectBLL, ProjectQueries
@@ -99,19 +99,31 @@ def _adjust_search_parameters(data: dict, shallow_search: bool):
data["parent"] = [None]
def _get_project_stats_filter(request: ProjectsGetRequest) -> Tuple[Optional[dict], bool]:
def _get_project_stats_filter(
request: ProjectsGetRequest,
) -> Tuple[Optional[dict], bool]:
if request.include_stats_filter or not request.children_type:
return request.include_stats_filter, request.search_hidden
stats_filter = {"tags": request.children_tags} if request.children_tags else {}
if request.children_type == ProjectChildrenType.pipeline:
return {"system_tags": [pipeline_tag], "type": [TaskType.controller]}, True
return (
{
**stats_filter,
"system_tags": [pipeline_tag],
"type": [TaskType.controller],
},
True,
)
if request.children_type == ProjectChildrenType.report:
return {"system_tags": [reports_tag], "type": [TaskType.report]}, True
return request.include_stats_filter, request.search_hidden
return (
{**stats_filter, "system_tags": [reports_tag], "type": [TaskType.report]},
True,
)
return stats_filter, request.search_hidden
@endpoint("projects.get_all_ex", request_data_model=ProjectsGetRequest)
@endpoint("projects.get_all_ex")
def get_all_ex(call: APICall, company_id: str, request: ProjectsGetRequest):
data = call.data
conform_tag_fields(call, data)
@@ -137,6 +149,7 @@ def get_all_ex(call: APICall, company_id: str, request: ProjectsGetRequest):
project_ids=requested_ids,
allow_public=allow_public,
children_type=request.children_type,
children_tags=request.children_tags,
)
if not ids:
return {"projects": []}
@@ -174,19 +187,21 @@ def get_all_ex(call: APICall, company_id: str, request: ProjectsGetRequest):
conform_output_tags(call, projects)
project_ids = list({project["id"] for project in projects})
stats_filter, stats_search_hidden = _get_project_stats_filter(request)
if request.check_own_contents:
if request.children_type == ProjectChildrenType.dataset:
contents = project_bll.calc_own_datasets(
company=company_id,
project_ids=project_ids,
filter_=request.include_stats_filter,
filter_=stats_filter,
users=request.active_users,
)
else:
contents = project_bll.calc_own_contents(
company=company_id,
project_ids=project_ids,
filter_=_get_project_stats_filter(request)[0],
filter_=stats_filter,
specific_state=request.stats_for_state,
users=request.active_users,
)
@@ -199,19 +214,18 @@ def get_all_ex(call: APICall, company_id: str, request: ProjectsGetRequest):
company=company_id,
project_ids=project_ids,
include_children=request.stats_with_children,
filter_=request.include_stats_filter,
filter_=stats_filter,
users=request.active_users,
selected_project_ids=selected_project_ids,
)
else:
filter_, search_hidden = _get_project_stats_filter(request)
stats, children = project_bll.get_project_stats(
company=company_id,
project_ids=project_ids,
specific_state=request.stats_for_state,
include_children=request.stats_with_children,
search_hidden=search_hidden,
filter_=filter_,
search_hidden=stats_search_hidden,
filter_=stats_filter,
users=request.active_users,
selected_project_ids=selected_project_ids,
)
@@ -345,22 +359,23 @@ def delete(call: APICall, company_id: str, request: DeleteRequest):
@endpoint(
"projects.get_unique_metric_variants", request_data_model=ProjectOrNoneRequest
"projects.get_unique_metric_variants", request_data_model=GetUniqueMetricsRequest
)
def get_unique_metric_variants(
call: APICall, company_id: str, request: ProjectOrNoneRequest
call: APICall, company_id: str, request: GetUniqueMetricsRequest
):
metrics = project_queries.get_unique_metric_variants(
company_id,
[request.project] if request.project else None,
include_subprojects=request.include_subprojects,
model_metrics=request.model_metrics,
)
call.result.data = {"metrics": metrics}
@endpoint("projects.get_model_metadata_keys",)
@endpoint("projects.get_model_metadata_keys")
def get_model_metadata_keys(call: APICall, company_id: str, request: GetParamsRequest):
total, remaining, keys = project_queries.get_model_metadata_keys(
company_id,
@@ -504,10 +519,11 @@ def get_task_parents(
call: APICall, company_id: str, request: ProjectTaskParentsRequest
):
call.result.data = {
"parents": project_bll.get_task_parents(
"parents": ProjectBLL.get_task_parents(
company_id,
projects=request.projects,
include_subprojects=request.include_subprojects,
state=request.tasks_state,
name=request.task_name,
)
}

View File

@@ -1,3 +1,5 @@
from typing import Union, Sequence
from mongoengine import Q
from apiserver.apimodels.base import UpdateResponse
@@ -21,6 +23,7 @@ from apiserver.apimodels.queues import (
)
from apiserver.bll.model import Metadata
from apiserver.bll.queue import QueueBLL
from apiserver.bll.queue.queue_bll import MOVE_FIRST, MOVE_LAST
from apiserver.bll.workers import WorkerBLL
from apiserver.config_repo import config
from apiserver.database.model.task.task import Task
@@ -38,14 +41,18 @@ worker_bll = WorkerBLL()
queue_bll = QueueBLL(worker_bll)
def conform_queue_data(call: APICall, queue_data: Union[Sequence[dict], dict]):
    """
    Prepare queue data (a single dictionary or a sequence of them) for the API response
    by running the output tags conversion followed by the metadata unescaping on it.
    The helpers results are not used, so presumably they modify queue_data in place.
    """
    for conform in (conform_output_tags, unescape_metadata):
        conform(call, queue_data)
@endpoint("queues.get_by_id", min_version="2.4", request_data_model=GetByIdRequest)
def get_by_id(call: APICall, company_id, request: GetByIdRequest):
queue = queue_bll.get_by_id(
company_id, request.queue, max_task_entries=request.max_task_entries
)
queue_dict = queue.to_proper_dict()
conform_output_tags(call, queue_dict)
unescape_metadata(call, queue_dict)
conform_queue_data(call, queue_dict)
call.result.data = {"queue": queue_dict}
@@ -84,8 +91,7 @@ def get_all_ex(call: APICall, company: str, request: GetAllRequest):
max_task_entries=request.max_task_entries,
ret_params=ret_params,
)
conform_output_tags(call, queues)
unescape_metadata(call, queues)
conform_queue_data(call, queues)
call.result.data = {"queues": queues, **ret_params}
@@ -101,8 +107,7 @@ def get_all(call: APICall, company: str, request: GetAllRequest):
max_task_entries=request.max_task_entries,
ret_params=ret_params,
)
conform_output_tags(call, queues)
unescape_metadata(call, queues)
conform_queue_data(call, queues)
call.result.data = {"queues": queues, **ret_params}
@@ -134,8 +139,7 @@ def update(call: APICall, company_id, req_model: UpdateRequest):
updated, fields = queue_bll.update(
company_id=company_id, queue_id=req_model.queue, **data
)
conform_output_tags(call, fields)
unescape_metadata(call, fields)
conform_queue_data(call, fields)
call.result.data_model = UpdateResponse(updated=updated, fields=fields)
@@ -167,7 +171,7 @@ def get_next_task(call: APICall, company_id, request: GetNextTaskRequest):
if entry:
data = {"entry": entry.to_proper_dict()}
if request.get_task_info:
task = Task.objects(id=entry.task).first()
task = Task.objects(id=entry.task).only("company", "user").first()
if task:
data["task_info"] = {"company": task.company, "user": task.user}
@@ -195,7 +199,7 @@ def move_task_forward(call: APICall, company_id, req_model: MoveTaskRequest):
company_id=company_id,
queue_id=req_model.queue,
task_id=req_model.task,
pos_func=lambda p: max(0, p - req_model.count),
move_count=-req_model.count,
)
)
@@ -212,7 +216,7 @@ def move_task_backward(call: APICall, company_id, req_model: MoveTaskRequest):
company_id=company_id,
queue_id=req_model.queue,
task_id=req_model.task,
pos_func=lambda p: max(0, p + req_model.count),
move_count=req_model.count,
)
)
@@ -229,7 +233,7 @@ def move_task_to_front(call: APICall, company_id, req_model: TaskRequest):
company_id=company_id,
queue_id=req_model.queue,
task_id=req_model.task,
pos_func=lambda p: 0,
move_count=MOVE_FIRST,
)
)
@@ -246,7 +250,7 @@ def move_task_to_back(call: APICall, company_id, req_model: TaskRequest):
company_id=company_id,
queue_id=req_model.queue,
task_id=req_model.task,
pos_func=lambda p: -1,
move_count=MOVE_LAST,
)
)

View File

@@ -3,6 +3,8 @@ from datetime import datetime
from itertools import chain
from typing import Sequence
from mongoengine import Q
from apiserver.apimodels.reports import (
CreateReportRequest,
UpdateReportRequest,
@@ -17,6 +19,8 @@ from apiserver.apimodels.reports import (
from apiserver.apierrors import errors
from apiserver.apimodels.base import UpdateResponse
from apiserver.bll.project.project_bll import reports_project_name, reports_tag
from apiserver.database.model.model import Model
from apiserver.services.models import conform_model_data
from apiserver.services.utils import process_include_subprojects, sort_tags_response
from apiserver.bll.organization import OrgBLL
from apiserver.bll.project import ProjectBLL
@@ -31,11 +35,12 @@ from apiserver.services.events import (
_get_metrics_response,
_get_metric_variants_from_request,
_get_multitask_plots,
_get_single_value_metrics_response,
)
from apiserver.services.tasks import (
escape_execution_parameters,
_hidden_query,
unprepare_from_saved,
conform_task_data,
)
org_bll = OrgBLL()
@@ -168,9 +173,21 @@ def _delete_reports_project_if_empty(project_id):
def get_all_ex(call: APICall, company_id, request: GetAllRequest):
call_data = call.data
call_data["type"] = TaskType.report
call_data["include_subprojects"] = True
process_include_subprojects(call_data)
# bring projects one level down in case not the .reports project was passed
if "project" in call_data:
project_ids = call_data["project"]
if not isinstance(project_ids, list):
project_ids = [project_ids]
query = Q(parent__in=project_ids) | Q(id__in=project_ids)
project_ids = Project.objects(
query & Q(basename=reports_project_name)
).scalar("id")
if not project_ids:
return {"tasks": []}
call_data["project"] = list(project_ids)
ret_params = {}
tasks = Task.get_many_with_join(
company=company_id,
@@ -178,7 +195,7 @@ def get_all_ex(call: APICall, company_id, request: GetAllRequest):
allow_public=request.allow_public,
ret_params=ret_params,
)
unprepare_from_saved(call, tasks)
conform_task_data(call, tasks)
call.result.data = {"tasks": tasks, **ret_params}
@@ -198,26 +215,38 @@ def _get_task_metrics_from_request(
@endpoint("reports.get_task_data")
def get_task_data(call: APICall, company_id, request: GetTasksDataRequest):
if request.model_events:
entity_cls = Model
conform_data = conform_model_data
else:
entity_cls = Task
conform_data = conform_task_data
call_data = escape_execution_parameters(call)
process_include_subprojects(call_data)
ret_params = {}
tasks = Task.get_many_with_join(
tasks = entity_cls.get_many_with_join(
company=company_id,
query_dict=call_data,
query=_hidden_query(call_data),
allow_public=request.allow_public,
ret_params=ret_params,
)
unprepare_from_saved(call, tasks)
conform_data(call, tasks)
res = {"tasks": tasks, **ret_params}
if not (
request.debug_images or request.plots or request.scalar_metrics_iter_histogram
request.debug_images
or request.plots
or request.scalar_metrics_iter_histogram
or request.single_value_metrics
):
return res
task_ids = [task["id"] for task in tasks]
companies = _get_task_or_model_index_companies(company_id, task_ids=task_ids)
companies = _get_task_or_model_index_companies(
company_id, task_ids=task_ids, model_events=request.model_events
)
if request.debug_images:
result = event_bll.debug_images_iterator.get_task_events(
companies={
@@ -249,6 +278,11 @@ def get_task_data(call: APICall, company_id, request: GetTasksDataRequest):
),
)
if request.single_value_metrics:
res["single_value_metrics"] = _get_single_value_metrics_response(
event_bll.metrics.get_task_single_value_metrics(companies=companies)
)
call.result.data = res

View File

@@ -65,6 +65,8 @@ from apiserver.apimodels.tasks import (
CompletedRequest,
CompletedResponse,
GetAllReq,
DequeueRequest,
DequeueManyRequest,
)
from apiserver.bll.event import EventBLL
from apiserver.bll.model import ModelBLL
@@ -182,7 +184,7 @@ def get_by_id(call: APICall, company_id, req_model: TaskRequest):
req_model.task, company_id=company_id, allow_public=True
)
task_dict = task.to_proper_dict()
unprepare_from_saved(call, task_dict)
conform_task_data(call, task_dict)
call.result.data = {"task": task_dict}
@@ -231,7 +233,7 @@ def get_all_ex(call: APICall, company_id, request: GetAllReq):
allow_public=request.allow_public,
ret_params=ret_params,
)
unprepare_from_saved(call, tasks)
conform_task_data(call, tasks)
call.result.data = {"tasks": tasks, **ret_params}
@@ -245,7 +247,7 @@ def get_by_id_ex(call: APICall, company_id, _):
company=company_id, query_dict=call_data, allow_public=True,
)
unprepare_from_saved(call, tasks)
conform_task_data(call, tasks)
call.result.data = {"tasks": tasks}
@@ -264,7 +266,7 @@ def get_all(call: APICall, company_id, _):
allow_public=True,
ret_params=ret_params,
)
unprepare_from_saved(call, tasks)
conform_task_data(call, tasks)
call.result.data = {"tasks": tasks, **ret_params}
@@ -383,7 +385,8 @@ def close(call: APICall, company_id, req_model: UpdateRequest):
req_model,
company_id=company_id,
user_id=call.identity.user,
new_status=TaskStatus.closed)
new_status=TaskStatus.closed,
)
)
@@ -430,7 +433,7 @@ def prepare_for_save(call: APICall, fields: dict, previous_task: Task = None):
return fields
def unprepare_from_saved(call: APICall, tasks_data: Union[Sequence[dict], dict]):
def conform_task_data(call: APICall, tasks_data: Union[Sequence[dict], dict]):
if isinstance(tasks_data, dict):
tasks_data = [tasks_data]
@@ -608,7 +611,7 @@ def update(call: APICall, company_id, req_model: UpdateRequest):
company_id, project=task.project, fields=updated_fields
)
update_project_time(updated_fields.get("project"))
unprepare_from_saved(call, updated_fields)
conform_task_data(call, updated_fields)
return UpdateResponse(updated=updated_count, fields=updated_fields)
@@ -763,7 +766,7 @@ def edit(call: APICall, company_id, req_model: UpdateRequest):
company_id, project=task.project, fields=fixed_fields
)
update_project_time(fields.get("project"))
unprepare_from_saved(call, fields)
conform_task_data(call, fields)
call.result.data_model = UpdateResponse(updated=updated, fields=fields)
else:
call.result.data_model = UpdateResponse(updated=0)
@@ -929,27 +932,24 @@ def enqueue_many(call: APICall, company_id, request: EnqueueManyRequest):
@endpoint(
"tasks.dequeue",
request_data_model=UpdateRequest,
response_data_model=DequeueResponse,
"tasks.dequeue", response_data_model=DequeueResponse,
)
def dequeue(call: APICall, company_id, request: UpdateRequest):
def dequeue(call: APICall, company_id, request: DequeueRequest):
dequeued, res = dequeue_task(
task_id=request.task,
company_id=company_id,
user_id=call.identity.user,
status_message=request.status_message,
status_reason=request.status_reason,
remove_from_all_queues=request.remove_from_all_queues,
)
call.result.data_model = DequeueResponse(dequeued=dequeued, **res)
@endpoint(
"tasks.dequeue_many",
request_data_model=TaskBatchRequest,
response_data_model=DequeueManyResponse,
"tasks.dequeue_many", response_data_model=DequeueManyResponse,
)
def dequeue_many(call: APICall, company_id, request: TaskBatchRequest):
def dequeue_many(call: APICall, company_id, request: DequeueManyRequest):
results, failures = run_batch_operation(
func=partial(
dequeue_task,
@@ -957,6 +957,7 @@ def dequeue_many(call: APICall, company_id, request: TaskBatchRequest):
user_id=call.identity.user,
status_message=request.status_message,
status_reason=request.status_reason,
remove_from_all_queues=request.remove_from_all_queues,
),
ids=request.ids,
)
@@ -1357,5 +1358,7 @@ def delete_models(call: APICall, company_id: str, request: DeleteModelsRequest):
if names
}
updated = task.update(last_change=datetime.utcnow(), last_changed_by=call.identity.user, **commands,)
updated = task.update(
last_change=datetime.utcnow(), last_changed_by=call.identity.user, **commands,
)
return {"updated": updated}

View File

@@ -127,7 +127,7 @@ class TestBatchOperations(TestService):
def _temp_task(self):
return self.create_temp(
service="tasks", type="testing", name=self.name, input=dict(view={}),
service="tasks", type="testing", name=self.name,
)
def _temp_task_model(self, task, **kwargs) -> str:

View File

@@ -139,6 +139,5 @@ class TestEntityOrdering(TestService):
name="test",
comment=self.test_comment,
type="testing",
input=dict(view=dict()),
**kwargs,
)

View File

@@ -1,3 +1,4 @@
from apiserver.apierrors import errors
from apiserver.apierrors.errors.bad_request import InvalidModelId
from apiserver.tests.automated import TestService
@@ -11,6 +12,31 @@ class TestModelsService(TestService):
def setUp(self, version="2.9"):
super().setUp(version=version)
def test_delete_model_for_task(self):
# non published task
task_id, model_id = self._create_task_and_model()
task = self.api.tasks.get_by_id(task=task_id).task
self.assertEqual(task.models.output[0].model, model_id)
res = self.api.models.delete(model=model_id)
self.assertTrue(res.deleted)
with self.api.raises(errors.bad_request.InvalidModelId):
self.api.models.get_by_id(model=model_id)
task = self.api.tasks.get_by_id(task=task_id).task
self.assertEqual(task.models.output, [])
# published task
task_id, model_id = self._create_task_and_model()
self.api.tasks.stopped(task=task_id)
self.api.tasks.publish(task=task_id, publish_model=False)
with self.api.raises(errors.bad_request.ModelCreatingTaskExists):
self.api.models.delete(model=model_id)
res = self.api.models.delete(model=model_id, force=True)
self.assertTrue(res.deleted)
with self.api.raises(errors.bad_request.InvalidModelId):
self.api.models.get_by_id(model=model_id)
task = self.api.tasks.get_by_id(task=task_id).task
self.assertEqual(task.models.output[0].model, f"__DELETED__{model_id}")
def test_publish_output_model_running_task(self):
task_id, model_id = self._create_task_and_model()
self._assert_model_ready(model_id, False)
@@ -108,9 +134,7 @@ class TestModelsService(TestService):
self._assert_model_ready(model_id, True)
def test_publish_task_no_output_model(self):
task_id = self.create_temp(
service="tasks", type="testing", name="server-test", input=dict(view={})
)
task_id = self.create_temp(service="tasks", type="testing", name="server-test")
self.api.tasks.started(task=task_id)
self.api.tasks.stopped(task=task_id)
@@ -263,7 +287,6 @@ class TestModelsService(TestService):
service="tasks",
type=kwargs.pop("type", "testing"),
name=kwargs.pop("name", "server-test"),
input=kwargs.pop("input", dict(view={})),
**kwargs,
)
@@ -271,13 +294,15 @@ class TestModelsService(TestService):
def _create_task_and_model(self):
execution_model_id = self.create_temp(
service="models", name="test", uri="file:///a", labels={}
service="models",
name="test",
uri="https://files.trains-master.hosted.allegro.ai/a.jpg",
labels={},
)
task_id = self.create_temp(
service="tasks",
type="testing",
name="server-test",
input=dict(view={}),
execution=dict(model=execution_model_id),
)
self.api.tasks.started(task=task_id)

View File

@@ -45,7 +45,7 @@ class TestMoveUnderProject(TestService):
def _temp_task(self):
task_input = dict(
name=self.entity_name, type="training", input=dict(mapping={}, view=dict(entries=[])),
name=self.entity_name, type="training"
)
return self.create_temp("tasks", **task_input)

View File

@@ -75,6 +75,5 @@ class TestPagingAndScrolling(TestService):
name=name,
comment="Test task",
type="testing",
input=dict(view=dict()),
**kwargs,
)

View File

@@ -43,7 +43,6 @@ class TestPipelines(TestService):
"tasks",
name=name,
type="testing",
input=dict(view=dict()),
project=project,
system_tags=["pipeline"],
),

View File

@@ -10,7 +10,7 @@ class TestProjectsDelete(TestService):
def new_task(self, **kwargs):
return self.create_temp(
"tasks", type="testing", name=db_id(), input=dict(view=dict()), **kwargs
"tasks", type="testing", name=db_id(), **kwargs
)
def new_model(self, **kwargs):

View File

@@ -94,7 +94,7 @@ class TestProjectTags(TestService):
def new_task(self, **kwargs):
self.update_missing(
kwargs, type="testing", name="test project tags", input=dict(view=dict())
kwargs, type="testing", name="test project tags"
)
return self.create_temp("tasks", **kwargs)

View File

@@ -14,7 +14,6 @@ class TestProjection(TestService):
kwargs,
type="testing",
name="test projection",
input=dict(view=dict()),
delete_params=dict(force=True),
)
return self.create_temp("tasks", **kwargs)

View File

@@ -81,6 +81,18 @@ class TestQueues(TestService):
self.assertQueueTasks(res.queue, [task])
self.assertTaskTags(task, system_tags=[])
def test_dequeue_not_queued_task(self):
# queue = self._temp_queue("TestTempQueue")
task_name = "TempDevTask"
task = self._temp_task(task_name)
self.api.tasks.edit(task=task, status="queued") # , execution={"queue": queue})
res = self.api.tasks.get_by_id(task=task)
self.assertEqual(res.task.status, "queued")
self.api.tasks.dequeue(task=task)
res = self.api.tasks.get_by_id(task=task)
self.assertEqual(res.task.status, "created")
def test_dequeue_from_deleted_queue(self):
queue = self._temp_queue("TestTempQueue")
task_name = "TempDevTask"
@@ -126,27 +138,70 @@ class TestQueues(TestService):
queue = self._temp_queue("TestTempQueue")
tasks = [
self._create_temp_queued_task(t, queue)["id"]
for t in ("temp task1", "temp task2", "temp task3")
for t in ("temp task1", "temp task2", "temp task3", "temp task4")
]
res = self.api.queues.get_by_id(queue=queue)
self.assertQueueTasks(res.queue, tasks)
new_pos = self.api.queues.move_task_backward(
queue=queue, task=tasks[0], count=2
).position
self.assertEqual(new_pos, 2)
res = self.api.queues.get_by_id(queue=queue)
changed_tasks = tasks[1:] + tasks[:1]
self.assertQueueTasks(res.queue, changed_tasks)
new_pos = self.api.queues.move_task_forward(
queue=queue, task=tasks[0], count=2
# no change in position
new_pos = self.api.queues.move_task_to_front(
queue=queue, task=tasks[0]
).position
self.assertEqual(new_pos, 0)
res = self.api.queues.get_by_id(queue=queue)
self.assertQueueTasks(res.queue, tasks)
self.assertGetNextTasks(queue, tasks)
# move backwards in the middle
new_pos = self.api.queues.move_task_backward(
queue=queue, task=tasks[0], count=2
).position
self.assertEqual(new_pos, 2)
res = self.api.queues.get_by_id(queue=queue)
changed_tasks = tasks[1:3] + [tasks[0], tasks[3]]
self.assertQueueTasks(res.queue, changed_tasks)
# move backwards beyond the end
new_pos = self.api.queues.move_task_backward(
queue=queue, task=tasks[0], count=100
).position
self.assertEqual(new_pos, 3)
res = self.api.queues.get_by_id(queue=queue)
changed_tasks = tasks[1:] + [tasks[0]]
self.assertQueueTasks(res.queue, changed_tasks)
# move forwards in the middle
new_pos = self.api.queues.move_task_forward(
queue=queue, task=tasks[0], count=2
).position
self.assertEqual(new_pos, 1)
res = self.api.queues.get_by_id(queue=queue)
changed_tasks = [tasks[1], tasks[0]] + tasks[2:]
self.assertQueueTasks(res.queue, changed_tasks)
# move forwards beyond the beginning
new_pos = self.api.queues.move_task_forward(
queue=queue, task=tasks[0], count=100
).position
self.assertEqual(new_pos, 0)
res = self.api.queues.get_by_id(queue=queue)
self.assertQueueTasks(res.queue, tasks)
# move to back
new_pos = self.api.queues.move_task_to_back(
queue=queue, task=tasks[0]
).position
self.assertEqual(new_pos, 3)
res = self.api.queues.get_by_id(queue=queue)
changed_tasks = tasks[1:] + [tasks[0]]
self.assertQueueTasks(res.queue, changed_tasks)
# move to front
new_pos = self.api.queues.move_task_to_front(
queue=queue, task=tasks[0]
).position
self.assertEqual(new_pos, 0)
res = self.api.queues.get_by_id(queue=queue)
self.assertQueueTasks(res.queue, tasks)
def test_get_all_ex(self):
queue_name = "TestTempQueue1"
@@ -234,7 +289,6 @@ class TestQueues(TestService):
task_input = dict(
name=task_name,
type="testing" if is_testing else "training",
input=dict(mapping={}, view={}),
script={"repository": "test", "entry_point": "test"},
system_tags=["development"] if is_development else None,
)

View File

@@ -96,6 +96,44 @@ class TestReports(TestService):
self.assertEqual(project.get("parent"), None)
self.assertEqual(project.name, ".reports")
def test_root_reports(self):
root_report = self._temp_report(name="Rep1")
project_name = "Test reports"
project = self._temp_project(name=project_name)
project_report = self._temp_report(name="Rep2", project=project)
projects = self.api.projects.get_all_ex(
name=r"^\.reports$",
children_type="report",
include_stats=True,
check_own_contents=True,
search_hidden=True,
).projects
self.assertEqual(len(projects), 1)
p = projects[0]
self.assertEqual(p.name, ".reports")
self.assertEqual(p.own_tasks, 1)
projects = self.api.projects.get_all_ex(
name=rf"^{project_name}/\.reports$",
children_type="report",
include_stats=True,
check_own_contents=True,
search_hidden=True,
).projects
self.assertEqual(len(projects), 1)
p = projects[0]
self.assertEqual(p.name, f"{project_name}/.reports")
self.assertEqual(p.own_tasks, 1)
reports = self.api.reports.get_all_ex().tasks
self.assertTrue({root_report, project_report}.issubset({r.id for r in reports}))
reports = self.api.reports.get_all_ex(project=project).tasks
self.assertEqual([project_report], [r.id for r in reports])
reports = self.api.reports.get_all_ex(project=[None]).tasks
self.assertIn(root_report, {r.id for r in reports})
self.assertNotIn(project_report, {r.id for r in reports})
def test_reports_search(self):
report_task = self._temp_report(name="Rep1")
non_report_task = self._temp_task(name="hello")
@@ -113,66 +151,115 @@ class TestReports(TestService):
def test_reports_task_data(self):
report_task = self._temp_report(name="Rep1")
non_report_task = self._temp_task(name="hello")
debug_image_events = [
self._create_task_event(
task=non_report_task,
type_="training_debug_image",
iteration=1,
metric=f"Metric_{m}",
variant=f"Variant_{v}",
url=f"{m}_{v}",
for model_events in (False, True):
if model_events:
non_report_task = self._temp_model(name="hello")
event_args = {"model_event": True}
else:
non_report_task = self._temp_task(name="hello")
event_args = {}
debug_image_events = [
self._create_task_event(
task=non_report_task,
type_="training_debug_image",
iteration=1,
metric=f"Metric_{m}",
variant=f"Variant_{v}",
url=f"{m}_{v}",
**event_args,
)
for m in range(2)
for v in range(2)
]
plot_events = [
self._create_task_event(
task=non_report_task,
type_="plot",
iteration=1,
metric=f"Metric_{m}",
variant=f"Variant_{v}",
plot_str=f"Hello plot",
**event_args,
)
for m in range(2)
for v in range(2)
]
scalar_events = [
self._create_task_event(
task=non_report_task,
type_="training_stats_scalar",
iteration=iter_,
metric=f"Metric_{m}",
variant=f"Variant_{v}",
value=m * v,
**event_args,
)
for m in range(2)
for v in range(2)
for iter_ in (1, -(2 ** 31))
]
self.send_batch([*debug_image_events, *plot_events, *scalar_events])
res = self.api.reports.get_task_data(
id=[non_report_task], only_fields=["name"], model_events=model_events
)
for m in range(2)
for v in range(2)
]
plot_events = [
self._create_task_event(
task=non_report_task,
type_="plot",
iteration=1,
metric=f"Metric_{m}",
variant=f"Variant_{v}",
plot_str=f"Hello plot",
self.assertEqual(len(res.tasks), 1)
self.assertEqual(res.tasks[0].id, non_report_task)
self.assertFalse(
any(
field in res
for field in (
"plots",
"debug_images",
"scalar_metrics_iter_histogram",
"single_value_metrics",
)
)
)
for m in range(2)
for v in range(2)
]
self.send_batch([*debug_image_events, *plot_events])
res = self.api.reports.get_task_data(
id=[non_report_task], only_fields=["name"],
)
self.assertEqual(len(res.tasks), 1)
self.assertEqual(res.tasks[0].id, non_report_task)
self.assertFalse(any(field in res for field in ("plots", "debug_images")))
res = self.api.reports.get_task_data(
id=[non_report_task],
only_fields=["name"],
debug_images={"metrics": []},
plots={"metrics": [{"metric": "Metric_1"}]},
scalar_metrics_iter_histogram={},
single_value_metrics={},
model_events=model_events,
)
self.assertEqual(len(res.debug_images), 1)
task_events = res.debug_images[0]
self.assertEqual(task_events.task, non_report_task)
self.assertEqual(len(task_events.iterations), 1)
self.assertEqual(len(task_events.iterations[0].events), 4)
res = self.api.reports.get_task_data(
id=[non_report_task],
only_fields=["name"],
debug_images={"metrics": []},
plots={"metrics": [{"metric": "Metric_1"}]},
)
self.assertEqual(len(res.debug_images), 1)
task_events = res.debug_images[0]
self.assertEqual(task_events.task, non_report_task)
self.assertEqual(len(task_events.iterations), 1)
self.assertEqual(len(task_events.iterations[0].events), 4)
self.assertEqual(len(res.single_value_metrics), 1)
task_metrics = res.single_value_metrics[0]
self.assertEqual(task_metrics.task, non_report_task)
self.assertEqual(
{(v["metric"], v["variant"]) for v in task_metrics["values"]},
{(f"Metric_{x}", f"Variant_{y}") for x in range(2) for y in range(2)},
)
self.assertEqual(len(task_events.iterations[0].events), 4)
self.assertEqual(len(res.plots), 1)
for m, v in (("Metric_1", "Variant_0"), ("Metric_1", "Variant_1")):
tasks = nested_get(res.plots, (m, v))
self.assertEqual(len(tasks), 1)
task_plots = tasks[non_report_task]
self.assertEqual(len(task_plots), 1)
iter_plots = task_plots["1"]
self.assertEqual(iter_plots.name, "hello")
self.assertEqual(len(iter_plots.plots), 1)
ev = iter_plots.plots[0]
self.assertEqual(ev["metric"], m)
self.assertEqual(ev["variant"], v)
self.assertEqual(ev["task"], non_report_task)
self.assertEqual(ev["iter"], 1)
for m in ("Metric_0", "Metric_1"):
for v in ("Variant_0", "Variant_1"):
tasks = nested_get(res.scalar_metrics_iter_histogram, (m, v))
self.assertEqual(list(tasks.keys()), [non_report_task])
self.assertEqual(len(res.plots), 1)
for m, v in (("Metric_1", "Variant_0"), ("Metric_1", "Variant_1")):
tasks = nested_get(res.plots, (m, v))
self.assertEqual(len(tasks), 1)
task_plots = tasks[non_report_task]
self.assertEqual(len(task_plots), 1)
iter_plots = task_plots["1"]
self.assertEqual(iter_plots.name, "hello")
self.assertEqual(len(iter_plots.plots), 1)
ev = iter_plots.plots[0]
self.assertEqual(ev["metric"], m)
self.assertEqual(ev["variant"], v)
self.assertEqual(ev["task"], non_report_task)
self.assertEqual(ev["iter"], 1)
@staticmethod
def _create_task_event(type_, task, iteration, **kwargs):
@@ -185,12 +272,23 @@ class TestReports(TestService):
**kwargs,
}
delete_params = {"force": True}
def _temp_project(self, name, **kwargs):
return self.create_temp(
"projects",
delete_params=self.delete_params,
name=name,
description="",
**kwargs,
)
def _temp_report(self, name, **kwargs):
return self.create_temp(
"reports",
name=name,
object_name="task",
delete_params={"force": True},
delete_params=self.delete_params,
**kwargs,
)
@@ -199,10 +297,16 @@ class TestReports(TestService):
"tasks",
name=name,
type="training",
delete_params={"force": True},
delete_params=self.delete_params,
**kwargs,
)
def _temp_model(self, name="test model events", **kwargs):
self.update_missing(
kwargs, name=name, uri="file:///a/b", labels={}, ready=False
)
return self.create_temp("models", delete_params=self.delete_params, **kwargs)
def send_batch(self, events):
_, data = self.api.send_batch("events.add_batch", events)
return data

View File

@@ -33,6 +33,63 @@ class TestSubProjects(TestService):
).projects[0]
self.assertEqual(data.dataset_stats, {"file_count": 2, "total_size": 1000})
def test_query_children_system_tags(self):
test_root_name = "TestQueryChildrenTags"
test_root = self._temp_project(name=test_root_name)
project1 = self._temp_project(name=f"{test_root_name}/project1")
project2 = self._temp_project(name=f"{test_root_name}/project2")
self._temp_report(name="test report", project=project1)
self._temp_report(name="test report", project=project2, tags=["test1", "test2"])
self._temp_report(name="test report", project=project2, tags=["test1"])
projects = self.api.projects.get_all_ex(
parent=[test_root],
children_type="report",
shallow_search=True,
include_stats=True,
check_own_contents=True,
).projects
self.assertEqual(len(projects), 2)
projects = self.api.projects.get_all_ex(
parent=[test_root],
children_type="report",
children_tags=["test1", "test2"],
shallow_search=True,
include_stats=True,
check_own_contents=True,
).projects
self.assertEqual(len(projects), 1)
p = projects[0]
self.assertEqual(p.basename, "project2")
self.assertEqual(p.stats.active.total_tasks, 2)
projects = self.api.projects.get_all_ex(
parent=[test_root],
children_type="report",
children_tags=["__$all", "test1", "test2"],
shallow_search=True,
include_stats=True,
check_own_contents=True,
).projects
self.assertEqual(len(projects), 1)
p = projects[0]
self.assertEqual(p.basename, "project2")
self.assertEqual(p.stats.active.total_tasks, 1)
projects = self.api.projects.get_all_ex(
parent=[test_root],
children_type="report",
children_tags=["-test1", "-test2"],
shallow_search=True,
include_stats=True,
check_own_contents=True,
).projects
self.assertEqual(len(projects), 1)
p = projects[0]
self.assertEqual(p.basename, "project1")
self.assertEqual(p.stats.active.total_tasks, 1)
def test_query_children(self):
test_root_name = "TestQueryChildren"
test_root = self._temp_project(name=test_root_name)
@@ -291,7 +348,7 @@ class TestSubProjects(TestService):
id=[project1, project2], check_own_contents=True
).projects
res1 = next(p for p in res if p.id == project1)
self.assertEqual(res1.own_tasks, 2)
self.assertEqual(res1.own_tasks, 1)
self.assertEqual(res1.own_models, 1)
res2 = next(p for p in res if p.id == project2)
@@ -379,7 +436,6 @@ class TestSubProjects(TestService):
delete_params=self.delete_params,
type=type or "testing",
name=name or db_id(),
input=dict(view=dict()),
client=client,
**kwargs,
)

View File

@@ -222,7 +222,7 @@ class TestTags(TestService):
return self.create_temp("models", **kwargs)
def _temp_task(self, **kwargs):
self.update_missing(kwargs, name="Test tags", type="testing", input=dict(view=dict()))
self.update_missing(kwargs, name="Test tags", type="testing")
return self.create_temp("tasks", **kwargs)
def _send(self, service, action, **kwargs):

View File

@@ -14,7 +14,6 @@ class TestTasksArtifacts(TestService):
kwargs,
type="testing",
name="test artifacts",
input=dict(view=dict()),
delete_params=dict(force=True),
)
return self.create_temp("tasks", **kwargs)

View File

@@ -11,7 +11,7 @@ class TestTaskDebugImages(TestService):
def _temp_task(self, name="test task events"):
task_input = dict(
name=name, type="training", input=dict(mapping={}, view=dict(entries=[])),
name=name, type="training"
)
return self.create_temp("tasks", **task_input)

View File

@@ -16,9 +16,7 @@ class TestTaskEvents(TestService):
delete_params = dict(can_fail=True, force=True)
def _temp_task(self, name="test task events"):
task_input = dict(
name=name, type="training", input=dict(mapping={}, view=dict(entries=[])),
)
task_input = dict(name=name, type="training",)
return self.create_temp(
"tasks", delete_paramse=self.delete_params, **task_input
)
@@ -197,6 +195,7 @@ class TestTaskEvents(TestService):
with self.api.raises(errors.bad_request.EventsNotAdded):
self.send(log_event)
# mixed batch
events = [
{
**self._create_task_event("training_stats_scalar", model, iteration),
@@ -209,6 +208,19 @@ class TestTaskEvents(TestService):
for metric_idx in range(5)
for variant_idx in range(5)
]
task = self._temp_task()
# noinspection PyTypeChecker
events.append(
self._create_task_event(
"log",
task=task,
iteration=0,
msg=f"This is a log message",
metric="Metric0",
variant="Variant0",
allow_locked=True,
)
)
self.send_batch(events)
data = self.api.events.scalar_metrics_iter_histogram(
task=model, model_events=True
@@ -220,6 +232,19 @@ class TestTaskEvents(TestService):
self.assertEqual(variant_data.x, [0, 1])
self.assertEqual(variant_data.y, [0.0, 1.0])
model_data = self.api.models.get_all_ex(
id=[model], only_fields=["last_metrics", "last_iteration"]
).models[0]
metric_data = first(first(model_data.last_metrics.values()).values())
self.assertEqual(1, model_data.last_iteration)
self.assertEqual(1, metric_data.value)
self.assertEqual(1, metric_data.max_value)
self.assertEqual(1, metric_data.max_value_iteration)
self.assertEqual(0, metric_data.min_value)
self.assertEqual(0, metric_data.min_value_iteration)
self._assert_log_events(task=task, expected_total=1)
def test_error_events(self):
task = self._temp_task()
events = [

View File

@@ -24,7 +24,6 @@ class TestTasksHyperparams(TestService):
kwargs,
type="testing",
name="test hyperparams",
input=dict(view=dict()),
delete_params=dict(force=True),
)
return self.create_temp("tasks", **kwargs), kwargs["project"]
@@ -299,6 +298,23 @@ class TestTasksHyperparams(TestService):
).tasks[0]
self.assertEqual(new_params_dict2, res.hyperparams)
def test_numeric_ordering(self):
params = [
dict(section="section1", name="param1", type="type1", value="1"),
dict(section="section1", name="param1", type="type1", value="2"),
dict(section="section1", name="param1", type="type1", value="11"),
]
tasks = [
self.new_task(hyperparams=self._param_dict_from_list([p]), project=None)[0]
for p in params
]
res = self.api.tasks.get_all_ex(id=tasks, order_by=["hyperparams.section1.param1"]).tasks
self.assertEqual([t.id for t in res], tasks)
res = self.api.tasks.get_all_ex(id=tasks, order_by=["-hyperparams.section1.param1"]).tasks
self.assertEqual([t.id for t in res], list(reversed(tasks)))
def test_old_api(self):
legacy_params = {"legacy.1": "val1", "TF_DEFINE/param2": "val2"}
legacy_config = {"design": "hello"}

View File

@@ -104,7 +104,7 @@ class TestTaskModels(TestService):
def new_task(self, **kwargs):
self.update_missing(
kwargs, type="testing", name="test task models", input=dict(view=dict())
kwargs, type="testing", name="test task models"
)
return self.create_temp("tasks", **kwargs)

View File

@@ -56,6 +56,33 @@ class TestTaskParent(TestService):
parents,
)
def test_query_by_name(self):
project_name = "Test parents project"
project = self.create_temp("projects", name=project_name, description="test")
parent_names = [f"Parent{i}" for i in range(3)]
parents = [self.new_task(project=project, name=name) for name in parent_names]
for idx in range(2):
self.new_task(project=project, name=f"Child{idx}", parent=parents[idx])
parents = self.api.projects.get_task_parents(
projects=[project], task_name="Parent"
).parents
self.assertEqual(len(parents), 2)
for parent_name in parent_names[:2]:
res = self.api.projects.get_task_parents(
projects=[project], task_name=parent_name
).parents
self.assertEqual(len(res), 1)
self.assertEqual(res[0].name, parent_name)
parents = self.api.projects.get_task_parents(
projects=[project], task_name=parent_names[2]
).parents
self.assertEqual(len(parents), 0)
def test_query_by_state(self):
project_name = "Test parents project"
project = self.create_temp("projects", name=project_name, description="test")
@@ -74,15 +101,17 @@ class TestTaskParent(TestService):
self.assertEqual([parent1, parent2], [p.id for p in parents])
# Active tasks
parents = self.api.projects.get_task_parents(projects=[project], tasks_state="active").parents
parents = self.api.projects.get_task_parents(
projects=[project], tasks_state="active"
).parents
self.assertEqual([parent1], [p.id for p in parents])
# Archived tasks
parents = self.api.projects.get_task_parents(projects=[project], tasks_state="archived").parents
parents = self.api.projects.get_task_parents(
projects=[project], tasks_state="archived"
).parents
self.assertEqual([parent2], [p.id for p in parents])
def new_task(self, **kwargs):
self.update_missing(
kwargs, type="testing", name="test task parents", input=dict(view=dict())
)
self.update_missing(kwargs, type="testing", name="test task parents")
return self.create_temp("tasks", **kwargs)

View File

@@ -8,7 +8,7 @@ from apiserver.tests.automated import TestService
class TestTaskPlots(TestService):
def _temp_task(self, name="test task events"):
task_input = dict(
name=name, type="training", input=dict(mapping={}, view=dict(entries=[])),
name=name, type="training"
)
return self.create_temp("tasks", **task_input)

View File

@@ -242,7 +242,7 @@ class TestTasksResetDelete(TestService):
def new_task(self, **kwargs):
self.update_missing(
kwargs, name=self.name, type="testing", input=dict(view=dict())
kwargs, name=self.name, type="testing"
)
return self.create_temp("tasks", delete_params=self.delete_params, **kwargs,)

View File

@@ -10,7 +10,7 @@ class TestTasksDiff(TestService):
def new_task(self, **kwargs):
return self.create_temp(
"tasks", name="test", type="testing", input=dict(view=dict()), **kwargs
"tasks", name="test", type="testing", **kwargs
)
def _compare_script(self, task_id, script):

View File

@@ -13,7 +13,7 @@ class TestTasksEdit(TestService):
def new_task(self, **kwargs):
self.update_missing(
kwargs, type="testing", name="test", input=dict(view=dict())
kwargs, type="testing", name="test"
)
return self.create_temp("tasks", **kwargs)

View File

@@ -96,7 +96,6 @@ class TestTasksFiltering(TestService):
kwargs,
type="testing",
name="test tasks filtering",
input=dict(view=dict()),
delete_params=dict(force=True),
)
return self.create_temp("tasks", **kwargs)

View File

@@ -46,7 +46,6 @@ class TestTasksRunning(TestService):
task_input = dict(
name="task-1",
type="testing",
input=dict(mapping={}, view=dict()),
)
if is_development:
task_input["system_tags"] = ["development"]

View File

@@ -35,7 +35,7 @@ class TestUsersService(TestService):
task = (
self.api.impersonate(user_3)
.tasks.create(
name="test", type="testing", input=dict(view={}), project=project
name="test", type="testing", project=project
)
.id
)

View File

@@ -103,7 +103,7 @@ class TestWorkersService(TestService):
def _create_running_task(self, task_name):
task_input = dict(
name=task_name, type="testing", input=dict(mapping={}, view={})
name=task_name, type="testing"
)
task_id = self.create_temp("tasks", **task_input)

View File

@@ -1 +1 @@
__version__ = "1.10.0"
__version__ = "1.11.0"

View File

@@ -1,4 +1,4 @@
FROM centos/nodejs-12-centos7 AS webapp
FROM node:18-bullseye as webapp_builder
ARG CLEARML_WEB_GIT_URL=https://github.com/allegroai/clearml-web.git
@@ -10,20 +10,17 @@ RUN mv clearml-web /opt/open-webapp
COPY --chmod=744 docker/build/internal_files/build_webapp.sh /tmp/internal_files/
RUN /bin/bash -c '/tmp/internal_files/build_webapp.sh'
FROM centos:7 AS staging_image
FROM python:3.9-slim-bullseye
COPY --chmod=744 docker/build/internal_files/entrypoint.sh /opt/clearml/
COPY fileserver /opt/clearml/fileserver/
COPY apiserver /opt/clearml/apiserver/
FROM centos:7
COPY --from=staging_image /opt/clearml/ /opt/clearml/
COPY --chmod=744 docker/build/internal_files/final_image_preparation.sh /tmp/internal_files/
COPY docker/build/internal_files/clearml.conf.template /tmp/internal_files/
COPY docker/build/internal_files/clearml_subpath.conf.template /tmp/internal_files/
RUN /bin/bash -c '/tmp/internal_files/final_image_preparation.sh'
COPY --from=webapp /opt/open-webapp/build /usr/share/nginx/html
COPY --from=webapp_builder /opt/open-webapp/build /usr/share/nginx/html
COPY --from=webapp_builder /opt/open-webapp/dist/report-widgets /usr/share/nginx/widgets
EXPOSE 8080

View File

@@ -1,98 +1,60 @@
# For more information on configuration, see:
# * Official English Documentation: http://nginx.org/en/docs/
# * Official Russian Documentation: http://nginx.org/ru/docs/
server {
listen 80 default_server;
${COMMENT_IPV6_LISTEN}listen [::]:80 default_server;
server_name _;
root /usr/share/nginx/html;
proxy_http_version 1.1;
client_max_body_size 0;
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log;
pid /run/nginx.pid;
# compression
gzip on;
gzip_comp_level 9;
gzip_http_version 1.0;
gzip_min_length 512;
gzip_proxied expired no-cache no-store private auth;
gzip_types text/plain
text/css
application/json
application/javascript
application/x-javascript
text/xml application/xml
application/xml+rss
text/javascript
application/x-font-ttf
font/woff2
image/svg+xml
image/x-icon;
# Load dynamic modules. See /usr/share/doc/nginx/README.dynamic.
include /usr/share/nginx/modules/*.conf;
# Load configuration files for the default server block.
include /etc/nginx/default.d/*.conf;
events {
worker_connections 1024;
}
location / {
try_files $uri$args $uri$args/ $uri index.html /index.html;
}
http {
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
location /version.json {
add_header Cache-Control 'no-cache';
}
access_log /var/log/nginx/access.log main;
location /api {
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_pass ${NGINX_APISERVER_ADDR};
rewrite /api/(.*) /$1 break;
}
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
types_hash_max_size 2048;
location /files {
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_pass ${NGINX_FILESERVER_ADDR};
rewrite /files/(.*) /$1 break;
}
include /etc/nginx/mime.types;
default_type application/octet-stream;
error_page 404 /404.html;
location = /40x.html {
}
# Load modular configuration files from the /etc/nginx/conf.d directory.
# See http://nginx.org/en/docs/ngx_core_module.html#include
# for more information.
include /etc/nginx/conf.d/*.conf;
server {
listen 80 default_server;
${COMMENT_IPV6_LISTEN}listen [::]:80 default_server;
server_name _;
root /usr/share/nginx/html;
proxy_http_version 1.1;
client_max_body_size 0;
# compression
gzip on;
gzip_comp_level 9;
gzip_http_version 1.0;
gzip_min_length 512;
gzip_proxied expired no-cache no-store private auth;
gzip_types text/plain
text/css
application/json
application/javascript
application/x-javascript
text/xml application/xml
application/xml+rss
text/javascript
application/x-font-ttf
font/woff2
image/svg+xml
image/x-icon;
# Load configuration files for the default server block.
include /etc/nginx/default.d/*.conf;
location / {
try_files $uri$args $uri$args/ $uri index.html /index.html;
}
location /version.json {
add_header Cache-Control 'no-cache';
}
location /api {
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_pass ${NGINX_APISERVER_ADDR};
rewrite /api/(.*) /$1 break;
}
location /files {
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_pass ${NGINX_FILESERVER_ADDR};
rewrite /files/(.*) /$1 break;
}
error_page 404 /404.html;
location = /40x.html {
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
}
}

View File

@@ -49,7 +49,7 @@ EOF
export NGINX_APISERVER_ADDR=${NGINX_APISERVER_ADDRESS:-http://apiserver:8008}
export NGINX_FILESERVER_ADDR=${NGINX_FILESERVER_ADDRESS:-http://fileserver:8081}
COMMENT_IPV6_LISTEN=$([ "$DISABLE_NGINX_IPV6" = "true" ] && echo "#" || echo "") \
envsubst '${COMMENT_IPV6_LISTEN} ${NGINX_APISERVER_ADDR} ${NGINX_FILESERVER_ADDR}' < /etc/nginx/clearml.conf.template > /etc/nginx/nginx.conf
envsubst '${COMMENT_IPV6_LISTEN} ${NGINX_APISERVER_ADDR} ${NGINX_FILESERVER_ADDR}' < /etc/nginx/clearml.conf.template > /etc/nginx/sites-enabled/default
if [[ -n "${CLEARML_SERVER_SUB_PATH}" ]]; then
envsubst '${CLEARML_SERVER_SUB_PATH}' < /etc/nginx/clearml_subpath.conf.template > /etc/nginx/default.d/clearml_subpath.conf

View File

@@ -3,17 +3,20 @@ set -o errexit
set -o nounset
set -o pipefail
yum update -y
yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
yum install -y python36 python36-pip nginx gcc gcc-c++ python3-devel gettext
yum -y upgrade
apt-get update -y
apt-get install -y python3-setuptools python3-dev build-essential nginx gettext
apt-get install -y vim curl
python3 -m ensurepip
python3 -m pip install --upgrade pip
python3 -m pip install -r /opt/clearml/fileserver/requirements.txt
python3 -m pip install -r /opt/clearml/apiserver/requirements.txt
mkdir -p /opt/clearml/log
mkdir -p /opt/clearml/config
ln -s /dev/stdout /var/log/nginx/access.log
ln -s /dev/stderr /var/log/nginx/error.log
mv /etc/nginx/nginx.conf /etc/nginx/nginx.conf.orig
ln -svf /dev/stdout /var/log/nginx/access.log
ln -svf /dev/stderr /var/log/nginx/error.log
mv /tmp/internal_files/clearml.conf.template /etc/nginx/clearml.conf.template
mv /tmp/internal_files/clearml_subpath.conf.template /etc/nginx/clearml_subpath.conf.template
yum clean all
rm -d -r "$(pip cache dir)"
apt-get clean

View File

@@ -29,6 +29,7 @@ services:
CLEARML__apiserver__pre_populate__zip_files: "/opt/clearml/db-pre-populate"
CLEARML__apiserver__pre_populate__artifacts_path: "/mnt/fileserver"
CLEARML__services__async_urls_delete__enabled: "true"
CLEARML__services__async_urls_delete__fileserver__url_prefixes: "[${CLEARML_FILES_HOST:-}]"
ports:
- "8008:8008"
networks:
@@ -145,7 +146,7 @@ services:
CLEARML_REDIS_SERVICE_HOST: redis
CLEARML_REDIS_SERVICE_PORT: 6379
PYTHONPATH: /opt/clearml/apiserver
CLEARML__services__async_urls_delete__fileserver__url_prefixes: "[]"
CLEARML__services__async_urls_delete__fileserver__url_prefixes: "[${CLEARML_FILES_HOST:-}]"
entrypoint:
- python3
- -m

View File

@@ -29,6 +29,7 @@ services:
CLEARML__apiserver__pre_populate__zip_files: "/opt/clearml/db-pre-populate"
CLEARML__apiserver__pre_populate__artifacts_path: "/mnt/fileserver"
CLEARML__services__async_urls_delete__enabled: "true"
CLEARML__services__async_urls_delete__fileserver__url_prefixes: "[${CLEARML_FILES_HOST:-}]"
ports:
- "8008:8008"
networks:

View File

@@ -1,5 +1,9 @@
boltons>=19.1.0
Flask
Flask-Cors>=3.0.5
Flask-Compress>=1.4.0
pyhocon>=0.3.35
flask-compress>=1.4.0
flask-cors>=3.0.5
flask>=0.12.2
gunicorn>=20.1.0
markupsafe==2.0.1
pyhocon>=0.3.35
setuptools>=65.5.1