From dd49ba180a52a981db639e9b3ac78e880d57db52 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 23 Mar 2023 19:11:45 +0200 Subject: [PATCH] Improve statistics on projects children --- apiserver/apimodels/projects.py | 4 - apiserver/bll/project/project_bll.py | 269 +++++++++++++++--- apiserver/bll/project/sub_projects.py | 20 +- apiserver/schema/services/projects.conf | 30 +- apiserver/services/projects.py | 81 ++++-- apiserver/tests/automated/test_subprojects.py | 20 +- 6 files changed, 346 insertions(+), 78 deletions(-) diff --git a/apiserver/apimodels/projects.py b/apiserver/apimodels/projects.py index 017de80..24b4e82 100644 --- a/apiserver/apimodels/projects.py +++ b/apiserver/apimodels/projects.py @@ -58,10 +58,6 @@ class ProjectModelMetadataValuesRequest(MultiProjectRequest): allow_public = fields.BoolField(default=True) -class ChildrenCondition(models.Base): - system_tags = fields.ListField([str]) - - class ProjectChildrenType(Enum): pipeline = "pipeline" report = "report" diff --git a/apiserver/bll/project/project_bll.py b/apiserver/bll/project/project_bll.py index 5716463..c30f776 100644 --- a/apiserver/bll/project/project_bll.py +++ b/apiserver/bll/project/project_bll.py @@ -1,8 +1,7 @@ -import itertools from collections import defaultdict from datetime import datetime, timedelta from functools import reduce -from itertools import groupby +from itertools import groupby, chain from operator import itemgetter from typing import ( Sequence, @@ -41,11 +40,14 @@ from .sub_projects import ( _ids_with_children, _ids_with_parents, _get_project_depth, + ProjectsChildren, ) log = config.logger(__file__) max_depth = config.get("services.projects.sub_projects.max_depth", 10) reports_project_name = ".reports" +datasets_project_name = ".datasets" +pipelines_project_name = ".pipelines" reports_tag = "reports" dataset_tag = "dataset" pipeline_tag = "pipeline" @@ -516,7 +518,7 @@ class ProjectBLL: def aggregate_project_data( func: Callable[[T, T], T], project_ids: Sequence[str], - child_projects: Mapping[str, Sequence[Project]], + child_projects: ProjectsChildren, data: Mapping[str, T], ) -> Dict[str, T]: """ @@ -568,6 +570,136 @@ class ProjectBLL: for r in Task.aggregate(task_runtime_pipeline) } + @staticmethod + def _get_projects_children( + project_ids: Sequence[str], search_hidden: bool, allowed_ids: Sequence[str], + ) -> Tuple[ProjectsChildren, Set[str]]: + child_projects = _get_sub_projects( + project_ids, + _only=("id", "name"), + search_hidden=search_hidden, + allowed_ids=allowed_ids, + ) + return ( + child_projects, + {c.id for c in chain.from_iterable(child_projects.values())}, + ) + + @staticmethod + def _get_children_info( + project_ids: Sequence[str], child_projects: ProjectsChildren + ) -> dict: + return { + project: sorted( + [{"id": c.id, "name": c.name} for c in child_projects.get(project, [])], + key=itemgetter("name"), + ) + for project in project_ids + } + + @classmethod + def _get_project_dataset_stats_core( + cls, + company: str, + project_ids: Sequence[str], + project_field: str, + entity_class: Type[AttributedDocument], + include_children: bool = True, + filter_: Mapping[str, Any] = None, + users: Sequence[str] = None, + selected_project_ids: Sequence[str] = None, + ) -> Tuple[Dict[str, dict], Dict[str, dict]]: + if not project_ids: + return {}, {} + + child_projects = {} + project_ids_with_children = set(project_ids) + if include_children: + child_projects, children_ids = cls._get_projects_children( + project_ids, search_hidden=True, allowed_ids=selected_project_ids, + ) + project_ids_with_children |= children_ids + + pipeline = [ + { + "$match": cls.get_match_conditions( + company=company, + project_ids=list(project_ids_with_children), + filter_=filter_, + users=users, + project_field=project_field, + ) + }, + {"$project": {project_field: 1, "tags": 1}}, + { + "$group": { + "_id": f"${project_field}", + "count": {"$sum": 1}, + "tags": {"$push": "$tags"}, + } + }, + ] + res = entity_class.aggregate(pipeline) + + project_stats = { + result["_id"]: { + "count": result.get("count", 0), + "tags": set(chain.from_iterable(result.get("tags", []))), + } + for result in res + } + + def concat_dataset_stats(a: dict, b: dict) -> dict: + return { + "count": a.get("count", 0) + b.get("count", 0), + "tags": a.get("tags", {}) | b.get("tags", {}), + } + + top_project_stats = cls.aggregate_project_data( + func=concat_dataset_stats, + project_ids=project_ids, + child_projects=child_projects, + data=project_stats, + ) + for _, stat in top_project_stats.items(): + stat["tags"] = sorted(list(stat.get("tags", {}))) + + empty_stats = {"count": 0, "tags": []} + stats = { + project: {"datasets": top_project_stats.get(project, empty_stats)} + for project in project_ids + } + return stats, cls._get_children_info(project_ids, child_projects) + + @classmethod + def get_project_dataset_stats( + cls, + company: str, + project_ids: Sequence[str], + include_children: bool = True, + filter_: Mapping[str, Any] = None, + users: Sequence[str] = None, + selected_project_ids: Sequence[str] = None, + ) -> Tuple[Dict[str, dict], Dict[str, dict]]: + filter_ = filter_ or {} + filter_system_tags = filter_.get("system_tags") + if not isinstance(filter_system_tags, list): + filter_system_tags = [] + if dataset_tag not in filter_system_tags: + filter_system_tags.append(dataset_tag) + filter_["system_tags"] = filter_system_tags + + return cls._get_project_dataset_stats_core( + company=company, + project_ids=project_ids, + project_field="parent", + entity_class=Project, + include_children=include_children, + filter_=filter_, + users=users, + selected_project_ids=selected_project_ids, + ) + @classmethod def get_project_stats( cls, @@ -583,19 +715,16 @@ class ProjectBLL: if not project_ids: return {}, {} - child_projects = ( - _get_sub_projects( + child_projects = {} + project_ids_with_children = set(project_ids) + if include_children: + child_projects, children_ids = cls._get_projects_children( project_ids, - _only=("id", "name"), search_hidden=search_hidden, allowed_ids=selected_project_ids, ) - if include_children - else {} - ) - project_ids_with_children = set(project_ids) | { - c.id for c in itertools.chain.from_iterable(child_projects.values()) - } + project_ids_with_children |= children_ids + status_count_pipeline, runtime_pipeline = cls.make_projects_get_all_pipelines( company, project_ids=list(project_ids_with_children), @@ -699,14 +828,7 @@ class ProjectBLL: for project in project_ids } - children = { - project: sorted( - [{"id": c.id, "name": c.name} for c in child_projects.get(project, [])], - key=itemgetter("name"), - ) - for project in project_ids - } - return stats, children + return stats, cls._get_children_info(project_ids, child_projects) @classmethod def get_active_users( @@ -769,13 +891,13 @@ class ProjectBLL: children_type: ProjectChildrenType = None, ) -> Tuple[Sequence[str], Sequence[str]]: """ - Get the projects ids with children matching children_type (if passed) or created by the passed user + Get the projects ids matching children_condition (if passed) or where the passed user created any tasks including all the parents of these projects If project ids are specified then filter the results by these project ids """ if not (users or children_type): raise errors.bad_request.ValidationError( - "Either active users or children_type should be specified" + "Either active users or children_condition should be specified" ) query = ( @@ -786,29 +908,42 @@ class ProjectBLL: if users: query &= Q(user__in=users) + project_query = None if children_type == ProjectChildrenType.dataset: - project_query = query & Q(system_tags__in=[dataset_tag]) - entity_queries = {} + child_queries = { + Project: query + & Q(system_tags__in=[dataset_tag], basename__ne=datasets_project_name) + } elif children_type == ProjectChildrenType.pipeline: - project_query = query & Q(system_tags__in=[pipeline_tag]) - entity_queries = {} + child_queries = {Task: query & Q(system_tags__in=[pipeline_tag])} elif children_type == ProjectChildrenType.report: - project_query = None - entity_queries = {Task: query & Q(system_tags__in=[reports_tag])} + child_queries = {Task: query & Q(system_tags__in=[reports_tag])} else: project_query = query - entity_queries = {entity_cls: query for entity_cls in cls.child_classes} + child_queries = {entity_cls: query for entity_cls in cls.child_classes} if project_ids: ids_with_children = _ids_with_children(project_ids) if project_query: project_query &= Q(id__in=ids_with_children) - for entity_cls in entity_queries: - entity_queries[entity_cls] &= Q(project__in=ids_with_children) + for child_cls in child_queries: + child_queries[child_cls] &= ( + Q(parent__in=ids_with_children) + if child_cls is Project + else Q(project__in=ids_with_children) + ) - res = {p.id for p in Project.objects(project_query).only("id")} if project_query else set() - for cls_, query_ in entity_queries.items(): - res |= set(cls_.objects(query_).distinct(field="project")) + res = ( + {p.id for p in Project.objects(project_query).only("id")} + if project_query + else set() + ) + for cls_, query_ in child_queries.items(): + res |= set( + cls_.objects(query_).distinct( + field="parent" if cls_ is Project else "project" + ) + ) res = list(res) if not res: @@ -894,10 +1029,11 @@ class ProjectBLL: project_ids: Sequence[str], filter_: Mapping[str, Any], users: Sequence[str], + project_field: str = "project", ): conditions = { "company": {"$in": [None, "", company]}, - "project": {"$in": project_ids}, + project_field: {"$in": project_ids}, } if users: conditions["user"] = {"$in": users} @@ -922,6 +1058,69 @@ class ProjectBLL: return conditions + @classmethod + def _calc_own_datasets_core( + cls, + company: str, + project_ids: Sequence[str], + project_field: str, + entity_class: Type[AttributedDocument], + filter_: Mapping[str, Any] = None, + users: Sequence[str] = None, + ) -> Dict[str, dict]: + """ + Returns the amount of hyper datasets per requested project + """ + if not project_ids: + return {} + + pipeline = [ + { + "$match": cls.get_match_conditions( + company=company, + project_ids=project_ids, + filter_=filter_, + users=users, + project_field=project_field, + ) + }, + {"$project": {project_field: 1}}, + {"$group": {"_id": f"${project_field}", "count": {"$sum": 1}}}, + ] + datasets = { + data["_id"]: data["count"] for data in entity_class.aggregate(pipeline) + } + + return {pid: {"own_datasets": datasets.get(pid, 0)} for pid in project_ids} + + @classmethod + def calc_own_datasets( + cls, + company: str, + project_ids: Sequence[str], + filter_: Mapping[str, Any] = None, + users: Sequence[str] = None, + ) -> Dict[str, dict]: + """ + Returns the amount of datasets per requested project + """ + filter_ = filter_ or {} + filter_system_tags = filter_.get("system_tags") + if not isinstance(filter_system_tags, list): + filter_system_tags = [] + if dataset_tag not in filter_system_tags: + filter_system_tags.append(dataset_tag) + filter_["system_tags"] = filter_system_tags + + return cls._calc_own_datasets_core( + company=company, + project_ids=project_ids, + project_field="parent", + entity_class=Project, + filter_=filter_, + users=users, + ) + @classmethod def calc_own_contents( cls, diff --git a/apiserver/bll/project/sub_projects.py b/apiserver/bll/project/sub_projects.py index cd10ff5..7b5d758 100644 --- a/apiserver/bll/project/sub_projects.py +++ b/apiserver/bll/project/sub_projects.py @@ -14,14 +14,16 @@ def _get_project_depth(project_name: str) -> int: return len(list(filter(None, project_name.split(name_separator)))) -def _validate_project_name(project_name: str) -> Tuple[str, str]: +def _validate_project_name(project_name: str, raise_if_empty=True) -> Tuple[str, str]: """ Remove redundant '/' characters. Ensure that the project name is not empty Return the cleaned up project name and location """ - name_parts = list(filter(None, project_name.split(name_separator))) + name_parts = [p.strip() for p in project_name.split(name_separator) if p] if not name_parts: - raise errors.bad_request.InvalidProjectName(name=project_name) + if raise_if_empty: + raise errors.bad_request.InvalidProjectName(name=project_name) + return "", "" return name_separator.join(name_parts), name_separator.join(name_parts[:-1]) @@ -34,7 +36,7 @@ def _ensure_project( If needed auto-create the project and all the missing projects in the path to it Return the project """ - name = name.strip(name_separator) + name, location = _validate_project_name(name, raise_if_empty=False) if not name: return None @@ -43,7 +45,6 @@ def _ensure_project( return project now = datetime.utcnow() - name, location = _validate_project_name(name) project = Project( id=database.utils.id(), user=user, @@ -101,12 +102,15 @@ def _get_writable_project_from_name( return qs.first() +ProjectsChildren = Mapping[str, Sequence[Project]] + + def _get_sub_projects( project_ids: Sequence[str], _only: Sequence[str] = ("id", "path"), search_hidden=True, allowed_ids: Sequence[str] = None, -) -> Mapping[str, Sequence[Project]]: +) -> ProjectsChildren: """ Return the list of child projects of all the levels for the parent project ids """ @@ -159,14 +163,14 @@ def _update_subproject_names( now = datetime.utcnow() for child in children: child_suffix = name_separator.join( - child.name.split(name_separator)[len(old_name.split(name_separator)) :] + child.name.split(name_separator)[len(old_name.split(name_separator)):] ) updates = { "name": name_separator.join((project.name, child_suffix)), "last_update": now, } if update_path: - updates["path"] = project.path + child.path[len(old_path) :] + updates["path"] = project.path + child.path[len(old_path):] updated += child.update(upsert=False, **updates) return updated diff --git a/apiserver/schema/services/projects.conf b/apiserver/schema/services/projects.conf index 95ee096..c238d32 100644 --- a/apiserver/schema/services/projects.conf +++ b/apiserver/schema/services/projects.conf @@ -61,12 +61,26 @@ _definitions { type: string } last_update { - description: """Last project update time. Reflects the last time the project metadata was changed or a task in this project has changed status""" + description: "Last project update time. Reflects the last time the project metadata was changed or a task in this project has changed status" type: string format: "date-time" } } } + stats_datasets { + type: object + properties { + count { + description: Number of datasets + type: integer + } + tags { + description: Dataset tags + type: array + items {type: string} + } + } + } stats_status_count { type: object properties { @@ -141,6 +155,10 @@ _definitions { description: "Stats for archived tasks" "$ref": "#/definitions/stats_status_count" } + datasets { + description: "Stats for childrent datasets" + "$ref": "#/definitions/stats_datasets" + } } } projects_get_all_response_single { @@ -191,11 +209,15 @@ _definitions { type: string } last_update { - description: """Last project update time. Reflects the last time the project metadata was changed or a task in this project has changed status""" + description: "Last project update time. Reflects the last time the project metadata was changed or a task in this project has changed status" type: string format: "date-time" } // extra properties + hidden { + description: "Returned if the search_hidden flag was specified in the get_all_ex call and the project is hidden" + type: boolean + } stats { description: "Additional project stats" "$ref": "#/definitions/stats" @@ -217,6 +239,10 @@ _definitions { } } } + own_datasets { + description: "The amount of datasets/hyperdatasers under this project (without children projects). Returned if 'check_own_contents' flag is set in the request and children_type is set to 'dataset' or 'hyperdataset'" + type: integer + } own_tasks { description: "The amount of tasks under this project (without children projects). Returned if 'check_own_contents' flag is set in the request" type: integer diff --git a/apiserver/services/projects.py b/apiserver/services/projects.py index c17aa7c..b9350cc 100644 --- a/apiserver/services/projects.py +++ b/apiserver/services/projects.py @@ -1,4 +1,4 @@ -from typing import Sequence, Optional +from typing import Sequence, Optional, Tuple import attr from mongoengine import Q @@ -22,7 +22,7 @@ from apiserver.apimodels.projects import ( ) from apiserver.bll.organization import OrgBLL, Tags from apiserver.bll.project import ProjectBLL, ProjectQueries -from apiserver.bll.project.project_bll import dataset_tag, pipeline_tag, reports_tag +from apiserver.bll.project.project_bll import pipeline_tag, reports_tag from apiserver.bll.project.project_cleanup import ( delete_project, validate_project_delete, @@ -99,15 +99,16 @@ def _adjust_search_parameters(data: dict, shallow_search: bool): data["parent"] = [None] -def _get_filter_from_children_type(type_: ProjectChildrenType) -> Optional[dict]: - if type_ == ProjectChildrenType.dataset: - return {"system_tags": [dataset_tag], "type": [TaskType.data_processing]} - if type_ == ProjectChildrenType.pipeline: - return {"system_tags": [pipeline_tag], "type": [TaskType.controller]} - if type_ == ProjectChildrenType.report: - return {"system_tags": [reports_tag], "type": [TaskType.report]} +def _get_project_stats_filter(request: ProjectsGetRequest) -> Tuple[Optional[dict], bool]: + if request.include_stats_filter or not request.children_type: + return request.include_stats_filter, request.search_hidden - return None + if request.children_type == ProjectChildrenType.pipeline: + return {"system_tags": [pipeline_tag], "type": [TaskType.controller]}, True + if request.children_type == ProjectChildrenType.report: + return {"system_tags": [reports_tag], "type": [TaskType.report]}, True + + return request.include_stats_filter, request.search_hidden @endpoint("projects.get_all_ex", request_data_model=ProjectsGetRequest) @@ -142,6 +143,14 @@ def get_all_ex(call: APICall, company_id: str, request: ProjectsGetRequest): data["id"] = ids ret_params = {} + + remove_system_tags = False + if request.search_hidden: + only_fields = data.get("only_fields") + if isinstance(only_fields, list) and "system_tags" not in only_fields: + only_fields.append("system_tags") + remove_system_tags = True + projects: Sequence[dict] = Project.get_many_with_join( company=company_id, query_dict=data, @@ -152,28 +161,50 @@ def get_all_ex(call: APICall, company_id: str, request: ProjectsGetRequest): if not projects: return {"projects": projects, **ret_params} + if request.search_hidden: + for p in projects: + system_tags = ( + p.pop("system_tags", []) + if remove_system_tags + else p.get("system_tags", []) + ) + if EntityVisibility.hidden.value in system_tags: + p["hidden"] = True + conform_output_tags(call, projects) project_ids = list({project["id"] for project in projects}) - if request.check_own_contents or request.include_stats: - if request.children_type and not request.include_stats_filter: - filter_ = _get_filter_from_children_type(request.children_type) - search_hidden = True if filter_ else request.search_hidden + if request.check_own_contents: + if request.children_type == ProjectChildrenType.dataset: + contents = project_bll.calc_own_datasets( + company=company_id, + project_ids=project_ids, + filter_=request.include_stats_filter, + users=request.active_users, + ) else: - filter_ = request.include_stats_filter - search_hidden = request.search_hidden - - if request.check_own_contents: contents = project_bll.calc_own_contents( company=company_id, project_ids=project_ids, - filter_=filter_, + filter_=_get_project_stats_filter(request)[0], users=request.active_users, ) - for project in projects: - project.update(**contents.get(project["id"], {})) - if request.include_stats: + for project in projects: + project.update(**contents.get(project["id"], {})) + + if request.include_stats: + if request.children_type == ProjectChildrenType.dataset: + stats, children = project_bll.get_project_dataset_stats( + company=company_id, + project_ids=project_ids, + include_children=request.stats_with_children, + filter_=request.include_stats_filter, + users=request.active_users, + selected_project_ids=selected_project_ids, + ) + else: + filter_, search_hidden = _get_project_stats_filter(request) stats, children = project_bll.get_project_stats( company=company_id, project_ids=project_ids, @@ -185,9 +216,9 @@ def get_all_ex(call: APICall, company_id: str, request: ProjectsGetRequest): selected_project_ids=selected_project_ids, ) - for project in projects: - project["stats"] = stats[project["id"]] - project["sub_projects"] = children[project["id"]] + for project in projects: + project["stats"] = stats[project["id"]] + project["sub_projects"] = children[project["id"]] if request.include_dataset_stats: dataset_stats = project_bll.get_dataset_stats( diff --git a/apiserver/tests/automated/test_subprojects.py b/apiserver/tests/automated/test_subprojects.py index 52b56ae..3f726d1 100644 --- a/apiserver/tests/automated/test_subprojects.py +++ b/apiserver/tests/automated/test_subprojects.py @@ -36,8 +36,11 @@ class TestSubProjects(TestService): def test_query_children(self): test_root_name = "TestQueryChildren" test_root = self._temp_project(name=test_root_name) + dataset_tags = ["hello", "world"] dataset_project = self._temp_project( - name=f"{test_root_name}/Project1/Dataset", system_tags=["dataset"] + name=f"{test_root_name}/Project1/Dataset", + system_tags=["dataset"], + tags=dataset_tags, ) self._temp_task( name="dataset task", @@ -80,12 +83,21 @@ class TestSubProjects(TestService): children_type=type_, shallow_search=True, include_stats=True, + check_own_contents=True, ).projects self.assertEqual({p.basename for p in projects}, {f"Project{i+1}"}) p = projects[0] - self.assertEqual( - p.stats.active.total_tasks, 1 - ) + if type_ in ("dataset",): + self.assertEqual(p.own_datasets, 1) + self.assertIsNone(p.get("own_tasks")) + self.assertEqual(p.stats.datasets.count, 1) + self.assertEqual(p.stats.datasets.tags, dataset_tags) + else: + self.assertEqual(p.own_tasks, 0) + self.assertIsNone(p.get("own_datasets")) + self.assertEqual( + p.stats.active.total_tasks, 1 if p.basename != "Project4" else 0 + ) def test_project_aggregations(self): """This test requires user with user_auth_only... credentials in db"""