Refactor new code

allegroai 2024-01-10 14:01:52 +02:00
parent 1f72b0ca66
commit c5510bb06e
5 changed files with 152 additions and 28 deletions

View File

@@ -1,8 +1,6 @@
 import hashlib
 import json
 import six
-import numpy as np
-import functools
 from copy import copy, deepcopy
 from datetime import datetime
 from itertools import product
@@ -21,6 +19,7 @@ from ..task import Task
 logger = getLogger('clearml.automation.optimization')
 
 class _ObjectiveInterface(ABC):
     @abstractmethod
     def get_objective(self, task_id):
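
The `_ObjectiveInterface` ABC shown in context above simply pins down the contract that every objective wrapper must implement `get_objective(task_id)`. A minimal standalone sketch of that pattern; the `_ConstantObjective` subclass and its return value are made up purely for illustration:

from abc import ABC, abstractmethod


class _ObjectiveInterface(ABC):
    @abstractmethod
    def get_objective(self, task_id):
        pass


class _ConstantObjective(_ObjectiveInterface):
    # hypothetical subclass, only to show how the abstract method is satisfied
    def get_objective(self, task_id):
        return 0.0


print(_ConstantObjective().get_objective("task-123"))  # 0.0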
@@ -1992,8 +1991,9 @@ class HyperParameterOptimizer(object):
             self._report_resources(task_logger, counter)
             # collect a summary of all the jobs and their final objective values
-            cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - \
-                {j.task_id() for j in self.optimizer.get_running_jobs()}
+            cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - {
+                j.task_id() for j in self.optimizer.get_running_jobs()
+            }
             self._report_completed_status(completed_jobs, cur_completed_jobs, task_logger, title)
             self._report_completed_tasks_best_results(set(completed_jobs.keys()), task_logger, title, counter)
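
The `cur_completed_jobs` rewrite above is purely cosmetic (a set literal instead of a backslash continuation); the set difference itself is plain Python. A tiny self-contained illustration with made-up task IDs standing in for the optimizer calls:

created_job_ids = {"a1b2": None, "c3d4": None, "e5f6": None}  # stand-in for get_created_jobs_ids()
running_task_ids = ["c3d4"]                                   # stand-in for the running jobs' task IDs
cur_completed_jobs = set(created_job_ids.keys()) - {j for j in running_task_ids}
print(cur_completed_jobs)  # {'a1b2', 'e5f6'} (order may vary)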
@@ -2015,10 +2015,14 @@ class HyperParameterOptimizer(object):
     def _report_completed_status(self, completed_jobs, cur_completed_jobs, task_logger, title, force=False):
         job_ids_sorted_by_objective = self.__sort_jobs_by_objective(completed_jobs)
-        best_experiment = \
-            (self._objective_metric.get_normalized_objective(job_ids_sorted_by_objective[0]),
-             job_ids_sorted_by_objective[0]) \
-            if job_ids_sorted_by_objective else ([float("-inf")], None)
+        best_experiment = (
+            (
+                self._objective_metric.get_normalized_objective(job_ids_sorted_by_objective[0]),
+                job_ids_sorted_by_objective[0],
+            )
+            if job_ids_sorted_by_objective
+            else ([float("-inf")], None)
+        )
 
         if force or cur_completed_jobs != set(completed_jobs.keys()):
             pairs = []
             labels = []
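
Likewise, the `best_experiment` change only swaps backslash continuations for a parenthesized conditional expression; the resulting value is identical. A self-contained sketch with made-up stand-ins for the objective API:

job_ids_sorted_by_objective = ["task-123"]
normalized = {"task-123": [0.87]}  # stand-in for self._objective_metric.get_normalized_objective()

best_experiment = (
    (normalized[job_ids_sorted_by_objective[0]], job_ids_sorted_by_objective[0])
    if job_ids_sorted_by_objective
    else ([float("-inf")], None)
)
print(best_experiment)  # ([0.87], 'task-123')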
@@ -2209,8 +2213,9 @@ class HyperParameterOptimizer(object):
                     type="parcoords",
                     line=dict(
                         colorscale="Viridis",
-                        reversescale=not isinstance(self._objective_metric, MultiObjective)
-                        and self._objective_metric.sign >= 0,
+                        reversescale=(
+                            self._objective_metric.len == 1 and self._objective_metric.objectives[0].sign >= 0
+                        ),
                         color=table_values_columns[-1][1:],
                     ),
                     dimensions=pcc_dims,

View File

@@ -1,7 +1,7 @@
 from time import sleep
 from typing import Any, Optional, Sequence
 
-from ..optimization import Objective, SearchStrategy, MultiObjective
+from ..optimization import Objective, SearchStrategy
 from ..parameters import (DiscreteParameterRange, Parameter, UniformIntegerParameterRange, UniformParameterRange,
                           LogUniformParameterRange)
 from ...task import Task

View File

@@ -17,15 +17,13 @@ from ...task import Task
 class CreateAndPopulate(object):
-    _VCS_SSH_REGEX = \
-        "^" \
-        "(?:(?P<user>{regular}*?)@)?" \
-        "(?P<host>{regular}*?)" \
-        ":" \
-        "(?P<path>{regular}.*)?" \
-        "$" \
-        .format(
-            regular=r"[^/@:#]"
-        )
+    _VCS_SSH_REGEX = (
+        "^"
+        "(?:(?P<user>{regular}*?)@)?"
+        "(?P<host>{regular}*?)"
+        ":"
+        "(?P<path>{regular}.*)?"
+        "$".format(regular=r"[^/@:#]")
+    )
 
     def __init__(
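
For reference, the parenthesized `_VCS_SSH_REGEX` is equivalent to the old backslash-continued version: adjacent string literals are concatenated first, and `.format()` is then applied to the whole pattern. A quick standalone check against a typical SSH-style repository URL (the URL itself is just an example):

import re

_VCS_SSH_REGEX = (
    "^"
    "(?:(?P<user>{regular}*?)@)?"
    "(?P<host>{regular}*?)"
    ":"
    "(?P<path>{regular}.*)?"
    "$".format(regular=r"[^/@:#]")
)

m = re.match(_VCS_SSH_REGEX, "git@github.com:clearml/clearml.git")
print(m.group("user"), m.group("host"), m.group("path"))
# git github.com clearml/clearml.git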
@@ -854,7 +852,7 @@ if __name__ == '__main__':
     def add_import_guard(import_):
         return ("try:\n "
                 + import_.replace("\n", "\n ", import_.count("\n") - 1)
-                + "except Exception as e:\n print('Import error: ' + str(e))\n"
+                + "\nexcept Exception as e:\n print('Import error: ' + str(e))\n"
                 )
 
     # noinspection PyBroadException
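
To make the one-character fix above concrete: without the leading `\n`, the generated `except` clause would be glued onto the end of the last import line, producing invalid code. A standalone sketch of the corrected helper (the whitespace inside the strings is kept exactly as displayed in the diff view, which may have collapsed the original indentation):

def add_import_guard(import_):
    return ("try:\n "
            + import_.replace("\n", "\n ", import_.count("\n") - 1)
            + "\nexcept Exception as e:\n print('Import error: ' + str(e))\n"
            )


print(add_import_guard("import numpy as np"))
# try:
#  import numpy as np
# except Exception as e:
#  print('Import error: ' + str(e))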
@@ -862,14 +860,24 @@ if __name__ == '__main__':
         import ast
         func_module = inspect.getmodule(func)
         source = inspect.getsource(func_module)
-        source_lines = inspect.getsourcelines(func_module)[0]
         parsed_source = ast.parse(source)
         imports = []
         for parsed_source_entry in parsed_source.body:
-            if isinstance(parsed_source_entry,
-                          (ast.Import, ast.ImportFrom)) and parsed_source_entry.col_offset == 0:
-                imports.append(
-                    "\n".join(source_lines[parsed_source_entry.lineno - 1: parsed_source_entry.end_lineno]))
+            # we only include global imports (i.e. at col_offset 0)
+            if parsed_source_entry.col_offset != 0:
+                continue
+            if isinstance(parsed_source_entry, ast.ImportFrom):
+                for sub_entry in parsed_source_entry.names:
+                    import_str = "from {} import {}".format(parsed_source_entry.module, sub_entry.name)
+                    if sub_entry.asname:
+                        import_str += " as {}".format(sub_entry.asname)
+                    imports.append(import_str)
+            elif isinstance(parsed_source_entry, ast.Import):
+                for sub_entry in parsed_source_entry.names:
+                    import_str = "import {}".format(sub_entry.name)
+                    if sub_entry.asname:
+                        import_str += " as {}".format(sub_entry.asname)
+                    imports.append(import_str)
         imports = [add_import_guard(import_) for import_ in imports]
         return "\n".join(imports)
     except Exception as e:
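
The new loop rebuilds each global import statement from its AST node instead of slicing `source_lines` with `end_lineno` (an attribute only present on Python 3.8+, which is presumably one motivation for the change). One side effect worth noting: a statement importing several names is split into one statement per name. A minimal standalone version of the same logic, run on a made-up source string:

import ast

source = (
    "import numpy as np\n"
    "from os.path import join as pj, basename\n"
    "x = 1\n"
)
imports = []
for entry in ast.parse(source).body:
    # only global statements (column offset 0) are considered
    if entry.col_offset != 0:
        continue
    if isinstance(entry, ast.ImportFrom):
        for alias in entry.names:
            stmt = "from {} import {}".format(entry.module, alias.name)
            if alias.asname:
                stmt += " as {}".format(alias.asname)
            imports.append(stmt)
    elif isinstance(entry, ast.Import):
        for alias in entry.names:
            stmt = "import {}".format(alias.name)
            if alias.asname:
                stmt += " as {}".format(alias.asname)
            imports.append(stmt)
print(imports)
# ['import numpy as np', 'from os.path import join as pj', 'from os.path import basename']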

View File

@@ -0,0 +1,111 @@
import sys
from clearml import Task
from clearml.automation import DiscreteParameterRange, HyperParameterOptimizer, UniformIntegerParameterRange

try:
    from clearml.automation.optuna import OptimizerOptuna  # noqa
except ImportError:
    print("Multi-objective HPO is currently only supported via Optuna")
    sys.exit(0)

if __name__ == "__main__":
    task = Task.init(
        project_name="Hyper-Parameter Optimization",
        task_name="Multi-objective HPO",
        task_type=Task.TaskTypes.optimizer,
        reuse_last_task_id=False,
    )

    # experiment template to optimize in the hyper-parameter optimization
    args = {
        "template_task_id": None,
        "run_as_service": False,
    }
    args = task.connect(args)

    # Get the template task experiment that we want to optimize
    if not args["template_task_id"]:
        args["template_task_id"] = Task.get_task(project_name="examples", task_name="Keras HP optimization base").id

    # Set the default queue name for the training tasks themselves.
    # This can later be overridden in the UI
    execution_queue = "1xGPU"

    an_optimizer = HyperParameterOptimizer(
        # This is the experiment we want to optimize
        base_task_id=args["template_task_id"],
        # here we define the hyper-parameters to optimize
        # Notice: The parameter name should exactly match what you see in the UI: <section_name>/<parameter>
        # For example, here we see in the base experiment a section named "General"
        # and under it a parameter named "batch_size"; this becomes "General/batch_size"
        # If you use `argparse`, for example, the arguments will appear under the "Args" section,
        # and you should instead pass "Args/batch_size"
        hyper_parameters=[
            UniformIntegerParameterRange("General/layer_1", min_value=128, max_value=512, step_size=128),
            UniformIntegerParameterRange("General/layer_2", min_value=128, max_value=512, step_size=128),
            DiscreteParameterRange("General/batch_size", values=[96, 128, 160]),
            DiscreteParameterRange("General/epochs", values=[30]),
        ],
        # these are the objectives' metric titles/series we want to maximize/minimize
        objective_metric_title=["evaluate", "evaluate"],
        objective_metric_series=["score", "accuracy"],
        # now we decide whether we want to maximize or minimize each objective
        # in this case, we want to minimize evaluate/score and maximize evaluate/accuracy
        objective_metric_sign=["min", "max"],
        # let us limit the number of concurrent experiments;
        # this in turn will make sure we don't bombard the scheduler with experiments.
        # if we have an auto-scaler connected, this, by proxy, will limit the number of machines spun up
        max_number_of_concurrent_tasks=1,
        # optimizer_class has to be OptimizerOptuna
        optimizer_class=OptimizerOptuna,
        # Select an execution queue to schedule the experiments for execution
        execution_queue=execution_queue,
        # If specified, all Tasks created by the HPO process will be created under the `spawn_project` project
        spawn_project=None,  # 'HPO spawn project',
        # If specified, only the top K performing Tasks will be kept; the others will be automatically archived
        save_top_k_tasks_only=None,  # 5,
        # Optional: Limit the execution time of a single experiment, in minutes.
        # (this is optional, and if using OptimizerBOHB, it is ignored)
        time_limit_per_job=10.0,
        # Checking the experiments every 12 seconds is way too often; we should probably set it to 5 min,
        # assuming a single experiment usually runs for hours...
        pool_period_min=0.2,
        # set the maximum number of jobs to launch for the optimization, default (None) unlimited
        # If OptimizerBOHB is used, it defines the maximum budget in terms of full jobs,
        # i.e. the cumulative number of iterations will not exceed total_max_jobs * max_iteration_per_job
        total_max_jobs=10,
        # set the minimum number of iterations for an experiment before early stopping.
        # Does not apply to simple strategies such as RandomSearch or GridSearch
        min_iteration_per_job=10,
        # Set the maximum number of iterations for an experiment to execute
        # (This is optional, unless using OptimizerBOHB, where this is a must)
        max_iteration_per_job=30,
    )

    # if we are running as a service, just enqueue ourselves into the services queue and let it run the optimization
    if args["run_as_service"]:
        # if this code is executed by `clearml-agent`, the function call does nothing.
        # if executed locally, the local process will be terminated and a remote copy will be executed instead
        task.execute_remotely(queue_name="services", exit_process=True)

    # report every 12 seconds; this is way too often, but we are testing here
    an_optimizer.set_report_period(0.2)
    # start the optimization process (this function returns immediately);
    # a callback can be passed to be called every time an experiment is completed
    an_optimizer.start()
    # You can also use the line below instead to run all the optimizer tasks locally, without using queues or agents
    # an_optimizer.start_locally(job_complete_callback=job_complete_callback)
    # set the time limit for the optimization process (2 hours)
    an_optimizer.set_time_limit(in_minutes=120.0)
    # wait until process is done (notice we are controlling the optimization process in the background)
    an_optimizer.wait()
    # optimization is completed, print the IDs of the top performing experiments
    top_exp = an_optimizer.get_top_experiments(top_k=3)
    print([t.id for t in top_exp])
    # make sure background optimization stopped
    an_optimizer.stop()

    print("We are done, good bye")