From 99bf89a3609a000e3bdf3a8a044fa708612655c4 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 5 Mar 2020 14:54:34 +0200 Subject: [PATCH] Add pre-populate feature to allow starting a new server installation with packaged example experiments --- docker-compose.yml | 2 + server/api_version.py | 1 + server/config/basic.py | 2 +- server/config/default/apiserver.conf | 6 + server/elastic/initialize.py | 27 +++ server/init_data.py | 222 ------------------------ server/mongo/initialize/__init__.py | 70 ++++++++ server/mongo/initialize/migration.py | 86 +++++++++ server/mongo/initialize/pre_populate.py | 153 ++++++++++++++++ server/mongo/initialize/user.py | 74 ++++++++ server/mongo/initialize/util.py | 40 +++++ server/requirements.txt | 49 +++--- server/server.py | 3 +- 13 files changed, 487 insertions(+), 248 deletions(-) create mode 100644 server/api_version.py create mode 100644 server/elastic/initialize.py delete mode 100644 server/init_data.py create mode 100644 server/mongo/initialize/__init__.py create mode 100644 server/mongo/initialize/migration.py create mode 100644 server/mongo/initialize/pre_populate.py create mode 100644 server/mongo/initialize/user.py create mode 100644 server/mongo/initialize/util.py diff --git a/docker-compose.yml b/docker-compose.yml index a57b780..92d14bc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,6 +22,8 @@ services: TRAINS_MONGODB_SERVICE_PORT: 27017 TRAINS_REDIS_SERVICE_HOST: redis TRAINS_REDIS_SERVICE_PORT: 6379 + TRAINS__apiserver__mongo__pre_populate__enabled: "true" + TRAINS__apiserver__mongo__pre_populate__zip_file: "/opt/trains/db-pre-populate/export.zip" ports: - "8008:8008" networks: diff --git a/server/api_version.py b/server/api_version.py new file mode 100644 index 0000000..2614ce9 --- /dev/null +++ b/server/api_version.py @@ -0,0 +1 @@ +__version__ = "2.7.0" diff --git a/server/config/basic.py b/server/config/basic.py index a203249..e86752e 100644 --- a/server/config/basic.py +++ 
b/server/config/basic.py @@ -47,7 +47,7 @@ class BasicConfig: def logger(self, name): if Path(name).is_file(): name = Path(name).stem - path = ".".join((self.prefix, Path(name).stem)) + path = ".".join((self.prefix, name)) return logging.getLogger(path) def _read_extra_env_config_values(self): diff --git a/server/config/default/apiserver.conf b/server/config/default/apiserver.conf index f5be1e3..c39041d 100644 --- a/server/config/default/apiserver.conf +++ b/server/config/default/apiserver.conf @@ -34,6 +34,12 @@ aggregate { allow_disk_use: true } + + pre_populate { + enabled: false + zip_file: "/path/to/export.zip" + fail_on_error: false + } } auth { diff --git a/server/elastic/initialize.py b/server/elastic/initialize.py new file mode 100644 index 0000000..bfa51bf --- /dev/null +++ b/server/elastic/initialize.py @@ -0,0 +1,27 @@ +from furl import furl + +from config import config +from elastic.apply_mappings import apply_mappings_to_host +from es_factory import get_cluster_config + +log = config.logger(__file__) + + +class MissingElasticConfiguration(Exception): + """ + Exception when cluster configuration is not found in config files + """ + + pass + + +def init_es_data(): + hosts_config = get_cluster_config("events").get("hosts") + if not hosts_config: + raise MissingElasticConfiguration("for cluster 'events'") + + for conf in hosts_config: + host = furl(scheme="http", host=conf["host"], port=conf["port"]).url + log.info(f"Applying mappings to host: {host}") + res = apply_mappings_to_host(host) + log.info(res) diff --git a/server/init_data.py b/server/init_data.py deleted file mode 100644 index 7e2f578..0000000 --- a/server/init_data.py +++ /dev/null @@ -1,222 +0,0 @@ -import importlib.util -from datetime import datetime -from pathlib import Path -from uuid import uuid4 - -import attr -from furl import furl -from mongoengine.connection import get_db -from semantic_version import Version - -import database.utils -from bll.queue import QueueBLL -from config 
import config -from config.info import get_default_company -from database import Database -from database.model.auth import Role -from database.model.auth import User as AuthUser, Credentials -from database.model.company import Company -from database.model.queue import Queue -from database.model.settings import Settings -from database.model.user import User -from database.model.version import Version as DatabaseVersion -from elastic.apply_mappings import apply_mappings_to_host -from es_factory import get_cluster_config -from service_repo.auth.fixed_user import FixedUser - -log = config.logger(__file__) - -migration_dir = Path(__file__).resolve().parent / "mongo" / "migrations" - - -class MissingElasticConfiguration(Exception): - """ - Exception when cluster configuration is not found in config files - """ - - pass - - -def init_es_data(): - hosts_config = get_cluster_config("events").get("hosts") - if not hosts_config: - raise MissingElasticConfiguration("for cluster 'events'") - - for conf in hosts_config: - host = furl(scheme="http", host=conf["host"], port=conf["port"]).url - log.info(f"Applying mappings to host: {host}") - res = apply_mappings_to_host(host) - log.info(res) - - -def _ensure_company(): - company_id = get_default_company() - company = Company.objects(id=company_id).only("id").first() - if company: - return company_id - - company_name = "trains" - log.info(f"Creating company: {company_name}") - company = Company(id=company_id, name=company_name) - company.save() - return company_id - - -def _ensure_default_queue(company): - """ - If no queue is present for the company then - create a new one and mark it as a default - """ - queue = Queue.objects(company=company).only("id").first() - if queue: - return - - QueueBLL.create(company, name="default", system_tags=["default"]) - - -def _ensure_auth_user(user_data, company_id): - ensure_credentials = {"key", "secret"}.issubset(user_data.keys()) - if ensure_credentials: - user = AuthUser.objects( - 
credentials__match=Credentials( - key=user_data["key"], secret=user_data["secret"] - ) - ).first() - if user: - return user.id - - log.info(f"Creating user: {user_data['name']}") - user = AuthUser( - id=user_data.get("id", f"__{user_data['name']}__"), - name=user_data["name"], - company=company_id, - role=user_data["role"], - email=user_data["email"], - created=datetime.utcnow(), - credentials=[Credentials(key=user_data["key"], secret=user_data["secret"])] - if ensure_credentials - else None, - ) - - user.save() - - return user.id - - -def _ensure_user(user: FixedUser, company_id: str): - if User.objects(id=user.user_id).first(): - return - - data = attr.asdict(user) - data["id"] = user.user_id - data["email"] = f"{user.user_id}@example.com" - data["role"] = Role.user - - _ensure_auth_user(user_data=data, company_id=company_id) - - given_name, _, family_name = user.name.partition(" ") - - User( - id=user.user_id, - company=company_id, - name=user.name, - given_name=given_name, - family_name=family_name, - ).save() - - -def _apply_migrations(): - if not migration_dir.is_dir(): - raise ValueError(f"Invalid migration dir {migration_dir}") - - try: - previous_versions = sorted( - (Version(ver.num) for ver in DatabaseVersion.objects().only("num")), - reverse=True, - ) - except ValueError as ex: - raise ValueError(f"Invalid database version number encountered: {ex}") - - last_version = previous_versions[0] if previous_versions else Version("0.0.0") - - try: - new_scripts = { - ver: path - for ver, path in ((Version(f.stem), f) for f in migration_dir.glob("*.py")) - if ver > last_version - } - except ValueError as ex: - raise ValueError(f"Failed parsing migration version from file: {ex}") - - dbs = {Database.auth: "migrate_auth", Database.backend: "migrate_backend"} - - migration_log = log.getChild("mongodb_migration") - - for script_version in sorted(new_scripts.keys()): - script = new_scripts[script_version] - spec = importlib.util.spec_from_file_location(script.stem, 
str(script)) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - - for alias, func_name in dbs.items(): - func = getattr(module, func_name, None) - if not func: - continue - try: - migration_log.info(f"Applying {script.stem}/{func_name}()") - func(get_db(alias)) - except Exception: - migration_log.exception(f"Failed applying {script}:{func_name}()") - raise ValueError("Migration failed, aborting. Please restore backup.") - - DatabaseVersion( - id=database.utils.id(), - num=script.stem, - created=datetime.utcnow(), - desc="Applied on server startup", - ).save() - - -def _ensure_uuid(): - Settings.add_value("server.uuid", str(uuid4())) - - -def init_mongo_data(): - try: - _apply_migrations() - - _ensure_uuid() - - company_id = _ensure_company() - _ensure_default_queue(company_id) - - users = [ - { - "name": "apiserver", - "role": Role.system, - "email": "apiserver@example.com", - }, - { - "name": "webserver", - "role": Role.system, - "email": "webserver@example.com", - }, - {"name": "tests", "role": Role.user, "email": "tests@example.com"}, - ] - - for user in users: - credentials = config.get(f"secure.credentials.{user['name']}") - user["key"] = credentials.user_key - user["secret"] = credentials.user_secret - _ensure_auth_user(user, company_id) - - if FixedUser.enabled(): - log.info("Fixed users mode is enabled") - FixedUser.validate() - for user in FixedUser.from_config(): - try: - _ensure_user(user, company_id) - except Exception as ex: - log.error(f"Failed creating fixed user {user.name}: {ex}") - except Exception as ex: - log.exception("Failed initializing mongodb") diff --git a/server/mongo/initialize/__init__.py b/server/mongo/initialize/__init__.py new file mode 100644 index 0000000..7ad7a2a --- /dev/null +++ b/server/mongo/initialize/__init__.py @@ -0,0 +1,70 @@ +from pathlib import Path + +from config import config +from database.model.auth import Role +from service_repo.auth.fixed_user import FixedUser +from .migration 
import _apply_migrations +from .pre_populate import PrePopulate +from .user import ensure_fixed_user, _ensure_auth_user, _ensure_backend_user +from .util import _ensure_company, _ensure_default_queue, _ensure_uuid + +log = config.logger(__package__) + + +def init_mongo_data(): + try: + empty_dbs = _apply_migrations(log) + + _ensure_uuid() + + company_id = _ensure_company(log) + + _ensure_default_queue(company_id) + + if empty_dbs and config.get("apiserver.mongo.pre_populate.enabled", False): + zip_file = config.get("apiserver.mongo.pre_populate.zip_file") + if not zip_file or not Path(zip_file).is_file(): + msg = f"Failed pre-populating database: invalid zip file {zip_file}" + if config.get("apiserver.mongo.pre_populate.fail_on_error", False): + log.error(msg) + raise ValueError(msg) + else: + log.warning(msg) + else: + + user_id = _ensure_backend_user( + "__allegroai__", company_id, "Allegro.ai" + ) + + PrePopulate.import_from_zip(zip_file, user_id=user_id) + + users = [ + { + "name": "apiserver", + "role": Role.system, + "email": "apiserver@example.com", + }, + { + "name": "webserver", + "role": Role.system, + "email": "webserver@example.com", + }, + {"name": "tests", "role": Role.user, "email": "tests@example.com"}, + ] + + for user in users: + credentials = config.get(f"secure.credentials.{user['name']}") + user["key"] = credentials.user_key + user["secret"] = credentials.user_secret + _ensure_auth_user(user, company_id, log=log) + + if FixedUser.enabled(): + log.info("Fixed users mode is enabled") + FixedUser.validate() + for user in FixedUser.from_config(): + try: + ensure_fixed_user(user, company_id, log=log) + except Exception as ex: + log.error(f"Failed creating fixed user {user.name}: {ex}") + except Exception as ex: + log.exception("Failed initializing mongodb") diff --git a/server/mongo/initialize/migration.py b/server/mongo/initialize/migration.py new file mode 100644 index 0000000..f976200 --- /dev/null +++ b/server/mongo/initialize/migration.py @@ 
-0,0 +1,86 @@ +import importlib.util +from datetime import datetime +from logging import Logger +from pathlib import Path + +from mongoengine.connection import get_db +from semantic_version import Version + +import database.utils +from database import Database +from database.model.version import Version as DatabaseVersion + +migration_dir = Path(__file__).resolve().parent.with_name("migrations") + + +def _apply_migrations(log: Logger) -> bool: + """ + Apply migrations as found in the migration dir. + Returns a boolean indicating whether the database was empty prior to migration. + """ + log = log.getChild(Path(__file__).stem) + + log.info(f"Started mongodb migrations") + + if not migration_dir.is_dir(): + raise ValueError(f"Invalid migration dir {migration_dir}") + + empty_dbs = not any( + get_db(alias).collection_names() + for alias in database.utils.get_options(Database) + ) + + try: + previous_versions = sorted( + (Version(ver.num) for ver in DatabaseVersion.objects().only("num")), + reverse=True, + ) + except ValueError as ex: + raise ValueError(f"Invalid database version number encountered: {ex}") + + last_version = previous_versions[0] if previous_versions else Version("0.0.0") + + try: + new_scripts = { + ver: path + for ver, path in ((Version(f.stem), f) for f in migration_dir.glob("*.py")) + if ver > last_version + } + except ValueError as ex: + raise ValueError(f"Failed parsing migration version from file: {ex}") + + dbs = {Database.auth: "migrate_auth", Database.backend: "migrate_backend"} + + for script_version in sorted(new_scripts): + script = new_scripts[script_version] + + if empty_dbs: + log.info(f"Skipping migration {script.name} (empty databases)") + else: + spec = importlib.util.spec_from_file_location(script.stem, str(script)) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + for alias, func_name in dbs.items(): + func = getattr(module, func_name, None) + if not func: + continue + try: + log.info(f"Applying 
{script.stem}/{func_name}()") + func(get_db(alias)) + except Exception: + log.exception(f"Failed applying {script}:{func_name}()") + raise ValueError( + "Migration failed, aborting. Please restore backup." + ) + + DatabaseVersion( + id=database.utils.id(), + num=script.stem, + created=datetime.utcnow(), + desc="Applied on server startup", + ).save() + + log.info("Finished mongodb migrations") + + return empty_dbs diff --git a/server/mongo/initialize/pre_populate.py b/server/mongo/initialize/pre_populate.py new file mode 100644 index 0000000..3035d62 --- /dev/null +++ b/server/mongo/initialize/pre_populate.py @@ -0,0 +1,153 @@ +import importlib +from collections import defaultdict +from datetime import datetime +from os.path import splitext +from typing import List, Optional, Any, Type, Set, Dict +from zipfile import ZipFile, ZIP_BZIP2 + +import mongoengine +from tqdm import tqdm + + +class PrePopulate: + @classmethod + def export_to_zip( + cls, filename: str, experiments: List[str] = None, projects: List[str] = None + ): + with ZipFile(filename, mode="w", compression=ZIP_BZIP2) as zfile: + cls._export(zfile, experiments, projects) + + @classmethod + def import_from_zip(cls, filename: str, user_id: str = None): + with ZipFile(filename) as zfile: + cls._import(zfile, user_id) + + @staticmethod + def _resolve_type( + cls: Type[mongoengine.Document], ids: Optional[List[str]] + ) -> List[Any]: + ids = set(ids) + items = list(cls.objects(id__in=list(ids))) + resolved = {i.id for i in items} + missing = ids - resolved + for name_candidate in missing: + results = list(cls.objects(name=name_candidate)) + if not results: + print(f"ERROR: no match for `{name_candidate}`") + exit(1) + elif len(results) > 1: + print(f"ERROR: more than one match for `{name_candidate}`") + exit(1) + items.append(results[0]) + return items + + @classmethod + def _resolve_entities( + cls, experiments: List[str] = None, projects: List[str] = None + ) -> Dict[Type[mongoengine.Document], 
Set[mongoengine.Document]]: + from database.model.project import Project + from database.model.task.task import Task + + entities = defaultdict(set) + + if projects: + print("Reading projects...") + entities[Project].update(cls._resolve_type(Project, projects)) + print("--> Reading project experiments...") + objs = Task.objects( + project__in=list(set(filter(None, (p.id for p in entities[Project])))) + ) + entities[Task].update(o for o in objs if o.id not in (experiments or [])) + + if experiments: + print("Reading experiments...") + entities[Task].update(cls._resolve_type(Task, experiments)) + print("--> Reading experiments projects...") + objs = Project.objects( + id__in=list(set(filter(None, (p.project for p in entities[Task])))) + ) + project_ids = {p.id for p in entities[Project]} + entities[Project].update(o for o in objs if o.id not in project_ids) + + return entities + + @classmethod + def _cleanup_task(cls, task): + from database.model.task.task import TaskStatus + + task.completed = None + task.started = None + if task.execution: + task.execution.model = None + task.execution.model_desc = None + task.execution.model_labels = None + if task.output: + task.output.model = None + + task.status = TaskStatus.created + task.comment = "Auto generated by Allegro.ai" + task.created = datetime.utcnow() + task.last_iteration = 0 + task.last_update = task.created + task.status_changed = task.created + task.status_message = "" + task.status_reason = "" + task.user = "" + + @classmethod + def _cleanup_entity(cls, entity_cls, entity): + from database.model.task.task import Task + if entity_cls == Task: + cls._cleanup_task(entity) + + @classmethod + def _export( + cls, writer: ZipFile, experiments: List[str] = None, projects: List[str] = None + ): + entities = cls._resolve_entities(experiments, projects) + + for cls_, items in entities.items(): + if not items: + continue + filename = f"{cls_.__module__}.{cls_.__name__}.json" + print(f"Writing {len(items)} items into 
{writer.filename}:{filename}") + with writer.open(filename, "w") as f: + f.write("[\n".encode("utf-8")) + last = len(items) - 1 + for i, item in enumerate(items): + cls._cleanup_entity(cls_, item) + f.write(item.to_json().encode("utf-8")) + if i != last: + f.write(",".encode("utf-8")) + f.write("\n".encode("utf-8")) + f.write("]\n".encode("utf-8")) + + @staticmethod + def _import(reader: ZipFile, user_id: str = None): + for file_info in reader.filelist: + full_name = splitext(file_info.orig_filename)[0] + print(f"Reading {reader.filename}:{full_name}...") + module_name, _, class_name = full_name.rpartition(".") + module = importlib.import_module(module_name) + cls_: Type[mongoengine.Document] = getattr(module, class_name) + + with reader.open(file_info) as f: + for item in tqdm( + f.readlines(), + desc=f"Writing {cls_.__name__.lower()}s into database", + unit="doc", + ): + item = ( + item.decode("utf-8") + .strip() + .lstrip("[") + .rstrip("]") + .rstrip(",") + .strip() + ) + if not item: + continue + doc = cls_.from_json(item) + if user_id is not None and hasattr(doc, "user"): + doc.user = user_id + doc.save(force_insert=True) diff --git a/server/mongo/initialize/user.py b/server/mongo/initialize/user.py new file mode 100644 index 0000000..3b68ae9 --- /dev/null +++ b/server/mongo/initialize/user.py @@ -0,0 +1,74 @@ +from datetime import datetime +from logging import Logger + +import attr + +from database.model.auth import Role +from database.model.auth import User as AuthUser, Credentials +from database.model.user import User +from service_repo.auth.fixed_user import FixedUser + + +def _ensure_auth_user(user_data: dict, company_id: str, log: Logger): + ensure_credentials = {"key", "secret"}.issubset(user_data) + if ensure_credentials: + user = AuthUser.objects( + credentials__match=Credentials( + key=user_data["key"], secret=user_data["secret"] + ) + ).first() + if user: + return user.id + + log.info(f"Creating user: {user_data['name']}") + user = AuthUser( + 
id=user_data.get("id", f"__{user_data['name']}__"), + name=user_data["name"], + company=company_id, + role=user_data["role"], + email=user_data["email"], + created=datetime.utcnow(), + credentials=[Credentials(key=user_data["key"], secret=user_data["secret"])] + if ensure_credentials + else None, + ) + + user.save() + + return user.id + + +def _ensure_backend_user(user_id: str, company_id: str, user_name: str): + given_name, _, family_name = user_name.partition(" ") + + User( + id=user_id, + company=company_id, + name=user_name, + given_name=given_name, + family_name=family_name, + ).save() + + return user_id + + +def ensure_fixed_user(user: FixedUser, company_id: str, log: Logger): + if User.objects(id=user.user_id).first(): + return + + data = attr.asdict(user) + data["id"] = user.user_id + data["email"] = f"{user.user_id}@example.com" + data["role"] = Role.user + + _ensure_auth_user(user_data=data, company_id=company_id, log=log) + + given_name, _, family_name = user.name.partition(" ") + + User( + id=user.user_id, + company=company_id, + name=user.name, + given_name=given_name, + family_name=family_name, + ).save() diff --git a/server/mongo/initialize/util.py b/server/mongo/initialize/util.py new file mode 100644 index 0000000..b5586e3 --- /dev/null +++ b/server/mongo/initialize/util.py @@ -0,0 +1,40 @@ +from logging import Logger +from uuid import uuid4 + +from bll.queue import QueueBLL +from config import config +from config.info import get_default_company +from database.model.company import Company +from database.model.queue import Queue +from database.model.settings import Settings + +log = config.logger(__file__) + + +def _ensure_company(log: Logger): + company_id = get_default_company() + company = Company.objects(id=company_id).only("id").first() + if company: + return company_id + + company_name = "trains" + log.info(f"Creating company: {company_name}") + company = Company(id=company_id, name=company_name) + company.save() + return company_id + + +def 
_ensure_default_queue(company): + """ + If no queue is present for the company then + create a new one and mark it as a default + """ + queue = Queue.objects(company=company).only("id").first() + if queue: + return + + QueueBLL.create(company, name="default", system_tags=["default"]) + + +def _ensure_uuid(): + Settings.add_value("server.uuid", str(uuid4())) diff --git a/server/requirements.txt b/server/requirements.txt index acc7916..7cb394c 100644 --- a/server/requirements.txt +++ b/server/requirements.txt @@ -1,29 +1,30 @@ -six -Flask>=0.12.2 -elasticsearch>=5.0.0,<6.0.0 -pyhocon>=0.3.35 -requests>=2.13.0 -pymongo==3.6.1 # 3.7 has a bug multiple users logged in -Flask-Cors>=3.0.5 -Flask-Compress>=1.4.0 -mongoengine==0.16.2 -jsonmodels>=2.3 -pyjwt>=1.3.0 -gunicorn>=19.7.1 -Jinja2==2.10 -python-rapidjson>=0.6.3 -jsonschema>=2.6.0 -dpath>=1.4.2,<2.0 -funcsigs==1.0.2 -luqum>=0.7.2 attrs>=19.1.0 -nested_dict>=1.61 -related>=0.7.2 -validators>=0.12.4 -fastjsonschema>=2.8 boltons>=19.1.0 -semantic_version>=2.8.0,<3 +dpath>=1.4.2,<2.0 +elasticsearch>=5.0.0,<6.0.0 +fastjsonschema>=2.8 +Flask-Compress>=1.4.0 +Flask-Cors>=3.0.5 +Flask>=0.12.2 +funcsigs==1.0.2 furl>=2.0.0 -redis>=2.10.5 +gunicorn>=19.7.1 humanfriendly==4.18 +Jinja2==2.10 +jsonmodels>=2.3 +jsonschema>=2.6.0 +luqum>=0.7.2 +mongoengine==0.16.2 +nested_dict>=1.61 psutil>=5.6.5 +pyhocon>=0.3.35 +pyjwt>=1.3.0 +pymongo==3.6.1 # 3.7 has a bug multiple users logged in +python-rapidjson>=0.6.3 +redis>=2.10.5 +related>=0.7.2 +requests>=2.13.0 +semantic_version>=2.8.0,<3 +six +tqdm +validators>=0.12.4 \ No newline at end of file diff --git a/server/server.py b/server/server.py index f55d028..fa648bd 100644 --- a/server/server.py +++ b/server/server.py @@ -10,7 +10,8 @@ import database from apierrors.base import BaseError from bll.statistics.stats_reporter import StatisticsReporter from config import config -from init_data import init_es_data, init_mongo_data +from elastic.initialize import init_es_data +from 
mongo.initialize import init_mongo_data from service_repo import ServiceRepo, APICall from service_repo.auth import AuthType from service_repo.errors import PathParsingError