Change OptimizerBOHB argument max_job_execution_minutes to time_limit_per_job. Add budget remaining reporting

allegroai 2020-05-31 13:28:03 +03:00
parent 2089dde55a
commit 43dac458df
5 changed files with 199 additions and 69 deletions

View File

@ -31,11 +31,13 @@ def job_complete_callback(
# Connecting TRAINS
task = Task.init(project_name='Hyper-Parameter Optimization',
task_name='Automatic Hyper-Parameter Optimization',
task_type=Task.TaskTypes.optimizer,
reuse_last_task_id=False)
# experiment template to optimize in the hyper-parameter optimization
args = {
'template_task_id': None,
'run_as_service': False,
}
args = task.connect(args)
@ -70,9 +72,9 @@ an_optimizer = HyperParameterOptimizer(
optimizer_class=Our_SearchStrategy,
# Select an execution queue to schedule the experiments for execution
execution_queue='default',
# Limit the execution time of a single experiment
# Optional: Limit the execution time of a single experiment, in minutes.
# (if using OptimizerBOHB, this value is ignored)
max_job_execution_minutes=10.,
time_limit_per_job=10.,
# Checking the experiments every 6 seconds is way too often; we should probably set it to 5 min,
# assuming a single experiment is usually hours...
pool_period_min=0.1,
@ -83,11 +85,17 @@ an_optimizer = HyperParameterOptimizer(
# This is only applicable for OptimizerBOHB and ignored by the rest
# set the minimum number of iterations for an experiment, before early stopping
min_iteration_per_job=10,
# This is only applicable for OptimizerBOHB and ignored by the rest
# set the maximum number of iterations for an experiment to execute
# Set the maximum number of iterations for an experiment to execute
# (This is optional, unless using OptimizerBOHB where this is a must)
max_iteration_per_job=30,
)
# if we are running as a service, just enqueue ourselves into the services queue and let it run the optimization
if args['run_as_service']:
# if this code is executed by `trains-agent` the function call does nothing.
# if executed locally, the local process will be terminated, and a remote copy will be executed instead
task.execute_remotely(queue_name='services', exit_process=True)
# report every 12 seconds, this is way too often, but we are testing here :)
an_optimizer.set_report_period(0.2)
# start the optimization process, callback function to be called every time an experiment is completed
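For reference, a minimal sketch of the optimizer setup on the caller side after this rename; the template task ID, parameter names, metric title/series and queue are placeholders, and the package-level imports from trains.automation are assumed:

from trains import Task
from trains.automation import (
    DiscreteParameterRange, HyperParameterOptimizer, RandomSearch, UniformParameterRange)

task = Task.init(project_name='Hyper-Parameter Optimization',
                 task_name='Automatic Hyper-Parameter Optimization',
                 task_type=Task.TaskTypes.optimizer)

an_optimizer = HyperParameterOptimizer(
    base_task_id='<template_task_id>',  # placeholder: the experiment template to clone
    hyper_parameters=[
        UniformParameterRange('learning_rate', min_value=0.0001, max_value=0.1),
        DiscreteParameterRange('batch_size', values=[32, 64, 128]),
    ],
    objective_metric_title='validation',
    objective_metric_series='accuracy',
    objective_metric_sign='max',
    max_number_of_concurrent_tasks=2,
    optimizer_class=RandomSearch,
    execution_queue='default',
    # renamed in this commit: was max_job_execution_minutes
    time_limit_per_job=10.,
    pool_period_min=1.,
    total_max_jobs=20,
)
an_optimizer.set_report_period(5.)  # report every 5 minutes
an_optimizer.start()
an_optimizer.wait()
an_optimizer.stop()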

View File

@ -67,14 +67,35 @@ class _TrainsBandsterWorker(Worker):
self.optimizer._current_jobs.append(self._current_job)
self._current_job.launch(self.queue_name)
iteration_value = None
is_pending = True
while not self._current_job.is_stopped():
if is_pending and not self._current_job.is_pending():
is_pending = False
# noinspection PyProtectedMember
self.optimizer.budget.jobs.update(
self._current_job.task_id(),
float(self.optimizer._min_iteration_per_job)/self.optimizer._max_iteration_per_job)
# noinspection PyProtectedMember
iteration_value = self.optimizer._objective_metric.get_current_raw_objective(self._current_job)
if iteration_value and iteration_value[0] >= self.budget_iteration_scale * budget:
self._current_job.abort()
break
if iteration_value:
# update budget
self.optimizer.budget.iterations.update(self._current_job.task_id(), iteration_value[0])
# check if we exceeded this job budget
if iteration_value[0] >= self.budget_iteration_scale * budget:
self._current_job.abort()
break
sleep(self.sleep_interval)
if iteration_value:
# noinspection PyProtectedMember
self.optimizer.budget.jobs.update(
self._current_job.task_id(),
float(iteration_value[0]) / self.optimizer._max_iteration_per_job)
result = {
# this is a mandatory field to run hyperband
# remember: HpBandSter always minimizes!
@ -100,11 +121,11 @@ class OptimizerBOHB(SearchStrategy, RandomSeed):
max_iteration_per_job, # type: Optional[int]
total_max_jobs, # type: Optional[int]
pool_period_min=2., # type: float
max_job_execution_minutes=None, # type: Optional[float]
time_limit_per_job=None, # type: Optional[float]
local_port=9090, # type: int
**bohb_kwargs, # type: Any
):
# type: (...) -> OptimizerBOHB
# type: (...) -> None
"""
Initialize a BOHB search strategy optimizer
BOHB performs robust and efficient hyperparameter optimization at scale by combining
@ -140,15 +161,24 @@ class OptimizerBOHB(SearchStrategy, RandomSeed):
This means more than total_max_jobs could be created, as long as the cumulative iterations
(summed over all created jobs) do not exceed `max_iteration_per_job * total_max_jobs`
:param float pool_period_min: time in minutes between two consecutive pools
:param float max_job_execution_minutes: maximum time per single job in minutes, if exceeded job is aborted
:param float time_limit_per_job: Optional, maximum execution time per single job, in minutes;
when the time limit is exceeded, the job is aborted
:param int local_port: default port 9090 (TCP); required for the BOHB workers to communicate, even locally.
:param bohb_kwargs: arguments passed directly to the BOHB object
"""
if not max_iteration_per_job or not min_iteration_per_job or not total_max_jobs:
raise ValueError(
"OptimizerBOHB is missing a defined budget.\n"
"The following arguments must be defined: "
"max_iteration_per_job, min_iteration_per_job, total_max_jobs.\n"
"Maximum optimization budget is: max_iteration_per_job * total_max_jobs\n"
)
super(OptimizerBOHB, self).__init__(
base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
pool_period_min=pool_period_min, max_job_execution_minutes=max_job_execution_minutes,
total_max_jobs=total_max_jobs)
pool_period_min=pool_period_min, time_limit_per_job=time_limit_per_job,
max_iteration_per_job=max_iteration_per_job, total_max_jobs=total_max_jobs)
self._max_iteration_per_job = max_iteration_per_job
self._min_iteration_per_job = min_iteration_per_job
self._bohb_kwargs = bohb_kwargs or {}
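With the new validation above, OptimizerBOHB now raises a ValueError unless min_iteration_per_job, max_iteration_per_job and total_max_jobs are all set. A hedged sketch of supplying them through HyperParameterOptimizer, which forwards extra keyword arguments to the optimizer class (the OptimizerBOHB import path is an assumption and requires the hpbandster dependency):

from trains.automation import HyperParameterOptimizer, UniformParameterRange
from trains.automation.hpbandster import OptimizerBOHB  # assumed import path

an_optimizer = HyperParameterOptimizer(
    base_task_id='<template_task_id>',  # placeholder
    hyper_parameters=[UniformParameterRange('learning_rate', min_value=0.0001, max_value=0.1)],
    objective_metric_title='validation',
    objective_metric_series='accuracy',
    objective_metric_sign='max',
    max_number_of_concurrent_tasks=2,
    optimizer_class=OptimizerBOHB,
    execution_queue='default',
    # the BOHB budget: all three are now mandatory, otherwise the ValueError above is raised
    min_iteration_per_job=10,
    max_iteration_per_job=30,
    total_max_jobs=20,
    # note: time_limit_per_job is ignored by OptimizerBOHB
)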

View File

@ -21,7 +21,7 @@ class TrainsJob(object):
parent=None, # type: Optional[str]
**kwargs # type: Any
):
# type: (...) -> TrainsJob
# type: (...) -> ()
"""
Create a new Task based on a base_task_id, with a different set of parameters
@ -131,7 +131,7 @@ class TrainsJob(object):
return self.task.id
def status(self):
# type: () -> Task.TaskStatusEnum
# type: () -> str
"""
Return the Job's Task current status (see Task.TaskStatusEnum)
@ -229,7 +229,7 @@ class _JobStub(object):
tags=None, # type: Optional[Sequence[str]]
**kwargs # type: Any
):
# type: (...) -> _JobStub
# type: (...) -> ()
self.task = None
self.base_task_id = base_task_id
self.parameter_override = parameter_override

View File

@ -33,7 +33,7 @@ class Objective(object):
"""
def __init__(self, title, series, order='max', extremum=False):
# type: (str, str, Union['max', 'min'], bool) -> Objective
# type: (str, str, Union['max', 'min'], bool) -> ()
"""
Construct an objective object that returns the scalar value for a specific task ID
@ -161,6 +161,49 @@ class Objective(object):
('min_value' if self.sign < 0 else 'max_value') if self.extremum else 'value')
class Budget(object):
class Field(object):
def __init__(self, limit=None):
# type: (Optional[float]) -> ()
self.limit = limit
self.current = {}
def update(self, uid, value):
# type: (Union[str, int], float) -> ()
if value is not None:
try:
self.current[uid] = float(value)
except (TypeError, ValueError):
pass
@property
def used(self):
# type: () -> (Optional[float])
if self.limit is None or not self.current:
return None
return sum(self.current.values())/float(self.limit)
def __init__(self, jobs_limit, iterations_limit, compute_time_limit):
# type: (Optional[int], Optional[int], Optional[float]) -> ()
self.jobs = self.Field(jobs_limit)
self.iterations = self.Field(iterations_limit)
self.compute_time = self.Field(compute_time_limit)
def to_dict(self):
# type: () -> (Mapping[Union['jobs', 'iterations', 'compute_time'], Mapping[Union['limit', 'used'], float]])
current_budget = {}
jobs = self.jobs.used
if jobs:
current_budget['jobs'] = {'limit': self.jobs.limit, 'used': jobs}
iterations = self.iterations.used
if iterations:
current_budget['iterations'] = {'limit': self.iterations.limit, 'used': iterations}
compute_time = self.compute_time.used
if compute_time:
current_budget['compute_time'] = {'limit': self.compute_time.limit, 'used': compute_time}
return current_budget
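As a quick illustration of the Budget class added here: each Field keeps per-job values and reports the used fraction of its limit, and to_dict() only includes fields that have both a limit and at least one reported value. A minimal usage sketch, assuming the class is importable from the strategy module (the module path is an assumption):

from trains.automation.optimization import Budget  # assumed module path

budget = Budget(jobs_limit=10, iterations_limit=300, compute_time_limit=None)
budget.jobs.update('job-1', 1)          # one finished job out of the 10 allowed
budget.iterations.update('job-1', 30)   # 30 objective-metric iterations out of 300
print(budget.to_dict())
# {'jobs': {'limit': 10, 'used': 0.1}, 'iterations': {'limit': 300, 'used': 0.1}}
# compute_time is omitted because its limit is None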
class SearchStrategy(object):
"""
Base Search strategy class, inherit to implement your custom strategy
@ -176,11 +219,12 @@ class SearchStrategy(object):
execution_queue, # type: str
num_concurrent_workers, # type: int
pool_period_min=2., # type: float
max_job_execution_minutes=None, # type: Optional[float]
time_limit_per_job=None, # type: Optional[float]
max_iteration_per_job=None, # type: Optional[int]
total_max_jobs=None, # type: Optional[int]
**_ # type: Any
):
# type: (...) -> SearchStrategy
# type: (...) -> ()
"""
Initialize a search strategy optimizer
@ -190,8 +234,11 @@ class SearchStrategy(object):
:param str execution_queue: execution queue to use for launching Tasks (experiments).
:param int num_concurrent_workers: Limit number of concurrent running machines
:param float pool_period_min: time in minutes between two consecutive pools
:param float max_job_execution_minutes: maximum time per single job in minutes, if exceeded job is aborted
:param int total_max_jobs: total maximum job for the optimization process. Default None, unlimited
:param float time_limit_per_job: Optional, maximum execution time per single job, in minutes;
when the time limit is exceeded, the job is aborted
:param int max_iteration_per_job: Optional, maximum iterations (of the objective metric)
per single job; when exceeded, the job is aborted.
:param int total_max_jobs: total maximum jobs for the optimization process. Default None, unlimited
"""
super(SearchStrategy, self).__init__()
self._base_task_id = base_task_id
@ -200,15 +247,24 @@ class SearchStrategy(object):
self._execution_queue = execution_queue
self._num_concurrent_workers = num_concurrent_workers
self.pool_period_minutes = pool_period_min
self.max_job_execution_minutes = max_job_execution_minutes
self.time_limit_per_job = time_limit_per_job
self.max_iteration_per_job = max_iteration_per_job
self.total_max_jobs = total_max_jobs
self._stop_event = Event()
self._current_jobs = []
self._pending_jobs = []
self._num_jobs = 0
self._job_parent_id = None
self._created_jobs_ids = {}
self._naming_function = None
self._job_project = {}
self.budget = Budget(
jobs_limit=self.total_max_jobs,
compute_time_limit=self.total_max_jobs * self.time_limit_per_job if
self.time_limit_per_job and self.total_max_jobs else None,
iterations_limit=self.total_max_jobs * self.max_iteration_per_job if
self.max_iteration_per_job and self.total_max_jobs else None
)
self._validate_base_task()
def start(self):
@ -256,6 +312,15 @@ class SearchStrategy(object):
self._current_jobs = updated_jobs
pending_jobs = []
for job in self._pending_jobs:
if job.is_pending():
pending_jobs.append(job)
else:
self.budget.jobs.update(job.task_id(), 1)
self._pending_jobs = pending_jobs
free_workers = self._num_concurrent_workers - len(self._current_jobs)
# do not create more jobs if we hit the limit
@ -270,6 +335,7 @@ class SearchStrategy(object):
self._num_jobs += 1
new_job.launch(self._execution_queue)
self._current_jobs.append(new_job)
self._pending_jobs.append(new_job)
return bool(self._current_jobs)
@ -287,13 +353,34 @@ class SearchStrategy(object):
def monitor_job(self, job):
# type: (TrainsJob) -> bool
"""
Abstract helper function, not a must to implement, default use in process_step default implementation
Helper function, optional to override; used by the default process_step implementation
Check whether the job needs to be aborted or has already completed;
if it returns False, the job was aborted / completed, and should be taken off the current job list
If there is a budget limitation,
this call should update self.budget.compute_time.update() / self.budget.iterations.update()
:param TrainsJob job: a TrainsJob object to monitor
:return bool: If False, job is no longer relevant
"""
abort_job = False
if self.time_limit_per_job:
elapsed = job.elapsed() / 60.
self.budget.compute_time.update(job.task_id(), elapsed)
if elapsed > self.time_limit_per_job:
abort_job = True
if self.max_iteration_per_job:
iterations = self._get_job_iterations(job)
self.budget.iterations.update(job.task_id(), iterations)
if iterations > self.max_iteration_per_job:
abort_job = True
if abort_job:
job.abort()
return False
return not job.is_stopped()
def get_running_jobs(self):
@ -433,6 +520,11 @@ class SearchStrategy(object):
return self._job_project.get(parent_task_id)
def _get_job_iterations(self, job):
# type: (Union[TrainsJob, Task]) -> int
iteration_value = self._objective_metric.get_current_raw_objective(job)
return iteration_value[0] if iteration_value else -1
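Custom strategies inheriting from SearchStrategy can now rely on this default monitor_job for both the time and iteration limits, and extend it with their own conditions; a hedged sketch (the extra no-progress condition is purely illustrative):

from trains.automation.optimization import SearchStrategy  # assumed module path


class MyStrategy(SearchStrategy):
    def monitor_job(self, job):
        # the base implementation enforces time_limit_per_job / max_iteration_per_job
        # and updates self.budget.compute_time / self.budget.iterations
        if not super(MyStrategy, self).monitor_job(job):
            return False
        # illustrative extra condition: abort jobs that report no iterations
        # after 30 minutes of runtime (elapsed() is in seconds)
        if job.elapsed() > 30 * 60. and self._get_job_iterations(job) <= 0:
            job.abort()
            return False
        return True

A complete strategy would also implement create_job(), as GridSearch and RandomSearch do below.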
@classmethod
def _get_child_tasks(
cls,
@ -494,11 +586,12 @@ class GridSearch(SearchStrategy):
execution_queue, # type: str
num_concurrent_workers, # type: int
pool_period_min=2., # type: float
max_job_execution_minutes=None, # type: Optional[float]
time_limit_per_job=None, # type: Optional[float]
max_iteration_per_job=None, # type: Optional[int]
total_max_jobs=None, # type: Optional[int]
**_ # type: Any
):
# type: (...) -> GridSearch
# type: (...) -> ()
"""
Initialize a grid search optimizer
@ -508,14 +601,17 @@ class GridSearch(SearchStrategy):
:param str execution_queue: execution queue to use for launching Tasks (experiments).
:param int num_concurrent_workers: Limit number of concurrent running machines
:param float pool_period_min: time in minutes between two consecutive pools
:param float max_job_execution_minutes: maximum time per single job in minutes, if exceeded job is aborted
:param int total_max_jobs: total maximum job for the optimization process. Default None, unlimited
:param float time_limit_per_job: Optional, maximum execution time per single job, in minutes;
when the time limit is exceeded, the job is aborted
:param int max_iteration_per_job: maximum iterations (of the objective metric)
per single job; when exceeded, the job is aborted.
:param int total_max_jobs: total maximum jobs for the optimization process. Default None, unlimited
"""
super(GridSearch, self).__init__(
base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
pool_period_min=pool_period_min, max_job_execution_minutes=max_job_execution_minutes,
total_max_jobs=total_max_jobs, **_)
pool_period_min=pool_period_min, time_limit_per_job=time_limit_per_job,
max_iteration_per_job=max_iteration_per_job, total_max_jobs=total_max_jobs, **_)
self._param_iterator = None
def create_job(self):
@ -533,21 +629,6 @@ class GridSearch(SearchStrategy):
return self.helper_create_job(base_task_id=self._base_task_id, parameter_override=parameters)
def monitor_job(self, job):
# type: (TrainsJob) -> bool
"""
Check if the job needs to be aborted or already completed
if return False, the job was aborted / completed, and should be taken off the current job list
:param TrainsJob job: a TrainsJob object to monitor
:return: boolean, If False, job is no longer relevant
"""
if self.max_job_execution_minutes and job.elapsed() / 60. > self.max_job_execution_minutes:
job.abort()
return False
return not job.is_stopped()
def _next_configuration(self):
# type: () -> Mapping[str, str]
def param_iterator_fn():
@ -577,11 +658,12 @@ class RandomSearch(SearchStrategy):
execution_queue, # type: str
num_concurrent_workers, # type: int
pool_period_min=2., # type: float
max_job_execution_minutes=None, # type: Optional[float]
time_limit_per_job=None, # type: Optional[float]
max_iteration_per_job=None, # type: Optional[int]
total_max_jobs=None, # type: Optional[int]
**_ # type: Any
):
# type: (...) -> RandomSearch
# type: (...) -> ()
"""
Initialize a random search optimizer
@ -591,14 +673,17 @@ class RandomSearch(SearchStrategy):
:param str execution_queue: execution queue to use for launching Tasks (experiments).
:param int num_concurrent_workers: Limit number of concurrent running machines
:param float pool_period_min: time in minutes between two consecutive pools
:param float max_job_execution_minutes: maximum time per single job in minutes, if exceeded job is aborted
:param int total_max_jobs: total maximum job for the optimization process. Default None, unlimited
:param float time_limit_per_job: Optional, maximum execution time per single job, in minutes;
when the time limit is exceeded, the job is aborted
:param int max_iteration_per_job: maximum iterations (of the objective metric)
per single job; when exceeded, the job is aborted.
:param int total_max_jobs: total maximum jobs for the optimization process. Default None, unlimited
"""
super(RandomSearch, self).__init__(
base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
pool_period_min=pool_period_min,
max_job_execution_minutes=max_job_execution_minutes, total_max_jobs=total_max_jobs, **_)
pool_period_min=pool_period_min, time_limit_per_job=time_limit_per_job,
max_iteration_per_job=max_iteration_per_job, total_max_jobs=total_max_jobs, **_)
self._hyper_parameters_collection = set()
def create_job(self):
@ -631,21 +716,6 @@ class RandomSearch(SearchStrategy):
return self.helper_create_job(base_task_id=self._base_task_id, parameter_override=parameters)
def monitor_job(self, job):
# type: (TrainsJob) -> bool
"""
Check if the job needs to be aborted or already completed
if return False, the job was aborted / completed, and should be taken off the current job list
:param TrainsJob job: a TrainsJob object to monitor
:return: boolean, If False, job is no longer relevant
"""
if self.max_job_execution_minutes and job.elapsed() / 60. > self.max_job_execution_minutes:
job.abort()
return False
return not job.is_stopped()
class HyperParameterOptimizer(object):
"""
@ -669,7 +739,7 @@ class HyperParameterOptimizer(object):
always_create_task=False, # type: bool
**optimizer_kwargs # type: Any
):
# type: (...) -> HyperParameterOptimizer
# type: (...) -> ()
"""
Create a new hyper-parameter controller. The newly created object will launch and monitor the new experiments.
@ -717,7 +787,7 @@ class HyperParameterOptimizer(object):
objective_metric_sign='min',
max_number_of_concurrent_tasks=5,
optimizer_class=RandomSearch,
execution_queue='workers', max_job_execution_minutes=0.1, pool_period_min=0.1)
execution_queue='workers', time_limit_per_job=120, pool_period_min=0.2)
# This will automatically create and print the optimizer new task id
# for later use. if a Task was already created, it will use it.
@ -1093,6 +1163,25 @@ class HyperParameterOptimizer(object):
title=title, series='{}{}'.format(series, machine_id),
iteration=counter, value=value)
# noinspection PyBroadException
try:
budget = self.optimizer.budget.to_dict()
except Exception:
budget = {}
# report remaining budget
for budget_part, value in budget.items():
task_logger.report_scalar(
title='remaining budget', series='{} %'.format(budget_part),
iteration=counter, value=round(100 - value['used'] * 100., ndigits=1))
if self.optimization_timeout and self.optimization_start_time:
task_logger.report_scalar(
title='remaining budget', series='time %',
iteration=counter,
value=round(100 - (100. * (time() - self.optimization_start_time) /
(self.optimization_timeout - self.optimization_start_time)), ndigits=1)
)
# collect a summary of all the jobs and their final objective values
cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - running_job_ids
if cur_completed_jobs != set(completed_jobs.keys()):
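The remaining-budget scalars reported above are just 100 minus the used percentage of each Budget field; a standalone sketch of the same arithmetic on a snapshot shaped like the output of Budget.to_dict() (the values are made up):

budget = {
    'jobs': {'limit': 10, 'used': 0.3},
    'iterations': {'limit': 300, 'used': 0.45},
}
for budget_part, value in budget.items():
    remaining = round(100 - value['used'] * 100., ndigits=1)
    print('remaining budget / {} %: {}'.format(budget_part, remaining))
# remaining budget / jobs %: 70.0
# remaining budget / iterations %: 55.0

In the monitor loop, each of these values is reported with task_logger.report_scalar under the 'remaining budget' title, one series per budget part.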

View File

@ -39,7 +39,7 @@ class Parameter(RandomSeed):
"""
def __init__(self, name):
# type: (Optional[str]) -> Parameter
# type: (Optional[str]) -> ()
"""
Create a new Parameter for hyper-parameter optimization
@ -73,6 +73,7 @@ class Parameter(RandomSeed):
:return dict: dict representation of the object (serialization)
"""
serialize = {'__class__': str(self.__class__).split('.')[-1][:-2]}
# noinspection PyCallingNonCallable
serialize.update(dict(((k, v.to_dict() if hasattr(v, 'to_dict') else v) for k, v in self.__dict__.items())))
return serialize
@ -111,7 +112,7 @@ class UniformParameterRange(Parameter):
step_size=None, # type: Optional[float]
include_max_value=True # type: bool
):
# type: (...) -> UniformParameterRange
# type: (...) -> ()
"""
Create a parameter to be sampled by the SearchStrategy
@ -160,6 +161,7 @@ class UniformIntegerParameterRange(Parameter):
"""
def __init__(self, name, min_value, max_value, step_size=1, include_max_value=True):
# type: (str, int, int, int, bool) -> ()
"""
Create a parameter to be sampled by the SearchStrategy
@ -206,6 +208,7 @@ class DiscreteParameterRange(Parameter):
"""
def __init__(self, name, values=()):
# type: (str, Sequence[Any]) -> ()
"""
Uniformly sample values from a list of discrete options
@ -240,7 +243,7 @@ class ParameterSet(Parameter):
"""
def __init__(self, parameter_combinations=()):
# type: (Sequence[Mapping[str, Union[float, int, str, Parameter]]]) -> ParameterSet
# type: (Sequence[Mapping[str, Union[float, int, str, Parameter]]]) -> ()
"""
Uniformly sample values from a list of discrete options (combinations) of parameters
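For completeness, a short sketch of building a search space from the parameter classes touched in this file; the parameter names and values are placeholders, and the package-level exports from trains.automation are assumed:

from trains.automation import (  # assumed package-level exports
    DiscreteParameterRange, ParameterSet, UniformIntegerParameterRange, UniformParameterRange)

hyper_parameters = [
    UniformParameterRange('learning_rate', min_value=0.0001, max_value=0.1),
    UniformIntegerParameterRange('number_of_epochs', min_value=5, max_value=15, step_size=5),
    DiscreteParameterRange('batch_size', values=[32, 64, 128]),
    # ParameterSet samples one complete combination at a time
    ParameterSet(parameter_combinations=[
        {'optimizer': 'sgd', 'momentum': 0.9},
        {'optimizer': 'adam', 'momentum': 0.0},
    ]),
]
for parameter in hyper_parameters:
    print(parameter.to_dict())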