From 4e1d2b0f4a25903a9755316e01fac0a191ef2480 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Tue, 14 Jul 2020 23:36:50 +0300
Subject: [PATCH] Add Optimizer set_optimizer_task and OptimizerOptuna

---
 trains/automation/optimization.py    |  15 ++
 trains/automation/optuna/__init__.py |   3 +
 trains/automation/optuna/optuna.py   | 197 +++++++++++++++++++++++++++
 3 files changed, 215 insertions(+)
 create mode 100644 trains/automation/optuna/__init__.py
 create mode 100644 trains/automation/optuna/optuna.py

diff --git a/trains/automation/optimization.py b/trains/automation/optimization.py
index 4f15c543..f7b0c4de 100644
--- a/trains/automation/optimization.py
+++ b/trains/automation/optimization.py
@@ -286,6 +286,7 @@ class SearchStrategy(object):
             self.max_iteration_per_job and self.total_max_jobs else None
         )
         self._validate_base_task()
+        self._optimizer_task = None
 
     def start(self):
         # type: () -> ()
@@ -522,6 +523,16 @@ class SearchStrategy(object):
         """
         self._naming_function = naming_function
 
+    def set_optimizer_task(self, task):
+        # type: (Task) -> ()
+        """
+        Set the optimizer task object to be used to store/generate reports on the optimization process.
+        Usually this is the current task of this process.
+
+        :param Task task: The optimizer's current Task.
+        """
+        self._optimizer_task = task
+
     def _validate_base_task(self):
         # type: () -> ()
         """
@@ -901,6 +912,7 @@ class HyperParameterOptimizer(object):
             base_task_id=opts['base_task_id'], hyper_parameters=hyper_parameters,
             objective_metric=self.objective_metric, execution_queue=opts['execution_queue'],
             num_concurrent_workers=opts['max_number_of_concurrent_tasks'], **opts.get('optimizer_kwargs', {}))
+        self.optimizer.set_optimizer_task(self._task)
         self.optimization_timeout = None
         self.optimization_start_time = None
         self._thread = None
@@ -1190,6 +1202,9 @@ class HyperParameterOptimizer(object):
         elif optimizer_class == 'OptimizerBOHB':
             from .hpbandster import OptimizerBOHB
             optimizer_class = OptimizerBOHB
+        elif optimizer_class == 'OptimizerOptuna':
+            from .optuna import OptimizerOptuna
+            optimizer_class = OptimizerOptuna
         else:
             logger.warning("Could not resolve optimizer_class {} reverting to original class {}".format(
                 optimizer_class, original_class))
diff --git a/trains/automation/optuna/__init__.py b/trains/automation/optuna/__init__.py
new file mode 100644
index 00000000..e2231c91
--- /dev/null
+++ b/trains/automation/optuna/__init__.py
@@ -0,0 +1,3 @@
+from .optuna import OptimizerOptuna
+
+__all__ = ["OptimizerOptuna"]
diff --git a/trains/automation/optuna/optuna.py b/trains/automation/optuna/optuna.py
new file mode 100644
index 00000000..e9cbe509
--- /dev/null
+++ b/trains/automation/optuna/optuna.py
@@ -0,0 +1,197 @@
+from time import sleep, time
+from typing import Any, Optional, Sequence
+
+from ..optimization import Objective, SearchStrategy
+from ..parameters import (
+    DiscreteParameterRange, UniformParameterRange, RandomSeed, UniformIntegerParameterRange, Parameter, )
+from ...task import Task
+
+try:
+    # noinspection PyPackageRequirements
+    import optuna
+    Task.add_requirements('optuna')
+except ImportError:
+    raise ValueError("OptimizerOptuna requires 'optuna' package, it was not found\n"
+                     "install with: pip install optuna")
+
+
+class OptunaObjective(object):
+    def __init__(self, base_task_id, queue_name, optimizer, max_iteration_per_job, sleep_interval, config_space):
+        # type: (str, str, OptimizerOptuna, int, float, dict) -> None
+        self.base_task_id = base_task_id
+        self.optimizer = optimizer
+        self.queue_name = queue_name
+        self.sleep_interval = sleep_interval
+        self.max_iteration_per_job = max_iteration_per_job
+        self._config_space = config_space
+
+    def objective(self, trial):
+        # type: (optuna.Trial) -> float
+        """
+        Return the metric value for a specified set of parameters, pulled from the Trial object.
+
+        :param optuna.Trial trial: optuna.Trial object
+        :return: objective metric value (float)
+        """
+        parameter_override = {}
+        for name, (func_name, params) in self._config_space.items():
+            suggest = getattr(trial, func_name)
+            parameter_override[name] = suggest(name=name, **params)
+
+        current_job = self.optimizer.helper_create_job(self.base_task_id, parameter_override=parameter_override)
+        # noinspection PyProtectedMember
+        self.optimizer._current_jobs.append(current_job)
+        current_job.launch(self.queue_name)
+        iteration_value = None
+        is_pending = True
+
+        while not current_job.is_stopped():
+            if is_pending and not current_job.is_pending():
+                is_pending = False
+
+            if not is_pending:
+                # noinspection PyProtectedMember
+                iteration_value = self.optimizer._objective_metric.get_current_raw_objective(current_job)
+
+                if iteration_value:
+                    # update budget
+                    trial.report(value=iteration_value[1], step=iteration_value[0])
+
+                    # Handle pruning based on the intermediate value.
+                    if trial.should_prune():
+                        current_job.abort()
+                        raise optuna.TrialPruned()
+
+                    # check if we exceeded this job budget
+                    if self.max_iteration_per_job and iteration_value[0] >= self.max_iteration_per_job:
+                        current_job.abort()
+                        break
+
+            sleep(self.sleep_interval)
+
+        # noinspection PyProtectedMember
+        objective_metric = self.optimizer._objective_metric.get_objective(current_job)
+        print('OptunaObjective result metric={}, iteration {}'.format(objective_metric, iteration_value))
+        # noinspection PyProtectedMember
+        self.optimizer._current_jobs.remove(current_job)
+        return objective_metric
+
+
+class OptimizerOptuna(SearchStrategy):
+    def __init__(
+            self,
+            base_task_id,  # type: str
+            hyper_parameters,  # type: Sequence[Parameter]
+            objective_metric,  # type: Objective
+            execution_queue,  # type: str
+            num_concurrent_workers,  # type: int
+            max_iteration_per_job,  # type: Optional[int]
+            total_max_jobs,  # type: Optional[int]
+            pool_period_min=2.,  # type: float
+            time_limit_per_job=None,  # type: Optional[float]
+            optuna_sampler=None,  # type: Optional[optuna.samplers.BaseSampler]
+            optuna_pruner=None,  # type: Optional[optuna.pruners.BasePruner]
+            continue_previous_study=None,  # type: Optional[optuna.Study]
+            **optuna_kwargs  # type: Any
+    ):
+        # type: (...) -> None
+        """
+        Initialize an Optuna search strategy optimizer.
+        Optuna performs robust and efficient hyperparameter optimization at scale.
+        A specific sampling / pruning strategy can be selected via the `optuna_sampler` and `optuna_pruner` arguments.
+
+        :param str base_task_id: Task ID (str)
+        :param list hyper_parameters: list of Parameter objects to optimize over
+        :param Objective objective_metric: Objective metric to maximize / minimize
+        :param str execution_queue: execution queue to use for launching Tasks (experiments).
+        :param int num_concurrent_workers: Limit number of concurrent running Tasks (machines)
+        :param int max_iteration_per_job: maximum number of iterations per job.
+            'iterations' are the reported iterations for the specified objective,
+            not the maximum reported iteration of the Task.
+        :param int total_max_jobs: total maximum number of jobs for the optimization process.
+            Must be provided in order to calculate the total budget for the optimization process.
+            The total budget is measured in "iterations" (see above)
+            and is set to `max_iteration_per_job * total_max_jobs`.
+            This means more than total_max_jobs jobs may be created, as long as the cumulative iterations
+            (summed over all created jobs) do not exceed `max_iteration_per_job * total_max_jobs`.
+        :param float pool_period_min: time in minutes between two consecutive polls
+        :param float time_limit_per_job: Optional, maximum execution time per single job in minutes;
+            when the time limit is exceeded, the job is aborted.
+        :param optuna_kwargs: arguments passed directly to the Optuna object
+        """
+        super(OptimizerOptuna, self).__init__(
+            base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
+            execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
+            pool_period_min=pool_period_min, time_limit_per_job=time_limit_per_job,
+            max_iteration_per_job=max_iteration_per_job, total_max_jobs=total_max_jobs)
+        self._optuna_sampler = optuna_sampler
+        self._optuna_pruner = optuna_pruner
+        self._optuna_kwargs = optuna_kwargs or {}
+        self._param_iterator = None
+        self._objective = None
+        self._study = continue_previous_study if continue_previous_study else None
+
+    def start(self):
+        # type: () -> ()
+        """
+        Start the Optimizer controller function loop().
+        If the calling process is stopped, the controller will stop as well.
+
+        .. important::
+            This function returns only after optimization is completed or :meth:`stop` was called.
+
+        """
+        self._study = optuna.create_study(
+            direction="minimize" if self._objective_metric.get_objective_sign() < 0 else "maximize",
+            load_if_exists=False,
+            sampler=self._optuna_sampler,
+            pruner=self._optuna_pruner,
+            study_name=self._optimizer_task.id if self._optimizer_task else None,
+        )
+        config_space = self._convert_hyper_parameters_to_optuna()
+        self._objective = OptunaObjective(
+            base_task_id=self._base_task_id,
+            queue_name=self._execution_queue,
+            optimizer=self,
+            max_iteration_per_job=self.max_iteration_per_job,
+            sleep_interval=int(self.pool_period_minutes * 60),
+            config_space=config_space,
+        )
+        self._study.optimize(
+            self._objective.objective, n_trials=self.total_max_jobs, n_jobs=self._num_concurrent_workers)
+
+    def stop(self):
+        # type: () -> ()
+        """
+        Stop the currently running optimization loop.
+        Called from a different thread than :meth:`start`.
+        """
+        if self._study:
+            try:
+                self._study.stop()
+            except Exception as ex:
+                print(ex)
+
+    def _convert_hyper_parameters_to_optuna(self):
+        # type: () -> dict
+        cs = {}
+        for p in self._hyper_parameters:
+            if isinstance(p, UniformParameterRange):
+                if p.include_max and p.step_size:
+                    hp_type = 'suggest_discrete_uniform'
+                    hp_params = dict(low=p.min_value, high=p.max_value, q=p.step_size)
+                else:
+                    hp_type = 'suggest_float'
+                    hp_params = dict(low=p.min_value, high=p.max_value, log=False, step=p.step_size)
+            elif isinstance(p, UniformIntegerParameterRange):
+                hp_type = 'suggest_int'
+                hp_params = dict(low=p.min_value, high=p.max_value if p.include_max else p.max_value - p.step_size,
+                                 log=False, step=p.step_size)
+            elif isinstance(p, DiscreteParameterRange):
+                hp_type = 'suggest_categorical'
+                hp_params = dict(choices=p.values)
+            else:
+                raise ValueError("HyperParameter type {} not supported yet with OptimizerOptuna".format(type(p)))
+            cs[p.name] = (hp_type, hp_params)
+
+        return cs
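
Usage note (illustrative, not part of the patch): the sketch below shows one way the new OptimizerOptuna strategy could be driven through the existing HyperParameterOptimizer front-end, which resolves the optimizer class and forwards extra keyword arguments via optimizer_kwargs, as seen in the constructor call in the diff above. The base task ID, queue name, metric title/series and hyper-parameter ranges are placeholders, and the HyperParameterOptimizer keyword names are assumed from the existing trains.automation API.

from trains import Task
from trains.automation import (
    DiscreteParameterRange, HyperParameterOptimizer, UniformIntegerParameterRange)
from trains.automation.optuna import OptimizerOptuna

# Register the controller itself as a Task, so the optimization reports are logged to the server.
task = Task.init(project_name='examples', task_name='optuna hyper-parameter optimization')

optimizer = HyperParameterOptimizer(
    # placeholder: ID of the template training Task cloned and launched per trial
    base_task_id='<base_training_task_id>',
    # placeholder ranges; names must match the base Task's hyper-parameters
    hyper_parameters=[
        UniformIntegerParameterRange('layer_1', min_value=128, max_value=512, step_size=128),
        DiscreteParameterRange('batch_size', values=[32, 64, 128]),
    ],
    # placeholder objective: title/series of the scalar reported by the base Task
    objective_metric_title='accuracy',
    objective_metric_series='total',
    objective_metric_sign='max',
    optimizer_class=OptimizerOptuna,
    execution_queue='default',
    max_number_of_concurrent_tasks=2,
    # forwarded to OptimizerOptuna through optimizer_kwargs (see the constructor call in the diff)
    total_max_jobs=10,
    max_iteration_per_job=30,
)

optimizer.start()   # launches the controller in the background
optimizer.wait()    # block until the optimization completes
optimizer.stop()    # make sure any remaining jobs are stopped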