From 43dac458df20212067d01fe018d2f854a3881700 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Sun, 31 May 2020 13:28:03 +0300
Subject: [PATCH] Change OptimizerBOHB argument max_job_execution_minutes to
 time_limit_per_job. Add budget remaining reporting

---
 .../automation/hyper_parameter_optimizer.py |  16 +-
 trains/automation/hpbandster/bandster.py    |  46 ++++-
 trains/automation/job.py                    |   6 +-
 trains/automation/optimization.py           | 191 +++++++++++++-----
 trains/automation/parameters.py             |   9 +-
 5 files changed, 199 insertions(+), 69 deletions(-)

diff --git a/examples/automation/hyper_parameter_optimizer.py b/examples/automation/hyper_parameter_optimizer.py
index 1b8fb61b..c750e4cf 100644
--- a/examples/automation/hyper_parameter_optimizer.py
+++ b/examples/automation/hyper_parameter_optimizer.py
@@ -31,11 +31,13 @@ def job_complete_callback(
 # Connecting TRAINS
 task = Task.init(project_name='Hyper-Parameter Optimization',
                  task_name='Automatic Hyper-Parameter Optimization',
+                 task_type=Task.TaskTypes.optimizer,
                  reuse_last_task_id=False)
 
 # experiment template to optimize in the hyper-parameter optimization
 args = {
     'template_task_id': None,
+    'run_as_service': False,
 }
 args = task.connect(args)
 
@@ -70,9 +72,9 @@ an_optimizer = HyperParameterOptimizer(
     optimizer_class=Our_SearchStrategy,
     # Select an execution queue to schedule the experiments for execution
     execution_queue='default',
-    # Limit the execution time of a single experiment
+    # Optional: Limit the execution time of a single experiment, in minutes.
     # (this is optional, and if using OptimizerBOHB, it is ignored)
-    max_job_execution_minutes=10.,
+    time_limit_per_job=10.,
     # Check the experiments every 6 seconds is way too often, we should probably set it to 5 min,
     # assuming a single experiment is usually hours...
     pool_period_min=0.1,
@@ -83,11 +85,17 @@ an_optimizer = HyperParameterOptimizer(
     # This is only applicable for OptimizerBOHB and ignore by the rest
     # set the minimum number of iterations for an experiment, before early stopping
     min_iteration_per_job=10,
-    # This is only applicable for OptimizerBOHB and ignore by the rest
-    # set the maximum number of iterations for an experiment to execute
+    # Set the maximum number of iterations for an experiment to execute
+    # (This is optional, unless using OptimizerBOHB where this is a must)
     max_iteration_per_job=30,
 )
 
+# if we are running as a service, just enqueue ourselves into the services queue and let it run the optimization
+if args['run_as_service']:
+    # if this code is executed by `trains-agent` the function call does nothing.
+    # if executed locally, the local process will be terminated, and a remote copy will be executed instead
+    task.execute_remotely(queue_name='services', exit_process=True)
+
 # report every 12 seconds, this is way too often, but we are testing here J
 an_optimizer.set_report_period(0.2)
 # start the optimization process, callback function to be called every time an experiment is completed

diff --git a/trains/automation/hpbandster/bandster.py b/trains/automation/hpbandster/bandster.py
index 4599c3e0..8224eb00 100644
--- a/trains/automation/hpbandster/bandster.py
+++ b/trains/automation/hpbandster/bandster.py
@@ -67,14 +67,35 @@ class _TrainsBandsterWorker(Worker):
         self.optimizer._current_jobs.append(self._current_job)
         self._current_job.launch(self.queue_name)
         iteration_value = None
+        is_pending = True
+
         while not self._current_job.is_stopped():
+            if is_pending and not self._current_job.is_pending():
+                is_pending = False
+                # noinspection PyProtectedMember
+                self.optimizer.budget.jobs.update(
+                    self._current_job.task_id(),
+                    float(self.optimizer._min_iteration_per_job)/self.optimizer._max_iteration_per_job)
+
             # noinspection PyProtectedMember
             iteration_value = self.optimizer._objective_metric.get_current_raw_objective(self._current_job)
-            if iteration_value and iteration_value[0] >= self.budget_iteration_scale * budget:
-                self._current_job.abort()
-                break
+            if iteration_value:
+                # update budget
+                self.optimizer.budget.iterations.update(self._current_job.task_id(), iteration_value[0])
+
+                # check if we exceeded this job budget
+                if iteration_value[0] >= self.budget_iteration_scale * budget:
+                    self._current_job.abort()
+                    break
+
             sleep(self.sleep_interval)
+        if iteration_value:
+            # noinspection PyProtectedMember
+            self.optimizer.budget.jobs.update(
+                self._current_job.task_id(),
+                float(iteration_value[0]) / self.optimizer._max_iteration_per_job)
+
         result = {
             # this is the a mandatory field to run hyperband
             # remember: HpBandSter always minimizes!
@@ -100,11 +121,11 @@ class OptimizerBOHB(SearchStrategy, RandomSeed):
             max_iteration_per_job,  # type: Optional[int]
             total_max_jobs,  # type: Optional[int]
             pool_period_min=2.,  # type: float
-            max_job_execution_minutes=None,  # type: Optional[float]
+            time_limit_per_job=None,  # type: Optional[float]
             local_port=9090,  # type: int
             **bohb_kwargs,  # type: Any
     ):
-        # type: (...) -> OptimizerBOHB
+        # type: (...) -> None
        """
        Initialize a BOHB search strategy optimizer
        BOHB performs robust and efficient hyperparameter optimization at scale by combining
@@ -140,15 +161,24 @@ class OptimizerBOHB(SearchStrategy, RandomSeed):
            This means more than total_max_jobs could be created, as long as the cumulative iterations
            (summed over all created jobs) will not exceed `max_iteration_per_job * total_max_jobs`
        :param float pool_period_min: time in minutes between two consecutive pools
-       :param float max_job_execution_minutes: maximum time per single job in minutes, if exceeded job is aborted
+       :param float time_limit_per_job: Optional, maximum execution time per single job in minutes,
+           when the time limit is exceeded, the job is aborted
        :param int local_port: default port 9090 tcp, this is a must for the BOHB workers to communicate, even locally.
       :param bohb_kwargs: arguments passed directly yo the BOHB object
        """
+        if not max_iteration_per_job or not min_iteration_per_job or not total_max_jobs:
+            raise ValueError(
+                "OptimizerBOHB is missing a defined budget.\n"
+                "The following arguments must be defined: "
+                "max_iteration_per_job, min_iteration_per_job, total_max_jobs.\n"
+                "Maximum optimization budget is: max_iteration_per_job * total_max_jobs\n"
+            )
+
         super(OptimizerBOHB, self).__init__(
             base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
             execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
-            pool_period_min=pool_period_min, max_job_execution_minutes=max_job_execution_minutes,
-            total_max_jobs=total_max_jobs)
+            pool_period_min=pool_period_min, time_limit_per_job=time_limit_per_job,
+            max_iteration_per_job=max_iteration_per_job, total_max_jobs=total_max_jobs)
         self._max_iteration_per_job = max_iteration_per_job
         self._min_iteration_per_job = min_iteration_per_job
         self._bohb_kwargs = bohb_kwargs or {}

diff --git a/trains/automation/job.py b/trains/automation/job.py
index 77c86172..9a358375 100644
--- a/trains/automation/job.py
+++ b/trains/automation/job.py
@@ -21,7 +21,7 @@ class TrainsJob(object):
             parent=None,  # type: Optional[str]
             **kwargs  # type: Any
     ):
-        # type: (...) -> TrainsJob
+        # type: (...) -> ()
         """
         Create a new Task based in a base_task_id with a different set of parameters
@@ -131,7 +131,7 @@ class TrainsJob(object):
         return self.task.id
 
     def status(self):
-        # type: () -> Task.TaskStatusEnum
+        # type: () -> str
         """
         Return the Job Task current status, see Task.TaskStatusEnum
@@ -229,7 +229,7 @@ class _JobStub(object):
             tags=None,  # type: Optional[Sequence[str]]
             **kwargs  # type: Any
     ):
-        # type: (...) -> _JobStub
+        # type: (...) -> ()
         self.task = None
         self.base_task_id = base_task_id
         self.parameter_override = parameter_override

diff --git a/trains/automation/optimization.py b/trains/automation/optimization.py
index a79a6941..27e40ced 100644
--- a/trains/automation/optimization.py
+++ b/trains/automation/optimization.py
@@ -33,7 +33,7 @@ class Objective(object):
     """
 
     def __init__(self, title, series, order='max', extremum=False):
-        # type: (str, str, Union['max', 'min'], bool) -> Objective
+        # type: (str, str, Union['max', 'min'], bool) -> ()
         """
         Construct objective object that will return the scalar value for a specific task ID
@@ -161,6 +161,49 @@ class Objective(object):
             ('min_value' if self.sign < 0 else 'max_value') if self.extremum else 'value')
 
 
+class Budget(object):
+    class Field(object):
+        def __init__(self, limit=None):
+            # type: (Optional[float]) -> ()
+            self.limit = limit
+            self.current = {}
+
+        def update(self, uid, value):
+            # type: (Union[str, int], float) -> ()
+            if value is not None:
+                try:
+                    self.current[uid] = float(value)
+                except (TypeError, ValueError):
+                    pass
+
+        @property
+        def used(self):
+            # type: () -> (Optional[float])
+            if self.limit is None or not self.current:
+                return None
+            return sum(self.current.values())/float(self.limit)
+
+    def __init__(self, jobs_limit, iterations_limit, compute_time_limit):
+        # type: (Optional[int], Optional[int], Optional[float]) -> ()
+        self.jobs = self.Field(jobs_limit)
+        self.iterations = self.Field(iterations_limit)
+        self.compute_time = self.Field(compute_time_limit)
+
+    def to_dict(self):
+        # type: () -> (Mapping[Union['jobs', 'iterations', 'compute_time'], Mapping[Union['limit', 'used'], float]])
+        current_budget = {}
+        jobs = self.jobs.used
+        if jobs:
+            current_budget['jobs'] = {'limit': self.jobs.limit, 'used': jobs}
+        iterations = self.iterations.used
+        if iterations:
+            current_budget['iterations'] = {'limit': self.iterations.limit, 'used': iterations}
+        compute_time = self.compute_time.used
+        if compute_time:
+            current_budget['compute_time'] = {'limit': self.compute_time.limit, 'used': compute_time}
+        return current_budget
+
+
 class SearchStrategy(object):
     """
     Base Search strategy class, inherit to implement your custom strategy
@@ -176,11 +219,12 @@ class SearchStrategy(object):
             execution_queue,  # type: str
             num_concurrent_workers,  # type: int
             pool_period_min=2.,  # type: float
-            max_job_execution_minutes=None,  # type: Optional[float]
+            time_limit_per_job=None,  # type: Optional[float]
+            max_iteration_per_job=None,  # type: Optional[int]
             total_max_jobs=None,  # type: Optional[int]
             **_  # type: Any
     ):
-        # type: (...) -> SearchStrategy
+        # type: (...) -> ()
         """
         Initialize a search strategy optimizer
 
        :param base_task_id: Task ID (str)
        :param hyper_parameters: list of Parameter objects to optimize over
        :param objective_metric: Objective object to maximize / minimize
        :param str execution_queue: execution queue to use for launching Tasks (experiments).
        :param int num_concurrent_workers: Limit number of concurrent running machines
        :param float pool_period_min: time in minutes between two consecutive pools
-       :param float max_job_execution_minutes: maximum time per single job in minutes, if exceeded job is aborted
-       :param int total_max_jobs: total maximum job for the optimization process. Default None, unlimited
+       :param float time_limit_per_job: Optional, maximum execution time per single job in minutes,
+           when the time limit is exceeded, the job is aborted
+       :param int max_iteration_per_job: Optional, maximum iterations (of the objective metric)
+           per single job; when exceeded, the job is aborted.
+       :param int total_max_jobs: total maximum jobs for the optimization process.
+           Default None, unlimited
        """
         super(SearchStrategy, self).__init__()
         self._base_task_id = base_task_id
@@ -200,15 +247,24 @@ class SearchStrategy(object):
         self._execution_queue = execution_queue
         self._num_concurrent_workers = num_concurrent_workers
         self.pool_period_minutes = pool_period_min
-        self.max_job_execution_minutes = max_job_execution_minutes
+        self.time_limit_per_job = time_limit_per_job
+        self.max_iteration_per_job = max_iteration_per_job
         self.total_max_jobs = total_max_jobs
         self._stop_event = Event()
         self._current_jobs = []
+        self._pending_jobs = []
         self._num_jobs = 0
         self._job_parent_id = None
         self._created_jobs_ids = {}
         self._naming_function = None
         self._job_project = {}
+        self.budget = Budget(
+            jobs_limit=self.total_max_jobs,
+            compute_time_limit=self.total_max_jobs * self.time_limit_per_job if
+            self.time_limit_per_job and self.total_max_jobs else None,
+            iterations_limit=self.total_max_jobs * self.max_iteration_per_job if
+            self.max_iteration_per_job and self.total_max_jobs else None
+        )
         self._validate_base_task()
 
     def start(self):
@@ -256,6 +312,15 @@ class SearchStrategy(object):
         self._current_jobs = updated_jobs
 
+        pending_jobs = []
+        for job in self._pending_jobs:
+            if job.is_pending():
+                pending_jobs.append(job)
+            else:
+                self.budget.jobs.update(job.task_id(), 1)
+
+        self._pending_jobs = pending_jobs
+
         free_workers = self._num_concurrent_workers - len(self._current_jobs)
 
         # do not create more jobs if we hit the limit
@@ -270,6 +335,7 @@ class SearchStrategy(object):
             self._num_jobs += 1
             new_job.launch(self._execution_queue)
             self._current_jobs.append(new_job)
+            self._pending_jobs.append(new_job)
 
         return bool(self._current_jobs)
 
@@ -287,13 +353,34 @@ class SearchStrategy(object):
     def monitor_job(self, job):
         # type: (TrainsJob) -> bool
         """
-        Abstract helper function, not a must to implement, default use in process_step default implementation
+        Helper function, not a must to implement, used by the default process_step implementation
         Check if the job needs to be aborted or already completed
         if return False, the job was aborted / completed, and should be taken off the current job list
 
+        If there is a budget limitation,
+        this call should update self.budget.compute_time.update() / self.budget.iterations.update()
+
         :param TrainsJob job: a TrainsJob object to monitor
         :return bool: If False, job is no longer relevant
         """
+        abort_job = False
+
+        if self.time_limit_per_job:
+            elapsed = job.elapsed() / 60.
+            self.budget.compute_time.update(job.task_id(), elapsed)
+            if elapsed > self.time_limit_per_job:
+                abort_job = True
+
+        if self.max_iteration_per_job:
+            iterations = self._get_job_iterations(job)
+            self.budget.iterations.update(job.task_id(), iterations)
+            if iterations > self.max_iteration_per_job:
+                abort_job = True
+
+        if abort_job:
+            job.abort()
+            return False
+
         return not job.is_stopped()
 
     def get_running_jobs(self):
@@ -433,6 +520,11 @@ class SearchStrategy(object):
 
         return self._job_project.get(parent_task_id)
 
+    def _get_job_iterations(self, job):
+        # type: (Union[TrainsJob, Task]) -> int
+        iteration_value = self._objective_metric.get_current_raw_objective(job)
+        return iteration_value[0] if iteration_value else -1
+
     @classmethod
     def _get_child_tasks(
             cls,
@@ -494,11 +586,12 @@ class GridSearch(SearchStrategy):
             execution_queue,  # type: str
             num_concurrent_workers,  # type: int
             pool_period_min=2.,  # type: float
-            max_job_execution_minutes=None,  # type: Optional[float]
+            time_limit_per_job=None,  # type: Optional[float]
+            max_iteration_per_job=None,  # type: Optional[int]
             total_max_jobs=None,  # type: Optional[int]
             **_  # type: Any
     ):
-        # type: (...) -> GridSearch
+        # type: (...) -> ()
         """
         Initialize a grid search optimizer
 
        :param base_task_id: Task ID (str)
        :param hyper_parameters: list of Parameter objects to optimize over
        :param objective_metric: Objective object to maximize / minimize
        :param str execution_queue: execution queue to use for launching Tasks (experiments).
        :param int num_concurrent_workers: Limit number of concurrent running machines
        :param float pool_period_min: time in minutes between two consecutive pools
-       :param float max_job_execution_minutes: maximum time per single job in minutes, if exceeded job is aborted
-       :param int total_max_jobs: total maximum job for the optimization process. Default None, unlimited
+       :param float time_limit_per_job: Optional, maximum execution time per single job in minutes,
+           when the time limit is exceeded, the job is aborted
+       :param int max_iteration_per_job: maximum iterations (of the objective metric)
+           per single job; when exceeded, the job is aborted.
+       :param int total_max_jobs: total maximum jobs for the optimization process. Default None, unlimited
        """
         super(GridSearch, self).__init__(
             base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
             execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
-            pool_period_min=pool_period_min, max_job_execution_minutes=max_job_execution_minutes,
-            total_max_jobs=total_max_jobs, **_)
+            pool_period_min=pool_period_min, time_limit_per_job=time_limit_per_job,
+            max_iteration_per_job=max_iteration_per_job, total_max_jobs=total_max_jobs, **_)
         self._param_iterator = None
 
     def create_job(self):
@@ -533,21 +629,6 @@ class GridSearch(SearchStrategy):
 
         return self.helper_create_job(base_task_id=self._base_task_id, parameter_override=parameters)
 
-    def monitor_job(self, job):
-        # type: (TrainsJob) -> bool
-        """
-        Check if the job needs to be aborted or already completed
-        if return False, the job was aborted / completed, and should be taken off the current job list
-
-        :param TrainsJob job: a TrainsJob object to monitor
-        :return: boolean, If False, job is no longer relevant
-        """
-        if self.max_job_execution_minutes and job.elapsed() / 60. > self.max_job_execution_minutes:
-            job.abort()
-            return False
-
-        return not job.is_stopped()
-
     def _next_configuration(self):
         # type: () -> Mapping[str, str]
         def param_iterator_fn():
@@ -577,11 +658,12 @@ class RandomSearch(SearchStrategy):
             execution_queue,  # type: str
             num_concurrent_workers,  # type: int
             pool_period_min=2.,  # type: float
-            max_job_execution_minutes=None,  # type: Optional[float]
+            time_limit_per_job=None,  # type: Optional[float]
+            max_iteration_per_job=None,  # type: Optional[int]
             total_max_jobs=None,  # type: Optional[int]
             **_  # type: Any
     ):
-        # type: (...) -> RandomSearch
+        # type: (...) -> ()
         """
         Initialize a random search optimizer
 
        :param base_task_id: Task ID (str)
        :param hyper_parameters: list of Parameter objects to optimize over
        :param objective_metric: Objective object to maximize / minimize
        :param str execution_queue: execution queue to use for launching Tasks (experiments).
        :param int num_concurrent_workers: Limit number of concurrent running machines
        :param float pool_period_min: time in minutes between two consecutive pools
-       :param float max_job_execution_minutes: maximum time per single job in minutes, if exceeded job is aborted
-       :param int total_max_jobs: total maximum job for the optimization process. Default None, unlimited
+       :param float time_limit_per_job: Optional, maximum execution time per single job in minutes,
+           when the time limit is exceeded, the job is aborted
+       :param int max_iteration_per_job: maximum iterations (of the objective metric)
+           per single job; when exceeded, the job is aborted.
+       :param int total_max_jobs: total maximum jobs for the optimization process. Default None, unlimited
        """
         super(RandomSearch, self).__init__(
             base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
             execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
-            pool_period_min=pool_period_min,
-            max_job_execution_minutes=max_job_execution_minutes, total_max_jobs=total_max_jobs, **_)
+            pool_period_min=pool_period_min, time_limit_per_job=time_limit_per_job,
+            max_iteration_per_job=max_iteration_per_job, total_max_jobs=total_max_jobs, **_)
         self._hyper_parameters_collection = set()
 
     def create_job(self):
@@ -631,21 +716,6 @@ class RandomSearch(SearchStrategy):
 
         return self.helper_create_job(base_task_id=self._base_task_id, parameter_override=parameters)
 
-    def monitor_job(self, job):
-        # type: (TrainsJob) -> bool
-        """
-        Check if the job needs to be aborted or already completed
-        if return False, the job was aborted / completed, and should be taken off the current job list
-
-        :param TrainsJob job: a TrainsJob object to monitor
-        :return: boolean, If False, job is no longer relevant
-        """
-        if self.max_job_execution_minutes and job.elapsed() / 60. > self.max_job_execution_minutes:
-            job.abort()
-            return False
-
-        return not job.is_stopped()
-
 
 class HyperParameterOptimizer(object):
     """
@@ -669,7 +739,7 @@ class HyperParameterOptimizer(object):
             always_create_task=False,  # type: bool
             **optimizer_kwargs  # type: Any
     ):
-        # type: (...) -> HyperParameterOptimizer
+        # type: (...) -> ()
         """
         Create a new hyper-parameter controller. The newly created object will launch and monitor the new experiments.
@@ -717,7 +787,7 @@ class HyperParameterOptimizer(object):
                 objective_metric_sign='min',
                 max_number_of_concurrent_tasks=5,
                 optimizer_class=RandomSearch,
-                execution_queue='workers', max_job_execution_minutes=0.1, pool_period_min=0.1)
+                execution_queue='workers', time_limit_per_job=120, pool_period_min=0.2)
 
             # This will automatically create and print the optimizer new task id
             # for later use. if a Task was already created, it will use it.
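[Editor's illustration, not part of the patch] The docstring example above already uses the renamed time_limit_per_job argument; the hedged sketch below shows the same pattern with OptimizerBOHB, which after this change refuses to start unless the full budget (min_iteration_per_job, max_iteration_per_job, total_max_jobs) is given. The template task id, queue name, metric names and the 'lr'/'batch_size' parameter names are hypothetical placeholders.

    from trains.automation import DiscreteParameterRange, HyperParameterOptimizer, UniformParameterRange
    from trains.automation.hpbandster import OptimizerBOHB  # needs the hpbandster package installed

    an_optimizer = HyperParameterOptimizer(
        # hypothetical template experiment to clone and mutate
        base_task_id='<template_task_id>',
        hyper_parameters=[
            UniformParameterRange('lr', min_value=1e-4, max_value=1e-1),
            DiscreteParameterRange('batch_size', values=[32, 64, 128]),
        ],
        objective_metric_title='validation',
        objective_metric_series='accuracy',
        objective_metric_sign='max',
        optimizer_class=OptimizerBOHB,
        execution_queue='default',
        max_number_of_concurrent_tasks=2,
        # renamed in this patch (was max_job_execution_minutes), value is in minutes
        time_limit_per_job=10.,
        pool_period_min=1.,
        # the three budget arguments OptimizerBOHB now validates at construction time
        min_iteration_per_job=10,
        max_iteration_per_job=30,
        total_max_jobs=20,
    )
    an_optimizer.set_report_period(2.)  # report the remaining-budget scalars every 2 minutes
    an_optimizer.start()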
@@ -1093,6 +1163,25 @@ class HyperParameterOptimizer(object):
                         title=title, series='{}{}'.format(series, machine_id), iteration=counter, value=value)
 
+            # noinspection PyBroadException
+            try:
+                budget = self.optimizer.budget.to_dict()
+            except Exception:
+                budget = {}
+
+            # report remaining budget
+            for budget_part, value in budget.items():
+                task_logger.report_scalar(
+                    title='remaining budget', series='{} %'.format(budget_part),
+                    iteration=counter, value=round(100 - value['used'] * 100., ndigits=1))
+            if self.optimization_timeout and self.optimization_start_time:
+                task_logger.report_scalar(
+                    title='remaining budget', series='time %',
+                    iteration=counter,
+                    value=round(100 - (100. * (time() - self.optimization_start_time) /
+                                       (self.optimization_timeout - self.optimization_start_time)), ndigits=1)
+                )
+
             # collect a summary of all the jobs and their final objective values
             cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - running_job_ids
             if cur_completed_jobs != set(completed_jobs.keys()):

diff --git a/trains/automation/parameters.py b/trains/automation/parameters.py
index c1777027..d18caaaf 100644
--- a/trains/automation/parameters.py
+++ b/trains/automation/parameters.py
@@ -39,7 +39,7 @@ class Parameter(RandomSeed):
     """
 
     def __init__(self, name):
-        # type: (Optional[str]) -> Parameter
+        # type: (Optional[str]) -> ()
         """
         Create a new Parameter for hyper-parameter optimization
@@ -73,6 +73,7 @@ class Parameter(RandomSeed):
         :return dict: dict representation of the object (serialization)
         """
         serialize = {'__class__': str(self.__class__).split('.')[-1][:-2]}
+        # noinspection PyCallingNonCallable
         serialize.update(dict(((k, v.to_dict() if hasattr(v, 'to_dict') else v) for k, v in self.__dict__.items())))
         return serialize
@@ -111,7 +112,7 @@ class UniformParameterRange(Parameter):
             step_size=None,  # type: Optional[float]
             include_max_value=True  # type: bool
     ):
-        # type: (...) -> UniformParameterRange
+        # type: (...) -> ()
         """
         Create a parameter to be sampled by the SearchStrategy
@@ -160,6 +161,7 @@ class UniformIntegerParameterRange(Parameter):
     """
 
     def __init__(self, name, min_value, max_value, step_size=1, include_max_value=True):
+        # type: (str, int, int, int, bool) -> ()
         """
         Create a parameter to be sampled by the SearchStrategy
@@ -206,6 +208,7 @@ class DiscreteParameterRange(Parameter):
     """
 
     def __init__(self, name, values=()):
+        # type: (str, Sequence[Any]) -> ()
         """
         Uniformly sample values form a list of discrete options
@@ -240,7 +243,7 @@ class ParameterSet(Parameter):
     """
 
     def __init__(self, parameter_combinations=()):
-        # type: (Sequence[Mapping[str, Union[float, int, str, Parameter]]]) -> ParameterSet
+        # type: (Sequence[Mapping[str, Union[float, int, str, Parameter]]]) -> ()
         """
         Uniformly sample values form a list of discrete options (combinations) of parameters
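[Editor's illustration, not part of the patch] A minimal sketch of the Budget object this patch adds to optimization.py: each Field accumulates per-job usage keyed by the job's Task id, `used` is the consumed fraction of the limit, and `to_dict()` is what the new reporting code turns into the "remaining budget" scalars (100 - used * 100). The task id and numbers below are made up.

    from trains.automation.optimization import Budget

    # limits equivalent to total_max_jobs=10, max_iteration_per_job=30, time_limit_per_job=10 (minutes)
    budget = Budget(jobs_limit=10, iterations_limit=10 * 30, compute_time_limit=10 * 10.)

    budget.jobs.update('hypothetical_task_id', 1)            # one job counted in full
    budget.iterations.update('hypothetical_task_id', 30)     # it reported 30 iterations
    budget.compute_time.update('hypothetical_task_id', 7.5)  # and ran for 7.5 minutes

    print(budget.to_dict())
    # {'jobs': {'limit': 10, 'used': 0.1},
    #  'iterations': {'limit': 300, 'used': 0.1},
    #  'compute_time': {'limit': 100.0, 'used': 0.075}}
    # i.e. the optimizer task would report 90.0%, 90.0% and 92.5% remaining budget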