From a83a932e843fe4cd7a2e425407da36c08bbbc75c Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Wed, 26 Jul 2023 18:23:05 +0300 Subject: [PATCH] Add pipelines.delete_runs endpoint --- apiserver/apierrors/errors.conf | 3 ++ apiserver/apimodels/pipelines.py | 6 ++++ apiserver/schema/services/pipelines.conf | 37 ++++++++++++++++++++- apiserver/tests/automated/test_pipelines.py | 27 ++++++++++++++- 4 files changed, 71 insertions(+), 2 deletions(-) diff --git a/apiserver/apierrors/errors.conf b/apiserver/apierrors/errors.conf index 01db053..c42814f 100644 --- a/apiserver/apierrors/errors.conf +++ b/apiserver/apierrors/errors.conf @@ -53,6 +53,9 @@ # Reports 150: ["operation_supported_on_reports_only", "passed task is not report"] + # Pipelines + 160: ["cannot_remove_all_runs", "at least one pipeline run should be left"] + # Models 200: ["model_error", "general task error"] 201: ["invalid_model_id", "invalid model id"] diff --git a/apiserver/apimodels/pipelines.py b/apiserver/apimodels/pipelines.py index ef8bd52..49c7f51 100644 --- a/apiserver/apimodels/pipelines.py +++ b/apiserver/apimodels/pipelines.py @@ -1,4 +1,5 @@ from jsonmodels import models, fields +from jsonmodels.validators import Length from apiserver.apimodels import ListField @@ -8,6 +9,11 @@ class Arg(models.Base): value = fields.StringField(required=True) +class DeleteRunsRequest(models.Base): + project = fields.StringField(required=True) + ids = ListField([str], required=True, validators=[Length(1)]) + + class StartPipelineRequest(models.Base): task = fields.StringField(required=True) queue = fields.StringField(required=True) diff --git a/apiserver/schema/services/pipelines.conf b/apiserver/schema/services/pipelines.conf index 091ff9b..cdd5a74 100644 --- a/apiserver/schema/services/pipelines.conf +++ b/apiserver/schema/services/pipelines.conf @@ -1,7 +1,42 @@ _description: "Provides a management API for pipelines in the system." _definitions { + include "_common.conf" +} +delete_runs { + "999.0": ${_definitions.batch_operation} { + description: Delete pipeline runs + request { + required: [ids, project] + properties { + ids.description: "IDs of the pipeline runs to delete. Should be the ids of pipeline controller tasks" + project { + description: "Pipeline project ids. When deleting at least one run should be left" + type: string + } + } + } + response { + properties { + succeeded.items.properties.deleted { + description: "Indicates whether the task was deleted" + type: boolean + } + succeeded.items.properties.updated_children { + description: "Number of child tasks whose parent property was updated" + type: integer + } + succeeded.items.properties.updated_models { + description: "Number of models whose task property was updated" + type: integer + } + succeeded.items.properties.deleted_models { + description: "Number of deleted output models" + type: integer + } + } + } + } } - start_pipeline { "2.17" { description: "Start a pipeline" diff --git a/apiserver/tests/automated/test_pipelines.py b/apiserver/tests/automated/test_pipelines.py index 16190b4..caf38db 100644 --- a/apiserver/tests/automated/test_pipelines.py +++ b/apiserver/tests/automated/test_pipelines.py @@ -1,9 +1,34 @@ from typing import Tuple +from apiserver.apierrors import errors from apiserver.tests.automated import TestService class TestPipelines(TestService): + def test_delete_runs(self): + queue = self.api.queues.get_default().id + task_name = "pipelines test" + project, task = self._temp_project_and_task(name=task_name) + args = [{"name": "hello", "value": "test"}] + pipeline_tasks = [ + self.api.pipelines.start_pipeline( + task=task, queue=queue, args=args + ).pipeline + for _ in range(2) + ] + tasks = self.api.tasks.get_all_ex(project=project).tasks + self.assertEqual({task, *pipeline_tasks}, {t.id for t in tasks}) + + # cannot delete all runs + with self.api.raises(errors.bad_request.CannotRemoveAllRuns): + self.api.pipelines.delete_runs(project=project, ids=[task, *pipeline_tasks]) + + # successful deletion + res = self.api.pipelines.delete_runs(project=project, ids=pipeline_tasks) + self.assertEqual({r.id for r in res.succeeded}, set(pipeline_tasks)) + tasks = self.api.tasks.get_all_ex(project=project).tasks + self.assertEqual([task], [t.id for t in tasks]) + def test_start_pipeline(self): queue = self.api.queues.get_default().id task_name = "pipelines test" @@ -42,7 +67,7 @@ class TestPipelines(TestService): self.create_temp( "tasks", name=name, - type="testing", + type="controller", project=project, system_tags=["pipeline"], ),