Fix Optimizer limits and values, update trains version in examples

allegroai 2020-09-15 19:14:45 +03:00
parent ab5059e8e1
commit 8ec6bba4d9
5 changed files with 175 additions and 108 deletions

View File

@@ -12,7 +12,7 @@
"! pip install -U torch==1.5.1\n",
"! pip install -U torchaudio==0.5.1\n",
"! pip install -U matplotlib==3.2.1\n",
"! pip install -U trains>=0.16.0\n",
"! pip install -U trains>=0.16.1\n",
"! pip install -U tensorboard==2.2.1"
]
},

View File

@@ -12,7 +12,7 @@
"\n",
"# pip install with locked versions\n",
"! pip install -U pandas==1.0.3\n",
"! pip install -U trains>=0.16.1\n",
"! pip install -U trains>=0.16.2\n",
"! pip install -U optuna==2.0.0"
]
},
@@ -35,7 +35,9 @@
"metadata": {},
"outputs": [],
"source": [
"task = Task.init(project_name='Hyperparameter Optimization with Optuna', task_name='Hyperparameter Search')\n"
"task = Task.init(project_name='Hyperparameter Optimization with Optuna',\n",
" task_name='Hyperparameter Search',\n",
" task_type=Task.TaskTypes.optimizer)\n"
]
},
{
@@ -134,4 +136,4 @@
},
"nbformat": 4,
"nbformat_minor": 4
}
}

View File

@@ -57,11 +57,7 @@ class TrainsJob(object):
:param str series: Series on the specific graph (variant)
:return: A tuple of min value, max value, last value
"""
title = hashlib.md5(str(title).encode('utf-8')).hexdigest()
series = hashlib.md5(str(series).encode('utf-8')).hexdigest()
metric = 'last_metrics.{}.{}.'.format(title, series)
values = ['min_value', 'max_value', 'value']
metrics = [metric + v for v in values]
metrics, title, series, values = self.get_metric_req_params(title, series)
res = self.task.send(
tasks_service.GetAllRequest(
@@ -75,6 +71,15 @@ class TrainsJob(object):
return tuple(response.response_data['tasks'][0]['last_metrics'][title][series][v] for v in values)
@staticmethod
def get_metric_req_params(title, series):
# Build the last_metrics field names for the given title/series; the server keys them by md5 hash.
title = hashlib.md5(str(title).encode('utf-8')).hexdigest()
series = hashlib.md5(str(series).encode('utf-8')).hexdigest()
metric = 'last_metrics.{}.{}.'.format(title, series)
values = ['min_value', 'max_value', 'value']
metrics = [metric + v for v in values]
return metrics, title, series, values
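
The field-name construction removed above is now exposed as a reusable static helper (it is also used by the new _get_last_value in the optimizer module further down). A minimal sketch of what it computes, following only the scheme shown in this diff:

import hashlib

def metric_req_params(title, series):
    # same scheme as TrainsJob.get_metric_req_params: the backend keys
    # last_metrics entries by the md5 hex digest of the title and series strings
    title = hashlib.md5(str(title).encode('utf-8')).hexdigest()
    series = hashlib.md5(str(series).encode('utf-8')).hexdigest()
    metric = 'last_metrics.{}.{}.'.format(title, series)
    values = ['min_value', 'max_value', 'value']
    return [metric + v for v in values], title, series, values

metrics, title_hash, series_hash, values = metric_req_params('validation', 'loss')
print(metrics)  # ['last_metrics.<md5 of title>.<md5 of series>.min_value', ...]
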
def launch(self, queue_name=None):
# type: (str) -> ()
"""

View File

@@ -6,7 +6,7 @@ from itertools import product
from logging import getLogger
from threading import Thread, Event
from time import time
from typing import Dict, Set, Tuple, Union, Any, Sequence, Optional, Mapping, Callable
from typing import List, Set, Union, Any, Sequence, Optional, Mapping, Callable
from .job import TrainsJob
from .parameters import Parameter
@@ -395,8 +395,17 @@ class SearchStrategy(object):
:return: False, if the job is no longer relevant.
"""
abort_job = False
abort_job = self.update_budget_per_job(job)
if abort_job:
job.abort()
return False
return not job.is_stopped()
def update_budget_per_job(self, job):
# type: (TrainsJob) -> bool
# Update the time / compute / iteration budgets for a single job; return True if the job exceeded any per-job limit and should be aborted.
abort_job = False
if self.time_limit_per_job:
elapsed = job.elapsed() / 60.
if elapsed > 0:
@@ -409,9 +418,6 @@ class SearchStrategy(object):
elapsed = job.elapsed() / 60.
if elapsed > 0:
self.budget.compute_time.update(job.task_id(), elapsed)
self.budget.compute_time.update(job.task_id(), job.elapsed() / 60.)
if self.budget.compute_time.used and self.compute_time_limit < self.budget.compute_time.used:
abort_job = True
if self.max_iteration_per_job:
iterations = self._get_job_iterations(job)
@@ -420,11 +426,7 @@ class SearchStrategy(object):
if iterations > self.max_iteration_per_job:
abort_job = True
if abort_job:
job.abort()
return False
return not job.is_stopped()
return abort_job
def get_running_jobs(self):
# type: () -> Sequence[TrainsJob]
@@ -443,7 +445,17 @@ class SearchStrategy(object):
:return: dict of task IDs (str) as keys, and their parameters dict as values.
"""
return self._created_jobs_ids
return {job_id: job_val[1] for job_id, job_val in self._created_jobs_ids.items()}
def get_created_jobs_tasks(self):
# type: () -> Mapping[str, dict]
"""
Return a dict of the Tasks created by this optimizer so far.
The values of the returned dict are the corresponding TrainsJob objects.
:return: dict of task IDs (str) as keys, and their TrainsJob objects as values.
"""
return {job_id: job_val[0] for job_id, job_val in self._created_jobs_ids.items()}
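
A short usage sketch of the two accessors, assuming `strategy` is the active search strategy instance (for example the object returned by HyperParameterOptimizer.get_optimizer(), where available):

# parameter overrides per created Task (what get_created_jobs_ids now returns)
for task_id, params in strategy.get_created_jobs_ids().items():
    print(task_id, params)

# the TrainsJob wrappers themselves (the new get_created_jobs_tasks), e.g. to poll each job's status
for task_id, job in strategy.get_created_jobs_tasks().items():
    print(task_id, job.status())
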
def get_top_experiments(self, top_k):
# type: (int) -> Sequence[Task]
@@ -502,7 +514,7 @@ class SearchStrategy(object):
base_task_id=base_task_id, parameter_override=parameter_override,
task_overrides=task_overrides, tags=tags, parent=parent or self._job_parent_id,
name=name, comment=comment, project=self._get_task_project(parent or self._job_parent_id), **kwargs)
self._created_jobs_ids[new_job.task_id()] = parameter_override
self._created_jobs_ids[new_job.task_id()] = (new_job, parameter_override)
logger.info('Creating new Task: {}'.format(parameter_override))
return new_job
@@ -900,7 +912,7 @@ class HyperParameterOptimizer(object):
# create a new Task, if we do not have one already
self._task = Task.current_task()
if not self._task and always_create_task:
base_task = Task.get_task(task_id=self.base_task_id)
base_task = Task.get_task(task_id=base_task_id)
self._task = Task.init(
project_name=base_task.get_project_name(),
task_name='Optimizing: {}'.format(base_task.name),
@@ -1014,15 +1026,18 @@ class HyperParameterOptimizer(object):
self._thread_reporter.start()
return True
def stop(self, timeout=None):
# type: (Optional[float]) -> ()
def stop(self, timeout=None, flush_reporter=True):
# type: (Optional[float], Optional[bool]) -> ()
"""
Stop the HyperParameterOptimizer controller and the optimization thread.
:param float timeout: Wait timeout for the optimization thread to exit (minutes).
The default is ``None``, indicating do not wait, terminate immediately.
:param bool flush_reporter: Wait for the reporter thread to flush its remaining data before returning. The default is ``True``.
"""
if not self._thread or not self._stop_event or not self.optimizer:
if self._thread_reporter and flush_reporter:
self._thread_reporter.join()
return
_thread = self._thread
@@ -1039,8 +1054,9 @@ class HyperParameterOptimizer(object):
# clear thread
self._thread = None
# wait for reporter to flush
self._thread_reporter.join()
if flush_reporter:
# wait for reporter to flush
self._thread_reporter.join()
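
A usage sketch of the updated signature, assuming `an_optimizer` is a started HyperParameterOptimizer (wait() as used in the existing examples):

an_optimizer.start()
an_optimizer.wait()   # block until the optimization time limit is reached
an_optimizer.stop()   # default flush_reporter=True: wait for the reporter thread to flush

The reporting loop itself now calls stop(flush_reporter=False) (see below), presumably so the reporter thread does not end up joining on itself.
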
def is_active(self):
# type: () -> bool
@@ -1255,7 +1271,8 @@ class HyperParameterOptimizer(object):
title = '{}/{}'.format(title, series)
counter = 0
completed_jobs = dict()
best_experiment = float('-inf'), None
task_logger = None
cur_completed_jobs = set()
while self._thread is not None:
timeout = self.optimization_timeout - time() if self.optimization_timeout else 0.
@@ -1278,98 +1295,127 @@
# do some reporting
# noinspection PyBroadException
try:
budget = self.optimizer.budget.to_dict()
except Exception:
budget = {}
self._report_remaining_budget(task_logger, counter)
# report remaining budget
for budget_part, value in budget.items():
task_logger.report_scalar(
title='remaining budget', series='{} %'.format(budget_part),
iteration=counter, value=round(100 - value['used'] * 100., ndigits=1))
if self.optimization_timeout and self.optimization_start_time:
task_logger.report_scalar(
title='remaining budget', series='time %',
iteration=counter,
value=round(100 - (100. * (time() - self.optimization_start_time) /
(self.optimization_timeout - self.optimization_start_time)), ndigits=1)
)
if self.optimizer.budget.compute_time.used and self.optimizer.budget.compute_time.used >= 1.0:
# Reached compute time limit
timeout = -1
self._report_resources(task_logger, counter)
# collect a summary of all the jobs and their final objective values
cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - \
{j.task_id() for j in self.optimizer.get_running_jobs()}
if cur_completed_jobs != set(completed_jobs.keys()):
pairs = []
labels = []
created_jobs = copy(self.optimizer.get_created_jobs_ids())
for i, (job_id, params) in enumerate(created_jobs.items()):
if job_id in completed_jobs:
pairs.append((i, completed_jobs[job_id][0]))
labels.append(str(completed_jobs[job_id][2])[1:-1])
else:
value = self.objective_metric.get_objective(job_id)
if value is not None:
pairs.append((i, value))
labels.append(str(params)[1:-1])
iteration_value = self.objective_metric.get_current_raw_objective(job_id)
completed_jobs[job_id] = (
value, iteration_value[0] if iteration_value else -1, copy(params))
# callback new experiment completed
if self._experiment_completed_cb:
normalized_value = self.objective_metric.get_normalized_objective(job_id)
if normalized_value is not None and normalized_value > best_experiment[0]:
best_experiment = normalized_value, job_id
c = completed_jobs[job_id]
self._experiment_completed_cb(job_id, c[0], c[1], c[2], best_experiment[1])
self._report_completed_tasks_best_results(completed_jobs, task_logger, title, counter)
if pairs:
print('Updating job performance summary plot/table')
# update scatter plot
task_logger.report_scatter2d(
title='optimization', series=title,
scatter=pairs, iteration=0, labels=labels,
mode='markers', xaxis='job #', yaxis='objective')
# update summary table
if pd:
index = list(completed_jobs.keys())
table = {'objective': [completed_jobs[i][0] for i in index],
'iteration': [completed_jobs[i][1] for i in index]}
columns = set([c for k, v in completed_jobs.items() for c in v[2].keys()])
for c in sorted(columns):
table.update({c: [completed_jobs[i][2].get(c, '') for i in index]})
df = pd.DataFrame(table, index=index)
df.sort_values(by='objective', ascending=bool(self.objective_metric.sign < 0), inplace=True)
df.index.name = 'task id'
task_logger.report_table(
"summary", "job", 0, table_plot=df,
extra_layout={"title": "objective: {}".format(title)})
self._report_completed_status(completed_jobs, cur_completed_jobs, task_logger, title)
self._report_completed_tasks_best_results(set(completed_jobs.keys()), task_logger, title, counter)
# if we should leave, stop everything now.
if timeout < 0:
# we should leave
self.stop()
self.stop(flush_reporter=False)
return
if task_logger and counter:
counter += 1
self._report_remaining_budget(task_logger, counter)
self._report_resources(task_logger, counter)
self._report_completed_status(completed_jobs, cur_completed_jobs, task_logger, title, force=True)
self._report_completed_tasks_best_results(set(completed_jobs.keys()), task_logger, title, counter)
def _report_completed_status(self, completed_jobs, cur_completed_jobs, task_logger, title, force=False):
best_experiment = float('-inf'), None
if force or cur_completed_jobs != set(completed_jobs.keys()):
pairs = []
labels = []
created_jobs = copy(self.optimizer.get_created_jobs_ids())
id_status = {j_id: j_run.status() for j_id, j_run in self.optimizer.get_created_jobs_tasks().items()}
for i, (job_id, params) in enumerate(created_jobs.items()):
value = self.objective_metric.get_objective(job_id)
if job_id in completed_jobs:
if value != completed_jobs[job_id][0]:
iteration_value = self.objective_metric.get_current_raw_objective(job_id)
completed_jobs[job_id] = (
value,
iteration_value[0] if iteration_value else -1,
copy(dict(**params, **{"status": id_status.get(job_id)})))
elif completed_jobs.get(job_id):
completed_jobs[job_id] = (completed_jobs[job_id][0],
completed_jobs[job_id][1],
copy(dict(**params, **{"status": id_status.get(job_id)})))
pairs.append((i, completed_jobs[job_id][0]))
labels.append(str(completed_jobs[job_id][2])[1:-1])
else:
if value is not None:
pairs.append((i, value))
labels.append(str(params)[1:-1])
iteration_value = self.objective_metric.get_current_raw_objective(job_id)
completed_jobs[job_id] = (
value,
iteration_value[0] if iteration_value else -1,
copy(dict(**params, **{"status": id_status.get(job_id)})))
# callback new experiment completed
if self._experiment_completed_cb:
normalized_value = self.objective_metric.get_normalized_objective(job_id)
if normalized_value is not None and normalized_value > best_experiment[0]:
best_experiment = normalized_value, job_id
c = completed_jobs[job_id]
self._experiment_completed_cb(job_id, c[0], c[1], c[2], best_experiment[1])
if pairs:
print('Updating job performance summary plot/table')
# update scatter plot
task_logger.report_scatter2d(
title='optimization', series=title,
scatter=pairs, iteration=0, labels=labels,
mode='markers', xaxis='job #', yaxis='objective')
# update summary table
if pd:
index = list(completed_jobs.keys())
table = {'objective': [completed_jobs[i][0] for i in index],
'iteration': [completed_jobs[i][1] for i in index]}
columns = set([c for k, v in completed_jobs.items() for c in v[2].keys()])
for c in sorted(columns):
table.update({c: [completed_jobs[i][2].get(c, '') for i in index]})
df = pd.DataFrame(table, index=index)
df.sort_values(by='objective', ascending=bool(self.objective_metric.sign < 0), inplace=True)
df.index.name = 'task id'
task_logger.report_table(
"summary", "job", 0, table_plot=df,
extra_layout={"title": "objective: {}".format(title)})
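
For readability, the shape of each completed_jobs entry that the method above builds and the summary table consumes (inferred from this diff, not a documented structure):

# completed_jobs[task_id] == (
#     objective_value,      # objective_metric.get_objective(task_id); may be None
#     objective_iteration,  # iteration of the raw objective, or -1 if none was reported
#     params_and_status,    # copy of the parameter overrides plus a 'status' key
# )
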
def _report_remaining_budget(self, task_logger, counter):
# noinspection PyBroadException
try:
budget = self.optimizer.budget.to_dict()
except Exception:
budget = {}
# report remaining budget
for budget_part, value in budget.items():
task_logger.report_scalar(
title='remaining budget', series='{} %'.format(budget_part),
iteration=counter, value=round(100 - value['used'] * 100., ndigits=1))
if self.optimization_timeout and self.optimization_start_time:
task_logger.report_scalar(
title='remaining budget', series='time %',
iteration=counter,
value=round(100 - (100. * (time() - self.optimization_start_time) /
(self.optimization_timeout - self.optimization_start_time)), ndigits=1)
)
def _report_completed_tasks_best_results(self, completed_jobs, task_logger, title, counter):
# type: (Dict[str, Tuple[float, int, Dict[str, int]]], Logger, str, int) -> ()
# type: (Set[str], Logger, str, int) -> ()
if completed_jobs:
value_func, series_name = (max, "max") if self.objective_metric.get_objective_sign() > 0 else \
(min, "min")
task_logger.report_scalar(
title=title,
series=series_name,
iteration=counter,
value=value_func([val[0] for val in completed_jobs.values()]))
latest_completed = self._get_latest_completed_task_value(set(completed_jobs.keys()))
latest_completed, obj_values = self._get_latest_completed_task_value(completed_jobs, series_name)
val = value_func(obj_values)
if latest_completed:
task_logger.report_scalar(
title=title,
series=series_name,
iteration=counter,
value=val)
task_logger.report_scalar(
title=title,
series="last reported",
@@ -1396,7 +1442,10 @@ class HyperParameterOptimizer(object):
if q.get("name") == self.execution_queue
]
)
task_logger.report_scalar(title="resources", series="queue workers", iteration=iteration, value=queue_workers)
task_logger.report_scalar(title="resources",
series="queue workers",
iteration=iteration,
value=queue_workers)
def _report_tasks_status(self, task_logger, iteration):
# type: (Logger, int) -> ()
@@ -1411,10 +1460,11 @@ class HyperParameterOptimizer(object):
title="resources", series=series,
iteration=iteration, value=val)
def _get_latest_completed_task_value(self, cur_completed_jobs):
# type: (Set[str]) -> float
def _get_latest_completed_task_value(self, cur_completed_jobs, series_name):
# type: (Set[str], str) -> (float, List[float])
completed_value = None
latest_completed = None
obj_values = []
cur_task = self._task or Task.current_task()
for j in cur_completed_jobs:
res = cur_task.send(tasks_services.GetByIdRequest(task=j))
@@ -1424,7 +1474,15 @@ class HyperParameterOptimizer(object):
completed_time = datetime.strptime(response.response_data["task"]["completed"].partition("+")[0],
"%Y-%m-%dT%H:%M:%S.%f")
completed_time = completed_time.timestamp()
completed_values = self._get_last_value(response)
obj_values.append(completed_values['max_value'] if series_name == "max" else completed_values['min_value'])
if not latest_completed or completed_time > latest_completed:
latest_completed = completed_time
completed_value = self.objective_metric.get_objective(j)
return completed_value
completed_value = completed_values['value']
return completed_value, obj_values
def _get_last_value(self, response):
metrics, title, series, values = TrainsJob.get_metric_req_params(self.objective_metric.title,
self.objective_metric.series)
last_values = response.response_data["task"]['last_metrics'][title][series]
return last_values

View File

@@ -45,11 +45,12 @@ class OptunaObjective(object):
current_job.launch(self.queue_name)
iteration_value = None
is_pending = True
while self.optimizer.monitor_job(current_job):
while not current_job.is_stopped():
if is_pending and not current_job.is_pending():
is_pending = False
self.optimizer.budget.jobs.update(current_job.task_id(), 1.)
if not is_pending:
self.optimizer.update_budget_per_job(current_job)
# noinspection PyProtectedMember
iteration_value = self.optimizer._objective_metric.get_current_raw_objective(current_job)
@@ -182,6 +183,7 @@ class OptimizerOptuna(SearchStrategy):
self._study.stop()
except Exception as ex:
print(ex)
self._stop_event.set()
def _convert_hyper_parameters_to_optuna(self):
# type: () -> dict
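
For context, a minimal end-to-end sketch of how the pieces touched by this commit fit together, loosely following the trains 0.16 Optuna example. The base task id, queue name and hyper-parameter range are placeholders, and the import paths may differ in your installation:

from trains import Task
from trains.automation import HyperParameterOptimizer, UniformIntegerParameterRange
from trains.automation.optuna import OptimizerOptuna

# controller task, now explicitly typed as an optimizer (see the notebook change above)
task = Task.init(project_name='Hyperparameter Optimization with Optuna',
                 task_name='Hyperparameter Search',
                 task_type=Task.TaskTypes.optimizer)

an_optimizer = HyperParameterOptimizer(
    base_task_id='<template experiment id>',  # placeholder: the experiment cloned for every trial
    hyper_parameters=[
        UniformIntegerParameterRange('number_of_epochs', min_value=2, max_value=12, step_size=2),
    ],
    objective_metric_title='validation',
    objective_metric_series='loss',
    objective_metric_sign='min',
    optimizer_class=OptimizerOptuna,
    execution_queue='default',
    max_number_of_concurrent_tasks=2,
    total_max_jobs=10,
    time_limit_per_job=10.,  # minutes; enforced through SearchStrategy.update_budget_per_job
)

an_optimizer.start()
an_optimizer.wait()
an_optimizer.stop()
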