Add support for generating new IDs when importing projects

allegroai 2022-07-08 18:04:40 +03:00
parent d760cf5835
commit fe29743c54


@@ -24,6 +24,7 @@ from typing import (
     Callable,
 )
 from urllib.parse import unquote, urlparse
+from uuid import uuid4
 from zipfile import ZipFile, ZIP_BZIP2

 import mongoengine
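
Note: the added uuid4 import backs the id generation introduced below; stripping the dashes from a canonical UUID leaves a 32-character hex id. A quick standalone illustration (values here are examples, not part of the commit):

from uuid import uuid4

new_id = str(uuid4()).replace("-", "")
print(new_id)       # e.g. 1f2d3c4b5a6978e8d9c0b1a2f3e4d5c6 (random each run)
print(len(new_id))  # 32: the 36-char canonical UUID minus its 4 dashes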
@@ -690,6 +691,19 @@ class PrePopulate:
                 continue
             yield clean

+    @classmethod
+    def _generate_new_ids(
+        cls, reader: ZipFile, entity_files: Sequence
+    ) -> Mapping[str, str]:
+        ids = {}
+        for entity_file in entity_files:
+            with reader.open(entity_file) as f:
+                for item in cls.json_lines(f):
+                    orig_id = json.loads(item).get("_id")
+                    if orig_id:
+                        ids[orig_id] = str(uuid4()).replace("-", "")
+        return ids
+
     @classmethod
     def _import(
         cls,
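
Note: a minimal sketch of what the new _generate_new_ids helper does, with an in-memory list standing in for the zip entries (the entity data below is hypothetical):

import json
from uuid import uuid4

# hypothetical JSON-lines content of one exported entity file
entity_lines = [
    json.dumps({"_id": "task-old-1", "name": "train"}),
    json.dumps({"_id": "task-old-2", "name": "eval"}),
    json.dumps({"name": "an item without an _id is skipped"}),
]

ids = {}
for item in entity_lines:
    orig_id = json.loads(item).get("_id")
    if orig_id:
        ids[orig_id] = str(uuid4()).replace("-", "")

print(ids)  # {'task-old-1': '<32-char hex id>', 'task-old-2': '<32-char hex id>'}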
@@ -704,37 +718,46 @@ class PrePopulate:
         Start from entities since event import will require the tasks already in DB
         """
         event_file_ending = cls.events_file_suffix + ".json"
-        entity_files = (
+        entity_files = [
             fi
             for fi in reader.filelist
             if not fi.orig_filename.endswith(event_file_ending)
             and fi.orig_filename != cls.metadata_filename
-        )
+        ]
         metadata = metadata or {}
+        old_to_new_ids = (
+            cls._generate_new_ids(reader, entity_files)
+            if metadata.get("new_ids")
+            else {}
+        )
         tasks = []
         for entity_file in entity_files:
             with reader.open(entity_file) as f:
                 full_name = splitext(entity_file.orig_filename)[0]
                 print(f"Reading {reader.filename}:{full_name}...")
-                res = cls._import_entity(f, full_name, company_id, user_id, metadata)
+                res = cls._import_entity(
+                    f, full_name, company_id, user_id, metadata, old_to_new_ids
+                )
                 if res:
                     tasks = res

         if sort_tasks_by_last_updated:
             tasks = sorted(tasks, key=attrgetter("last_update"))

+        new_to_old_ids = {v: k for k, v in old_to_new_ids.items()}
         for task in tasks:
+            old_task_id = new_to_old_ids.get(task.id, task.id)
             events_file = first(
                 fi
                 for fi in reader.filelist
-                if fi.orig_filename.endswith(task.id + event_file_ending)
+                if fi.orig_filename.endswith(old_task_id + event_file_ending)
             )
             if not events_file:
                 continue
             with reader.open(events_file) as f:
                 full_name = splitext(events_file.orig_filename)[0]
                 print(f"Reading {reader.filename}:{full_name}...")
-                cls._import_events(f, full_name, company_id, user_id)
+                cls._import_events(f, company_id, user_id, task.id)

     @classmethod
     def _get_entity_type(cls, full_name) -> Type[mongoengine.Document]:
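
Note: event files inside the export archive are still named after the original task ids, so the loop above inverts the mapping to locate each task's events file while importing under the new id. A toy illustration (the file name and the "_events.json" suffix are assumed stand-ins for the real events_file_suffix value):

old_to_new_ids = {"task-old-1": "1f2d3c4b5a6978e8d9c0b1a2f3e4d5c6"}
new_to_old_ids = {v: k for k, v in old_to_new_ids.items()}

event_file_ending = "_events.json"  # assumed; cls.events_file_suffix + ".json" in the real code
filenames = ["events/task-old-1_events.json"]

new_task_id = "1f2d3c4b5a6978e8d9c0b1a2f3e4d5c6"
old_task_id = new_to_old_ids.get(new_task_id, new_task_id)
match = next((fn for fn in filenames if fn.endswith(old_task_id + event_file_ending)), None)
print(match)  # events/task-old-1_events.json

The .get(task.id, task.id) fallback keeps imports without new id generation working: the mapping is then empty, and each task's own id is used to locate its events file.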
@@ -847,6 +870,7 @@ class PrePopulate:
         company_id: str,
         user_id: str,
         metadata: Mapping[str, Any],
+        old_to_new_ids: Mapping[str, str] = None,
     ) -> Optional[Sequence[Task]]:
         cls_ = cls._get_entity_type(full_name)
         print(f"Writing {cls_.__name__.lower()}s into database")
@@ -858,6 +882,11 @@ class PrePopulate:
             cls.project_cls: cls._upgrade_project_data,
         }
         for item in cls.json_lines(f):
+            if old_to_new_ids:
+                for old_id, new_id in old_to_new_ids.items():
+                    # replace ids only when they are standalone strings
+                    # otherwise artifacts uris that contain old ids may get damaged
+                    item = item.replace(f'"{old_id}"', f'"{new_id}"')
             upgrade_func = data_upgrade_funcs.get(cls_)
             if upgrade_func:
                 item = json.dumps(upgrade_func(json.loads(item)))
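
Note: the quoted replace above is deliberate; substituting the bare id string would also rewrite artifact URIs that embed the old id in a path. A small demonstration with made-up values:

old_id, new_id = "abc123", "def456"
item = '{"_id": "abc123", "uri": "s3://bucket/abc123/model.bin"}'

# only the standalone quoted occurrence is replaced; the id inside the URI survives
item = item.replace(f'"{old_id}"', f'"{new_id}"')
print(item)  # {"_id": "def456", "uri": "s3://bucket/abc123/model.bin"}

This relies on the old id appearing as a complete quoted JSON string only where it is a reference, which holds for the random hex ids used here.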
@@ -894,11 +923,15 @@ class PrePopulate:
         return tasks

     @classmethod
-    def _import_events(cls, f: IO[bytes], full_name: str, company_id: str, _):
-        _, _, task_id = full_name[0 : -len(cls.events_file_suffix)].rpartition("_")
+    def _import_events(
+        cls, f: IO[bytes], company_id: str, _, task_id: str
+    ):
         print(f"Writing events for task {task_id} into database")
         for events_chunk in chunked_iter(cls.json_lines(f), 1000):
             events = [json.loads(item) for item in events_chunk]
+            for ev in events:
+                ev["task"] = task_id
+                ev["company_id"] = company_id
             cls.event_bll.add_events(
                 company_id, events=events, worker="", allow_locked_tasks=True
             )
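
Note: events loaded from the archive still carry their original task ids, so each one is rewritten before insertion to point at the (possibly regenerated) task id of the importing company. A self-contained sketch with hypothetical payloads:

import json

events_chunk = ['{"type": "log", "task": "task-old-1"}']
task_id, company_id = "1f2d3c4b5a6978e8d9c0b1a2f3e4d5c6", "company-1"

events = [json.loads(item) for item in events_chunk]
for ev in events:
    ev["task"] = task_id            # retarget the event at the imported task's id
    ev["company_id"] = company_id   # stamp the importing company
print(events[0])  # {'type': 'log', 'task': '1f2d...', 'company_id': 'company-1'}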