This commit is contained in:
revital 2023-06-04 11:07:50 +03:00
commit 2f6a7ffcfe
4 changed files with 117 additions and 65 deletions

View File

@ -4,7 +4,7 @@ import logging
try: try:
from jsonargparse import ArgumentParser from jsonargparse import ArgumentParser
from jsonargparse.namespace import Namespace from jsonargparse.namespace import Namespace
from jsonargparse.util import Path from jsonargparse.util import Path, change_to_path_dir
except ImportError: except ImportError:
ArgumentParser = None ArgumentParser = None
@ -30,6 +30,7 @@ class PatchJsonArgParse(object):
_command_name = "subcommand" _command_name = "subcommand"
_section_name = "Args" _section_name = "Args"
__remote_task_params = {} __remote_task_params = {}
__remote_task_params_dict = {}
__patched = False __patched = False
@classmethod @classmethod
@ -54,7 +55,7 @@ class PatchJsonArgParse(object):
) )
@classmethod @classmethod
def _update_task_args(cls): def _update_task_args(cls, parser=None, subcommand=None):
if running_remotely() or not cls._current_task or not cls._args: if running_remotely() or not cls._current_task or not cls._args:
return return
args = {} args = {}
@ -65,7 +66,7 @@ class PatchJsonArgParse(object):
if k in cls._args_type: if k in cls._args_type:
args_type[key_with_section] = cls._args_type[k] args_type[key_with_section] = cls._args_type[k]
continue continue
if not verify_basic_type(v) and v: if not verify_basic_type(v, basic_types=(float, int, bool, str, type(None))) and v:
# noinspection PyBroadException # noinspection PyBroadException
try: try:
if isinstance(v, Namespace) or (isinstance(v, list) and all(isinstance(sub_v, Namespace) for sub_v in v)): if isinstance(v, Namespace) or (isinstance(v, list) and all(isinstance(sub_v, Namespace) for sub_v in v)):
@ -78,8 +79,24 @@ class PatchJsonArgParse(object):
args[key_with_section] = str(v) args[key_with_section] = str(v)
except Exception: except Exception:
pass pass
args, args_type = cls.__delete_config_args(parser, args, args_type, subcommand=subcommand)
cls._current_task._set_parameters(args, __update=True, __parameters_types=args_type) cls._current_task._set_parameters(args, __update=True, __parameters_types=args_type)
@classmethod
def __delete_config_args(cls, parser, args, args_type, subcommand=None):
if not parser:
return args, args_type
paths = PatchJsonArgParse.__get_paths_from_dict(cls._args)
for path in paths:
args_to_delete = PatchJsonArgParse.__get_args_from_path(parser, path, subcommand=subcommand)
for arg_to_delete_key, arg_to_delete_value in args_to_delete.items():
key_with_section = cls._section_name + cls._args_sep + arg_to_delete_key
if key_with_section in args and args[key_with_section] == arg_to_delete_value:
del args[key_with_section]
if key_with_section in args_type:
del args_type[key_with_section]
return args, args_type
@staticmethod @staticmethod
def _adapt_typehints(original_fn, val, *args, **kwargs): def _adapt_typehints(original_fn, val, *args, **kwargs):
if not PatchJsonArgParse._current_task or not running_remotely(): if not PatchJsonArgParse._current_task or not running_remotely():
@ -97,7 +114,7 @@ class PatchJsonArgParse(object):
return original_fn(obj, *args, **kwargs) return original_fn(obj, *args, **kwargs)
if running_remotely(): if running_remotely():
try: try:
PatchJsonArgParse._load_task_params() PatchJsonArgParse._load_task_params(parser=obj)
params = PatchJsonArgParse.__remote_task_params_dict params = PatchJsonArgParse.__remote_task_params_dict
params_namespace = Namespace() params_namespace = Namespace()
for k, v in params.items(): for k, v in params.items():
@ -132,37 +149,68 @@ class PatchJsonArgParse(object):
del PatchJsonArgParse._args[subcommand] del PatchJsonArgParse._args[subcommand]
PatchJsonArgParse._args.update(subcommand_args) PatchJsonArgParse._args.update(subcommand_args)
PatchJsonArgParse._args = {k: v for k, v in PatchJsonArgParse._args.items()} PatchJsonArgParse._args = {k: v for k, v in PatchJsonArgParse._args.items()}
PatchJsonArgParse._update_task_args() PatchJsonArgParse._update_task_args(parser=obj, subcommand=subcommand)
except Exception as e: except Exception as e:
logging.getLogger(__file__).warning("Failed parsing jsonargparse arguments: {}".format(e)) logging.getLogger(__file__).warning("Failed parsing jsonargparse arguments: {}".format(e))
return parsed_args return parsed_args
@staticmethod @classmethod
def _load_task_params(): def _load_task_params(cls, parser=None):
if not PatchJsonArgParse.__remote_task_params: if cls.__remote_task_params:
from clearml import Task return
from clearml import Task
t = Task.get_task(task_id=get_remote_task_id()) t = Task.get_task(task_id=get_remote_task_id())
# noinspection PyProtectedMember # noinspection PyProtectedMember
PatchJsonArgParse.__remote_task_params = t._get_task_property("hyperparams") or {} cls.__remote_task_params = t._get_task_property("hyperparams") or {}
params_dict = t.get_parameters(backwards_compatibility=False, cast=True) params_dict = t.get_parameters(backwards_compatibility=False, cast=True)
for key, section_param in PatchJsonArgParse.__remote_task_params[PatchJsonArgParse._section_name].items(): for key, section_param in cls.__remote_task_params[cls._section_name].items():
if section_param.type == PatchJsonArgParse.namespace_type: if section_param.type == cls.namespace_type:
params_dict[ params_dict[
"{}/{}".format(PatchJsonArgParse._section_name, key) "{}/{}".format(cls._section_name, key)
] = PatchJsonArgParse._get_namespace_from_json(section_param.value) ] = cls._get_namespace_from_json(section_param.value)
elif section_param.type == PatchJsonArgParse.path_type: elif section_param.type == cls.path_type:
params_dict[ params_dict[
"{}/{}".format(PatchJsonArgParse._section_name, key) "{}/{}".format(cls._section_name, key)
] = PatchJsonArgParse._get_path_from_json(section_param.value) ] = cls._get_path_from_json(section_param.value)
elif (not section_param.type or section_param.type == "NoneType") and not section_param.value: elif (not section_param.type or section_param.type == "NoneType") and not section_param.value:
params_dict["{}/{}".format(PatchJsonArgParse._section_name, key)] = None params_dict["{}/{}".format(cls._section_name, key)] = None
skip = len(PatchJsonArgParse._section_name) + 1 skip = len(cls._section_name) + 1
PatchJsonArgParse.__remote_task_params_dict = { cls.__remote_task_params_dict = {
k[skip:]: v k[skip:]: v
for k, v in params_dict.items() for k, v in params_dict.items()
if k.startswith(PatchJsonArgParse._section_name + PatchJsonArgParse._args_sep) if k.startswith(cls._section_name + cls._args_sep)
} }
cls.__update_remote_task_params_dict_based_on_paths(parser)
@classmethod
def __update_remote_task_params_dict_based_on_paths(cls, parser):
paths = PatchJsonArgParse.__get_paths_from_dict(cls.__remote_task_params_dict)
for path in paths:
args = PatchJsonArgParse.__get_args_from_path(
parser,
path,
subcommand=cls.__remote_task_params_dict.get("subcommand")
)
for subarg_key, subarg_value in args.items():
if subarg_key not in cls.__remote_task_params_dict:
cls.__remote_task_params_dict[subarg_key] = subarg_value
@staticmethod
def __get_paths_from_dict(dict_):
paths = [path for path in dict_.values() if isinstance(path, Path)]
for subargs in dict_.values():
if isinstance(subargs, list) and all(isinstance(path, Path) for path in subargs):
paths.extend(subargs)
return paths
@staticmethod
def __get_args_from_path(parser, path, subcommand=None):
with change_to_path_dir(path):
parsed_cfg = parser.parse_string(path.get_content(), _skip_check=True, _fail_no_subcommand=False)
if subcommand:
parsed_cfg = {subcommand + PatchJsonArgParse._commands_sep + k: v for k, v in parsed_cfg.items()}
return parsed_cfg
@staticmethod @staticmethod
def _handle_namespace(value): def _handle_namespace(value):

View File

@ -309,7 +309,7 @@ class Logger(object):
extra_data=None, # type: Optional[dict] extra_data=None, # type: Optional[dict]
): ):
""" """
For explicit report, report a table plot. For explicit reporting, report a table plot.
One and only one of the following parameters must be provided. One and only one of the following parameters must be provided.
@ -340,25 +340,29 @@ class Logger(object):
See full details on the supported configuration: https://plotly.com/javascript/reference/layout/ See full details on the supported configuration: https://plotly.com/javascript/reference/layout/
For example: For example:
.. code block:: py .. code-block:: py
logger.report_table(
title='table example', logger.report_table(
series='pandas DataFrame', title='table example',
iteration=0, series='pandas DataFrame',
table_plot=df, iteration=0,
extra_layout={'height': 600}) table_plot=df,
extra_layout={'height': 600}
)
:param extra_data: optional dictionary for data configuration, like column width, passed directly to plotly :param extra_data: optional dictionary for data configuration, like column width, passed directly to plotly
See full details on the supported configuration: https://plotly.com/javascript/reference/table/ See full details on the supported configuration: https://plotly.com/javascript/reference/table/
For example: For example:
.. code block:: py .. code-block:: py
logger.report_table(
title='table example', logger.report_table(
series='pandas DataFrame', title='table example',
iteration=0, series='pandas DataFrame',
table_plot=df, iteration=0,
extra_data={'columnwidth': [2., 1., 1., 1.]}) table_plot=df,
extra_data={'columnwidth': [2., 1., 1., 1.]}
)
""" """
mutually_exclusive( mutually_exclusive(

View File

@ -591,7 +591,7 @@ class Task(_Task):
elif task.get_project_object().default_output_destination: elif task.get_project_object().default_output_destination:
task.output_uri = task.get_project_object().default_output_destination task.output_uri = task.get_project_object().default_output_destination
elif cls.__default_output_uri: elif cls.__default_output_uri:
task.output_uri = cls.__default_output_uri task.output_uri = str(cls.__default_output_uri)
# store new task ID # store new task ID
cls.__update_master_pid_task(task=task) cls.__update_master_pid_task(task=task)
else: else:
@ -1083,7 +1083,7 @@ class Task(_Task):
if value is False: if value is False:
value = None value = None
elif value is True: elif value is True:
value = self.__default_output_uri or self._get_default_report_storage_uri() value = str(self.__default_output_uri or self._get_default_report_storage_uri())
# check if we have the correct packages / configuration # check if we have the correct packages / configuration
if value and value != self.storage_uri: if value and value != self.storage_uri:
@ -1703,30 +1703,30 @@ class Task(_Task):
:param total_num_nodes: The total number of nodes to be enqueued, including the master node, :param total_num_nodes: The total number of nodes to be enqueued, including the master node,
which should already be enqueued when running remotely which should already be enqueued when running remotely
:param port: Port opened by the master node. If the environment variable `CLEARML_MULTI_NODE_MASTER_DEF_PORT` :param port: Port opened by the master node. If the environment variable ``CLEARML_MULTI_NODE_MASTER_DEF_PORT``
is set, the value of this parameter will be set to the one defined in `CLEARML_MULTI_NODE_MASTER_DEF_PORT`. is set, the value of this parameter will be set to the one defined in ``CLEARML_MULTI_NODE_MASTER_DEF_PORT``.
If `CLEARML_MULTI_NODE_MASTER_DEF_PORT` doesn't exist, but `MASTER_PORT` does, then the value of this If ``CLEARML_MULTI_NODE_MASTER_DEF_PORT`` doesn't exist, but ``MASTER_PORT`` does, then the value of this
parameter will be set to the one defined in `MASTER_PORT`. If neither environment variables exist, parameter will be set to the one defined in ``MASTER_PORT``. If neither environment variables exist,
the value passed to the parameter will be used the value passed to the parameter will be used
:param queue: The queue to enqueue the nodes to. Can be different than the queue the master :param queue: The queue to enqueue the nodes to. Can be different from the queue the master
node is enqueued to. If None, the nodes will be enqueued to the same queue as the master node node is enqueued to. If None, the nodes will be enqueued to the same queue as the master node
:param wait: If True, the master node will wait for the other nodes to start :param wait: If True, the master node will wait for the other nodes to start
:param addr: The address of the master node's worker. If the environment variable :param addr: The address of the master node's worker. If the environment variable
`CLEARML_MULTI_NODE_MASTER_DEF_ADDR` is set, the value of this parameter will be set to ``CLEARML_MULTI_NODE_MASTER_DEF_ADDR`` is set, the value of this parameter will be set to
the one defined in `CLEARML_MULTI_NODE_MASTER_DEF_ADDR`. the one defined in ``CLEARML_MULTI_NODE_MASTER_DEF_ADDR``.
If `CLEARML_MULTI_NODE_MASTER_DEF_ADDR` doesn't exist, but `MASTER_ADDR` does, then the value of this If ``CLEARML_MULTI_NODE_MASTER_DEF_ADDR`` doesn't exist, but ``MASTER_ADDR`` does, then the value of this
parameter will be set to the one defined in `MASTER_ADDR`. If neither environment variables exist, parameter will be set to the one defined in ``MASTER_ADDR``. If neither environment variables exist,
the value passed to the parameter will be used. If this value is None (default), the private IP of the value passed to the parameter will be used. If this value is None (default), the private IP of
the machine the master node is running on will be used. the machine the master node is running on will be used.
:return: A dictionary containing relevant information regarding the multi node run. This dictionary :return: A dictionary containing relevant information regarding the multi node run. This dictionary has the following entries:
has the following entries:
- `master_addr` - the address of the machine that the master node is running on - `master_addr` - the address of the machine that the master node is running on
- `master_port` - the open port of the machine that the master node is running on - `master_port` - the open port of the machine that the master node is running on
- `total_num_nodes` - the total number of nodes, including the master - `total_num_nodes` - the total number of nodes, including the master
- `queue` - the queue the nodes are enqueued to, excluding the master - `queue` - the queue the nodes are enqueued to, excluding the master
- `node_rank` - the rank of the current node (master has rank 0) - `node_rank` - the rank of the current node (master has rank 0)
- `wait` - if True, the master node will wait for the other nodes to start - `wait` - if True, the master node will wait for the other nodes to start
""" """
def set_launch_multi_node_runtime_props(task, conf): def set_launch_multi_node_runtime_props(task, conf):
# noinspection PyProtectedMember # noinspection PyProtectedMember

View File

@ -1 +1 @@
__version__ = '1.11.0' __version__ = '1.11.1rc1'