This commit is contained in:
revital
2024-02-25 09:56:54 +02:00
61 changed files with 696 additions and 247 deletions

View File

@@ -67,7 +67,31 @@ Instrumenting these components is the **ClearML-server**, see [Self-Hosting](htt
</div>
---
<a href="https://app.clear.ml"><img src="https://github.com/allegroai/clearml/blob/master/docs/webapp_screenshots.gif?raw=true" width="100%"></a>
<table>
<tbody>
<tr>
<td>Experiment Management</td>
<td>Datasets</td>
</tr>
<tr>
<td><a href="https://app.clear.ml"><img src="https://github.com/allegroai/clearml/blob/master/docs/experiment_manager.gif?raw=true" width="100%"></a></td>
<td><a href="https://app.clear.ml/datasets"><img src="https://github.com/allegroai/clearml/blob/master/docs/datasets.gif?raw=true" width="100%"></a></td>
</tr>
<tr>
<td colspan="2" height="24px"></td>
</tr>
<tr>
<td>Orchestration</td>
<td>Pipelines</td>
</tr>
<tr>
<td><a href="https://app.clear.ml/workers-and-queues/autoscalers"><img src="https://github.com/allegroai/clearml/blob/master/docs/orchestration.gif?raw=true" width="100%"></a></td>
<td><a href="https://app.clear.ml/pipelines"><img src="https://github.com/allegroai/clearml/blob/master/docs/pipelines.gif?raw=true" width="100%"></a></td>
</tr>
</tbody>
</table>
## ClearML Experiment Manager

View File

@@ -1469,6 +1469,7 @@ class PipelineController(object):
pipeline_object._task = pipeline_task
pipeline_object._nodes = {}
pipeline_object._running_nodes = []
pipeline_object._version = pipeline_task._get_runtime_properties().get("version")
try:
pipeline_object._deserialize(pipeline_task._get_configuration_dict(cls._config_section), force=True)
except Exception:
@@ -1485,6 +1486,11 @@ class PipelineController(object):
# type: () -> List[str]
return self._task.get_tags() or []
@property
def version(self):
# type: () -> str
return self._version
def add_tags(self, tags):
# type: (Union[Sequence[str], str]) -> None
"""
@@ -4075,6 +4081,13 @@ class PipelineDecorator(PipelineController):
):
_node.name = "{}_{}".format(_node_name, counter)
counter += 1
# Copy callbacks to the replicated node
if cls._singleton._pre_step_callbacks.get(_node_name):
cls._singleton._pre_step_callbacks[_node.name] = cls._singleton._pre_step_callbacks[_node_name]
if cls._singleton._post_step_callbacks.get(_node_name):
cls._singleton._post_step_callbacks[_node.name] = cls._singleton._post_step_callbacks[_node_name]
if cls._singleton._status_change_callbacks.get(_node_name):
cls._singleton._status_change_callbacks[_node.name] = cls._singleton._status_change_callbacks[_node_name]
_node_name = _node.name
if _node.name not in cls._singleton._nodes:
cls._singleton._nodes[_node.name] = _node

View File

@@ -1947,7 +1947,7 @@ class HyperParameterOptimizer(object):
objective if objective is not None else (
[-1] * self._objective_metric.len
),
iteration_value if iteration_value is not None else (
[iteration_value] * self._objective_metric.len if iteration_value is not None else (
[-1] * self._objective_metric.len
),
params

View File

@@ -8,8 +8,8 @@ from typing import List, Union, Optional, Callable, Sequence, Dict
from attr import attrs, attrib
from dateutil.relativedelta import relativedelta
from .job import ClearmlJob
from .controller import PipelineController
from .job import ClearmlJob
from ..backend_interface.util import datetime_from_isoformat, datetime_to_isoformat, mutually_exclusive
from ..task import Task
@@ -28,12 +28,11 @@ class BaseScheduleJob(object):
_executed_instances = attrib(type=list, default=None)
def to_dict(self, full=False):
return {k: v for k, v in self.__dict__.items()
if not callable(v) and (full or not str(k).startswith('_'))}
return {k: v for k, v in self.__dict__.items() if not callable(v) and (full or not str(k).startswith("_"))}
def update(self, a_job):
# type: (Union[Dict, BaseScheduleJob]) -> BaseScheduleJob
converters = {a.name: a.converter for a in getattr(self, '__attrs_attrs__', [])}
converters = {a.name: a.converter for a in getattr(self, "__attrs_attrs__", [])}
for k, v in (a_job.to_dict(full=True) if not isinstance(a_job, dict) else a_job).items():
if v is not None and not callable(getattr(self, k, v)):
setattr(self, k, converters[k](v) if converters.get(k) else v)
@@ -80,7 +79,7 @@ class BaseScheduleJob(object):
@attrs
class ScheduleJob(BaseScheduleJob):
_weekdays_ind = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday')
_weekdays_ind = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
execution_limit_hours = attrib(type=float, default=None)
recurring = attrib(type=bool, default=True)
@@ -100,8 +99,7 @@ class ScheduleJob(BaseScheduleJob):
# type: () -> None
def check_integer(value):
try:
return False if not isinstance(value, (int, float)) or \
int(value) != float(value) else True
return False if not isinstance(value, (int, float)) or int(value) != float(value) else True
except (TypeError, ValueError):
return False
@@ -155,7 +153,7 @@ class ScheduleJob(BaseScheduleJob):
month=int(self.month or 1),
day=int(self.day or 1),
hour=int(self.hour or 0),
minute=int(self.minute or 0)
minute=int(self.minute or 0),
)
if self.weekdays:
self._next_run += relativedelta(weekday=self.get_weekday_ord(self.weekdays[0]))
@@ -172,7 +170,7 @@ class ScheduleJob(BaseScheduleJob):
except ValueError:
# in case previous execution was not in the weekday (for example executed immediately at scheduling)
prev_weekday_ind = -1
weekday = _weekdays[(prev_weekday_ind+1) % len(_weekdays)]
weekday = _weekdays[(prev_weekday_ind + 1) % len(_weekdays)]
prev_timestamp = self._last_executed or self.starting_time
# fix first scheduled job should be as close as possible to starting time
@@ -184,8 +182,8 @@ class ScheduleJob(BaseScheduleJob):
# if this is a daily schedule and we can still run it today, then we should
run0 = self._calc_next_run(self.starting_time, weekday)
run1 = self._calc_next_run(run0, weekday)
delta = run1-run0
optional_first_timestamp = self._calc_next_run(prev_timestamp-delta, weekday)
delta = run1 - run0
optional_first_timestamp = self._calc_next_run(prev_timestamp - delta, weekday)
if optional_first_timestamp > prev_timestamp:
# this is us, we can still run it
self._next_run = optional_first_timestamp
@@ -236,7 +234,7 @@ class ScheduleJob(BaseScheduleJob):
months=0 if self.year else (self.month or 0),
hours=self.hour or 0,
minutes=self.minute or 0,
weekday=weekday
weekday=weekday,
)
return next_timestamp
@@ -283,7 +281,7 @@ class ScheduleJob(BaseScheduleJob):
if self.execution_limit_hours and task_id:
self._execution_timeout = self._last_executed + relativedelta(
hours=int(self.execution_limit_hours),
minutes=int((self.execution_limit_hours - int(self.execution_limit_hours)) * 60)
minutes=int((self.execution_limit_hours - int(self.execution_limit_hours)) * 60),
)
else:
self._execution_timeout = None
@@ -306,16 +304,16 @@ class ExecutedJob(object):
thread_id = attrib(type=str, default=None)
def to_dict(self, full=False):
return {k: v for k, v in self.__dict__.items() if full or not str(k).startswith('_')}
return {k: v for k, v in self.__dict__.items() if full or not str(k).startswith("_")}
class BaseScheduler(object):
def __init__(
self,
sync_frequency_minutes=15,
force_create_task_name=None,
force_create_task_project=None,
pooling_frequency_minutes=None
self,
sync_frequency_minutes=15,
force_create_task_name=None,
force_create_task_project=None,
pooling_frequency_minutes=None
):
# type: (float, Optional[str], Optional[str], Optional[float]) -> None
"""
@@ -333,8 +331,8 @@ class BaseScheduler(object):
self._sync_frequency_minutes = sync_frequency_minutes
if force_create_task_name or not Task.current_task():
self._task = Task.init(
project_name=force_create_task_project or 'DevOps',
task_name=force_create_task_name or 'Scheduler',
project_name=force_create_task_project or "DevOps",
task_name=force_create_task_name or "Scheduler",
task_type=Task.TaskTypes.service,
auto_resource_monitoring=False,
)
@@ -356,12 +354,12 @@ class BaseScheduler(object):
while True:
# sync with backend
try:
if time() - self._last_sync > 60. * self._sync_frequency_minutes:
if time() - self._last_sync > 60.0 * self._sync_frequency_minutes:
self._last_sync = time()
self._deserialize()
self._update_execution_plots()
except Exception as ex:
self._log('Warning: Exception caught during deserialization: {}'.format(ex))
self._log("Warning: Exception caught during deserialization: {}".format(ex))
self._last_sync = time()
try:
@@ -369,16 +367,16 @@ class BaseScheduler(object):
self._serialize_state()
self._update_execution_plots()
except Exception as ex:
self._log('Warning: Exception caught during scheduling step: {}'.format(ex))
self._log("Warning: Exception caught during scheduling step: {}".format(ex))
# rate control
sleep(15)
# sleep until the next pool (default None)
if self._pooling_frequency_minutes:
self._log("Sleeping until the next pool in {} minutes".format(self._pooling_frequency_minutes))
sleep(self._pooling_frequency_minutes*60.)
sleep(self._pooling_frequency_minutes * 60.0)
def start_remotely(self, queue='services'):
def start_remotely(self, queue="services"):
# type: (str) -> None
"""
Start the Task TaskScheduler loop (notice this function does not return)
@@ -456,11 +454,8 @@ class BaseScheduler(object):
# check if this is a single instance, then we need to abort the Task
if job.single_instance and job.get_last_executed_task_id():
t = Task.get_task(task_id=job.get_last_executed_task_id())
if t.status in ('in_progress', 'queued'):
self._log(
'Skipping Task {} scheduling, previous Task instance {} still running'.format(
job.name, t.id
))
if t.status in ("in_progress", "queued"):
self._log("Skipping Task {} scheduling, previous Task instance {} still running".format(job.name, t.id))
job.run(None)
return None
@@ -474,8 +469,7 @@ class BaseScheduler(object):
target_project=job.get_resolved_target_project(),
tags=[add_tags] if add_tags and isinstance(add_tags, str) else add_tags,
)
self._log('Scheduling Job {}, Task {} on queue {}.'.format(
job.name, task_job.task_id(), job.queue))
self._log("Scheduling Job {}, Task {} on queue {}.".format(job.name, task_job.task_id(), job.queue))
if task_job.launch(queue_name=job.queue):
# mark as run
job.run(task_job.task_id())
@@ -501,12 +495,12 @@ class BaseScheduler(object):
self._log(
"Skipping Task '{}' scheduling, previous Thread instance '{}' still running".format(
job.name, a_thread.ident
))
)
)
job.run(None)
return None
self._log("Scheduling Job '{}', Task '{}' on background thread".format(
job.name, job.base_function))
self._log("Scheduling Job '{}', Task '{}' on background thread".format(job.name, job.base_function))
t = Thread(target=job.base_function, args=func_args or ())
t.start()
# mark as run
@@ -520,9 +514,9 @@ class BaseScheduler(object):
return
t = Task.get_task(task_id=task_id)
status = t.status
if status in ('in_progress',):
if status in ("in_progress",):
t.stopped(force=True)
elif status in ('queued',):
elif status in ("queued",):
Task.dequeue(t)
@@ -531,7 +525,8 @@ class TaskScheduler(BaseScheduler):
Task Scheduling controller.
Notice time-zone is ALWAYS UTC
"""
_configuration_section = 'schedule'
_configuration_section = "schedule"
def __init__(self, sync_frequency_minutes=15, force_create_task_name=None, force_create_task_project=None):
# type: (float, Optional[str], Optional[str]) -> None
@@ -548,32 +543,32 @@ class TaskScheduler(BaseScheduler):
super(TaskScheduler, self).__init__(
sync_frequency_minutes=sync_frequency_minutes,
force_create_task_name=force_create_task_name,
force_create_task_project=force_create_task_project
force_create_task_project=force_create_task_project,
)
self._schedule_jobs = [] # List[ScheduleJob]
self._timeout_jobs = {} # Dict[datetime, str]
self._executed_jobs = [] # List[ExecutedJob]
def add_task(
self,
schedule_task_id=None, # type: Union[str, Task]
schedule_function=None, # type: Callable
queue=None, # type: str
name=None, # type: Optional[str]
target_project=None, # type: Optional[str]
minute=None, # type: Optional[int]
hour=None, # type: Optional[int]
day=None, # type: Optional[int]
weekdays=None, # type: Optional[List[str]]
month=None, # type: Optional[int]
year=None, # type: Optional[int]
limit_execution_time=None, # type: Optional[float]
single_instance=False, # type: bool
recurring=True, # type: bool
execute_immediately=False, # type: bool
reuse_task=False, # type: bool
task_parameters=None, # type: Optional[dict]
task_overrides=None, # type: Optional[dict]
self,
schedule_task_id=None, # type: Union[str, Task]
schedule_function=None, # type: Callable
queue=None, # type: str
name=None, # type: Optional[str]
target_project=None, # type: Optional[str]
minute=None, # type: Optional[int]
hour=None, # type: Optional[int]
day=None, # type: Optional[int]
weekdays=None, # type: Optional[List[str]]
month=None, # type: Optional[int]
year=None, # type: Optional[int]
limit_execution_time=None, # type: Optional[float]
single_instance=False, # type: bool
recurring=True, # type: bool
execute_immediately=False, # type: bool
reuse_task=False, # type: bool
task_parameters=None, # type: Optional[dict]
task_overrides=None, # type: Optional[dict]
):
# type: (...) -> bool
"""
@@ -640,7 +635,7 @@ class TaskScheduler(BaseScheduler):
:return: True if job is successfully added to the scheduling list
"""
mutually_exclusive(schedule_function=schedule_function, schedule_task_id=schedule_task_id)
task_id = schedule_task_id.id if isinstance(schedule_task_id, Task) else str(schedule_task_id or '')
task_id = schedule_task_id.id if isinstance(schedule_task_id, Task) else str(schedule_task_id or "")
# noinspection PyProtectedMember
job = ScheduleJob(
@@ -715,46 +710,46 @@ class TaskScheduler(BaseScheduler):
# get idle timeout (aka sleeping)
scheduled_jobs = sorted(
[j for j in self._schedule_jobs if j.next_run() is not None],
key=lambda x: x.next_run()
[j for j in self._schedule_jobs if j.next_run() is not None], key=lambda x: x.next_run()
)
# sort by key
timeout_job_datetime = min(self._timeout_jobs, key=self._timeout_jobs.get) if self._timeout_jobs else None
if not scheduled_jobs and timeout_job_datetime is None:
# sleep and retry
seconds = 60. * self._sync_frequency_minutes
self._log('Nothing to do, sleeping for {:.2f} minutes.'.format(seconds / 60.))
seconds = 60.0 * self._sync_frequency_minutes
self._log("Nothing to do, sleeping for {:.2f} minutes.".format(seconds / 60.0))
sleep(seconds)
return False
next_time_stamp = scheduled_jobs[0].next_run() if scheduled_jobs else None
if timeout_job_datetime is not None:
next_time_stamp = (
min(next_time_stamp, timeout_job_datetime) if next_time_stamp else timeout_job_datetime
)
next_time_stamp = min(next_time_stamp, timeout_job_datetime) if next_time_stamp else timeout_job_datetime
sleep_time = (next_time_stamp - datetime.utcnow()).total_seconds()
if sleep_time > 0:
# sleep until we need to run a job or maximum sleep time
seconds = min(sleep_time, 60. * self._sync_frequency_minutes)
self._log('Waiting for next run [UTC {}], sleeping for {:.2f} minutes, until next sync.'.format(
next_time_stamp, seconds / 60.))
seconds = min(sleep_time, 60.0 * self._sync_frequency_minutes)
self._log(
"Waiting for next run [UTC {}], sleeping for {:.2f} minutes, until next sync.".format(
next_time_stamp, seconds / 60.0
)
)
sleep(seconds)
return False
# check if this is a Task timeout check
if timeout_job_datetime is not None and next_time_stamp == timeout_job_datetime:
task_id = self._timeout_jobs[timeout_job_datetime]
self._log('Aborting job due to timeout: {}'.format(task_id))
self._log("Aborting job due to timeout: {}".format(task_id))
self._cancel_task(task_id=task_id)
self._timeout_jobs.pop(timeout_job_datetime, None)
else:
self._log('Launching job: {}'.format(scheduled_jobs[0]))
self._log("Launching job: {}".format(scheduled_jobs[0]))
self._launch_job(scheduled_jobs[0])
return True
def start_remotely(self, queue='services'):
def start_remotely(self, queue="services"):
# type: (str) -> None
"""
Start the Task TaskScheduler loop (notice this function does not return)
@@ -770,8 +765,8 @@ class TaskScheduler(BaseScheduler):
"""
# noinspection PyProtectedMember
self._task._set_configuration(
config_type='json',
description='schedule tasks configuration',
config_type="json",
description="schedule tasks configuration",
config_text=self._serialize_schedule_into_string(),
name=self._configuration_section,
)
@@ -785,9 +780,9 @@ class TaskScheduler(BaseScheduler):
dict(
scheduled_jobs=[j.to_dict(full=True) for j in self._schedule_jobs],
timeout_jobs={datetime_to_isoformat(k): v for k, v in self._timeout_jobs.items()},
executed_jobs=[j.to_dict(full=True) for j in self._executed_jobs]
executed_jobs=[j.to_dict(full=True) for j in self._executed_jobs],
),
default=datetime_to_isoformat
default=datetime_to_isoformat,
)
self._task.upload_artifact(name="state", artifact_object=json_str, preview="scheduler internal state")
@@ -798,34 +793,32 @@ class TaskScheduler(BaseScheduler):
"""
# get artifact
self._task.reload()
artifact_object = self._task.artifacts.get('state')
artifact_object = self._task.artifacts.get("state")
if artifact_object is not None:
state_json_str = artifact_object.get(force_download=True)
if state_json_str is not None:
state_dict = json.loads(state_json_str)
self._schedule_jobs = self.__deserialize_scheduled_jobs(
serialized_jobs_dicts=state_dict.get('scheduled_jobs', []),
current_jobs=self._schedule_jobs
serialized_jobs_dicts=state_dict.get("scheduled_jobs", []), current_jobs=self._schedule_jobs
)
self._timeout_jobs = {datetime_from_isoformat(k): v for k, v in (state_dict.get('timeout_jobs') or {})}
self._executed_jobs = [ExecutedJob(**j) for j in state_dict.get('executed_jobs', [])]
self._timeout_jobs = {datetime_from_isoformat(k): v for k, v in (state_dict.get("timeout_jobs") or {})}
self._executed_jobs = [ExecutedJob(**j) for j in state_dict.get("executed_jobs", [])]
def _deserialize(self):
# type: () -> None
"""
Deserialize Task scheduling configuration only
"""
self._log('Syncing scheduler')
self._log("Syncing scheduler")
self._task.reload()
# noinspection PyProtectedMember
json_str = self._task._get_configuration_text(name=self._configuration_section)
try:
self._schedule_jobs = self.__deserialize_scheduled_jobs(
serialized_jobs_dicts=json.loads(json_str),
current_jobs=self._schedule_jobs
serialized_jobs_dicts=json.loads(json_str), current_jobs=self._schedule_jobs
)
except Exception as ex:
self._log('Failed deserializing configuration: {}'.format(ex), level=logging.WARN)
self._log("Failed deserializing configuration: {}".format(ex), level=logging.WARN)
return
@staticmethod
@@ -858,45 +851,61 @@ class TaskScheduler(BaseScheduler):
if not self._task:
return
task_link_template = self._task.get_output_log_web_page() \
.replace('/{}/'.format(self._task.project), '/{project}/') \
.replace('/{}/'.format(self._task.id), '/{task}/')
task_link_template = (
self._task.get_output_log_web_page()
.replace("/{}/".format(self._task.project), "/{project}/")
.replace("/{}/".format(self._task.id), "/{task}/")
)
# plot the schedule definition
columns = [
'name', 'base_task_id', 'base_function', 'next_run', 'target_project', 'queue',
'minute', 'hour', 'day', 'month', 'year',
'starting_time', 'execution_limit_hours', 'recurring',
'single_instance', 'task_parameters', 'task_overrides', 'clone_task',
"name",
"base_task_id",
"base_function",
"next_run",
"target_project",
"queue",
"minute",
"hour",
"day",
"month",
"year",
"starting_time",
"execution_limit_hours",
"recurring",
"single_instance",
"task_parameters",
"task_overrides",
"clone_task",
]
scheduler_table = [columns]
for j in self._schedule_jobs:
j_dict = j.to_dict()
j_dict['next_run'] = j.next()
j_dict['base_function'] = "{}.{}".format(
getattr(j.base_function, '__module__', ''),
getattr(j.base_function, '__name__', '')
) if j.base_function else ''
j_dict["next_run"] = j.next()
j_dict["base_function"] = (
"{}.{}".format(getattr(j.base_function, "__module__", ""), getattr(j.base_function, "__name__", ""))
if j.base_function
else ""
)
if not j_dict.get('base_task_id'):
j_dict['clone_task'] = ''
if not j_dict.get("base_task_id"):
j_dict["clone_task"] = ""
row = [
str(j_dict.get(c)).split('.', 1)[0] if isinstance(j_dict.get(c), datetime) else str(j_dict.get(c) or '')
str(j_dict.get(c)).split(".", 1)[0] if isinstance(j_dict.get(c), datetime) else str(j_dict.get(c) or "")
for c in columns
]
if row[1]:
row[1] = '<a href="{}">{}</a>'.format(
task_link_template.format(project='*', task=row[1]), row[1])
row[1] = '<a href="{}">{}</a>'.format(task_link_template.format(project="*", task=row[1]), row[1])
scheduler_table += [row]
# plot the already executed Tasks
executed_table = [['name', 'task id', 'started', 'finished']]
executed_table = [["name", "task id", "started", "finished"]]
for executed_job in sorted(self._executed_jobs, key=lambda x: x.started, reverse=True):
if not executed_job.finished:
if executed_job.task_id:
t = Task.get_task(task_id=executed_job.task_id)
if t.status not in ('in_progress', 'queued'):
if t.status not in ("in_progress", "queued"):
executed_job.finished = t.data.completed or datetime.utcnow()
elif executed_job.thread_id:
# noinspection PyBroadException
@@ -908,30 +917,31 @@ class TaskScheduler(BaseScheduler):
pass
executed_table += [
[executed_job.name,
'<a href="{}">{}</a>'.format(task_link_template.format(
project='*', task=executed_job.task_id), executed_job.task_id)
if executed_job.task_id else 'function',
str(executed_job.started).split('.', 1)[0], str(executed_job.finished).split('.', 1)[0]
]
[
executed_job.name,
'<a href="{}">{}</a>'.format(
task_link_template.format(project="*", task=executed_job.task_id), executed_job.task_id
)
if executed_job.task_id
else "function",
str(executed_job.started).split(".", 1)[0],
str(executed_job.finished).split(".", 1)[0],
]
]
self._task.get_logger().report_table(
title='Schedule Tasks', series=' ', iteration=0,
table_plot=scheduler_table
)
self._task.get_logger().report_table(
title='Executed Tasks', series=' ', iteration=0,
table_plot=executed_table
title="Schedule Tasks", series=" ", iteration=0, table_plot=scheduler_table
)
self._task.get_logger().report_table(title="Executed Tasks", series=" ", iteration=0, table_plot=executed_table)
def _launch_job_task(self, job, task_parameters=None, add_tags=None):
# type: (ScheduleJob, Optional[dict], Optional[List[str]]) -> Optional[ClearmlJob]
task_job = super(TaskScheduler, self)._launch_job_task(job, task_parameters=task_parameters, add_tags=add_tags)
# make sure this is not a function job
if task_job:
self._executed_jobs.append(ExecutedJob(
name=job.name, task_id=task_job.task_id(), started=datetime.utcnow()))
self._executed_jobs.append(
ExecutedJob(name=job.name, task_id=task_job.task_id(), started=datetime.utcnow())
)
# add timeout check
if job.get_execution_timeout():
# we should probably make sure we are not overwriting a Task
@@ -943,8 +953,9 @@ class TaskScheduler(BaseScheduler):
thread_job = super(TaskScheduler, self)._launch_job_function(job, func_args=func_args)
# make sure this is not a function job
if thread_job:
self._executed_jobs.append(ExecutedJob(
name=job.name, thread_id=str(thread_job.ident), started=datetime.utcnow()))
self._executed_jobs.append(
ExecutedJob(name=job.name, thread_id=str(thread_job.ident), started=datetime.utcnow())
)
# execution timeout is not supported with function callbacks.
return thread_job

View File

@@ -11,8 +11,7 @@ ENV_AUTH_TOKEN = EnvEntry("CLEARML_AUTH_TOKEN")
ENV_VERBOSE = EnvEntry(
"CLEARML_API_VERBOSE", "TRAINS_API_VERBOSE", converter=safe_text_to_bool, type=bool, default=False
)
ENV_HOST_VERIFY_CERT = EnvEntry("CLEARML_API_HOST_VERIFY_CERT", "TRAINS_API_HOST_VERIFY_CERT",
type=bool, default=True)
ENV_HOST_VERIFY_CERT = EnvEntry("CLEARML_API_HOST_VERIFY_CERT", "TRAINS_API_HOST_VERIFY_CERT", default=True)
ENV_OFFLINE_MODE = EnvEntry("CLEARML_OFFLINE_MODE", "TRAINS_OFFLINE_MODE", type=bool, converter=safe_text_to_bool)
ENV_CLEARML_NO_DEFAULT_SERVER = EnvEntry(
"CLEARML_NO_DEFAULT_SERVER", "TRAINS_NO_DEFAULT_SERVER", converter=safe_text_to_bool, type=bool, default=True

View File

@@ -9,6 +9,7 @@ from urllib3 import PoolManager
import six
from .session.defs import ENV_HOST_VERIFY_CERT
from ..backend_config.converters import strtobool
if six.PY3:
from functools import lru_cache
@@ -141,8 +142,14 @@ def get_http_session_with_retry(
adapter = TLSv1HTTPAdapter(max_retries=retry, pool_connections=pool_connections, pool_maxsize=pool_maxsize)
session.mount('http://', adapter)
session.mount('https://', adapter)
# update verify host certificate
session.verify = ENV_HOST_VERIFY_CERT.get(default=config.get('api.verify_certificate', True))
verify = ENV_HOST_VERIFY_CERT.get(default=config.get('api.verify_certificate', True))
try:
session.verify = bool(strtobool(verify) if isinstance(verify, str) else verify)
except (ValueError, AttributeError):
session.verify = verify
if not session.verify and __disable_certificate_verification_warning < 2:
# show warning
__disable_certificate_verification_warning += 1

View File

@@ -34,9 +34,19 @@ class S3BucketConfig(object):
verify = attrib(type=bool, default=None)
use_credentials_chain = attrib(type=bool, default=False)
extra_args = attrib(type=dict, default=None)
profile = attrib(type=str, default="")
def update(
self, key, secret, multipart=True, region=None, use_credentials_chain=False, token="", extra_args=None
self,
key="",
secret="",
multipart=True,
region=None,
use_credentials_chain=False,
token="",
extra_args=None,
secure=True,
profile=""
):
self.key = key
self.secret = secret
@@ -45,6 +55,8 @@ class S3BucketConfig(object):
self.region = region
self.use_credentials_chain = use_credentials_chain
self.extra_args = extra_args
self.secure = secure
self.profile = profile
def is_valid(self):
return (self.key and self.secret) or self.use_credentials_chain
@@ -107,6 +119,8 @@ class S3BucketConfigurations(BaseBucketConfigurations):
default_token="",
default_extra_args=None,
default_verify=None,
default_profile="",
default_secure=True
):
super(S3BucketConfigurations, self).__init__()
self._buckets = buckets if buckets else list()
@@ -118,12 +132,12 @@ class S3BucketConfigurations(BaseBucketConfigurations):
self._default_use_credentials_chain = default_use_credentials_chain
self._default_extra_args = default_extra_args
self._default_verify = default_verify
self._default_profile = default_profile
self._default_secure = default_secure
@classmethod
def from_config(cls, s3_configuration):
config_list = S3BucketConfig.from_list(
s3_configuration.get("credentials", [])
)
config_list = S3BucketConfig.from_list(s3_configuration.get("credentials", []))
default_key = s3_configuration.get("key", "") or getenv("AWS_ACCESS_KEY_ID", "")
default_secret = s3_configuration.get("secret", "") or getenv("AWS_SECRET_ACCESS_KEY", "")
@@ -132,11 +146,14 @@ class S3BucketConfigurations(BaseBucketConfigurations):
default_use_credentials_chain = s3_configuration.get("use_credentials_chain") or False
default_extra_args = s3_configuration.get("extra_args")
default_verify = s3_configuration.get("verify", None)
default_profile = s3_configuration.get("profile", "") or getenv("AWS_PROFILE", "")
default_secure = s3_configuration.get("secure", True)
default_key = _none_to_empty_string(default_key)
default_secret = _none_to_empty_string(default_secret)
default_token = _none_to_empty_string(default_token)
default_region = _none_to_empty_string(default_region)
default_key = _none_to_empty_string(default_key).strip()
default_secret = _none_to_empty_string(default_secret).strip()
default_token = _none_to_empty_string(default_token).strip()
default_region = _none_to_empty_string(default_region).strip()
default_profile = _none_to_empty_string(default_profile).strip()
return cls(
config_list,
@@ -147,6 +164,8 @@ class S3BucketConfigurations(BaseBucketConfigurations):
default_token,
default_extra_args,
default_verify,
default_profile,
default_secure
)
def add_config(self, bucket_config):
@@ -178,6 +197,8 @@ class S3BucketConfigurations(BaseBucketConfigurations):
use_credentials_chain=self._default_use_credentials_chain,
token=self._default_token,
extra_args=self._default_extra_args,
profile=self._default_profile,
secure=self._default_secure
)
def _get_prefix_from_bucket_config(self, config):
@@ -204,7 +225,6 @@ class S3BucketConfigurations(BaseBucketConfigurations):
:param uri: URI of bucket, directory or file
:return: S3BucketConfig: bucket config
"""
def find_match(uri):
self._update_prefixes(refresh=False)
uri = uri.lower()
@@ -244,6 +264,8 @@ class S3BucketConfigurations(BaseBucketConfigurations):
host=host,
token=self._default_token,
extra_args=self._default_extra_args,
profile=self._default_profile,
secure=self._default_secure
)

View File

@@ -67,6 +67,7 @@ class SetupUploadMixin(object):
multipart=True, # bool
secure=True, # bool
verify=True, # bool
profile=None # Optional[str]
):
# type: (...) -> None
"""
@@ -85,6 +86,7 @@ class SetupUploadMixin(object):
multipart.
:param secure: Server supports HTTPS. Only required when using a Non-AWS S3 solution that only supports HTTPS.
:param verify: Whether or not to verify SSL certificates.
:param profile: The AWS profile
Only required when using a Non-AWS S3 solution that only supports HTTPS with self-signed certificate.
"""
self._bucket_config = S3BucketConfig( # noqa
@@ -98,6 +100,7 @@ class SetupUploadMixin(object):
multipart=multipart,
secure=secure,
verify=verify,
profile=profile
)
StorageHelper.add_aws_configuration(self._bucket_config, log=self.log)
self.storage_uri = StorageHelper.get_aws_storage_uri_from_config(self._bucket_config)

View File

@@ -92,7 +92,6 @@ class CreateAndPopulate(object):
repo = None
else:
folder = None
if raise_on_missing_entries and not base_task_id:
if not script:
raise ValueError("Entry point script not provided")
@@ -168,7 +167,6 @@ class CreateAndPopulate(object):
and not self.repo and (
not repo_info or not repo_info.script or not repo_info.script.get('repository')):
raise ValueError("Standalone script detected \'{}\', but no requirements provided".format(self.script))
if dry_run:
task = None
task_state = dict(
@@ -204,7 +202,6 @@ class CreateAndPopulate(object):
# clear the script section
task_state['script'] = {}
if repo_info:
task_state['script']['repository'] = repo_info.script['repository']
task_state['script']['version_num'] = repo_info.script['version_num']
@@ -216,15 +213,19 @@ class CreateAndPopulate(object):
task_state['script']['requirements'] = repo_info.script.get('requirements') or {}
if self.cwd:
self.cwd = self.cwd
cwd = self.cwd if Path(self.cwd).is_dir() else (
Path(repo_info.script['repo_root']) / self.cwd).as_posix()
# cwd should be relative to the repo_root, but we need the full path
# (repo_root + cwd) in order to resolve the entry point
cwd = (Path(repo_info.script['repo_root']) / self.cwd).as_posix()
if not Path(cwd).is_dir():
raise ValueError("Working directory \'{}\' could not be found".format(cwd))
cwd = Path(cwd).relative_to(repo_info.script['repo_root']).as_posix()
entry_point = \
Path(repo_info.script['repo_root']) / repo_info.script['working_dir'] / repo_info.script[
'entry_point']
# resolve entry_point relative to the current working directory
entry_point = entry_point.relative_to(cwd).as_posix()
# restore cwd - make it relative to the repo_root again
cwd = Path(cwd).relative_to(repo_info.script['repo_root']).as_posix()
task_state['script']['entry_point'] = entry_point or ""
task_state['script']['working_dir'] = cwd or "."
elif self.repo:
@@ -260,7 +261,6 @@ class CreateAndPopulate(object):
# standalone task
task_state['script']['entry_point'] = self.script or ""
task_state['script']['working_dir'] = '.'
# update requirements
reqs = []
if self.requirements_file:
@@ -345,7 +345,6 @@ class CreateAndPopulate(object):
if task_state['script']['diff'] and not task_state['script']['diff'].endswith('\n'):
task_state['script']['diff'] += '\n'
task_state['script']['diff'] += task_init_patch
# set base docker image if provided
if self.docker:
if dry_run:

View File

@@ -203,6 +203,10 @@ class Artifact(object):
with open(local_file, "rt") as f:
self._object = f.read()
elif self.type == "pickle":
if self.hash:
file_hash, _ = sha256sum(local_file, block_size=Artifacts._hash_block_size)
if self.hash != file_hash:
raise Exception("incorrect pickle file hash, artifact file might be corrupted")
with open(local_file, "rb") as f:
self._object = pickle.load(f)
except Exception as e:

View File

@@ -29,6 +29,7 @@ from ..storage.util import sha256sum, is_windows, md5text, format_size
from ..utilities.matching import matches_any_wildcard
from ..utilities.parallel import ParallelZipper
from ..utilities.version import Version
from ..utilities.files import is_path_traversal
try:
from pathlib import Path as _Path # noqa
@@ -1856,6 +1857,12 @@ class Dataset(object):
for ds in datasets:
base_folder = Path(ds._get_dataset_files())
files = [f.relative_path for f in ds.file_entries if f.parent_dataset_id == ds.id]
files = [
os.path.basename(file)
if is_path_traversal(base_folder, file) or is_path_traversal(temp_folder, file)
else file
for file in files
]
pool.map(
lambda x:
(temp_folder / x).parent.mkdir(parents=True, exist_ok=True) or
@@ -2326,12 +2333,24 @@ class Dataset(object):
link.size = Path(target_path).stat().st_size
if not max_workers:
for relative_path, link in links.items():
target_path = os.path.join(target_folder, relative_path)
if not is_path_traversal(target_folder, relative_path):
target_path = os.path.join(target_folder, relative_path)
else:
LoggerRoot.get_base_logger().warning(
"Ignoring relative path `{}`: it must not traverse directories".format(relative_path)
)
target_path = os.path.join(target_folder, os.path.basename(relative_path))
_download_link(link, target_path)
else:
with ThreadPoolExecutor(max_workers=max_workers) as pool:
for relative_path, link in links.items():
target_path = os.path.join(target_folder, relative_path)
if not is_path_traversal(target_folder, relative_path):
target_path = os.path.join(target_folder, relative_path)
else:
LoggerRoot.get_base_logger().warning(
"Ignoring relative path `{}`: it must not traverse directories".format(relative_path)
)
target_path = os.path.join(target_folder, os.path.basename(relative_path))
pool.submit(_download_link, link, target_path)
def _extract_dataset_archive(

View File

@@ -1,5 +1,6 @@
class UsageError(RuntimeError):
""" An exception raised for illegal usage of clearml objects"""
"""An exception raised for illegal usage of clearml objects"""
pass
@@ -12,5 +13,5 @@ class ArtifactUriDeleteError(ValueError):
@property
def remaining_uris(self):
""" Remaining URIs to delete. Deletion of these URIs was aborted due to the error. """
"""Remaining URIs to delete. Deletion of these URIs was aborted due to the error."""
return self._remaining_uris

View File

@@ -1932,8 +1932,8 @@ class InputModel(Model):
# type: () -> str
return self._base_model_id
def connect(self, task, name=None):
# type: (Task, Optional[str]) -> None
def connect(self, task, name=None, ignore_remote_overrides=False):
# type: (Task, Optional[str], bool) -> None
"""
Connect the current model to a Task object, if the model is preexisting. Preexisting models include:
@@ -1943,24 +1943,31 @@ class InputModel(Model):
- Models whose origin is not ClearML that are used to create an InputModel object. For example,
models created using TensorFlow models.
When the experiment is executed remotely in a worker, the input model already specified in the experiment is
used.
When the experiment is executed remotely in a worker, the input model specified in the experiment UI/backend
is used, unless `ignore_remote_overrides` is set to True.
.. note::
The **ClearML Web-App** allows you to switch one input model for another and then enqueue the experiment
to execute in a worker.
:param object task: A Task object.
:param ignore_remote_overrides: If True, changing the model in the UI/backend will have no
effect when running remotely.
Default is False, meaning that any changes made in the UI/backend will be applied in remote execution.
:param str name: The model name to be stored on the Task
(default to filename of the model weights, without the file extension, or to `Input Model` if that is not found)
(default to filename of the model weights, without the file extension, or to `Input Model`
if that is not found)
"""
self._set_task(task)
name = name or InputModel._get_connect_name(self)
InputModel._warn_on_same_name_connect(name)
ignore_remote_overrides = task._handle_ignore_remote_overrides(
name + "/_ignore_remote_overrides_input_model_", ignore_remote_overrides
)
model_id = None
# noinspection PyProtectedMember
if running_remotely() and (task.is_main_task() or task._is_remote_main_task()):
if running_remotely() and (task.is_main_task() or task._is_remote_main_task()) and not ignore_remote_overrides:
input_models = task.input_models_id
# noinspection PyBroadException
try:
@@ -2245,7 +2252,7 @@ class OutputModel(BaseModel):
pass
self.connect(task, name=name)
def connect(self, task, name=None):
def connect(self, task, name=None, **kwargs):
# type: (Task, Optional[str]) -> None
"""
Connect the current model to a Task object, if the model is a preexisting model. Preexisting models include:

View File

@@ -208,11 +208,21 @@ class CacheManager(object):
new_file.touch(exist_ok=True)
except Exception:
pass
# if file doesn't exist, return file size None
# noinspection PyBroadException
try:
new_file_size = new_file.stat().st_size if new_file_exists else None
except Exception:
new_file_size = None
folder_files = list(folder.iterdir())
if len(folder_files) <= self._file_limit:
return new_file.as_posix(), new_file_size
# first exclude lock files
lock_files = dict()
files = []
for f in sorted(folder.iterdir(), reverse=True, key=sort_max_access_time):
for f in sorted(folder_files, reverse=True, key=sort_max_access_time):
if f.name.startswith(CacheManager._lockfile_prefix) and f.name.endswith(
CacheManager._lockfile_suffix
):
@@ -233,10 +243,7 @@ class CacheManager(object):
# delete old files
files = files[self._file_limit:]
for i, f in enumerate(files):
if i < self._file_limit:
continue
for f in files:
# check if the file is in the lock folder list:
folder_lock = self._folder_locks.get(f.absolute().as_posix())
if folder_lock:
@@ -285,14 +292,7 @@ class CacheManager(object):
except BaseException:
pass
# if file doesn't exist, return file size None
# noinspection PyBroadException
try:
size = new_file.stat().st_size if new_file_exists else None
except Exception:
size = None
return new_file.as_posix(), size
return new_file.as_posix(), new_file_size
def lock_cache_folder(self, local_path):
# type: (Union[str, Path]) -> ()

View File

@@ -17,7 +17,7 @@ from concurrent.futures import ThreadPoolExecutor
from copy import copy
from datetime import datetime
from multiprocessing.pool import ThreadPool
from tempfile import mktemp
from tempfile import mkstemp
from time import time
from types import GeneratorType
@@ -461,15 +461,15 @@ class _Boto3Driver(_Driver):
)
}
if not cfg.use_credentials_chain:
boto_kwargs["aws_access_key_id"] = cfg.key
boto_kwargs["aws_secret_access_key"] = cfg.secret
boto_kwargs["aws_access_key_id"] = cfg.key or None
boto_kwargs["aws_secret_access_key"] = cfg.secret or None
if cfg.token:
boto_kwargs["aws_session_token"] = cfg.token
self.resource = boto3.resource(
"s3",
**boto_kwargs
boto_session = boto3.Session(
profile_name=cfg.profile or None,
)
self.resource = boto_session.resource("s3", **boto_kwargs)
self.config = cfg
bucket_name = self.name[len(cfg.host) + 1:] if cfg.host else self.name
@@ -683,7 +683,12 @@ class _Boto3Driver(_Driver):
'time': datetime.utcnow().isoformat()
}
boto_session = boto3.Session(conf.key, conf.secret, aws_session_token=conf.token)
boto_session = boto3.Session(
aws_access_key_id=conf.key or None,
aws_secret_access_key=conf.secret or None,
aws_session_token=conf.token or None,
profile_name=conf.profile or None
)
endpoint = (('https://' if conf.secure else 'http://') + conf.host) if conf.host else None
boto_resource = boto_session.resource('s3', region_name=conf.region or None, endpoint_url=endpoint)
bucket = boto_resource.Bucket(bucket_name)
@@ -738,7 +743,9 @@ class _Boto3Driver(_Driver):
cls._bucket_location_failure_reported.add(conf.get_bucket_host())
try:
boto_session = boto3.Session(conf.key, conf.secret, aws_session_token=conf.token)
boto_session = boto3.Session(
conf.key, conf.secret, aws_session_token=conf.token, profile_name=conf.profile_name or None
)
boto_resource = boto_session.resource('s3')
return boto_resource.meta.client.get_bucket_location(Bucket=conf.bucket)["LocationConstraint"]
@@ -2004,7 +2011,7 @@ class StorageHelper(object):
return None
# create temp file with the requested file name
file_name = '.' + remote_url.split('/')[-1].split(os.path.sep)[-1]
local_path = mktemp(suffix=file_name)
_, local_path = mkstemp(suffix=file_name)
return helper.download_to_file(remote_url, local_path, skip_zero_size_check=skip_zero_size_check)
def __init__(
@@ -2018,6 +2025,7 @@ class StorageHelper(object):
logger=None,
retries=5,
token=None,
profile=None,
**kwargs
):
level = config.get("storage.log.level", None)
@@ -2072,6 +2080,8 @@ class StorageHelper(object):
region=final_region,
use_credentials_chain=self._conf.use_credentials_chain,
token=token or self._conf.token,
profile=profile or self._conf.profile,
secure=self._secure,
extra_args=self._conf.extra_args,
)
@@ -2614,7 +2624,9 @@ class StorageHelper(object):
try:
if verbose:
self._log.info("Start downloading from {}".format(remote_path))
if not overwrite_existing and Path(local_path).is_file():
# check for 0 sized files as well - we want to override empty files that were created
# via mkstemp or similar functions
if not overwrite_existing and Path(local_path).is_file() and Path(local_path).stat().st_size != 0:
self._log.debug(
'File {} already exists, no need to download, thread id = {}'.format(
local_path,

View File

@@ -36,6 +36,8 @@ class StorageManager(object):
the returned link is the same, otherwise a link to a local copy of the url file is returned.
Caching is enabled by default, cache limited by number of stored files per cache context.
Oldest accessed files are deleted when cache is full.
One can also use this function to prevent the deletion of a file that has been cached,
as the respective file will have its timestamp refreshed
:param str remote_url: remote url link (string)
:param str cache_context: Optional caching context identifier (string), default context 'global'

View File

@@ -1516,11 +1516,13 @@ class Task(_Task):
self.data.tags = list(set((self.data.tags or []) + tags))
self._edit(tags=self.data.tags)
def connect(self, mutable, name=None):
# type: (Any, Optional[str]) -> Any
def connect(self, mutable, name=None, ignore_remote_overrides=False):
# type: (Any, Optional[str], bool) -> Any
"""
Connect an object to a Task object. This connects an experiment component (part of an experiment) to the
experiment. For example, an experiment component can be a valid object containing some hyperparameters, or a :class:`Model`.
When running remotely, the value of the connected object is overridden by the corresponding value found
under the experiment's UI/backend (unless `ignore_remote_overrides` is True).
:param object mutable: The experiment component to connect. The object must be one of the following types:
@@ -1533,16 +1535,23 @@ class Task(_Task):
:param str name: A section name associated with the connected object, if 'name' is None defaults to 'General'
Currently, `name` is only supported for `dict` and `TaskParameter` objects, and should be omitted for the other supported types. (Optional)
For example, by setting `name='General'` the connected dictionary will be under the General section in the hyperparameters section.
While by setting `name='Train'` the connected dictionary will be under the Train section in the hyperparameters section.
:param ignore_remote_overrides: If True, ignore UI/backend overrides when running remotely.
Default is False, meaning that any changes made in the UI/backend will be applied in remote execution.
:return: It will return the same object that was passed as the `mutable` argument to the method, except if the type of the object is dict.
For dicts the :meth:`Task.connect` will return the dict decorated as a `ProxyDictPostWrite`.
This is done to allow propagating the updates from the connected object.
:raise: Raises an exception if passed an unsupported object.
"""
# input model connect and task parameters will handle this instead
if not isinstance(mutable, (InputModel, TaskParameters)):
ignore_remote_overrides = self._handle_ignore_remote_overrides(
(name or "General") + "/_ignore_remote_overrides_", ignore_remote_overrides
)
# dispatching by match order
dispatch = (
(OutputModel, self._connect_output_model),
@@ -1564,7 +1573,7 @@ class Task(_Task):
for mutable_type, method in dispatch:
if isinstance(mutable, mutable_type):
return method(mutable, name=name)
return method(mutable, name=name, ignore_remote_overrides=ignore_remote_overrides)
raise Exception('Unsupported mutable type %s: no connect function found' % type(mutable).__name__)
@@ -1631,8 +1640,8 @@ class Task(_Task):
self.data.script.version_num = commit or ""
self._edit(script=self.data.script)
def connect_configuration(self, configuration, name=None, description=None):
# type: (Union[Mapping, list, Path, str], Optional[str], Optional[str]) -> Union[dict, Path, str]
def connect_configuration(self, configuration, name=None, description=None, ignore_remote_overrides=False):
# type: (Union[Mapping, list, Path, str], Optional[str], Optional[str], bool) -> Union[dict, Path, str]
"""
Connect a configuration dictionary or configuration file (pathlib.Path / str) to a Task object.
This method should be called before reading the configuration file.
@@ -1650,6 +1659,9 @@ class Task(_Task):
my_params = task.connect_configuration(my_params)
When running remotely, the value of the connected configuration is overridden by the corresponding value found
under the experiment's UI/backend (unless `ignore_remote_overrides` is True).
:param configuration: The configuration. This is usually the configuration used in the model training process.
Specify one of the following:
@@ -1664,9 +1676,15 @@ class Task(_Task):
:param str description: Configuration section description (text). default: None
:param bool ignore_remote_overrides: If True, ignore UI/backend overrides when running remotely.
Default is False, meaning that any changes made in the UI/backend will be applied in remote execution.
:return: If a dictionary is specified, then a dictionary is returned. If pathlib2.Path / string is
specified, then a path to a local configuration file is returned. Configuration object.
"""
ignore_remote_overrides = self._handle_ignore_remote_overrides(
(name or "General") + "/_ignore_remote_overrides_config_", ignore_remote_overrides
)
pathlib_Path = None # noqa
cast_Path = Path
if not isinstance(configuration, (dict, list, Path, six.string_types)):
@@ -1710,7 +1728,7 @@ class Task(_Task):
configuration_ = ProxyDictPostWrite(self, _update_config_dict, configuration_)
return configuration_
if not running_remotely() or not (self.is_main_task() or self._is_remote_main_task()):
if not running_remotely() or not (self.is_main_task() or self._is_remote_main_task()) or ignore_remote_overrides:
configuration = get_dev_config(configuration)
else:
# noinspection PyBroadException
@@ -1744,7 +1762,7 @@ class Task(_Task):
return configuration
# it is a path to a local file
if not running_remotely() or not (self.is_main_task() or self._is_remote_main_task()):
if not running_remotely() or not (self.is_main_task() or self._is_remote_main_task()) or ignore_remote_overrides:
# check if not absolute path
configuration_path = cast_Path(configuration)
if not configuration_path.is_file():
@@ -1793,8 +1811,8 @@ class Task(_Task):
f.write(configuration_text)
return cast_Path(local_filename) if isinstance(configuration, cast_Path) else local_filename
def connect_label_enumeration(self, enumeration):
# type: (Dict[str, int]) -> Dict[str, int]
def connect_label_enumeration(self, enumeration, ignore_remote_overrides=False):
# type: (Dict[str, int], bool) -> Dict[str, int]
"""
Connect a label enumeration dictionary to a Task (experiment) object.
@@ -1811,13 +1829,22 @@ class Task(_Task):
"person": 1
}
:param ignore_remote_overrides: If True, ignore UI/backend overrides when running remotely.
Default is False, meaning that any changes made in the UI/backend will be applied in remote execution.
:return: The label enumeration dictionary (JSON).
"""
ignore_remote_overrides = self._handle_ignore_remote_overrides(
"General/_ignore_remote_overrides_label_enumeration_", ignore_remote_overrides
)
if not isinstance(enumeration, dict):
raise ValueError("connect_label_enumeration supports only `dict` type, "
"{} is not supported".format(type(enumeration)))
if not running_remotely() or not (self.is_main_task() or self._is_remote_main_task()):
if (
not running_remotely()
or not (self.is_main_task() or self._is_remote_main_task())
or ignore_remote_overrides
):
self.set_model_label_enumeration(enumeration)
else:
# pop everything
@@ -3750,9 +3777,9 @@ class Task(_Task):
return self._logger
def _connect_output_model(self, model, name=None):
def _connect_output_model(self, model, name=None, **kwargs):
assert isinstance(model, OutputModel)
model.connect(self, name=name)
model.connect(self, name=name, ignore_remote_overrides=False)
return model
def _save_output_model(self, model):
@@ -3764,6 +3791,19 @@ class Task(_Task):
# deprecated
self._connected_output_model = model
def _handle_ignore_remote_overrides(self, overrides_name, ignore_remote_overrides):
if self.running_locally() and ignore_remote_overrides:
self.set_parameter(
overrides_name,
True,
description="If True, ignore UI/backend overrides when running remotely."
" Set it to False if you would like the overrides to be applied",
value_type=bool
)
elif not self.running_locally():
ignore_remote_overrides = self.get_parameter(overrides_name, default=ignore_remote_overrides, cast=True)
return ignore_remote_overrides
def _reconnect_output_model(self):
"""
Deprecated: If there is a saved connected output model, connect it again.
@@ -3776,7 +3816,7 @@ class Task(_Task):
if self._connected_output_model:
self.connect(self._connected_output_model)
def _connect_input_model(self, model, name=None):
def _connect_input_model(self, model, name=None, ignore_remote_overrides=False):
assert isinstance(model, InputModel)
# we only allow for an input model to be connected once
# at least until we support multiple input models
@@ -3791,18 +3831,21 @@ class Task(_Task):
comment += 'Using model id: {}'.format(model.id)
self.set_comment(comment)
model.connect(self, name)
model.connect(self, name, ignore_remote_overrides=ignore_remote_overrides)
return model
def _connect_argparse(self, parser, args=None, namespace=None, parsed_args=None, name=None):
def _connect_argparse(
self, parser, args=None, namespace=None, parsed_args=None, name=None, ignore_remote_overrides=False
):
# do not allow argparser to connect to jupyter notebook
# noinspection PyBroadException
try:
if 'IPython' in sys.modules:
if "IPython" in sys.modules:
# noinspection PyPackageRequirements
from IPython import get_ipython # noqa
ip = get_ipython()
if ip is not None and 'IPKernelApp' in ip.config:
if ip is not None and "IPKernelApp" in ip.config:
return parser
except Exception:
pass
@@ -3825,14 +3868,14 @@ class Task(_Task):
if parsed_args is None and parser == _parser:
parsed_args = _parsed_args
if running_remotely() and (self.is_main_task() or self._is_remote_main_task()):
if running_remotely() and (self.is_main_task() or self._is_remote_main_task()) and not ignore_remote_overrides:
self._arguments.copy_to_parser(parser, parsed_args)
else:
self._arguments.copy_defaults_from_argparse(
parser, args=args, namespace=namespace, parsed_args=parsed_args)
return parser
def _connect_dictionary(self, dictionary, name=None):
def _connect_dictionary(self, dictionary, name=None, ignore_remote_overrides=False):
def _update_args_dict(task, config_dict):
# noinspection PyProtectedMember
task._arguments.copy_from_dict(flatten_dictionary(config_dict), prefix=name)
@@ -3862,7 +3905,7 @@ class Task(_Task):
if isinstance(v, dict):
_check_keys(v, warning_sent)
if not running_remotely() or not (self.is_main_task() or self._is_remote_main_task()):
if not running_remotely() or not (self.is_main_task() or self._is_remote_main_task()) or ignore_remote_overrides:
_check_keys(dictionary)
flat_dict = {str(k): v for k, v in flatten_dictionary(dictionary).items()}
self._arguments.copy_from_dict(flat_dict, prefix=name)
@@ -3875,19 +3918,28 @@ class Task(_Task):
return dictionary
def _connect_task_parameters(self, attr_class, name=None):
if running_remotely() and (self.is_main_task() or self._is_remote_main_task()):
parameters = self.get_parameters()
if not name:
attr_class.update_from_dict(parameters)
else:
attr_class.update_from_dict(
dict((k[len(name) + 1:], v) for k, v in parameters.items() if k.startswith('{}/'.format(name))))
def _connect_task_parameters(self, attr_class, name=None, ignore_remote_overrides=False):
ignore_remote_overrides_section = "_ignore_remote_overrides_"
if running_remotely():
ignore_remote_overrides = self.get_parameter(
(name or "General") + "/" + ignore_remote_overrides_section, default=ignore_remote_overrides, cast=True
)
if running_remotely() and (self.is_main_task() or self._is_remote_main_task()) and not ignore_remote_overrides:
parameters = self.get_parameters(cast=True)
if name:
parameters = dict(
(k[len(name) + 1:], v) for k, v in parameters.items() if k.startswith("{}/".format(name))
)
parameters.pop(ignore_remote_overrides_section, None)
attr_class.update_from_dict(parameters)
else:
self.set_parameters(attr_class.to_dict(), __parameters_prefix=name)
parameters_dict = attr_class.to_dict()
if ignore_remote_overrides:
parameters_dict[ignore_remote_overrides_section] = True
self.set_parameters(parameters_dict, __parameters_prefix=name)
return attr_class
def _connect_object(self, an_object, name=None):
def _connect_object(self, an_object, name=None, ignore_remote_overrides=False):
def verify_type(key, value):
if str(key).startswith('_') or not isinstance(value, self._parameters_allowed_types):
return False
@@ -3904,15 +3956,15 @@ class Task(_Task):
for k, v in cls_.__dict__.items()
if verify_type(k, v)
}
if running_remotely() and (self.is_main_task() or self._is_remote_main_task()):
a_dict = self._connect_dictionary(a_dict, name)
if running_remotely() and (self.is_main_task() or self._is_remote_main_task()) and not ignore_remote_overrides:
a_dict = self._connect_dictionary(a_dict, name, ignore_remote_overrides=ignore_remote_overrides)
for k, v in a_dict.items():
if getattr(an_object, k, None) != a_dict[k]:
setattr(an_object, k, v)
return an_object
else:
self._connect_dictionary(a_dict, name)
self._connect_dictionary(a_dict, name, ignore_remote_overrides=ignore_remote_overrides)
return an_object
def _dev_mode_stop_task(self, stop_reason, pid=None):

View File

@@ -20,3 +20,15 @@ def get_filename_max_length(dir_path):
print(err)
return 255 # Common filesystems like NTFS, EXT4 and HFS+ limited with 255
def is_path_traversal(target_folder, relative_path):
    # type: (...) -> bool
    """Return True if joining `relative_path` onto `target_folder` escapes `target_folder`.

    Used to detect directory-traversal attempts (e.g. "../../etc/passwd") before
    writing files relative to a trusted base folder.
    """
    # noinspection PyBroadException
    try:
        target_folder = pathlib2.Path(target_folder)
        relative_path = pathlib2.Path(relative_path)
        # returns the relative path starting from the target_folder,
        # or raises a ValueError if a directory traversal attack is tried
        target_folder.joinpath(relative_path).resolve().relative_to(target_folder.resolve())
        return False
    except ValueError:
        return True

View File

@@ -52,7 +52,7 @@ class ResourceMonitor(BackgroundMonitor):
try:
active_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES', '') or \
os.environ.get('CUDA_VISIBLE_DEVICES', '')
if active_gpus:
if active_gpus != "all":
self._active_gpus = [g.strip() for g in active_gpus.split(',')]
except Exception:
pass

View File

@@ -1 +1 @@
__version__ = '1.14.1'
__version__ = "1.14.4"

View File

@@ -92,6 +92,8 @@ sdk {
# Specify explicit keys
key: ""
secret: ""
# Specify profile
profile: ""
# Or enable credentials chain to let Boto3 pick the right credentials.
# This includes picking credentials from environment variables,
# credential file and IAM role using metadata service.
@@ -103,6 +105,7 @@ sdk {
credentials: [
# specifies key/secret credentials to use when handling s3 urls (read or write)
# Note that all fields in the global s3 config section are supported here
# {
# bucket: "my-bucket-name"
# key: "my-access-key"

BIN
docs/datasets.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 238 KiB

BIN
docs/experiment_manager.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.2 MiB

BIN
docs/orchestration.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 151 KiB

BIN
docs/pipelines.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 379 KiB

View File

@@ -1,3 +1,3 @@
torch>=1.1.0
torchvision>=0.3.0
clearml
clearml>=1.14.4

View File

@@ -1 +1 @@
clearml
clearml>=1.14.4

View File

@@ -1,5 +1,5 @@
tqdm==4.64.1
clearml==1.7.2
clearml>=1.14.4
github3.py==3.2.0
tabulate==0.9.0
pandas==1.5.1

View File

@@ -4,4 +4,4 @@ torch>=1.1.0
torchvision>=0.3.0
pytorch-ignite
tqdm
clearml
clearml>=1.14.4

View File

@@ -0,0 +1,163 @@
import os.path
from pathlib import Path
import matplotlib as mpl
import numpy as np
from tqdm import tqdm
import torchaudio
import torch
from clearml import Task, Dataset
task = Task.init(project_name="examples/Urbansounds", task_name="preprocessing")
# Let's preprocess the data and create a new ClearML dataset from it, so we can track it around
# The cool thing is, we can easily debug, by using, you guessed it: debug samples! We can log both
# the original sound and its processed mel spectrogram as debug samples, so we can manually check
# if everything went as planned.
class PreProcessor:
    """Converts raw audio waveforms into fixed-width log-scale mel spectrograms."""

    def __init__(self):
        # Hyperparameters are connected to the task so they can be overridden
        # from the UI when the task is cloned and re-run
        self.configuration = {"number_of_mel_filters": 64, "resample_freq": 22050}
        task.connect(self.configuration)

    def preprocess_sample(self, sample, original_sample_freq):
        """Resample, downmix and convert ``sample`` to a padded/truncated log-mel spectrogram.

        :param sample: audio tensor as loaded by torchaudio (channels x samples)
        :param original_sample_freq: sample rate the audio was recorded at
        :return: log-scale (dB) mel spectrogram tensor of fixed time width
        """
        target_freq = self.configuration["resample_freq"]
        if target_freq > 0:
            resampler = torchaudio.transforms.Resample(
                orig_freq=original_sample_freq,
                new_freq=target_freq,
            )
            sample = resampler(sample)

        # Downmix multi-channel (e.g. stereo) audio to a single channel
        sample = torch.mean(sample, dim=0, keepdim=True)

        # Compute the mel spectrogram, then move it to a log (dB) scale
        mel_transform = torchaudio.transforms.MelSpectrogram(
            sample_rate=target_freq,
            n_mels=self.configuration["number_of_mel_filters"],
        )
        mel_db = torchaudio.transforms.AmplitudeToDB()(mel_transform(sample))

        # Pad or truncate along the time axis so every spectrogram has the same width
        fixed_length = 3 * (target_freq // 200)
        current_width = mel_db.shape[2]
        if current_width < fixed_length:
            mel_db = torch.nn.functional.pad(mel_db, (0, fixed_length - current_width))
        else:
            mel_db = mel_db[:, :, :fixed_length]
        return mel_db
class DataSetBuilder:
    """Builds a preprocessed child dataset from the raw UrbanSounds ClearML dataset."""

    def __init__(self):
        self.configuration = {"dataset_path": "dataset"}
        task.connect(self.configuration)
        # Fetch the raw parent dataset registered by the download task
        self.original_dataset = Dataset.get(
            dataset_project="examples/Urbansounds",
            dataset_name="UrbanSounds example",
            dataset_tags=["raw"],
            alias="Raw Dataset",
        )
        # The metadata dataframe was uploaded as an artifact on the dataset's backing task
        self.metadata = (
            Task.get_task(task_id=self.original_dataset._task.id)
            .artifacts["metadata"]
            .get()
        )
        # Download the data locally so samples can be read and spectrograms written beside them
        self.original_dataset_path = Path(
            self.original_dataset.get_mutable_local_copy(
                self.configuration["dataset_path"], overwrite=True
            )
        )
        # Handles per-sample conversion to mel spectrograms
        self.preprocessor = PreProcessor()
        # Populated by build_dataset() with the new child dataset
        self.preprocessed_dataset = None

    def log_dataset_statistics(self):
        """Attach the metadata table and a class-distribution histogram to the new dataset."""
        label_counts = self.metadata["label"].value_counts()
        logger = self.preprocessed_dataset.get_logger()
        logger.report_table(
            title="Raw Dataset Metadata",
            series="Raw Dataset Metadata",
            table_plot=self.metadata,
        )
        logger.report_histogram(
            title="Class distribution",
            series="Class distribution",
            values=label_counts,
            iteration=0,
            xlabels=label_counts.index.tolist(),
            yaxis="Amount of samples",
        )

    def build_dataset(self):
        """Preprocess every sample, report debug samples, and publish the child dataset."""
        # Child of the original dataset: keeps a clear lineage between raw and
        # preprocessed data while only storing the delta
        self.preprocessed_dataset = Dataset.create(
            dataset_name="UrbanSounds example",
            dataset_project="examples/Urbansounds",
            dataset_tags=["preprocessed"],
            parent_datasets=[self.original_dataset.id],
        )
        # Preprocess each sample; every 10th one is logged as a debug sample so the
        # spectrogram and original audio can be checked side by side in the UI
        for index, (_, row) in tqdm(enumerate(self.metadata.iterrows())):
            _, audio_file_path, label = row.tolist()
            waveform, sample_freq = torchaudio.load(
                self.original_dataset_path / audio_file_path, normalize=True
            )
            spectrogram = self.preprocessor.preprocess_sample(waveform, sample_freq)
            # Store the spectrogram next to the original audio, swapping the extension
            new_file_name = os.path.basename(audio_file_path).replace(".wav", ".npy")
            spectrogram_path = (
                self.original_dataset_path
                / os.path.dirname(audio_file_path)
                / new_file_name
            )
            np.save(spectrogram_path, spectrogram)
            if index % 10 == 0:
                # Convert the numpy array to a viewable RGB image via a colormap
                rgb_image = mpl.colormaps["viridis"](
                    spectrogram[0, :, :].detach().numpy() * 255
                )[:, :, :3]
                title = os.path.splitext(os.path.basename(audio_file_path))[0]
                logger = self.preprocessed_dataset.get_logger()
                logger.report_image(title=title, series="spectrogram", image=rgb_image)
                logger.report_media(
                    title=title,
                    series="original_audio",
                    local_path=self.original_dataset_path / audio_file_path,
                )
        # The original folder tree now also contains the spectrograms, so add it all
        self.preprocessed_dataset.add_files(self.original_dataset_path)
        # Attach statistics plots to the new version as well
        self.log_dataset_statistics()
        # Keep the metadata dataframe available on the new version too
        self.preprocessed_dataset._task.upload_artifact(
            name="metadata", artifact_object=self.metadata
        )
        self.preprocessed_dataset.finalize(auto_upload=True)
# Build the preprocessed dataset when the script is run directly
if __name__ == "__main__":
    datasetbuilder = DataSetBuilder()
    datasetbuilder.build_dataset()

View File

@@ -0,0 +1,96 @@
import pandas as pd
from pathlib import Path
from clearml import Task, Dataset, StorageManager
# Register the download step as a ClearML task
task = Task.init(project_name="examples/Urbansounds", task_name="download data")

# Subset of UrbanSound8K classes to keep; connected to the task so it can be
# overridden from the UI when the task is cloned and re-run
configuration = {
    "selected_classes": [
        "air_conditioner",
        "car_horn",
        "children_playing",
        "dog_bark",
        "drilling",
        "engine_idling",
        "gun_shot",
        "jackhammer",
        "siren",
        "street_music",
    ]
}
task.connect(configuration)
def get_urbansound8k():
    """Download a packaged subset of UrbanSound8K and return its local paths.

    The full dataset lives at https://urbansounddataset.weebly.com/urbansound8k.html;
    for simplicity a pre-zipped subset is fetched via the ClearML StorageManager.

    :return: tuple of (path to the metadata CSV, path to the audio folder)
    """
    local_copy = StorageManager.get_local_copy(
        "https://allegro-datasets.s3.amazonaws.com/clearml/UrbanSound8K.zip",
        extract_archive=True,
    )
    dataset_root = Path(local_copy) / "UrbanSound8K"
    return dataset_root / "metadata" / "UrbanSound8K.csv", dataset_root / "audio"
def log_dataset_statistics(dataset, metadata):
    """Attach the raw metadata table and a per-class histogram to the dataset's plots.

    ``metadata`` is expected to be a pandas DataFrame with a ``class`` column.
    """
    logger = dataset.get_logger()
    # Full metadata table, browsable in the web UI.
    logger.report_table(
        title="Raw Dataset Metadata", series="Raw Dataset Metadata", table_plot=metadata
    )
    # Number of samples contributed by each class (value_counts sorts descending).
    class_counts = metadata["class"].value_counts()
    logger.report_histogram(
        title="Class distribution",
        series="Class distribution",
        values=class_counts,
        iteration=0,
        xlabels=class_counts.index.tolist(),
        yaxis="Amount of samples",
    )
def build_clearml_dataset():
    """Download the raw UrbanSound8K subset and publish it as a versioned ClearML dataset."""
    # Local copies of the audio files and of the labels csv.
    csv_path, audio_path = get_urbansound8k()
    raw_metadata = pd.read_csv(csv_path)
    # Keep only the classes selected in the task configuration.
    keep = raw_metadata["class"].isin(configuration["selected_classes"])
    raw_metadata = raw_metadata[keep]
    # Labels plus the extra columns we need downstream ("fold" drives the train/test split).
    metadata = pd.DataFrame(
        {
            "fold": raw_metadata.loc[:, "fold"],
            "filepath": (
                "fold"
                + raw_metadata.loc[:, "fold"].astype(str)
                + "/"
                + raw_metadata.loc[:, "slice_file_name"].astype(str)
            ),
            "label": raw_metadata.loc[:, "classID"],
        }
    )
    # Version the raw data with ClearML: later tasks (and other machines) can then
    # fetch exactly this data by dataset name/version.
    dataset = Dataset.create(
        dataset_name="UrbanSounds example",
        dataset_project="examples/Urbansounds",
        dataset_tags=["raw"],
    )
    # Register the audio files we downloaded above.
    dataset.add_files(audio_path)
    # Store the labels as a pandas artifact so they are visible and easy to fetch from the UI.
    dataset._task.upload_artifact(name="metadata", artifact_object=metadata)
    # Plots section: metadata table + class-distribution histogram.
    log_dataset_statistics(dataset, raw_metadata)
    # Upload the files and close this dataset version.
    dataset.finalize(auto_upload=True)
# Script entry point: build and upload the raw dataset version.
if __name__ == "__main__":
    build_clearml_dataset()

View File

@@ -1,3 +1,3 @@
torch>=1.1.0
torchvision>=0.3.0
clearml
clearml>=1.14.4

View File

@@ -1,2 +1,2 @@
autokeras ; python_version >= '3.7'
clearml
clearml>=1.14.4

View File

@@ -1,4 +1,4 @@
catboost
numpy >= 1.19.2
scikit_learn
clearml
clearml>=1.14.4

View File

@@ -1,3 +1,3 @@
click>=8.0
clearml
clearml>=1.14.4

View File

@@ -1,4 +1,4 @@
fastai < 2.0.0
tensorboard
tensorboardX
clearml
clearml>=1.14.4

View File

@@ -1,4 +1,4 @@
fastai >= 2.6.0
tensorboard
tensorboardX
clearml
clearml>=1.14.4

View File

@@ -1,2 +1,2 @@
clearml
clearml>=1.14.4
fire

View File

@@ -1,3 +1,3 @@
clearml
clearml>=1.14.4
hydra-core>=1.2.0

View File

@@ -4,4 +4,4 @@ torch>=1.1.0
torchvision>=0.3.0
pytorch-ignite
tqdm
clearml
clearml>=1.14.4

View File

@@ -1,2 +1,2 @@
clearml
clearml>=1.14.4
Keras>=2.2.4

View File

@@ -1,2 +1,2 @@
tensorflow>=2.0
clearml
clearml>=1.14.4

View File

@@ -1,4 +1,4 @@
keras-tuner
tensorflow>=2.0
tensorflow-datasets
clearml
clearml>=1.14.4

View File

@@ -2,4 +2,4 @@ lightgbm
scikit-learn
pandas
matplotlib
clearml
clearml>=1.14.4

View File

@@ -2,4 +2,4 @@ numpy != 1.24.0 # https://github.com/numpy/numpy/issues/22826
matplotlib >= 3.1.1 ; python_version >= '3.6'
matplotlib >= 2.2.4 ; python_version < '3.6'
seaborn
clearml
clearml>=1.14.4

View File

@@ -1,3 +1,3 @@
MegEngine ; python_version < '3.9'
tensorboardX
clearml
clearml>=1.14.4

View File

@@ -1,4 +1,4 @@
clearml
clearml>=1.14.4
mmcv>=1.5.1,<2.0.0
torch
torchvision

View File

@@ -1,4 +1,4 @@
clearml
clearml>=1.14.4
pytorch-lightning >= 1.6.0
torch
torchvision

View File

@@ -1,4 +1,4 @@
clearml
clearml>=1.14.4
jsonschema==3.2.0 ; python_version <= '3.5'
matplotlib
pytorch-ignite

View File

@@ -2,4 +2,4 @@ joblib>=0.13.2
matplotlib >= 3.1.1 ; python_version >= '3.6'
matplotlib >= 2.2.4 ; python_version < '3.6'
scikit-learn
clearml
clearml>=1.14.4

View File

@@ -2,4 +2,4 @@ tensorboardX>=1.8
torch>=1.1.0
torchvision>=0.3.0
moviepy
clearml
clearml>=1.14.4

View File

@@ -2,4 +2,4 @@ numpy != 1.24.0 # https://github.com/numpy/numpy/issues/22826
absl-py>=0.7.1
tensorboard>=2.0
tensorflow>=2.0
clearml
clearml>=1.14.4

View File

@@ -2,7 +2,7 @@ numpy != 1.24.0 # https://github.com/numpy/numpy/issues/22826
matplotlib >= 3.1.1 ; python_version >= '3.6'
matplotlib >= 2.2.4 ; python_version < '3.6'
scikit-learn
clearml
clearml>=1.14.4
xgboost>=0.90 ; python_version >= '3'
xgboost>=0.82 ; python_version < '3'
# sudo apt-get install graphviz

View File

@@ -1,5 +1,5 @@
allegroai~=3.5.7
clearml
clearml>=1.14.4
numpy
Pillow
torch

View File

@@ -1,3 +1,3 @@
keras
tensorflow
clearml
clearml>=1.14.4

View File

@@ -3,4 +3,4 @@ matplotlib >= 3.1.1 ; python_version >= '3.6'
matplotlib >= 2.2.4 ; python_version < '3.6'
scikit-learn
pandas
clearml
clearml>=1.14.4

View File

@@ -1,5 +1,5 @@
bokeh>=1.4.0
clearml
clearml>=1.14.4
matplotlib >= 3.1.1 ; python_version >= '3.6'
matplotlib >= 2.2.4 ; python_version < '3.6'
numpy != 1.24.0 # https://github.com/numpy/numpy/issues/22826

View File

@@ -1 +1 @@
clearml
clearml>=1.14.4

View File

@@ -1,5 +1,5 @@
boto3
pyYaml
six
clearml
clearml>=1.14.4
pathlib2

View File

@@ -1 +1 @@
clearml
clearml>=1.14.4

View File

@@ -1,2 +1,2 @@
clearml
clearml>=1.14.4
slack_sdk > 3.0.0