clearml/trains/automation/hpbandster/bandster.py

241 lines
12 KiB
Python

from time import sleep, time
from ..parameters import DiscreteParameterRange, UniformParameterRange, RandomSeed, UniformIntegerParameterRange
from ..optimization import Objective, SearchStrategy
from ...task import Task
try:
from hpbandster.core.worker import Worker
from hpbandster.optimizers import BOHB
import hpbandster.core.nameserver as hpns
import ConfigSpace as CS
import ConfigSpace.hyperparameters as CSH
Task.add_requirements('hpbandster')
except ImportError:
raise ValueError("OptimizerBOHB requires 'hpbandster' package, it was not found\n"
"install with: pip install hpbandster")
class TrainsBandsterWorker(Worker):
def __init__(self, *args, optimizer, base_task_id, queue_name, objective,
sleep_interval=0, budget_iteration_scale=1., **kwargs):
super(TrainsBandsterWorker, self).__init__(*args, **kwargs)
self.optimizer = optimizer
self.base_task_id = base_task_id
self.queue_name = queue_name
self.objective = objective
self.sleep_interval = sleep_interval
self.budget_iteration_scale = budget_iteration_scale
self._current_job = None
def compute(self, config, budget, **kwargs):
"""
Simple example for a compute function
The loss is just a the config + some noise (that decreases with the budget)
For dramatization, the function can sleep for a given interval to emphasizes
the speed ups achievable with parallel workers.
Args:
config: dictionary containing the sampled configurations by the optimizer
budget: (float) amount of time/epochs/etc. the model can use to train.
We assume budget is iteration, as time might not be stable from machine to machine.
Returns:
dictionary with mandatory fields:
'loss' (scalar)
'info' (dict)
"""
self._current_job = self.optimizer.helper_create_job(self.base_task_id, parameter_override=config)
self.optimizer._current_jobs.append(self._current_job)
self._current_job.launch(self.queue_name)
iteration_value = None
while not self._current_job.is_stopped():
iteration_value = self.optimizer._objective_metric.get_current_raw_objective(self._current_job)
if iteration_value and iteration_value[0] >= self.budget_iteration_scale * budget:
self._current_job.abort()
break
sleep(self.sleep_interval)
result = {
# this is the a mandatory field to run hyperband
# remember: HpBandSter always minimizes!
'loss': float(self.objective.get_normalized_objective(self._current_job)*-1.),
# can be used for any user-defined information - also mandatory
'info': self._current_job.task_id()
}
print('TrainsBandsterWorker result {}, iteration {}'.format(result, iteration_value))
self.optimizer._current_jobs.remove(self._current_job)
return result
class OptimizerBOHB(SearchStrategy, RandomSeed):
def __init__(self, base_task_id, hyper_parameters, objective_metric,
execution_queue, num_concurrent_workers, min_iteration_per_job, max_iteration_per_job, total_max_jobs,
pool_period_min=2.0, max_job_execution_minutes=None, **bohb_kargs):
"""
Initialize a search strategy optimizer
:param str base_task_id: Task ID (str)
:param list hyper_parameters: list of Parameter objects to optimize over
:param Objective objective_metric: Objective metric to maximize / minimize
:param str execution_queue: execution queue to use for launching Tasks (experiments).
:param int num_concurrent_workers: Limit number of concurrent running machines
:param float min_iteration_per_job: minimum number of iterations for a job to run.
:param int max_iteration_per_job: number of iteration per job
:param int total_max_jobs: total maximum job for the optimization process. Default None, unlimited
:param float pool_period_min: time in minutes between two consecutive pools
:param float max_job_execution_minutes: maximum time per single job in minutes, if exceeded job is aborted
:param ** bohb_kargs: arguments passed directly yo the BOHB object
"""
super(OptimizerBOHB, self).__init__(
base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
pool_period_min=pool_period_min, max_job_execution_minutes=max_job_execution_minutes,
total_max_jobs=total_max_jobs)
self._max_iteration_per_job = max_iteration_per_job
self._min_iteration_per_job = min_iteration_per_job
self._bohb_kwargs = bohb_kargs or {}
self._param_iterator = None
self._namespace = None
self._bohb = None
self._res = None
def set_optimization_args(self, eta=3, min_budget=None, max_budget=None,
min_points_in_model=None, top_n_percent=15,
num_samples=None, random_fraction=1/3., bandwidth_factor=3,
min_bandwidth=1e-3):
"""
Defaults copied from BOHB constructor, see details in BOHB.__init__
BOHB performs robust and efficient hyperparameter optimization
at scale by combining the speed of Hyperband searches with the
guidance and guarantees of convergence of Bayesian
Optimization. Instead of sampling new configurations at random,
BOHB uses kernel density estimators to select promising candidates.
.. highlight:: none
For reference: ::
@InProceedings{falkner-icml-18,
title = {{BOHB}: Robust and Efficient Hyperparameter Optimization at Scale},
author = {Falkner, Stefan and Klein, Aaron and Hutter, Frank},
booktitle = {Proceedings of the 35th International Conference on Machine Learning},
pages = {1436--1445},
year = {2018},
}
Parameters
----------
eta : float (3)
In each iteration, a complete run of sequential halving is executed. In it,
after evaluating each configuration on the same subset size, only a fraction of
1/eta of them 'advances' to the next round.
Must be greater or equal to 2.
min_budget : float (0.01)
The smallest budget to consider. Needs to be positive!
max_budget : float (1)
The largest budget to consider. Needs to be larger than min_budget!
The budgets will be geometrically distributed
:math:`a^2 + b^2 = c^2 \sim \eta^k` for :math:`k\in [0, 1, ... , num\_subsets - 1]`.
min_points_in_model: int (None)
number of observations to start building a KDE. Default 'None' means
dim+1, the bare minimum.
top_n_percent: int (15)
percentage ( between 1 and 99, default 15) of the observations that are considered good.
num_samples: int (64)
number of samples to optimize EI (default 64)
random_fraction: float (1/3.)
fraction of purely random configurations that are sampled from the
prior without the model.
bandwidth_factor: float (3.)
to encourage diversity, the points proposed to optimize EI, are sampled
from a 'widened' KDE where the bandwidth is multiplied by this factor (default: 3)
min_bandwidth: float (1e-3)
to keep diversity, even when all (good) samples have the same value for one of the parameters,
a minimum bandwidth (Default: 1e-3) is used instead of zero.
"""
if min_budget:
self._bohb_kwargs['min_budget'] = min_budget
if max_budget:
self._bohb_kwargs['max_budget'] = max_budget
if num_samples:
self._bohb_kwargs['num_samples'] = num_samples
self._bohb_kwargs['eta'] = eta
self._bohb_kwargs['min_points_in_model'] = min_points_in_model
self._bohb_kwargs['top_n_percent'] = top_n_percent
self._bohb_kwargs['random_fraction'] = random_fraction
self._bohb_kwargs['bandwidth_factor'] = bandwidth_factor
self._bohb_kwargs['min_bandwidth'] = min_bandwidth
def start(self):
# Step 1: Start a nameserver
fake_run_id = 'OptimizerBOHB_{}'.format(time())
self._namespace = hpns.NameServer(run_id=fake_run_id, host='127.0.0.1', port=None)
self._namespace.start()
# we have to scale the budget to the iterations per job, otherwise numbers might be too high
budget_iteration_scale = self._max_iteration_per_job
# Step 2: Start the workers
workers = []
for i in range(self._num_concurrent_workers):
w = TrainsBandsterWorker(optimizer=self,
sleep_interval=int(self.pool_period_minutes*60),
budget_iteration_scale=budget_iteration_scale,
base_task_id=self._base_task_id,
objective=self._objective_metric,
queue_name=self._execution_queue,
nameserver='127.0.0.1', run_id=fake_run_id, id=i)
w.run(background=True)
workers.append(w)
# Step 3: Run an optimizer
self._bohb = BOHB(configspace=self.convert_hyper_parameters_to_cs(),
run_id=fake_run_id,
num_samples=self.total_max_jobs,
min_budget=float(self._min_iteration_per_job)/float(self._max_iteration_per_job),
**self._bohb_kwargs)
self._res = self._bohb.run(n_iterations=self.total_max_jobs, min_n_workers=self._num_concurrent_workers)
# Step 4: if we get here, Shutdown
self.stop()
def stop(self):
# After the optimizer run, we must shutdown the master and the nameserver.
self._bohb.shutdown(shutdown_workers=True)
self._namespace.shutdown()
if not self._res:
return
# Step 5: Analysis
id2config = self._res.get_id2config_mapping()
incumbent = self._res.get_incumbent_id()
all_runs = self._res.get_all_runs()
# Step 6: Print Analysis
print('Best found configuration:', id2config[incumbent]['config'])
print('A total of {} unique configurations where sampled.'.format(len(id2config.keys())))
print('A total of {} runs where executed.'.format(len(self._res.get_all_runs())))
print('Total budget corresponds to {:.1f} full function evaluations.'.format(
sum([r.budget for r in all_runs]) / self._bohb_kwargs.get('max_budget', 1.0)))
print('Total budget corresponds to {:.1f} full function evaluations.'.format(
sum([r.budget for r in all_runs]) / self._bohb_kwargs.get('max_budget', 1.0)))
print('The run took {:.1f} seconds to complete.'.format(
all_runs[-1].time_stamps['finished'] - all_runs[0].time_stamps['started']))
def convert_hyper_parameters_to_cs(self):
cs = CS.ConfigurationSpace(seed=self._seed)
for p in self._hyper_parameters:
if isinstance(p, UniformParameterRange):
hp = CSH.UniformFloatHyperparameter(
p.name, lower=p.min_value, upper=p.max_value, log=False, q=p.step_size)
elif isinstance(p, UniformIntegerParameterRange):
hp = CSH.UniformIntegerHyperparameter(
p.name, lower=p.min_value, upper=p.max_value, log=False, q=p.step_size)
elif isinstance(p, DiscreteParameterRange):
hp = CSH.CategoricalHyperparameter(p.name, choices=p.values)
else:
raise ValueError("HyperParameter type {} not supported yet with OptimizerBOHB".format(type(p)))
cs.add_hyperparameter(hp)
return cs