diff --git a/apiserver/database/model/task/task.py b/apiserver/database/model/task/task.py
index e877771..76de7e2 100644
--- a/apiserver/database/model/task/task.py
+++ b/apiserver/database/model/task/task.py
@@ -125,7 +125,6 @@ class Execution(EmbeddedDocument, ProperDictMixin):
     model_labels = ModelLabels()
     framework = StringField()
     artifacts: Dict[str, Artifact] = SafeMapField(field=EmbeddedDocumentField(Artifact))
-    docker_cmd = StringField()
     queue = StringField(reference_field="Queue")
     """ Queue ID where task was queued """
 
@@ -250,7 +249,7 @@ class Task(AttributedDocument):
     configuration = SafeMapField(field=EmbeddedDocumentField(ConfigurationItem))
     runtime = SafeDictField(default=dict)
     models: Models = EmbeddedDocumentField(Models, default=Models)
-    docker_init_script = StringField()
+    container = SafeMapField(field=StringField(default=""))
 
     def get_index_company(self) -> str:
         """
diff --git a/apiserver/mongo/initialize/pre_populate.py b/apiserver/mongo/initialize/pre_populate.py
index 9f1b799..b4fde70 100644
--- a/apiserver/mongo/initialize/pre_populate.py
+++ b/apiserver/mongo/initialize/pre_populate.py
@@ -759,6 +759,61 @@ class PrePopulate:
         module = importlib.import_module(module_name)
         return getattr(module, class_name)
 
+    @classmethod
+    def _upgrade_task_data(cls, task_data: dict) -> dict:
+        """
+        Upgrade artifacts list to dict
+        Migrate from execution.model and output.model to the new models field
+        Move docker_cmd contents into the container field
+        :param task_data:
+        :return:
+        """
+        artifacts_path = ("execution", "artifacts")
+        artifacts = nested_get(task_data, artifacts_path)
+        if isinstance(artifacts, list):
+            nested_set(
+                task_data,
+                path=artifacts_path,
+                value={get_artifact_id(a): a for a in artifacts},
+            )
+
+        models = task_data.get("models", {})
+        now = datetime.utcnow()
+        for old_field, type_ in (
+            ("execution.model", "input"),
+            ("output.model", "output"),
+        ):
+            old_path = old_field.split(".")
+            old_model = nested_get(task_data, old_path)
+            new_models = models.get(type_, [])
+            if old_model and not any(
+                m
+                for m in new_models
+                if m.get("model") == old_model or m.get("name") == type_
+            ):
+                model_item = {"model": old_model, "name": type_, "updated": now}
+                if type_ == "input":
+                    new_models = [model_item, *new_models]
+                else:
+                    new_models = [*new_models, model_item]
+                models[type_] = new_models
+            nested_delete(task_data, old_path)
+        task_data["models"] = models
+
+        docker_cmd_path = ("execution", "docker_cmd")
+        container_path = ("execution", "container")
+        docker_cmd = nested_get(task_data, docker_cmd_path)
+        if docker_cmd and not nested_get(task_data, container_path):
+            image, _, arguments = docker_cmd.partition(" ")
+            nested_set(
+                task_data,
+                path=container_path,
+                value={"image": image, "arguments": arguments},
+            )
+        nested_delete(task_data, docker_cmd_path)
+
+        return task_data
+
     @classmethod
     def _import_entity(
         cls,
@@ -774,40 +829,7 @@ class PrePopulate:
         override_project_count = 0
         for item in cls.json_lines(f):
             if cls_ == cls.task_cls:
-                task_data = json.loads(item)
-                artifacts_path = ("execution", "artifacts")
-                artifacts = nested_get(task_data, artifacts_path)
-                if isinstance(artifacts, list):
-                    nested_set(
-                        task_data,
-                        artifacts_path,
-                        value={get_artifact_id(a): a for a in artifacts},
-                    )
-
-                models = task_data.get("models", {})
-                now = datetime.utcnow()
-                for old_field, type_ in (
-                    ("execution.model", "input"),
-                    ("output.model", "output"),
-                ):
-                    old_path = old_field.split(".")
-                    old_model = nested_get(task_data, old_path)
-                    new_models = models.get(type_, [])
-                    if old_model and not any(
-                        m
-                        for m in new_models
-                        if m.get("model") == old_model or m.get("name") == type_
-                    ):
-                        model_item = {"model": old_model, "name": type_, "updated": now}
-                        if type_ == "input":
-                            new_models = [model_item, *new_models]
-                        else:
-                            new_models = [*new_models, model_item]
-                        models[type_] = new_models
-                    nested_delete(task_data, old_path)
-                task_data["models"] = models
-
-                item = json.dumps(task_data)
+                item = json.dumps(cls._upgrade_task_data(task_data=json.loads(item)))
 
             print(item)
             doc = cls_.from_json(item, created=True)
diff --git a/apiserver/mongo/migrations/0_18_0.py b/apiserver/mongo/migrations/0_18_0.py
index dd31b58..a5db0e3 100644
--- a/apiserver/mongo/migrations/0_18_0.py
+++ b/apiserver/mongo/migrations/0_18_0.py
@@ -7,11 +7,10 @@ from apiserver.utilities.dicts import nested_get
 from .utils import _drop_all_indices_from_collections
 
 
-def migrate_backend(db: Database):
+def _migrate_task_models(db: Database):
     """
     Collect the task output models from the models collections
     Move the execution and output models to new models.input and output lists
-    Drop the task indices to accommodate the change in schema
     """
     tasks: Collection = db["task"]
     models: Collection = db["model"]
@@ -44,30 +43,28 @@ def migrate_backend(db: Database):
     fields = {input: "execution.model", output: "output.model"}
     query = {
         "$or": [
-            {field: {"$exists": True, "$nin": [None, ""]}} for field in fields.values()
+            {field: {"$exists": True}} for field in fields.values()
         ]
     }
     for doc in tasks.find(filter=query, projection=[*fields.values(), models_field]):
         set_commands = {}
         for mode, field in fields.items():
             value = nested_get(doc, field.split("."))
-            if not value:
-                continue
-
-            model_doc = models.find_one(filter={"_id": value}, projection=["name"])
-            name = model_doc.get("name", mode) if model_doc else mode
-            model_item = {"model": value, "name": name, "updated": now}
-            existing_models = nested_get(doc, (models_field, mode), default=[])
-            existing_models = (
-                m
-                for m in existing_models
-                if m.get("name") != name and m.get("model") != value
-            )
-            if mode == input:
-                updated_models = [model_item, *existing_models]
-            else:
-                updated_models = [*existing_models, model_item]
-            set_commands[f"{models_field}.{mode}"] = updated_models
+            if value:
+                model_doc = models.find_one(filter={"_id": value}, projection=["name"])
+                name = model_doc.get("name", mode) if model_doc else mode
+                model_item = {"model": value, "name": name, "updated": now}
+                existing_models = nested_get(doc, (models_field, mode), default=[])
+                existing_models = (
+                    m
+                    for m in existing_models
+                    if m.get("name") != name and m.get("model") != value
+                )
+                if mode == input:
+                    updated_models = [model_item, *existing_models]
+                else:
+                    updated_models = [*existing_models, model_item]
+                set_commands[f"{models_field}.{mode}"] = updated_models
 
         tasks.update_one(
             {"_id": doc["_id"]},
@@ -77,4 +74,30 @@
             },
         )
 
+
+def _migrate_docker_cmd(db: Database):
+    tasks: Collection = db["task"]
+
+    docker_cmd_field = "execution.docker_cmd"
+    query = {docker_cmd_field: {"$exists": True}}
+
+    for doc in tasks.find(filter=query, projection=(docker_cmd_field,)):
+        set_commands = {}
+        docker_cmd = nested_get(doc, docker_cmd_field.split("."))
+        if docker_cmd:
+            image, _, arguments = docker_cmd.partition(" ")
+            set_commands["container"] = {"image": image, "arguments": arguments}
+
+        tasks.update_one(
+            {"_id": doc["_id"]},
+            {
+                "$unset": {docker_cmd_field: 1},
+                **({"$set": set_commands} if set_commands else {}),
+            }
+        )
+
+
+def migrate_backend(db: Database):
+    _migrate_task_models(db)
+    _migrate_docker_cmd(db)
     _drop_all_indices_from_collections(db, ["task*"])
diff --git a/apiserver/services/tasks.py b/apiserver/services/tasks.py
index 8e78ab0..86d6878 100644
--- a/apiserver/services/tasks.py
+++ b/apiserver/services/tasks.py
@@ -90,6 +90,7 @@ from apiserver.services.utils import (
     conform_tag_fields,
     conform_output_tags,
     ModelsBackwardsCompatibility,
+    DockerCmdBackwardsCompatibility,
 )
 from apiserver.timing_context import TimingContext
 from apiserver.utilities.partial_version import PartialVersion
@@ -348,6 +349,7 @@ def prepare_for_save(call: APICall, fields: dict, previous_task: Task = None):
     params_prepare_for_save(fields, previous_task=previous_task)
     artifacts_prepare_for_save(fields)
     ModelsBackwardsCompatibility.prepare_for_save(call, fields)
+    DockerCmdBackwardsCompatibility.prepare_for_save(call, fields)
 
     # Strip all script fields (remove leading and trailing whitespace chars) to avoid unusable names and paths
     for field in task_script_stripped_fields:
@@ -369,9 +371,11 @@ def unprepare_from_saved(call: APICall, tasks_data: Union[Sequence[dict], dict])
     conform_output_tags(call, tasks_data)
 
     ModelsBackwardsCompatibility.unprepare_from_saved(call, tasks_data)
+    DockerCmdBackwardsCompatibility.unprepare_from_saved(call, tasks_data)
+
+    need_legacy_params = call.requested_endpoint_version < PartialVersion("2.9")
 
     for data in tasks_data:
-        need_legacy_params = call.requested_endpoint_version < PartialVersion("2.9")
         params_unprepare_from_saved(
             fields=data, copy_to_legacy=need_legacy_params,
         )
diff --git a/apiserver/services/utils.py b/apiserver/services/utils.py
index 9f81f21..d91d509 100644
--- a/apiserver/services/utils.py
+++ b/apiserver/services/utils.py
@@ -100,17 +100,15 @@ class ModelsBackwardsCompatibility:
 
         for mode, field in cls.mode_to_fields.items():
             value = nested_get(fields, field)
-            if not value:
-                continue
+            if value:
+                nested_set(
+                    fields,
+                    (cls.models_field, mode),
+                    value=[dict(name=mode, model=value, updated=datetime.utcnow())],
+                )
 
             nested_delete(fields, field)
 
-            nested_set(
-                fields,
-                (cls.models_field, mode),
-                value=[dict(name=mode, model=value, updated=datetime.utcnow())],
-            )
-
     @classmethod
     def unprepare_from_saved(
         cls, call: APICall, tasks_data: Union[Sequence[dict], dict]
@@ -130,3 +128,38 @@
             model = models[0] if mode == "input" else models[-1]
             if model:
                 nested_set(task, field, model.get("model"))
+
+
+class DockerCmdBackwardsCompatibility:
+    max_version = PartialVersion("2.13")
+    field = ("execution", "docker_cmd")
+
+    @classmethod
+    def prepare_for_save(cls, call: APICall, fields: dict):
+        if call.requested_endpoint_version > cls.max_version:
+            return
+
+        docker_cmd = nested_get(fields, cls.field)
+        if docker_cmd:
+            image, _, arguments = docker_cmd.partition(" ")
+            nested_set(fields, ("container", "image"), value=image)
+            nested_set(fields, ("container", "arguments"), value=arguments)
+
+        nested_delete(fields, cls.field)
+
+    @classmethod
+    def unprepare_from_saved(cls, call: APICall, tasks_data: Union[Sequence[dict], dict]):
+        if call.requested_endpoint_version > cls.max_version:
+            return
+
+        if isinstance(tasks_data, dict):
+            tasks_data = [tasks_data]
+
+        for task in tasks_data:
+            container = task.get("container")
+            if not container or not container.get("image"):
+                continue
+
+            docker_cmd = " ".join(filter(None, map(container.get, ("image", "arguments"))))
+            if docker_cmd:
+                nested_set(task, cls.field, docker_cmd)