diff --git a/examples/frameworks/pytorch/notebooks/audio/audio_preprocessing_example.ipynb b/examples/frameworks/pytorch/notebooks/audio/audio_preprocessing_example.ipynb index ee86142b..59e75f55 100644 --- a/examples/frameworks/pytorch/notebooks/audio/audio_preprocessing_example.ipynb +++ b/examples/frameworks/pytorch/notebooks/audio/audio_preprocessing_example.ipynb @@ -12,7 +12,7 @@ "! pip install -U torch==1.5.1\n", "! pip install -U torchaudio==0.5.1\n", "! pip install -U matplotlib==3.2.1\n", - "! pip install -U trains>=0.16.0\n", + "! pip install -U trains>=0.16.1\n", "! pip install -U tensorboard==2.2.1" ] }, diff --git a/examples/frameworks/pytorch/notebooks/image/hyperparameter_search.ipynb b/examples/frameworks/pytorch/notebooks/image/hyperparameter_search.ipynb index 82ebc0b4..aef32da0 100644 --- a/examples/frameworks/pytorch/notebooks/image/hyperparameter_search.ipynb +++ b/examples/frameworks/pytorch/notebooks/image/hyperparameter_search.ipynb @@ -12,7 +12,7 @@ "\n", "# pip install with locked versions\n", "! pip install -U pandas==1.0.3\n", - "! pip install -U trains>=0.16.1\n", + "! pip install -U trains>=0.16.2\n", "! pip install -U optuna==2.0.0" ] }, @@ -35,7 +35,9 @@ "metadata": {}, "outputs": [], "source": [ - "task = Task.init(project_name='Hyperparameter Optimization with Optuna', task_name='Hyperparameter Search')\n" + "task = Task.init(project_name='Hyperparameter Optimization with Optuna',\n", + " task_name='Hyperparameter Search',\n", + " task_type=Task.TaskTypes.optimizer)\n" ] }, { @@ -134,4 +136,4 @@ }, "nbformat": 4, "nbformat_minor": 4 -} +} \ No newline at end of file diff --git a/trains/automation/job.py b/trains/automation/job.py index a5c65e2f..657a9f60 100644 --- a/trains/automation/job.py +++ b/trains/automation/job.py @@ -57,11 +57,7 @@ class TrainsJob(object): :param str series: Series on the specific graph (variant) :return: A tuple of min value, max value, last value """ - title = hashlib.md5(str(title).encode('utf-8')).hexdigest() - series = hashlib.md5(str(series).encode('utf-8')).hexdigest() - metric = 'last_metrics.{}.{}.'.format(title, series) - values = ['min_value', 'max_value', 'value'] - metrics = [metric + v for v in values] + metrics, title, series, values = self.get_metric_req_params(title, series) res = self.task.send( tasks_service.GetAllRequest( @@ -75,6 +71,15 @@ class TrainsJob(object): return tuple(response.response_data['tasks'][0]['last_metrics'][title][series][v] for v in values) + @staticmethod + def get_metric_req_params(title, series): + title = hashlib.md5(str(title).encode('utf-8')).hexdigest() + series = hashlib.md5(str(series).encode('utf-8')).hexdigest() + metric = 'last_metrics.{}.{}.'.format(title, series) + values = ['min_value', 'max_value', 'value'] + metrics = [metric + v for v in values] + return metrics, title, series, values + def launch(self, queue_name=None): # type: (str) -> () """ diff --git a/trains/automation/optimization.py b/trains/automation/optimization.py index 634c0373..2f931dbe 100644 --- a/trains/automation/optimization.py +++ b/trains/automation/optimization.py @@ -6,7 +6,7 @@ from itertools import product from logging import getLogger from threading import Thread, Event from time import time -from typing import Dict, Set, Tuple, Union, Any, Sequence, Optional, Mapping, Callable +from typing import List, Set, Union, Any, Sequence, Optional, Mapping, Callable from .job import TrainsJob from .parameters import Parameter @@ -395,8 +395,17 @@ class SearchStrategy(object): :return: False, if the job is no longer relevant. """ - abort_job = False + abort_job = self.update_budget_per_job(job) + + if abort_job: + job.abort() + return False + + return not job.is_stopped() + + def update_budget_per_job(self, job): + abort_job = False if self.time_limit_per_job: elapsed = job.elapsed() / 60. if elapsed > 0: @@ -409,9 +418,6 @@ class SearchStrategy(object): elapsed = job.elapsed() / 60. if elapsed > 0: self.budget.compute_time.update(job.task_id(), elapsed) - self.budget.compute_time.update(job.task_id(), job.elapsed() / 60.) - if self.budget.compute_time.used and self.compute_time_limit < self.budget.compute_time.used: - abort_job = True if self.max_iteration_per_job: iterations = self._get_job_iterations(job) @@ -420,11 +426,7 @@ class SearchStrategy(object): if iterations > self.max_iteration_per_job: abort_job = True - if abort_job: - job.abort() - return False - - return not job.is_stopped() + return abort_job def get_running_jobs(self): # type: () -> Sequence[TrainsJob] @@ -443,7 +445,17 @@ class SearchStrategy(object): :return: dict of task IDs (str) as keys, and their parameters dict as values. """ - return self._created_jobs_ids + return {job_id: job_val[1] for job_id, job_val in self._created_jobs_ids.items()} + + def get_created_jobs_tasks(self): + # type: () -> Mapping[str, dict] + """ + Return a Task IDs dict created by this optimizer until now. + The values of the returned dict are the TrainsJob. + + :return: dict of task IDs (str) as keys, and their TrainsJob as values. + """ + return {job_id: job_val[0] for job_id, job_val in self._created_jobs_ids.items()} def get_top_experiments(self, top_k): # type: (int) -> Sequence[Task] @@ -502,7 +514,7 @@ class SearchStrategy(object): base_task_id=base_task_id, parameter_override=parameter_override, task_overrides=task_overrides, tags=tags, parent=parent or self._job_parent_id, name=name, comment=comment, project=self._get_task_project(parent or self._job_parent_id), **kwargs) - self._created_jobs_ids[new_job.task_id()] = parameter_override + self._created_jobs_ids[new_job.task_id()] = (new_job, parameter_override) logger.info('Creating new Task: {}'.format(parameter_override)) return new_job @@ -900,7 +912,7 @@ class HyperParameterOptimizer(object): # create a new Task, if we do not have one already self._task = Task.current_task() if not self._task and always_create_task: - base_task = Task.get_task(task_id=self.base_task_id) + base_task = Task.get_task(task_id=base_task_id) self._task = Task.init( project_name=base_task.get_project_name(), task_name='Optimizing: {}'.format(base_task.name), @@ -1014,15 +1026,18 @@ class HyperParameterOptimizer(object): self._thread_reporter.start() return True - def stop(self, timeout=None): - # type: (Optional[float]) -> () + def stop(self, timeout=None, flush_reporter=True): + # type: (Optional[float], Optional[bool]) -> () """ Stop the HyperParameterOptimizer controller and the optimization thread. :param float timeout: Wait timeout for the optimization thread to exit (minutes). The default is ``None``, indicating do not wait terminate immediately. + :param flush_reporter: Wait for reporter to flush data. """ if not self._thread or not self._stop_event or not self.optimizer: + if self._thread_reporter and flush_reporter: + self._thread_reporter.join() return _thread = self._thread @@ -1039,8 +1054,9 @@ class HyperParameterOptimizer(object): # clear thread self._thread = None - # wait for reporter to flush - self._thread_reporter.join() + if flush_reporter: + # wait for reporter to flush + self._thread_reporter.join() def is_active(self): # type: () -> bool @@ -1255,7 +1271,8 @@ class HyperParameterOptimizer(object): title = '{}/{}'.format(title, series) counter = 0 completed_jobs = dict() - best_experiment = float('-inf'), None + task_logger = None + cur_completed_jobs = set() while self._thread is not None: timeout = self.optimization_timeout - time() if self.optimization_timeout else 0. @@ -1278,98 +1295,127 @@ class HyperParameterOptimizer(object): # do some reporting - # noinspection PyBroadException - try: - budget = self.optimizer.budget.to_dict() - except Exception: - budget = {} + self._report_remaining_budget(task_logger, counter) - # report remaining budget - for budget_part, value in budget.items(): - task_logger.report_scalar( - title='remaining budget', series='{} %'.format(budget_part), - iteration=counter, value=round(100 - value['used'] * 100., ndigits=1)) - if self.optimization_timeout and self.optimization_start_time: - task_logger.report_scalar( - title='remaining budget', series='time %', - iteration=counter, - value=round(100 - (100. * (time() - self.optimization_start_time) / - (self.optimization_timeout - self.optimization_start_time)), ndigits=1) - ) + if self.optimizer.budget.compute_time.used and self.optimizer.budget.compute_time.used >= 1.0: + # Reached compute time limit + timeout = -1 self._report_resources(task_logger, counter) # collect a summary of all the jobs and their final objective values cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - \ {j.task_id() for j in self.optimizer.get_running_jobs()} - if cur_completed_jobs != set(completed_jobs.keys()): - pairs = [] - labels = [] - created_jobs = copy(self.optimizer.get_created_jobs_ids()) - for i, (job_id, params) in enumerate(created_jobs.items()): - if job_id in completed_jobs: - pairs.append((i, completed_jobs[job_id][0])) - labels.append(str(completed_jobs[job_id][2])[1:-1]) - else: - value = self.objective_metric.get_objective(job_id) - if value is not None: - pairs.append((i, value)) - labels.append(str(params)[1:-1]) - iteration_value = self.objective_metric.get_current_raw_objective(job_id) - completed_jobs[job_id] = ( - value, iteration_value[0] if iteration_value else -1, copy(params)) - # callback new experiment completed - if self._experiment_completed_cb: - normalized_value = self.objective_metric.get_normalized_objective(job_id) - if normalized_value is not None and normalized_value > best_experiment[0]: - best_experiment = normalized_value, job_id - c = completed_jobs[job_id] - self._experiment_completed_cb(job_id, c[0], c[1], c[2], best_experiment[1]) - - self._report_completed_tasks_best_results(completed_jobs, task_logger, title, counter) - - if pairs: - print('Updating job performance summary plot/table') - - # update scatter plot - task_logger.report_scatter2d( - title='optimization', series=title, - scatter=pairs, iteration=0, labels=labels, - mode='markers', xaxis='job #', yaxis='objective') - - # update summary table - if pd: - index = list(completed_jobs.keys()) - table = {'objective': [completed_jobs[i][0] for i in index], - 'iteration': [completed_jobs[i][1] for i in index]} - columns = set([c for k, v in completed_jobs.items() for c in v[2].keys()]) - for c in sorted(columns): - table.update({c: [completed_jobs[i][2].get(c, '') for i in index]}) - - df = pd.DataFrame(table, index=index) - df.sort_values(by='objective', ascending=bool(self.objective_metric.sign < 0), inplace=True) - df.index.name = 'task id' - task_logger.report_table( - "summary", "job", 0, table_plot=df, - extra_layout={"title": "objective: {}".format(title)}) + self._report_completed_status(completed_jobs, cur_completed_jobs, task_logger, title) + self._report_completed_tasks_best_results(set(completed_jobs.keys()), task_logger, title, counter) # if we should leave, stop everything now. if timeout < 0: # we should leave - self.stop() + self.stop(flush_reporter=False) return + if task_logger and counter: + counter += 1 + self._report_remaining_budget(task_logger, counter) + self._report_resources(task_logger, counter) + self._report_completed_status(completed_jobs, cur_completed_jobs, task_logger, title, force=True) + self._report_completed_tasks_best_results(set(completed_jobs.keys()), task_logger, title, counter) + + def _report_completed_status(self, completed_jobs, cur_completed_jobs, task_logger, title, force=False): + best_experiment = float('-inf'), None + if force or cur_completed_jobs != set(completed_jobs.keys()): + pairs = [] + labels = [] + created_jobs = copy(self.optimizer.get_created_jobs_ids()) + id_status = {j_id: j_run.status() for j_id, j_run in self.optimizer.get_created_jobs_tasks().items()} + for i, (job_id, params) in enumerate(created_jobs.items()): + value = self.objective_metric.get_objective(job_id) + if job_id in completed_jobs: + if value != completed_jobs[job_id][0]: + iteration_value = self.objective_metric.get_current_raw_objective(job_id) + completed_jobs[job_id] = ( + value, + iteration_value[0] if iteration_value else -1, + copy(dict(**params, **{"status": id_status.get(job_id)}))) + elif completed_jobs.get(job_id): + completed_jobs[job_id] = (completed_jobs[job_id][0], + completed_jobs[job_id][1], + copy(dict(**params, **{"status": id_status.get(job_id)}))) + pairs.append((i, completed_jobs[job_id][0])) + labels.append(str(completed_jobs[job_id][2])[1:-1]) + else: + if value is not None: + pairs.append((i, value)) + labels.append(str(params)[1:-1]) + iteration_value = self.objective_metric.get_current_raw_objective(job_id) + completed_jobs[job_id] = ( + value, + iteration_value[0] if iteration_value else -1, + copy(dict(**params, **{"status": id_status.get(job_id)}))) + # callback new experiment completed + if self._experiment_completed_cb: + normalized_value = self.objective_metric.get_normalized_objective(job_id) + if normalized_value is not None and normalized_value > best_experiment[0]: + best_experiment = normalized_value, job_id + c = completed_jobs[job_id] + self._experiment_completed_cb(job_id, c[0], c[1], c[2], best_experiment[1]) + + if pairs: + print('Updating job performance summary plot/table') + + # update scatter plot + task_logger.report_scatter2d( + title='optimization', series=title, + scatter=pairs, iteration=0, labels=labels, + mode='markers', xaxis='job #', yaxis='objective') + + # update summary table + if pd: + index = list(completed_jobs.keys()) + table = {'objective': [completed_jobs[i][0] for i in index], + 'iteration': [completed_jobs[i][1] for i in index]} + columns = set([c for k, v in completed_jobs.items() for c in v[2].keys()]) + for c in sorted(columns): + table.update({c: [completed_jobs[i][2].get(c, '') for i in index]}) + + df = pd.DataFrame(table, index=index) + df.sort_values(by='objective', ascending=bool(self.objective_metric.sign < 0), inplace=True) + df.index.name = 'task id' + task_logger.report_table( + "summary", "job", 0, table_plot=df, + extra_layout={"title": "objective: {}".format(title)}) + + def _report_remaining_budget(self, task_logger, counter): + # noinspection PyBroadException + try: + budget = self.optimizer.budget.to_dict() + except Exception: + budget = {} + # report remaining budget + for budget_part, value in budget.items(): + task_logger.report_scalar( + title='remaining budget', series='{} %'.format(budget_part), + iteration=counter, value=round(100 - value['used'] * 100., ndigits=1)) + if self.optimization_timeout and self.optimization_start_time: + task_logger.report_scalar( + title='remaining budget', series='time %', + iteration=counter, + value=round(100 - (100. * (time() - self.optimization_start_time) / + (self.optimization_timeout - self.optimization_start_time)), ndigits=1) + ) def _report_completed_tasks_best_results(self, completed_jobs, task_logger, title, counter): - # type: (Dict[str, Tuple[float, int, Dict[str, int]]], Logger, str, int) -> () + # type: (Set[str], Logger, str, int) -> () if completed_jobs: value_func, series_name = (max, "max") if self.objective_metric.get_objective_sign() > 0 else \ (min, "min") - task_logger.report_scalar( - title=title, - series=series_name, - iteration=counter, - value=value_func([val[0] for val in completed_jobs.values()])) - latest_completed = self._get_latest_completed_task_value(set(completed_jobs.keys())) + latest_completed, obj_values = self._get_latest_completed_task_value(completed_jobs, series_name) + val = value_func(obj_values) if latest_completed: + task_logger.report_scalar( + title=title, + series=series_name, + iteration=counter, + value=val) task_logger.report_scalar( title=title, series="last reported", @@ -1396,7 +1442,10 @@ class HyperParameterOptimizer(object): if q.get("name") == self.execution_queue ] ) - task_logger.report_scalar(title="resources", series="queue workers", iteration=iteration, value=queue_workers) + task_logger.report_scalar(title="resources", + series="queue workers", + iteration=iteration, + value=queue_workers) def _report_tasks_status(self, task_logger, iteration): # type: (Logger, int) -> () @@ -1411,10 +1460,11 @@ class HyperParameterOptimizer(object): title="resources", series=series, iteration=iteration, value=val) - def _get_latest_completed_task_value(self, cur_completed_jobs): - # type: (Set[str]) -> float + def _get_latest_completed_task_value(self, cur_completed_jobs, series_name): + # type: (Set[str], str) -> (float, List[float]) completed_value = None latest_completed = None + obj_values = [] cur_task = self._task or Task.current_task() for j in cur_completed_jobs: res = cur_task.send(tasks_services.GetByIdRequest(task=j)) @@ -1424,7 +1474,15 @@ class HyperParameterOptimizer(object): completed_time = datetime.strptime(response.response_data["task"]["completed"].partition("+")[0], "%Y-%m-%dT%H:%M:%S.%f") completed_time = completed_time.timestamp() + completed_values = self._get_last_value(response) + obj_values.append(completed_values['max_value'] if series_name == "max" else completed_values['min_value']) if not latest_completed or completed_time > latest_completed: latest_completed = completed_time - completed_value = self.objective_metric.get_objective(j) - return completed_value + completed_value = completed_values['value'] + return completed_value, obj_values + + def _get_last_value(self, response): + metrics, title, series, values = TrainsJob.get_metric_req_params(self.objective_metric.title, + self.objective_metric.series) + last_values = response.response_data["task"]['last_metrics'][title][series] + return last_values diff --git a/trains/automation/optuna/optuna.py b/trains/automation/optuna/optuna.py index 61fb47a4..ab6565c9 100644 --- a/trains/automation/optuna/optuna.py +++ b/trains/automation/optuna/optuna.py @@ -45,11 +45,12 @@ class OptunaObjective(object): current_job.launch(self.queue_name) iteration_value = None is_pending = True - while self.optimizer.monitor_job(current_job): + while not current_job.is_stopped(): if is_pending and not current_job.is_pending(): is_pending = False self.optimizer.budget.jobs.update(current_job.task_id(), 1.) if not is_pending: + self.optimizer.update_budget_per_job(current_job) # noinspection PyProtectedMember iteration_value = self.optimizer._objective_metric.get_current_raw_objective(current_job) @@ -182,6 +183,7 @@ class OptimizerOptuna(SearchStrategy): self._study.stop() except Exception as ex: print(ex) + self._stop_event.set() def _convert_hyper_parameters_to_optuna(self): # type: () -> dict