Add daemon --order-fairness for round-robin queue pulling

Add daemon --stop to terminate a running agent (it is matched using the same set of arguments as the running daemon)
Clean up all log files on termination unless executed with --debug
This commit is contained in:
allegroai 2020-07-11 01:42:56 +03:00
parent c6d998c4df
commit 94997f9c88
3 changed files with 109 additions and 29 deletions

View File

@ -17,7 +17,7 @@ from datetime import datetime
from distutils.spawn import find_executable
from functools import partial
from itertools import chain
from tempfile import mkdtemp, gettempdir
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
from typing import Text, Optional, Any, Tuple
@ -66,7 +66,7 @@ from trains_agent.helper.base import (
get_python_path,
is_linux_platform,
rm_file,
add_python_path)
add_python_path, safe_remove_tree, )
from trains_agent.helper.console import ensure_text, print_text, decode_binary_lines
from trains_agent.helper.os.daemonize import daemonize_process
from trains_agent.helper.package.base import PackageManager
@ -89,7 +89,7 @@ from trains_agent.helper.process import (
get_bash_output,
shutdown_docker_process,
get_docker_id,
commit_docker
commit_docker, terminate_process,
)
from trains_agent.helper.package.cython_req import CythonRequirement
from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
@ -319,6 +319,7 @@ class Worker(ServiceCommandSection):
_run_as_user_home = '/trains_agent_home'
_docker_fixed_user_cache = '/trains_agent_cache'
_temp_cleanup_list = []
@property
def service(self):
@ -332,6 +333,8 @@ class Worker(ServiceCommandSection):
@staticmethod
def register_signal_handler():
    """Install a SIGTERM handler that cleans up temp artifacts before exiting."""
    def handler(*_):
        # best-effort removal of registered temp files/folders plus our pid
        # lock file; Singleton.get_pid_file() may be None — assumes
        # safe_remove_tree tolerates that (TODO confirm)
        for f in Worker._temp_cleanup_list + [Singleton.get_pid_file()]:
            safe_remove_tree(f)
        # convert the OS signal into the agent's own termination exception
        raise Sigterm()

    signal.signal(signal.SIGTERM, handler)
@ -388,6 +391,7 @@ class Worker(ServiceCommandSection):
self._standalone_mode = None
self._services_mode = None
self._force_current_version = None
self._redirected_stdout_file_no = None
@classmethod
def _verify_command_states(cls, kwargs):
@ -569,12 +573,12 @@ class Worker(ServiceCommandSection):
else:
self.handle_task_termination(task_id, status, stop_signal_status)
# remove temp files after we sent everything to the backend
safe_remove_file(temp_stdout_name)
safe_remove_file(temp_stderr_name)
if self.docker_image_func:
shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id))
safe_remove_file(temp_stdout_name)
safe_remove_file(temp_stderr_name)
def run_tasks_loop(self, queues, worker_params):
def run_tasks_loop(self, queues, worker_params, priority_order=True):
"""
:summary: Pull and run tasks from queues.
:description: 1. Go through ``queues`` by order.
@ -584,6 +588,9 @@ class Worker(ServiceCommandSection):
:type queues: list of ``Text``
:param worker_params: Worker command line arguments
:type worker_params: ``trains_agent.helper.process.WorkerParams``
:param priority_order: If True pull order in priority manner. always from the first
If False, pull from each queue once in a round robin manner
:type priority_order: bool
"""
if not self._daemon_foreground:
@ -614,6 +621,16 @@ class Worker(ServiceCommandSection):
print("No tasks in queue {}".format(queue))
continue
# clear output log if we start a new Task
if not worker_params.debug and self._redirected_stdout_file_no is not None and \
self._redirected_stdout_file_no > 2:
# noinspection PyBroadException
try:
os.lseek(self._redirected_stdout_file_no, 0, 0)
os.ftruncate(self._redirected_stdout_file_no, 0)
except:
pass
self.send_logs(
task_id=task_id,
lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)],
@ -622,7 +639,11 @@ class Worker(ServiceCommandSection):
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
self.run_one_task(queue, task_id, worker_params)
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
break
# if we are using priority start pulling from the first always,
# if we are doing round robin, pull from the next one
if priority_order:
break
else:
# sleep and retry polling
if self._daemon_foreground or worker_params.debug:
@ -674,7 +695,7 @@ class Worker(ServiceCommandSection):
self._session.print_configuration()
def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, **kwargs):
def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
# if we do not need to create queues, make sure they are valid
# match previous behaviour when we validated queue names before everything else
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
@ -686,6 +707,10 @@ class Worker(ServiceCommandSection):
kwargs = self._verify_command_states(kwargs)
docker = docker or kwargs.get('docker')
if kwargs.get('stop', False):
self._kill_daemon()
return
# make sure we only have a single instance,
# also make sure we set worker_id properly and cache folders
self._singleton()
@ -711,9 +736,8 @@ class Worker(ServiceCommandSection):
self._register(queues)
# create temp config file with current configuration
self.temp_config_path = safe_mkstemp(
suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True
)
self.temp_config_path = NamedTemporaryFile(
suffix=".cfg", prefix=".trains_agent.", mode='w+t').name
# print docker image
if docker is not False and docker is not None:
@ -753,6 +777,9 @@ class Worker(ServiceCommandSection):
)
)
if not self._session.debug_mode:
self._temp_cleanup_list.append(name)
if not detached:
# redirect std out/err to new file
sys.stdout = sys.stderr = out_file
@ -760,6 +787,7 @@ class Worker(ServiceCommandSection):
# in detached mode
# fully detach stdin.stdout/stderr and leave main process, running in the background
daemonize_process(out_file.fileno())
self._redirected_stdout_file_no = out_file.fileno()
# make sure we update the singleton lock file to the new pid
Singleton.update_pid_file()
# reprint headers to std file (we are now inside the daemon process)
@ -779,6 +807,7 @@ class Worker(ServiceCommandSection):
debug=self._session.debug_mode,
trace=self._session.trace,
),
priority_order=not order_fairness,
)
except Exception:
tb = six.text_type(traceback.format_exc())
@ -2025,6 +2054,7 @@ class Worker(ServiceCommandSection):
host_pip_cache = Path(os.path.expandvars(self._session.config.get(
"agent.docker_pip_cache", '~/.trains/pip-cache'))).expanduser().as_posix()
host_ssh_cache = mkdtemp(prefix='trains_agent.ssh.')
self._temp_cleanup_list.append(host_ssh_cache)
# make sure all folders are valid
Path(host_apt_cache).mkdir(parents=True, exist_ok=True)
@ -2322,18 +2352,26 @@ class Worker(ServiceCommandSection):
return command, script_dir
def _kill_daemon(self):
    """
    Find a running trains-agent daemon matching this configuration and
    terminate it.

    :return: True if a matching daemon was found and signaled (even if the
        terminate attempt itself failed), False if no match was found
    """
    worker_id, worker_name = self._generate_worker_id_name()
    # Iterate over all running process
    for pid, uid, slot, file in Singleton.get_running_pids():
        # whether we have a match for the worker_id or we just pick the first one
        if pid >= 0 and (
                (worker_id and uid == worker_id) or
                (not worker_id and uid.startswith('{}:'.format(worker_name)))):
            # this is our daemon - kill it
            print('Terminating trains-agent worker_id={} pid={}'.format(uid, pid))
            if not terminate_process(pid, timeout=10):
                error('Could not terminate process pid={}'.format(pid))
            return True
    print('Could not find a running trains-agent instance with worker_name={} worker_id={}'.format(
        worker_name, worker_id))
    return False
def _singleton(self):
# ensure singleton
worker_id = self._session.config["agent.worker_id"]
worker_name = self._session.config["agent.worker_name"]
if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
worker_id = '{}:gpu{}'.format(worker_name, nvidia_visible_devices)
elif nvidia_visible_devices == '':
pass
else:
worker_name = '{}:cpu'.format(worker_name)
worker_id, worker_name = self._generate_worker_id_name()
# if we are running in services mode, we allow double register since
# docker-compose will kill instances before they cleanup
@ -2348,6 +2386,19 @@ class Worker(ServiceCommandSection):
# update folders based on free slot
self._session.create_cache_folders(slot_index=worker_slot)
def _generate_worker_id_name(self):
    """
    Resolve the (worker_id, worker_name) pair from the agent configuration,
    deriving a GPU-specific id (or a ':cpu' suffixed name) from the
    NVIDIA_VISIBLE_DEVICES environment variable when no explicit id is set.

    :return: tuple of (worker_id, worker_name)
    """
    worker_id = self._session.config["agent.worker_id"]
    worker_name = self._session.config["agent.worker_name"]
    visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
    if not worker_id and visible_devices is not None:
        if visible_devices and visible_devices.lower() != 'none':
            # a concrete GPU list was provided - encode it in the worker id
            worker_id = '{}:gpu{}'.format(worker_name, visible_devices)
        elif visible_devices == '':
            # empty value: leave both id and name untouched
            pass
        else:
            # 'none' (any case): explicitly a CPU-only worker
            worker_name = '{}:cpu'.format(worker_name)
    return worker_id, worker_name
def _resolve_queue_names(self, queues, create_if_missing=False):
if not queues:
default_queue = self._session.send_api(queues_api.GetDefaultRequest())

View File

@ -37,6 +37,10 @@ class Singleton(object):
except:
pass
@classmethod
def get_lock_filename(cls):
    # Full path of the registration lock file inside the shared temp folder.
    return os.path.join(cls._get_temp_folder(), cls._lock_file_name)
@classmethod
def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
"""
@ -47,7 +51,7 @@ class Singleton(object):
:return: (str worker_id, int slot_number) Return None value on instance already running
"""
# try to lock file
lock_file = os.path.join(cls._get_temp_folder(), cls._lock_file_name)
lock_file = cls.get_lock_filename()
timeout = 0
while os.path.exists(lock_file):
if timeout > cls._lock_timeout:
@ -79,30 +83,41 @@ class Singleton(object):
return ret
@classmethod
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
if cls.worker_id:
return cls.worker_id, cls.instance_slot
# make sure we have a unique name
instance_num = 0
def get_running_pids(cls):
    """
    Enumerate the per-instance lock files and report the agents they track.

    :return: list of (pid, uid, slot, file) tuples, one per lock file found.
        pid is -1 when the recorded process no longer exists or the filename
        could not be parsed; uid/slot are None when the file contents could
        not be read/parsed.
    """
    temp_folder = cls._get_temp_folder()
    files = glob(os.path.join(temp_folder, cls.prefix + cls.sep + '*' + cls.ext))
    pids = []
    for file in files:
        parts = file.split(cls.sep)
        # noinspection PyBroadException
        try:
            pid = int(parts[1])
            if not psutil.pid_exists(pid):
                pid = -1
        except Exception:
            # unparsable filename - mark as dead so callers can clean it up
            pid = -1
        uid, slot = None, None
        # noinspection PyBroadException
        try:
            with open(file, 'r') as f:
                uid, slot = str(f.read()).split('\n')
                slot = int(slot)
        except Exception:
            pass
        pids.append((pid, uid, slot, file))
    return pids
@classmethod
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
if cls.worker_id:
return cls.worker_id, cls.instance_slot
# make sure we have a unique name
instance_num = 0
slots = {}
for pid, uid, slot, file in cls.get_running_pids():
worker = None
if api_client and ENV_DOCKER_HOST_MOUNT.get() and uid:
try:
@ -111,7 +126,7 @@ class Singleton(object):
worker = None
# count active instances and delete dead files
if not worker and not psutil.pid_exists(pid):
if not worker and pid < 0:
# delete the file
try:
os.remove(os.path.join(file))
@ -165,3 +180,9 @@ class Singleton(object):
@classmethod
def get_slot(cls):
    # Slot index allocated at registration time; 0 when not yet registered.
    return cls.instance_slot or 0
@classmethod
def get_pid_file(cls):
    # Path of this instance's pid lock file, or None before registration.
    return cls._pid_file.name if cls._pid_file else None

View File

@ -68,6 +68,10 @@ DAEMON_ARGS = dict({
'dest': 'queues',
'type': foreign_object_id('queues'),
},
'--order-fairness': {
'help': 'Pull from each queue in a round-robin order, instead of priority order.',
'action': 'store_true',
},
'--standalone-mode': {
'help': 'Do not use any network connects, assume everything is pre-installed',
'action': 'store_true',
@ -85,6 +89,10 @@ DAEMON_ARGS = dict({
'action': 'store_true',
'aliases': ['-d'],
},
'--stop': {
'help': 'Stop the running agent (based on the same set of arguments)',
'action': 'store_true',
},
}, **WORKER_ARGS)