Add HyperParameterOptimizer arguments: spawn_task_project, save_top_k_tasks_only

This commit is contained in:
allegroai 2021-01-10 13:07:57 +02:00
parent 49ba89587f
commit f679c80535
2 changed files with 233 additions and 82 deletions

View File

@ -11,8 +11,9 @@ from typing import List, Set, Union, Any, Sequence, Optional, Mapping, Callable
from .job import TrainsJob
from .parameters import Parameter
from ..backend_interface.util import get_or_create_project
from ..logger import Logger
from ..backend_api.services import workers as workers_service, tasks as tasks_services
from ..backend_api.services import workers as workers_service, tasks as tasks_service, events as events_service
from ..task import Task
logger = getLogger('clearml.automation.optimization')
@ -82,6 +83,9 @@ class Objective(object):
return None
metrics = task.last_metrics
if not metrics:
return None
# noinspection PyBroadException
try:
values = metrics[self._metric[0]][self._metric[1]]
@ -102,18 +106,34 @@ class Objective(object):
:return: Tuple(iteration, value) if, and only if, the metric exists. None if the metric does not exist.
"""
if isinstance(task, Task):
task_id = task.id
elif isinstance(task, TrainsJob):
task_id = task.task_id()
else:
task_id = task
if not isinstance(task, Task):
if hasattr(task, 'task'):
task = task.task
if not isinstance(task, Task):
task = Task.get_task(task_id=str(task))
if not task:
raise ValueError("Task object could not be found")
if not task_id:
raise ValueError("Task ID not provided")
# todo: replace with more efficient code
scalars = task.get_reported_scalars()
# send request
# noinspection PyBroadException
try:
# noinspection PyProtectedMember
res = Task._get_default_session().send(
events_service.ScalarMetricsIterHistogramRequest(
task=task_id, key='iter', samples=None),
)
except Exception:
res = None
if not res:
return None
response = res.wait()
if not response.ok() or not response.response_data:
return None
scalars = response.response_data
# noinspection PyBroadException
try:
return scalars[self.title][self.series]['x'][-1], scalars[self.title][self.series]['y'][-1]
@ -303,6 +323,7 @@ class SearchStrategy(object):
self._pending_jobs = []
self._num_jobs = 0
self._job_parent_id = None
self._job_project_id = None
self._created_jobs_ids = {}
self._naming_function = None
self._job_project = {}
@ -535,7 +556,9 @@ class SearchStrategy(object):
new_job = self._job_class(
base_task_id=base_task_id, parameter_override=parameter_override,
task_overrides=task_overrides, tags=tags, parent=parent or self._job_parent_id,
name=name, comment=comment, project=self._get_task_project(parent or self._job_parent_id), **kwargs)
name=name, comment=comment,
project=self._job_project_id or self._get_task_project(parent or self._job_parent_id),
**kwargs)
self._created_jobs_ids[new_job.task_id()] = (new_job, parameter_override)
logger.info('Creating new Task: {}'.format(parameter_override))
return new_job
@ -549,14 +572,19 @@ class SearchStrategy(object):
"""
self._job_class = job_class
def set_job_default_parent(self, job_parent_task_id):
# type: (str) -> ()
def set_job_default_parent(self, job_parent_task_id, project_name=None):
# type: (Optional[str], Optional[str]) -> ()
"""
Set the default parent for all Jobs created by the :meth:`helper_create_job` method.
:param str job_parent_task_id: The parent Task ID.
:param str project_name: If specified, create the jobs in the specified project
"""
self._job_parent_id = job_parent_task_id
# noinspection PyProtectedMember
self._job_project_id = get_or_create_project(
session=Task._get_default_session(), project_name=project_name, description='HPO process spawned Tasks') \
if project_name else None
def set_job_naming_scheme(self, naming_function):
# type: (Optional[Callable[[str, dict], str]]) -> ()
@ -619,11 +647,67 @@ class SearchStrategy(object):
iteration_value = self._objective_metric.get_current_raw_objective(job)
return iteration_value[0] if iteration_value else -1
@classmethod
def _get_child_tasks_ids(
        cls,
        parent_task_id,  # type: str
        status=None,  # type: Optional[Union[Task.TaskStatusEnum, Sequence[Task.TaskStatusEnum]]]
        order_by=None,  # type: Optional[str]
        additional_filters=None  # type: Optional[dict]
):
    # type: (...) -> (Sequence[str])
    """
    Helper function. Return a list of child Task IDs with a specific ``status``,
    ordered by ``order_by``.

    :param str parent_task_id: The base Task ID (parent).
    :param status: The current status of requested tasks (for example, ``in_progress`` and ``completed``).
        A single status or a sequence of statuses.
    :param str order_by: The field name to sort results.

        Examples:

        .. code-block:: py

            "-last_metrics.title.series.min"
            "last_metrics.title.series.max"
            "last_metrics.title.series.last"
            "execution.parameters.name"
            "updated"

    :param dict additional_filters: The additional task filters.
    :return: A list of Task IDs (str)
    """
    task_filter = {
        'parent': parent_task_id,
        # 'tags': [cls._tag],
        # since we have auto archive we do not want to filter out archived tasks
        # 'system_tags': ['-archived'],
    }
    task_filter.update(additional_filters or {})
    if status:
        # the query endpoint expects a list of statuses
        task_filter['status'] = status if isinstance(status, (tuple, list)) else [status]
    # translate "last_metrics.<title>.<series>.<min|max|last>" into the backend field path,
    # where the title/series parts are stored md5-hashed on the server
    if order_by and (order_by.startswith('last_metrics') or order_by.startswith('-last_metrics')):
        parts = order_by.split('.')
        if parts[-1] in ('min', 'max', 'last'):
            title = hashlib.md5(str(parts[1]).encode('utf-8')).hexdigest()
            series = hashlib.md5(str(parts[2]).encode('utf-8')).hexdigest()
            minmax = 'min_value' if 'min' in parts[3] else ('max_value' if 'max' in parts[3] else 'value')
            # bug fix: str.join() inserted the literal separator '{}last_metrics.' between every
            # field (yielding an invalid path with literal '{}' braces); build the dotted
            # field path with format() instead: "[-]last_metrics.<title>.<series>.<minmax>"
            order_by = '{}last_metrics.{}.{}.{}'.format(
                '-' if order_by and order_by[0] == '-' else '', title, series, minmax)
    if order_by:
        task_filter['order_by'] = [order_by]
    # noinspection PyProtectedMember
    task_objects = Task._query_tasks(**task_filter)
    return [t.id for t in task_objects]
@classmethod
def _get_child_tasks(
cls,
parent_task_id, # type: str
status=None, # type: Optional[Task.TaskStatusEnum]
status=None, # type: Optional[Union[Task.TaskStatusEnum], Sequence[Task.TaskStatusEnum]]
order_by=None, # type: Optional[str]
additional_filters=None # type: Optional[dict]
):
@ -648,30 +732,13 @@ class SearchStrategy(object):
:param dict additional_filters: The additional task filters.
:return: A list of Task objects
"""
task_filter = {
'parent': parent_task_id,
# 'tags': [cls._tag],
# since we have auto archive we do not want to filter out archived tasks
# 'system_tags': ['-archived'],
}
task_filter.update(additional_filters or {})
if status:
task_filter['status'] = status
if order_by and (order_by.startswith('last_metrics') or order_by.startswith('-last_metrics')):
parts = order_by.split('.')
if parts[-1] in ('min', 'max', 'last'):
title = hashlib.md5(str(parts[1]).encode('utf-8')).hexdigest()
series = hashlib.md5(str(parts[2]).encode('utf-8')).hexdigest()
minmax = 'min_value' if 'min' in parts[3] else ('max_value' if 'max' in parts[3] else 'value')
order_by = '{}last_metrics.'.join(
('-' if order_by and order_by[0] == '-' else '', title, series, minmax))
if order_by:
task_filter['order_by'] = [order_by]
return Task.get_tasks(task_filter=task_filter)
return [
Task.get_task(task_id=t_id) for t_id in cls._get_child_tasks_ids(
parent_task_id=parent_task_id,
status=status,
order_by=order_by,
additional_filters=additional_filters)
]
class GridSearch(SearchStrategy):
@ -846,6 +913,8 @@ class HyperParameterOptimizer(object):
compute_time_limit=None, # type: Optional[float]
auto_connect_task=True, # type: Union[bool, Task]
always_create_task=False, # type: bool
spawn_task_project=None, # type: Optional[str]
save_top_k_tasks_only=None, # type: Optional[int]
**optimizer_kwargs # type: Any
):
# type: (...) -> ()
@ -893,6 +962,12 @@ class HyperParameterOptimizer(object):
- ``False`` - Use the :py:meth:`task.Task.current_task` (if exists) to report statistics.
:param str spawn_task_project: If project name is specified, create all optimization Jobs (Tasks) in the
specified project, instead of the original base_task_id project.
:param int save_top_k_tasks_only: If specified and above 0, keep only the top_k performing Tasks,
and archive the rest of the created Tasks. Default: -1 keep everything, nothing will be archived.
:param ** optimizer_kwargs: Arguments passed directly to the optimizer constructor.
Example:
@ -986,8 +1061,9 @@ class HyperParameterOptimizer(object):
self._report_period_min = 5.
self._thread_reporter = None
self._experiment_completed_cb = None
if self._task:
self.optimizer.set_job_default_parent(self._task.id)
self._save_top_k_tasks_only = max(0, save_top_k_tasks_only or 0)
self.optimizer.set_job_default_parent(
self._task.id if self._task else None, project_name=spawn_task_project or None)
self.set_time_limit(in_minutes=opts['optimization_time_limit'])
def get_num_active_experiments(self):
@ -1349,6 +1425,23 @@ class HyperParameterOptimizer(object):
completed_jobs = dict()
task_logger = None
cur_completed_jobs = set()
cur_task = self._task or Task.current_task()
if cur_task and self.optimizer:
# noinspection PyProtectedMember
child_tasks = self.optimizer._get_child_tasks(
parent_task_id=cur_task.id, status=['completed', 'stopped'])
hyper_parameters = [h.name for h in self.hyper_parameters]
for task in child_tasks:
params = {k: v for k, v in task.get_parameters().items() if k in hyper_parameters}
params["status"] = str(task.status)
# noinspection PyProtectedMember
iteration_value = task.get_last_iteration()
objective = self.objective_metric.get_objective(task)
completed_jobs[task.id] = (
objective if objective is not None else -1,
iteration_value if iteration_value is not None else -1,
params
)
while self._thread is not None:
timeout = self.optimization_timeout - time() if self.optimization_timeout else 0.
@ -1366,8 +1459,9 @@ class HyperParameterOptimizer(object):
counter += 1
# get task to report on.
if self._task or Task.current_task():
task_logger = (self._task or Task.current_task()).get_logger()
cur_task = self._task or Task.current_task()
if cur_task:
task_logger = cur_task.get_logger()
# do some reporting
@ -1382,8 +1476,8 @@ class HyperParameterOptimizer(object):
cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - \
{j.task_id() for j in self.optimizer.get_running_jobs()}
self._report_completed_status(completed_jobs, cur_completed_jobs, task_logger, title)
self._report_completed_tasks_best_results(set(completed_jobs.keys()), task_logger, title, counter)
self._auto_archive_low_performance_tasks(completed_jobs)
# if we should leave, stop everything now.
if timeout < 0:
# we should leave
@ -1395,12 +1489,14 @@ class HyperParameterOptimizer(object):
self._report_resources(task_logger, counter)
self._report_completed_status(completed_jobs, cur_completed_jobs, task_logger, title, force=True)
self._report_completed_tasks_best_results(set(completed_jobs.keys()), task_logger, title, counter)
self._auto_archive_low_performance_tasks(completed_jobs)
def _report_completed_status(self, completed_jobs, cur_completed_jobs, task_logger, title, force=False):
top_experiments = self.get_top_experiments(top_k=1)
job_ids_sorted_by_objective = self.__sort_jobs_by_objective(completed_jobs)
best_experiment = \
(self.objective_metric.get_normalized_objective(top_experiments[0].id), top_experiments[0].id) \
if top_experiments else (float('-inf'), None)
(self.objective_metric.get_normalized_objective(job_ids_sorted_by_objective[0]),
job_ids_sorted_by_objective[0]) \
if job_ids_sorted_by_objective else (float('-inf'), None)
if force or cur_completed_jobs != set(completed_jobs.keys()):
pairs = []
labels = []
@ -1421,22 +1517,21 @@ class HyperParameterOptimizer(object):
copy(dict(**params, **{"status": id_status.get(job_id)}))) # noqa
pairs.append((i, completed_jobs[job_id][0]))
labels.append(str(completed_jobs[job_id][2])[1:-1])
else:
if value is not None:
pairs.append((i, value))
labels.append(str(params)[1:-1])
iteration_value = self.objective_metric.get_current_raw_objective(job_id)
completed_jobs[job_id] = (
value,
iteration_value[0] if iteration_value else -1,
copy(dict(**params, **{"status": id_status.get(job_id)}))) # noqa
# callback new experiment completed
if self._experiment_completed_cb:
normalized_value = self.objective_metric.get_normalized_objective(job_id)
if normalized_value is not None and normalized_value > best_experiment[0]:
best_experiment = normalized_value, job_id
c = completed_jobs[job_id]
self._experiment_completed_cb(job_id, c[0], c[1], c[2], best_experiment[1])
elif value is not None:
pairs.append((i, value))
labels.append(str(params)[1:-1])
iteration_value = self.objective_metric.get_current_raw_objective(job_id)
completed_jobs[job_id] = (
value,
iteration_value[0] if iteration_value else -1,
copy(dict(**params, **{"status": id_status.get(job_id)}))) # noqa
# callback new experiment completed
if self._experiment_completed_cb:
normalized_value = self.objective_metric.get_normalized_objective(job_id)
if normalized_value is not None and normalized_value > best_experiment[0]:
best_experiment = normalized_value, job_id
c = completed_jobs[job_id]
self._experiment_completed_cb(job_id, c[0], c[1], c[2], best_experiment[1])
if pairs:
print('Updating job performance summary plot/table')
@ -1464,7 +1559,9 @@ class HyperParameterOptimizer(object):
extra_layout={"title": "objective: {}".format(title)})
# upload summary as artifact
if force:
self._task.upload_artifact(name='summary', artifact_object={'table': table_values})
task = self._task or Task.current_task()
if task:
task.upload_artifact(name='summary', artifact_object={'table': table_values})
def _report_remaining_budget(self, task_logger, counter):
# noinspection PyBroadException
@ -1487,22 +1584,24 @@ class HyperParameterOptimizer(object):
def _report_completed_tasks_best_results(self, completed_jobs, task_logger, title, counter):
# type: (Set[str], Logger, str, int) -> ()
if completed_jobs:
value_func, series_name = (max, "max") if self.objective_metric.get_objective_sign() > 0 else \
(min, "min")
latest_completed, obj_values = self._get_latest_completed_task_value(completed_jobs, series_name)
if latest_completed:
val = value_func(obj_values)
task_logger.report_scalar(
title=title,
series=series_name,
iteration=counter,
value=val)
task_logger.report_scalar(
title=title,
series="last reported",
iteration=counter,
value=latest_completed)
if not completed_jobs:
return
value_func, series_name = (max, "max") if self.objective_metric.get_objective_sign() > 0 else \
(min, "min")
latest_completed, obj_values = self._get_latest_completed_task_value(completed_jobs, series_name)
if latest_completed:
val = value_func(obj_values)
task_logger.report_scalar(
title=title,
series=series_name,
iteration=counter,
value=val)
task_logger.report_scalar(
title=title,
series="last reported",
iteration=counter,
value=latest_completed)
def _report_resources(self, task_logger, iteration):
# type: (Logger, int) -> ()
@ -1511,8 +1610,7 @@ class HyperParameterOptimizer(object):
def _report_active_workers(self, task_logger, iteration):
# type: (Logger, int) -> ()
cur_task = self._task or Task.current_task()
res = cur_task.send(workers_service.GetAllRequest())
res = self.__get_session().send(workers_service.GetAllRequest())
response = res.wait()
if response.ok():
all_workers = response
@ -1549,7 +1647,7 @@ class HyperParameterOptimizer(object):
obj_values = []
cur_task = self._task or Task.current_task()
for j in cur_completed_jobs:
res = cur_task.send(tasks_services.GetByIdRequest(task=j))
res = cur_task.send(tasks_service.GetByIdRequest(task=j))
response = res.wait()
if not response.ok() or response.response_data["task"].get("status") != Task.TaskStatusEnum.completed:
continue
@ -1568,3 +1666,52 @@ class HyperParameterOptimizer(object):
self.objective_metric.series)
last_values = response.response_data["task"]['last_metrics'][title][series]
return last_values
def _auto_archive_low_performance_tasks(self, completed_jobs):
    # type: (dict) -> ()
    """
    Keep only the top-k performing child Tasks un-archived; archive the rest.

    ``completed_jobs`` maps a Task ID to a tuple whose first element is the objective
    value (the same structure consumed by ``__sort_jobs_by_objective``). No-op unless
    ``save_top_k_tasks_only`` was set to a positive value. Tasks that climb back into
    the top-k on a later call are automatically un-archived again.
    """
    if self._save_top_k_tasks_only <= 0:
        # feature disabled (the default) - keep every created Task
        return
    # sort based on performance
    job_ids_sorted_by_objective = self.__sort_jobs_by_objective(completed_jobs)
    # query system_tags only
    res = self.__get_session().send(tasks_service.GetAllRequest(
        id=job_ids_sorted_by_objective, status=['completed', 'stopped'], only_fields=['id', 'system_tags']))
    response = res.wait()
    if not response.ok():
        # best effort: if the server query failed, skip archiving this round
        return
    tasks_system_tags_lookup = {
        task.get("id"): task.get("system_tags") for task in response.response_data.get("tasks")}
    for i, task_id in enumerate(job_ids_sorted_by_objective):
        system_tags = tasks_system_tags_lookup.get(task_id, [])
        if i < self._save_top_k_tasks_only and Task.archived_tag in system_tags:
            print('Restoring from archive Task id={} (#{} objective={})'.format(
                task_id, i, completed_jobs[task_id][0]))
            # top_k task and is archived, remove archive tag
            system_tags = list(set(system_tags) - {Task.archived_tag})
            res = self.__get_session().send(
                tasks_service.EditRequest(task=task_id, system_tags=system_tags, force=True))
            res.wait()
        elif i >= self._save_top_k_tasks_only and Task.archived_tag not in system_tags:
            print('Archiving Task id={} (#{} objective={})'.format(
                task_id, i, completed_jobs[task_id][0]))
            # Not in top_k task and not archived, add archive tag
            system_tags = list(set(system_tags) | {Task.archived_tag})
            res = self.__get_session().send(
                tasks_service.EditRequest(task=task_id, system_tags=system_tags, force=True))
            res.wait()
def __get_session(self):
    """
    Return the backend session to use for raw API requests.

    Prefers the session attached to the optimizer's own Task (or the current Task, if
    any); falls back to the process-wide default session when no Task is available.
    """
    owner_task = self._task or Task.current_task()
    # noinspection PyProtectedMember
    return owner_task.default_session if owner_task else Task._get_default_session()
def __sort_jobs_by_objective(self, completed_jobs):
    """
    Return the completed job IDs ordered best-to-worst by their stored objective value.

    ``completed_jobs`` maps Task ID -> (objective, iteration, params); an empty or None
    mapping yields an empty list. Order is descending when the objective sign is
    non-negative (maximization), ascending otherwise.
    """
    if not completed_jobs:
        return []
    maximizing = self.objective_metric.sign >= 0
    return sorted(completed_jobs, key=lambda task_id: completed_jobs[task_id][0], reverse=maximizing)

View File

@ -86,6 +86,10 @@ an_optimizer = HyperParameterOptimizer(
optimizer_class=aSearchStrategy,
# Select an execution queue to schedule the experiments for execution
execution_queue=execution_queue,
# If specified, all Tasks created by the HPO process will be created under the `spawn_task_project` project
spawn_task_project=None, # 'HPO spawn project',
# If specified only the top K performing Tasks will be kept, the others will be automatically archived
save_top_k_tasks_only=None, # 5,
# Optional: Limit the execution time of a single experiment, in minutes.
# (this is optional, and if using OptimizerBOHB, it is ignored)
time_limit_per_job=10.,