Add preliminary agent uptime/downtime support

This commit is contained in:
allegroai 2020-09-29 19:34:51 +03:00
parent 28f47419b0
commit 31a56c71bd
5 changed files with 332 additions and 11 deletions

View File

@ -102,6 +102,22 @@
# optional shell script to run in docker when started before the experiment is started
# extra_docker_shell_script: ["apt-get install -y bindfs", ]
# optional uptime configuration, make sure to use only one of 'uptime/downtime' and not both.
# If uptime is specified, agent will actively poll (and execute) tasks in the time-spans defined here.
# Outside of the specified time-spans, the agent will be idle.
# Defined using a list of items of the format: "<hours> <days>".
# hours - use values 0-23, single values would count as start hour and end at midnight.
# days - use days in abbreviated format (SUN-SAT)
# use '-' for ranges and ',' to separate singular values.
# for example, to enable the workers every Sunday and Tuesday between 17:00-20:00 set uptime to:
# uptime: ["17-20 SUN,TUE"]
# optional downtime configuration, can be used only when uptime is not used.
# If downtime is specified, agent will be idle in the time-spans defined here.
# Outside of the specified time-spans, the agent will actively poll (and execute) tasks.
# Use the same format as described above for uptime
# downtime: []
# set to true in order to force "docker pull" before running an experiment using a docker image.
# This makes sure the docker image is updated.
docker_force_pull: false

View File

@ -46,6 +46,15 @@ class Environment(object):
local = 'local'
class UptimeConf(object):
    """Constants for the agent uptime/downtime (worker scheduling) feature."""
    # minimum backend API version that supports worker runtime properties
    min_api_version = "2.10"
    # queue tags that force every worker listening to the queue on/off,
    # overriding any uptime/downtime schedule
    queue_tag_on = "force_workers:on"
    queue_tag_off = "force_workers:off"
    # worker runtime-property key and accepted values that force a single
    # worker on/off from the server side
    worker_key = "force"
    worker_value_off = ["off"]
    worker_value_on = ["on"]
CONFIG_FILE_EXTENSION = '.conf'

View File

@ -30,6 +30,8 @@ from pathlib2 import Path
from pyhocon import ConfigTree, ConfigFactory
from six.moves.urllib.parse import quote
from trains_agent.backend_api.services import queues
from trains_agent.backend_config.defs import UptimeConf
from trains_agent.helper.check_update import start_check_update_daemon
from trains_agent.commands.base import resolve_names, ServiceCommandSection
from trains_agent.definitions import (
@ -94,13 +96,12 @@ from trains_agent.helper.process import (
from trains_agent.helper.package.priority_req import PriorityPackageRequirement
from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
from trains_agent.helper.resource_monitor import ResourceMonitor
from trains_agent.helper.runtime_verification import check_runtime, print_uptime_properties
from trains_agent.session import Session
from trains_agent.helper.singleton import Singleton
from .events import Events
log = logging.getLogger(__name__)
DOCKER_ROOT_CONF_FILE = "/root/trains.conf"
DOCKER_DEFAULT_CONF_FILE = "/root/default_trains.conf"
@ -152,6 +153,7 @@ class LiteralScriptManager(object):
Create notebook file in appropriate location
:return: directory and script path
"""
log = logging.getLogger(__name__)
if repo_info and repo_info.root:
location = Path(repo_info.root, execution.working_dir)
else:
@ -394,6 +396,13 @@ class Worker(ServiceCommandSection):
self._services_mode = None
self._force_current_version = None
self._redirected_stdout_file_no = None
self._uptime_config = self._session.config.get("agent.uptime", None)
self._downtime_config = self._session.config.get("agent.downtime", None)
# True - supported
# None - not initialized
# str - not supported, version string indicates last server version
self._runtime_props_support = None
@classmethod
def _verify_command_states(cls, kwargs):
@ -599,10 +608,19 @@ class Worker(ServiceCommandSection):
print('Starting infinite task polling loop...')
_last_machine_update_ts = 0
while True:
while True:
queue_tags = None
runtime_props = None
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
if queue_tags is None or runtime_props is None:
queue_tags, runtime_props = self.get_worker_properties(queues)
if not self.should_be_currently_active(queue_tags[queue], runtime_props):
continue
# get next task in queue
try:
response = self._session.send_api(
@ -642,6 +660,9 @@ class Worker(ServiceCommandSection):
self.run_one_task(queue, task_id, worker_params)
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
queue_tags = None
runtime_props = None
# if we are using priority start pulling from the first always,
# if we are doing round robin, pull from the next one
if priority_order:
@ -655,6 +676,77 @@ class Worker(ServiceCommandSection):
if self._session.config["agent.reload_config"]:
self.reload_config()
def get_worker_properties(self, queue_ids):
queue_tags = {
q.id: {'name': q.name, 'tags': q.tags}
for q in self._session.send_api(
queues_api.GetAllRequest(id=queue_ids, only_fields=["id", "tags"])
).queues
}
runtime_props = self.get_runtime_properties()
return queue_tags, runtime_props
    def get_runtime_properties(self):
        """
        Return this worker's runtime properties from the server, or [] when
        the feature is unavailable.

        Support status is cached in self._runtime_props_support:
          True - supported
          None - not initialized (never tested)
          str  - not supported; the server api_version last tested against,
                 so a server upgrade (new api_version) triggers a retest.
        """
        if self._runtime_props_support is not True:
            # either not supported or never tested
            if self._runtime_props_support == self._session.api_version:
                # tested against latest api_version, not supported
                return []
            if not self._session.check_min_api_version(UptimeConf.min_api_version):
                # not supported due to insufficient api_version
                self._runtime_props_support = self._session.api_version
                return []
        try:
            res = self.get("get_runtime_properties", worker=self.worker_id)["runtime_properties"]
            # definitely supported
            self._runtime_props_support = True
            return res
        except APIError:
            # call failed: remember the api_version we failed against so we
            # only retry after the server is upgraded
            self._runtime_props_support = self._session.api_version
            return []
    def should_be_currently_active(self, current_queue, runtime_properties):
        # type: (dict, list) -> bool
        """
        Checks if a worker is active according to queue tags, worker's runtime properties and uptime schedule.

        Precedence, first match wins:
          1. queue tag 'force_workers:off' / 'force_workers:on'
          2. worker runtime property 'force' set to off/on
          3. agent uptime configuration, then downtime configuration
          4. default: active

        :param current_queue: dict with the queue's 'name' and 'tags'
        :param runtime_properties: list of {'key': ..., 'value': ...} dicts from the server
        :return: True if the worker should pull tasks from this queue
        """
        # queue tags override everything else
        if UptimeConf.queue_tag_off in current_queue['tags']:
            self.log.debug("Queue {} is tagged '{}', worker will not pull tasks".format(
                current_queue['name'], UptimeConf.queue_tag_off)
            )
            return False
        if UptimeConf.queue_tag_on in current_queue['tags']:
            self.log.debug("Queue {} is tagged '{}', worker will pull tasks".format(
                current_queue['name'], UptimeConf.queue_tag_on)
            )
            return True
        # next, a server-side per-worker force flag (runtime property)
        force_flag = next(
            (prop for prop in runtime_properties if prop["key"] == UptimeConf.worker_key), None
        )
        if force_flag:
            if force_flag["value"].lower() in UptimeConf.worker_value_off:
                self.log.debug("worker has the following runtime property: '{}'. worker will not pull tasks".format(
                    force_flag)
                )
                return False
            elif force_flag["value"].lower() in UptimeConf.worker_value_on:
                self.log.debug("worker has the following runtime property: '{}'. worker will pull tasks".format(
                    force_flag)
                )
                return True
            else:
                # unrecognized value: warn and fall through to the schedule
                print(
                    "Warning: invalid runtime_property '{}: {}' supported values are: '{}/{}', ignoring".format(
                        force_flag["key"], force_flag["value"], UptimeConf.worker_value_on, UptimeConf.worker_value_off
                    )
                )
        # finally, the configured schedule (only one of uptime/downtime is set)
        if self._uptime_config:
            self.log.debug("following uptime configurations")
            return check_runtime(self._uptime_config)
        if self._downtime_config:
            self.log.debug("following downtime configurations")
            return check_runtime(self._downtime_config, is_uptime=False)
        return True
def reload_config(self):
try:
reloaded = self._session.reload()
@ -708,12 +800,35 @@ class Worker(ServiceCommandSection):
if self._services_mode:
kwargs = self._verify_command_states(kwargs)
docker = docker or kwargs.get('docker')
self._uptime_config = kwargs.get('uptime', None) or self._uptime_config
self._downtime_config = kwargs.get('downtime', None) or self._downtime_config
if self._uptime_config and self._downtime_config:
self.log.error(
"Both uptime and downtime were specified when only one of them could be used. Both will be ignored."
)
self._uptime_config = None
self._downtime_config = None
# We are not running a daemon we are killing one.
# find the pid send termination signal and leave
if kwargs.get('stop', False):
return 1 if not self._kill_daemon() else 0
queues_info = [
q.to_dict()
for q in self._session.send_api(
queues_api.GetAllRequest(id=queues)
).queues
]
if kwargs.get('status', False):
runtime_properties = self.get_runtime_properties()
if self._downtime_config:
print_uptime_properties(self._downtime_config, queues_info, runtime_properties, is_uptime=False)
else:
print_uptime_properties(self._uptime_config, queues_info, runtime_properties)
return 1
# make sure we only have a single instance,
# also make sure we set worker_id properly and cache folders
self._singleton()
@ -725,12 +840,6 @@ class Worker(ServiceCommandSection):
self.log.debug("starting resource monitor thread")
print("Worker \"{}\" - ".format(self.worker_id), end='')
queues_info = [
self._session.send_api(
queues_api.GetByIdRequest(queue)
).queue.to_dict()
for queue in queues
]
columns = ("id", "name", "tags")
print("Listening to queues:")
print_table(queues_info, columns=columns, titles=columns)
@ -2076,7 +2185,7 @@ class Worker(ServiceCommandSection):
shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
except Exception:
host_ssh_cache = None
log.warning('Failed creating temporary copy of ~/.ssh for git credential')
self.log.warning('Failed creating temporary copy of ~/.ssh for git credential')
pass
# check if the .git credentials exist:

View File

@ -0,0 +1,170 @@
import re
from datetime import datetime, timedelta
from typing import List, Tuple, Optional
from trains_agent.backend_config.defs import UptimeConf
DAYS = ["SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"]
PATTERN = re.compile(r"^(?P<hours>[^\s]+)\s(?P<days>[^\s]+)")


def check_runtime(ranges_list, is_uptime=True):
    # type: (List[str], bool) -> bool
    """
    Evaluate "<hours> <days>" time-span specs against the current local time.

    :param ranges_list: specs such as ["17-20 SUN,TUE"]
    :param is_uptime: True when the specs describe uptime, False for downtime
    :return: True if the worker should currently be active
    """
    in_any_span = any(
        check_day(get_days_list(spec)) and check_hour(get_hours_list(spec))
        for spec in ranges_list
    )
    # inside a span -> active for uptime, idle for downtime (and vice versa)
    return is_uptime if in_any_span else not is_uptime


def check_hour(hours):
    # type: (List[int]) -> bool
    """Return True if the current local hour is one of *hours*."""
    current_hour = datetime.now().hour
    return current_hour in hours


def check_day(days):
    # type: (List[str]) -> bool
    """Return True if today's abbreviated uppercase name (e.g. "SUN") is in *days*."""
    today = datetime.now().strftime("%a").upper()
    return today in days


def get_days_list(entry):
    # type: (str) -> List[str]
    """
    Expand the days part of an "<hours> <days>" spec into a list of day names.

    Accepts comma-separated single days ("MON") and inclusive ranges
    ("SUN-TUE"); invalid tokens are skipped with a printed warning.
    """
    day_indices = []
    for token in PATTERN.match(entry)["days"].split(","):
        first, last = token.split("-") if "-" in token else (token, token)
        try:
            day_indices.extend(
                range(DAYS.index(first.upper()), DAYS.index(last.upper()) + 1)
            )
        except ValueError:
            print(
                "Warning: days interval '{}' is invalid, use intervals of the format <start>-<end>."
                " make sure to use the abbreviated format SUN-SAT".format(token)
            )
            continue
    return [DAYS[i] for i in day_indices]
def get_hours_list(entry):
    # type: (str) -> List[int]
    """
    Expand the hours part of an "<hours> <days>" spec into a list of hours.

    Each comma-separated item is either "<start>-<end>" (end exclusive) or a
    single "<start>" that runs until midnight. Invalid items are skipped.
    """
    hours_total = []
    for hours in PATTERN.match(entry)["hours"].split(","):
        start, end = get_start_end_hours(hours)
        # explicit None check: hour 0 (midnight) is a valid but falsy start,
        # the previous truthiness test silently dropped e.g. "0-5"
        if start is None or end is None:
            continue
        hours_total.extend(range(start, end))
    return hours_total


def get_start_end_hours(hours):
    # type: (str) -> Tuple[Optional[int], Optional[int]]
    """
    Parse a "<start>-<end>" or "<start>" hours token into an (int, int) pair.

    A single value means "until midnight" (end == 24) and an explicit end of 0
    is normalized to 24. Returns (None, None) for invalid input.
    """
    try:
        start, end = (
            tuple(map(int, hours.split("-"))) if "-" in hours else (int(hours), 24)
        )
    except Exception as ex:
        # the exception detail was passed to format() but had no placeholder;
        # include it so the warning explains what went wrong
        print(
            "Warning: hours interval '{}' is invalid ({}), use intervals of the format <start>-<end>".format(
                hours, ex
            )
        )
        start, end = (None, None)
    if end == 0:
        # "<start>-0" means run until midnight
        end = 24
    return start, end
def print_uptime_properties(
    ranges_list, queues_info, runtime_properties, is_uptime=True
):
    # type: (List[str], List[dict], List[dict], bool) -> None
    """
    Print a human-readable summary of the worker's schedule: the configured
    uptime/downtime spans, the server-side forced state (runtime properties)
    and the per-queue forced state (queue tags).

    :param ranges_list: uptime or downtime "<hours> <days>" specs (may be empty)
    :param queues_info: queue dicts (name/tags) from the queues API
    :param runtime_properties: worker runtime property dicts from the server
    :param is_uptime: True when ranges_list holds uptime specs, False for downtime
    """
    if ranges_list:
        uptime_string = ["Working hours {} configurations".format("uptime" if is_uptime else "downtime")]
        uptime_string.extend(get_uptime_string(entry) for entry in ranges_list)
    else:
        uptime_string = ["No uptime/downtime configurations found"]
    is_server_forced, server_string = get_runtime_properties_string(runtime_properties)
    is_queue_forced, queues_string = get_queues_tags_string(queues_info)
    # filter(len, ...) drops empty sections; the single-space alternative is
    # deliberately truthy so a separator line survives the filter when
    # nothing is forced
    res = list(
        filter(
            len,
            [
                "\n".join(uptime_string),
                "\nCurrently forced {}".format(is_queue_forced or is_server_forced)
                if is_queue_forced or is_server_forced
                else " ",
                server_string,
                queues_string,
            ],
        )
    )
    print("\n".join(res))
def get_uptime_string(entry):
    # type: (str) -> str
    """
    Render a single "<hours> <days>" spec as human-readable schedule lines.

    :param entry: spec such as "17-20 SUN,TUE"
    :return: newline-joined lines like " - 17:00-19:59 on SUN and TUE"
    """
    res = []
    days_list = get_days_list(entry)
    hours_intervals = PATTERN.match(entry)["hours"].split(",")
    for hours in hours_intervals:
        start, end = get_start_end_hours(hours)
        # explicit None check: hour 0 (midnight) is a valid but falsy start,
        # the previous truthiness test silently dropped e.g. "0-5"
        if start is None or end is None:
            continue
        res.append(
            " - {}:00-{}:59 on {}".format(start, end - 1, ' and '.join(days_list))
            if not (start == end)
            else ""
        )
    return "\n".join(res)
def get_runtime_properties_string(runtime_properties):
    # type: (List[dict]) -> Tuple[Optional[str], str]
    """
    Summarize the server-side 'force' runtime property, if one is set.

    :param runtime_properties: worker runtime property dicts from the server
    :return: (forced state "ON"/"OFF"/None as reported by the server,
        printable description string; empty when no force flag is set)
    """
    server_string = []
    force_flag = next(
        (prop for prop in runtime_properties if prop["key"] == UptimeConf.worker_key),
        None,
    )
    is_server_forced = None
    if force_flag:
        is_server_forced = force_flag["value"].upper()
        # NOTE(review): assumes the property dict always carries an "expiry"
        # key (possibly empty) - confirm against the server response schema
        expiry_hour = (
            (datetime.now() + timedelta(seconds=force_flag["expiry"])).strftime("%H:%M")
            if force_flag["expiry"]
            else None
        )
        expires = " expires at {}".format(expiry_hour) if expiry_hour else ""
        server_string.append(
            " - Server runtime property '{}: {}'{}".format(force_flag['key'], force_flag['value'], expires)
        )
    return is_server_forced, "\n".join(server_string)
def get_queues_tags_string(queues_info):
    # type: (List[dict]) -> Tuple[Optional[str], str]
    """
    Summarize the per-queue force_workers tags of the listened queues.

    :param queues_info: queue dicts, each may contain "name" and "tags"
    :return: ("ON"/"OFF"/None, printable per-queue description); "ON" wins
        over "OFF" when both tags appear across the queues
    """
    queues_string = []
    is_queue_forced = None
    for queue in queues_info:
        tags = queue.get("tags", [])
        if "force_workers:off" in tags:
            tag = "force_workers:off"
            is_queue_forced = is_queue_forced or "OFF"
        elif "force_workers:on" in tags:
            tag = "force_workers:on"
            is_queue_forced = "ON"
        else:
            tag = None
        # fixed: stray trailing apostrophe after the closing parenthesis
        tagged = " (tagged '{}')".format(tag) if tag else ""
        queues_string.append(
            " - Listening to queue '{}'{}".format(queue.get('name', ''), tagged)
        )
    return is_queue_forced, "\n".join(queues_string)

View File

@ -93,7 +93,24 @@ DAEMON_ARGS = dict({
'help': 'Stop the running agent (based on the same set of arguments)',
'action': 'store_true',
},
'--uptime': {
'help': 'Specify uptime for trains-agent in "<hours> <days>" format. for example, use "17-20 TUE" to set '
'Tuesday\'s uptime to 17-20. '
'Note: Make sure to have only one of uptime/downtime configuration and not both.',
'nargs': '*',
'default': None,
},
'--downtime': {
'help': 'Specify downtime for trains-agent in "<hours> <days>" format. for example, use "09-13 TUE" to set '
'Tuesday\'s downtime to 09-13. '
'Note: Make sure to have only one of uptime/downtime configuration and not both.',
'nargs': '*',
'default': None,
},
'--status': {
'help': 'Print the worker\'s schedule (uptime properties, server\'s runtime properties and listening queues)',
'action': 'store_true',
},
}, **WORKER_ARGS)
COMMANDS = {