Add CLEARML_AGENT_FORCE_TASK_INIT to allow runtime patching of the entry point script even if no repo is specified and the code is running inside a preinstalled docker

This commit is contained in:
allegroai 2024-02-29 14:02:27 +02:00
parent 919013d4fe
commit 9a321a410f
3 changed files with 116 additions and 5 deletions

View File

@ -78,7 +78,7 @@ from clearml_agent.definitions import (
ENV_EXTRA_DOCKER_LABELS,
ENV_AGENT_FORCE_CODE_DIR,
ENV_AGENT_FORCE_EXEC_SCRIPT,
ENV_TEMP_STDOUT_FILE_DIR,
ENV_TEMP_STDOUT_FILE_DIR, ENV_AGENT_FORCE_TASK_INIT,
)
from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from clearml_agent.errors import (
@ -142,7 +142,8 @@ from clearml_agent.helper.process import (
check_if_command_exists,
terminate_all_child_processes,
)
from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fix_package_import_diff_patch
from clearml_agent.helper.repo import clone_repository_cached, RepoInfo, VCS, fix_package_import_diff_patch, \
patch_add_task_init_call
from clearml_agent.helper.resource_monitor import ResourceMonitor
from clearml_agent.helper.runtime_verification import check_runtime, print_uptime_properties
from clearml_agent.helper.singleton import Singleton
@ -2666,7 +2667,8 @@ class Worker(ServiceCommandSection):
execution_info=execution,
update_requirements=not skip_freeze_update,
)
script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
script_dir = (directory if isinstance(directory, Path) else Path(directory)
).expanduser().absolute().as_posix()
# run code
# print("Running task id [%s]:" % current_task.id)
@ -2677,6 +2679,10 @@ class Worker(ServiceCommandSection):
WorkerParams(optimization=optimization).get_optimization_flag()
)
# check if we need to patch entry point script
if ENV_AGENT_FORCE_TASK_INIT.get():
patch_add_task_init_call((Path(script_dir) / execution.entry_point).as_posix())
# check if this is a module load, then load it.
# noinspection PyBroadException
try:
@ -3170,6 +3176,18 @@ class Worker(ServiceCommandSection):
if cached_requirements and (cached_requirements.get('pip') is not None or
cached_requirements.get('conda') is not None):
self.log("Found task requirements section, trying to install")
if ENV_AGENT_FORCE_TASK_INIT.get():
# notice we have PackageCollectorRequirement to protect against double includes of "clearml"
print("Force clearml Task.init patch adding clearml package to requirements")
if cached_requirements.get('pip'):
cached_requirements["pip"] = ("clearml\n" + cached_requirements["pip"]) \
if isinstance(cached_requirements["pip"], str) else \
(["clearml"] + cached_requirements["pip"])
if cached_requirements.get('conda'):
cached_requirements["conda"] = ("clearml\n" + cached_requirements["conda"]) \
if isinstance(cached_requirements["conda"], str) else \
(["clearml"] + cached_requirements["conda"])
try:
package_api.load_requirements(cached_requirements)
except Exception as e:
@ -3197,6 +3215,11 @@ class Worker(ServiceCommandSection):
continue
print("Trying pip install: {}".format(requirements_file))
requirements_text = requirements_file.read_text()
if ENV_AGENT_FORCE_TASK_INIT.get():
# notice we have PackageCollectorRequirement to protect against double includes of "clearml"
print("Force clearml Task.init patch adding clearml package to requirements")
requirements_text = "clearml\n" + requirements_text
new_requirements = requirements_manager.replace(requirements_text)
temp_file = None

View File

@ -160,6 +160,7 @@ ENV_AGENT_SKIP_PIP_VENV_INSTALL = EnvironmentConfig("CLEARML_AGENT_SKIP_PIP_VENV
ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig("CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL", type=bool)
ENV_AGENT_FORCE_CODE_DIR = EnvironmentConfig("CLEARML_AGENT_FORCE_CODE_DIR")
ENV_AGENT_FORCE_EXEC_SCRIPT = EnvironmentConfig("CLEARML_AGENT_FORCE_EXEC_SCRIPT")
ENV_AGENT_FORCE_POETRY = EnvironmentConfig("CLEARML_AGENT_FORCE_POETRY", type=bool)
ENV_AGENT_FORCE_TASK_INIT = EnvironmentConfig("CLEARML_AGENT_FORCE_TASK_INIT", type=bool)
ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig("CLEARML_DOCKER_SKIP_GPUS_FLAG", "TRAINS_DOCKER_SKIP_GPUS_FLAG")
ENV_AGENT_GIT_USER = EnvironmentConfig("CLEARML_AGENT_GIT_USER", "TRAINS_AGENT_GIT_USER")

View File

@ -809,8 +809,8 @@ def fix_package_import_diff_patch(entry_script_file):
lines = f.readlines()
except Exception:
return
# make sre we are the first import (i.e. we patched the source code)
if not lines or not lines[0].strip().startswith('from clearml ') or 'Task.init' not in lines[1]:
# make sure we are the first import (i.e. we patched the source code)
if len(lines or []) < 2 or not lines[0].strip().startswith('from clearml ') or 'Task.init' not in lines[1]:
return
original_lines = lines
@ -867,3 +867,90 @@ def fix_package_import_diff_patch(entry_script_file):
f.writelines(new_lines)
except Exception:
return
def _locate_future_import(lines):
# type: (list[str]) -> int
"""
:param lines: string lines of a python file
:return: line index of the last __future_ import. return -1 if no __future__ was found
"""
# skip over the first two lines, they are ours
# then skip over empty or comment lines
lines = [(i, line.split('#', 1)[0].rstrip()) for i, line in enumerate(lines)
if line.strip('\r\n\t ') and not line.strip().startswith('#')]
# remove triple quotes ' """ '
nested_c = -1
skip_lines = []
for i, line_pair in enumerate(lines):
for _ in line_pair[1].split('"""')[1:]:
if nested_c >= 0:
skip_lines.extend(list(range(nested_c, i + 1)))
nested_c = -1
else:
nested_c = i
# now select all the
lines = [pair for i, pair in enumerate(lines) if i not in skip_lines]
from_future = re.compile(r"^from[\s]*__future__[\s]*")
import_future = re.compile(r"^import[\s]*__future__[\s]*")
# test if we have __future__ import
found_index = -1
for a_i, (_, a_line) in enumerate(lines):
if found_index >= a_i:
continue
if from_future.match(a_line) or import_future.match(a_line):
found_index = a_i
# check the last import block
i, line = lines[found_index]
# wither we have \\ character at the end of the line or the line is indented
parenthesized_lines = '(' in line and ')' not in line
while line.endswith('\\') or parenthesized_lines:
found_index += 1
i, line = lines[found_index]
if ')' in line:
break
else:
break
return found_index if found_index < 0 else lines[found_index][0]
def patch_add_task_init_call(local_filename):
    """
    Patch a python entry point script, in place, so it calls clearml Task.init() when executed as main.

    Two lines ("from clearml import Task" and a guarded "Task.init()" call) are inserted
    right after the leading __future__ imports, or at the top of the file when there are none.
    If the two lines at the insertion point already look like this patch, the file is left untouched.
    Any failure is printed and swallowed (best effort), the function never raises on I/O errors.

    :param local_filename: path to the local python script to patch.
        Nothing is done if it is empty or does not point to an existing file
    """
    if not local_filename or not Path(local_filename).is_file():
        return

    # find the right entry for the patch if we have a local file (basically after __future__)
    # NOTE(review): the file is read and written using the default text encoding - confirm this is intended
    try:
        with open(local_filename, 'rt') as f:
            lines = f.readlines()
        future_found = _locate_future_import(lines)
    except Exception as ex:
        print("Failed patching entry point file {}: {}".format(local_filename, ex))
        return

    idx_a = future_found + 1 if future_found >= 0 else 0

    # check if we have not already patched it, no need to add another one
    already_patched = (
        len(lines) >= idx_a + 2
        and lines[idx_a].strip().startswith('from clearml ')
        and 'Task.init' in lines[idx_a + 1]
    )
    if already_patched:
        print("File {} already patched with Task.init()".format(local_filename))
        return

    patch = [
        "from clearml import Task\n",
        "(__name__ != \"__main__\") or Task.init()\n",
    ]
    head = lines[:idx_a]
    # the line preceding the patch might be the last line of the file with no line ending,
    # terminate it so the patch does not end up glued onto the same line
    if head and not head[-1].endswith('\n'):
        head[-1] += '\n'
    lines = head + patch + lines[idx_a:]

    # noinspection PyBroadException
    try:
        with open(local_filename, 'wt') as f:
            f.writelines(lines)
    except Exception as ex:
        print("Failed patching entry point file {}: {}".format(local_filename, ex))
        return

    print("Force clearml Task.init patch adding to entry point script: {}".format(local_filename))