From 9e8ed16b3eca9dba0bde8c8506cbef2ec46920d6 Mon Sep 17 00:00:00 2001 From: AbdulHamid Merii <43741215+AH-Merii@users.noreply.github.com> Date: Sun, 25 Jun 2023 06:05:53 +0100 Subject: [PATCH 01/16] Added boto3 parameters for multipart configurations - multipart threshold and chunksize --- clearml/storage/helper.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index 3c48275e..32efea16 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -401,6 +401,8 @@ class _Boto3Driver(_Driver): _min_pool_connections = 512 _max_multipart_concurrency = deferred_config('aws.boto3.max_multipart_concurrency', 16) + _multipart_threshold = deferred_config('aws.boto3.multipart_threshold', (1024 ** 2) * 8) # 8 MB + _multipart_chunksize = deferred_config('aws.boto3.multipart_chunksize', (1024 ** 2) * 8) _pool_connections = deferred_config('aws.boto3.pool_connections', 512) _connect_timeout = deferred_config('aws.boto3.connect_timeout', 60) _read_timeout = deferred_config('aws.boto3.read_timeout', 60) @@ -498,7 +500,9 @@ class _Boto3Driver(_Driver): container.bucket.upload_fileobj(stream, object_name, Config=boto3.s3.transfer.TransferConfig( use_threads=container.config.multipart, max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1, - num_download_attempts=container.config.retries), + num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize), Callback=callback, ExtraArgs=extra_args, ) @@ -512,6 +516,8 @@ class _Boto3Driver(_Driver): Config=boto3.s3.transfer.TransferConfig( use_threads=False, num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize, ), Callback=callback, ExtraArgs=extra_args @@ -535,7 +541,9 @@ class _Boto3Driver(_Driver): container.bucket.upload_file(file_path, object_name, Config=boto3.s3.transfer.TransferConfig( use_threads=container.config.multipart, max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1, - num_download_attempts=container.config.retries), + num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize), Callback=callback, ExtraArgs=extra_args, ) @@ -547,7 +555,10 @@ class _Boto3Driver(_Driver): file_path, object_name, Config=boto3.s3.transfer.TransferConfig( - use_threads=False, num_download_attempts=container.config.retries + use_threads=False, + num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize ), Callback=callback, ExtraArgs=extra_args @@ -600,7 +611,9 @@ class _Boto3Driver(_Driver): config = boto3.s3.transfer.TransferConfig( use_threads=container.config.multipart, max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1, - num_download_attempts=container.config.retries) + num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize) total_size_mb = obj.content_length / (1024. * 1024.) 
remote_path = os.path.join(obj.container_name, obj.key) cb = DownloadProgressReport(total_size_mb, verbose, remote_path, log) @@ -618,7 +631,9 @@ class _Boto3Driver(_Driver): Config = boto3.s3.transfer.TransferConfig( use_threads=container.config.multipart, max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1, - num_download_attempts=container.config.retries + num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize ) obj.download_file(str(p), Callback=callback, Config=Config) From 31739dbd84718429145fdc663ff5614956899cb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Kotiuk?= Date: Fri, 30 Jun 2023 13:29:42 +0200 Subject: [PATCH 02/16] Bump version of codeql task --- .github/workflows/codeql-analysis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index eb2ecbcb..da578099 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -43,7 +43,7 @@ jobs: # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@v1 + uses: github/codeql-action/init@v2 with: languages: ${{ matrix.language }} # If you wish to specify custom queries, you can do so here or in a config file. @@ -54,7 +54,7 @@ jobs: # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild - uses: github/codeql-action/autobuild@v1 + uses: github/codeql-action/autobuild@v2 # â„šī¸ Command-line programs to run using the OS shell. # 📚 https://git.io/JvXDl @@ -68,4 +68,4 @@ jobs: # make release - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v1 + uses: github/codeql-action/analyze@v2 From f2fbbbd988d47f8df88ddb7f7ee2e64652cfc659 Mon Sep 17 00:00:00 2001 From: pollfly <75068813+pollfly@users.noreply.github.com> Date: Tue, 11 Jul 2023 10:22:36 +0300 Subject: [PATCH 03/16] Improve clearml.conf (#1070) --- docs/clearml.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/clearml.conf b/docs/clearml.conf index 0bf896b1..d29e5051 100644 --- a/docs/clearml.conf +++ b/docs/clearml.conf @@ -33,7 +33,7 @@ sdk { # } ] cache { - # Defaults to system temp folder / cache + # Defaults to /clearml_cache default_base_dir: "~/.clearml/cache" default_cache_manager_size: 100 } From 44faf6ef7b374ad3987d66f4711a92d24a4c973e Mon Sep 17 00:00:00 2001 From: pollfly <75068813+pollfly@users.noreply.github.com> Date: Tue, 11 Jul 2023 10:53:19 +0300 Subject: [PATCH 04/16] Improve clearml.conf (#1071) --- clearml/config/default/sdk.conf | 2 +- docs/trains.conf | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clearml/config/default/sdk.conf b/clearml/config/default/sdk.conf index bb92674c..14ab36f6 100644 --- a/clearml/config/default/sdk.conf +++ b/clearml/config/default/sdk.conf @@ -3,7 +3,7 @@ storage { cache { - # Defaults to system temp folder / cache + # Defaults to /clearml_cache default_base_dir: "~/.clearml/cache" # default_cache_manager_size: 100 } diff --git a/docs/trains.conf b/docs/trains.conf index a56a509f..c84a10b8 100644 --- a/docs/trains.conf +++ b/docs/trains.conf @@ -20,7 +20,7 @@ sdk { storage { cache { - # Defaults to system temp folder / cache + # Defaults to /clearml_cache default_base_dir: "~/.clearml/cache" } } From 
e7edcbb813cd49e0d5ffb9c686a2afccd5ae9503 Mon Sep 17 00:00:00 2001 From: Alex Burlacu Date: Fri, 14 Jul 2023 14:04:38 +0300 Subject: [PATCH 05/16] Fix #1054, now retrying a pipeline step will continue from the correct tf epoch --- clearml/binding/frameworks/tensorflow_bind.py | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/clearml/binding/frameworks/tensorflow_bind.py b/clearml/binding/frameworks/tensorflow_bind.py index fa0ac995..65b9e06d 100644 --- a/clearml/binding/frameworks/tensorflow_bind.py +++ b/clearml/binding/frameworks/tensorflow_bind.py @@ -724,16 +724,7 @@ class EventTrainsWriter(object): 'Received event without step, assuming step = {}'.format(step)) else: step = int(step) - # unlike other frameworks, tensorflow already accounts for the iteration number - # when continuing the training. we substract the smallest iteration such that we - # don't increment the step twice number - original_step = step - if EventTrainsWriter._current_task: - step -= EventTrainsWriter._current_task.get_initial_iteration() - # there can be a few metrics getting reported again, so the step can be negative - # for the first few reports - if step < 0 and original_step > 0: - step = 0 + step = tweak_step(step) self._max_step = max(self._max_step, step) if value_dicts is None: @@ -1378,7 +1369,7 @@ class PatchTensorFlowEager(object): plugin_type = plugin_type[next(i for i, c in enumerate(plugin_type) if c >= 'A'):] if plugin_type.startswith('scalars'): event_writer._add_scalar(tag=str(tag), - step=int(step.numpy()) if not isinstance(step, int) else step, + step=tweak_step(step), scalar_data=tensor.numpy()) elif plugin_type.startswith('images'): img_data_np = tensor.numpy() @@ -1386,19 +1377,19 @@ class PatchTensorFlowEager(object): tag=tag, step=step, **kwargs) elif plugin_type.startswith('histograms'): event_writer._add_histogram( - tag=str(tag), step=int(step.numpy()) if not isinstance(step, int) else step, + tag=str(tag), step=tweak_step(step), hist_data=tensor.numpy() ) elif plugin_type.startswith('text'): event_writer._add_text( - tag=str(tag), step=int(step.numpy()) if not isinstance(step, int) else step, + tag=str(tag), step=tweak_step(step), tensor_bytes=tensor.numpy() ) elif 'audio' in plugin_type: audio_bytes_list = [a for a in tensor.numpy().flatten() if a] for i, audio_bytes in enumerate(audio_bytes_list): event_writer._add_audio(tag=str(tag) + ('/{}'.format(i) if len(audio_bytes_list) > 1 else ''), - step=int(step.numpy()) if not isinstance(step, int) else step, + step=tweak_step(step), values=None, audio_data=audio_bytes) else: pass @@ -1416,7 +1407,7 @@ class PatchTensorFlowEager(object): if event_writer and isinstance(step, int) or hasattr(step, 'numpy'): try: event_writer._add_scalar(tag=str(tag), - step=int(step.numpy()) if not isinstance(step, int) else step, + step=tweak_step(step), scalar_data=value.numpy()) except Exception as ex: LoggerRoot.get_base_logger(TensorflowBinding).warning(str(ex)) @@ -1428,7 +1419,7 @@ class PatchTensorFlowEager(object): str_tag = str_tag.decode() if isinstance(str_tag, bytes) else str(str_tag) event_writer._add_scalar( tag=str_tag, - step=int(a_step.numpy()) if not isinstance(a_step, int) else a_step, + step=tweak_step(step), scalar_data=a_value.numpy()) except Exception as a_ex: LoggerRoot.get_base_logger(TensorflowBinding).warning( @@ -1458,7 +1449,7 @@ class PatchTensorFlowEager(object): if event_writer and isinstance(step, int) or hasattr(step, 'numpy'): try: event_writer._add_histogram( - tag=str(tag),
step=int(step.numpy()) if not isinstance(step, int) else step, + tag=str(tag), step=tweak_step(step), hist_data=values.numpy() ) except Exception as ex: @@ -1471,7 +1462,7 @@ class PatchTensorFlowEager(object): str_tag = str_tag.decode() if isinstance(str_tag, bytes) else str(str_tag) event_writer._add_histogram( tag=str_tag, - step=int(a_step.numpy()) if not isinstance(a_step, int) else a_step, + step=tweak_step(a_step), hist_data=a_value.numpy() ) except Exception as a_ex: @@ -1549,11 +1540,11 @@ class PatchTensorFlowEager(object): 'colorspace': 'RGB', 'encodedImageString': img_data_np[i]} image_tag = str(tag) + '/sample_{}'.format(i - 2) if img_data_np.size > 3 else str(tag) event_writer._add_image(tag=image_tag, - step=int(step.numpy()) if not isinstance(step, int) else step, + step=tweak_step(step), img_data=img_data) else: event_writer._add_image_numpy(tag=str(tag), - step=int(step.numpy()) if not isinstance(step, int) else step, + step=tweak_step(step), img_data_np=img_data_np, max_keep_images=kwargs.get('max_images')) @@ -2299,3 +2290,15 @@ class PatchTensorflow2ModelIO(object): except Exception: pass return model + + +def tweak_step(step): + # noinspection PyBroadException + try: + step = int(step.numpy()) if not isinstance(step, int) else step + # unlike other frameworks, tensorflow already accounts for the iteration number + # when continuing the training. we subtract the smallest iteration such that we + # don't increment the step number twice + return step - EventTrainsWriter._current_task.get_initial_iteration() + except Exception: + return step From f6ae8031109ac14179bcd55147f3575336d9ac3f Mon Sep 17 00:00:00 2001 From: Alex Burlacu Date: Fri, 14 Jul 2023 14:24:10 +0300 Subject: [PATCH 06/16] Add default values in .conf files for s3 multipart --- clearml/config/default/sdk.conf | 2 ++ docs/clearml.conf | 2 ++ docs/trains.conf | 2 ++ 3 files changed, 6 insertions(+) diff --git a/clearml/config/default/sdk.conf b/clearml/config/default/sdk.conf index 14ab36f6..63b15b8a 100644 --- a/clearml/config/default/sdk.conf +++ b/clearml/config/default/sdk.conf @@ -103,6 +103,8 @@ boto3 { pool_connections: 512 max_multipart_concurrency: 16 + multipart_threshold: 8388608 # 8MB + multipart_chunksize: 8388608 # 8MB } } google.storage { diff --git a/docs/clearml.conf b/docs/clearml.conf index d29e5051..e7b045c5 100644 --- a/docs/clearml.conf +++ b/docs/clearml.conf @@ -121,6 +121,8 @@ sdk { boto3 { pool_connections: 512 max_multipart_concurrency: 16 + multipart_threshold: 8388608 # 8MB + multipart_chunksize: 8388608 # 8MB } } google.storage { diff --git a/docs/trains.conf b/docs/trains.conf index c84a10b8..f2d1ee84 100644 --- a/docs/trains.conf +++ b/docs/trains.conf @@ -105,6 +105,8 @@ sdk { boto3 { pool_connections: 512 max_multipart_concurrency: 16 + multipart_threshold: 8388608 # 8MB + multipart_chunksize: 8388608 # 8MB } } google.storage { From 9d57dad65239f5630b7ac931c9326359c722f3b3 Mon Sep 17 00:00:00 2001 From: achaiah <3967183+achaiah@users.noreply.github.com> Date: Sun, 16 Jul 2023 07:22:10 -0500 Subject: [PATCH 07/16] AWS credential access fix (#1000) Fixing AWS credential access that uses a token, instantiation within a VPC without AvailabilityZones, and EBS volume creation. Note: EBS is attached but never mounted.
--- clearml/automation/aws_driver.py | 48 +++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/clearml/automation/aws_driver.py b/clearml/automation/aws_driver.py index 603c0e03..8c4f0768 100644 --- a/clearml/automation/aws_driver.py +++ b/clearml/automation/aws_driver.py @@ -20,12 +20,12 @@ except ImportError as err: "install with: pip install boto3" ) from err - @attr.s class AWSDriver(CloudDriver): """AWS Driver""" aws_access_key_id = attr.ib(validator=instance_of(str), default='') aws_secret_access_key = attr.ib(validator=instance_of(str), default='') + aws_session_token = attr.ib(validator=instance_of(str), default='') aws_region = attr.ib(validator=instance_of(str), default='') use_credentials_chain = attr.ib(validator=instance_of(bool), default=False) use_iam_instance_profile = attr.ib(validator=instance_of(bool), default=False) @@ -37,6 +37,7 @@ class AWSDriver(CloudDriver): obj = super().from_config(config) obj.aws_access_key_id = config['hyper_params'].get('cloud_credentials_key') obj.aws_secret_access_key = config['hyper_params'].get('cloud_credentials_secret') + obj.aws_session_token = config['hyper_params'].get('cloud_credentials_token') obj.aws_region = config['hyper_params'].get('cloud_credentials_region') obj.use_credentials_chain = config['hyper_params'].get('use_credentials_chain', False) obj.use_iam_instance_profile = config['hyper_params'].get('use_iam_instance_profile', False) @@ -47,29 +48,49 @@ class AWSDriver(CloudDriver): def __attrs_post_init__(self): super().__attrs_post_init__() self.tags = parse_tags(self.tags) - + def spin_up_worker(self, resource_conf, worker_prefix, queue_name, task_id): # user_data script will automatically run when the instance is started. it will install the required packages - # for clearml-agent configure it using environment variables and run clearml-agent on the required queue + # for clearml-agent, configure it using environment variables and run clearml-agent on the required queue + # Config reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2/client/run_instances.html user_data = self.gen_user_data(worker_prefix, queue_name, task_id, resource_conf.get("cpu_only", False)) ec2 = boto3.client("ec2", **self.creds()) launch_specification = ConfigFactory.from_dict( { "ImageId": resource_conf["ami_id"], + "Monitoring": {'Enabled':bool(resource_conf.get('enable_monitoring', False))}, "InstanceType": resource_conf["instance_type"], - "BlockDeviceMappings": [ - { - "DeviceName": resource_conf["ebs_device_name"], - "Ebs": { - "VolumeSize": resource_conf["ebs_volume_size"], - "VolumeType": resource_conf["ebs_volume_type"], - }, - } - ], - "Placement": {"AvailabilityZone": resource_conf["availability_zone"]}, } ) + # handle EBS volumes (existing or new) + # Ref: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/block-device-mapping-concepts.html + if resource_conf.get("ebs_snapshot_id") and resource_conf.get("ebs_device_name"): + launch_specification["BlockDeviceMappings"] = [ + { + "DeviceName": resource_conf["ebs_device_name"], + "Ebs": { + "SnapshotId": resource_conf["ebs_snapshot_id"] + } + } + ] + elif resource_conf.get("ebs_device_name"): + launch_specification["BlockDeviceMappings"] = [ + { + "DeviceName": resource_conf["ebs_device_name"], + "Ebs": { + "VolumeSize": resource_conf.get("ebs_volume_size", 80), + "VolumeType": resource_conf.get("ebs_volume_type", "gp3") + } + } + ] + + if resource_conf.get("subnet_id", None): + launch_specification["SubnetId"] 
= resource_conf["subnet_id"] + elif resource_conf.get("availability_zone", None): + launch_specification["Placement"] = {"AvailabilityZone": resource_conf["availability_zone"]} + else: + raise Exception('subnet_id or availability_zone must be specified in the config') if resource_conf.get("key_name", None): launch_specification["KeyName"] = resource_conf["key_name"] if resource_conf.get("security_group_ids", None): @@ -150,6 +171,7 @@ class AWSDriver(CloudDriver): creds.update({ 'aws_secret_access_key': self.aws_secret_access_key or None, 'aws_access_key_id': self.aws_access_key_id or None, + 'aws_session_token': self.aws_session_token or None, }) return creds From c8c8a1224e99bb04637b9b0d6ca92e60d1010fdf Mon Sep 17 00:00:00 2001 From: Alex Burlacu Date: Tue, 18 Jul 2023 15:03:31 +0300 Subject: [PATCH 08/16] Add support for .get ing pipelines and enqueue-ing them --- clearml/automation/controller.py | 132 ++++++++++++++++++- clearml/task.py | 27 +++- examples/pipeline/pipeline_from_decorator.py | 60 +++++---- 3 files changed, 185 insertions(+), 34 deletions(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 0b1c465e..49b646b2 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -23,7 +23,7 @@ from .. import Logger from ..automation import ClearmlJob from ..backend_api import Session from ..backend_interface.task.populate import CreateFromFunction -from ..backend_interface.util import get_or_create_project +from ..backend_interface.util import get_or_create_project, mutually_exclusive from ..config import get_remote_task_id from ..debugging.log import LoggerRoot from ..errors import UsageError @@ -1292,6 +1292,136 @@ class PipelineController(object): """ return self._pipeline_args + @classmethod + def enqueue(cls, pipeline_controller, queue_name=None, queue_id=None, force=False): + # type: (Union[PipelineController, str], Optional[str], Optional[str], bool) -> Any + """ + Enqueue a PipelineController for execution, by adding it to an execution queue. + + .. note:: + A worker daemon must be listening at the queue for the worker to fetch the Task and execute it, + see `ClearML Agent <../clearml_agent>`_ in the ClearML Documentation. + + :param pipeline_controller: The PipelineController to enqueue. Specify a PipelineController object or PipelineController ID + :param queue_name: The name of the queue. If not specified, then ``queue_id`` must be specified. + :param queue_id: The ID of the queue. If not specified, then ``queue_name`` must be specified. + :param bool force: If True, reset the PipelineController if necessary before enqueuing it + + :return: An enqueue JSON response. + + .. code-block:: javascript + + { + "queued": 1, + "updated": 1, + "fields": { + "status": "queued", + "status_reason": "", + "status_message": "", + "status_changed": "2020-02-24T15:05:35.426770+00:00", + "last_update": "2020-02-24T15:05:35.426770+00:00", + "execution.queue": "2bd96ab2d9e54b578cc2fb195e52c7cf" + } + } + + - ``queued`` - The number of Tasks enqueued (an integer or ``null``). + - ``updated`` - The number of Tasks updated (an integer or ``null``). + - ``fields`` + + - ``status`` - The status of the experiment. + - ``status_reason`` - The reason for the last status change. + - ``status_message`` - Information about the status. + - ``status_changed`` - The last status change date and time (ISO 8601 format).
+ - ``last_update`` - The last Task update time, including Task creation, update, change, or events for this task (ISO 8601 format). + - ``execution.queue`` - The ID of the queue where the Task is enqueued. ``null`` indicates not enqueued. + """ + pipeline_controller = ( + pipeline_controller + if isinstance(pipeline_controller, PipelineController) + else cls.get(pipeline_id=pipeline_controller) + ) + return Task.enqueue(pipeline_controller._task, queue_name=queue_name, queue_id=queue_id, force=force) + + @classmethod + def get( + cls, + pipeline_id=None, # type: Optional[str] + pipeline_project=None, # type: Optional[str] + pipeline_name=None, # type: Optional[str] + pipeline_version=None, # type: Optional[str] + pipeline_tags=None, # type: Optional[Sequence[str]] + shallow_search=False # type: bool + ): + # type: (...) -> "PipelineController" + """ + Get a specific PipelineController. If multiple pipeline controllers are found, the pipeline controller + with the highest semantic version is returned. If no semantic version is found, the most recently + updated pipeline controller is returned. This function raises an Exception if no pipeline controller + is found. + + Note: In order to run the pipeline controller returned by this function, use PipelineController.enqueue + + :param pipeline_id: Requested PipelineController ID + :param pipeline_project: Requested PipelineController project + :param pipeline_name: Requested PipelineController name + :param pipeline_tags: Requested PipelineController tags (list of tag strings) + :param shallow_search: If True, search only the first 500 results (first page) + """ + mutually_exclusive(pipeline_id=pipeline_id, pipeline_project=pipeline_project, _require_at_least_one=False) + mutually_exclusive(pipeline_id=pipeline_id, pipeline_name=pipeline_name, _require_at_least_one=False) + if not pipeline_id: + pipeline_project_hidden = "{}/.pipelines/{}".format(pipeline_project, pipeline_name) + name_with_runtime_number_regex = r"^{}( #[0-9]+)*$".format(re.escape(pipeline_name)) + pipelines = Task._query_tasks( + pipeline_project=[pipeline_project_hidden], + task_name=name_with_runtime_number_regex, + fetch_only_first_page=False if not pipeline_version else shallow_search, + only_fields=["id"] if not pipeline_version else ["id", "runtime.version"], + system_tags=[cls._tag], + order_by=["-last_update"], + tags=pipeline_tags, + search_hidden=True, + _allow_extra_fields_=True, + ) + if pipelines: + if not pipeline_version: + pipeline_id = pipelines[0].id + current_version = None + for pipeline in pipelines: + if not pipeline.runtime: + continue + candidate_version = pipeline.runtime.get("version") + if not candidate_version or not Version.is_valid_version_string(candidate_version): + continue + if not current_version or Version(candidate_version) > current_version: + current_version = Version(candidate_version) + pipeline_id = pipeline.id + else: + for pipeline in pipelines: + if pipeline.runtime.get("version") == pipeline_version: + pipeline_id = pipeline.id + break + if not pipeline_id: + error_msg = "Could not find pipeline with pipeline_project={}, pipeline_name={}".format(pipeline_project, pipeline_name) + if pipeline_version: + error_msg += ", pipeline_version={}".format(pipeline_version) + raise ValueError(error_msg) + pipeline_task = Task.get_task(task_id=pipeline_id) + pipeline_object = cls.__new__(cls) + pipeline_object._task = pipeline_task + pipeline_object._nodes = {} + pipeline_object._running_nodes = [] + try:
pipeline_object._deserialize(pipeline_task._get_configuration_dict(cls._config_section)) + except Exception: + pass + return pipeline_object + + @property + def id(self): + # type: () -> str + return self._task.id + @property def tags(self): # type: () -> List[str] diff --git a/clearml/task.py b/clearml/task.py index 1e28999f..1becc33e 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -1213,8 +1213,8 @@ class Task(_Task): return cloned_task @classmethod - def enqueue(cls, task, queue_name=None, queue_id=None): - # type: (Union[Task, str], Optional[str], Optional[str]) -> Any + def enqueue(cls, task, queue_name=None, queue_id=None, force=False): + # type: (Union[Task, str], Optional[str], Optional[str], bool) -> Any """ Enqueue a Task for execution, by adding it to an execution queue. @@ -1225,6 +1225,7 @@ class Task(_Task): :param Task/str task: The Task to enqueue. Specify a Task object or Task ID. :param str queue_name: The name of the queue. If not specified, then ``queue_id`` must be specified. :param str queue_id: The ID of the queue. If not specified, then ``queue_name`` must be specified. + :param bool force: If True, reset the Task if necessary before enqueuing it :return: An enqueue JSON response. @@ -1271,9 +1272,25 @@ class Task(_Task): raise ValueError('Could not find queue named "{}"'.format(queue_name)) req = tasks.EnqueueRequest(task=task_id, queue=queue_id) - res = cls._send(session=session, req=req) - if not res.ok(): - raise ValueError(res.response) + exception = None + res = None + try: + res = cls._send(session=session, req=req) + ok = res.ok() + except Exception as e: + exception = e + ok = False + if not ok: + if not force: + if res: + raise ValueError(res.response) + raise exception + task = cls.get_task(task_id=task) if isinstance(task, str) else task + task.reset(set_started_on_success=False, force=True) + req = tasks.EnqueueRequest(task=task_id, queue=queue_id) + res = cls._send(session=session, req=req) + if not res.ok(): + raise ValueError(res.response) resp = res.response return resp diff --git a/examples/pipeline/pipeline_from_decorator.py b/examples/pipeline/pipeline_from_decorator.py index a37baeb4..0a847ab0 100644 --- a/examples/pipeline/pipeline_from_decorator.py +++ b/examples/pipeline/pipeline_from_decorator.py @@ -5,20 +5,21 @@ from clearml import TaskTypes # Make the following function an independent pipeline component step # notice all package imports inside the function will be automatically logged as # required packages for the pipeline execution step -@PipelineDecorator.component(return_values=['data_frame'], cache=True, task_type=TaskTypes.data_processing) +@PipelineDecorator.component(return_values=["data_frame"], cache=True, task_type=TaskTypes.data_processing) def step_one(pickle_data_url: str, extra: int = 43): - print('step_one') + print("step_one") # make sure we have scikit-learn for this step, we need it to use to unpickle the object import sklearn # noqa import pickle import pandas as pd from clearml import StorageManager + local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url) - with open(local_iris_pkl, 'rb') as f: + with open(local_iris_pkl, "rb") as f: iris = pickle.load(f) - data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names']) - data_frame.columns += ['target'] - data_frame['target'] = iris['target'] + data_frame = pd.DataFrame(iris["data"], columns=iris["feature_names"]) + data_frame.columns += ["target"] + data_frame["target"] = iris["target"] return data_frame @@ -28,18 +29,17 @@ def 
step_one(pickle_data_url: str, extra: int = 43): # Specifying `return_values` makes sure the function step can return an object to the pipeline logic # In this case, the returned tuple will be stored as an artifact named "X_train, X_test, y_train, y_test" @PipelineDecorator.component( - return_values=['X_train, X_test, y_train, y_test'], cache=True, task_type=TaskTypes.data_processing + return_values=["X_train", "X_test", "y_train", "y_test"], cache=True, task_type=TaskTypes.data_processing ) def step_two(data_frame, test_size=0.2, random_state=42): - print('step_two') + print("step_two") # make sure we have pandas for this step, we need it to use the data_frame import pandas as pd # noqa from sklearn.model_selection import train_test_split - y = data_frame['target'] - X = data_frame[(c for c in data_frame.columns if c != 'target')] - X_train, X_test, y_train, y_test = train_test_split( - X, y, test_size=test_size, random_state=random_state - ) + + y = data_frame["target"] + X = data_frame[(c for c in data_frame.columns if c != "target")] + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state) return X_train, X_test, y_train, y_test @@ -49,37 +49,41 @@ def step_two(data_frame, test_size=0.2, random_state=42): # required packages for the pipeline execution step # Specifying `return_values` makes sure the function step can return an object to the pipeline logic # In this case, the returned object will be stored as an artifact named "model" -@PipelineDecorator.component(return_values=['model'], cache=True, task_type=TaskTypes.training) +@PipelineDecorator.component(return_values=["model"], cache=True, task_type=TaskTypes.training) def step_three(X_train, y_train): - print('step_three') + print("step_three") # make sure we have pandas for this step, we need it to use the data_frame import pandas as pd # noqa from sklearn.linear_model import LogisticRegression - model = LogisticRegression(solver='liblinear', multi_class='auto') + + model = LogisticRegression(solver="liblinear", multi_class="auto") model.fit(X_train, y_train) return model + # Make the following function an independent pipeline component step # notice all package imports inside the function will be automatically logged as # required packages for the pipeline execution step # Specifying `return_values` makes sure the function step can return an object to the pipeline logic # In this case, the returned object will be stored as an artifact named "accuracy" -@PipelineDecorator.component(return_values=['accuracy'], cache=True, task_type=TaskTypes.qc) +@PipelineDecorator.component(return_values=["accuracy"], cache=True, task_type=TaskTypes.qc) def step_four(model, X_data, Y_data): from sklearn.linear_model import LogisticRegression # noqa from sklearn.metrics import accuracy_score + Y_pred = model.predict(X_data) return accuracy_score(Y_data, Y_pred, normalize=True) + # The actual pipeline execution context # notice that all pipeline component function calls are actually executed remotely # Only when a return value is used, the pipeline logic will wait for the component execution to complete -@PipelineDecorator.pipeline(name='custom pipeline logic', project='examples', version='0.0.5') -def executing_pipeline(pickle_url, mock_parameter='mock'): - print('pipeline args:', pickle_url, mock_parameter) +@PipelineDecorator.pipeline(name="custom pipeline logic", project="examples", version="0.0.5") +def executing_pipeline(pickle_url, mock_parameter="mock"): + print("pipeline args:", 
pickle_url, mock_parameter) # Use the pipeline argument to start the pipeline and pass it ot the first step - print('launch step one') + print("launch step one") data_frame = step_one(pickle_url) # Use the returned data from the first step (`step_one`), and pass it to the next step (`step_two`) @@ -87,17 +91,17 @@ def executing_pipeline(pickle_url, mock_parameter='mock'): # the pipeline logic does not actually load the artifact itself. # When actually passing the `data_frame` object into a new step, # It waits for the creating step/function (`step_one`) to complete the execution - print('launch step two') + print("launch step two") X_train, X_test, y_train, y_test = step_two(data_frame) - print('launch step three') + print("launch step three") model = step_three(X_train, y_train) # Notice since we are "printing" the `model` object, # we actually deserialize the object from the third step, and thus wait for the third step to complete. - print('returned model: {}'.format(model)) + print("returned model: {}".format(model)) - print('launch step four') + print("launch step four") accuracy = 100 * step_four(model, X_data=X_test, Y_data=y_test) # Notice since we are "printing" the `accuracy` object, @@ -105,7 +109,7 @@ def executing_pipeline(pickle_url, mock_parameter='mock'): print(f"Accuracy={accuracy}%") -if __name__ == '__main__': +if __name__ == "__main__": # set the pipeline steps default execution queue (per specific step we can override it with the decorator) # PipelineDecorator.set_default_execution_queue('default') # Run the pipeline steps as subprocesses on the current machine, great for local executions @@ -114,7 +118,7 @@ if __name__ == '__main__': # Start the pipeline execution logic. executing_pipeline( - pickle_url='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl', + pickle_url="https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl", ) - print('process completed') + print("process completed") From 75b4015fdb102039f1630595fb755fe8b594493a Mon Sep 17 00:00:00 2001 From: Alex Burlacu Date: Wed, 19 Jul 2023 02:09:49 +0300 Subject: [PATCH 09/16] Fix misconfigured boto3 bucket credentials verify --- clearml/backend_config/bucket_config.py | 8 ++++++-- clearml/storage/helper.py | 8 +++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/clearml/backend_config/bucket_config.py b/clearml/backend_config/bucket_config.py index 49086cec..0bc5ac8e 100644 --- a/clearml/backend_config/bucket_config.py +++ b/clearml/backend_config/bucket_config.py @@ -31,7 +31,7 @@ class S3BucketConfig(object): acl = attrib(type=str, converter=_none_to_empty_string, default="") secure = attrib(type=bool, default=True) region = attrib(type=str, converter=_none_to_empty_string, default="") - verify = attrib(type=bool, default=True) + verify = attrib(type=bool, default=None) use_credentials_chain = attrib(type=bool, default=False) extra_args = attrib(type=dict, default=None) @@ -106,6 +106,7 @@ class S3BucketConfigurations(BaseBucketConfigurations): default_use_credentials_chain=False, default_token="", default_extra_args=None, + default_verify=None, ): super(S3BucketConfigurations, self).__init__() self._buckets = buckets if buckets else list() @@ -116,6 +117,7 @@ class S3BucketConfigurations(BaseBucketConfigurations): self._default_multipart = True self._default_use_credentials_chain = default_use_credentials_chain self._default_extra_args = default_extra_args + self._default_verify = default_verify @classmethod def from_config(cls, 
s3_configuration): @@ -129,6 +131,7 @@ class S3BucketConfigurations(BaseBucketConfigurations): default_region = s3_configuration.get("region", "") or getenv("AWS_DEFAULT_REGION", "") default_use_credentials_chain = s3_configuration.get("use_credentials_chain") or False default_extra_args = s3_configuration.get("extra_args") + default_verify = s3_configuration.get("verify", None) default_key = _none_to_empty_string(default_key) default_secret = _none_to_empty_string(default_secret) @@ -142,7 +145,8 @@ class S3BucketConfigurations(BaseBucketConfigurations): default_region, default_use_credentials_chain, default_token, - default_extra_args + default_extra_args, + default_verify, ) def add_config(self, bucket_config): diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index f5aea115..f7ef6008 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -437,12 +437,18 @@ class _Boto3Driver(_Driver): self.name = name[5:] endpoint = (('https://' if cfg.secure else 'http://') + cfg.host) if cfg.host else None + verify = cfg.verify + if verify is True: + # True is a non-documented value for boto3, use None instead (which means verify) + print("Using boto3 verify=None instead of true") + verify = None + # boto3 client creation isn't thread-safe (client itself is) with self._creation_lock: boto_kwargs = { "endpoint_url": endpoint, "use_ssl": cfg.secure, - "verify": cfg.verify, + "verify": verify, "region_name": cfg.region or None, # None in case cfg.region is an empty string "config": botocore.client.Config( max_pool_connections=max( From 028a8356767fa81669074e5e0106774b49345123 Mon Sep 17 00:00:00 2001 From: Alex Burlacu Date: Wed, 19 Jul 2023 02:11:40 +0300 Subject: [PATCH 10/16] Deprecate Task.get_by_name --- clearml/backend_interface/task/task.py | 15 --------------- clearml/task.py | 18 ++++++++++++++++++ 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py index 53465a37..531c8a4c 100644 --- a/clearml/backend_interface/task/task.py +++ b/clearml/backend_interface/task/task.py @@ -2807,21 +2807,6 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): res = cls._send(session=session, req=req, log=log) return res - @classmethod - def get_by_name(cls, task_name): - # type: (str) -> Task - """ - Returns the most recent task with the given name from anywhere in the system as a Task object. - - :param str task_name: The name of the task to search for. - - :return: Task object of the most recent task with that name. - """ - res = cls._send(cls._get_default_session(), tasks.GetAllRequest(name=exact_match_regex(task_name))) - - task = get_single_result(entity='task', query=task_name, results=res.response.tasks) - return cls(task_id=task.id) - @classmethod def get_task_output_log_web_page(cls, task_id, project_id=None, app_server_host=None): # type: (str, Optional[str], Optional[str]) -> str diff --git a/clearml/task.py b/clearml/task.py index 1becc33e..e0110af7 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -7,6 +7,7 @@ import signal import sys import threading import time +import warnings from argparse import ArgumentParser from logging import getLogger from tempfile import mkstemp, mkdtemp @@ -860,6 +861,23 @@ class Task(_Task): return task + @classmethod + def get_by_name(cls, task_name): + # type: (str) -> TaskInstance + """ + + .. note:: + This method is deprecated, use :meth:`Task.get_task` instead. 
+ + Returns the most recent task with the given name from anywhere in the system as a Task object. + + :param str task_name: The name of the task to search for. + + :return: Task object of the most recent task with that name. + """ + warnings.warn("Warning: 'Task.get_by_name' is deprecated. Use 'Task.get_task' instead", DeprecationWarning) + return cls.get_task(task_name=task_name) + @classmethod def get_task( cls, From cc875f0fbe75ee9763f3aebbb1a3356ffaf6fc43 Mon Sep 17 00:00:00 2001 From: Alex Burlacu Date: Wed, 19 Jul 2023 17:28:58 +0300 Subject: [PATCH 11/16] Fix linter issues --- clearml/automation/aws_driver.py | 7 ++++--- clearml/backend_interface/task/task.py | 10 ++++------ clearml/binding/frameworks/tensorflow_bind.py | 4 ++-- clearml/binding/gradio_bind.py | 2 +- clearml/cli/config/__main__.py | 2 +- clearml/datasets/dataset.py | 2 +- clearml/storage/helper.py | 7 +++---- 7 files changed, 16 insertions(+), 18 deletions(-) diff --git a/clearml/automation/aws_driver.py b/clearml/automation/aws_driver.py index 8c4f0768..1f0b75de 100644 --- a/clearml/automation/aws_driver.py +++ b/clearml/automation/aws_driver.py @@ -20,6 +20,7 @@ except ImportError as err: "install with: pip install boto3" ) from err + @attr.s class AWSDriver(CloudDriver): """AWS Driver""" @@ -48,7 +49,7 @@ class AWSDriver(CloudDriver): def __attrs_post_init__(self): super().__attrs_post_init__() self.tags = parse_tags(self.tags) - + def spin_up_worker(self, resource_conf, worker_prefix, queue_name, task_id): # user_data script will automatically run when the instance is started. it will install the required packages # for clearml-agent, configure it using environment variables and run clearml-agent on the required queue @@ -59,7 +60,7 @@ class AWSDriver(CloudDriver): launch_specification = ConfigFactory.from_dict( { "ImageId": resource_conf["ami_id"], - "Monitoring": {'Enabled':bool(resource_conf.get('enable_monitoring', False))}, + "Monitoring": {'Enabled': bool(resource_conf.get('enable_monitoring', False))}, "InstanceType": resource_conf["instance_type"], } ) @@ -84,7 +85,7 @@ class AWSDriver(CloudDriver): } } ] - + if resource_conf.get("subnet_id", None): launch_specification["SubnetId"] = resource_conf["subnet_id"] elif resource_conf.get("availability_zone", None): diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py index 531c8a4c..5f4323f3 100644 --- a/clearml/backend_interface/task/task.py +++ b/clearml/backend_interface/task/task.py @@ -3,7 +3,6 @@ import itertools import json import logging import os -import re import sys import warnings from copy import copy @@ -34,12 +33,12 @@ from ...backend_interface.task.development.worker import DevWorker from ...backend_interface.session import SendError from ...backend_api import Session from ...backend_api.services import tasks, models, events, projects -from ...backend_api.session.defs import ENV_OFFLINE_MODE +# from ...backend_api.session.defs import ENV_OFFLINE_MODE from ...utilities.pyhocon import ConfigTree, ConfigFactory from ...utilities.config import config_dict_to_text, text_to_config_dict from ...errors import ArtifactUriDeleteError -from ..base import IdObjectBase, InterfaceBase +from ..base import IdObjectBase # , InterfaceBase from ..metrics import Metrics, Reporter from ..model import Model from ..setupuploadmixin import SetupUploadMixin @@ -376,7 +375,6 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): self._edit(type=tasks.TaskTypeEnum(task_type)) return id - def _set_storage_uri(self, value): 
value = value.rstrip('/') if value else None self._storage_uri = StorageHelper.conform_url(value) @@ -1406,7 +1404,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): Remove input models from the current task. Note that the models themselves are not deleted, but the tasks' reference to the models is removed. To delete the models themselves, see `Models.remove` - + :param models_to_remove: The models to remove from the task. Can be a list of ids, or of `BaseModel` (including its subclasses: `Model` and `InputModel`) """ @@ -2543,7 +2541,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): # type: (**Any) -> () for k, v in kwargs.items(): setattr(self.data, k, v) - offline_mode_folder = self.get_offline_mode_folder() + offline_mode_folder = self.get_offline_mode_folder() if not offline_mode_folder: return Path(offline_mode_folder).mkdir(parents=True, exist_ok=True) diff --git a/clearml/binding/frameworks/tensorflow_bind.py b/clearml/binding/frameworks/tensorflow_bind.py index 65b9e06d..eb0366dc 100644 --- a/clearml/binding/frameworks/tensorflow_bind.py +++ b/clearml/binding/frameworks/tensorflow_bind.py @@ -725,7 +725,7 @@ class EventTrainsWriter(object): else: step = int(step) step = tweak_step(step) - + self._max_step = max(self._max_step, step) if value_dicts is None: LoggerRoot.get_base_logger(TensorflowBinding).debug("Summary arrived without 'value'") @@ -2299,6 +2299,6 @@ def tweak_step(step): # unlike other frameworks, tensorflow already accounts for the iteration number # when continuing the training. we substract the smallest iteration such that we # don't increment the step twice number - return step - EventTrainsWriter._current_task.get_initial_iteration() + return step - EventTrainsWriter._current_task.get_initial_iteration() except Exception: return step diff --git a/clearml/binding/gradio_bind.py b/clearml/binding/gradio_bind.py index d074302e..26d1d99c 100644 --- a/clearml/binding/gradio_bind.py +++ b/clearml/binding/gradio_bind.py @@ -70,7 +70,7 @@ class PatchGradio: # noinspection PyBroadException try: return original_fn(*args, **kwargs) - except Exception as e: + except Exception: del kwargs["root_path"] return original_fn(*args, **kwargs) diff --git a/clearml/cli/config/__main__.py b/clearml/cli/config/__main__.py index de006d3d..46a4ccb9 100644 --- a/clearml/cli/config/__main__.py +++ b/clearml/cli/config/__main__.py @@ -242,7 +242,7 @@ def parse_known_host(parsed_host): print('Assuming files and api ports are unchanged and use the same (' + parsed_host.scheme + ') protocol') api_host = parsed_host.scheme + "://" + parsed_host.netloc + ':8008' + parsed_host.path web_host = parsed_host.scheme + "://" + parsed_host.netloc + parsed_host.path - files_host = parsed_host.scheme + "://" + parsed_host.netloc+ ':8081' + parsed_host.path + files_host = parsed_host.scheme + "://" + parsed_host.netloc + ':8081' + parsed_host.path else: print("Warning! 
Could not parse host name") api_host = '' diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py index 05ae2d62..569fb743 100644 --- a/clearml/datasets/dataset.py +++ b/clearml/datasets/dataset.py @@ -2981,7 +2981,7 @@ class Dataset(object): task = Task.get_task(task_id=id_) dataset_struct_entry = { - "job_id": id_[len("offline-"):] if id_.startswith("offline-") else id_, # .removeprefix not supported < Python 3.9 + "job_id": id_[len("offline-"):] if id_.startswith("offline-") else id_, # .removeprefix not supported < Python 3.9 "status": task.status } # noinspection PyProtectedMember diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index f7ef6008..b3a407a7 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -116,6 +116,7 @@ class _Driver(object): cls._file_server_hosts = hosts return cls._file_server_hosts + class _HttpDriver(_Driver): """ LibCloud http/https adapter (simple, enough for now) """ @@ -1797,7 +1798,6 @@ class _FileStorageDriver(_Driver): return os.path.isfile(object_name) - class StorageHelper(object): """ Storage helper. Used by the entire system to download/upload files. @@ -2433,7 +2433,7 @@ class StorageHelper(object): result_dest_path = canonized_dest_path if return_canonized else dest_path - if self.scheme in StorageHelper._quotable_uri_schemes: # TODO: fix-driver-schema + if self.scheme in StorageHelper._quotable_uri_schemes: # TODO: fix-driver-schema # quote link result_dest_path = quote_url(result_dest_path, StorageHelper._quotable_uri_schemes) @@ -2451,7 +2451,7 @@ class StorageHelper(object): result_path = canonized_dest_path if return_canonized else dest_path - if cb and self.scheme in StorageHelper._quotable_uri_schemes: # TODO: fix-driver-schema + if cb and self.scheme in StorageHelper._quotable_uri_schemes: # TODO: fix-driver-schema # store original callback a_cb = cb @@ -2976,7 +2976,6 @@ class StorageHelper(object): ) - def normalize_local_path(local_path): """ Get a normalized local path From 9b680d174335c3d7e1027594a26ec05355fbaa7f Mon Sep 17 00:00:00 2001 From: Alex Burlacu Date: Wed, 19 Jul 2023 17:29:41 +0300 Subject: [PATCH 12/16] Add clarification about breaking changes in sdk 1.11.0 and 1.11.1 --- docs/errata_breaking_change_gcs_sdk_1_11_x.md | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 docs/errata_breaking_change_gcs_sdk_1_11_x.md diff --git a/docs/errata_breaking_change_gcs_sdk_1_11_x.md b/docs/errata_breaking_change_gcs_sdk_1_11_x.md new file mode 100644 index 00000000..bbbdd722 --- /dev/null +++ b/docs/errata_breaking_change_gcs_sdk_1_11_x.md @@ -0,0 +1,83 @@ +# Handling the Google Cloud Storage breaking change + +## Rationale + +Due to an issue with ClearML SDK versions 1.11.x, URLs of objects uploaded to the Google Cloud Storage were stored in the ClearML backend as a quoted string. This causes issues accessing these objects directly from the ClearML SDK. This is relevant for URLs of models, datasets, artifacts, and media files/debug samples. In case you have such objects uploaded with the affected ClearML SDK versions and wish to be able to access them programmatically using the ClearML SDK (note that access from the ClearML UI is still possible), you should perform one of the actions in the section below. + +## Recommended Steps + +The code snippets below should serve as an example rather than an actual conversion script. Depending on what object you're trying to fix, you should pick the respective lines of code from step 1 and 2. + +1. 
You need to be able to download objects (models, datasets, media, artifacts) registered by affected versions. See the code snippet below and adjust it according to your use case to be able to get a local copy of the object +``` +from clearml import Task, InputModel, StorageManager +from urllib.parse import unquote # <- you will need this + + +ds_task = Task.get_task(dataset_id) # For Datasets +# OR +task = Task.get_task(task_id) # For Artifacts, Media, and Models + + +url = unquote(ds_task.artifacts['data'].url) # For Datasets +# OR +url = unquote(task.artifacts[artifact_name].url) # For Artifacts +# OR +model = InputModel(task.output_models_id['test_file']) # For Models associated to tasks +url = unquote(model.url) +# OR +model = InputModel(model_id) # For any Models +url = unquote(model.url) +# OR +samples = task.get_debug_samples(title, series) # For Media/Debug samples +sample_urls = [unquote(sample['url']) for sample in samples] + +local_path = StorageManager.get_local_copy(url) + +# NOTE: For Datasets you will need to unzip the `local_path` +``` + +2. Once the object is downloaded locally, you can re-register it with the new version. See the snippet below and adjust according to your use case +``` +from clearml import Task, Dataset, OutputModel +import os + + +ds = Dataset.create(dataset_name=task.name, dataset_project=task.get_project_name(), parents=[Dataset.get(dataset_id)]) # For Datasets +# OR +task = Task.get_task(task_name=task.name, project_name=task.get_project_name()) # For Artifacts, Media, and Models + + +ds.add_files(unzipped_local_path) # For Datasets +ds.finalize(auto_upload=True) +# OR +task.upload_artifact(name=artifact_name, artifact_object=local_path) # For Artifacts +# OR +model = OutputModel(task=task) # For any Models +model.update_weights(local_path) # note: if the original model was created with update_weights_package, + # preserve this behavior by saving the new one with update_weights_package too +# OR +for sample in samples: + task.get_logger().report_media(sample['metric'], sample['variant'], local_path=unquote(sample['url'])) # For Media/Debug samples +``` + +## Alternative methods + +These methods are more advanced (read "more likely to mess up"). If you're unsure whether to use them or not, better don't. Both methods described below will alter the existing objects. Note that you still need to run the code from step 1 to have access to all required metadata. + +**Method 1**: You can try to alter the existing unpublished experiments/models using the lower-level `APIClient` +``` +from clearml.backend_api.session.client import APIClient + + +client = APIClient() + +client.tasks.add_or_update_artifacts(task=ds_task.id, force=True, artifacts=[{"uri": unquote(ds_task.artifacts['state'].url), "key": "state", "type": "dict"}]) +client.tasks.add_or_update_artifacts(task=ds_task.id, force=True, artifacts=[{"uri": unquote(ds_task.artifacts['data'].url), "key": "data", "type": "custom"}]) # For datasets on completed dataset uploads +# OR +client.tasks.add_or_update_artifacts(task=task.id, force=True, artifacts=[{"uri": unquote(url), "key": artifact_name, "type": "custom"}]) # For artifacts on completed tasks +# OR +client.models.edit(model=model.id, force=True, uri=url) # For any unpublished Model +``` + +**Method 2**: There's an option available only to those who self-host their ClearML server.
It is possible to manually update the values registered in MongoDB, but beware - this is an advanced procedure that should be handled with extreme care, as it can lead to an inconsistent state if mishandled. From 618a625b74cbb164668f93daf0fdcaf31101d431 Mon Sep 17 00:00:00 2001 From: Alex Burlacu Date: Wed, 19 Jul 2023 17:33:28 +0300 Subject: [PATCH 13/16] Bump version to 1.12.0 --- clearml/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clearml/version.py b/clearml/version.py index aab78e83..666b2f71 100644 --- a/clearml/version.py +++ b/clearml/version.py @@ -1 +1 @@ -__version__ = '1.11.2rc0' +__version__ = '1.12.0' From ac1ae7f4473ba7f82edbd1edd0cb3d69652c833e Mon Sep 17 00:00:00 2001 From: alex-burlacu-clear-ml <123369351+alex-burlacu-clear-ml@users.noreply.github.com> Date: Wed, 19 Jul 2023 20:23:03 +0300 Subject: [PATCH 14/16] Update errata, clarify a few points --- docs/errata_breaking_change_gcs_sdk_1_11_x.md | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/docs/errata_breaking_change_gcs_sdk_1_11_x.md b/docs/errata_breaking_change_gcs_sdk_1_11_x.md index bbbdd722..70813e65 100644 --- a/docs/errata_breaking_change_gcs_sdk_1_11_x.md +++ b/docs/errata_breaking_change_gcs_sdk_1_11_x.md @@ -2,14 +2,18 @@ ## Rationale -Due to an issue with ClearML SDK versions 1.11.x, URLs of objects uploaded to the Google Cloud Storage were stored in the ClearML backend as a quoted string. This causes issues accessing these objects directly from the ClearML SDK. This is relevant for URLs of models, datasets, artifacts, and media files/debug samples. In case you have such objects uploaded with the affected ClearML SDK versions and wish to be able to access them programmatically using the ClearML SDK (note that access from the ClearML UI is still possible), you should perform one of the actions in the section below. +Due to an issue with ClearML SDK versions 1.11.x, URLs of objects uploaded to the Google Cloud Storage were stored in the ClearML backend as a quoted string. This causes issues accessing these objects directly from the ClearML SDK. This is relevant for URLs of models, datasets, artifacts, and media files/debug samples. In case you have such objects uploaded with the affected ClearML SDK versions and wish to be able to access them programmatically using the ClearML SDK using the version 1.12 and above (note that access from the ClearML UI is still possible), you should perform one of the actions in the section below. ## Recommended Steps -The code snippets below should serve as an example rather than an actual conversion script. Depending on what object you're trying to fix, you should pick the respective lines of code from step 1 and 2. +The code snippets below should serve as an example rather than an actual conversion script. + +The general flow is that you will first need to download these files by a custom access method, then upload them with the fixed SDK version. Depending on what object you're trying to fix, you should pick the respective lines of code from step 1 and 2. + + 1. You need to be able to download objects (models, datasets, media, artifacts) registered by affected versions. See the code snippet below and adjust it according to your use case to be able to get a local copy of the object -``` +```python from clearml import Task, ImportModel from urllib.parse import unquote # <- you will need this @@ -38,7 +42,7 @@ local_path = StorageManager.get_local_copy(url) ``` 2. 
Once the object is downloaded locally, you can re-register it with the new version. See the snipped below and adjust according to your use case -``` +```python from clearml import Task, Dataset, OutputModel import os @@ -63,10 +67,10 @@ for sample in samples: ## Alternative methods -These methods are more advanced (read "more likely to mess up"). If you're unsure whether to use them or not, better don't. Both methods described below will alter the existing objects. Note that you still need to run the code from step 1 to have access to all required metadata. +These methods are more advanced (read "more likely to mess up"). If you're unsure whether to use them or not, better don't. Both methods described below will alter (i.e. modify **in-place**) the existing objects. Note that you still need to run the code from step 1 to have access to all required metadata. **Method 1**: You can try to alter the existing unpublished experiments/models using the lower-level `APIClient` -``` +```python from clearml.backend_api.session.client import APIClient From 05c048d0bde236af6a1f8b3ae705e9a54d53ad1a Mon Sep 17 00:00:00 2001 From: alex-burlacu-clear-ml <123369351+alex-burlacu-clear-ml@users.noreply.github.com> Date: Wed, 19 Jul 2023 22:55:08 +0300 Subject: [PATCH 15/16] Fix style and grammar mistakes --- docs/errata_breaking_change_gcs_sdk_1_11_x.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/errata_breaking_change_gcs_sdk_1_11_x.md b/docs/errata_breaking_change_gcs_sdk_1_11_x.md index 70813e65..a640fc2b 100644 --- a/docs/errata_breaking_change_gcs_sdk_1_11_x.md +++ b/docs/errata_breaking_change_gcs_sdk_1_11_x.md @@ -2,13 +2,13 @@ ## Rationale -Due to an issue with ClearML SDK versions 1.11.x, URLs of objects uploaded to the Google Cloud Storage were stored in the ClearML backend as a quoted string. This causes issues accessing these objects directly from the ClearML SDK. This is relevant for URLs of models, datasets, artifacts, and media files/debug samples. In case you have such objects uploaded with the affected ClearML SDK versions and wish to be able to access them programmatically using the ClearML SDK using the version 1.12 and above (note that access from the ClearML UI is still possible), you should perform one of the actions in the section below. +Due to an issue with ClearML SDK versions 1.11.x, URLs of objects uploaded to the Google Cloud Storage were stored in the ClearML backend as a quoted string. This behavior causes issues accessing registered objects using the ClearML SDK. The issue affects the URLs of models, datasets, artifacts, and media files/debug samples. In case you have such objects uploaded with the affected ClearML SDK versions and wish to be able to access them programmatically using the ClearML SDK using version 1.12 and above (note that access from the ClearML UI is still possible), you should perform the actions listed in the section below. ## Recommended Steps The code snippets below should serve as an example rather than an actual conversion script. -The general flow is that you will first need to download these files by a custom access method, then upload them with the fixed SDK version. Depending on what object you're trying to fix, you should pick the respective lines of code from step 1 and 2. +The general flow is that you will first need to download these files by a custom access method, then upload them with the fixed SDK version. 
Depending on what object you're trying to fix, you should pick the respective lines of code from steps 1 and 2. @@ -67,7 +67,7 @@ for sample in samples: ## Alternative methods -These methods are more advanced (read "more likely to mess up"). If you're unsure whether to use them or not, better don't. Both methods described below will alter (i.e. modify **in-place**) the existing objects. Note that you still need to run the code from step 1 to have access to all required metadata. +The methods described next are more advanced (read "more likely to mess up"). If you're unsure whether to use them or not, better don't. Both methods described below will alter (i.e., modify **in-place**) the existing objects. Note that you still need to run the code from step 1 to have access to all required metadata. **Method 1**: You can try to alter the existing unpublished experiments/models using the lower-level `APIClient` ```python @@ -84,4 +84,4 @@ client.tasks.add_or_update_artifacts(task=task.id, force=True, artifacts=[{"uri" client.models.edit(model=model.id, force=True, uri=url) # For any unpublished Model ``` -**Method 2**: There's an option available only to those who self-host their ClearML server. It is possible to manually update the values registered in MongoDB, but beware - this is an advanced procedure that should be handled with extreme care, as it can lead to an inconsistent state if mishandled. +**Method 2**: There's an option available only to those who self-host their ClearML server. It is possible to manually update the values registered in MongoDB, but beware - this advanced procedure should be performed with extreme care, as it can lead to an inconsistent state if mishandled. From 09363b0d309148cb826ce58b046e39f630ff5c33 Mon Sep 17 00:00:00 2001 From: Alex Burlacu Date: Fri, 21 Jul 2023 14:23:32 +0300 Subject: [PATCH 16/16] Fix typo --- clearml/automation/controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 49b646b2..076c8c7d 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -657,7 +657,7 @@ class PipelineController(object): :param function: A global function to convert into a standalone Task :param function_kwargs: Optional, provide subset of function arguments and default values to expose. If not provided automatically take all function arguments & defaults - Optional, pass input arguments to the function from other Tasks's output artifact. + Optional, pass input arguments to the function from other Tasks' output artifact. Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`: {'numpy_matrix': 'aabbcc.answer'} :param function_return: Provide a list of names for all the results.
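A minimal usage sketch of the `PipelineController.get` / `PipelineController.enqueue` API introduced in PATCH 08/16. The project, pipeline, and queue names below are placeholders (borrowed from the bundled decorator example), not values required by the SDK:

```python
from clearml.automation import PipelineController

# Fetch the most recent version of an existing pipeline by project and name
# ("examples" / "custom pipeline logic" are placeholder names taken from the example above)
pipeline = PipelineController.get(
    pipeline_project="examples",
    pipeline_name="custom pipeline logic",
)

# Re-run it by pushing the controller task into an execution queue;
# force=True resets the controller first if it is not in an enqueueable state
PipelineController.enqueue(pipeline, queue_name="services", force=True)
```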