Unify batch operations

allegroai 2021-05-03 18:03:54 +03:00
parent 251ee57ffd
commit e2f265b4bc
11 changed files with 117 additions and 96 deletions

View File

@@ -3,7 +3,7 @@ from typing import Sequence
from jsonmodels.models import Base
from jsonmodels.validators import Length
from apiserver.apimodels import ListField
from apiserver.apimodels import ListField, IntField

class BatchRequest(Base):
@@ -11,4 +11,5 @@ class BatchRequest(Base):
class BatchResponse(Base):
    succeeded: int = IntField()
    failures: Sequence[dict] = ListField([dict])
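
Taken together, these two hunks give every batch endpoint a shared response shape: a `succeeded` counter plus a `failures` list, replacing the per-operation counters (`archived`, `stopped`, `queued`, ...) that the rest of this commit deletes. A minimal sketch of the unified models, assuming the `ListField`/`IntField` wrappers mirror their jsonmodels counterparts and that `BatchRequest` keeps a non-empty `ids` list (the `ids` field is not shown in the hunk and is an assumption):

from typing import Sequence
from jsonmodels import fields, models
from jsonmodels.validators import Length

class BatchRequest(models.Base):
    # Assumed shape: every batch endpoint takes a non-empty list of entity IDs
    ids: Sequence[str] = fields.ListField(str, validators=[Length(minimum_value=1)])

class BatchResponse(models.Base):
    succeeded: int = fields.IntField()  # unified success counter
    failures: Sequence[dict] = fields.ListField([dict])  # one entry per failed ID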

View File

@@ -55,12 +55,7 @@ class ModelsArchiveManyRequest(BatchRequest):
    pass

class ModelsArchiveManyResponse(BatchResponse):
    archived = fields.IntField(required=True)

class ModelsDeleteManyResponse(BatchResponse):
    deleted = fields.IntField()
    urls = fields.ListField([str])
@@ -84,7 +79,6 @@ class ModelsPublishManyRequest(BatchRequest):
class ModelsPublishManyResponse(BatchResponse):
    published = fields.IntField(required=True)
    published_tasks = fields.ListField([ModelTaskPublishResponse])

View File

@@ -7,7 +7,7 @@ from jsonmodels.validators import Enum, Length
from apiserver.apimodels import DictField, ListField
from apiserver.apimodels.base import UpdateResponse
from apiserver.apimodels.batch import BatchRequest, BatchResponse
from apiserver.apimodels.batch import BatchRequest
from apiserver.database.model.task.task import (
    TaskType,
    ArtifactModes,
@@ -241,26 +241,14 @@ class StopManyRequest(TaskBatchRequest):
    force = BoolField(default=False)

class StopManyResponse(BatchResponse):
    stopped = IntField(required=True)

class ArchiveManyRequest(TaskBatchRequest):
    pass

class ArchiveManyResponse(BatchResponse):
    archived = IntField(required=True)

class EnqueueManyRequest(TaskBatchRequest):
    queue = StringField()

class EnqueueManyResponse(BatchResponse):
    queued = IntField()

class DeleteManyRequest(TaskBatchRequest):
    move_to_trash = BoolField(default=True)
    return_file_urls = BoolField(default=False)

View File

@@ -106,7 +106,7 @@ def validate_status_change(current_status, new_status):
state_machine = {
    TaskStatus.created: {TaskStatus.queued, TaskStatus.in_progress},
    TaskStatus.queued: {TaskStatus.created, TaskStatus.in_progress},
    TaskStatus.queued: {TaskStatus.created, TaskStatus.in_progress, TaskStatus.stopped},
    TaskStatus.in_progress: {
        TaskStatus.stopped,
        TaskStatus.failed,
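
The added `TaskStatus.stopped` target on the `queued` state is what lets the new batch stop operation act on tasks that were enqueued but never picked up by a worker. A self-contained sketch of the check this table drives; only the three rows shown in the hunk come from the source, the rest (enum values, missing rows, boolean return) are assumptions:

from enum import Enum

class TaskStatus(str, Enum):
    created = "created"
    queued = "queued"
    in_progress = "in_progress"
    stopped = "stopped"
    failed = "failed"

# Partial transition table, as in the hunk above; other statuses omitted
state_machine = {
    TaskStatus.created: {TaskStatus.queued, TaskStatus.in_progress},
    TaskStatus.queued: {TaskStatus.created, TaskStatus.in_progress, TaskStatus.stopped},
    TaskStatus.in_progress: {TaskStatus.stopped, TaskStatus.failed},
}

def validate_status_change(current_status: TaskStatus, new_status: TaskStatus) -> bool:
    # A transition is legal only if the table lists it for the current status
    return new_status in state_machine.get(current_status, set())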

View File

@@ -1,7 +1,10 @@
import os
import re
from datetime import datetime

from pymongo.collection import Collection
from pymongo.database import Database
from pymongo.errors import DuplicateKeyError

from apiserver.database.model.task.task import TaskModelTypes, TaskModelNames
from apiserver.services.utils import escape_dict
@@ -94,8 +97,39 @@ def _migrate_model_labels(db: Database):
        tasks.update_one({"_id": doc["_id"]}, {"$set": set_commands})

def _migrate_project_names(db: Database):
    projects: Collection = db["project"]
    regx = re.compile("/")
    for doc in projects.find(filter={"name": regx, "path": {"$in": [None, []]}}):
        name = doc.get("name")
        if not name:
            continue
        max_tries = int(os.getenv("CLEARML_MIGRATION_PROJECT_RENAME_MAX_TRIES", "10"))
        for iteration in range(max_tries):
            # Widen the underscore run on each retry until the name is unique
            new_name = name.replace("/", "_" * (iteration + 1))
            try:
                projects.update_one({"_id": doc["_id"]}, {"$set": {"name": new_name}})
                break
            except DuplicateKeyError:
                pass
        else:
            # All candidates collided; leave the project name unchanged
            print(f"Could not rename project {doc.get('_id')}: no free name found for {name}")

def migrate_backend(db: Database):
    _migrate_task_models(db)
    _migrate_docker_cmd(db)
    _migrate_model_labels(db)
    _migrate_project_names(db)
    _drop_all_indices_from_collections(db, ["task*"])
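
To make the retry scheme above concrete: each attempt replaces "/" with one more underscore than the last, so a collision on the first candidate simply falls through to the next. A pure-string illustration of the candidates the loop tries, independent of MongoDB:

def rename_candidates(name: str, max_tries: int = 10) -> list:
    # Candidate names tried in order until an update succeeds
    return [name.replace("/", "_" * (i + 1)) for i in range(max_tries)]

assert rename_candidates("team/project", max_tries=3) == [
    "team_project", "team__project", "team___project"
]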

View File

@@ -37,7 +37,6 @@ batch_operation {
    required: [ids]
    properties {
        ids {
            description: Entities to move
            type: array
            items {type: string}
        }
@@ -46,6 +45,9 @@ batch_operation {
    response {
        type: object
        properties {
            succeeded {
                type: integer
            }
            failures {
                type: array
                items {
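
With `succeeded` folded into the shared `batch_operation` definition, every batch call returns the same envelope regardless of operation. A hedged example of the resulting wire shape (the exact contents of a `failures` item are an assumption based on the schema fragment above):

# Example tasks.stop_many response body under the unified schema
response = {
    "succeeded": 2,  # entities processed successfully
    "failures": [    # one entry per entity that could not be processed
        {"id": "missing-task-id", "error": {"codes": [400, 100], "msg": "Invalid task id"}},
    ],
}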

View File

@@ -658,21 +658,21 @@ publish_many {
    "2.13": ${_definitions.batch_operation} {
        description: Publish models
        request {
            force_publish_task {
                description: "Publish the associated tasks (if they exist) even if they are not in the 'stopped' state. Optional, the default value is False."
                type: boolean
            }
            publish_tasks {
                description: "Indicates that the associated tasks (if they exist) should be published. Optional, the default value is True."
                type: boolean
            properties {
                ids.description: "IDs of models to publish"
                force_publish_task {
                    description: "Publish the associated tasks (if they exist) even if they are not in the 'stopped' state. Optional, the default value is False."
                    type: boolean
                }
                publish_tasks {
                    description: "Indicates that the associated tasks (if they exist) should be published. Optional, the default value is True."
                    type: boolean
                }
            }
        }
        response {
            properties {
                published {
                    description: "Number of models published"
                    type: integer
                }
                succeeded.description: "Number of models published"
                published_tasks {
                    type: array
                    items: ${_definitions.published_task_item}
@@ -718,12 +718,14 @@ set_ready {
archive_many {
    "2.13": ${_definitions.batch_operation} {
        description: Archive models
        request {
            properties {
                ids.description: "IDs of models to archive"
            }
        }
        response {
            properties {
                archived {
                    description: "Number of models archived"
                    type: integer
                }
                succeeded.description: "Number of models archived"
            }
        }
    }
@@ -732,18 +734,18 @@ delete_many {
    "2.13": ${_definitions.batch_operation} {
        description: Delete models
        request {
            force {
                description: """Force. Required if there are tasks that use the model as an execution model, or if the model's creating task is published.
            properties {
                ids.description: "IDs of models to delete"
                force {
                    description: """Force. Required if there are tasks that use the model as an execution model, or if the model's creating task is published.
                    """
                type: boolean
                    type: boolean
                }
            }
        }
        response {
            properties {
                deleted {
                    description: "Number of models deleted"
                    type: integer
                }
                succeeded.description: "Number of models deleted"
                urls {
                    description: "The URLs of the deleted model files"
                    type: array

View File

@@ -1421,6 +1421,7 @@ reset_many {
        description: Reset tasks
        request {
            properties {
                ids.description: "IDs of the tasks to reset"
                force = ${_references.force_arg} {
                    description: "If not true, call fails if the task status is 'completed'"
                }
@@ -1441,10 +1442,7 @@ reset_many {
        }
        response {
            properties {
                reset {
                    description: "Number of tasks reset"
                    type: integer
                }
                succeeded.description: "Number of tasks reset"
                dequeued {
                    description: "Number of tasks dequeued"
                    type: object
@@ -1467,6 +1465,7 @@ delete_many {
        description: Delete tasks
        request {
            properties {
                ids.description: "IDs of the tasks to delete"
                move_to_trash {
                    description: "Move task to trash instead of deleting it. For internal use only; tasks in the trash are not visible from the API and cannot be restored!"
                    type: boolean
@@ -1487,10 +1486,7 @@ delete_many {
        }
        response {
            properties {
                deleted {
                    description: "Number of tasks deleted"
                    type: integer
                }
                succeeded.description: "Number of tasks deleted"
                updated_children {
                    description: "Number of child tasks whose parent property was updated"
                    type: integer
@@ -1615,11 +1611,13 @@ archive {
archive_many {
    "2.13": ${_definitions.change_many_request} {
        description: Archive tasks
        request {
            properties {
                ids.description: "IDs of the tasks to archive"
            }
        response {
            properties {
                archived {
                    description: "Number of tasks archived"
                    type: integer
                succeeded.description: "Number of tasks archived"
            }
        }
    }
@@ -1668,6 +1666,7 @@ stop_many {
        description: "Request to stop running tasks"
        request {
            properties {
                ids.description: "IDs of the tasks to stop"
                force = ${_references.force_arg} {
                    description: "If not true, call fails if the task status is not 'in_progress'"
                }
@@ -1675,10 +1674,7 @@ stop_many {
        }
        response {
            properties {
                stopped {
                    description: "Number of tasks stopped"
                    type: integer
                }
                succeeded.description: "Number of tasks stopped"
            }
        }
    }
@@ -1754,6 +1750,7 @@ publish_many {
        description: Publish tasks
        request {
            properties {
                ids.description: "IDs of the tasks to publish"
                force = ${_references.force_arg} {
                    description: "If not true, call fails if the task status is not 'stopped'"
                }
@@ -1765,10 +1762,7 @@ publish_many {
        }
        response {
            properties {
                published {
                    description: "Number of tasks published"
                    type: integer
                }
                succeeded.description: "Number of tasks published"
            }
        }
    }
@@ -1812,12 +1806,18 @@ Fails if the following parameters in the task were not filled:
enqueue_many {
    "2.13": ${_definitions.change_many_request} {
        description: Enqueue tasks
        request {
            properties {
                ids.description: "IDs of the tasks to enqueue"
                queue {
                    description: "Queue id. If not provided, tasks are added to the default queue."
                    type: string
                }
            }
        }
        response {
            properties {
                enqueued {
                    description: "Number of tasks enqueued"
                    type: integer
                }
                succeeded.description: "Number of tasks enqueued"
            }
        }
    }
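
Tying the schema to a concrete call: a sketch of hitting the enqueue endpoint over plain HTTP, assuming a local apiserver on its default port and the usual meta/data response envelope (the test suite at the end of this commit exercises the same call through its `self.api` fixture):

import requests

# Assumed local apiserver; the endpoint path follows the <service>.<action> convention
resp = requests.post(
    "http://localhost:8008/tasks.enqueue_many",
    json={"ids": ["task-1", "task-2"], "queue": "default-queue-id"},
)
data = resp.json()["data"]
print(data["succeeded"], data["failures"])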

View File

@@ -9,6 +9,7 @@ from apiserver import database
from apiserver.apierrors import errors
from apiserver.apierrors.errors.bad_request import InvalidModelId
from apiserver.apimodels.base import UpdateResponse, MakePublicRequest, MoveRequest
from apiserver.apimodels.batch import BatchResponse
from apiserver.apimodels.models import (
    CreateModelRequest,
    CreateModelResponse,
@@ -24,7 +25,6 @@ from apiserver.apimodels.models import (
    ModelsDeleteManyRequest,
    ModelsDeleteManyResponse,
    ModelsArchiveManyRequest,
    ModelsArchiveManyResponse,
)
from apiserver.bll.model import ModelBLL
from apiserver.bll.organization import OrgBLL, Tags
@@ -533,7 +533,7 @@ def publish_many(call: APICall, company_id, request: ModelsPublishManyRequest):
    )
    call.result.data_model = ModelsPublishManyResponse(
        published=res.published, published_tasks=res.published_tasks, failures=failures,
        succeeded=res.published, published_tasks=res.published_tasks, failures=failures,
    )
@@ -581,14 +581,14 @@ def delete(call: APICall, company_id, request: ModelsDeleteManyRequest):
    res.urls.discard(None)
    call.result.data_model = ModelsDeleteManyResponse(
        deleted=res.deleted, urls=list(res.urls), failures=failures,
        succeeded=res.deleted, urls=list(res.urls), failures=failures,
    )

@endpoint(
    "models.archive_many",
    request_data_model=ModelsArchiveManyRequest,
    response_data_model=ModelsArchiveManyResponse,
    response_data_model=BatchResponse,
)
def archive_many(call: APICall, company_id, request: ModelsArchiveManyRequest):
    archived, failures = run_batch_operation(
@@ -596,9 +596,7 @@ def archive_many(call: APICall, company_id, request: ModelsArchiveManyRequest):
        ids=request.ids,
        init_res=0,
    )
    call.result.data_model = ModelsArchiveManyResponse(
        archived=archived, failures=failures,
    )
    call.result.data_model = BatchResponse(succeeded=archived, failures=failures)
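
`run_batch_operation` is the shared driver behind every endpoint touched in this commit, but its body is not part of the diff. A plausible sketch consistent with the call sites (a per-ID `func`, an `init_res` accumulator such as `0` or `StopRes()`, and failures collected as ID/error pairs instead of aborting the batch); the accumulation operator and the failure format are assumptions:

from typing import Callable, List, Sequence, Tuple, TypeVar

T = TypeVar("T")

def run_batch_operation(
    func: Callable[[str], T], ids: Sequence[str], init_res: T
) -> Tuple[T, List[dict]]:
    res = init_res
    failures: List[dict] = []
    for _id in ids:
        try:
            # Assumed accumulation; init_res=0 at the call sites suggests addition
            res = res + func(_id)
        except Exception as ex:
            failures.append({"id": _id, "error": str(ex)})
    return res, failures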
@endpoint("models.make_public", min_version="2.9", request_data_model=MakePublicRequest)

View File

@@ -17,6 +17,7 @@ from apiserver.apimodels.base import (
    MakePublicRequest,
    MoveRequest,
)
from apiserver.apimodels.batch import BatchResponse
from apiserver.apimodels.tasks import (
    StartedResponse,
    ResetResponse,
@@ -47,13 +48,10 @@ from apiserver.apimodels.tasks import (
    ArchiveRequest,
    AddUpdateModelRequest,
    DeleteModelsRequest,
    StopManyResponse,
    StopManyRequest,
    EnqueueManyRequest,
    EnqueueManyResponse,
    ResetManyRequest,
    ArchiveManyRequest,
    ArchiveManyResponse,
    DeleteManyRequest,
    PublishManyRequest,
)
@@ -299,7 +297,7 @@ class StopRes:
@endpoint(
    "tasks.stop_many",
    request_data_model=StopManyRequest,
    response_data_model=StopManyResponse,
    response_data_model=BatchResponse,
)
def stop_many(call: APICall, company_id, request: StopManyRequest):
    res, failures = run_batch_operation(
@@ -313,7 +311,7 @@ def stop_many(call: APICall, company_id, request: StopManyRequest):
        ids=request.ids,
        init_res=StopRes(),
    )
    call.result.data_model = StopManyResponse(stopped=res.stopped, failures=failures)
    call.result.data_model = BatchResponse(succeeded=res.stopped, failures=failures)

@endpoint(
@@ -867,7 +865,7 @@ class EnqueueRes:
@endpoint(
    "tasks.enqueue_many",
    request_data_model=EnqueueManyRequest,
    response_data_model=EnqueueManyResponse,
    response_data_model=BatchResponse,
)
def enqueue_many(call: APICall, company_id, request: EnqueueManyRequest):
    res, failures = run_batch_operation(
@@ -881,7 +879,7 @@ def enqueue_many(call: APICall, company_id, request: EnqueueManyRequest):
        ids=request.ids,
        init_res=EnqueueRes(),
    )
    call.result.data_model = EnqueueManyResponse(queued=res.queued, failures=failures)
    call.result.data_model = BatchResponse(succeeded=res.queued, failures=failures)

@endpoint(
@@ -971,7 +969,7 @@ def reset_many(call: APICall, company_id, request: ResetManyRequest):
    else:
        cleanup_res = {}
    call.result.data = dict(
        reset=res.reset, dequeued=res.dequeued, **cleanup_res, failures=failures,
        succeeded=res.reset, dequeued=res.dequeued, **cleanup_res, failures=failures,
    )
@@ -1001,7 +999,7 @@ def archive(call: APICall, company_id, request: ArchiveRequest):
@endpoint(
    "tasks.archive_many",
    request_data_model=ArchiveManyRequest,
    response_data_model=ArchiveManyResponse,
    response_data_model=BatchResponse,
)
def archive_many(call: APICall, company_id, request: ArchiveManyRequest):
    archived, failures = run_batch_operation(
@@ -1014,7 +1012,7 @@ def archive_many(call: APICall, company_id, request: ArchiveManyRequest):
        ids=request.ids,
        init_res=0,
    )
    call.result.data_model = ArchiveManyResponse(archived=archived, failures=failures)
    call.result.data_model = BatchResponse(succeeded=archived, failures=failures)

@endpoint("tasks.delete", request_data_model=DeleteRequest)
@@ -1067,7 +1065,7 @@ def delete_many(call: APICall, company_id, request: DeleteManyRequest):
        _reset_cached_tags(company_id, projects=list(res.projects))
    cleanup_res = attr.asdict(res.cleanup_res) if res.cleanup_res else {}
    call.result.data = dict(deleted=res.deleted, **cleanup_res, failures=failures)
    call.result.data = dict(succeeded=res.deleted, **cleanup_res, failures=failures)

@endpoint(
@@ -1095,7 +1093,7 @@ class PublishRes:
        return PublishRes(published=self.published + 1)

@endpoint("tasks.publish_many", request_data_model=PublishManyRequest)
@endpoint(
    "tasks.publish_many",
    request_data_model=PublishManyRequest,
    response_data_model=BatchResponse,
)
def publish_many(call: APICall, company_id, request: PublishManyRequest):
    res, failures = run_batch_operation(
        func=partial(
@@ -1112,7 +1114,7 @@ def publish_many(call: APICall, company_id, request: PublishManyRequest):
        init_res=PublishRes(),
    )
    call.result.data = dict(published=res.published, failures=failures)
    call.result.data_model = BatchResponse(succeeded=res.published, failures=failures)
@endpoint(

View File

@@ -21,7 +21,7 @@ class TestBatchOperations(TestService):
        # enqueue
        res = self.api.tasks.enqueue_many(ids=ids)
        self.assertEqual(res.queued, 2)
        self.assertEqual(res.succeeded, 2)
        self._assert_failures(res, [missing_id])
        data = self.api.tasks.get_all_ex(id=ids).tasks
        self.assertEqual({t.status for t in data}, {"queued"})
@@ -30,14 +30,14 @@ class TestBatchOperations(TestService):
        for t in tasks:
            self.api.tasks.started(task=t)
        res = self.api.tasks.stop_many(ids=ids)
        self.assertEqual(res.stopped, 2)
        self.assertEqual(res.succeeded, 2)
        self._assert_failures(res, [missing_id])
        data = self.api.tasks.get_all_ex(id=ids).tasks
        self.assertEqual({t.status for t in data}, {"stopped"})
        # publish
        res = self.api.tasks.publish_many(ids=ids, publish_model=False)
        self.assertEqual(res.published, 2)
        self.assertEqual(res.succeeded, 2)
        self._assert_failures(res, [missing_id])
        data = self.api.tasks.get_all_ex(id=ids).tasks
        self.assertEqual({t.status for t in data}, {"published"})
@@ -46,7 +46,7 @@ class TestBatchOperations(TestService):
        res = self.api.tasks.reset_many(
            ids=ids, delete_output_models=True, return_file_urls=True, force=True
        )
        self.assertEqual(res.reset, 2)
        self.assertEqual(res.succeeded, 2)
        self.assertEqual(res.deleted_models, 2)
        self.assertEqual(set(res.urls.model_urls), {"uri_0", "uri_1"})
        self._assert_failures(res, [missing_id])
@@ -55,7 +55,7 @@ class TestBatchOperations(TestService):
        # archive
        res = self.api.tasks.archive_many(ids=ids)
        self.assertEqual(res.archived, 2)
        self.assertEqual(res.succeeded, 2)
        self._assert_failures(res, [missing_id])
        data = self.api.tasks.get_all_ex(id=ids).tasks
        self.assertTrue(all("archived" in t.system_tags for t in data))
@@ -64,7 +64,7 @@ class TestBatchOperations(TestService):
        res = self.api.tasks.delete_many(
            ids=ids, delete_output_models=True, return_file_urls=True
        )
        self.assertEqual(res.deleted, 2)
        self.assertEqual(res.succeeded, 2)
        self._assert_failures(res, [missing_id])
        data = self.api.tasks.get_all_ex(id=ids).tasks
        self.assertEqual(data, [])
@@ -84,13 +84,13 @@ class TestBatchOperations(TestService):
        res = self.api.models.publish_many(
            ids=ids, publish_task=True, force_publish_task=True
        )
        self.assertEqual(res.published, 1)
        self.assertEqual(res.succeeded, 1)
        self.assertEqual(res.published_tasks[0].id, task)
        self._assert_failures(res, [ids[1], missing_id])
        # archive
        res = self.api.models.archive_many(ids=ids)
        self.assertEqual(res.archived, 2)
        self.assertEqual(res.succeeded, 2)
        self._assert_failures(res, [missing_id])
        data = self.api.models.get_all_ex(id=ids).models
        for m in data:
@@ -98,7 +98,7 @@ class TestBatchOperations(TestService):
        # delete
        res = self.api.models.delete_many(ids=[*models, missing_id], force=True)
        self.assertEqual(res.deleted, 2)
        self.assertEqual(res.succeeded, 2)
        self.assertEqual(set(res.urls), set(uris))
        self._assert_failures(res, [missing_id])
        data = self.api.models.get_all_ex(id=ids).models