From f679c80535cd567ff5a067729de4480580c99f46 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sun, 10 Jan 2021 13:07:57 +0200 Subject: [PATCH] Add HyperParameterOptimizer arguments: spawn_task_project, save_top_k_tasks_only --- clearml/automation/optimization.py | 311 +++++++++++++----- .../hyper_parameter_optimizer.py | 4 + 2 files changed, 233 insertions(+), 82 deletions(-) diff --git a/clearml/automation/optimization.py b/clearml/automation/optimization.py index 14c0aedf..40552a2a 100644 --- a/clearml/automation/optimization.py +++ b/clearml/automation/optimization.py @@ -11,8 +11,9 @@ from typing import List, Set, Union, Any, Sequence, Optional, Mapping, Callable from .job import TrainsJob from .parameters import Parameter +from ..backend_interface.util import get_or_create_project from ..logger import Logger -from ..backend_api.services import workers as workers_service, tasks as tasks_services +from ..backend_api.services import workers as workers_service, tasks as tasks_service, events as events_service from ..task import Task logger = getLogger('clearml.automation.optimization') @@ -82,6 +83,9 @@ class Objective(object): return None metrics = task.last_metrics + if not metrics: + return None + # noinspection PyBroadException try: values = metrics[self._metric[0]][self._metric[1]] @@ -102,18 +106,34 @@ class Objective(object): :return: Tuple(iteration, value) if, and only if, the metric exists. None if the metric does not exist. """ + if isinstance(task, Task): + task_id = task.id + elif isinstance(task, TrainsJob): + task_id = task.task_id() + else: + task_id = task - if not isinstance(task, Task): - if hasattr(task, 'task'): - task = task.task - if not isinstance(task, Task): - task = Task.get_task(task_id=str(task)) - if not task: - raise ValueError("Task object could not be found") + if not task_id: + raise ValueError("Task ID not provided") - # todo: replace with more efficient code - scalars = task.get_reported_scalars() + # send request + # noinspection PyBroadException + try: + # noinspection PyProtectedMember + res = Task._get_default_session().send( + events_service.ScalarMetricsIterHistogramRequest( + task=task_id, key='iter', samples=None), + ) + except Exception: + res = None + if not res: + return None + response = res.wait() + if not response.ok() or not response.response_data: + return None + + scalars = response.response_data # noinspection PyBroadException try: return scalars[self.title][self.series]['x'][-1], scalars[self.title][self.series]['y'][-1] @@ -303,6 +323,7 @@ class SearchStrategy(object): self._pending_jobs = [] self._num_jobs = 0 self._job_parent_id = None + self._job_project_id = None self._created_jobs_ids = {} self._naming_function = None self._job_project = {} @@ -535,7 +556,9 @@ class SearchStrategy(object): new_job = self._job_class( base_task_id=base_task_id, parameter_override=parameter_override, task_overrides=task_overrides, tags=tags, parent=parent or self._job_parent_id, - name=name, comment=comment, project=self._get_task_project(parent or self._job_parent_id), **kwargs) + name=name, comment=comment, + project=self._job_project_id or self._get_task_project(parent or self._job_parent_id), + **kwargs) self._created_jobs_ids[new_job.task_id()] = (new_job, parameter_override) logger.info('Creating new Task: {}'.format(parameter_override)) return new_job @@ -549,14 +572,19 @@ class SearchStrategy(object): """ self._job_class = job_class - def set_job_default_parent(self, job_parent_task_id): - # type: (str) -> () + def 
set_job_default_parent(self, job_parent_task_id, project_name=None): + # type: (Optional[str], Optional[str]) -> () """ Set the default parent for all Jobs created by the :meth:`helper_create_job` method. :param str job_parent_task_id: The parent Task ID. + :param str project_name: If specified, create the jobs in the specified project. """ self._job_parent_id = job_parent_task_id + # noinspection PyProtectedMember + self._job_project_id = get_or_create_project( + session=Task._get_default_session(), project_name=project_name, description='HPO process spawned Tasks') \ + if project_name else None def set_job_naming_scheme(self, naming_function): # type: (Optional[Callable[[str, dict], str]]) -> () @@ -619,11 +647,67 @@ class SearchStrategy(object): iteration_value = self._objective_metric.get_current_raw_objective(job) return iteration_value[0] if iteration_value else -1 + @classmethod + def _get_child_tasks_ids( + cls, + parent_task_id, # type: str + status=None, # type: Optional[Union[Task.TaskStatusEnum, Sequence[Task.TaskStatusEnum]]] + order_by=None, # type: Optional[str] + additional_filters=None # type: Optional[dict] + ): + # type: (...) -> (Sequence[str]) + """ + Helper function. Return a list of Task IDs of child tasks tagged automl, with the specified ``status``, ordered by ``order_by``. + + :param str parent_task_id: The base Task ID (parent). + :param status: The current status of requested tasks (for example, ``in_progress`` and ``completed``). + :param str order_by: The field name to sort results. + + Examples: + + .. code-block:: py + + "-last_metrics.title.series.min" + "last_metrics.title.series.max" + "last_metrics.title.series.last" + "execution.parameters.name" + "updated" + + :param dict additional_filters: The additional task filters. + :return: A list of Task IDs (str) + """ + task_filter = { + 'parent': parent_task_id, + # 'tags': [cls._tag], + # since we have auto archive we do not want to filter out archived tasks + # 'system_tags': ['-archived'], + } + task_filter.update(additional_filters or {}) + + if status: + task_filter['status'] = status if isinstance(status, (tuple, list)) else [status] + + if order_by and (order_by.startswith('last_metrics') or order_by.startswith('-last_metrics')): + parts = order_by.split('.') + if parts[-1] in ('min', 'max', 'last'): + title = hashlib.md5(str(parts[1]).encode('utf-8')).hexdigest() + series = hashlib.md5(str(parts[2]).encode('utf-8')).hexdigest() + minmax = 'min_value' if 'min' in parts[3] else ('max_value' if 'max' in parts[3] else 'value') + order_by = '{}last_metrics.{}.{}.{}'.format( + '-' if order_by and order_by[0] == '-' else '', title, series, minmax) + + if order_by: + task_filter['order_by'] = [order_by] + + # noinspection PyProtectedMember + task_objects = Task._query_tasks(**task_filter) + return [t.id for t in task_objects] + @classmethod def _get_child_tasks( cls, parent_task_id, # type: str - status=None, # type: Optional[Task.TaskStatusEnum] + status=None, # type: Optional[Union[Task.TaskStatusEnum, Sequence[Task.TaskStatusEnum]]] order_by=None, # type: Optional[str] additional_filters=None # type: Optional[dict] ): @@ -648,30 +732,13 @@ class SearchStrategy(object): :param dict additional_filters: The additional task filters.
:return: A list of Task objects """ - task_filter = { - 'parent': parent_task_id, - # 'tags': [cls._tag], - # since we have auto archive we do not want to filter out archived tasks - # 'system_tags': ['-archived'], - } - task_filter.update(additional_filters or {}) - - if status: - task_filter['status'] = status - - if order_by and (order_by.startswith('last_metrics') or order_by.startswith('-last_metrics')): - parts = order_by.split('.') - if parts[-1] in ('min', 'max', 'last'): - title = hashlib.md5(str(parts[1]).encode('utf-8')).hexdigest() - series = hashlib.md5(str(parts[2]).encode('utf-8')).hexdigest() - minmax = 'min_value' if 'min' in parts[3] else ('max_value' if 'max' in parts[3] else 'value') - order_by = '{}last_metrics.'.join( - ('-' if order_by and order_by[0] == '-' else '', title, series, minmax)) - - if order_by: - task_filter['order_by'] = [order_by] - - return Task.get_tasks(task_filter=task_filter) + return [ + Task.get_task(task_id=t_id) for t_id in cls._get_child_tasks_ids( + parent_task_id=parent_task_id, + status=status, + order_by=order_by, + additional_filters=additional_filters) + ] class GridSearch(SearchStrategy): @@ -846,6 +913,8 @@ class HyperParameterOptimizer(object): compute_time_limit=None, # type: Optional[float] auto_connect_task=True, # type: Union[bool, Task] always_create_task=False, # type: bool + spawn_task_project=None, # type: Optional[str] + save_top_k_tasks_only=None, # type: Optional[int] **optimizer_kwargs # type: Any ): # type: (...) -> () @@ -893,6 +962,12 @@ class HyperParameterOptimizer(object): - ``False`` - Use the :py:meth:`task.Task.current_task` (if exists) to report statistics. + :param str spawn_task_project: If project name is specified, create all optimization Jobs (Tasks) in the + specified project, instead of the original base_task_id project. + + :param int save_top_k_tasks_only: If specified and above 0, keep only the top_k performing Tasks, + and archive the rest of the created Tasks. Default: -1 keep everything, nothing will be archived. + :param ** optimizer_kwargs: Arguments passed directly to the optimizer constructor. Example: @@ -986,8 +1061,9 @@ class HyperParameterOptimizer(object): self._report_period_min = 5. 
self._thread_reporter = None self._experiment_completed_cb = None - if self._task: - self.optimizer.set_job_default_parent(self._task.id) + self._save_top_k_tasks_only = max(0, save_top_k_tasks_only or 0) + self.optimizer.set_job_default_parent( + self._task.id if self._task else None, project_name=spawn_task_project or None) self.set_time_limit(in_minutes=opts['optimization_time_limit']) def get_num_active_experiments(self): @@ -1349,6 +1425,23 @@ class HyperParameterOptimizer(object): completed_jobs = dict() task_logger = None cur_completed_jobs = set() + cur_task = self._task or Task.current_task() + if cur_task and self.optimizer: + # noinspection PyProtectedMember + child_tasks = self.optimizer._get_child_tasks( + parent_task_id=cur_task.id, status=['completed', 'stopped']) + hyper_parameters = [h.name for h in self.hyper_parameters] + for task in child_tasks: + params = {k: v for k, v in task.get_parameters().items() if k in hyper_parameters} + params["status"] = str(task.status) + # noinspection PyProtectedMember + iteration_value = task.get_last_iteration() + objective = self.objective_metric.get_objective(task) + completed_jobs[task.id] = ( + objective if objective is not None else -1, + iteration_value if iteration_value is not None else -1, + params + ) while self._thread is not None: timeout = self.optimization_timeout - time() if self.optimization_timeout else 0. @@ -1366,8 +1459,9 @@ class HyperParameterOptimizer(object): counter += 1 # get task to report on. - if self._task or Task.current_task(): - task_logger = (self._task or Task.current_task()).get_logger() + cur_task = self._task or Task.current_task() + if cur_task: + task_logger = cur_task.get_logger() # do some reporting @@ -1382,8 +1476,8 @@ class HyperParameterOptimizer(object): cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - \ {j.task_id() for j in self.optimizer.get_running_jobs()} self._report_completed_status(completed_jobs, cur_completed_jobs, task_logger, title) - self._report_completed_tasks_best_results(set(completed_jobs.keys()), task_logger, title, counter) + self._auto_archive_low_performance_tasks(completed_jobs) # if we should leave, stop everything now. 
if timeout < 0: # we should leave @@ -1395,12 +1489,14 @@ class HyperParameterOptimizer(object): self._report_resources(task_logger, counter) self._report_completed_status(completed_jobs, cur_completed_jobs, task_logger, title, force=True) self._report_completed_tasks_best_results(set(completed_jobs.keys()), task_logger, title, counter) + self._auto_archive_low_performance_tasks(completed_jobs) def _report_completed_status(self, completed_jobs, cur_completed_jobs, task_logger, title, force=False): - top_experiments = self.get_top_experiments(top_k=1) + job_ids_sorted_by_objective = self.__sort_jobs_by_objective(completed_jobs) best_experiment = \ - (self.objective_metric.get_normalized_objective(top_experiments[0].id), top_experiments[0].id) \ - if top_experiments else (float('-inf'), None) + (self.objective_metric.get_normalized_objective(job_ids_sorted_by_objective[0]), + job_ids_sorted_by_objective[0]) \ + if job_ids_sorted_by_objective else (float('-inf'), None) if force or cur_completed_jobs != set(completed_jobs.keys()): pairs = [] labels = [] @@ -1421,22 +1517,21 @@ class HyperParameterOptimizer(object): copy(dict(**params, **{"status": id_status.get(job_id)}))) # noqa pairs.append((i, completed_jobs[job_id][0])) labels.append(str(completed_jobs[job_id][2])[1:-1]) - else: - if value is not None: - pairs.append((i, value)) - labels.append(str(params)[1:-1]) - iteration_value = self.objective_metric.get_current_raw_objective(job_id) - completed_jobs[job_id] = ( - value, - iteration_value[0] if iteration_value else -1, - copy(dict(**params, **{"status": id_status.get(job_id)}))) # noqa - # callback new experiment completed - if self._experiment_completed_cb: - normalized_value = self.objective_metric.get_normalized_objective(job_id) - if normalized_value is not None and normalized_value > best_experiment[0]: - best_experiment = normalized_value, job_id - c = completed_jobs[job_id] - self._experiment_completed_cb(job_id, c[0], c[1], c[2], best_experiment[1]) + elif value is not None: + pairs.append((i, value)) + labels.append(str(params)[1:-1]) + iteration_value = self.objective_metric.get_current_raw_objective(job_id) + completed_jobs[job_id] = ( + value, + iteration_value[0] if iteration_value else -1, + copy(dict(**params, **{"status": id_status.get(job_id)}))) # noqa + # callback new experiment completed + if self._experiment_completed_cb: + normalized_value = self.objective_metric.get_normalized_objective(job_id) + if normalized_value is not None and normalized_value > best_experiment[0]: + best_experiment = normalized_value, job_id + c = completed_jobs[job_id] + self._experiment_completed_cb(job_id, c[0], c[1], c[2], best_experiment[1]) if pairs: print('Updating job performance summary plot/table') @@ -1464,7 +1559,9 @@ class HyperParameterOptimizer(object): extra_layout={"title": "objective: {}".format(title)}) # upload summary as artifact if force: - self._task.upload_artifact(name='summary', artifact_object={'table': table_values}) + task = self._task or Task.current_task() + if task: + task.upload_artifact(name='summary', artifact_object={'table': table_values}) def _report_remaining_budget(self, task_logger, counter): # noinspection PyBroadException @@ -1487,22 +1584,24 @@ class HyperParameterOptimizer(object): def _report_completed_tasks_best_results(self, completed_jobs, task_logger, title, counter): # type: (Set[str], Logger, str, int) -> () - if completed_jobs: - value_func, series_name = (max, "max") if self.objective_metric.get_objective_sign() > 0 else \ - (min, "min") 
- latest_completed, obj_values = self._get_latest_completed_task_value(completed_jobs, series_name) - if latest_completed: - val = value_func(obj_values) - task_logger.report_scalar( - title=title, - series=series_name, - iteration=counter, - value=val) - task_logger.report_scalar( - title=title, - series="last reported", - iteration=counter, - value=latest_completed) + if not completed_jobs: + return + + value_func, series_name = (max, "max") if self.objective_metric.get_objective_sign() > 0 else \ + (min, "min") + latest_completed, obj_values = self._get_latest_completed_task_value(completed_jobs, series_name) + if latest_completed: + val = value_func(obj_values) + task_logger.report_scalar( + title=title, + series=series_name, + iteration=counter, + value=val) + task_logger.report_scalar( + title=title, + series="last reported", + iteration=counter, + value=latest_completed) def _report_resources(self, task_logger, iteration): # type: (Logger, int) -> () @@ -1511,8 +1610,7 @@ class HyperParameterOptimizer(object): def _report_active_workers(self, task_logger, iteration): # type: (Logger, int) -> () - cur_task = self._task or Task.current_task() - res = cur_task.send(workers_service.GetAllRequest()) + res = self.__get_session().send(workers_service.GetAllRequest()) response = res.wait() if response.ok(): all_workers = response @@ -1549,7 +1647,7 @@ class HyperParameterOptimizer(object): obj_values = [] cur_task = self._task or Task.current_task() for j in cur_completed_jobs: - res = cur_task.send(tasks_services.GetByIdRequest(task=j)) + res = cur_task.send(tasks_service.GetByIdRequest(task=j)) response = res.wait() if not response.ok() or response.response_data["task"].get("status") != Task.TaskStatusEnum.completed: continue @@ -1568,3 +1666,52 @@ class HyperParameterOptimizer(object): self.objective_metric.series) last_values = response.response_data["task"]['last_metrics'][title][series] return last_values + + def _auto_archive_low_performance_tasks(self, completed_jobs): + if self._save_top_k_tasks_only <= 0: + return + + # sort based on performance + job_ids_sorted_by_objective = self.__sort_jobs_by_objective(completed_jobs) + + # query system_tags only + res = self.__get_session().send(tasks_service.GetAllRequest( + id=job_ids_sorted_by_objective, status=['completed', 'stopped'], only_fields=['id', 'system_tags'])) + response = res.wait() + if not response.ok(): + return + + tasks_system_tags_lookup = { + task.get("id"): task.get("system_tags") for task in response.response_data.get("tasks")} + for i, task_id in enumerate(job_ids_sorted_by_objective): + system_tags = tasks_system_tags_lookup.get(task_id, []) + if i < self._save_top_k_tasks_only and Task.archived_tag in system_tags: + print('Restoring from archive Task id={} (#{} objective={})'.format( + task_id, i, completed_jobs[task_id][0])) + # top_k task and is archived, remove archive tag + system_tags = list(set(system_tags) - {Task.archived_tag}) + res = self.__get_session().send( + tasks_service.EditRequest(task=task_id, system_tags=system_tags, force=True)) + res.wait() + elif i >= self._save_top_k_tasks_only and Task.archived_tag not in system_tags: + print('Archiving Task id={} (#{} objective={})'.format( + task_id, i, completed_jobs[task_id][0])) + # Not in top_k task and not archived, add archive tag + system_tags = list(set(system_tags) | {Task.archived_tag}) + res = self.__get_session().send( + tasks_service.EditRequest(task=task_id, system_tags=system_tags, force=True)) + res.wait() + + def __get_session(self): + 
+ cur_task = self._task or Task.current_task() + if cur_task: + return cur_task.default_session + # noinspection PyProtectedMember + return Task._get_default_session() + + def __sort_jobs_by_objective(self, completed_jobs): + if not completed_jobs: + return [] + job_ids_sorted_by_objective = list(sorted( + completed_jobs.keys(), key=lambda x: completed_jobs[x][0], reverse=bool(self.objective_metric.sign >= 0))) + return job_ids_sorted_by_objective diff --git a/examples/optimization/hyper-parameter-optimization/hyper_parameter_optimizer.py b/examples/optimization/hyper-parameter-optimization/hyper_parameter_optimizer.py index 744962c2..7956a835 100644 --- a/examples/optimization/hyper-parameter-optimization/hyper_parameter_optimizer.py +++ b/examples/optimization/hyper-parameter-optimization/hyper_parameter_optimizer.py @@ -86,6 +86,10 @@ an_optimizer = HyperParameterOptimizer( optimizer_class=aSearchStrategy, # Select an execution queue to schedule the experiments for execution execution_queue=execution_queue, + # If specified, all Tasks created by the HPO process will be created under the `spawn_task_project` project + spawn_task_project=None, # 'HPO spawn project', + # If specified, only the top K performing Tasks will be kept; the others will be automatically archived + save_top_k_tasks_only=None, # 5, # Optional: Limit the execution time of a single experiment, in minutes. # (this is optional, and if using OptimizerBOHB, it is ignored) time_limit_per_job=10.,
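For reference, a minimal end-to-end sketch with the two new arguments enabled (the project names, queue, base task ID and hyper-parameter range below are illustrative placeholders, not values taken from this patch):

    from clearml import Task
    from clearml.automation import HyperParameterOptimizer, UniformIntegerParameterRange
    from clearml.automation.optimization import GridSearch

    # controller Task that will hold the optimizer's summary plots, table and artifacts
    task = Task.init(project_name='HPO example', task_name='spawn-project + top-k sketch',
                     task_type=Task.TaskTypes.optimizer)

    an_optimizer = HyperParameterOptimizer(
        base_task_id='REPLACE_WITH_TEMPLATE_TASK_ID',  # experiment template to clone and optimize
        hyper_parameters=[
            UniformIntegerParameterRange('General/epochs', min_value=5, max_value=20, step_size=5),
        ],
        objective_metric_title='validation',
        objective_metric_series='accuracy',
        objective_metric_sign='max',
        optimizer_class=GridSearch,
        execution_queue='default',
        # new in this patch: spawn every cloned experiment under this project
        # instead of the base task's original project
        spawn_task_project='HPO spawn project',
        # new in this patch: keep only the 5 best experiments, auto-archive the rest
        save_top_k_tasks_only=5,
    )

    an_optimizer.start()
    an_optimizer.wait()
    an_optimizer.stop()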
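The top-k bookkeeping above keeps completed_jobs as a dict mapping task ID to (objective value, last iteration, parameter dict), ranks it by objective and archives whatever falls outside the best K. A small standalone illustration of that ranking (task IDs and values are made up):

    # completed_jobs: task ID -> (objective value, last iteration, parameter dict)
    completed_jobs = {
        'task_a': (0.91, 120, {'General/epochs': 10, 'status': 'completed'}),
        'task_b': (0.87, 100, {'General/epochs': 5, 'status': 'completed'}),
        'task_c': (0.95, 140, {'General/epochs': 20, 'status': 'completed'}),
    }

    save_top_k_tasks_only = 2
    # maximization objective: highest value first (mirrors __sort_jobs_by_objective)
    ranked = sorted(completed_jobs, key=lambda tid: completed_jobs[tid][0], reverse=True)
    keep, archive = ranked[:save_top_k_tasks_only], ranked[save_top_k_tasks_only:]
    print(keep)     # ['task_c', 'task_a']  -> kept, or restored from the archive
    print(archive)  # ['task_b']            -> tagged with the archived system tag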
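The new _get_child_tasks_ids helper translates the human-readable last_metrics.<title>.<series>.<min|max|last> sort field into the hashed key the server stores metrics under. A rough standalone sketch of that translation ('validation' and 'accuracy' are hypothetical metric title/series names):

    import hashlib

    def server_order_by(order_by):
        # e.g. '-last_metrics.validation.accuracy.max' ->
        #      '-last_metrics.<md5(validation)>.<md5(accuracy)>.max_value'
        parts = order_by.split('.')
        title = hashlib.md5(parts[1].encode('utf-8')).hexdigest()
        series = hashlib.md5(parts[2].encode('utf-8')).hexdigest()
        minmax = 'min_value' if 'min' in parts[3] else ('max_value' if 'max' in parts[3] else 'value')
        return '{}last_metrics.{}.{}.{}'.format(
            '-' if order_by.startswith('-') else '', title, series, minmax)

    print(server_order_by('-last_metrics.validation.accuracy.max'))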
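Finally, the reworked Objective.get_current_raw_objective accepts a Task object, a TrainsJob or a plain task ID and pulls the scalar history through the events API. A hedged usage sketch (the metric title/series and the task ID are placeholders):

    from clearml.automation.optimization import Objective

    objective = Objective(title='validation', series='accuracy', order='max')
    result = objective.get_current_raw_objective('REPLACE_WITH_CHILD_TASK_ID')
    if result is not None:
        iteration, value = result
        print('last reported iteration={} objective={}'.format(iteration, value))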