Add support for .ipynb script entry files (install nbconvert in runtime, copnvert to python and execute the python script), including CLEARML_AGENT_FORCE_TASK_INIT patching of ipynb files (post python conversion)

This commit is contained in:
allegroai 2024-07-24 17:41:59 +03:00
parent 700ae85de0
commit 93df021108
2 changed files with 53 additions and 8 deletions

View File

@ -645,6 +645,8 @@ class Worker(ServiceCommandSection):
ExternalRequirements, ExternalRequirements,
partial(PackageCollectorRequirement, collect_package=['trains']), partial(PackageCollectorRequirement, collect_package=['trains']),
partial(PackageCollectorRequirement, collect_package=['clearml']), partial(PackageCollectorRequirement, collect_package=['clearml']),
partial(PackageCollectorRequirement, collect_package=['nbconvert']),
partial(PackageCollectorRequirement, collect_package=['ipython']),
) )
# default poll queues every _polling_interval seconds # default poll queues every _polling_interval seconds
@ -2847,7 +2849,8 @@ class Worker(ServiceCommandSection):
(current_task.script.binary or "").split("/")[-1] in ('bash', 'zsh', 'sh')) (current_task.script.binary or "").split("/")[-1] in ('bash', 'zsh', 'sh'))
if not is_bash_binary and not is_python_binary: if not is_bash_binary and not is_python_binary:
print("WARNING binary '{}' not supported, defaulting to python".format(current_task.script.binary)) if (current_task.script.binary or "").strip():
print("WARNING binary '{}' not supported, defaulting to python".format(current_task.script.binary))
is_python_binary = True is_python_binary = True
extra = [] extra = []
@ -2876,6 +2879,25 @@ class Worker(ServiceCommandSection):
os.environ = _org_env os.environ = _org_env
else: else:
extra.extend(shlex.split(execution.entry_point)) extra.extend(shlex.split(execution.entry_point))
elif (is_python_binary and execution.entry_point and
execution.entry_point.strip().lower().endswith('.ipynb')):
# now we have to convert the notebook to python
convert_extra = copy(extra)
convert_extra.extend(["-m", "nbconvert", "--to", "python", execution.entry_point])
convert_command = self.package_api.get_python_command(convert_extra)
exit_code = convert_command.check_call(cwd=script_dir)
if exit_code:
raise ValueError("Failed [{}] converting jupyter notebook: {}".format(
exit_code, execution.entry_point))
converted_script_filename = Path(execution.entry_point).with_suffix(".py").as_posix()
if ENV_AGENT_FORCE_TASK_INIT.get():
patch_add_task_init_call(converted_script_filename)
extra.append(converted_script_filename)
else: else:
extra.append(execution.entry_point) extra.append(execution.entry_point)
except Exception: except Exception:
@ -3368,21 +3390,39 @@ class Worker(ServiceCommandSection):
requirements_manager.set_cwd(cwd) requirements_manager.set_cwd(cwd)
cached_requirements_failed = False cached_requirements_failed = False
if cached_requirements and (cached_requirements.get('pip') is not None or if not repo_info or (
cached_requirements.get('conda') is not None): cached_requirements and
(cached_requirements.get('pip') is not None or
cached_requirements.get('conda') is not None)
):
self.log("Found task requirements section, trying to install") self.log("Found task requirements section, trying to install")
cached_requirements = cached_requirements or {}
if ENV_AGENT_FORCE_TASK_INIT.get(): if ENV_AGENT_FORCE_TASK_INIT.get():
# notice we have PackageCollectorRequirement to protect against double includes of "clearml" # notice we have PackageCollectorRequirement to protect against double includes of "clearml"
print("Force clearml Task.init patch adding clearml package to requirements") print("Force clearml Task.init patch adding clearml package to requirements")
if cached_requirements.get('pip'): if not cached_requirements or cached_requirements.get('pip'):
cached_requirements["pip"] = ("clearml\n" + cached_requirements["pip"]) \ cached_requirements["pip"] = ("clearml\n" + cached_requirements.get("pip", "")) \
if isinstance(cached_requirements["pip"], str) else \ if isinstance(cached_requirements.get("pip", ""), str) else \
(["clearml"] + cached_requirements["pip"]) (["clearml"] + cached_requirements.get("pip", []))
if cached_requirements.get('conda'): if cached_requirements.get('conda'):
cached_requirements["conda"] = ("clearml\n" + cached_requirements["conda"]) \ cached_requirements["conda"] = ("clearml\n" + cached_requirements["conda"]) \
if isinstance(cached_requirements["conda"], str) else \ if isinstance(cached_requirements["conda"], str) else \
(["clearml"] + cached_requirements["conda"]) (["clearml"] + cached_requirements["conda"])
# check if we need to push jupyter nbconvert if we need to run ipynb
if execution.entry_point and execution.entry_point.strip().lower().endswith('.ipynb'):
print("Force nbconvert patch to convert .ipynb to python")
# now we have to make sure we run it with jupyter notebook
if not cached_requirements or cached_requirements.get('pip'):
cached_requirements["pip"] = ("nbconvert\nipython\n" + cached_requirements.get("pip", "")) \
if isinstance(cached_requirements.get("pip", ""), str) else \
(["nbconvert", "ipython"] + cached_requirements.get("pip", []))
if cached_requirements.get('conda'):
cached_requirements["conda"] = ("nbconvert\nipython\n" + cached_requirements["conda"]) \
if isinstance(cached_requirements["conda"], str) else \
(["nbconvert", "ipython"] + cached_requirements["conda"])
try: try:
package_api.load_requirements(cached_requirements) package_api.load_requirements(cached_requirements)
except Exception as e: except Exception as e:
@ -3415,6 +3455,11 @@ class Worker(ServiceCommandSection):
print("Force clearml Task.init patch adding clearml package to requirements") print("Force clearml Task.init patch adding clearml package to requirements")
requirements_text = "clearml\n" + requirements_text requirements_text = "clearml\n" + requirements_text
if execution.entry_point and execution.entry_point.strip().lower().endswith('.ipynb'):
# notice we have PackageCollectorRequirement to protect against double includes of "clearml"
print("Force nbconvert patch to convert .ipynb to python")
requirements_text = "nbconvert\nipython\n" + requirements_text
new_requirements = requirements_manager.replace(requirements_text) new_requirements = requirements_manager.replace(requirements_text)
temp_file = None temp_file = None

View File

@ -936,7 +936,7 @@ def _locate_future_import(lines):
def patch_add_task_init_call(local_filename): def patch_add_task_init_call(local_filename):
if not local_filename or not Path(local_filename).is_file(): if not local_filename or not Path(local_filename).is_file() or not str(local_filename).lower().endswith(".py"):
return return
idx_a = 0 idx_a = 0