Add multi-objective optimization to HPO

allegroai 2024-01-06 12:35:49 +02:00
parent d993fa75e4
commit f7b441ab49
3 changed files with 473 additions and 133 deletions

View File

@@ -85,7 +85,7 @@ class _TrainsBandsterWorker(Worker):
self.optimizer.budget.iterations.update(self._current_job.task_id(), iteration_value[0][0])
# check if we exceeded this job budget
if iteration_value[0] >= self.budget_iteration_scale * budget:
if iteration_value[0][0] >= self.budget_iteration_scale * budget:
self._current_job.abort()
break
@@ -95,7 +95,7 @@ class _TrainsBandsterWorker(Worker):
# noinspection PyProtectedMember
self.optimizer.budget.jobs.update(
self._current_job.task_id(),
float(iteration_value[0]) / self.optimizer._max_iteration_per_job)
float(iteration_value[0][0]) / self.optimizer._max_iteration_per_job)
result = {
# this is a mandatory field to run hyperband
@@ -104,7 +104,7 @@ class _TrainsBandsterWorker(Worker):
# can be used for any user-defined information - also mandatory
'info': self._current_job.task_id()
}
print('TrainsBandsterWorker result {}, iteration {}'.format(result, iteration_value))
print('TrainsBandsterWorker result {}, iteration {}'.format(result, iteration_value[0]))
# noinspection PyProtectedMember
self.optimizer._current_jobs.remove(self._current_job)
return result
@@ -299,7 +299,7 @@ class OptimizerBOHB(SearchStrategy, RandomSeed):
sleep_interval=int(self.pool_period_minutes * 60),
budget_iteration_scale=budget_iteration_scale,
base_task_id=self._base_task_id,
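# multi-objective optimization is routed to Optuna, so the MultiObjective wrapper here holds a single objective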
objective=self._objective_metric,
objective=self._objective_metric.objectives[0],
queue_name=self._execution_queue,
nameserver='127.0.0.1', nameserver_port=self._nameserver_port, run_id=fake_run_id, id=i)
w.run(background=True)

View File

@@ -1,13 +1,16 @@
import hashlib
import json
import six
import numpy as np
import functools
from copy import copy, deepcopy
from datetime import datetime
from itertools import product
from logging import getLogger
from threading import Thread, Event
from time import time
from typing import List, Set, Union, Any, Sequence, Optional, Mapping, Callable
from typing import List, Set, Union, Any, Sequence, Optional, Mapping, Callable, Tuple
from abc import ABC, abstractmethod
from .job import ClearmlJob, LocalClearmlJob
from .parameters import Parameter
@@ -18,8 +21,33 @@ from ..task import Task
logger = getLogger('clearml.automation.optimization')
class _ObjectiveInterface(ABC):
@abstractmethod
def get_objective(self, task_id):
pass
@abstractmethod
def get_current_raw_objective(self, task):
pass
@abstractmethod
def get_objective_sign(self):
pass
@abstractmethod
def get_objective_metric(self):
pass
@abstractmethod
def get_normalized_objective(self, task_id):
pass
@abstractmethod
def get_top_tasks(self, top_k, optimizer_task_id=None, task_filter=None):
pass
class Objective(object):
class Objective(_ObjectiveInterface):
"""
Optimization ``Objective`` class to maximize / minimize over all experiments. This class will sample a specific
scalar from all experiments, and maximize / minimize over a single scalar (i.e., title and series combination).
@@ -53,7 +81,7 @@ class Objective(object):
self.series = series
assert order in ('min', 'max',)
# normalize value so we always look for the highest objective value
self.sign = -1 if (isinstance(order, str) and order.lower().strip() == 'min') else +1
self.sign = -1 if (isinstance(order, str) and order.lower().strip() == 'min') else 1
self._metric = None
self.extremum = extremum
@@ -243,7 +271,7 @@ class Budget(object):
# type: () -> (Optional[float])
if self.limit is None or not self.current:
return None
return sum(self.current.values())/float(self.limit)
return sum(self.current.values()) / float(self.limit)
def __init__(self, jobs_limit, iterations_limit, compute_time_limit):
# type: (Optional[int], Optional[int], Optional[float]) -> ()
@@ -467,7 +495,7 @@ class SearchStrategy(object):
if self.max_iteration_per_job:
iterations = self._get_job_iterations(job)
if iterations > 0:
if iterations and iterations > 0:
self.budget.iterations.update(job.task_id(), iterations)
if iterations > self.max_iteration_per_job:
abort_job = True
@@ -512,12 +540,8 @@ class SearchStrategy(object):
:return: A list of Task objects, ordered by performance, where index 0 is the best performing Task.
"""
# noinspection PyProtectedMember
top_tasks = self._get_child_tasks(
parent_task_id=self._job_parent_id or self._base_task_id,
order_by=self._objective_metric._get_last_metrics_encode_field(),
additional_filters={'page_size': int(top_k), 'page': 0})
return top_tasks
return self._objective_metric.get_top_tasks(top_k=top_k,
optimizer_task_id=self._job_parent_id or self._base_task_id)
def get_top_experiments_id_metrics_pair(self, top_k, all_metrics=False, only_completed=False):
# type: (int, bool, bool) -> Sequence[(str, dict)]
@@ -582,15 +606,17 @@ class SearchStrategy(object):
# noinspection PyProtectedMember
top_tasks_ids_metric = self._get_child_tasks_ids(
parent_task_id=self._job_parent_id or self._base_task_id,
order_by=self._objective_metric._get_last_metrics_encode_field(),
order_by=self._objective_metric._get_last_metrics_encode_field()[0],
additional_filters=additional_filters,
additional_fields=['last_metrics']
)
title, series = self._objective_metric.get_objective_metric() if not all_metrics else (None, None)
title_series = self._objective_metric.get_objective_metric() if not all_metrics else []
titles = [ts[0] for ts in title_series]
series = [ts[1] for ts in title_series]
return [(i, {'{}/{}'.format(v['metric'], v['variant']): v
for variant in metric.values() for v in variant.values()
if all_metrics or v['metric'] == title and v['variant'] == series}
if all_metrics or (v['metric'] in titles and v['variant'] in series)}
) for i, metric in top_tasks_ids_metric]
def get_top_experiments_details(
@@ -669,15 +695,28 @@ class SearchStrategy(object):
# noinspection PyProtectedMember
top_tasks_ids_metric_params = self._get_child_tasks_ids(
parent_task_id=self._job_parent_id or self._base_task_id,
order_by=self._objective_metric._get_last_metrics_encode_field(),
order_by=self._objective_metric._get_last_metrics_encode_field()[
0] if self._objective_metric.len == 1 else None,
additional_filters=additional_filters,
additional_fields=['last_metrics', 'hyperparams']
)
if self._objective_metric.len != 1:
top_tasks_ids_metric_params_dict = {}
for task in top_tasks_ids_metric_params:
objective = self._objective_metric.get_objective(task[0])
if objective is None or any(o is None for o in objective):
continue
top_tasks_ids_metric_params_dict[task[0]] = (objective, task)
# noinspection PyProtectedMember
sorted_ids = self._objective_metric._sort_jobs_by_domination(top_tasks_ids_metric_params_dict)
top_tasks_ids_metric_params = [top_tasks_ids_metric_params_dict[s][1] for s in sorted_ids]
# get hp_parameters:
hp_params = set(p.name for p in self._hyper_parameters)
title, series = self._objective_metric.get_objective_metric() if not all_metrics else (None, None)
title_series = self._objective_metric.get_objective_metric() if not all_metrics else []
titles = [ts[0] for ts in title_series]
series = [ts[1] for ts in title_series]
return [
{
'task_id': tid,
@@ -688,19 +727,20 @@ class SearchStrategy(object):
},
'metrics': {
'{}/{}'.format(v['metric'], v['variant']): v for variant in metric.values()
for v in variant.values() if all_metrics or v['metric'] == title and v['variant'] == series
for v in variant.values() if all_metrics or v['metric'] in titles and v['variant'] in series
}
} for tid, metric, param_sections in top_tasks_ids_metric_params
]
def get_objective_metric(self):
# type: () -> (str, str)
# type: () -> Union[Tuple[str, str], List[Tuple[str, str]]]
"""
Return the metric (title, series) pair of the objective, or a list of such pairs when optimizing
multiple objectives.
:return: (title, series), or [(title, series), ...] for multiple objectives
"""
return self._objective_metric.get_objective_metric()
objective = self._objective_metric.get_objective_metric()
return objective[0] if self._objective_metric.len == 1 else objective
def helper_create_job(
self,
@@ -823,7 +863,9 @@ class SearchStrategy(object):
def _get_job_iterations(self, job):
# type: (Union[ClearmlJob, Task]) -> int
iteration_value = self._objective_metric.get_current_raw_objective(job)
return iteration_value[0] if iteration_value else -1
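# with multiple objectives, budget tracking uses the furthest iteration reported across the tracked metrics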
if iteration_value is not None and any(iv is not None and iv[0] is not None for iv in iteration_value):
return max(iv[0] for iv in iteration_value if iv is not None)
return -1
@classmethod
def _get_child_tasks_ids(
@@ -887,7 +929,7 @@ class SearchStrategy(object):
task_objects = Task._query_tasks(**task_filter)
if not additional_fields:
return [t.id for t in task_objects]
return [[t.id]+[getattr(t, f, None) for f in additional_fields] for t in task_objects]
return [[t.id] + [getattr(t, f, None) for f in additional_fields] for t in task_objects]
@classmethod
def _get_child_tasks(
@@ -927,6 +969,146 @@ class SearchStrategy(object):
]
class MultiObjective(_ObjectiveInterface):
def __init__(self, title, series, order, extremum):
self.title = title
self.series = series
self.order = order
self.extremum = extremum
self.objectives = []
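# wrap each (title, series, order, extremum) tuple in its own single-metric Objective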
for title_, series_, order_, extremum_ in zip(title, series, order, extremum):
self.objectives.append(Objective(title=title_, series=series_, order=order_, extremum=extremum_))
self.len = len(self.objectives)
def get_objective(self, task_id):
# type: (Union[str, Task, ClearmlJob]) -> Optional[List[float]]
"""
Return a specific task's scalar values, based on the objective settings (title/series) of each objective.
:param str task_id: The Task ID to retrieve the scalars from (or a ``ClearmlJob`` object).
:return: The scalar values, or None if any of them is missing.
"""
objective = [o.get_objective(task_id) for o in self.objectives]
if any(o is None for o in objective):
return None
return objective
def get_current_raw_objective(self, task):
# type: (Union[ClearmlJob, Task]) -> Optional[List[Tuple[int, float]]]
"""
Return the current raw value (without sign normalization) of each objective.
:param str task: The Task or Job to retrieve the scalars from (or a ``ClearmlJob`` object).
:return: A list of (iteration, value) tuples, or None if any objective's metric does not exist.
"""
objective = [o.get_current_raw_objective(task) for o in self.objectives]
if any(o is None for o in objective):
return None
return objective
def get_objective_sign(self):
# type: () -> List[float]
"""
Return the sign of the objectives.
- ``+1`` - If maximizing
- ``-1`` - If minimizing
:return: Objective function signs.
"""
return [o.get_objective_sign() for o in self.objectives]
def get_normalized_objective(self, task_id):
# type: (Union[str, Task, ClearmlJob]) -> Optional[List[float]]
"""
Return normalized task scalar values based on the objective settings (title/series).
I.e., the values are sign-normalized so that the objective is always to maximize them.
:param str task_id: The Task ID to retrieve scalars from.
:return: Normalized scalar values.
"""
objective = [o.get_normalized_objective(task_id) for o in self.objectives]
if any(o is None for o in objective):
return None
return objective
def get_objective_metric(self):
# type: () -> List[Tuple[str, str]]
"""
Return the metric title, series pairs of the objectives.
:return: List[(title, series)]
"""
return [o.get_objective_metric() for o in self.objectives]
def get_top_tasks(self, top_k, optimizer_task_id=None, task_filter=None):
# type: (int, Optional[str], Optional[dict]) -> Sequence[Task]
"""
Return a list of Tasks of the top performing experiments.
If there is only one objective, the tasks are sorted based on that objective.
If there are multiple objectives, the tasks are sorted based on successive Pareto fronts.
A trial lies on the Pareto front if no other trial dominates it.
A trial dominates another trial if all of its objective metrics are greater than or equal to the other
trial's, and at least one objective metric is strictly greater than the other's.
:param int top_k: The number of Tasks (experiments) to return.
:param str optimizer_task_id: Parent optimizer Task ID
:param dict task_filter: Optional task filter dict for the query
:return: A list of Task objects, ordered by performance, where index 0 is the best performing Task.
"""
if self.len == 1:
return self.objectives[0].get_top_tasks(
top_k=top_k,
optimizer_task_id=optimizer_task_id,
task_filter=task_filter
)
task_filter = deepcopy(task_filter) if task_filter else {}
if optimizer_task_id:
task_filter["parent"] = optimizer_task_id
# noinspection PyProtectedMember
tasks = Task._query_tasks(**task_filter)
candidates = {}
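# keep only tasks that report a value for every objective; these values drive the domination sort below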
for task in tasks:
values = self.get_objective(task.id)
if values is None or any(v is None for v in values):
continue
candidates[task.id] = (values, task.id)
sorted_ids = self._sort_jobs_by_domination(candidates)
if not sorted_ids:
return []
return Task.get_tasks(task_ids=sorted_ids[:top_k])
def _get_last_metrics_encode_field(self):
# noinspection PyProtectedMember
return [o._get_last_metrics_encode_field() for o in self.objectives]
def _weakly_dominates_normalized(self, lhs, rhs):
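# multiply both sides by each objective's sign so that every comparison reads as "greater is better"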
return all(lhs_elem * o.sign >= rhs_elem * o.sign for lhs_elem, rhs_elem, o in zip(lhs, rhs, self.objectives))
@staticmethod
def _dominates(lhs, rhs):
return all(lhs_elem >= rhs_elem for lhs_elem, rhs_elem in zip(lhs, rhs)) and \
any(lhs_elem > rhs_elem for lhs_elem, rhs_elem in zip(lhs, rhs))
def _sort_jobs_by_domination(self, jobs):
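# jobs: {job_id: (objective_values, ...)}. Entries that weakly dominate every remaining entry are
# moved to the front pass by pass; if a pass finds none (a mutually non-dominated set), the
# remainder is appended in its current order so the loop always terminates.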
job_ids = list(jobs.keys())
job_ids_sorted = []
while len(job_ids_sorted) < len(jobs.keys()):
have_result = False
for job_id in job_ids:
if all(self._weakly_dominates_normalized(jobs[job_id][0], jobs[other_job_id][0])
for other_job_id in job_ids):
have_result = True
job_ids_sorted.append(job_id)
if not have_result:
job_ids_sorted.extend(job_ids)
job_ids = [job_id for job_id in job_ids if job_id not in job_ids_sorted]
return job_ids_sorted
class GridSearch(SearchStrategy):
"""
Grid search strategy controller. Full grid sampling of every hyperparameter combination.
@@ -1089,12 +1271,12 @@ class HyperParameterOptimizer(object):
self,
base_task_id, # type: str
hyper_parameters, # type: Sequence[Parameter]
objective_metric_title, # type: str
objective_metric_series, # type: str
objective_metric_sign='min', # type: str
objective_metric_title, # type: Union[str, Sequence[str]]
objective_metric_series, # type: Union[str, Sequence[str]]
objective_metric_sign="min", # type: Union[str, Sequence[str]]
optimizer_class=RandomSearch, # type: Union[SearchStrategy, type(SearchStrategy)]
max_number_of_concurrent_tasks=10, # type: int
execution_queue='default', # type: str
execution_queue="default", # type: str
optimization_time_limit=None, # type: Optional[float]
compute_time_limit=None, # type: Optional[float]
auto_connect_task=True, # type: Union[bool, Task]
@@ -1109,10 +1291,14 @@
:param str base_task_id: The Task ID to be used as template experiment to optimize.
:param list hyper_parameters: The list of Parameter objects to optimize over.
:param str objective_metric_title: The Objective metric title to maximize / minimize (for example,
``validation``).
:param str objective_metric_series: The Objective metric series to maximize / minimize (for example, ``loss``).
:param str objective_metric_sign: The objective to maximize / minimize.
:param Union[str, Sequence[str]] objective_metric_title: The Objective metric title(s) to maximize / minimize
(for example, ``validation``, ``["validation", "loss"]``). If ``objective_metric_title`` is a sequence
(used to optimize multiple objectives at the same time), then ``objective_metric_series`` and
``objective_metric_sign`` have to be sequences of the same length. Each title will be matched
with the respective series and sign.
:param Union[str, Sequence[str]] objective_metric_series: The Objective metric series to maximize / minimize
(for example, ``loss_series``, ``["validation_series", "loss_series"]``).
:param Union[str, Sequence[str]] objective_metric_sign: The objectives to maximize / minimize.
The values are:
- ``min`` - Minimize the last reported value for the specified title/series scalar.
@@ -1190,7 +1376,24 @@
# make sure we stop all jobs
an_optimizer.stop()
"""
if type(objective_metric_title) is not type(objective_metric_series) or type(
objective_metric_title) is not type(objective_metric_sign):
raise TypeError(
"objective_metric_series, objective_metric_title and objective_metric_sign have to be of the same type"
" (strings if doing single-objective optimization and lists of the same length"
" if doing multi-objective optimization)"
)
if isinstance(objective_metric_title, str):
objective_metric_series = [objective_metric_series]
objective_metric_title = [objective_metric_title]
objective_metric_sign = [objective_metric_sign]
if len(objective_metric_series) != len(objective_metric_title) or len(objective_metric_series) != len(
objective_metric_sign
):
raise ValueError(
"Cannot use multi-objective optimization when objective_metric_series, objective_metric_title"
" or objective_metric_sign do not have the same length"
)
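# illustrative multi-objective call (hypothetical metric names, not taken from this repo's examples):
#   HyperParameterOptimizer(..., objective_metric_title=["validation", "validation"],
#                           objective_metric_series=["accuracy", "loss"],
#                           objective_metric_sign=["max", "min"], optimizer_class=OptimizerOptuna)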
# create a new Task, if we do not have one already
self._task = auto_connect_task if isinstance(auto_connect_task, Task) else Task.current_task()
self._readonly_task = \
@@ -1224,17 +1427,29 @@
self.hyper_parameters = hyper_parameters
self.max_number_of_concurrent_tasks = opts['max_number_of_concurrent_tasks']
self.execution_queue = opts['execution_queue']
self.objective_metric = Objective(
title=opts['objective_metric_title'], series=opts['objective_metric_series'],
order='min' if opts['objective_metric_sign'] in ('min', 'min_global') else 'max',
extremum=opts['objective_metric_sign'].endswith('_global'))
self._objective_metric = MultiObjective(
title=opts["objective_metric_title"],
series=opts["objective_metric_series"],
order=["min" if sign_ in ("min", "min_global") else "max" for sign_ in opts["objective_metric_sign"]],
extremum=[sign_.endswith("_global") for sign_ in opts["objective_metric_sign"]]
)
optuna_error_message = "Multi parameter optimization is only supported via Optuna. Please install Optuna via" + \
" `pip install optuna and set the `optimizer_class` to `clearml.automation.optuna.OptimizerOptuna`"
try:
if self._objective_metric.len != 1:
from .optuna import OptimizerOptuna
if optimizer_class != OptimizerOptuna:
raise ValueError(optuna_error_message)
except Exception:
raise ValueError(optuna_error_message)
# if optimizer_class is an instance, use it as is.
if type(optimizer_class) != type:
if not isinstance(optimizer_class, type):
self.optimizer = optimizer_class
else:
self.optimizer = optimizer_class(
base_task_id=opts['base_task_id'], hyper_parameters=hyper_parameters,
objective_metric=self.objective_metric, execution_queue=opts['execution_queue'],
objective_metric=self._objective_metric, execution_queue=opts['execution_queue'],
num_concurrent_workers=opts['max_number_of_concurrent_tasks'],
compute_time_limit=opts['compute_time_limit'], **opts.get('optimizer_kwargs', {}))
self.optimizer.set_optimizer_task(self._task)
@@ -1607,9 +1822,9 @@
@classmethod
def get_optimizer_top_experiments(
cls,
objective_metric_title, # type: str
objective_metric_series, # type: str
objective_metric_sign, # type: str
objective_metric_title, # type: Union[str, List[str]]
objective_metric_series, # type: Union[str, List[str]]
objective_metric_sign, # type: Union[str, List[str]]
optimizer_task_id, # type: str
top_k, # type: int
):
@@ -1636,6 +1851,12 @@
title=objective_metric_title, series=objective_metric_series, order=objective_metric_sign)
return objective.get_top_tasks(top_k=top_k, optimizer_task_id=optimizer_task_id)
@property
def objective_metric(self):
if self._objective_metric.len == 1:
return self._objective_metric.objectives[0]
return self._objective_metric
def _connect_args(self, optimizer_class=None, hyper_param_configuration=None, **kwargs):
# type: (SearchStrategy, dict, Any) -> (SearchStrategy, list, dict)
if not self._task or self._readonly_task:
@@ -1705,8 +1926,8 @@
def _report_daemon(self):
# type: () -> ()
title, series = self.objective_metric.get_objective_metric()
title = '{}/{}'.format(title, series)
title_series = self._objective_metric.get_objective_metric()
title = ["{}/{}".format(ts[0], ts[1]) for ts in title_series]
counter = 0
completed_jobs = dict()
task_logger = None
@@ -1722,10 +1943,14 @@
params["status"] = str(task.status)
# noinspection PyProtectedMember
iteration_value = task.get_last_iteration()
objective = self.objective_metric.get_objective(task)
objective = self._objective_metric.get_objective(task)
completed_jobs[task.id] = (
objective if objective is not None else -1,
iteration_value if iteration_value is not None else -1,
objective if objective is not None else (
[-1] * self._objective_metric.len
),
[iteration_value] * self._objective_metric.len if iteration_value is not None else (
[-1] * self._objective_metric.len
),
params
)
@@ -1754,9 +1979,9 @@
self._report_remaining_budget(task_logger, counter)
if (
self.optimizer.budget.compute_time.used
and self.optimizer.budget.compute_time.limit
and self.optimizer.budget.compute_time.used >= self.optimizer.budget.compute_time.limit
self.optimizer.budget.compute_time.used
and self.optimizer.budget.compute_time.limit
and self.optimizer.budget.compute_time.used >= self.optimizer.budget.compute_time.limit
):
logger.warning(
"Optimizer task reached compute time limit (used {:.2f} out of {:.2f})".format(
@@ -1768,7 +1993,7 @@
self._report_resources(task_logger, counter)
# collect a summary of all the jobs and their final objective values
cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - \
{j.task_id() for j in self.optimizer.get_running_jobs()}
{j.task_id() for j in self.optimizer.get_running_jobs()}
self._report_completed_status(completed_jobs, cur_completed_jobs, task_logger, title)
self._report_completed_tasks_best_results(set(completed_jobs.keys()), task_logger, title, counter)
@@ -1791,9 +2016,9 @@
def _report_completed_status(self, completed_jobs, cur_completed_jobs, task_logger, title, force=False):
job_ids_sorted_by_objective = self.__sort_jobs_by_objective(completed_jobs)
best_experiment = \
(self.objective_metric.get_normalized_objective(job_ids_sorted_by_objective[0]),
(self._objective_metric.get_normalized_objective(job_ids_sorted_by_objective[0]),
job_ids_sorted_by_objective[0]) \
if job_ids_sorted_by_objective else (float('-inf'), None)
if job_ids_sorted_by_objective else ([float("-inf")], None)
if force or cur_completed_jobs != set(completed_jobs.keys()):
pairs = []
labels = []
@@ -1801,13 +2026,14 @@
created_jobs_tasks = self.optimizer.get_created_jobs_tasks()
id_status = {j_id: j_run.status() for j_id, j_run in created_jobs_tasks.items()}
for i, (job_id, params) in enumerate(created_jobs.items()):
value = self.objective_metric.get_objective(job_id)
value = self._objective_metric.get_objective(job_id)
if job_id in completed_jobs:
if value != completed_jobs[job_id][0]:
iteration_value = self.objective_metric.get_current_raw_objective(job_id)
iteration_value = self._objective_metric.get_current_raw_objective(job_id)
iteration = [it_[0] if it_ else -1 for it_ in iteration_value]
completed_jobs[job_id] = (
value,
iteration_value[0] if iteration_value else -1,
iteration,
copy(dict(status=id_status.get(job_id), **params)))
elif completed_jobs.get(job_id):
completed_jobs[job_id] = (completed_jobs[job_id][0],
@@ -1815,43 +2041,98 @@
copy(dict(status=id_status.get(job_id), **params)))
pairs.append((i, completed_jobs[job_id][0]))
labels.append(str(completed_jobs[job_id][2])[1:-1])
elif value is not None:
elif value is not None and all(v is not None for v in value):
pairs.append((i, value))
labels.append(str(params)[1:-1])
iteration_value = self.objective_metric.get_current_raw_objective(job_id)
iteration_value = self._objective_metric.get_current_raw_objective(job_id)
iteration = [it_[0] if it_ else -1 for it_ in iteration_value]
completed_jobs[job_id] = (
value,
iteration_value[0] if iteration_value else -1,
iteration,
copy(dict(status=id_status.get(job_id), **params)))
# callback new experiment completed
if self._experiment_completed_cb:
normalized_value = self.objective_metric.get_normalized_objective(job_id)
if normalized_value is not None and normalized_value > best_experiment[0]:
normalized_value = self._objective_metric.get_normalized_objective(job_id)
if self._objective_metric.len == 1 and normalized_value is not None and \
normalized_value[0] > best_experiment[0][0]:
best_experiment = normalized_value, job_id
elif self._objective_metric.len != 1 and normalized_value is not None and \
all(n is not None for n in normalized_value) and (best_experiment[1] is None or
MultiObjective._dominates(
normalized_value,
best_experiment[0])): # noqa
best_experiment = normalized_value, job_id
c = completed_jobs[job_id]
self._experiment_completed_cb(job_id, c[0], c[1], c[2], best_experiment[1])
if pairs:
print('Updating job performance summary plot/table')
# update scatter plot
task_logger.report_scatter2d(
title='Optimization Objective', series=title,
scatter=pairs, iteration=0, labels=labels,
mode='markers', xaxis='job #', yaxis='objective')
print("Updating job performance summary plot/table")
if isinstance(title, list):
for i, title_ in enumerate(title):
# update scatter plot
task_logger.report_scatter2d(
title="Optimization Objective",
series=title_,
scatter=[(p[0], p[1][i]) for p in pairs],
iteration=0,
labels=labels,
mode="markers",
xaxis="job #",
yaxis="objective",
)
else:
task_logger.report_scatter2d(
title="Optimization Objective",
series=title,
scatter=pairs,
iteration=0,
labels=labels,
mode="markers",
xaxis="job #",
yaxis="objective",
)
# update summary table
job_ids = list(completed_jobs.keys())
job_ids_sorted_by_objective = sorted(
job_ids, key=lambda x: completed_jobs[x][0], reverse=bool(self.objective_metric.sign >= 0))
job_ids_sorted_by_objective = self.__sort_jobs_by_objective(completed_jobs)
# sort the columns except for 'objective', 'iteration'
columns = list(sorted(set([c for k, v in completed_jobs.items() for c in v[2].keys()])))
# add the index column (task id) and the first two columns 'objective', 'iteration' then the rest
table_values = [['task id', 'objective', 'iteration'] + columns]
table_values += \
[([job, completed_jobs[job][0], completed_jobs[job][1]] +
[completed_jobs[job][2].get(c, '') for c in columns]) for job in job_ids_sorted_by_objective]
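# multi-objective tables share a single "iteration" column when every objective reports the same last iteration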
concat_iterations = True
if self._objective_metric.len == 1:
# add the index column (task id) and the first two columns 'objective', 'iteration' then the rest
table_values = [['task id', 'objective', 'iteration'] + columns]
table_values += \
[([job, completed_jobs[job][0][0], completed_jobs[job][1][0]] +
[completed_jobs[job][2].get(c, '') for c in columns]) for job in job_ids_sorted_by_objective]
else:
table_values = ['task id']
for job in job_ids_sorted_by_objective:
if not all(iter_ == completed_jobs[job][1][0] for iter_ in completed_jobs[job][1]):
concat_iterations = False
break
if concat_iterations:
for objective in self._objective_metric.objectives:
table_values.append(objective.title + "/" + objective.series)
table_values.append("iteration")
table_values = [table_values + columns]
for job in job_ids_sorted_by_objective:
entry = [job]
for val in completed_jobs[job][0]:
entry += [val]
entry += [completed_jobs[job][1][0]]
entry += [completed_jobs[job][2].get(c, '') for c in columns]
table_values.append(entry)
else:
for objective in self._objective_metric.objectives:
table_values.append(objective.title + "/" + objective.series)
table_values.append("iteration " + objective.title + "/" + objective.series)
table_values = [table_values + columns]
for job in job_ids_sorted_by_objective:
entry = [job]
for val, iter_ in zip(completed_jobs[job][0], completed_jobs[job][1]):
entry += [val, iter_]
entry += [completed_jobs[job][2].get(c, '') for c in columns]
table_values.append(entry)
# create links for task id in the table
task_link_template = self._task.get_output_log_web_page() \
@@ -1867,15 +2148,42 @@
task_link_template.format(project=project_id, task=task_id), task_id)
task_logger.report_table(
"summary", "job", 0, table_plot=table_values_with_links,
extra_layout={"title": "objective: {}".format(title)})
"summary",
"job",
0,
table_plot=table_values_with_links,
extra_layout={
"title": "objective: {}".format(title if not isinstance(title, list) else ", ".join(title))
},
)
# Build parallel Coordinates: convert to columns, and reorder accordingly
if len(table_values) > 1:
table_values_columns = [[row[i] for row in table_values] for i in range(len(table_values[0]))]
table_values_columns = \
[[table_values_columns[0][0]] + [c[:6]+'...' for c in table_values_columns[0][1:]]] + \
table_values_columns[2:-1] + [[title]+table_values_columns[1][1:]]
if self._objective_metric.len == 1:
table_values_columns = \
[[table_values_columns[0][0]] + [c[:6] + '...' for c in table_values_columns[0][1:]]] + \
table_values_columns[2:-1] + [[title] + table_values_columns[1][1:]]
else:
if not concat_iterations:
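# objective-value columns sit at odd indices (1, 3, ...); move them to the tail so an objective
# column lands last (it drives the parallel-coordinates color), dropping the non-numeric "status" column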
new_table_values_columns = []
handled = []
for i in range(1, 2 * len(self._objective_metric.objectives), 2):
handled.append(i)
new_table_values_columns.append(table_values_columns[i])
prefix = []
for i in range(len(table_values_columns)):
if i in handled or table_values_columns[i][0] == "status":
continue
prefix.append(table_values_columns[i])
table_values_columns = prefix + new_table_values_columns
else:
table_values_columns = ([table_values_columns[0]] +
table_values_columns[len(self._objective_metric.objectives) + 1:-1] +
table_values_columns[1:len(self._objective_metric.objectives) + 1]
)
for i in range(len(table_values_columns[0]) - 1):
table_values_columns[0][i + 1] = table_values_columns[0][i + 1][:6] + "..."
pcc_dims = []
for col in table_values_columns:
# test if all values are numbers:
@@ -1896,16 +2204,21 @@
pcc_dims.append(d)
# report parallel coordinates
plotly_pcc = dict(
data=[dict(
type='parcoords',
line=dict(colorscale='Viridis',
reversescale=bool(self.objective_metric.sign >= 0),
color=table_values_columns[-1][1:]),
dimensions=pcc_dims)],
layout={})
task_logger.report_plotly(
title='Parallel Coordinates', series='',
iteration=0, figure=plotly_pcc)
data=[
dict(
type="parcoords",
line=dict(
colorscale="Viridis",
reversescale=self._objective_metric.len == 1
and self._objective_metric.objectives[0].sign >= 0,
color=table_values_columns[-1][1:],
),
dimensions=pcc_dims,
)
],
layout={},
)
task_logger.report_plotly(title="Parallel Coordinates", series="", iteration=0, figure=plotly_pcc)
# upload summary as artifact
if force:
@@ -1937,21 +2250,20 @@
if not completed_jobs:
return
value_func, series_name = (max, "max") if self.objective_metric.get_objective_sign() > 0 else \
(min, "min")
latest_completed, obj_values = self._get_latest_completed_task_value(completed_jobs, series_name)
if latest_completed:
val = value_func(obj_values)
task_logger.report_scalar(
title=title,
series=series_name,
iteration=counter,
value=val)
task_logger.report_scalar(
title=title,
series="last reported",
iteration=counter,
value=latest_completed)
objectives = self._objective_metric.objectives
if not isinstance(title, list):
title = [title]
for objective, title_ in zip(objectives, title):
value_func, series_name = (max, "max") if objective.get_objective_sign() > 0 else (min, "min")
latest_completed, obj_values = self._get_latest_completed_task_value(
completed_jobs, series_name, objective.title, objective.series
)
if latest_completed:
val = value_func(obj_values)
task_logger.report_scalar(title=title_, series=series_name, iteration=counter, value=val)
task_logger.report_scalar(
title=title_, series="last reported", iteration=counter, value=latest_completed
)
def _report_resources(self, task_logger, iteration):
# type: (Logger, int) -> ()
@@ -1990,7 +2302,7 @@
title="resources", series=series,
iteration=iteration, value=val)
def _get_latest_completed_task_value(self, cur_completed_jobs, series_name):
def _get_latest_completed_task_value(self, cur_completed_jobs, series_name, title, series):
# type: (Set[str], str, str, str) -> (float, List[float])
completed_value = None
latest_completed = None
@@ -2003,16 +2315,15 @@
continue
completed_time = datetime_from_isoformat(response.response_data["task"]["completed"].partition("+")[0])
completed_time = completed_time.timestamp()
completed_values = self._get_last_value(response)
completed_values = self._get_last_value(response, title, series)
obj_values.append(completed_values['max_value'] if series_name == "max" else completed_values['min_value'])
if not latest_completed or completed_time > latest_completed:
latest_completed = completed_time
completed_value = completed_values['value']
return completed_value, obj_values
def _get_last_value(self, response):
metrics, title, series, values = ClearmlJob.get_metric_req_params(self.objective_metric.title,
self.objective_metric.series)
def _get_last_value(self, response, title, series):
metrics, title, series, values = ClearmlJob.get_metric_req_params(title, series)
last_values = response.response_data["task"]['last_metrics'][title][series]
return last_values
@@ -2061,6 +2372,14 @@
def __sort_jobs_by_objective(self, completed_jobs):
if not completed_jobs:
return []
job_ids_sorted_by_objective = list(sorted(
completed_jobs.keys(), key=lambda x: completed_jobs[x][0], reverse=bool(self.objective_metric.sign >= 0)))
return job_ids_sorted_by_objective
if self._objective_metric.len != 1:
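# no scalar sort key exists for multiple objectives; order by successive weak-domination passes instead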
# noinspection PyProtectedMember
return self._objective_metric._sort_jobs_by_domination(completed_jobs)
else:
return list(
sorted(
completed_jobs.keys(),
key=lambda x: completed_jobs[x][0],
reverse=bool(self._objective_metric.objectives[0].sign >= 0),
)
)

View File

@@ -1,7 +1,7 @@
from time import sleep
from typing import Any, Optional, Sequence
from ..optimization import Objective, SearchStrategy
from ..optimization import Objective, SearchStrategy, MultiObjective
from ..parameters import (DiscreteParameterRange, Parameter, UniformIntegerParameterRange, UniformParameterRange,
LogUniformParameterRange)
from ...task import Task
@@ -55,21 +55,29 @@ class OptunaObjective(object):
if not is_pending:
# noinspection PyProtectedMember
iteration_value = self.optimizer._objective_metric.get_current_raw_objective(current_job)
if not iteration_value:
if not self.optimizer.monitor_job(current_job):
break
continue
# make sure we skip None objective values
if iteration_value and iteration_value[1] is not None:
# update budget
trial.report(value=iteration_value[1], step=iteration_value[0])
if not any(val is None or val[1] is None for val in iteration_value):
iteration = max(iv[0] for iv in iteration_value)
# trial pruning based on intermediate values not supported when using multi-objective
# noinspection PyProtectedMember
if self.optimizer._objective_metric.len == 1:
# update budget
trial.report(value=iteration_value[0][1], step=iteration)
# Handle pruning based on the intermediate value.
if trial.should_prune() and (
not self.min_iteration_per_job or
iteration_value[0] >= self.min_iteration_per_job):
current_job.abort()
raise optuna.TrialPruned()
# Handle pruning based on the intermediate value.
if trial.should_prune() and (
not self.min_iteration_per_job or
iteration >= self.min_iteration_per_job):
current_job.abort()
raise optuna.TrialPruned()
# check if we exceeded this job budget
if self.max_iteration_per_job and iteration_value[0] >= self.max_iteration_per_job:
if self.max_iteration_per_job and iteration >= self.max_iteration_per_job:
current_job.abort()
break
@@ -79,6 +87,10 @@ class OptunaObjective(object):
# noinspection PyProtectedMember
objective_metric = self.optimizer._objective_metric.get_objective(current_job)
# noinspection PyProtectedMember
if self.optimizer._objective_metric.len == 1:
objective_metric = objective_metric[0]
iteration_value = iteration_value[0]
print('OptunaObjective result metric={}, iteration {}'.format(objective_metric, iteration_value))
# noinspection PyProtectedMember
self.optimizer._current_jobs.remove(current_job)
@@ -157,13 +169,22 @@ class OptimizerOptuna(SearchStrategy):
This function returns only after optimization is completed or :meth:`stop` was called.
"""
self._study = optuna.create_study(
direction="minimize" if self._objective_metric.get_objective_sign() < 0 else "maximize",
load_if_exists=False,
sampler=self._optuna_sampler,
pruner=self._optuna_pruner,
study_name=self._optimizer_task.id if self._optimizer_task else None,
)
if self._objective_metric.len != 1:
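# Optuna takes a list of "directions" for multi-objective studies; intermediate-value pruning is not supported there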
self._study = optuna.create_study(
directions=["minimize" if sign_ < 0 else "maximize" for sign_ in self._objective_metric.get_objective_sign()],
load_if_exists=False,
sampler=self._optuna_sampler,
pruner=self._optuna_pruner,
study_name=self._optimizer_task.id if self._optimizer_task else None,
)
else:
self._study = optuna.create_study(
direction="minimize" if self._objective_metric.get_objective_sign()[0] < 0 else "maximize",
load_if_exists=False,
sampler=self._optuna_sampler,
pruner=self._optuna_pruner,
study_name=self._optimizer_task.id if self._optimizer_task else None,
)
config_space = self._convert_hyper_parameters_to_optuna()
self._objective = OptunaObjective(
base_task_id=self._base_task_id,