import itertools
from datetime import datetime
from typing import Tuple, Optional, Sequence, Mapping

from apiserver import database
from apiserver.apierrors import errors
from apiserver.database.model import EntityVisibility
from apiserver.database.model.project import Project

# Separator between the levels of a hierarchical project name ("parent/child")
name_separator = "/"


def _get_project_depth(project_name: str) -> int:
    """
    Return the number of non-empty levels in the project name
    Empty levels (leading, trailing or doubled separators) are not counted
    """
    return sum(1 for part in project_name.split(name_separator) if part)


def _validate_project_name(project_name: str, raise_if_empty=True) -> Tuple[str, str]:
    """
    Remove redundant '/' characters. Ensure that the project name is not empty
    Return the cleaned up project name and location

    :param project_name: the raw, possibly multi-level, project name
    :param raise_if_empty: if True then a name without any non-empty level is an error,
        otherwise a pair of empty strings is returned for such a name
    :return: tuple of (cleaned up full name, name of the parent location).
        The location is an empty string for a top level project
    :raise errors.bad_request.InvalidProjectName: empty name and raise_if_empty is set
    """
    # Strip before filtering so that whitespace-only levels ("a/ /b") are dropped
    # instead of surviving as empty parts and producing a doubled separator
    stripped = (part.strip() for part in project_name.split(name_separator))
    name_parts = [part for part in stripped if part]
    if not name_parts:
        if raise_if_empty:
            raise errors.bad_request.InvalidProjectName(name=project_name)
        return "", ""

    return name_separator.join(name_parts), name_separator.join(name_parts[:-1])


def _ensure_project(
    company: str, user: str, name: str, creation_params: Optional[dict] = None
) -> Optional[Project]:
    """
    Makes sure that the project with the given name exists
    If needed auto-create the project and all the missing projects in the path to it
    Return the project

    :param company: id of the company that owns the project
    :param user: id of the user recorded on any project that gets created
    :param name: full project name, levels are separated by name_separator
    :param creation_params: extra Project fields for the created projects.
        If not passed then the projects are created with an empty description
    :return: the existing or the newly created project. None if the name is empty
    """
    name, location = _validate_project_name(name, raise_if_empty=False)
    if not name:
        return None

    project = _get_writable_project_from_name(company, name)
    if project:
        return project

    now = datetime.utcnow()
    project = Project(
        # NOTE(review): the id expression was unreadable in the reviewed text.
        # Restored as the id generator of the imported `database` package - confirm
        id=database.utils.id(),
        user=user,
        company=company,
        created=now,
        last_update=now,
        name=name,
        basename=name.split(name_separator)[-1],
        **(creation_params or dict(description="")),
    )
    # Recursively create the missing ancestors. An empty location yields parent=None
    parent = _ensure_project(company, user, location, creation_params=creation_params)
    _save_under_parent(project=project, parent=parent)
    if parent:
        parent.update(last_update=now)

    return project


def _save_under_parent(project: Project, parent: Optional[Project]):
    """
    Save the project under the given parent project or top level (parent=None)
    Check that the project location matches the parent name

    :param project: the project to save. Its parent and path fields are overwritten
    :param parent: the parent project or None for a top level project
    :raise ValueError: the location part of the project name does not match the parent name
    """
    # NOTE(review): the attribute accesses and the persisting calls in this function
    # were unreadable in the reviewed text. They are restored from the docstring and
    # the error messages (project.name, parent.name, parent.id, project.save()) - confirm
    location, _, _ = project.name.rpartition(name_separator)
    if not parent:
        if location:
            raise ValueError(
                f"Project location {location} does not match empty parent name"
            )
        project.parent = None
        project.path = []
        project.save()
        return

    if location != parent.name:
        raise ValueError(
            f"Project location {location} does not match parent name {parent.name}"
        )
    project.parent = parent.id
    # The path lists the ancestor ids from the top level down to the direct parent
    project.path = [*(parent.path or []), parent.id]
    project.save()


def _get_writable_project_from_name(
    company,
    name,
    _only: Optional[Sequence[str]] = ("id", "name", "path", "company", "parent"),
) -> Optional[Project]:
    """
    Return a project from name. If the project not found then return None

    :param company: id of the company that owns the project
    :param name: the full project name to look for
    :param _only: the fields to load. Pass an empty value to load all the fields
    """
    qs = Project.objects(company=company, name=name)
    if _only:
        qs = qs.only(*_only)
    return qs.first()


# Maps a parent project id to its sub projects of all the levels
ProjectsChildren = Mapping[str, Sequence[Project]]


def _get_sub_projects(
    project_ids: Sequence[str],
    _only: Sequence[str] = ("id", "path"),
    search_hidden=True,
    allowed_ids: Optional[Sequence[str]] = None,
) -> ProjectsChildren:
    """
    Return the list of child projects of all the levels for the parent project ids

    :param project_ids: ids of the parent projects
    :param _only: the fields to load for the sub projects. "path" is always loaded.
        Pass an empty value to load all the fields
    :param search_hidden: if False then the projects that carry the hidden system tag are skipped
    :param allowed_ids: if not empty then only the sub projects with these ids are returned
    :return: mapping from each of the passed ids to the list of its sub projects
        (an empty list if there are none)
    """
    query = dict(path__in=project_ids)
    if not search_hidden:
        query["system_tags__nin"] = [EntityVisibility.hidden.value]
    if allowed_ids:
        query["id__in"] = allowed_ids

    qs = Project.objects(**query)
    if _only:
        # "path" is required below for attributing each sub project to its ancestors
        fields = set(_only) | {"path"}
        qs = qs.only(*fields)

    subprojects = list(qs)
    return {
        pid: [s for s in subprojects if pid in (s.path or [])] for pid in project_ids
    }


def _ids_with_parents(project_ids: Sequence[str]) -> Sequence[str]:
    """
    Return project ids with all the parent projects
    The result has no duplicates and its order is not defined
    """
    projects = Project.objects(id__in=project_ids).only("id", "path")
    parent_ids = set(itertools.chain.from_iterable(p.path for p in projects if p.path))
    # NOTE(review): the generator element was unreadable in the reviewed text.
    # Restored as p.id (the only loaded field beside the path) - confirm
    return list({*(p.id for p in projects), *parent_ids})


def _ids_with_children(project_ids: Sequence[str]) -> Sequence[str]:
    """
    Return project ids with the ids of all the subprojects
    The result has no duplicates and its order is not defined
    """
    children_ids = Project.objects(path__in=project_ids).scalar("id")
    return list({*project_ids, *children_ids})


def _update_subproject_names(
    project: Project,
    children: Sequence[Project],
    old_name: str,
    update_path: bool = False,
    old_path: Optional[Sequence[str]] = None,
) -> int:
    """
    Update sub project names when the base project name changes
    Optionally update the paths

    :param project: the base project, already carrying its new name (and new path)
    :param children: the sub projects of all the levels of the base project
    :param old_name: the full name of the base project before the change
    :param update_path: if True then the paths of the children are rebased too
    :param old_path: the path of the base project before the change.
        Used only together with update_path. An unset path is treated as empty
    :return: the sum of the values returned by the children updates
    """
    updated = 0
    now = datetime.utcnow()
    # Loop invariants: how many leading name levels and path entries of each child
    # belong to the old base project and have to be replaced
    old_depth = len(old_name.split(name_separator))
    old_path_len = len(old_path or [])
    new_base_path = list(project.path or [])

    for child in children:
        # NOTE(review): the child.name / project.name accesses were unreadable in the
        # reviewed text and are restored from the function purpose - confirm
        child_suffix = name_separator.join(
            child.name.split(name_separator)[old_depth:]
        )
        updates = {
            "name": name_separator.join((project.name, child_suffix)),
            "last_update": now,
        }
        if update_path:
            updates["path"] = new_base_path + list(child.path or [])[old_path_len:]
        updated += child.update(upsert=False, **updates)

    return updated


def _reposition_project_with_children(
    project: Project, children: Sequence[Project], parent: Optional[Project]
) -> int:
    """
    Move the project together with its sub projects under the new parent
    The base name of the project is kept, its location becomes the parent name

    :param project: the project to move
    :param children: the sub projects of all the levels of the moved project
    :param parent: the new parent project or None for moving to the top level
    :return: 1 for the moved project plus the value returned for the children updates
    """
    # NOTE(review): the .name accesses in this function were unreadable in the
    # reviewed text and are restored from the variable names - confirm
    new_location = parent.name if parent else None
    # Remember the old values, they are needed for rebasing the children
    old_name = project.name
    old_path = project.path
    project.name = name_separator.join(
        filter(None, (new_location, project.name.split(name_separator)[-1]))
    )
    project.last_update = datetime.utcnow()
    _save_under_parent(project, parent=parent)

    moved = 1 + _update_subproject_names(
        project=project,
        children=children,
        old_name=old_name,
        update_path=True,
        old_path=old_path,
    )
    return moved