diff --git a/trains_agent/backend_api/config/default/agent.conf b/trains_agent/backend_api/config/default/agent.conf
index 17d8626..2888844 100644
--- a/trains_agent/backend_api/config/default/agent.conf
+++ b/trains_agent/backend_api/config/default/agent.conf
@@ -102,6 +102,22 @@
     # optional shell script to run in docker when started before the experiment is started
     # extra_docker_shell_script: ["apt-get install -y bindfs", ]
 
+    # optional uptime configuration, make sure to use only one of 'uptime/downtime' and not both.
+    # If uptime is specified, the agent will actively poll (and execute) tasks in the time-spans defined here.
+    # Outside of the specified time-spans, the agent will be idle.
+    # Defined using a list of items of the format: "<hours> <days>".
+    # hours - use values 0-23; a single value is taken as the start hour and the span ends at midnight.
+    # days - use days in abbreviated format (SUN-SAT)
+    # use '-' for ranges and ',' to separate singular values.
+    # for example, to enable the workers every Sunday and Tuesday between 17:00-20:00 set uptime to:
+    # uptime: ["17-20 SUN,TUE"]
+
+    # optional downtime configuration, can only be used when uptime is not used.
+    # If downtime is specified, the agent will be idle in the time-spans defined here.
+    # Outside of the specified time-spans, the agent will actively poll (and execute) tasks.
+    # Use the same format as described above for uptime.
+    # downtime: []
+
     # set to true in order to force "docker pull" before running an experiment using a docker image.
     # This makes sure the docker image is updated.
     docker_force_pull: false
diff --git a/trains_agent/backend_config/defs.py b/trains_agent/backend_config/defs.py
index 02ef2fe..47ec631 100644
--- a/trains_agent/backend_config/defs.py
+++ b/trains_agent/backend_config/defs.py
@@ -46,6 +46,15 @@ class Environment(object):
     local = 'local'
 
 
+class UptimeConf(object):
+    min_api_version = "2.10"
+    queue_tag_on = "force_workers:on"
+    queue_tag_off = "force_workers:off"
+    worker_key = "force"
+    worker_value_off = ["off"]
+    worker_value_on = ["on"]
+
+
 CONFIG_FILE_EXTENSION = '.conf'
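For reference, a short sketch of how a "<hours> <days>" schedule entry like the one documented above is interpreted. It uses the helpers introduced later in this patch (trains_agent/helper/runtime_verification.py), so it assumes an agent installation with this change applied; the printed values follow directly from that module's parsing logic.

# Illustration only: parsing the "<hours> <days>" schedule format documented in agent.conf,
# using the helpers added by this patch in trains_agent/helper/runtime_verification.py.
from trains_agent.helper.runtime_verification import check_runtime, get_days_list, get_hours_list

entry = "17-20 SUN,TUE"        # i.e. uptime: ["17-20 SUN,TUE"] - Sundays and Tuesdays, 17:00-20:00
print(get_days_list(entry))    # ['SUN', 'TUE']
print(get_hours_list(entry))   # [17, 18, 19]  (active through 19:59)
print(check_runtime([entry]))  # True only when the current local day/hour falls inside the schedule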
diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py
index 0155de4..78956ed 100644
--- a/trains_agent/commands/worker.py
+++ b/trains_agent/commands/worker.py
@@ -30,6 +30,8 @@ from pathlib2 import Path
 from pyhocon import ConfigTree, ConfigFactory
 from six.moves.urllib.parse import quote
 
+from trains_agent.backend_api.services import queues
+from trains_agent.backend_config.defs import UptimeConf
 from trains_agent.helper.check_update import start_check_update_daemon
 from trains_agent.commands.base import resolve_names, ServiceCommandSection
 from trains_agent.definitions import (
@@ -94,13 +96,12 @@ from trains_agent.helper.process import (
 from trains_agent.helper.package.priority_req import PriorityPackageRequirement
 from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
 from trains_agent.helper.resource_monitor import ResourceMonitor
+from trains_agent.helper.runtime_verification import check_runtime, print_uptime_properties
 from trains_agent.session import Session
 from trains_agent.helper.singleton import Singleton
 
 from .events import Events
 
-log = logging.getLogger(__name__)
-
 DOCKER_ROOT_CONF_FILE = "/root/trains.conf"
 DOCKER_DEFAULT_CONF_FILE = "/root/default_trains.conf"
@@ -152,6 +153,7 @@ class LiteralScriptManager(object):
         Create notebook file in appropriate location
         :return: directory and script path
         """
+        log = logging.getLogger(__name__)
         if repo_info and repo_info.root:
             location = Path(repo_info.root, execution.working_dir)
         else:
@@ -394,6 +396,13 @@ class Worker(ServiceCommandSection):
         self._services_mode = None
         self._force_current_version = None
         self._redirected_stdout_file_no = None
+        self._uptime_config = self._session.config.get("agent.uptime", None)
+        self._downtime_config = self._session.config.get("agent.downtime", None)
+
+        # True - supported
+        # None - not initialized
+        # str - not supported, version string indicates last server version
+        self._runtime_props_support = None
 
     @classmethod
     def _verify_command_states(cls, kwargs):
@@ -599,10 +608,19 @@ class Worker(ServiceCommandSection):
         print('Starting infinite task polling loop...')
         _last_machine_update_ts = 0
-        while True:
+        while True:
+            queue_tags = None
+            runtime_props = None
+
             # iterate over queues (priority style, queues[0] is highest)
             for queue in queues:
+
+                if queue_tags is None or runtime_props is None:
+                    queue_tags, runtime_props = self.get_worker_properties(queues)
+
+                if not self.should_be_currently_active(queue_tags[queue], runtime_props):
+                    continue
+
                 # get next task in queue
                 try:
                     response = self._session.send_api(
@@ -642,6 +660,9 @@ class Worker(ServiceCommandSection):
                 self.run_one_task(queue, task_id, worker_params)
                 self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
 
+                queue_tags = None
+                runtime_props = None
+
                 # if we are using priority start pulling from the first always,
                 # if we are doing round robin, pull from the next one
                 if priority_order:
@@ -655,6 +676,77 @@ class Worker(ServiceCommandSection):
             if self._session.config["agent.reload_config"]:
                 self.reload_config()
 
+    def get_worker_properties(self, queue_ids):
+        queue_tags = {
+            q.id: {'name': q.name, 'tags': q.tags}
+            for q in self._session.send_api(
+                queues_api.GetAllRequest(id=queue_ids, only_fields=["id", "tags"])
+            ).queues
+        }
+        runtime_props = self.get_runtime_properties()
+        return queue_tags, runtime_props
+
+    def get_runtime_properties(self):
+        if self._runtime_props_support is not True:
+            # either not supported or never tested
+            if self._runtime_props_support == self._session.api_version:
+                # tested against latest api_version, not supported
+                return []
+            if not self._session.check_min_api_version(UptimeConf.min_api_version):
+                # not supported due to insufficient api_version
+                self._runtime_props_support = self._session.api_version
+                return []
+        try:
+            res = self.get("get_runtime_properties", worker=self.worker_id)["runtime_properties"]
+            # definitely supported
+            self._runtime_props_support = True
+            return res
+        except APIError:
+            self._runtime_props_support = self._session.api_version
+            return []
+
+    def should_be_currently_active(self, current_queue, runtime_properties):
+        """
+        Checks if a worker is active according to queue tags, worker's runtime properties and uptime schedule.
+        """
+        if UptimeConf.queue_tag_off in current_queue['tags']:
+            self.log.debug("Queue {} is tagged '{}', worker will not pull tasks".format(
+                current_queue['name'], UptimeConf.queue_tag_off)
+            )
+            return False
+        if UptimeConf.queue_tag_on in current_queue['tags']:
+            self.log.debug("Queue {} is tagged '{}', worker will pull tasks".format(
+                current_queue['name'], UptimeConf.queue_tag_on)
+            )
+            return True
+        force_flag = next(
+            (prop for prop in runtime_properties if prop["key"] == UptimeConf.worker_key), None
+        )
+        if force_flag:
+            if force_flag["value"].lower() in UptimeConf.worker_value_off:
+                self.log.debug("worker has the following runtime property: '{}'. worker will not pull tasks".format(
+                    force_flag)
+                )
+                return False
+            elif force_flag["value"].lower() in UptimeConf.worker_value_on:
+                self.log.debug("worker has the following runtime property: '{}'. worker will pull tasks".format(
+                    force_flag)
+                )
+                return True
+            else:
+                print(
+                    "Warning: invalid runtime_property '{}: {}', supported values are: '{}/{}', ignoring".format(
+                        force_flag["key"], force_flag["value"], UptimeConf.worker_value_on, UptimeConf.worker_value_off
+                    )
+                )
+        if self._uptime_config:
+            self.log.debug("following uptime configurations")
+            return check_runtime(self._uptime_config)
+        if self._downtime_config:
+            self.log.debug("following downtime configurations")
+            return check_runtime(self._downtime_config, is_uptime=False)
+        return True
+
     def reload_config(self):
         try:
             reloaded = self._session.reload()
@@ -708,12 +800,35 @@ class Worker(ServiceCommandSection):
         if self._services_mode:
             kwargs = self._verify_command_states(kwargs)
         docker = docker or kwargs.get('docker')
+        self._uptime_config = kwargs.get('uptime', None) or self._uptime_config
+        self._downtime_config = kwargs.get('downtime', None) or self._downtime_config
+        if self._uptime_config and self._downtime_config:
+            self.log.error(
+                "Both uptime and downtime were specified, but only one of them can be used. Both will be ignored."
+            )
+            self._uptime_config = None
+            self._downtime_config = None
 
         # We are not running a daemon we are killing one.
         # find the pid send termination signal and leave
         if kwargs.get('stop', False):
             return 1 if not self._kill_daemon() else 0
 
+        queues_info = [
+            q.to_dict()
+            for q in self._session.send_api(
+                queues_api.GetAllRequest(id=queues)
+            ).queues
+        ]
+
+        if kwargs.get('status', False):
+            runtime_properties = self.get_runtime_properties()
+            if self._downtime_config:
+                print_uptime_properties(self._downtime_config, queues_info, runtime_properties, is_uptime=False)
+            else:
+                print_uptime_properties(self._uptime_config, queues_info, runtime_properties)
+            return 1
+
         # make sure we only have a single instance,
         # also make sure we set worker_id properly and cache folders
         self._singleton()
@@ -725,12 +840,6 @@ class Worker(ServiceCommandSection):
         self.log.debug("starting resource monitor thread")
         print("Worker \"{}\" - ".format(self.worker_id), end='')
 
-        queues_info = [
-            self._session.send_api(
-                queues_api.GetByIdRequest(queue)
-            ).queue.to_dict()
-            for queue in queues
-        ]
         columns = ("id", "name", "tags")
         print("Listening to queues:")
         print_table(queues_info, columns=columns, titles=columns)
@@ -2076,7 +2185,7 @@ class Worker(ServiceCommandSection):
             shutil.copytree(Path('~/.ssh').expanduser().as_posix(), host_ssh_cache)
         except Exception:
             host_ssh_cache = None
-            log.warning('Failed creating temporary copy of ~/.ssh for git credential')
+            self.log.warning('Failed creating temporary copy of ~/.ssh for git credential')
             pass
 
         # check if the .git credentials exist:
diff --git a/trains_agent/helper/runtime_verification.py b/trains_agent/helper/runtime_verification.py
new file mode 100644
index 0000000..59a9d8a
--- /dev/null
+++ b/trains_agent/helper/runtime_verification.py
@@ -0,0 +1,170 @@
+import re
+from datetime import datetime, timedelta
+
+from typing import List, Tuple, Optional
+
+from trains_agent.backend_config.defs import UptimeConf
+
+DAYS = ["SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"]
+PATTERN = re.compile(r"^(?P<hours>[^\s]+)\s(?P<days>[^\s]+)")
+
+
+def check_runtime(ranges_list, is_uptime=True):
+    # type: (List[str], bool) -> bool
+    for entry in ranges_list:
+
+        days_list = get_days_list(entry)
+        if not check_day(days_list):
+            continue
+
+        hours_list = get_hours_list(entry)
+        if check_hour(hours_list):
+            return is_uptime
+    return not is_uptime
+
+
+def check_hour(hours):
+    # type: (List[int]) -> bool
+    return datetime.now().hour in hours
+
+
+def check_day(days):
+    # type: (List[str]) -> bool
+    return datetime.now().strftime("%a").upper() in days
+
+
+def get_days_list(entry):
+    # type: (str) -> List[str]
+    days_intervals = PATTERN.match(entry)["days"].split(",")
+    days_total = []
+    for days in days_intervals:
+        start, end = days.split("-") if "-" in days else (days, days)
+        try:
+            days_total.extend(
+                [*range(DAYS.index(start.upper()), DAYS.index(end.upper()) + 1)]
+            )
+        except ValueError:
+            print(
+                "Warning: days interval '{}' is invalid, use intervals of the format <start-day>-<end-day>."
+                " Make sure to use the abbreviated format SUN-SAT".format(days)
+            )
+            continue
+    return [DAYS[day] for day in days_total]
+
+
+def get_hours_list(entry):
+    # type: (str) -> List[int]
+    hours_intervals = PATTERN.match(entry)["hours"].split(",")
+    hours_total = []
+    for hours in hours_intervals:
+        start, end = get_start_end_hours(hours)
+        if not (start and end):
+            continue
+        hours_total.extend([*range(start, end)])
+    return hours_total
+
+
+def get_start_end_hours(hours):
+    # type: (str) -> Tuple[int, int]
+    try:
+        start, end = (
+            tuple(map(int, hours.split("-"))) if "-" in hours else (int(hours), 24)
+        )
+    except Exception as ex:
+        print(
+            "Warning: hours interval '{}' is invalid ({}), use intervals of the format <start-hour>-<end-hour>".format(
+                hours, ex
+            )
+        )
+        start, end = (None, None)
+    if end == 0:
+        end = 24
+    return start, end
+
+
+def print_uptime_properties(
+    ranges_list, queues_info, runtime_properties, is_uptime=True
+):
+    # type: (List[str], List[dict], List[dict], bool) -> None
+    if ranges_list:
+        uptime_string = ["Working hours {} configurations".format("uptime" if is_uptime else "downtime")]
+        uptime_string.extend(get_uptime_string(entry) for entry in ranges_list)
+    else:
+        uptime_string = ["No uptime/downtime configurations found"]
+
+    is_server_forced, server_string = get_runtime_properties_string(runtime_properties)
+    is_queue_forced, queues_string = get_queues_tags_string(queues_info)
+
+    res = list(
+        filter(
+            len,
+            [
+                "\n".join(uptime_string),
+                "\nCurrently forced {}".format(is_queue_forced or is_server_forced)
+                if is_queue_forced or is_server_forced
+                else " ",
+                server_string,
+                queues_string,
+            ],
+        )
+    )
+    print("\n".join(res))
+
+
+def get_uptime_string(entry):
+    # type: (str) -> str
+    res = []
+    days_list = get_days_list(entry)
+    hours_intervals = PATTERN.match(entry)["hours"].split(",")
+    for hours in hours_intervals:
+        start, end = get_start_end_hours(hours)
+        if not (start and end):
+            continue
+        res.append(
+            " - {}:00-{}:59 on {}".format(start, end - 1, ' and '.join(days_list))
+            if not (start == end)
+            else ""
+        )
+    return "\n".join(res)
+
+
+def get_runtime_properties_string(runtime_properties):
+    # type: (List[dict]) -> Tuple[Optional[str], str]
+    server_string = []
+    force_flag = next(
+        (prop for prop in runtime_properties if prop["key"] == UptimeConf.worker_key),
+        None,
+    )
+    is_server_forced = None
+    if force_flag:
+        is_server_forced = force_flag["value"].upper()
+        expiry_hour = (
+            (datetime.now() + timedelta(seconds=force_flag["expiry"])).strftime("%H:%M")
+            if force_flag["expiry"]
+            else None
+        )
+        expires = " expires at {}".format(expiry_hour) if expiry_hour else ""
+        server_string.append(
+            " - Server runtime property '{}: {}'{}".format(force_flag['key'], force_flag['value'], expires)
+        )
+    return is_server_forced, "\n".join(server_string)
+
+
+def get_queues_tags_string(queues_info):
+    # type: (List[dict]) -> Tuple[Optional[str], str]
+    queues_string = []
+    is_queue_forced = None
+    for queue in queues_info:
+        if "force_workers:off" in queue.get("tags", []):
+            tag = "force_workers:off"
+            is_queue_forced = is_queue_forced or "OFF"
+        elif "force_workers:on" in queue.get("tags", []):
+            tag = "force_workers:on"
+            is_queue_forced = "ON"
+        else:
+            tag = None
+        tagged = " (tagged '{}')".format(tag) if tag else ""
+        queues_string.append(
+            " - Listening to queue '{}'{}".format(queue.get('name', ''), tagged)
+        )
+    return is_queue_forced, "\n".join(queues_string)
diff --git a/trains_agent/interface/worker.py b/trains_agent/interface/worker.py
index 7a3cd53..e3b3430 100644
--- a/trains_agent/interface/worker.py
+++ b/trains_agent/interface/worker.py
@@ -93,7 +93,24 @@ DAEMON_ARGS = dict({
         'help': 'Stop the running agent (based on the same set of arguments)',
         'action': 'store_true',
     },
-
+    '--uptime': {
+        'help': 'Specify uptime for trains-agent in "<hours> <days>" format. For example, use "17-20 TUE" to set '
+                'Tuesday\'s uptime to 17-20. '
+                'Note: make sure to specify only one of uptime/downtime and not both.',
+        'nargs': '*',
+        'default': None,
+    },
+    '--downtime': {
+        'help': 'Specify downtime for trains-agent in "<hours> <days>" format. For example, use "09-13 TUE" to set '
+                'Tuesday\'s downtime to 09-13. '
+                'Note: make sure to specify only one of uptime/downtime and not both.',
+        'nargs': '*',
+        'default': None,
+    },
+    '--status': {
+        'help': 'Print the worker\'s schedule (uptime properties, server\'s runtime properties and listening queues)',
+        'action': 'store_true',
+    },
 }, **WORKER_ARGS)
 
 COMMANDS = {
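To make the new --status flow concrete, below is a minimal sketch that calls print_uptime_properties() from the runtime_verification module added above with hypothetical inputs; the 'default' queue and the 'force: on' runtime property are illustrative values, not part of this patch. On the command line this corresponds to invocations along the lines of 'trains-agent daemon --queue default --uptime "17-20 SUN,TUE"' or 'trains-agent daemon --queue default --status' (flag names as defined in the DAEMON_ARGS block above; the queue name is only an example).

# Minimal sketch of what the daemon --status path prints: print_uptime_properties()
# (added above in trains_agent/helper/runtime_verification.py) fed with sample data.
# The 'default' queue and the 'force: on' runtime property are hypothetical examples.
from trains_agent.helper.runtime_verification import print_uptime_properties

uptime = ["17-20 SUN,TUE"]                                   # agent.uptime from trains.conf
queues_info = [{"name": "default", "tags": []}]              # queue dicts as returned by queues.get_all
runtime_properties = [{"key": "force", "value": "on", "expiry": None}]

print_uptime_properties(uptime, queues_info, runtime_properties)
# Expected output, roughly:
#   Working hours uptime configurations
#    - 17:00-19:59 on SUN and TUE
#
#   Currently forced ON
#    - Server runtime property 'force: on'
#    - Listening to queue 'default'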