Fix task.execute_remotely from jupyter notebook

This commit is contained in:
allegroai 2021-05-10 23:52:33 +03:00
parent 0e0d86f6d8
commit f46561629f
3 changed files with 34 additions and 10 deletions

View File

@ -13,6 +13,7 @@ from ..debugging.log import LoggerRoot
from ..task import Task
from ..automation import ClearmlJob
from ..model import BaseModel
from ..utilities.process.mp import leave_process
class PipelineController(object):
@ -66,7 +67,10 @@ class PipelineController(object):
- ``True`` - The pipeline argument and configuration will be stored in the current Task. All arguments will
be under the hyper-parameter section ``Pipeline``, and the pipeline DAG will be stored as a
Task configuration object named ``Pipeline``.
- ``False`` - Do not store with Task.
Notice that when running remotely the DAG definitions will be taken from the Task itself (e.g. editing
the configuration in the UI will be reflected in the actual DAG created).
- ``False`` - Do not store DAG configuration on the Task.
In remote execution the DAG will always be created from code.
- ``Task`` - A specific Task object to connect the pipeline with.
:param bool always_create_task: Always create a new Task
- ``True`` - No current Task initialized. Create a new task named ``Pipeline`` in the ``base_task_id``
@ -103,10 +107,10 @@ class PipelineController(object):
task_type=Task.TaskTypes.controller,
)
# make sure all the created tasks are our children, as we are creating them
self._auto_connect_task = bool(auto_connect_task) and bool(self._task)
# make sure we add to the main Task the pipeline tag
if self._task:
self._task.add_tags([self._tag])
self._auto_connect_task = bool(auto_connect_task)
def add_step(
self,
@ -366,7 +370,7 @@ class PipelineController(object):
self.start()
self.wait()
self.stop()
exit(0)
leave_process(0)
else:
return self._task
@ -505,7 +509,7 @@ class PipelineController(object):
pipeline_dag = self._serialize()
# serialize pipeline state
if self._task:
if self._task and self._auto_connect_task:
self._task.connect_configuration(pipeline_dag, name=self._config_section)
self._task.connect(params, name=self._config_section)
@ -530,6 +534,12 @@ class PipelineController(object):
This will be used to create the DAG from the dict stored on the Task, when running remotely.
:return:
"""
# make sure that we override nodes that we do not clone.
for name in self._nodes:
if self._nodes[name].clone_task and name in dag_dict and dag_dict['name'].get('clone_task'):
dag_dict['name'] = dict(
(k, v) for k, v in self._nodes[name].__dict__.items() if k not in ('job', 'name'))
self._nodes = {
k: self.Node(name=k, **v) if not v.get('clone_task') or k not in self._nodes else self._nodes[k]
for k, v in dag_dict.items()}
@ -538,7 +548,7 @@ class PipelineController(object):
"""
Return True if we are running remotely and we have stored configuration on the Task
"""
if self._task and not self._task.running_locally() and self._task.is_main_task():
if self._auto_connect_task and self._task and not self._task.running_locally() and self._task.is_main_task():
stored_config = self._task.get_configuration_object(self._config_section)
return bool(stored_config)

View File

@ -66,7 +66,7 @@ from .utilities.proxy_object import ProxyDictPreWrite, ProxyDictPostWrite, flatt
from .utilities.resource_monitor import ResourceMonitor
from .utilities.seed import make_deterministic
from .utilities.lowlevel.threads import get_current_thread_id
from .utilities.process.mp import BackgroundMonitor
from .utilities.process.mp import BackgroundMonitor, leave_process
# noinspection PyProtectedMember
from .backend_interface.task.args import _Arguments
@ -1771,7 +1771,7 @@ class Task(_Task):
{
"property_name": {"description": "This is a user property", "value": "property value"},
"another_property_name": {"description": "This is another user property", "value": "another value"},
"another_property_name": {"description": "This is user property", "value": "another value"},
"yet_another_property_name": "some value"
}
@ -1950,7 +1950,7 @@ class Task(_Task):
# leave this process.
if exit_process:
LoggerRoot.get_base_logger().warning('Terminating local execution process')
exit(0)
leave_process(0)
return task
@ -2031,7 +2031,7 @@ class Task(_Task):
kwargs.update(func_params)
func(**kwargs)
# This is it, leave the process
exit(0)
leave_process(0)
def wait_for_status(
self,

View File

@ -433,3 +433,17 @@ class BackgroundMonitor(object):
tic = time()
while cls.is_subprocess_alive() and (not timeout or time()-tic < timeout):
sleep(0.03)
def leave_process(status=0):
# type: (int) -> None
"""
Exit current process with status-code (status)
:param status: int exit code
"""
try:
sys.exit(status or 0)
except: # noqa
# ipython/jupyter notebook will not allow to call sys.exit
# we have to call the low level function
os._exit(status or 0) # noqa