revital 2023-07-25 10:22:00 +03:00
commit 0cb99b93ee
17 changed files with 405 additions and 110 deletions

View File

@@ -43,7 +43,7 @@ jobs:
     # Initializes the CodeQL tools for scanning.
     - name: Initialize CodeQL
-      uses: github/codeql-action/init@v1
+      uses: github/codeql-action/init@v2
       with:
         languages: ${{ matrix.language }}
         # If you wish to specify custom queries, you can do so here or in a config file.
@@ -54,7 +54,7 @@ jobs:
     # Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
     # If this step fails, then you should remove it and run the build manually (see below)
     - name: Autobuild
-      uses: github/codeql-action/autobuild@v1
+      uses: github/codeql-action/autobuild@v2

     # Command-line programs to run using the OS shell.
     # 📚 https://git.io/JvXDl
@@ -68,4 +68,4 @@ jobs:
     #   make release
     - name: Perform CodeQL Analysis
-      uses: github/codeql-action/analyze@v1
+      uses: github/codeql-action/analyze@v2

View File

@@ -26,6 +26,7 @@ class AWSDriver(CloudDriver):
     """AWS Driver"""
     aws_access_key_id = attr.ib(validator=instance_of(str), default='')
     aws_secret_access_key = attr.ib(validator=instance_of(str), default='')
+    aws_session_token = attr.ib(validator=instance_of(str), default='')
     aws_region = attr.ib(validator=instance_of(str), default='')
     use_credentials_chain = attr.ib(validator=instance_of(bool), default=False)
     use_iam_instance_profile = attr.ib(validator=instance_of(bool), default=False)
@@ -37,6 +38,7 @@ class AWSDriver(CloudDriver):
         obj = super().from_config(config)
         obj.aws_access_key_id = config['hyper_params'].get('cloud_credentials_key')
         obj.aws_secret_access_key = config['hyper_params'].get('cloud_credentials_secret')
+        obj.aws_session_token = config['hyper_params'].get('cloud_credentials_token')
         obj.aws_region = config['hyper_params'].get('cloud_credentials_region')
         obj.use_credentials_chain = config['hyper_params'].get('use_credentials_chain', False)
         obj.use_iam_instance_profile = config['hyper_params'].get('use_iam_instance_profile', False)
@@ -50,26 +52,46 @@ class AWSDriver(CloudDriver):
     def spin_up_worker(self, resource_conf, worker_prefix, queue_name, task_id):
         # user_data script will automatically run when the instance is started. it will install the required packages
-        # for clearml-agent configure it using environment variables and run clearml-agent on the required queue
+        # for clearml-agent, configure it using environment variables and run clearml-agent on the required queue
+        # Config reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2/client/run_instances.html
         user_data = self.gen_user_data(worker_prefix, queue_name, task_id, resource_conf.get("cpu_only", False))

         ec2 = boto3.client("ec2", **self.creds())
         launch_specification = ConfigFactory.from_dict(
             {
                 "ImageId": resource_conf["ami_id"],
+                "Monitoring": {'Enabled': bool(resource_conf.get('enable_monitoring', False))},
                 "InstanceType": resource_conf["instance_type"],
-                "BlockDeviceMappings": [
-                    {
-                        "DeviceName": resource_conf["ebs_device_name"],
-                        "Ebs": {
-                            "VolumeSize": resource_conf["ebs_volume_size"],
-                            "VolumeType": resource_conf["ebs_volume_type"],
-                        },
-                    }
-                ],
-                "Placement": {"AvailabilityZone": resource_conf["availability_zone"]},
             }
         )
+        # handle EBS volumes (existing or new)
+        # Ref: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/block-device-mapping-concepts.html
+        if resource_conf.get("ebs_snapshot_id") and resource_conf.get("ebs_device_name"):
+            launch_specification["BlockDeviceMappings"] = [
+                {
+                    "DeviceName": resource_conf["ebs_device_name"],
+                    "Ebs": {
+                        "SnapshotId": resource_conf["ebs_snapshot_id"]
+                    }
+                }
+            ]
+        elif resource_conf.get("ebs_device_name"):
+            launch_specification["BlockDeviceMappings"] = [
+                {
+                    "DeviceName": resource_conf["ebs_device_name"],
+                    "Ebs": {
+                        "VolumeSize": resource_conf.get("ebs_volume_size", 80),
+                        "VolumeType": resource_conf.get("ebs_volume_type", "gp3")
+                    }
+                }
+            ]
+        if resource_conf.get("subnet_id", None):
+            launch_specification["SubnetId"] = resource_conf["subnet_id"]
+        elif resource_conf.get("availability_zone", None):
+            launch_specification["Placement"] = {"AvailabilityZone": resource_conf["availability_zone"]}
+        else:
+            raise Exception('subnet_id or availability_zone must be specified in the config')
         if resource_conf.get("key_name", None):
             launch_specification["KeyName"] = resource_conf["key_name"]
         if resource_conf.get("security_group_ids", None):
@@ -150,6 +172,7 @@ class AWSDriver(CloudDriver):
         creds.update({
             'aws_secret_access_key': self.aws_secret_access_key or None,
             'aws_access_key_id': self.aws_access_key_id or None,
+            'aws_session_token': self.aws_session_token or None,
         })
         return creds
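To make the new launch options above concrete, here is a hedged sketch of what an autoscaler resource configuration could look like. The key names come from the diff; every concrete value is a placeholder, not something taken from this commit:

```python
# Hypothetical AWS autoscaler resource entry; all values are placeholders.
resource_conf = {
    "ami_id": "ami-0123456789abcdef0",
    "instance_type": "g4dn.xlarge",
    "enable_monitoring": True,                    # -> Monitoring={"Enabled": True}
    "ebs_device_name": "/dev/sda1",
    "ebs_snapshot_id": "snap-0123456789abcdef0",  # if set, the snapshot is mapped instead of a fresh volume
    # "ebs_volume_size": 100,                     # used only when no snapshot id is given (defaults to 80)
    # "ebs_volume_type": "gp3",
    "subnet_id": "subnet-0123456789abcdef0",      # either subnet_id or availability_zone must be provided
    "cpu_only": False,
}
```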

View File

@@ -23,7 +23,7 @@ from .. import Logger
 from ..automation import ClearmlJob
 from ..backend_api import Session
 from ..backend_interface.task.populate import CreateFromFunction
-from ..backend_interface.util import get_or_create_project
+from ..backend_interface.util import get_or_create_project, mutually_exclusive
 from ..config import get_remote_task_id
 from ..debugging.log import LoggerRoot
 from ..errors import UsageError
@@ -657,7 +657,7 @@ class PipelineController(object):
         :param function: A global function to convert into a standalone Task
         :param function_kwargs: Optional, provide subset of function arguments and default values to expose.
             If not provided automatically take all function arguments & defaults
-            Optional, pass input arguments to the function from other Tasks's output artifact.
+            Optional, pass input arguments to the function from other Tasks' output artifact.
             Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`:
             {'numpy_matrix': 'aabbcc.answer'}
         :param function_return: Provide a list of names for all the results.
@@ -1292,6 +1292,136 @@ class PipelineController(object):
         """
         return self._pipeline_args

+    @classmethod
+    def enqueue(cls, pipeline_controller, queue_name=None, queue_id=None, force=False):
+        # type: (Union[PipelineController, str], Optional[str], Optional[str], bool) -> Any
+        """
+        Enqueue a PipelineController for execution, by adding it to an execution queue.
+
+        .. note::
+            A worker daemon must be listening at the queue for the worker to fetch the Task and execute it,
+            see `ClearML Agent <../clearml_agent>`_ in the ClearML Documentation.
+
+        :param pipeline_controller: The PipelineController to enqueue. Specify a PipelineController object or PipelineController ID
+        :param queue_name: The name of the queue. If not specified, then ``queue_id`` must be specified.
+        :param queue_id: The ID of the queue. If not specified, then ``queue_name`` must be specified.
+        :param bool force: If True, reset the PipelineController if necessary before enqueuing it
+
+        :return: An enqueue JSON response.
+
+            .. code-block:: javascript
+
+               {
+                    "queued": 1,
+                    "updated": 1,
+                    "fields": {
+                        "status": "queued",
+                        "status_reason": "",
+                        "status_message": "",
+                        "status_changed": "2020-02-24T15:05:35.426770+00:00",
+                        "last_update": "2020-02-24T15:05:35.426770+00:00",
+                        "execution.queue": "2bd96ab2d9e54b578cc2fb195e52c7cf"
+                    }
+                }
+
+            - ``queued`` - The number of Tasks enqueued (an integer or ``null``).
+            - ``updated`` - The number of Tasks updated (an integer or ``null``).
+            - ``fields``
+
+              - ``status`` - The status of the experiment.
+              - ``status_reason`` - The reason for the last status change.
+              - ``status_message`` - Information about the status.
+              - ``status_changed`` - The last status change date and time (ISO 8601 format).
+              - ``last_update`` - The last Task update time, including Task creation, update, change, or events for this task (ISO 8601 format).
+              - ``execution.queue`` - The ID of the queue where the Task is enqueued. ``null`` indicates not enqueued.
+        """
+        pipeline_controller = (
+            pipeline_controller
+            if isinstance(pipeline_controller, PipelineController)
+            else cls.get(pipeline_id=pipeline_controller)
+        )
+        return Task.enqueue(pipeline_controller._task, queue_name=queue_name, queue_id=queue_id, force=force)
+
+    @classmethod
+    def get(
+        cls,
+        pipeline_id=None,  # type: Optional[str]
+        pipeline_project=None,  # type: Optional[str]
+        pipeline_name=None,  # type: Optional[str]
+        pipeline_version=None,  # type: Optional[str]
+        pipeline_tags=None,  # type: Optional[Sequence[str]]
+        shallow_search=False  # type: bool
+    ):
+        # type: (...) -> "PipelineController"
+        """
+        Get a specific PipelineController. If multiple pipeline controllers are found, the pipeline controller
+        with the highest semantic version is returned. If no semantic version is found, the most recently
+        updated pipeline controller is returned. This function raises an exception if no pipeline controller
+        was found.
+
+        Note: In order to run the pipeline controller returned by this function, use PipelineController.enqueue
+
+        :param pipeline_id: Requested PipelineController ID
+        :param pipeline_project: Requested PipelineController project
+        :param pipeline_name: Requested PipelineController name
+        :param pipeline_version: Requested PipelineController version
+        :param pipeline_tags: Requested PipelineController tags (list of tag strings)
+        :param shallow_search: If True, search only the first 500 results (first page)
+        """
+        mutually_exclusive(pipeline_id=pipeline_id, pipeline_project=pipeline_project, _require_at_least_one=False)
+        mutually_exclusive(pipeline_id=pipeline_id, pipeline_name=pipeline_name, _require_at_least_one=False)
+        if not pipeline_id:
+            pipeline_project_hidden = "{}/.pipelines/{}".format(pipeline_project, pipeline_name)
+            name_with_runtime_number_regex = r"^{}( #[0-9]+)*$".format(re.escape(pipeline_name))
+            pipelines = Task._query_tasks(
+                pipeline_project=[pipeline_project_hidden],
+                task_name=name_with_runtime_number_regex,
+                fetch_only_first_page=False if not pipeline_version else shallow_search,
+                only_fields=["id"] if not pipeline_version else ["id", "runtime.version"],
+                system_tags=[cls._tag],
+                order_by=["-last_update"],
+                tags=pipeline_tags,
+                search_hidden=True,
+                _allow_extra_fields_=True,
+            )
+            if pipelines:
+                if not pipeline_version:
+                    pipeline_id = pipelines[0].id
+                    current_version = None
+                    for pipeline in pipelines:
+                        if not pipeline.runtime:
+                            continue
+                        candidate_version = pipeline.runtime.get("version")
+                        if not candidate_version or not Version.is_valid_version_string(candidate_version):
+                            continue
+                        if not current_version or Version(candidate_version) > current_version:
+                            current_version = Version(candidate_version)
+                            pipeline_id = pipeline.id
+                else:
+                    for pipeline in pipelines:
+                        if pipeline.runtime.get("version") == pipeline_version:
+                            pipeline_id = pipeline.id
+                            break
+            if not pipeline_id:
+                error_msg = "Could not find pipeline with pipeline_project={}, pipeline_name={}".format(pipeline_project, pipeline_name)
+                if pipeline_version:
+                    error_msg += ", pipeline_version={}".format(pipeline_version)
+                raise ValueError(error_msg)
+        pipeline_task = Task.get_task(task_id=pipeline_id)
+        pipeline_object = cls.__new__(cls)
+        pipeline_object._task = pipeline_task
+        pipeline_object._nodes = {}
+        pipeline_object._running_nodes = []
+        try:
+            pipeline_object._deserialize(pipeline_task._get_configuration_dict(cls._config_section))
+        except Exception:
+            pass
+        return pipeline_object
+
+    @property
+    def id(self):
+        # type: () -> str
+        return self._task.id
+
     @property
     def tags(self):
         # type: () -> List[str]
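A minimal usage sketch of the two new classmethods shown above (method names and parameters come from the docstrings; the project, pipeline, and queue names are placeholders):

```python
from clearml import PipelineController

# Fetch the controller with the highest semantic version for this project/name
pipeline = PipelineController.get(pipeline_project="examples", pipeline_name="custom pipeline logic")
# Enqueue it for a clearml-agent to execute; force=True resets it first if needed
PipelineController.enqueue(pipeline, queue_name="services", force=True)
```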

View File

@@ -31,7 +31,7 @@ class S3BucketConfig(object):
     acl = attrib(type=str, converter=_none_to_empty_string, default="")
     secure = attrib(type=bool, default=True)
     region = attrib(type=str, converter=_none_to_empty_string, default="")
-    verify = attrib(type=bool, default=True)
+    verify = attrib(type=bool, default=None)
     use_credentials_chain = attrib(type=bool, default=False)
     extra_args = attrib(type=dict, default=None)
@@ -106,6 +106,7 @@ class S3BucketConfigurations(BaseBucketConfigurations):
         default_use_credentials_chain=False,
         default_token="",
         default_extra_args=None,
+        default_verify=None,
     ):
         super(S3BucketConfigurations, self).__init__()
         self._buckets = buckets if buckets else list()
@@ -116,6 +117,7 @@ class S3BucketConfigurations(BaseBucketConfigurations):
         self._default_multipart = True
         self._default_use_credentials_chain = default_use_credentials_chain
         self._default_extra_args = default_extra_args
+        self._default_verify = default_verify

     @classmethod
     def from_config(cls, s3_configuration):
@@ -129,6 +131,7 @@ class S3BucketConfigurations(BaseBucketConfigurations):
         default_region = s3_configuration.get("region", "") or getenv("AWS_DEFAULT_REGION", "")
         default_use_credentials_chain = s3_configuration.get("use_credentials_chain") or False
         default_extra_args = s3_configuration.get("extra_args")
+        default_verify = s3_configuration.get("verify", None)

         default_key = _none_to_empty_string(default_key)
         default_secret = _none_to_empty_string(default_secret)
@@ -142,7 +145,8 @@ class S3BucketConfigurations(BaseBucketConfigurations):
             default_region,
             default_use_credentials_chain,
             default_token,
-            default_extra_args
+            default_extra_args,
+            default_verify,
         )

     def add_config(self, bucket_config):
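For context on the `verify` default changing from `True` to `None`: boto3/botocore documents `verify` as either `False` (disable TLS verification), a path to a CA bundle, or unset/`None` (verify with the default CA bundle); `True` is not a documented value, which is why the storage driver later in this commit maps it back to `None`. A small sketch of the documented values (the endpoint and path are placeholders):

```python
import boto3

# None (the new default) -> verify TLS using the default CA bundle
s3 = boto3.client("s3", endpoint_url="https://s3.example.com:9000", verify=None)

# A path -> verify TLS against a custom CA bundle
s3 = boto3.client("s3", endpoint_url="https://s3.example.com:9000", verify="/path/to/ca-bundle.pem")

# False -> skip TLS verification entirely (not recommended)
s3 = boto3.client("s3", endpoint_url="https://s3.example.com:9000", verify=False)
```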

View File

@@ -3,7 +3,6 @@ import itertools
 import json
 import logging
 import os
-import re
 import sys
 import warnings
 from copy import copy
@@ -34,12 +33,12 @@ from ...backend_interface.task.development.worker import DevWorker
 from ...backend_interface.session import SendError
 from ...backend_api import Session
 from ...backend_api.services import tasks, models, events, projects
-from ...backend_api.session.defs import ENV_OFFLINE_MODE
+# from ...backend_api.session.defs import ENV_OFFLINE_MODE
 from ...utilities.pyhocon import ConfigTree, ConfigFactory
 from ...utilities.config import config_dict_to_text, text_to_config_dict
 from ...errors import ArtifactUriDeleteError
-from ..base import IdObjectBase, InterfaceBase
+from ..base import IdObjectBase  # , InterfaceBase
 from ..metrics import Metrics, Reporter
 from ..model import Model
 from ..setupuploadmixin import SetupUploadMixin
@@ -376,7 +375,6 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
         self._edit(type=tasks.TaskTypeEnum(task_type))
         return id

-
     def _set_storage_uri(self, value):
         value = value.rstrip('/') if value else None
         self._storage_uri = StorageHelper.conform_url(value)
@@ -1406,7 +1404,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
         Remove input models from the current task. Note that the models themselves are not deleted,
         but the tasks' reference to the models is removed.
         To delete the models themselves, see `Models.remove`

         :param models_to_remove: The models to remove from the task. Can be a list of ids,
             or of `BaseModel` (including its subclasses: `Model` and `InputModel`)
         """
@@ -2543,7 +2541,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
         # type: (**Any) -> ()
         for k, v in kwargs.items():
             setattr(self.data, k, v)

         offline_mode_folder = self.get_offline_mode_folder()
         if not offline_mode_folder:
             return
         Path(offline_mode_folder).mkdir(parents=True, exist_ok=True)
@@ -2807,21 +2805,6 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
         res = cls._send(session=session, req=req, log=log)
         return res

-    @classmethod
-    def get_by_name(cls, task_name):
-        # type: (str) -> Task
-        """
-        Returns the most recent task with the given name from anywhere in the system as a Task object.
-
-        :param str task_name: The name of the task to search for.
-
-        :return: Task object of the most recent task with that name.
-        """
-        res = cls._send(cls._get_default_session(), tasks.GetAllRequest(name=exact_match_regex(task_name)))
-        task = get_single_result(entity='task', query=task_name, results=res.response.tasks)
-        return cls(task_id=task.id)
-
     @classmethod
     def get_task_output_log_web_page(cls, task_id, project_id=None, app_server_host=None):
         # type: (str, Optional[str], Optional[str]) -> str

View File

@@ -724,17 +724,8 @@ class EventTrainsWriter(object):
                     'Received event without step, assuming step = {}'.format(step))
             else:
                 step = int(step)
-            # unlike other frameworks, tensorflow already accounts for the iteration number
-            # when continuing the training. we substract the smallest iteration such that we
-            # don't increment the step twice number
-            original_step = step
-            if EventTrainsWriter._current_task:
-                step -= EventTrainsWriter._current_task.get_initial_iteration()
-                # there can be a few metrics getting reported again, so the step can be negative
-                # for the first few reports
-                if step < 0 and original_step > 0:
-                    step = 0
+            step = tweak_step(step)

             self._max_step = max(self._max_step, step)
             if value_dicts is None:
                 LoggerRoot.get_base_logger(TensorflowBinding).debug("Summary arrived without 'value'")
@@ -1378,7 +1369,7 @@ class PatchTensorFlowEager(object):
                 plugin_type = plugin_type[next(i for i, c in enumerate(plugin_type) if c >= 'A'):]
                 if plugin_type.startswith('scalars'):
                     event_writer._add_scalar(tag=str(tag),
-                                             step=int(step.numpy()) if not isinstance(step, int) else step,
+                                             step=tweak_step(step),
                                              scalar_data=tensor.numpy())
                 elif plugin_type.startswith('images'):
                     img_data_np = tensor.numpy()
@@ -1386,19 +1377,19 @@ class PatchTensorFlowEager(object):
                         tag=tag, step=step, **kwargs)
                 elif plugin_type.startswith('histograms'):
                     event_writer._add_histogram(
-                        tag=str(tag), step=int(step.numpy()) if not isinstance(step, int) else step,
+                        tag=str(tag), step=tweak_step(step),
                         hist_data=tensor.numpy()
                     )
                 elif plugin_type.startswith('text'):
                     event_writer._add_text(
-                        tag=str(tag), step=int(step.numpy()) if not isinstance(step, int) else step,
+                        tag=str(tag), step=tweak_step(step),
                         tensor_bytes=tensor.numpy()
                     )
                 elif 'audio' in plugin_type:
                     audio_bytes_list = [a for a in tensor.numpy().flatten() if a]
                     for i, audio_bytes in enumerate(audio_bytes_list):
                         event_writer._add_audio(tag=str(tag) + ('/{}'.format(i) if len(audio_bytes_list) > 1 else ''),
-                                                step=int(step.numpy()) if not isinstance(step, int) else step,
+                                                step=tweak_step(step),
                                                 values=None, audio_data=audio_bytes)
                 else:
                     pass
@@ -1416,7 +1407,7 @@ class PatchTensorFlowEager(object):
         if event_writer and isinstance(step, int) or hasattr(step, 'numpy'):
             try:
                 event_writer._add_scalar(tag=str(tag),
-                                         step=int(step.numpy()) if not isinstance(step, int) else step,
+                                         step=tweak_step(step),
                                          scalar_data=value.numpy())
             except Exception as ex:
                 LoggerRoot.get_base_logger(TensorflowBinding).warning(str(ex))
@@ -1428,7 +1419,7 @@ class PatchTensorFlowEager(object):
                     str_tag = str_tag.decode() if isinstance(str_tag, bytes) else str(str_tag)
                     event_writer._add_scalar(
                         tag=str_tag,
-                        step=int(a_step.numpy()) if not isinstance(a_step, int) else a_step,
+                        step=tweak_step(a_step),
                        scalar_data=a_value.numpy())
                 except Exception as a_ex:
                     LoggerRoot.get_base_logger(TensorflowBinding).warning(
@@ -1458,7 +1449,7 @@ class PatchTensorFlowEager(object):
         if event_writer and isinstance(step, int) or hasattr(step, 'numpy'):
             try:
                 event_writer._add_histogram(
-                    tag=str(tag), step=int(step.numpy()) if not isinstance(step, int) else step,
+                    tag=str(tag), step=tweak_step(step),
                     hist_data=values.numpy()
                 )
             except Exception as ex:
@@ -1471,7 +1462,7 @@ class PatchTensorFlowEager(object):
                     str_tag = str_tag.decode() if isinstance(str_tag, bytes) else str(str_tag)
                     event_writer._add_histogram(
                         tag=str_tag,
-                        step=int(a_step.numpy()) if not isinstance(a_step, int) else a_step,
+                        step=tweak_step(a_step),
                         hist_data=a_value.numpy()
                     )
                 except Exception as a_ex:
@@ -1549,11 +1540,11 @@ class PatchTensorFlowEager(object):
                                 'colorspace': 'RGB', 'encodedImageString': img_data_np[i]}
                     image_tag = str(tag) + '/sample_{}'.format(i - 2) if img_data_np.size > 3 else str(tag)
                     event_writer._add_image(tag=image_tag,
-                                            step=int(step.numpy()) if not isinstance(step, int) else step,
+                                            step=tweak_step(step),
                                             img_data=img_data)
             else:
                 event_writer._add_image_numpy(tag=str(tag),
-                                              step=int(step.numpy()) if not isinstance(step, int) else step,
+                                              step=tweak_step(step),
                                               img_data_np=img_data_np,
                                               max_keep_images=kwargs.get('max_images'))
@@ -2299,3 +2290,15 @@ class PatchTensorflow2ModelIO(object):
         except Exception:
             pass
         return model
+
+
+def tweak_step(step):
+    # noinspection PyBroadException
+    try:
+        step = int(step.numpy()) if not isinstance(step, int) else step
+        # unlike other frameworks, tensorflow already accounts for the iteration number
+        # when continuing the training. we subtract the smallest iteration so that we
+        # don't increment the step twice
+        return step - EventTrainsWriter._current_task.get_initial_iteration()
+    except Exception:
+        return step

View File

@ -70,7 +70,7 @@ class PatchGradio:
# noinspection PyBroadException # noinspection PyBroadException
try: try:
return original_fn(*args, **kwargs) return original_fn(*args, **kwargs)
except Exception as e: except Exception:
del kwargs["root_path"] del kwargs["root_path"]
return original_fn(*args, **kwargs) return original_fn(*args, **kwargs)

View File

@@ -242,7 +242,7 @@ def parse_known_host(parsed_host):
         print('Assuming files and api ports are unchanged and use the same (' + parsed_host.scheme + ') protocol')
         api_host = parsed_host.scheme + "://" + parsed_host.netloc + ':8008' + parsed_host.path
         web_host = parsed_host.scheme + "://" + parsed_host.netloc + parsed_host.path
-        files_host = parsed_host.scheme + "://" + parsed_host.netloc+ ':8081' + parsed_host.path
+        files_host = parsed_host.scheme + "://" + parsed_host.netloc + ':8081' + parsed_host.path
     else:
         print("Warning! Could not parse host name")
         api_host = ''

View File

@@ -3,7 +3,7 @@
 storage {
     cache {
-        # Defaults to system temp folder / cache
+        # Defaults to <system_temp_folder>/clearml_cache
         default_base_dir: "~/.clearml/cache"
         # default_cache_manager_size: 100
     }
@@ -103,6 +103,8 @@
         boto3 {
             pool_connections: 512
             max_multipart_concurrency: 16
+            multipart_threshold: 8388608  # 8MB
+            multipart_chunksize: 8388608  # 8MB
         }
     }
     google.storage {
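The two new keys are the defaults picked up by the S3 driver further down in this commit; they roughly translate to a boto3 transfer configuration like the sketch below (8388608 bytes is 8 * 1024 ** 2, i.e. 8MB):

```python
import boto3.s3.transfer

# Roughly what the new defaults mean on the boto3 side
transfer_config = boto3.s3.transfer.TransferConfig(
    multipart_threshold=8 * 1024 ** 2,  # switch to multipart transfers above 8MB
    multipart_chunksize=8 * 1024 ** 2,  # transfer in 8MB parts
)
```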

View File

@@ -2981,7 +2981,7 @@ class Dataset(object):
             task = Task.get_task(task_id=id_)
             dataset_struct_entry = {
                 "job_id": id_[len("offline-"):] if id_.startswith("offline-") else id_,  # .removeprefix not supported < Python 3.9
                 "status": task.status
             }
             # noinspection PyProtectedMember

View File

@@ -116,6 +116,7 @@ class _Driver(object):
         cls._file_server_hosts = hosts
         return cls._file_server_hosts

+
 class _HttpDriver(_Driver):
     """ LibCloud http/https adapter (simple, enough for now) """
@@ -401,6 +402,8 @@ class _Boto3Driver(_Driver):
     _min_pool_connections = 512
     _max_multipart_concurrency = deferred_config('aws.boto3.max_multipart_concurrency', 16)
+    _multipart_threshold = deferred_config('aws.boto3.multipart_threshold', (1024 ** 2) * 8)  # 8 MB
+    _multipart_chunksize = deferred_config('aws.boto3.multipart_chunksize', (1024 ** 2) * 8)
     _pool_connections = deferred_config('aws.boto3.pool_connections', 512)
     _connect_timeout = deferred_config('aws.boto3.connect_timeout', 60)
     _read_timeout = deferred_config('aws.boto3.read_timeout', 60)
@@ -435,12 +438,18 @@ class _Boto3Driver(_Driver):
             self.name = name[5:]
             endpoint = (('https://' if cfg.secure else 'http://') + cfg.host) if cfg.host else None

+            verify = cfg.verify
+            if verify is True:
+                # True is a non-documented value for boto3, use None instead (which means verify)
+                print("Using boto3 verify=None instead of true")
+                verify = None
+
             # boto3 client creation isn't thread-safe (client itself is)
             with self._creation_lock:
                 boto_kwargs = {
                     "endpoint_url": endpoint,
                     "use_ssl": cfg.secure,
-                    "verify": cfg.verify,
+                    "verify": verify,
                     "region_name": cfg.region or None,  # None in case cfg.region is an empty string
                     "config": botocore.client.Config(
                         max_pool_connections=max(
@@ -498,7 +507,9 @@ class _Boto3Driver(_Driver):
             container.bucket.upload_fileobj(stream, object_name, Config=boto3.s3.transfer.TransferConfig(
                 use_threads=container.config.multipart,
                 max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
-                num_download_attempts=container.config.retries),
+                num_download_attempts=container.config.retries,
+                multipart_threshold=self._multipart_threshold,
+                multipart_chunksize=self._multipart_chunksize),
                 Callback=callback,
                 ExtraArgs=extra_args,
             )
@@ -512,6 +523,8 @@ class _Boto3Driver(_Driver):
                 Config=boto3.s3.transfer.TransferConfig(
                     use_threads=False,
                     num_download_attempts=container.config.retries,
+                    multipart_threshold=self._multipart_threshold,
+                    multipart_chunksize=self._multipart_chunksize,
                 ),
                 Callback=callback,
                 ExtraArgs=extra_args
@@ -535,7 +548,9 @@ class _Boto3Driver(_Driver):
             container.bucket.upload_file(file_path, object_name, Config=boto3.s3.transfer.TransferConfig(
                 use_threads=container.config.multipart,
                 max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
-                num_download_attempts=container.config.retries),
+                num_download_attempts=container.config.retries,
+                multipart_threshold=self._multipart_threshold,
+                multipart_chunksize=self._multipart_chunksize),
                 Callback=callback,
                 ExtraArgs=extra_args,
             )
@@ -547,7 +562,10 @@ class _Boto3Driver(_Driver):
                 file_path,
                 object_name,
                 Config=boto3.s3.transfer.TransferConfig(
-                    use_threads=False, num_download_attempts=container.config.retries
+                    use_threads=False,
+                    num_download_attempts=container.config.retries,
+                    multipart_threshold=self._multipart_threshold,
+                    multipart_chunksize=self._multipart_chunksize
                 ),
                 Callback=callback,
                 ExtraArgs=extra_args
@@ -600,7 +618,9 @@ class _Boto3Driver(_Driver):
         config = boto3.s3.transfer.TransferConfig(
             use_threads=container.config.multipart,
             max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
-            num_download_attempts=container.config.retries)
+            num_download_attempts=container.config.retries,
+            multipart_threshold=self._multipart_threshold,
+            multipart_chunksize=self._multipart_chunksize)
         total_size_mb = obj.content_length / (1024. * 1024.)
         remote_path = os.path.join(obj.container_name, obj.key)
         cb = DownloadProgressReport(total_size_mb, verbose, remote_path, log)
@@ -618,7 +638,9 @@ class _Boto3Driver(_Driver):
             Config = boto3.s3.transfer.TransferConfig(
                 use_threads=container.config.multipart,
                 max_concurrency=self._max_multipart_concurrency if container.config.multipart else 1,
-                num_download_attempts=container.config.retries
+                num_download_attempts=container.config.retries,
+                multipart_threshold=self._multipart_threshold,
+                multipart_chunksize=self._multipart_chunksize
             )
             obj.download_file(str(p), Callback=callback, Config=Config)
@@ -1776,7 +1798,6 @@ class _FileStorageDriver(_Driver):
         return os.path.isfile(object_name)

-
 class StorageHelper(object):
     """ Storage helper.
     Used by the entire system to download/upload files.
@@ -2412,7 +2433,7 @@ class StorageHelper(object):
         result_dest_path = canonized_dest_path if return_canonized else dest_path

         if self.scheme in StorageHelper._quotable_uri_schemes:  # TODO: fix-driver-schema
             # quote link
             result_dest_path = quote_url(result_dest_path, StorageHelper._quotable_uri_schemes)
@@ -2430,7 +2451,7 @@ class StorageHelper(object):
         result_path = canonized_dest_path if return_canonized else dest_path

         if cb and self.scheme in StorageHelper._quotable_uri_schemes:  # TODO: fix-driver-schema
             # store original callback
             a_cb = cb
@@ -2955,7 +2976,6 @@ class StorageHelper(object):
         )

-
 def normalize_local_path(local_path):
     """
     Get a normalized local path

View File

@@ -7,6 +7,7 @@ import signal
 import sys
 import threading
 import time
+import warnings
 from argparse import ArgumentParser
 from logging import getLogger
 from tempfile import mkstemp, mkdtemp
@@ -860,6 +861,23 @@ class Task(_Task):

         return task

+    @classmethod
+    def get_by_name(cls, task_name):
+        # type: (str) -> TaskInstance
+        """
+        .. note::
+            This method is deprecated, use :meth:`Task.get_task` instead.
+
+        Returns the most recent task with the given name from anywhere in the system as a Task object.
+
+        :param str task_name: The name of the task to search for.
+
+        :return: Task object of the most recent task with that name.
+        """
+        warnings.warn("Warning: 'Task.get_by_name' is deprecated. Use 'Task.get_task' instead", DeprecationWarning)
+        return cls.get_task(task_name=task_name)
+
     @classmethod
     def get_task(
         cls,
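As the deprecation note above suggests, a minimal replacement sketch (the task name is a placeholder):

```python
from clearml import Task

# Preferred over the deprecated Task.get_by_name()
task = Task.get_task(task_name="my experiment name")
```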
@@ -1213,8 +1231,8 @@ class Task(_Task):
         return cloned_task

     @classmethod
-    def enqueue(cls, task, queue_name=None, queue_id=None):
-        # type: (Union[Task, str], Optional[str], Optional[str]) -> Any
+    def enqueue(cls, task, queue_name=None, queue_id=None, force=False):
+        # type: (Union[Task, str], Optional[str], Optional[str], bool) -> Any
         """
         Enqueue a Task for execution, by adding it to an execution queue.
@@ -1225,6 +1243,7 @@ class Task(_Task):
         :param Task/str task: The Task to enqueue. Specify a Task object or Task ID.
         :param str queue_name: The name of the queue. If not specified, then ``queue_id`` must be specified.
         :param str queue_id: The ID of the queue. If not specified, then ``queue_name`` must be specified.
+        :param bool force: If True, reset the Task if necessary before enqueuing it

         :return: An enqueue JSON response.
@@ -1271,9 +1290,25 @@ class Task(_Task):
                 raise ValueError('Could not find queue named "{}"'.format(queue_name))

         req = tasks.EnqueueRequest(task=task_id, queue=queue_id)
-        res = cls._send(session=session, req=req)
-        if not res.ok():
-            raise ValueError(res.response)
+        exception = None
+        res = None
+        try:
+            res = cls._send(session=session, req=req)
+            ok = res.ok()
+        except Exception as e:
+            exception = e
+            ok = False
+        if not ok:
+            if not force:
+                if res:
+                    raise ValueError(res.response)
+                raise exception
+            task = cls.get_task(task_id=task) if isinstance(task, str) else task
+            task.reset(set_started_on_success=False, force=True)
+            req = tasks.EnqueueRequest(task=task_id, queue=queue_id)
+            res = cls._send(session=session, req=req)
+            if not res.ok():
+                raise ValueError(res.response)
         resp = res.response
         return resp
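A hedged usage sketch of the new ``force`` flag on ``Task.enqueue`` (the queue name and task ID are placeholders):

```python
from clearml import Task

task = Task.get_task(task_id="<task-id>")
# With force=True, a task that cannot currently be enqueued (e.g. it already ran)
# is reset first and then enqueued instead of raising an error.
Task.enqueue(task, queue_name="default", force=True)
```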

View File

@@ -1 +1 @@
-__version__ = '1.11.2rc0'
+__version__ = '1.12.0'

View File

@@ -33,7 +33,7 @@ sdk {
         # }
     ]
     cache {
-        # Defaults to system temp folder / cache
+        # Defaults to <system_temp_folder>/clearml_cache
         default_base_dir: "~/.clearml/cache"
         default_cache_manager_size: 100
     }
@@ -121,6 +121,8 @@ sdk {
         boto3 {
             pool_connections: 512
             max_multipart_concurrency: 16
+            multipart_threshold: 8388608  # 8MB
+            multipart_chunksize: 8388608  # 8MB
         }
     }
     google.storage {

View File

@@ -0,0 +1,87 @@
# Handling the Google Cloud Storage breaking change
## Rationale
Due to an issue in ClearML SDK versions 1.11.x, URLs of objects uploaded to Google Cloud Storage were stored in the ClearML backend as quoted strings. This breaks programmatic access to the registered objects through the ClearML SDK, and it affects the URLs of models, datasets, artifacts, and media files/debug samples. If you have such objects uploaded with the affected SDK versions and wish to access them programmatically with ClearML SDK version 1.12 and above (access from the ClearML UI still works), perform the actions listed in the sections below.
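For illustration, the problem is plain URL quoting; a hypothetical affected URL and how `urllib.parse.unquote` (used in the snippets below) restores it:

```python
from urllib.parse import unquote

# Hypothetical URL as registered by an affected 1.11.x SDK
stored_url = "gs://my-bucket/artifacts/my%20experiment/data%20file.pkl"
print(unquote(stored_url))  # -> gs://my-bucket/artifacts/my experiment/data file.pkl
```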
## Recommended Steps
The code snippets below are examples rather than an actual conversion script.
The general flow is to first download the affected files using a custom access method, then re-upload them with the fixed SDK version. Depending on which object you are trying to fix, pick the respective lines of code from steps 1 and 2.
1. Download the objects (models, datasets, media, artifacts) registered by the affected versions. See the code snippet below and adjust it to your use case to get a local copy of the object:
```python
from clearml import Task, InputModel, StorageManager
from urllib.parse import unquote # <- you will need this
ds_task = Task.get_task(dataset_id) # For Datasets
# OR
task = Task.get_task(task_id) # For Artifacts, Media, and Models
url = unquote(ds_task.artifacts['data'].url) # For Datasets
# OR
url = unquote(task.artifacts[artifact_name].url) # For Artifacts
# OR
model = InputModel(task.output_models_id['test_file']) # For Models associated to tasks
url = unquote(model.url)
# OR
model = InputModel(model_id) # For any Models
url = unquote(model.url)
# OR
samples = task.get_debug_samples(title, series) # For Media/Debug samples
sample_urls = [unquote(sample['url']) for sample in samples]
local_path = StorageManager.get_local_copy(url)
# NOTE: For Datasets you will need to unzip the `local_path`
```
2. Once the object is downloaded locally, re-register it with the new SDK version. See the snippet below and adjust it to your use case:
```python
from clearml import Task, Dataset, OutputModel
import os
ds = Dataset.create(dataset_name=task.name, dataset_project=task.get_project_name(), parents=[Dataset.get(dataset_id)]) # For Datasets
# OR
task = Task.get_task(task_name=task.name, project_name=task.get_project_name()) # For Artifacts, Media, and Models
ds.add_files(unzipped_local_path) # For Datasets
ds.finalize(auto_upload=True)
# OR
task.upload_artifact(name=artifact_name, artifact_object=local_path) # For Artifacts
# OR
model = OutputModel(task=task) # For any Models
model.update_weights(local_path) # note: if the original model was created with update_weights_package,
# preserve this behavior by saving the new one with update_weights_package too
# OR
for sample in samples:
task.get_logger().report_media(sample['metric'], sample['variant'], local_path=unquote(sample['url'])) # For Media/Debug samples
```
## Alternative methods
The methods described next are more advanced (read: easier to get wrong). If you're unsure whether to use them, don't. Both methods alter (i.e., modify **in-place**) the existing objects. Note that you still need to run the code from step 1 to have access to all the required metadata.
**Method 1**: You can try to alter the existing unpublished experiments/models using the lower-level `APIClient`
```python
from clearml.backend_api.session.client import APIClient
client = APIClient()
client.tasks.add_or_update_artifacts(task=ds_task.id, force=True, artifacts=[{"uri": unquote(ds_task.artifacts['state'].url), "key": "state", "type": "dict"}])
client.tasks.add_or_update_artifacts(task=ds_task.id, force=True, artifacts=[{"uri": unquote(ds_task.artifacts['data'].url), "key": "data", "type": "custom"}]) # For datasets on completed dataset uploads
# OR
client.tasks.add_or_update_artifacts(task=task.id, force=True, artifacts=[{"uri": unquote(url), "key": artifact_name, "type": "custom"}]) # For artifacts on completed tasks
# OR
client.models.edit(model=model.id, force=True, uri=url) # For any unpublished Model
```
**Method 2**: There's an option available only to those who self-host their ClearML server. It is possible to manually update the values registered in MongoDB, but beware - this advanced procedure should be performed with extreme care, as it can lead to an inconsistent state if mishandled.

View File

@@ -20,7 +20,7 @@ sdk {
     storage {
         cache {
-            # Defaults to system temp folder / cache
+            # Defaults to <system_temp_folder>/clearml_cache
             default_base_dir: "~/.clearml/cache"
         }
     }
@@ -105,6 +105,8 @@ sdk {
         boto3 {
             pool_connections: 512
             max_multipart_concurrency: 16
+            multipart_threshold: 8388608  # 8MB
+            multipart_chunksize: 8388608  # 8MB
         }
     }
     google.storage {

View File

@@ -5,20 +5,21 @@ from clearml import TaskTypes
 # Make the following function an independent pipeline component step
 # notice all package imports inside the function will be automatically logged as
 # required packages for the pipeline execution step
-@PipelineDecorator.component(return_values=['data_frame'], cache=True, task_type=TaskTypes.data_processing)
+@PipelineDecorator.component(return_values=["data_frame"], cache=True, task_type=TaskTypes.data_processing)
 def step_one(pickle_data_url: str, extra: int = 43):
-    print('step_one')
+    print("step_one")
     # make sure we have scikit-learn for this step, we need it to use to unpickle the object
     import sklearn  # noqa
     import pickle
     import pandas as pd
     from clearml import StorageManager
+
     local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
-    with open(local_iris_pkl, 'rb') as f:
+    with open(local_iris_pkl, "rb") as f:
         iris = pickle.load(f)
-    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
-    data_frame.columns += ['target']
-    data_frame['target'] = iris['target']
+    data_frame = pd.DataFrame(iris["data"], columns=iris["feature_names"])
+    data_frame.columns += ["target"]
+    data_frame["target"] = iris["target"]
     return data_frame
@@ -28,18 +29,17 @@ def step_one(pickle_data_url: str, extra: int = 43):
 # Specifying `return_values` makes sure the function step can return an object to the pipeline logic
 # In this case, the returned tuple will be stored as an artifact named "X_train, X_test, y_train, y_test"
 @PipelineDecorator.component(
-    return_values=['X_train, X_test, y_train, y_test'], cache=True, task_type=TaskTypes.data_processing
+    return_values=["X_train", "X_test", "y_train", "y_test"], cache=True, task_type=TaskTypes.data_processing
 )
 def step_two(data_frame, test_size=0.2, random_state=42):
-    print('step_two')
+    print("step_two")
     # make sure we have pandas for this step, we need it to use the data_frame
     import pandas as pd  # noqa
     from sklearn.model_selection import train_test_split
-    y = data_frame['target']
-    X = data_frame[(c for c in data_frame.columns if c != 'target')]
-    X_train, X_test, y_train, y_test = train_test_split(
-        X, y, test_size=test_size, random_state=random_state
-    )
+
+    y = data_frame["target"]
+    X = data_frame[(c for c in data_frame.columns if c != "target")]
+    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
     return X_train, X_test, y_train, y_test
@@ -49,37 +49,41 @@ def step_two(data_frame, test_size=0.2, random_state=42):
 # required packages for the pipeline execution step
 # Specifying `return_values` makes sure the function step can return an object to the pipeline logic
 # In this case, the returned object will be stored as an artifact named "model"
-@PipelineDecorator.component(return_values=['model'], cache=True, task_type=TaskTypes.training)
+@PipelineDecorator.component(return_values=["model"], cache=True, task_type=TaskTypes.training)
 def step_three(X_train, y_train):
-    print('step_three')
+    print("step_three")
     # make sure we have pandas for this step, we need it to use the data_frame
     import pandas as pd  # noqa
     from sklearn.linear_model import LogisticRegression
-    model = LogisticRegression(solver='liblinear', multi_class='auto')
+
+    model = LogisticRegression(solver="liblinear", multi_class="auto")
     model.fit(X_train, y_train)
     return model


 # Make the following function an independent pipeline component step
 # notice all package imports inside the function will be automatically logged as
 # required packages for the pipeline execution step
 # Specifying `return_values` makes sure the function step can return an object to the pipeline logic
 # In this case, the returned object will be stored as an artifact named "accuracy"
-@PipelineDecorator.component(return_values=['accuracy'], cache=True, task_type=TaskTypes.qc)
+@PipelineDecorator.component(return_values=["accuracy"], cache=True, task_type=TaskTypes.qc)
 def step_four(model, X_data, Y_data):
     from sklearn.linear_model import LogisticRegression  # noqa
     from sklearn.metrics import accuracy_score
+
     Y_pred = model.predict(X_data)
     return accuracy_score(Y_data, Y_pred, normalize=True)


 # The actual pipeline execution context
 # notice that all pipeline component function calls are actually executed remotely
 # Only when a return value is used, the pipeline logic will wait for the component execution to complete
-@PipelineDecorator.pipeline(name='custom pipeline logic', project='examples', version='0.0.5')
-def executing_pipeline(pickle_url, mock_parameter='mock'):
-    print('pipeline args:', pickle_url, mock_parameter)
+@PipelineDecorator.pipeline(name="custom pipeline logic", project="examples", version="0.0.5")
+def executing_pipeline(pickle_url, mock_parameter="mock"):
+    print("pipeline args:", pickle_url, mock_parameter)

     # Use the pipeline argument to start the pipeline and pass it to the first step
-    print('launch step one')
+    print("launch step one")
     data_frame = step_one(pickle_url)

     # Use the returned data from the first step (`step_one`), and pass it to the next step (`step_two`)
@@ -87,17 +91,17 @@ def executing_pipeline(pickle_url, mock_parameter="mock"):
     # the pipeline logic does not actually load the artifact itself.
     # When actually passing the `data_frame` object into a new step,
     # It waits for the creating step/function (`step_one`) to complete the execution
-    print('launch step two')
+    print("launch step two")
     X_train, X_test, y_train, y_test = step_two(data_frame)

-    print('launch step three')
+    print("launch step three")
     model = step_three(X_train, y_train)

     # Notice since we are "printing" the `model` object,
     # we actually deserialize the object from the third step, and thus wait for the third step to complete.
-    print('returned model: {}'.format(model))
+    print("returned model: {}".format(model))

-    print('launch step four')
+    print("launch step four")
     accuracy = 100 * step_four(model, X_data=X_test, Y_data=y_test)

     # Notice since we are "printing" the `accuracy` object,
@@ -105,7 +109,7 @@ def executing_pipeline(pickle_url, mock_parameter="mock"):
     print(f"Accuracy={accuracy}%")


-if __name__ == '__main__':
+if __name__ == "__main__":
     # set the pipeline steps default execution queue (per specific step we can override it with the decorator)
     # PipelineDecorator.set_default_execution_queue('default')
     # Run the pipeline steps as subprocesses on the current machine, great for local executions
@@ -114,7 +118,7 @@ if __name__ == "__main__":
     # Start the pipeline execution logic.
     executing_pipeline(
-        pickle_url='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl',
+        pickle_url="https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl",
     )

-    print('process completed')
+    print("process completed")