diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index eb2ecbcb..da578099 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -43,7 +43,7 @@ jobs: # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@v1 + uses: github/codeql-action/init@v2 with: languages: ${{ matrix.language }} # If you wish to specify custom queries, you can do so here or in a config file. @@ -54,7 +54,7 @@ jobs: # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild - uses: github/codeql-action/autobuild@v1 + uses: github/codeql-action/autobuild@v2 # ℹī¸ Command-line programs to run using the OS shell. # 📚 https://git.io/JvXDl @@ -68,4 +68,4 @@ jobs: # make release - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v1 + uses: github/codeql-action/analyze@v2 diff --git a/clearml/automation/aws_driver.py b/clearml/automation/aws_driver.py index 603c0e03..1f0b75de 100644 --- a/clearml/automation/aws_driver.py +++ b/clearml/automation/aws_driver.py @@ -26,6 +26,7 @@ class AWSDriver(CloudDriver): """AWS Driver""" aws_access_key_id = attr.ib(validator=instance_of(str), default='') aws_secret_access_key = attr.ib(validator=instance_of(str), default='') + aws_session_token = attr.ib(validator=instance_of(str), default='') aws_region = attr.ib(validator=instance_of(str), default='') use_credentials_chain = attr.ib(validator=instance_of(bool), default=False) use_iam_instance_profile = attr.ib(validator=instance_of(bool), default=False) @@ -37,6 +38,7 @@ class AWSDriver(CloudDriver): obj = super().from_config(config) obj.aws_access_key_id = config['hyper_params'].get('cloud_credentials_key') obj.aws_secret_access_key = config['hyper_params'].get('cloud_credentials_secret') + obj.aws_session_token = config['hyper_params'].get('cloud_credentials_token') obj.aws_region = config['hyper_params'].get('cloud_credentials_region') obj.use_credentials_chain = config['hyper_params'].get('use_credentials_chain', False) obj.use_iam_instance_profile = config['hyper_params'].get('use_iam_instance_profile', False) @@ -50,26 +52,46 @@ class AWSDriver(CloudDriver): def spin_up_worker(self, resource_conf, worker_prefix, queue_name, task_id): # user_data script will automatically run when the instance is started.
it will install the required packages - # for clearml-agent configure it using environment variables and run clearml-agent on the required queue + # for clearml-agent, configure it using environment variables and run clearml-agent on the required queue + # Config reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2/client/run_instances.html user_data = self.gen_user_data(worker_prefix, queue_name, task_id, resource_conf.get("cpu_only", False)) ec2 = boto3.client("ec2", **self.creds()) launch_specification = ConfigFactory.from_dict( { "ImageId": resource_conf["ami_id"], + "Monitoring": {'Enabled': bool(resource_conf.get('enable_monitoring', False))}, "InstanceType": resource_conf["instance_type"], - "BlockDeviceMappings": [ - { - "DeviceName": resource_conf["ebs_device_name"], - "Ebs": { - "VolumeSize": resource_conf["ebs_volume_size"], - "VolumeType": resource_conf["ebs_volume_type"], - }, - } - ], - "Placement": {"AvailabilityZone": resource_conf["availability_zone"]}, } ) + # handle EBS volumes (existing or new) + # Ref: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/block-device-mapping-concepts.html + if resource_conf.get("ebs_snapshot_id") and resource_conf.get("ebs_device_name"): + launch_specification["BlockDeviceMappings"] = [ + { + "DeviceName": resource_conf["ebs_device_name"], + "Ebs": { + "SnapshotId": resource_conf["ebs_snapshot_id"] + } + } + ] + elif resource_conf.get("ebs_device_name"): + launch_specification["BlockDeviceMappings"] = [ + { + "DeviceName": resource_conf["ebs_device_name"], + "Ebs": { + "VolumeSize": resource_conf.get("ebs_volume_size", 80), + "VolumeType": resource_conf.get("ebs_volume_type", "gp3") + } + } + ] + + if resource_conf.get("subnet_id", None): + launch_specification["SubnetId"] = resource_conf["subnet_id"] + elif resource_conf.get("availability_zone", None): + launch_specification["Placement"] = {"AvailabilityZone": resource_conf["availability_zone"]} + else: + raise Exception('subnet_id or availability_zone must to be specified in the config') if resource_conf.get("key_name", None): launch_specification["KeyName"] = resource_conf["key_name"] if resource_conf.get("security_group_ids", None): @@ -150,6 +172,7 @@ class AWSDriver(CloudDriver): creds.update({ 'aws_secret_access_key': self.aws_secret_access_key or None, 'aws_access_key_id': self.aws_access_key_id or None, + 'aws_session_token': self.aws_session_token or None, }) return creds diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 0b1c465e..076c8c7d 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -23,7 +23,7 @@ from .. import Logger from ..automation import ClearmlJob from ..backend_api import Session from ..backend_interface.task.populate import CreateFromFunction -from ..backend_interface.util import get_or_create_project +from ..backend_interface.util import get_or_create_project, mutually_exclusive from ..config import get_remote_task_id from ..debugging.log import LoggerRoot from ..errors import UsageError @@ -657,7 +657,7 @@ class PipelineController(object): :param function: A global function to convert into a standalone Task :param function_kwargs: Optional, provide subset of function arguments and default values to expose. If not provided automatically take all function arguments & defaults - Optional, pass input arguments to the function from other Tasks's output artifact. + Optional, pass input arguments to the function from other Tasks' output artifact. 
Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`: {'numpy_matrix': 'aabbcc.answer'} :param function_return: Provide a list of names for all the results. @@ -1292,6 +1292,136 @@ class PipelineController(object): """ return self._pipeline_args + @classmethod + def enqueue(cls, pipeline_controller, queue_name=None, queue_id=None, force=False): + # type: (Union[PipelineController, str], Optional[str], Optional[str], bool) -> Any + """ + Enqueue a PipelineController for execution, by adding it to an execution queue. + + .. note:: + A worker daemon must be listening at the queue for the worker to fetch the Task and execute it, + see `ClearML Agent <../clearml_agent>`_ in the ClearML Documentation. + + :param pipeline_controller: The PipelineController to enqueue. Specify a PipelineController object or PipelineController ID + :param queue_name: The name of the queue. If not specified, then ``queue_id`` must be specified. + :param queue_id: The ID of the queue. If not specified, then ``queue_name`` must be specified. + :param bool force: If True, reset the PipelineController if necessary before enqueuing it + + :return: An enqueue JSON response. + + .. code-block:: javascript + + { + "queued": 1, + "updated": 1, + "fields": { + "status": "queued", + "status_reason": "", + "status_message": "", + "status_changed": "2020-02-24T15:05:35.426770+00:00", + "last_update": "2020-02-24T15:05:35.426770+00:00", + "execution.queue": "2bd96ab2d9e54b578cc2fb195e52c7cf" + } + } + + - ``queued`` - The number of Tasks enqueued (an integer or ``null``). + - ``updated`` - The number of Tasks updated (an integer or ``null``). + - ``fields`` + + - ``status`` - The status of the experiment. + - ``status_reason`` - The reason for the last status change. + - ``status_message`` - Information about the status. + - ``status_changed`` - The last status change date and time (ISO 8601 format). + - ``last_update`` - The last Task update time, including Task creation, update, change, or events for this task (ISO 8601 format). + - ``execution.queue`` - The ID of the queue where the Task is enqueued. ``null`` indicates not enqueued. + """ + pipeline_controller = ( + pipeline_controller + if isinstance(pipeline_controller, PipelineController) + else cls.get(pipeline_id=pipeline_controller) + ) + return Task.enqueue(pipeline_controller._task, queue_name=queue_name, queue_id=queue_id, force=force) + + @classmethod + def get( + cls, + pipeline_id=None, # type: Optional[str] + pipeline_project=None, # type: Optional[str] + pipeline_name=None, # type: Optional[str] + pipeline_version=None, # type: Optional[str] + pipeline_tags=None, # type: Optional[Sequence[str]] + shallow_search=False # type: bool + ): + # type: (...) -> "PipelineController" + """ + Get a specific PipelineController. If multiple pipeline controllers are found, the pipeline controller + with the highest semantic version is returned. If no semantic version is found, the most recently + updated pipeline controller is returned. 
This function raises an Exception if no pipeline controller + was found + + Note: In order to run the pipeline controller returned by this function, use PipelineController.enqueue + + :param pipeline_id: Requested PipelineController ID + :param pipeline_project: Requested PipelineController project + :param pipeline_name: Requested PipelineController name + :param pipeline_version: Requested PipelineController version + :param pipeline_tags: Requested PipelineController tags (list of tag strings) + :param shallow_search: If True, search only the first 500 results (first page) + """ + mutually_exclusive(pipeline_id=pipeline_id, pipeline_project=pipeline_project, _require_at_least_one=False) + mutually_exclusive(pipeline_id=pipeline_id, pipeline_name=pipeline_name, _require_at_least_one=False) + if not pipeline_id: + pipeline_project_hidden = "{}/.pipelines/{}".format(pipeline_project, pipeline_name) + name_with_runtime_number_regex = r"^{}( #[0-9]+)*$".format(re.escape(pipeline_name)) + pipelines = Task._query_tasks( + pipeline_project=[pipeline_project_hidden], + task_name=name_with_runtime_number_regex, + fetch_only_first_page=False if not pipeline_version else shallow_search, + only_fields=["id"] if not pipeline_version else ["id", "runtime.version"], + system_tags=[cls._tag], + order_by=["-last_update"], + tags=pipeline_tags, + search_hidden=True, + _allow_extra_fields_=True, + ) + if pipelines: + if not pipeline_version: + pipeline_id = pipelines[0].id + current_version = None + for pipeline in pipelines: + if not pipeline.runtime: + continue + candidate_version = pipeline.runtime.get("version") + if not candidate_version or not Version.is_valid_version_string(candidate_version): + continue + if not current_version or Version(candidate_version) > current_version: + current_version = Version(candidate_version) + pipeline_id = pipeline.id + else: + for pipeline in pipelines: + if pipeline.runtime.get("version") == pipeline_version: + pipeline_id = pipeline.id + break + if not pipeline_id: + error_msg = "Could not find pipeline with pipeline_project={}, pipeline_name={}".format(pipeline_project, pipeline_name) + if pipeline_version: + error_msg += ", pipeline_version={}".format(pipeline_version) + raise ValueError(error_msg) + pipeline_task = Task.get_task(task_id=pipeline_id) + pipeline_object = cls.__new__(cls) + pipeline_object._task = pipeline_task + pipeline_object._nodes = {} + pipeline_object._running_nodes = [] + try: + pipeline_object._deserialize(pipeline_task._get_configuration_dict(cls._config_section)) + except Exception: + pass + return pipeline_object + + @property + def id(self): + # type: () -> str + return self._task.id + @property def tags(self): # type: () -> List[str] diff --git a/clearml/backend_config/bucket_config.py b/clearml/backend_config/bucket_config.py index 49086cec..0bc5ac8e 100644 --- a/clearml/backend_config/bucket_config.py +++ b/clearml/backend_config/bucket_config.py @@ -31,7 +31,7 @@ class S3BucketConfig(object): acl = attrib(type=str, converter=_none_to_empty_string, default="") secure = attrib(type=bool, default=True) region = attrib(type=str, converter=_none_to_empty_string, default="") - verify = attrib(type=bool, default=True) + verify = attrib(type=bool, default=None) use_credentials_chain = attrib(type=bool, default=False) extra_args = attrib(type=dict, default=None) @@ -106,6 +106,7 @@ class S3BucketConfigurations(BaseBucketConfigurations): default_use_credentials_chain=False, default_token="", default_extra_args=None, + default_verify=None, ): super(S3BucketConfigurations, self).__init__() self._buckets =
buckets if buckets else list() @@ -116,6 +117,7 @@ class S3BucketConfigurations(BaseBucketConfigurations): self._default_multipart = True self._default_use_credentials_chain = default_use_credentials_chain self._default_extra_args = default_extra_args + self._default_verify = default_verify @classmethod def from_config(cls, s3_configuration): @@ -129,6 +131,7 @@ class S3BucketConfigurations(BaseBucketConfigurations): default_region = s3_configuration.get("region", "") or getenv("AWS_DEFAULT_REGION", "") default_use_credentials_chain = s3_configuration.get("use_credentials_chain") or False default_extra_args = s3_configuration.get("extra_args") + default_verify = s3_configuration.get("verify", None) default_key = _none_to_empty_string(default_key) default_secret = _none_to_empty_string(default_secret) @@ -142,7 +145,8 @@ class S3BucketConfigurations(BaseBucketConfigurations): default_region, default_use_credentials_chain, default_token, - default_extra_args + default_extra_args, + default_verify, ) def add_config(self, bucket_config): diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py index 53465a37..5f4323f3 100644 --- a/clearml/backend_interface/task/task.py +++ b/clearml/backend_interface/task/task.py @@ -3,7 +3,6 @@ import itertools import json import logging import os -import re import sys import warnings from copy import copy @@ -34,12 +33,12 @@ from ...backend_interface.task.development.worker import DevWorker from ...backend_interface.session import SendError from ...backend_api import Session from ...backend_api.services import tasks, models, events, projects -from ...backend_api.session.defs import ENV_OFFLINE_MODE +# from ...backend_api.session.defs import ENV_OFFLINE_MODE from ...utilities.pyhocon import ConfigTree, ConfigFactory from ...utilities.config import config_dict_to_text, text_to_config_dict from ...errors import ArtifactUriDeleteError -from ..base import IdObjectBase, InterfaceBase +from ..base import IdObjectBase # , InterfaceBase from ..metrics import Metrics, Reporter from ..model import Model from ..setupuploadmixin import SetupUploadMixin @@ -376,7 +375,6 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): self._edit(type=tasks.TaskTypeEnum(task_type)) return id - def _set_storage_uri(self, value): value = value.rstrip('/') if value else None self._storage_uri = StorageHelper.conform_url(value) @@ -1406,7 +1404,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): Remove input models from the current task. Note that the models themselves are not deleted, but the tasks' reference to the models is removed. To delete the models themselves, see `Models.remove` - + :param models_to_remove: The models to remove from the task. Can be a list of ids, or of `BaseModel` (including its subclasses: `Model` and `InputModel`) """ @@ -2543,7 +2541,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): # type: (**Any) -> () for k, v in kwargs.items(): setattr(self.data, k, v) - offline_mode_folder = self.get_offline_mode_folder() + offline_mode_folder = self.get_offline_mode_folder() if not offline_mode_folder: return Path(offline_mode_folder).mkdir(parents=True, exist_ok=True) @@ -2807,21 +2805,6 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): res = cls._send(session=session, req=req, log=log) return res - @classmethod - def get_by_name(cls, task_name): - # type: (str) -> Task - """ - Returns the most recent task with the given name from anywhere in the system as a Task object. 
- - :param str task_name: The name of the task to search for. - - :return: Task object of the most recent task with that name. - """ - res = cls._send(cls._get_default_session(), tasks.GetAllRequest(name=exact_match_regex(task_name))) - - task = get_single_result(entity='task', query=task_name, results=res.response.tasks) - return cls(task_id=task.id) - @classmethod def get_task_output_log_web_page(cls, task_id, project_id=None, app_server_host=None): # type: (str, Optional[str], Optional[str]) -> str diff --git a/clearml/binding/frameworks/tensorflow_bind.py b/clearml/binding/frameworks/tensorflow_bind.py index fa0ac995..eb0366dc 100644 --- a/clearml/binding/frameworks/tensorflow_bind.py +++ b/clearml/binding/frameworks/tensorflow_bind.py @@ -724,17 +724,8 @@ class EventTrainsWriter(object): 'Received event without step, assuming step = {}'.format(step)) else: step = int(step) - # unlike other frameworks, tensorflow already accounts for the iteration number - # when continuing the training. we substract the smallest iteration such that we - # don't increment the step twice number - original_step = step - if EventTrainsWriter._current_task: - step -= EventTrainsWriter._current_task.get_initial_iteration() - # there can be a few metrics getting reported again, so the step can be negative - # for the first few reports - if step < 0 and original_step > 0: - step = 0 - + step = tweak_step(step) + self._max_step = max(self._max_step, step) if value_dicts is None: LoggerRoot.get_base_logger(TensorflowBinding).debug("Summary arrived without 'value'") @@ -1378,7 +1369,7 @@ class PatchTensorFlowEager(object): plugin_type = plugin_type[next(i for i, c in enumerate(plugin_type) if c >= 'A'):] if plugin_type.startswith('scalars'): event_writer._add_scalar(tag=str(tag), - step=int(step.numpy()) if not isinstance(step, int) else step, + step=tweak_step(step), scalar_data=tensor.numpy()) elif plugin_type.startswith('images'): img_data_np = tensor.numpy() @@ -1386,19 +1377,19 @@ class PatchTensorFlowEager(object): tag=tag, step=step, **kwargs) elif plugin_type.startswith('histograms'): event_writer._add_histogram( - tag=str(tag), step=int(step.numpy()) if not isinstance(step, int) else step, + tag=str(tag), step=tweak_step(step), hist_data=tensor.numpy() ) elif plugin_type.startswith('text'): event_writer._add_text( - tag=str(tag), step=int(step.numpy()) if not isinstance(step, int) else step, + tag=str(tag), step=tweak_step(step), tensor_bytes=tensor.numpy() ) elif 'audio' in plugin_type: audio_bytes_list = [a for a in tensor.numpy().flatten() if a] for i, audio_bytes in enumerate(audio_bytes_list): event_writer._add_audio(tag=str(tag) + ('/{}'.format(i) if len(audio_bytes_list) > 1 else ''), - step=int(step.numpy()) if not isinstance(step, int) else step, + step=tweak_step(step), values=None, audio_data=audio_bytes) else: pass @@ -1416,7 +1407,7 @@ class PatchTensorFlowEager(object): if event_writer and isinstance(step, int) or hasattr(step, 'numpy'): try: event_writer._add_scalar(tag=str(tag), - step=int(step.numpy()) if not isinstance(step, int) else step, + step=tweak_step(step), scalar_data=value.numpy()) except Exception as ex: LoggerRoot.get_base_logger(TensorflowBinding).warning(str(ex)) @@ -1428,7 +1419,7 @@ class PatchTensorFlowEager(object): str_tag = str_tag.decode() if isinstance(str_tag, bytes) else str(str_tag) event_writer._add_scalar( tag=str_tag, - step=int(a_step.numpy()) if not isinstance(a_step, int) else a_step, + step=tweak_step(step), scalar_data=a_value.numpy()) except Exception 
as a_ex: LoggerRoot.get_base_logger(TensorflowBinding).warning( @@ -1458,7 +1449,7 @@ class PatchTensorFlowEager(object): if event_writer and isinstance(step, int) or hasattr(step, 'numpy'): try: event_writer._add_histogram( - tag=str(tag), step=int(step.numpy()) if not isinstance(step, int) else step, + tag=str(tag), step=tweak_step(step), hist_data=values.numpy() ) except Exception as ex: @@ -1471,7 +1462,7 @@ class PatchTensorFlowEager(object): str_tag = str_tag.decode() if isinstance(str_tag, bytes) else str(str_tag) event_writer._add_histogram( tag=str_tag, - step=int(a_step.numpy()) if not isinstance(a_step, int) else a_step, + step=tweak_step(a_step), hist_data=a_value.numpy() ) except Exception as a_ex: @@ -1549,11 +1540,11 @@ class PatchTensorFlowEager(object): 'colorspace': 'RGB', 'encodedImageString': img_data_np[i]} image_tag = str(tag) + '/sample_{}'.format(i - 2) if img_data_np.size > 3 else str(tag) event_writer._add_image(tag=image_tag, - step=int(step.numpy()) if not isinstance(step, int) else step, + step=tweak_step(step), img_data=img_data) else: event_writer._add_image_numpy(tag=str(tag), - step=int(step.numpy()) if not isinstance(step, int) else step, + step=tweak_step(step), img_data_np=img_data_np, max_keep_images=kwargs.get('max_images')) @@ -2299,3 +2290,15 @@ class PatchTensorflow2ModelIO(object): except Exception: pass return model + + +def tweak_step(step): + # noinspection PyBroadException + try: + step = int(step.numpy()) if not isinstance(step, int) else step + # unlike other frameworks, tensorflow already accounts for the iteration number + # when continuing the training. we substract the smallest iteration such that we + # don't increment the step twice number + return step - EventTrainsWriter._current_task.get_initial_iteration() + except Exception: + return step diff --git a/clearml/binding/gradio_bind.py b/clearml/binding/gradio_bind.py index d074302e..26d1d99c 100644 --- a/clearml/binding/gradio_bind.py +++ b/clearml/binding/gradio_bind.py @@ -70,7 +70,7 @@ class PatchGradio: # noinspection PyBroadException try: return original_fn(*args, **kwargs) - except Exception as e: + except Exception: del kwargs["root_path"] return original_fn(*args, **kwargs) diff --git a/clearml/cli/config/__main__.py b/clearml/cli/config/__main__.py index de006d3d..46a4ccb9 100644 --- a/clearml/cli/config/__main__.py +++ b/clearml/cli/config/__main__.py @@ -242,7 +242,7 @@ def parse_known_host(parsed_host): print('Assuming files and api ports are unchanged and use the same (' + parsed_host.scheme + ') protocol') api_host = parsed_host.scheme + "://" + parsed_host.netloc + ':8008' + parsed_host.path web_host = parsed_host.scheme + "://" + parsed_host.netloc + parsed_host.path - files_host = parsed_host.scheme + "://" + parsed_host.netloc+ ':8081' + parsed_host.path + files_host = parsed_host.scheme + "://" + parsed_host.netloc + ':8081' + parsed_host.path else: print("Warning! 
Could not parse host name") api_host = '' diff --git a/clearml/config/default/sdk.conf b/clearml/config/default/sdk.conf index bb92674c..63b15b8a 100644 --- a/clearml/config/default/sdk.conf +++ b/clearml/config/default/sdk.conf @@ -3,7 +3,7 @@ storage { cache { - # Defaults to system temp folder / cache + # Defaults to /clearml_cache default_base_dir: "~/.clearml/cache" # default_cache_manager_size: 100 } @@ -103,6 +103,8 @@ boto3 { pool_connections: 512 max_multipart_concurrency: 16 + multipart_threshold: 8388608 # 8MB + multipart_chunksize: 8388608 # 8MB } } google.storage { diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py index 05ae2d62..569fb743 100644 --- a/clearml/datasets/dataset.py +++ b/clearml/datasets/dataset.py @@ -2981,7 +2981,7 @@ class Dataset(object): task = Task.get_task(task_id=id_) dataset_struct_entry = { - "job_id": id_[len("offline-"):] if id_.startswith("offline-") else id_, # .removeprefix not supported < Python 3.9 + "job_id": id_[len("offline-"):] if id_.startswith("offline-") else id_, # .removeprefix not supported < Python 3.9 "status": task.status } # noinspection PyProtectedMember diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index 2d3f3fde..b3a407a7 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -116,6 +116,7 @@ class _Driver(object): cls._file_server_hosts = hosts return cls._file_server_hosts + class _HttpDriver(_Driver): """ LibCloud http/https adapter (simple, enough for now) """ @@ -401,6 +402,8 @@ class _Boto3Driver(_Driver): _min_pool_connections = 512 _max_multipart_concurrency = deferred_config('aws.boto3.max_multipart_concurrency', 16) + _multipart_threshold = deferred_config('aws.boto3.multipart_threshold', (1024 ** 2) * 8) # 8 MB + _multipart_chunksize = deferred_config('aws.boto3.multipart_chunksize', (1024 ** 2) * 8) _pool_connections = deferred_config('aws.boto3.pool_connections', 512) _connect_timeout = deferred_config('aws.boto3.connect_timeout', 60) _read_timeout = deferred_config('aws.boto3.read_timeout', 60) @@ -435,12 +438,18 @@ class _Boto3Driver(_Driver): self.name = name[5:] endpoint = (('https://' if cfg.secure else 'http://') + cfg.host) if cfg.host else None + verify = cfg.verify + if verify is True: + # True is a non-documented value for boto3, use None instead (which means verify) + print("Using boto3 verify=None instead of true") + verify = None + # boto3 client creation isn't thread-safe (client itself is) with self._creation_lock: boto_kwargs = { "endpoint_url": endpoint, "use_ssl": cfg.secure, - "verify": cfg.verify, + "verify": verify, "region_name": cfg.region or None, # None in case cfg.region is an empty string "config": botocore.client.Config( max_pool_connections=max( @@ -498,7 +507,9 @@ class _Boto3Driver(_Driver): container.bucket.upload_fileobj(stream, object_name, Config=boto3.s3.transfer.TransferConfig( use_threads=container.config.multipart, max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1, - num_download_attempts=container.config.retries), + num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize), Callback=callback, ExtraArgs=extra_args, ) @@ -512,6 +523,8 @@ class _Boto3Driver(_Driver): Config=boto3.s3.transfer.TransferConfig( use_threads=False, num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize, ), Callback=callback, 
ExtraArgs=extra_args @@ -535,7 +548,9 @@ class _Boto3Driver(_Driver): container.bucket.upload_file(file_path, object_name, Config=boto3.s3.transfer.TransferConfig( use_threads=container.config.multipart, max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1, - num_download_attempts=container.config.retries), + num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize), Callback=callback, ExtraArgs=extra_args, ) @@ -547,7 +562,10 @@ class _Boto3Driver(_Driver): file_path, object_name, Config=boto3.s3.transfer.TransferConfig( - use_threads=False, num_download_attempts=container.config.retries + use_threads=False, + num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize ), Callback=callback, ExtraArgs=extra_args @@ -600,7 +618,9 @@ class _Boto3Driver(_Driver): config = boto3.s3.transfer.TransferConfig( use_threads=container.config.multipart, max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1, - num_download_attempts=container.config.retries) + num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize) total_size_mb = obj.content_length / (1024. * 1024.) remote_path = os.path.join(obj.container_name, obj.key) cb = DownloadProgressReport(total_size_mb, verbose, remote_path, log) @@ -618,7 +638,9 @@ class _Boto3Driver(_Driver): Config = boto3.s3.transfer.TransferConfig( use_threads=container.config.multipart, max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1, - num_download_attempts=container.config.retries + num_download_attempts=container.config.retries, + multipart_threshold=self._multipart_threshold, + multipart_chunksize=self._multipart_chunksize ) obj.download_file(str(p), Callback=callback, Config=Config) @@ -1776,7 +1798,6 @@ class _FileStorageDriver(_Driver): return os.path.isfile(object_name) - class StorageHelper(object): """ Storage helper. Used by the entire system to download/upload files. @@ -2412,7 +2433,7 @@ class StorageHelper(object): result_dest_path = canonized_dest_path if return_canonized else dest_path - if self.scheme in StorageHelper._quotable_uri_schemes: # TODO: fix-driver-schema + if self.scheme in StorageHelper._quotable_uri_schemes: # TODO: fix-driver-schema # quote link result_dest_path = quote_url(result_dest_path, StorageHelper._quotable_uri_schemes) @@ -2430,7 +2451,7 @@ class StorageHelper(object): result_path = canonized_dest_path if return_canonized else dest_path - if cb and self.scheme in StorageHelper._quotable_uri_schemes: # TODO: fix-driver-schema + if cb and self.scheme in StorageHelper._quotable_uri_schemes: # TODO: fix-driver-schema # store original callback a_cb = cb @@ -2955,7 +2976,6 @@ class StorageHelper(object): ) - def normalize_local_path(local_path): """ Get a normalized local path diff --git a/clearml/task.py b/clearml/task.py index 1e28999f..e0110af7 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -7,6 +7,7 @@ import signal import sys import threading import time +import warnings from argparse import ArgumentParser from logging import getLogger from tempfile import mkstemp, mkdtemp @@ -860,6 +861,23 @@ class Task(_Task): return task + @classmethod + def get_by_name(cls, task_name): + # type: (str) -> TaskInstance + """ + + .. 
note:: + This method is deprecated, use :meth:`Task.get_task` instead. + + Returns the most recent task with the given name from anywhere in the system as a Task object. + + :param str task_name: The name of the task to search for. + + :return: Task object of the most recent task with that name. + """ + warnings.warn("Warning: 'Task.get_by_name' is deprecated. Use 'Task.get_task' instead", DeprecationWarning) + return cls.get_task(task_name=task_name) + @classmethod def get_task( cls, @@ -1213,8 +1231,8 @@ class Task(_Task): return cloned_task @classmethod - def enqueue(cls, task, queue_name=None, queue_id=None): - # type: (Union[Task, str], Optional[str], Optional[str]) -> Any + def enqueue(cls, task, queue_name=None, queue_id=None, force=False): + # type: (Union[Task, str], Optional[str], Optional[str], bool) -> Any """ Enqueue a Task for execution, by adding it to an execution queue. @@ -1225,6 +1243,7 @@ class Task(_Task): :param Task/str task: The Task to enqueue. Specify a Task object or Task ID. :param str queue_name: The name of the queue. If not specified, then ``queue_id`` must be specified. :param str queue_id: The ID of the queue. If not specified, then ``queue_name`` must be specified. + :param bool force: If True, reset the Task if necessary before enqueuing it :return: An enqueue JSON response. @@ -1271,9 +1290,25 @@ class Task(_Task): raise ValueError('Could not find queue named "{}"'.format(queue_name)) req = tasks.EnqueueRequest(task=task_id, queue=queue_id) - res = cls._send(session=session, req=req) - if not res.ok(): - raise ValueError(res.response) + exception = None + res = None + try: + res = cls._send(session=session, req=req) + ok = res.ok() + except Exception as e: + exception = e + ok = False + if not ok: + if not force: + if res: + raise ValueError(res.response) + raise exception + task = cls.get_task(task_id=task) if isinstance(task, str) else task + task.reset(set_started_on_success=False, force=True) + req = tasks.EnqueueRequest(task=task_id, queue=queue_id) + res = cls._send(session=session, req=req) + if not res.ok(): + raise ValueError(res.response) resp = res.response return resp diff --git a/clearml/version.py b/clearml/version.py index aab78e83..666b2f71 100644 --- a/clearml/version.py +++ b/clearml/version.py @@ -1 +1 @@ -__version__ = '1.11.2rc0' +__version__ = '1.12.0' diff --git a/docs/clearml.conf b/docs/clearml.conf index 0bf896b1..e7b045c5 100644 --- a/docs/clearml.conf +++ b/docs/clearml.conf @@ -33,7 +33,7 @@ sdk { # } ] cache { - # Defaults to system temp folder / cache + # Defaults to /clearml_cache default_base_dir: "~/.clearml/cache" default_cache_manager_size: 100 } @@ -121,6 +121,8 @@ sdk { boto3 { pool_connections: 512 max_multipart_concurrency: 16 + multipart_threshold: 8388608 # 8MB + multipart_chunksize: 8388608 # 8MB } } google.storage { diff --git a/docs/errata_breaking_change_gcs_sdk_1_11_x.md b/docs/errata_breaking_change_gcs_sdk_1_11_x.md new file mode 100644 index 00000000..a640fc2b --- /dev/null +++ b/docs/errata_breaking_change_gcs_sdk_1_11_x.md @@ -0,0 +1,87 @@ +# Handling the Google Cloud Storage breaking change + +## Rationale + +Due to an issue with ClearML SDK versions 1.11.x, URLs of objects uploaded to the Google Cloud Storage were stored in the ClearML backend as a quoted string. This behavior causes issues accessing registered objects using the ClearML SDK. The issue affects the URLs of models, datasets, artifacts, and media files/debug samples. 
In case you have such objects uploaded with the affected ClearML SDK versions and wish to be able to access them programmatically using ClearML SDK version 1.12 and above (note that access from the ClearML UI is still possible), you should perform the actions listed in the section below. + +## Recommended Steps + +The code snippets below should serve as an example rather than an actual conversion script. + +The general flow is that you will first need to download these files using a custom access method, then upload them with the fixed SDK version. Depending on what object you're trying to fix, you should pick the respective lines of code from steps 1 and 2. + + + +1. You need to be able to download objects (models, datasets, media, artifacts) registered by affected versions. See the code snippet below and adjust it to your use case to get a local copy of the object +```python +from clearml import Task, InputModel, StorageManager +from urllib.parse import unquote # <- you will need this + + +ds_task = Task.get_task(dataset_id) # For Datasets +# OR +task = Task.get_task(task_id) # For Artifacts, Media, and Models + + +url = unquote(ds_task.artifacts['data'].url) # For Datasets +# OR +url = unquote(task.artifacts[artifact_name].url) # For Artifacts +# OR +model = InputModel(task.output_models_id['test_file']) # For Models associated with tasks +url = unquote(model.url) +# OR +model = InputModel(model_id) # For any Models +url = unquote(model.url) +# OR +samples = task.get_debug_samples(title, series) # For Media/Debug samples +sample_urls = [unquote(sample['url']) for sample in samples] + +local_path = StorageManager.get_local_copy(url) + +# NOTE: For Datasets you will need to unzip the `local_path` +``` + +2. Once the object is downloaded locally, you can re-register it with the new version. See the snippet below and adjust it to your use case +```python +from clearml import Task, Dataset, OutputModel +import os + + +ds = Dataset.create(dataset_name=task.name, dataset_project=task.get_project_name(), parents=[Dataset.get(dataset_id)]) # For Datasets +# OR +task = Task.get_task(task_name=task.name, project_name=task.get_project_name()) # For Artifacts, Media, and Models + + +ds.add_files(unzipped_local_path) # For Datasets +ds.finalize(auto_upload=True) +# OR +task.upload_artifact(name=artifact_name, artifact_object=local_path) # For Artifacts +# OR +model = OutputModel(task=task) # For any Models +model.update_weights(local_path) # note: if the original model was created with update_weights_package, + # preserve this behavior by saving the new one with update_weights_package too +# OR +for sample in samples: + task.get_logger().report_media(sample['metric'], sample['variant'], local_path=unquote(sample['url'])) # For Media/Debug samples +``` + +## Alternative methods + +The methods described next are more advanced (read "more likely to mess up"). If you're unsure whether to use them, it's better not to. Both methods described below will alter (i.e., modify **in-place**) the existing objects. Note that you still need to run the code from step 1 to have access to all required metadata.
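+Before altering registered objects in place with the methods below, it can help to confirm that a stored URL is actually affected (i.e., quoted). A minimal sanity check, reusing the `task` and `artifact_name` names from step 1 (adjust similarly for models and datasets):
+
+```python
+from urllib.parse import unquote
+
+stored_url = task.artifacts[artifact_name].url  # URL as registered in the ClearML backend
+if stored_url != unquote(stored_url):
+    print("Affected (URL was stored quoted):", stored_url)
+else:
+    print("URL is not quoted; no re-registration needed:", stored_url)
+```
+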
+ +**Method 1**: You can try to alter the existing unpublished experiments/models using the lower-level `APIClient` +```python +from clearml.backend_api.session.client import APIClient + + +client = APIClient() + +client.tasks.add_or_update_artifacts(task=ds_task.id, force=True, artifacts=[{"uri": unquote(ds_task.artifacts['state'].url), "key": "state", "type": "dict"}]) +client.tasks.add_or_update_artifacts(task=ds_task.id, force=True, artifacts=[{"uri": unquote(ds_task.artifacts['data'].url), "key": "data", "type": "custom"}]) # For datasets on completed dataset uploads +# OR +client.tasks.add_or_update_artifacts(task=task.id, force=True, artifacts=[{"uri": unquote(url), "key": artifact_name, "type": "custom"}]) # For artifacts on completed tasks +# OR +client.models.edit(model=model.id, force=True, uri=url) # For any unpublished Model +``` + +**Method 2**: There's an option available only to those who self-host their ClearML server. It is possible to manually update the values registered in MongoDB, but beware - this advanced procedure should be performed with extreme care, as it can lead to an inconsistent state if mishandled. diff --git a/docs/trains.conf b/docs/trains.conf index a56a509f..f2d1ee84 100644 --- a/docs/trains.conf +++ b/docs/trains.conf @@ -20,7 +20,7 @@ sdk { storage { cache { - # Defaults to system temp folder / cache + # Defaults to /clearml_cache default_base_dir: "~/.clearml/cache" } } @@ -105,6 +105,8 @@ sdk { boto3 { pool_connections: 512 max_multipart_concurrency: 16 + multipart_threshold: 8388608 # 8MB + multipart_chunksize: 8388608 # 8MB } } google.storage { diff --git a/examples/pipeline/pipeline_from_decorator.py b/examples/pipeline/pipeline_from_decorator.py index a37baeb4..0a847ab0 100644 --- a/examples/pipeline/pipeline_from_decorator.py +++ b/examples/pipeline/pipeline_from_decorator.py @@ -5,20 +5,21 @@ from clearml import TaskTypes # Make the following function an independent pipeline component step # notice all package imports inside the function will be automatically logged as # required packages for the pipeline execution step -@PipelineDecorator.component(return_values=['data_frame'], cache=True, task_type=TaskTypes.data_processing) +@PipelineDecorator.component(return_values=["data_frame"], cache=True, task_type=TaskTypes.data_processing) def step_one(pickle_data_url: str, extra: int = 43): - print('step_one') + print("step_one") # make sure we have scikit-learn for this step, we need it to use to unpickle the object import sklearn # noqa import pickle import pandas as pd from clearml import StorageManager + local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url) - with open(local_iris_pkl, 'rb') as f: + with open(local_iris_pkl, "rb") as f: iris = pickle.load(f) - data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names']) - data_frame.columns += ['target'] - data_frame['target'] = iris['target'] + data_frame = pd.DataFrame(iris["data"], columns=iris["feature_names"]) + data_frame.columns += ["target"] + data_frame["target"] = iris["target"] return data_frame @@ -28,18 +29,17 @@ def step_one(pickle_data_url: str, extra: int = 43): # Specifying `return_values` makes sure the function step can return an object to the pipeline logic # In this case, the returned tuple will be stored as an artifact named "X_train, X_test, y_train, y_test" @PipelineDecorator.component( - return_values=['X_train, X_test, y_train, y_test'], cache=True, task_type=TaskTypes.data_processing + return_values=["X_train", "X_test", "y_train", 
"y_test"], cache=True, task_type=TaskTypes.data_processing ) def step_two(data_frame, test_size=0.2, random_state=42): - print('step_two') + print("step_two") # make sure we have pandas for this step, we need it to use the data_frame import pandas as pd # noqa from sklearn.model_selection import train_test_split - y = data_frame['target'] - X = data_frame[(c for c in data_frame.columns if c != 'target')] - X_train, X_test, y_train, y_test = train_test_split( - X, y, test_size=test_size, random_state=random_state - ) + + y = data_frame["target"] + X = data_frame[(c for c in data_frame.columns if c != "target")] + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state) return X_train, X_test, y_train, y_test @@ -49,37 +49,41 @@ def step_two(data_frame, test_size=0.2, random_state=42): # required packages for the pipeline execution step # Specifying `return_values` makes sure the function step can return an object to the pipeline logic # In this case, the returned object will be stored as an artifact named "model" -@PipelineDecorator.component(return_values=['model'], cache=True, task_type=TaskTypes.training) +@PipelineDecorator.component(return_values=["model"], cache=True, task_type=TaskTypes.training) def step_three(X_train, y_train): - print('step_three') + print("step_three") # make sure we have pandas for this step, we need it to use the data_frame import pandas as pd # noqa from sklearn.linear_model import LogisticRegression - model = LogisticRegression(solver='liblinear', multi_class='auto') + + model = LogisticRegression(solver="liblinear", multi_class="auto") model.fit(X_train, y_train) return model + # Make the following function an independent pipeline component step # notice all package imports inside the function will be automatically logged as # required packages for the pipeline execution step # Specifying `return_values` makes sure the function step can return an object to the pipeline logic # In this case, the returned object will be stored as an artifact named "accuracy" -@PipelineDecorator.component(return_values=['accuracy'], cache=True, task_type=TaskTypes.qc) +@PipelineDecorator.component(return_values=["accuracy"], cache=True, task_type=TaskTypes.qc) def step_four(model, X_data, Y_data): from sklearn.linear_model import LogisticRegression # noqa from sklearn.metrics import accuracy_score + Y_pred = model.predict(X_data) return accuracy_score(Y_data, Y_pred, normalize=True) + # The actual pipeline execution context # notice that all pipeline component function calls are actually executed remotely # Only when a return value is used, the pipeline logic will wait for the component execution to complete -@PipelineDecorator.pipeline(name='custom pipeline logic', project='examples', version='0.0.5') -def executing_pipeline(pickle_url, mock_parameter='mock'): - print('pipeline args:', pickle_url, mock_parameter) +@PipelineDecorator.pipeline(name="custom pipeline logic", project="examples", version="0.0.5") +def executing_pipeline(pickle_url, mock_parameter="mock"): + print("pipeline args:", pickle_url, mock_parameter) # Use the pipeline argument to start the pipeline and pass it ot the first step - print('launch step one') + print("launch step one") data_frame = step_one(pickle_url) # Use the returned data from the first step (`step_one`), and pass it to the next step (`step_two`) @@ -87,17 +91,17 @@ def executing_pipeline(pickle_url, mock_parameter='mock'): # the pipeline logic does not actually load the artifact itself. 
# When actually passing the `data_frame` object into a new step, # It waits for the creating step/function (`step_one`) to complete the execution - print('launch step two') + print("launch step two") X_train, X_test, y_train, y_test = step_two(data_frame) - print('launch step three') + print("launch step three") model = step_three(X_train, y_train) # Notice since we are "printing" the `model` object, # we actually deserialize the object from the third step, and thus wait for the third step to complete. - print('returned model: {}'.format(model)) + print("returned model: {}".format(model)) - print('launch step four') + print("launch step four") accuracy = 100 * step_four(model, X_data=X_test, Y_data=y_test) # Notice since we are "printing" the `accuracy` object, @@ -105,7 +109,7 @@ def executing_pipeline(pickle_url, mock_parameter='mock'): print(f"Accuracy={accuracy}%") -if __name__ == '__main__': +if __name__ == "__main__": # set the pipeline steps default execution queue (per specific step we can override it with the decorator) # PipelineDecorator.set_default_execution_queue('default') # Run the pipeline steps as subprocesses on the current machine, great for local executions @@ -114,7 +118,7 @@ if __name__ == '__main__': # Start the pipeline execution logic. executing_pipeline( - pickle_url='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl', + pickle_url="https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl", ) - print('process completed') + print("process completed")
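
Usage note for the new `PipelineController.get` / `PipelineController.enqueue` classmethods added in `clearml/automation/controller.py` above — a minimal sketch of how they compose. The project and pipeline names reuse the decorator example in this diff; the queue name is an assumed placeholder:

```python
from clearml import PipelineController

# Fetch an existing pipeline controller by project/name. Per the new docstring,
# the highest semantic version is returned, falling back to the most recently
# updated controller; an exception is raised if nothing matches.
pipeline = PipelineController.get(
    pipeline_project="examples",
    pipeline_name="custom pipeline logic",
)

# Enqueue it for execution; force=True resets the controller first if needed.
PipelineController.enqueue(pipeline, queue_name="services", force=True)
```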