Add daemon service mode to allow multiple tasks to be launched simultaneously on the same machine (--service-mode)

This commit is contained in:
allegroai 2020-05-09 19:45:14 +03:00
parent 38e02ca5cd
commit 9cb71b9526
2 changed files with 87 additions and 12 deletions

View File

@ -306,6 +306,12 @@ class Worker(ServiceCommandSection):
# machine status update intervals, seconds
_machine_update_interval = 30.0
# message printed before starting task logging,
# it will be parsed by services_mode, to identify internal docker logging start
_task_logging_start_message = "Running task '{}'"
# last message before passing control to the actual task
_task_logging_pass_control_message = "Running task id [{}]:"
@property
def service(self):
""" Worker command service endpoint """
@ -371,8 +377,24 @@ class Worker(ServiceCommandSection):
self._docker_force_pull = self._session.config.get("agent.docker_force_pull", False)
self._daemon_foreground = None
self._standalone_mode = None
self._services_mode = None
self._force_current_version = None
@classmethod
def _verify_command_states(cls, kwargs):
"""
Conform and enforce command argument
This is where you can automatically turn on/off switches based on different states.
:param kwargs:
:return: kwargs
"""
if kwargs.get('services_mode'):
kwargs['cpu_only'] = True
kwargs['docker'] = kwargs.get('docker', [])
kwargs['gpus'] = None
return kwargs
def _get_requirements_manager(self, os_override=None, base_interpreter=None):
requirements_manager = RequirementsManager(
self._session, base_interpreter=base_interpreter
@ -411,7 +433,9 @@ class Worker(ServiceCommandSection):
:param docker: Docker image in which the execution task will run
"""
# start new process and execute task id
print("Running task '{}'".format(task_id))
# "Running task '{}'".format(task_id)
print(self._task_logging_start_message.format(task_id))
# set task status to in_progress so we know it was popped from the queue
try:
self._session.send_api(tasks_api.StartedRequest(task=task_id, force=True))
@ -455,7 +479,13 @@ class Worker(ServiceCommandSection):
docker_arguments = self._docker_arguments
# Update docker command
full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments)
if self._services_mode:
# if this is services mode, give the docker a unique worker id, as it will register itself.
full_docker_cmd = self.docker_image_func(
worker_id='{}:service:{}'.format(self.worker_id, task_id),
docker_image=docker_image, docker_arguments=docker_arguments)
else:
full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments)
try:
self._session.send_api(
tasks_api.EditRequest(task_id, force=True, execution=dict(
@ -463,8 +493,13 @@ class Worker(ServiceCommandSection):
except Exception:
pass
full_docker_cmd[-1] = full_docker_cmd[-1] + 'execute --disable-monitoring {} --id {}'.format(
'--standalone-mode' if self._standalone_mode else '', task_id)
# if this is services_mode, change the worker_id to a unique name
# abd use full-monitoring, ot it registers itself as a worker for this specific service.
# notice, the internal agent will monitor itself once the docker is up and running
full_docker_cmd[-1] = full_docker_cmd[-1] + 'execute {} {} --id {}'.format(
'--full-monitoring' if self._services_mode else '--disable-monitoring',
'--standalone-mode' if self._standalone_mode else '',
task_id)
cmd = Argv(*full_docker_cmd)
print('Running Docker:\n{}\n'.format(str(cmd)))
else:
@ -520,12 +555,15 @@ class Worker(ServiceCommandSection):
self.handle_user_abort(task_id)
status = ExitStatus.interrupted
finally:
self.handle_task_termination(task_id, status, stop_signal_status)
# remove temp files after we sent everything to the backend
safe_remove_file(temp_stdout_name)
safe_remove_file(temp_stderr_name)
if self.docker_image_func:
shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id))
if self._services_mode:
print('Service started, docker running in the background')
else:
self.handle_task_termination(task_id, status, stop_signal_status)
# remove temp files after we sent everything to the backend
safe_remove_file(temp_stdout_name)
safe_remove_file(temp_stderr_name)
if self.docker_image_func:
shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id))
def run_tasks_loop(self, queues, worker_params):
"""
@ -637,6 +675,7 @@ class Worker(ServiceCommandSection):
start_check_update_daemon()
self._standalone_mode = kwargs.get('standalone_mode', False)
self._services_mode = kwargs.get('services_mode', False)
self.check(**kwargs)
self.log.debug("starting resource monitor thread")
@ -802,6 +841,7 @@ class Worker(ServiceCommandSection):
**kwargs
)
service_mode_internal_agent_started = None
while status is None and not stopping:
stop_reason = stop_signal.test() if stop_signal else TaskStopSignal.default
@ -825,13 +865,24 @@ class Worker(ServiceCommandSection):
stderr.flush()
# get diff from previous poll
printed_lines = _print_file(stdout_path, stdout_line_count)
if self._services_mode:
# if the internal agent started, we stop logging, it will take over logging.
# if the internal agent started running the task itself, it will return status==0,
# then we can quit the monitoring loop of this process
printed_lines, service_mode_internal_agent_started, status = self._check_if_internal_agent_started(
printed_lines, service_mode_internal_agent_started, task_id)
if status is not None:
stop_reason = 'Service started'
stdout_line_count += self.send_logs(
task_id, _print_file(stdout_path, stdout_line_count)
task_id, printed_lines
)
if stderr_path:
stderr_line_count += self.send_logs(
task_id, _print_file(stderr_path, stderr_line_count)
)
except subprocess.CalledProcessError as ex:
# non zero return code
stop_reason = 'Exception occurred'
@ -847,6 +898,10 @@ class Worker(ServiceCommandSection):
stop_reason = 'Exception occurred'
status = -1
# if running in services mode, keep the file open
if self._services_mode:
return status, stop_reason
stdout.close()
if stderr_path:
stderr.close()
@ -862,6 +917,19 @@ class Worker(ServiceCommandSection):
return status, stop_reason
def _check_if_internal_agent_started(self, printed_lines, service_mode_internal_agent_started, task_id):
log_start_msg = self._task_logging_start_message.format(task_id)
log_control_end_msg = self._task_logging_pass_control_message.format(task_id)
filter_lines = printed_lines if not service_mode_internal_agent_started else []
for i, line in enumerate(printed_lines):
if not service_mode_internal_agent_started and line.startswith(log_start_msg):
service_mode_internal_agent_started = True
filter_lines = printed_lines[:i+1]
elif line.startswith(log_control_end_msg):
return filter_lines, service_mode_internal_agent_started, 0
return filter_lines, service_mode_internal_agent_started, None
def send_logs(self, task_id, lines, level="DEBUG"):
"""
Send output lines as log events to backend
@ -1187,7 +1255,8 @@ class Worker(ServiceCommandSection):
script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
# run code
print("Running task id [%s]:" % current_task.id)
# print("Running task id [%s]:" % current_task.id)
print(self._task_logging_pass_control_message.format(current_task.id))
extra = ['-u', ]
if optimization:
extra.append(
@ -1951,6 +2020,8 @@ class Worker(ServiceCommandSection):
base_cmd += ['--gpus', 'device='+gpu_devices, ]
# We are using --gpu, so we should not pass NVIDIA_VISIBLE_DEVICES, I think.
# base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES=' + gpu_devices, ]
elif gpu_devices.strip() == 'none':
dockers_nvidia_visible_devices = gpu_devices
if docker_arguments:
docker_arguments = list(docker_arguments) \

View File

@ -72,6 +72,10 @@ DAEMON_ARGS = dict({
'help': 'Do not use any network connects, assume everything is pre-installed',
'action': 'store_true',
},
'--services-mode': {
'help': 'Launch multiple long-term docker services. Implies docker & cpu-only flags.',
'action': 'store_true',
},
'--detached': {
'help': 'Detached mode, run agent in the background',
'action': 'store_true',