From f7b441ab49e205ec8863a6ce1c1b5727e1d311a4 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 6 Jan 2024 12:35:49 +0200 Subject: [PATCH] Add multi objective optimization to HPO --- clearml/automation/hpbandster/bandster.py | 8 +- clearml/automation/optimization.py | 541 +++++++++++++++++----- clearml/automation/optuna/optuna.py | 57 ++- 3 files changed, 473 insertions(+), 133 deletions(-) diff --git a/clearml/automation/hpbandster/bandster.py b/clearml/automation/hpbandster/bandster.py index 1b0595e1..502691af 100644 --- a/clearml/automation/hpbandster/bandster.py +++ b/clearml/automation/hpbandster/bandster.py @@ -85,7 +85,7 @@ class _TrainsBandsterWorker(Worker): self.optimizer.budget.iterations.update(self._current_job.task_id(), iteration_value[0]) # check if we exceeded this job budget - if iteration_value[0] >= self.budget_iteration_scale * budget: + if iteration_value[0][0] >= self.budget_iteration_scale * budget: self._current_job.abort() break @@ -95,7 +95,7 @@ class _TrainsBandsterWorker(Worker): # noinspection PyProtectedMember self.optimizer.budget.jobs.update( self._current_job.task_id(), - float(iteration_value[0]) / self.optimizer._max_iteration_per_job) + float(iteration_value[0][0]) / self.optimizer._max_iteration_per_job) result = { # this is the a mandatory field to run hyperband @@ -104,7 +104,7 @@ class _TrainsBandsterWorker(Worker): # can be used for any user-defined information - also mandatory 'info': self._current_job.task_id() } - print('TrainsBandsterWorker result {}, iteration {}'.format(result, iteration_value)) + print('TrainsBandsterWorker result {}, iteration {}'.format(result, iteration_value[0])) # noinspection PyProtectedMember self.optimizer._current_jobs.remove(self._current_job) return result @@ -299,7 +299,7 @@ class OptimizerBOHB(SearchStrategy, RandomSeed): sleep_interval=int(self.pool_period_minutes * 60), budget_iteration_scale=budget_iteration_scale, base_task_id=self._base_task_id, - objective=self._objective_metric, + objective=self._objective_metric.objectives[0], queue_name=self._execution_queue, nameserver='127.0.0.1', nameserver_port=self._nameserver_port, run_id=fake_run_id, id=i) w.run(background=True) diff --git a/clearml/automation/optimization.py b/clearml/automation/optimization.py index 080142fe..63734316 100644 --- a/clearml/automation/optimization.py +++ b/clearml/automation/optimization.py @@ -1,13 +1,16 @@ import hashlib import json import six +import numpy as np +import functools from copy import copy, deepcopy from datetime import datetime from itertools import product from logging import getLogger from threading import Thread, Event from time import time -from typing import List, Set, Union, Any, Sequence, Optional, Mapping, Callable +from typing import List, Set, Union, Any, Sequence, Optional, Mapping, Callable, Tuple +from abc import ABC, abstractmethod from .job import ClearmlJob, LocalClearmlJob from .parameters import Parameter @@ -18,8 +21,33 @@ from ..task import Task logger = getLogger('clearml.automation.optimization') +class _ObjectiveInterface(ABC): + @abstractmethod + def get_objective(self, task_id): + pass -class Objective(object): + @abstractmethod + def get_current_raw_objective(self, task): + pass + + @abstractmethod + def get_objective_sign(self): + pass + + @abstractmethod + def get_objective_metric(self): + pass + + @abstractmethod + def get_normalized_objective(self, task_id): + pass + + @abstractmethod + def get_top_tasks(self, top_k, optimizer_task_id=None, task_filter=None): + pass + + +class 
Objective(_ObjectiveInterface): """ Optimization ``Objective`` class to maximize / minimize over all experiments. This class will sample a specific scalar from all experiments, and maximize / minimize over single scalar (i.e., title and series combination). @@ -53,7 +81,7 @@ class Objective(object): self.series = series assert order in ('min', 'max',) # normalize value so we always look for the highest objective value - self.sign = -1 if (isinstance(order, str) and order.lower().strip() == 'min') else +1 + self.sign = -1 if (isinstance(order, str) and order.lower().strip() == 'min') else 1 self._metric = None self.extremum = extremum @@ -243,7 +271,7 @@ class Budget(object): # type: () -> (Optional[float]) if self.limit is None or not self.current: return None - return sum(self.current.values())/float(self.limit) + return sum(self.current.values()) / float(self.limit) def __init__(self, jobs_limit, iterations_limit, compute_time_limit): # type: (Optional[int], Optional[int], Optional[float]) -> () @@ -467,7 +495,7 @@ class SearchStrategy(object): if self.max_iteration_per_job: iterations = self._get_job_iterations(job) - if iterations > 0: + if iterations and iterations > 0: self.budget.iterations.update(job.task_id(), iterations) if iterations > self.max_iteration_per_job: abort_job = True @@ -512,12 +540,8 @@ class SearchStrategy(object): :return: A list of Task objects, ordered by performance, where index 0 is the best performing Task. """ - # noinspection PyProtectedMember - top_tasks = self._get_child_tasks( - parent_task_id=self._job_parent_id or self._base_task_id, - order_by=self._objective_metric._get_last_metrics_encode_field(), - additional_filters={'page_size': int(top_k), 'page': 0}) - return top_tasks + return self._objective_metric.get_top_tasks(top_k=top_k, + optimizer_task_id=self._job_parent_id or self._base_task_id) def get_top_experiments_id_metrics_pair(self, top_k, all_metrics=False, only_completed=False): # type: (int, bool, bool) -> Sequence[(str, dict)] @@ -582,15 +606,17 @@ class SearchStrategy(object): # noinspection PyProtectedMember top_tasks_ids_metric = self._get_child_tasks_ids( parent_task_id=self._job_parent_id or self._base_task_id, - order_by=self._objective_metric._get_last_metrics_encode_field(), + order_by=self._objective_metric._get_last_metrics_encode_field()[0], additional_filters=additional_filters, additional_fields=['last_metrics'] ) - title, series = self._objective_metric.get_objective_metric() if not all_metrics else (None, None) + title_series = self._objective_metric.get_objective_metric() if not all_metrics else (None, None) + titles = [ts[0] for ts in title_series] + series = [ts[1] for ts in title_series] return [(i, {'{}/{}'.format(v['metric'], v['variant']): v for variant in metric.values() for v in variant.values() - if all_metrics or v['metric'] == title and v['variant'] == series} + if all_metrics or (v['metric'] in titles and v['variant'] in series)} ) for i, metric in top_tasks_ids_metric] def get_top_experiments_details( @@ -669,15 +695,28 @@ class SearchStrategy(object): # noinspection PyProtectedMember top_tasks_ids_metric_params = self._get_child_tasks_ids( parent_task_id=self._job_parent_id or self._base_task_id, - order_by=self._objective_metric._get_last_metrics_encode_field(), + order_by=self._objective_metric._get_last_metrics_encode_field()[ + 0] if self._objective_metric.len == 1 else None, additional_filters=additional_filters, additional_fields=['last_metrics', 'hyperparams'] ) + if self._objective_metric.len != 1: + 
top_tasks_ids_metric_params_dict = {} + for task in top_tasks_ids_metric_params: + objective = self._objective_metric.get_objective(task[0]) + if objective is None or any(o is None for o in objective): + continue + top_tasks_ids_metric_params_dict[task[0]] = (objective, task) + # noinspection PyProtectedMember + sorted_ids = self._objective_metric._sort_jobs_by_domination(top_tasks_ids_metric_params_dict) + top_tasks_ids_metric_params = [top_tasks_ids_metric_params_dict[s][1] for s in sorted_ids] # get hp_parameters: hp_params = set(p.name for p in self._hyper_parameters) - title, series = self._objective_metric.get_objective_metric() if not all_metrics else (None, None) + title_series = self._objective_metric.get_objective_metric() if not all_metrics else (None, None) + titles = [ts[0] for ts in title_series] + series = [ts[1] for ts in title_series] return [ { 'task_id': tid, @@ -688,19 +727,20 @@ class SearchStrategy(object): }, 'metrics': { '{}/{}'.format(v['metric'], v['variant']): v for variant in metric.values() - for v in variant.values() if all_metrics or v['metric'] == title and v['variant'] == series + for v in variant.values() if all_metrics or v['metric'] in titles and v['variant'] in series } } for tid, metric, param_sections in top_tasks_ids_metric_params ] def get_objective_metric(self): - # type: () -> (str, str) + # type: () -> Union[Tuple[str, str], List[Tuple[str, str]]] """ Return the metric title, series pair of the objective. :return: (title, series) """ - return self._objective_metric.get_objective_metric() + objective = self._objective_metric.get_objective_metric() + return objective[0] if self._objective_metric.len == 1 else objective def helper_create_job( self, @@ -823,7 +863,9 @@ class SearchStrategy(object): def _get_job_iterations(self, job): # type: (Union[ClearmlJob, Task]) -> int iteration_value = self._objective_metric.get_current_raw_objective(job) - return iteration_value[0] if iteration_value else -1 + if iteration_value is not None and any(iv is not None and iv[0] is not None for iv in iteration_value): + return max(iv[0] for iv in iteration_value if iv is not None) + return -1 @classmethod def _get_child_tasks_ids( @@ -887,7 +929,7 @@ class SearchStrategy(object): task_objects = Task._query_tasks(**task_filter) if not additional_fields: return [t.id for t in task_objects] - return [[t.id]+[getattr(t, f, None) for f in additional_fields] for t in task_objects] + return [[t.id] + [getattr(t, f, None) for f in additional_fields] for t in task_objects] @classmethod def _get_child_tasks( @@ -927,6 +969,146 @@ class SearchStrategy(object): ] +class MultiObjective(_ObjectiveInterface): + def __init__(self, title, series, order, extremum): + self.title = title + self.series = series + self.order = order + self.extremum = extremum + self.objectives = [] + for title_, series_, order_, extremum_ in zip(title, series, order, extremum): + self.objectives.append(Objective(title=title_, series=series_, order=order_, extremum=extremum_)) + self.len = len(self.objectives) + + def get_objective(self, task_id): + # type: (Union[str, Task, ClearmlJob]) -> Optional[List[float]] + """ + Return a specific task scalar values based on the objective settings (title/series). + + :param str task_id: The Task ID to retrieve scalar from (or ``ClearMLJob`` object). + + :return: The scalar values. 
+ """ + objective = [o.get_objective(task_id) for o in self.objectives] + if any(o is None for o in objective): + return None + return objective + + def get_current_raw_objective(self, task): + # type: (Union[ClearmlJob, Task]) -> Optional[List[Tuple[int, float]]] + """ + Return the current raw value (without sign normalization) of each objective. + + :param str task: The Task or Job to retrieve scalar from (or ``ClearmlJob`` object). + :return: List[Optional[Tuple(iteration, value)]]. None if the metric does not exist. + """ + objective = [o.get_current_raw_objective(task) for o in self.objectives] + if any(o is None for o in objective): + return None + return objective + + def get_objective_sign(self): + # type: () -> List[float] + """ + Return the sign of the objectives. + + - ``+1`` - If maximizing + - ``-1`` - If minimizing + + :return: Objective function signs. + """ + return [o.get_objective_sign() for o in self.objectives] + + def get_normalized_objective(self, task_id): + # type: (Union[str, Task, ClearmlJob]) -> Optional[List[float]] + """ + Return a normalized task scalar values based on the objective settings (title/series). + I.e. objective is always to maximize the returned value + + :param str task_id: The Task ID to retrieve scalars from. + + :return: Normalized scalar values. + """ + objective = [o.get_normalized_objective(task_id) for o in self.objectives] + if any(o is None for o in objective): + return None + return objective + + def get_objective_metric(self): + # type: () -> List[(str, str)] + """ + Return the metric title, series pairs of the objectives. + + :return: List[(title, series)] + """ + return [o.get_objective_metric() for o in self.objectives] + + def get_top_tasks(self, top_k, optimizer_task_id=None, task_filter=None): + # type: (int, Optional[str], Optional[dict]) -> Sequence[Task] + """ + Return a list of Tasks of the top performing experiments. + If there is only one objective, the tasks are sorted based on that objective. + If there are multiple objectives, the tasks are sorted based on successive Pareto fronts. + A trial is located at the Pareto front if there are no trials that dominate the trial. + A trial dominates another trial if all its objective metrics are greater or equal than the other + trial's and there is at least one objective metric that is strictly greater than the other. + + :param int top_k: The number of Tasks (experiments) to return. + :param str optimizer_task_id: Parent optimizer Task ID + :param dict task_filter: Optional task_filtering for the query + + :return: A list of Task objects, ordered by performance, where index 0 is the best performing Task. 
+ """ + if self.len == 1: + return self.objectives[0].get_top_tasks( + top_k=top_k, + optimizer_task_id=optimizer_task_id, + task_filter=task_filter + ) + task_filter = deepcopy(task_filter) if task_filter else {} + if optimizer_task_id: + task_filter["parent"] = optimizer_task_id + # noinspection PyProtectedMember + tasks = Task._query_tasks(**task_filter) + candidates = {} + for task in tasks: + values = self.get_objective(task.id) + if values is None or any(v is None for v in values): + continue + candidates[task.id] = (values, task.id) + sorted_ids = self._sort_jobs_by_domination(candidates) + if not sorted_ids: + return [] + return Task.get_tasks(task_ids=sorted_ids[:top_k]) + + def _get_last_metrics_encode_field(self): + # noinspection PyProtectedMember + return [o._get_last_metrics_encode_field() for o in self.objectives] + + def _weakly_dominates_normalized(self, lhs, rhs): + return all(lhs_elem * o.sign >= rhs_elem * o.sign for lhs_elem, rhs_elem, o in zip(lhs, rhs, self.objectives)) + + @staticmethod + def _dominates(lhs, rhs): + return all(lhs_elem >= rhs_elem for lhs_elem, rhs_elem in zip(lhs, rhs)) and \ + any(lhs_elem > rhs_elem for lhs_elem, rhs_elem in zip(lhs, rhs)) + + def _sort_jobs_by_domination(self, jobs): + job_ids = list(jobs.keys()) + job_ids_sorted = [] + while len(job_ids_sorted) < len(jobs.keys()): + have_result = False + for job_id in job_ids: + if all(self._weakly_dominates_normalized(jobs[job_id][0], jobs[other_job_id][0]) + for other_job_id in job_ids): + have_result = True + job_ids_sorted.append(job_id) + if not have_result: + job_ids_sorted.extend(job_ids) + job_ids = [job_id for job_id in job_ids if job_id not in job_ids_sorted] + return job_ids_sorted + + class GridSearch(SearchStrategy): """ Grid search strategy controller. Full grid sampling of every hyperparameter combination. @@ -1089,12 +1271,12 @@ class HyperParameterOptimizer(object): self, base_task_id, # type: str hyper_parameters, # type: Sequence[Parameter] - objective_metric_title, # type: str - objective_metric_series, # type: str - objective_metric_sign='min', # type: str + objective_metric_title, # type: Union[str, Sequence[str]] + objective_metric_series, # type: Union[str, Sequence[str]] + objective_metric_sign="min", # type: Union[str, Sequence[str]] optimizer_class=RandomSearch, # type: Union[SearchStrategy, type(SearchStrategy)] max_number_of_concurrent_tasks=10, # type: int - execution_queue='default', # type: str + execution_queue="default", # type: str optimization_time_limit=None, # type: Optional[float] compute_time_limit=None, # type: Optional[float] auto_connect_task=True, # type: Union[bool, Task] @@ -1109,10 +1291,14 @@ class HyperParameterOptimizer(object): :param str base_task_id: The Task ID to be used as template experiment to optimize. :param list hyper_parameters: The list of Parameter objects to optimize over. - :param str objective_metric_title: The Objective metric title to maximize / minimize (for example, - ``validation``). - :param str objective_metric_series: The Objective metric series to maximize / minimize (for example, ``loss``). - :param str objective_metric_sign: The objective to maximize / minimize. + :param Union[str, Sequence[str]] objective_metric_title: The Objective metric title(s) to maximize / minimize + (for example, ``validation``, ``["validation", "loss"]``). 
If ``objective_metric_title`` is a sequence + (used to optimize multiple objectives at the same time), then ``objective_metric_series`` and + ``objective_metric_sign`` have to be sequences of the same length. Each title will be matched + with the respective series and sign + :param Union[str, Sequence[str]] objective_metric_series: The Objective metric series to maximize / minimize + (for example, ``loss_series``, ``["validation_series", "loss_series"]``). + :param Union[str, Sequence[str]] objective_metric_sign: The objectives to maximize / minimize. The values are: - ``min`` - Minimize the last reported value for the specified title/series scalar. @@ -1190,7 +1376,24 @@ class HyperParameterOptimizer(object): # make sure we stop all jobs an_optimizer.stop() """ - + if type(objective_metric_title) is not type(objective_metric_series) or type( + objective_metric_title) is not type(objective_metric_sign): + raise TypeError( + "objective_metric_series, objective_metric_title and objective_metric_sign have to be of the same type" + " (strings if doing single objective optimization and lists of the same length" + " if doing multi-objective optimization)" + ) + if isinstance(objective_metric_title, str): + objective_metric_series = [objective_metric_series] + objective_metric_title = [objective_metric_title] + objective_metric_sign = [objective_metric_sign] + if len(objective_metric_series) != len(objective_metric_title) or len(objective_metric_series) != len( + objective_metric_sign + ): + raise ValueError( + "Can not use multiple objective optimization when objective_metric_series, objective_metric_title" + " or objective_metric_sign do not have the same length" + ) # create a new Task, if we do not have one already self._task = auto_connect_task if isinstance(auto_connect_task, Task) else Task.current_task() self._readonly_task = \ @@ -1224,17 +1427,29 @@ class HyperParameterOptimizer(object): self.hyper_parameters = hyper_parameters self.max_number_of_concurrent_tasks = opts['max_number_of_concurrent_tasks'] self.execution_queue = opts['execution_queue'] - self.objective_metric = Objective( - title=opts['objective_metric_title'], series=opts['objective_metric_series'], - order='min' if opts['objective_metric_sign'] in ('min', 'min_global') else 'max', - extremum=opts['objective_metric_sign'].endswith('_global')) + self._objective_metric = MultiObjective( + title=opts["objective_metric_title"], + series=opts["objective_metric_series"], + order=["min" if sign_ in ("min", "min_global") else "max" for sign_ in opts["objective_metric_sign"]], + extremum=[sign_.endswith("_global") for sign_ in opts["objective_metric_sign"]] + ) + optuna_error_message = "Multi parameter optimization is only supported via Optuna. Please install Optuna via" + \ + " `pip install optuna and set the `optimizer_class` to `clearml.automation.optuna.OptimizerOptuna`" + try: + if self._objective_metric.len != 1: + from .optuna import OptimizerOptuna + + if optimizer_class != OptimizerOptuna: + raise ValueError(optuna_error_message) + except Exception: + raise ValueError(optuna_error_message) # if optimizer_class is an instance, use it as is. 
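For context, the constructor contract introduced above boils down to: for multi-objective runs, ``objective_metric_title``, ``objective_metric_series`` and ``objective_metric_sign`` must be equal-length sequences matched element-wise, and the optimizer class must be ``OptimizerOptuna``. A minimal usage sketch (illustrative only; the base task id, metric names and parameter range below are placeholders, not taken from this diff):

from clearml.automation import HyperParameterOptimizer, UniformParameterRange
from clearml.automation.optuna import OptimizerOptuna  # multi-objective optimization requires Optuna

optimizer = HyperParameterOptimizer(
    base_task_id="<template_task_id>",  # placeholder template experiment
    hyper_parameters=[UniformParameterRange("General/lr", min_value=1e-4, max_value=1e-1)],
    # the three objective arguments are sequences of the same length, matched element-wise:
    # ("validation", "accuracy", "max") and ("validation", "loss", "min")
    objective_metric_title=["validation", "validation"],
    objective_metric_series=["accuracy", "loss"],
    objective_metric_sign=["max", "min"],
    optimizer_class=OptimizerOptuna,
    execution_queue="default",
)
optimizer.start()
optimizer.wait()
optimizer.stop()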
- if type(optimizer_class) != type: + if not isinstance(optimizer_class, type): self.optimizer = optimizer_class else: self.optimizer = optimizer_class( base_task_id=opts['base_task_id'], hyper_parameters=hyper_parameters, - objective_metric=self.objective_metric, execution_queue=opts['execution_queue'], + objective_metric=self._objective_metric, execution_queue=opts['execution_queue'], num_concurrent_workers=opts['max_number_of_concurrent_tasks'], compute_time_limit=opts['compute_time_limit'], **opts.get('optimizer_kwargs', {})) self.optimizer.set_optimizer_task(self._task) @@ -1607,9 +1822,9 @@ class HyperParameterOptimizer(object): @classmethod def get_optimizer_top_experiments( cls, - objective_metric_title, # type: str - objective_metric_series, # type: str - objective_metric_sign, # type: str + objective_metric_title, # type: Union[str, List[str]] + objective_metric_series, # type: Union[str, List[str]] + objective_metric_sign, # type: Union[str, List[str]] optimizer_task_id, # type: str top_k, # type: int ): @@ -1636,6 +1851,12 @@ class HyperParameterOptimizer(object): title=objective_metric_title, series=objective_metric_series, order=objective_metric_sign) return objective.get_top_tasks(top_k=top_k, optimizer_task_id=optimizer_task_id) + @property + def objective_metric(self): + if self._objective_metric.len == 1: + return self._objective_metric.objectives[0] + return self._objective_metric + def _connect_args(self, optimizer_class=None, hyper_param_configuration=None, **kwargs): # type: (SearchStrategy, dict, Any) -> (SearchStrategy, list, dict) if not self._task or self._readonly_task: @@ -1705,8 +1926,8 @@ class HyperParameterOptimizer(object): def _report_daemon(self): # type: () -> () - title, series = self.objective_metric.get_objective_metric() - title = '{}/{}'.format(title, series) + title_series = self._objective_metric.get_objective_metric() + title = ["{}/{}".format(ts[0], ts[1]) for ts in title_series] counter = 0 completed_jobs = dict() task_logger = None @@ -1722,10 +1943,14 @@ class HyperParameterOptimizer(object): params["status"] = str(task.status) # noinspection PyProtectedMember iteration_value = task.get_last_iteration() - objective = self.objective_metric.get_objective(task) + objective = self._objective_metric.get_objective(task) completed_jobs[task.id] = ( - objective if objective is not None else -1, - iteration_value if iteration_value is not None else -1, + objective if objective is not None else ( + [-1] * self._objective_metric.len + ), + iteration_value if iteration_value is not None else ( + [-1] * self._objective_metric.len + ), params ) @@ -1754,9 +1979,9 @@ class HyperParameterOptimizer(object): self._report_remaining_budget(task_logger, counter) if ( - self.optimizer.budget.compute_time.used - and self.optimizer.budget.compute_time.limit - and self.optimizer.budget.compute_time.used >= self.optimizer.budget.compute_time.limit + self.optimizer.budget.compute_time.used + and self.optimizer.budget.compute_time.limit + and self.optimizer.budget.compute_time.used >= self.optimizer.budget.compute_time.limit ): logger.warning( "Optimizer task reached compute time limit (used {:.2f} out of {:.2f})".format( @@ -1768,7 +1993,7 @@ class HyperParameterOptimizer(object): self._report_resources(task_logger, counter) # collect a summary of all the jobs and their final objective values cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - \ - {j.task_id() for j in self.optimizer.get_running_jobs()} + {j.task_id() for j in 
self.optimizer.get_running_jobs()} self._report_completed_status(completed_jobs, cur_completed_jobs, task_logger, title) self._report_completed_tasks_best_results(set(completed_jobs.keys()), task_logger, title, counter) @@ -1791,9 +2016,9 @@ class HyperParameterOptimizer(object): def _report_completed_status(self, completed_jobs, cur_completed_jobs, task_logger, title, force=False): job_ids_sorted_by_objective = self.__sort_jobs_by_objective(completed_jobs) best_experiment = \ - (self.objective_metric.get_normalized_objective(job_ids_sorted_by_objective[0]), + (self._objective_metric.get_normalized_objective(job_ids_sorted_by_objective[0]), job_ids_sorted_by_objective[0]) \ - if job_ids_sorted_by_objective else (float('-inf'), None) + if job_ids_sorted_by_objective else ([float("-inf")], None) if force or cur_completed_jobs != set(completed_jobs.keys()): pairs = [] labels = [] @@ -1801,13 +2026,14 @@ class HyperParameterOptimizer(object): created_jobs_tasks = self.optimizer.get_created_jobs_tasks() id_status = {j_id: j_run.status() for j_id, j_run in created_jobs_tasks.items()} for i, (job_id, params) in enumerate(created_jobs.items()): - value = self.objective_metric.get_objective(job_id) + value = self._objective_metric.get_objective(job_id) if job_id in completed_jobs: if value != completed_jobs[job_id][0]: - iteration_value = self.objective_metric.get_current_raw_objective(job_id) + iteration_value = self._objective_metric.get_current_raw_objective(job_id) + iteration = [it_[0] if it_ else -1 for it_ in iteration_value] completed_jobs[job_id] = ( value, - iteration_value[0] if iteration_value else -1, + iteration, copy(dict(status=id_status.get(job_id), **params))) elif completed_jobs.get(job_id): completed_jobs[job_id] = (completed_jobs[job_id][0], @@ -1815,43 +2041,98 @@ class HyperParameterOptimizer(object): copy(dict(status=id_status.get(job_id), **params))) pairs.append((i, completed_jobs[job_id][0])) labels.append(str(completed_jobs[job_id][2])[1:-1]) - elif value is not None: + elif value is not None and all(v is not None for v in value): pairs.append((i, value)) labels.append(str(params)[1:-1]) - iteration_value = self.objective_metric.get_current_raw_objective(job_id) + iteration_value = self._objective_metric.get_current_raw_objective(job_id) + iteration = [it_[0] if it_ else -1 for it_ in iteration_value] completed_jobs[job_id] = ( value, - iteration_value[0] if iteration_value else -1, + iteration, copy(dict(status=id_status.get(job_id), **params))) # callback new experiment completed if self._experiment_completed_cb: - normalized_value = self.objective_metric.get_normalized_objective(job_id) - if normalized_value is not None and normalized_value > best_experiment[0]: + normalized_value = self._objective_metric.get_normalized_objective(job_id) + if self._objective_metric.len == 1 and normalized_value is not None and \ + normalized_value[0] > best_experiment[0][0]: + best_experiment = normalized_value, job_id + elif self._objective_metric.len != 1 and normalized_value is not None and \ + all(n is not None for n in normalized_value) and (best_experiment[0] == float("-inf") or + MultiObjective._dominates( + normalized_value, + best_experiment[0])): # noqa best_experiment = normalized_value, job_id c = completed_jobs[job_id] self._experiment_completed_cb(job_id, c[0], c[1], c[2], best_experiment[1]) if pairs: - print('Updating job performance summary plot/table') - - # update scatter plot - task_logger.report_scatter2d( - title='Optimization Objective', series=title, - 
scatter=pairs, iteration=0, labels=labels, - mode='markers', xaxis='job #', yaxis='objective') + print("Updating job performance summary plot/table") + if isinstance(title, list): + for i, title_ in enumerate(title): + # update scatter plot + task_logger.report_scatter2d( + title="Optimization Objective", + series=title_, + scatter=[(p[0], p[1][i]) for p in pairs], + iteration=0, + labels=labels, + mode="markers", + xaxis="job #", + yaxis="objective", + ) + else: + task_logger.report_scatter2d( + title="Optimization Objective", + series=title, + scatter=pairs, + iteration=0, + labels=labels, + mode="markers", + xaxis="job #", + yaxis="objective", + ) # update summary table - job_ids = list(completed_jobs.keys()) - job_ids_sorted_by_objective = sorted( - job_ids, key=lambda x: completed_jobs[x][0], reverse=bool(self.objective_metric.sign >= 0)) + job_ids_sorted_by_objective = self.__sort_jobs_by_objective(completed_jobs) # sort the columns except for 'objective', 'iteration' columns = list(sorted(set([c for k, v in completed_jobs.items() for c in v[2].keys()]))) - # add the index column (task id) and the first two columns 'objective', 'iteration' then the rest - table_values = [['task id', 'objective', 'iteration'] + columns] - table_values += \ - [([job, completed_jobs[job][0], completed_jobs[job][1]] + - [completed_jobs[job][2].get(c, '') for c in columns]) for job in job_ids_sorted_by_objective] + concat_iterations = True + if self._objective_metric.len == 1: + # add the index column (task id) and the first two columns 'objective', 'iteration' then the rest + table_values = [['task id', 'objective', 'iteration'] + columns] + table_values += \ + [([job, completed_jobs[job][0][0], completed_jobs[job][1][0]] + + [completed_jobs[job][2].get(c, '') for c in columns]) for job in job_ids_sorted_by_objective] + else: + table_values = ['task id'] + for job in job_ids_sorted_by_objective: + if not all(iter_ == completed_jobs[job][1][0] for iter_ in completed_jobs[job][1]): + concat_iterations = False + break + if concat_iterations: + for objective in self._objective_metric.objectives: + table_values.append(objective.title + "/" + objective.series) + table_values.append("iteration") + table_values = [table_values + columns] + for job in job_ids_sorted_by_objective: + entry = [job] + for val in completed_jobs[job][0]: + entry += [val] + entry += [completed_jobs[job][1][0]] + entry += [completed_jobs[job][2].get(c, '') for c in columns] + table_values.append(entry) + else: + for objective in self._objective_metric.objectives: + table_values.append(objective.title + "/" + objective.series) + table_values.append("iteration " + objective.title + "/" + objective.series) + table_values = [table_values + columns] + for job in job_ids_sorted_by_objective: + entry = [job] + for val, iter_ in zip(completed_jobs[job][0], completed_jobs[job][1]): + entry += [val, iter_] + entry += [completed_jobs[job][2].get(c, '') for c in columns] + table_values.append(entry) # create links for task id in the table task_link_template = self._task.get_output_log_web_page() \ @@ -1867,15 +2148,42 @@ class HyperParameterOptimizer(object): task_link_template.format(project=project_id, task=task_id), task_id) task_logger.report_table( - "summary", "job", 0, table_plot=table_values_with_links, - extra_layout={"title": "objective: {}".format(title)}) + "summary", + "job", + 0, + table_plot=table_values_with_links, + extra_layout={ + "title": "objective: {}".format(title if not isinstance(title, list) else ", ".join(title)) + }, + ) 
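For intuition, the ordering used when there is more than one objective (successive Pareto fronts, as described in the ``MultiObjective.get_top_tasks`` docstring) can be sketched in isolation as follows. This is a conceptual sketch with illustrative names, not the exact ``_sort_jobs_by_domination`` helper from the patch, and it assumes the objective values were already sign-normalized so that larger is always better:

def dominates(lhs, rhs):
    # lhs dominates rhs if it is at least as good on every objective and strictly better on at least one
    return all(a >= b for a, b in zip(lhs, rhs)) and any(a > b for a, b in zip(lhs, rhs))

def pareto_front_order(results):
    # results: {task_id: [objective_1, objective_2, ...]}, normalized so higher is better
    remaining = dict(results)
    ordered = []
    while remaining:
        # current Pareto front: tasks not dominated by any other remaining task
        front = [tid for tid, vals in remaining.items()
                 if not any(dominates(other, vals) for oid, other in remaining.items() if oid != tid)]
        ordered.extend(front)
        for tid in front:
            remaining.pop(tid)
    return ordered

print(pareto_front_order({"a": [0.9, 0.1], "b": [0.5, 0.5], "c": [0.4, 0.4]}))
# -> ['a', 'b', 'c']  ('c' is dominated by 'b', so it lands on a later front)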
# Build parallel Coordinates: convert to columns, and reorder accordingly if len(table_values) > 1: table_values_columns = [[row[i] for row in table_values] for i in range(len(table_values[0]))] - table_values_columns = \ - [[table_values_columns[0][0]] + [c[:6]+'...' for c in table_values_columns[0][1:]]] + \ - table_values_columns[2:-1] + [[title]+table_values_columns[1][1:]] + if self._objective_metric.len == 1: + table_values_columns = \ + [[table_values_columns[0][0]] + [c[:6] + '...' for c in table_values_columns[0][1:]]] + \ + table_values_columns[2:-1] + [[title] + table_values_columns[1][1:]] + else: + if not concat_iterations: + new_table_values_columns = [] + handled = [] + for i in range(1, 2 * len(self._objective_metric.objectives), 2): + handled.append(i) + new_table_values_columns.append(table_values_columns[i]) + prefix = [] + for i in range(len(table_values_columns)): + if i in handled or table_values_columns[i][0] == "status": + continue + prefix.append(table_values_columns[i]) + table_values_columns = prefix + new_table_values_columns + else: + table_values_columns = ([table_values_columns[0]] + + table_values_columns[len(self._objective_metric.objectives) + 1:-1] + + table_values_columns[1:len(self._objective_metric.objectives) + 1] + ) + for i in range(len(table_values_columns[0]) - 1): + table_values_columns[0][i + 1] = table_values_columns[0][i + 1][:6] + "..." pcc_dims = [] for col in table_values_columns: # test if all values are numbers: @@ -1896,16 +2204,21 @@ class HyperParameterOptimizer(object): pcc_dims.append(d) # report parallel coordinates plotly_pcc = dict( - data=[dict( - type='parcoords', - line=dict(colorscale='Viridis', - reversescale=bool(self.objective_metric.sign >= 0), - color=table_values_columns[-1][1:]), - dimensions=pcc_dims)], - layout={}) - task_logger.report_plotly( - title='Parallel Coordinates', series='', - iteration=0, figure=plotly_pcc) + data=[ + dict( + type="parcoords", + line=dict( + colorscale="Viridis", + reversescale=not isinstance(self._objective_metric, MultiObjective) + and self._objective_metric.sign >= 0, + color=table_values_columns[-1][1:], + ), + dimensions=pcc_dims, + ) + ], + layout={}, + ) + task_logger.report_plotly(title="Parallel Coordinates", series="", iteration=0, figure=plotly_pcc) # upload summary as artifact if force: @@ -1937,21 +2250,20 @@ class HyperParameterOptimizer(object): if not completed_jobs: return - value_func, series_name = (max, "max") if self.objective_metric.get_objective_sign() > 0 else \ - (min, "min") - latest_completed, obj_values = self._get_latest_completed_task_value(completed_jobs, series_name) - if latest_completed: - val = value_func(obj_values) - task_logger.report_scalar( - title=title, - series=series_name, - iteration=counter, - value=val) - task_logger.report_scalar( - title=title, - series="last reported", - iteration=counter, - value=latest_completed) + objectives = self._objective_metric.objectives + if not isinstance(title, list): + title = [title] + for objective, title_ in zip(objectives, title): + value_func, series_name = (max, "max") if objective.get_objective_sign() > 0 else (min, "min") + latest_completed, obj_values = self._get_latest_completed_task_value( + completed_jobs, series_name, objective.title, objective.series + ) + if latest_completed: + val = value_func(obj_values) + task_logger.report_scalar(title=title_, series=series_name, iteration=counter, value=val) + task_logger.report_scalar( + title=title_, series="last reported", iteration=counter, 
value=latest_completed + ) def _report_resources(self, task_logger, iteration): # type: (Logger, int) -> () @@ -1990,7 +2302,7 @@ class HyperParameterOptimizer(object): title="resources", series=series, iteration=iteration, value=val) - def _get_latest_completed_task_value(self, cur_completed_jobs, series_name): + def _get_latest_completed_task_value(self, cur_completed_jobs, series_name, title, series): # type: (Set[str], str) -> (float, List[float]) completed_value = None latest_completed = None @@ -2003,16 +2315,15 @@ class HyperParameterOptimizer(object): continue completed_time = datetime_from_isoformat(response.response_data["task"]["completed"].partition("+")[0]) completed_time = completed_time.timestamp() - completed_values = self._get_last_value(response) + completed_values = self._get_last_value(response, title, series) obj_values.append(completed_values['max_value'] if series_name == "max" else completed_values['min_value']) if not latest_completed or completed_time > latest_completed: latest_completed = completed_time completed_value = completed_values['value'] return completed_value, obj_values - def _get_last_value(self, response): - metrics, title, series, values = ClearmlJob.get_metric_req_params(self.objective_metric.title, - self.objective_metric.series) + def _get_last_value(self, response, title, series): + metrics, title, series, values = ClearmlJob.get_metric_req_params(title, series) last_values = response.response_data["task"]['last_metrics'][title][series] return last_values @@ -2061,6 +2372,14 @@ class HyperParameterOptimizer(object): def __sort_jobs_by_objective(self, completed_jobs): if not completed_jobs: return [] - job_ids_sorted_by_objective = list(sorted( - completed_jobs.keys(), key=lambda x: completed_jobs[x][0], reverse=bool(self.objective_metric.sign >= 0))) - return job_ids_sorted_by_objective + if self._objective_metric.len != 1: + # noinspection PyProtectedMember + return self._objective_metric._sort_jobs_by_domination(completed_jobs) + else: + return list( + sorted( + completed_jobs.keys(), + key=lambda x: completed_jobs[x][0], + reverse=bool(self._objective_metric.objectives[0].sign >= 0), + ) + ) diff --git a/clearml/automation/optuna/optuna.py b/clearml/automation/optuna/optuna.py index a69b978f..df66b0e9 100644 --- a/clearml/automation/optuna/optuna.py +++ b/clearml/automation/optuna/optuna.py @@ -1,7 +1,7 @@ from time import sleep from typing import Any, Optional, Sequence -from ..optimization import Objective, SearchStrategy +from ..optimization import Objective, SearchStrategy, MultiObjective from ..parameters import (DiscreteParameterRange, Parameter, UniformIntegerParameterRange, UniformParameterRange, LogUniformParameterRange) from ...task import Task @@ -55,21 +55,29 @@ class OptunaObjective(object): if not is_pending: # noinspection PyProtectedMember iteration_value = self.optimizer._objective_metric.get_current_raw_objective(current_job) + if not iteration_value: + if not self.optimizer.monitor_job(current_job): + break + continue # make sure we skip None objective values - if iteration_value and iteration_value[1] is not None: - # update budget - trial.report(value=iteration_value[1], step=iteration_value[0]) + if not any(val is None or val[1] is None for val in iteration_value): + iteration = max(iv[0] for iv in iteration_value) + # trial pruning based on intermediate values not supported when using multi-objective + # noinspection PyProtectedMember + if self.optimizer._objective_metric.len == 1: + # update budget + 
trial.report(value=iteration_value[0][1], step=iteration) - # Handle pruning based on the intermediate value. - if trial.should_prune() and ( - not self.min_iteration_per_job or - iteration_value[0] >= self.min_iteration_per_job): - current_job.abort() - raise optuna.TrialPruned() + # Handle pruning based on the intermediate value. + if trial.should_prune() and ( + not self.min_iteration_per_job or + iteration >= self.min_iteration_per_job): + current_job.abort() + raise optuna.TrialPruned() # check if we exceeded this job budget - if self.max_iteration_per_job and iteration_value[0] >= self.max_iteration_per_job: + if self.max_iteration_per_job and iteration >= self.max_iteration_per_job: current_job.abort() break @@ -79,6 +87,10 @@ class OptunaObjective(object): # noinspection PyProtectedMember objective_metric = self.optimizer._objective_metric.get_objective(current_job) + # noinspection PyProtectedMember + if self.optimizer._objective_metric.len == 1: + objective_metric = objective_metric[0] + iteration_value = iteration_value[0] print('OptunaObjective result metric={}, iteration {}'.format(objective_metric, iteration_value)) # noinspection PyProtectedMember self.optimizer._current_jobs.remove(current_job) @@ -157,13 +169,22 @@ class OptimizerOptuna(SearchStrategy): This function returns only after optimization is completed or :meth:`stop` was called. """ - self._study = optuna.create_study( - direction="minimize" if self._objective_metric.get_objective_sign() < 0 else "maximize", - load_if_exists=False, - sampler=self._optuna_sampler, - pruner=self._optuna_pruner, - study_name=self._optimizer_task.id if self._optimizer_task else None, - ) + if self._objective_metric.len != 1: + self._study = optuna.create_study( + directions=["minimize" if sign_ < 0 else "maximize" for sign_ in self._objective_metric.get_objective_sign()], + load_if_exists=False, + sampler=self._optuna_sampler, + pruner=self._optuna_pruner, + study_name=self._optimizer_task.id if self._optimizer_task else None, + ) + else: + self._study = optuna.create_study( + direction="minimize" if self._objective_metric.get_objective_sign()[0] < 0 else "maximize", + load_if_exists=False, + sampler=self._optuna_sampler, + pruner=self._optuna_pruner, + study_name=self._optimizer_task.id if self._optimizer_task else None, + ) config_space = self._convert_hyper_parameters_to_optuna() self._objective = OptunaObjective( base_task_id=self._base_task_id,