From 2c5e2bb849a905daedbc813819ca073307fb0446 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Sun, 21 Jun 2020 23:30:07 +0300
Subject: [PATCH] Add Slack Monitor service

---
 examples/services/monitoring/requirements.txt |   2 +
 examples/services/monitoring/slack_alerts.py  | 215 ++++++++++++++++++
 trains/automation/monitor.py                  | 176 ++++++++++++++
 3 files changed, 393 insertions(+)
 create mode 100644 examples/services/monitoring/requirements.txt
 create mode 100644 examples/services/monitoring/slack_alerts.py
 create mode 100644 trains/automation/monitor.py

diff --git a/examples/services/monitoring/requirements.txt b/examples/services/monitoring/requirements.txt
new file mode 100644
index 00000000..182dbbab
--- /dev/null
+++ b/examples/services/monitoring/requirements.txt
@@ -0,0 +1,2 @@
+trains
+slackclient > 2.0.0
diff --git a/examples/services/monitoring/slack_alerts.py b/examples/services/monitoring/slack_alerts.py
new file mode 100644
index 00000000..59f12f4d
--- /dev/null
+++ b/examples/services/monitoring/slack_alerts.py
@@ -0,0 +1,215 @@
+"""
+Create a Trains Monitoring Service that posts alerts to a Slack channel based on configurable logic
+
+Creating a new Slack Bot (Allegro Trains Bot):
+1. Log in to your Slack account
+2. Go to https://api.slack.com/apps/new
+3. Give the new App a name (For example "Allegro Trains Bot") and select your workspace
+4. Press Create App
+5. In "Basic Information" under "Display Information" fill in the following fields
+    - In "Short description" insert "Allegro Trains Bot"
+    - In "Background color" insert #202432
+6. Press Save Changes
+7. In "OAuth & Permissions" under "Scopes" click on "Add an OAuth Scope" and
+   select the following three permissions from the drop-down list:
+        channels:join
+        channels:read
+        chat:write
+8. Now under "OAuth Tokens & Redirect URLs" press "Install App to Workspace",
+   then hit "Allow" on the confirmation dialog
+9. Under "OAuth Tokens & Redirect URLs" copy the "Bot User OAuth Access Token" by clicking the "Copy" button
+10. To use the copied API Token in the Allegro Trains Slack service,
+    execute the script with --slack_api "<api_token>" (notice the use of double quotes around the token)
+
+We are done!
+"""
+
+import argparse
+import os
+from time import sleep
+from typing import Optional
+
+from slack import WebClient
+from slack.errors import SlackApiError
+
+from trains import Task
+from trains.automation.monitor import Monitor
+
+
+class SlackMonitor(Monitor):
+    """
+    Create a monitoring service that alerts on Task failures / completion in a Slack channel
+    """
+    def __init__(self, slack_api_token, channel, message_prefix=None):
+        # type: (str, str, Optional[str]) -> ()
+        """
+        Create a Slack Monitoring object.
+        It will alert on any Task/Experiment that fails or completes
+
+        :param slack_api_token: Slack bot API Token. Token should start with "xoxb-"
+        :param channel: Name of the channel to post alerts to
+        :param message_prefix: Optional message prefix to add before any message posted
+                               For example: message_prefix="Hey <!here>,"
+        """
+        super(SlackMonitor, self).__init__()
+        self.channel = '{}'.format(channel[1:] if channel[0] == '#' else channel)
+        self.slack_client = WebClient(token=slack_api_token)
+        self.min_num_iterations = 0
+        self.status_alerts = ["failed", ]
+        self.include_manual_experiments = False
+        self._channel_id = None
+        self._message_prefix = '{} '.format(message_prefix) if message_prefix else ''
+        self.check_credentials()
+
+    def check_credentials(self):
+        # type: () -> ()
+        """
+        Check that we have the correct credentials for the Slack channel
+        """
+        self.slack_client.api_test()
+
+        # Find channel ID
+        response = self.slack_client.conversations_list()
+        channel_id = [channel_info.get('id') for channel_info in response.data['channels']
+                      if channel_info.get('name') == self.channel]
+        if not channel_id:
+            raise ValueError('Error: Could not locate channel name \'{}\''.format(self.channel))
+
+        # test bot permission (join channel)
+        self._channel_id = channel_id[0]
+        self.slack_client.conversations_join(channel=self._channel_id)
+
+    def post_message(self, message, retries=1, wait_period=10.):
+        # type: (str, int, float) -> ()
+        """
+        Post a message to our Slack channel
+
+        :param message: Message to be sent (markdown style)
+        :param retries: Number of retries before giving up
+        :param wait_period: Wait time between retries, in seconds
+        """
+        for i in range(retries):
+            if i != 0:
+                sleep(wait_period)
+
+            try:
+                self.slack_client.chat_postMessage(
+                    channel=self._channel_id,
+                    blocks=[dict(type="section", text={"type": "mrkdwn", "text": message})],
+                )
+                return
+            except SlackApiError as e:
+                print("While trying to send message: \"\n{}\n\"\nGot an error: {}".format(
+                    message, e.response['error']))
+
+    def get_query_parameters(self):
+        # type: () -> dict
+        """
+        Return the query parameters for the monitoring.
+
+        :return dict: Example dictionary: {'status': ['failed'], 'order_by': ['-last_update']}
+        """
+        filter_tags = ['-archived'] + (['-development'] if not self.include_manual_experiments else [])
+        return dict(status=self.status_alerts, order_by=['-last_update'], system_tags=filter_tags)
+
+    def process_task(self, task):
+        # type: (Task) -> ()
+        """
+        Called on every Task that we monitor.
+        This is where we send the Slack alert
+
+        :return: None
+        """
+        # skip failed tasks with a low number of iterations
+        if self.min_num_iterations and task.get_last_iteration() < self.min_num_iterations:
+            print('Skipping {} experiment id={}, number of iterations {} < {}'.format(
+                task.status, task.id, task.get_last_iteration(), self.min_num_iterations))
+            return
+
+        print('Experiment id={} {}, raising alert on channel \"{}\"'.format(task.id, task.status, self.channel))
+
+        console_output = task.get_reported_console_output(number_of_reports=3)
+        message = \
+            '{}Experiment ID <{}|{}> *{}*\nProject: *{}* - Name: *{}*\n' \
+            '```\n{}\n```'.format(
+                self._message_prefix,
+                task.get_output_log_web_page(), task.id,
+                task.status,
+                task.get_project_name(), task.name,
+                ('\n'.join(console_output))[-2048:])
+        self.post_message(message, retries=5)
+
+
+def main():
+    print('TRAINS experiment monitor Slack service\n')
+
+    # Slack Monitor arguments
+    parser = argparse.ArgumentParser(description='Monitor TRAINS experiments and post Slack alerts')
+    parser.add_argument('--channel', type=str,
+                        help='The Slack channel to post alerts to')
+    parser.add_argument('--slack_api', type=str, default=os.environ.get('SLACK_API_TOKEN', None),
+                        help='Slack API key for sending messages')
+    parser.add_argument('--message_prefix', type=str,
+                        help='Add message prefix (For example, to alert all channel members use: "Hey <!here>,")')
+    parser.add_argument('--project', type=str, default='',
+                        help='The name (or partial name) of the project to monitor, use empty for all projects')
+    parser.add_argument('--min_num_iterations', type=int, default=0,
+                        help='Minimum number of iterations of a failed/completed experiment to alert on. '
+                             'This helps eliminate unnecessary debug sessions that crashed right after starting '
+                             '(default: 0, alert on all)')
+    parser.add_argument('--include_manual_experiments', action="store_true", default=False,
+                        help='Include experiments running manually (i.e. not by trains-agent)')
+    parser.add_argument('--include_completed_experiments', action="store_true", default=False,
+                        help='Include completed experiments (i.e. not just failed experiments)')
+    parser.add_argument('--refresh_rate', type=float, default=10.,
+                        help='Refresh rate of the monitoring service in seconds (default: 10.0)')
+    parser.add_argument('--service_queue', type=str, default='services',
+                        help='Queue name to use when running as a service (default: \'services\')')
+    parser.add_argument('--local', action="store_true", default=False,
+                        help='Run the service locally instead of as a remote service '
+                             '(default: automatically launch itself on the services queue)')
+
+    args = parser.parse_args()
+
+    if not args.slack_api:
+        print('Slack API key was not provided, please run with --slack_api <slack_api_key>')
+        exit(1)
+
+    if not args.channel:
+        print('Slack channel was not provided, please run with --channel <channel_name>')
+        exit(1)
+
+    # create the slack monitoring object
+    slack_monitor = SlackMonitor(
+        slack_api_token=args.slack_api, channel=args.channel, message_prefix=args.message_prefix)
+
+    # configure the monitoring filters
+    slack_monitor.min_num_iterations = args.min_num_iterations
+    slack_monitor.include_manual_experiments = args.include_manual_experiments
+    if args.project:
+        slack_monitor.set_projects(project_names_re=[args.project])
+    if args.include_completed_experiments:
+        slack_monitor.status_alerts += ["completed"]
+
+    # start the monitoring Task
+    task = Task.init(project_name='Monitoring', task_name='Slack Alerts', task_type=Task.TaskTypes.monitor)
+    if not args.local:
+        task.execute_remotely(queue_name=args.service_queue)
+        # we will not get here if we are running locally
+
+    print('\nStarting monitoring service\nProject: "{}"\nRefresh rate: {}s\n'.format(
+        args.project or 'all', args.refresh_rate))
+
+    # Let everyone know we are up and running
+    start_message = \
+        '{}Allegro Trains Slack monitoring service started\nMonitoring project \'{}\''.format(
+            (args.message_prefix + ' ') if args.message_prefix else '',
+            args.project or 'all')
+    slack_monitor.post_message(start_message)
+
+    # Start the monitor service; this function never returns
+    slack_monitor.monitor(pool_period=args.refresh_rate)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/trains/automation/monitor.py b/trains/automation/monitor.py
new file mode 100644
index 00000000..7c751500
--- /dev/null
+++ b/trains/automation/monitor.py
@@ -0,0 +1,176 @@
+from datetime import datetime
+from time import time, sleep
+from typing import Optional, Sequence
+
+from ..backend_api.session.client import APIClient
+from ..backend_interface.util import exact_match_regex
+from ..task import Task
+
+
+class Monitor(object):
+    """
+    Base class for monitoring Tasks on the system.
+    Inherit from it to implement specific monitoring logic
+    """
+
+    def __init__(self):
+        # type: () -> ()
+        self._timestamp = None
+        self._previous_timestamp = None
+        self._task_name_filter = None
+        self._project_names_re = None
+        self._project_ids = None
+        self._projects = None
+        self._projects_refresh_timestamp = None
+        self._trains_apiclient = None
+
+    def set_projects(self, project_names=None, project_names_re=None, project_ids=None):
+        # type: (Optional[Sequence[str]], Optional[Sequence[str]], Optional[Sequence[str]]) -> ()
+        """
+        Set the specific projects to monitor; the default is to monitor all projects.
+
+        :param project_names: List of project names to monitor (exact name matched)
+        :param project_names_re: List of project names to monitor (with regular expression matching)
+        :param project_ids: List of project ids to monitor
+        :return:
+        """
+        self._project_ids = project_ids
+        self._project_names_re = project_names_re or []
+        if project_names:
+            self._project_names_re += [exact_match_regex(name) for name in project_names]
+
+    def set_task_name_filter(self, task_name_filter=None):
+        # type: (Optional[str]) -> ()
+        """
+        Set the task name filter selection
+
+        :param task_name_filter: Task name filter pattern to monitor; pass None to disable filtering by name
+        :return:
+        """
+        self._task_name_filter = task_name_filter or None
+
+    def monitor(self, pool_period=15.0):
+        # type: (float) -> ()
+        """
+        Main loop function; this call never returns, it implements the main monitoring loop.
+        Every loop step, `monitor_step` is called (implementing the filter/query interface)
+        In order to combine multiple Monitor objects, call `monitor_step` manually.
+
+        :param float pool_period: polling period in seconds
+        :return: Function will never return
+        """
+        self._timestamp = time()
+        last_report = self._timestamp
+
+        # main loop
+        while True:
+            self._timestamp = time()
+            try:
+                self.monitor_step()
+            except Exception as ex:
+                print('Exception: {}'.format(ex))
+
+            # print an "I'm alive" message every 15 minutes
+            if time() - last_report > 60. * 15:
+                print('Service is running')
+                last_report = time()
+
+            # sleep until the next poll
+            sleep(pool_period)
+
+    def monitor_step(self):
+        # type: () -> ()
+        """
+        Implement the main query / interface of the monitor class.
+        In order to combine multiple Monitor objects, call `monitor_step` manually.
+        If Tasks are detected in this call, `process_task` is called for each of them.
+
+        :return: None
+        """
+        previous_timestamp = self._previous_timestamp or time()
+        timestamp = time()
+        try:
+            # retrieve experiments ordered by last update time
+            task_filter = self.get_query_parameters()
+            task_filter.update(
+                {
+                    'page_size': 100,
+                    'page': 0,
+                    'status_changed': ['>{}'.format(datetime.utcfromtimestamp(previous_timestamp)), ],
+                    'project': self._get_projects_ids(),
+                }
+            )
+            queried_tasks = Task.get_tasks(task_name=self._task_name_filter, task_filter=task_filter)
+        except Exception as ex:
+            # do not update the previous timestamp
+            print('Exception querying Tasks: {}'.format(ex))
+            return
+
+        # process queried tasks
+        for task in queried_tasks:
+            try:
+                self.process_task(task)
+            except Exception as ex:
+                print('Exception processing Task ID={}:\n{}'.format(task.id, ex))
+
+        self._previous_timestamp = timestamp
+
+    def get_query_parameters(self):
+        # type: () -> dict
+        """
+        Return the query parameters for the monitoring.
+        This should be overridden with the query of the specific implementation
+
+        :return dict: Example dictionary: {'status': ['failed'], 'order_by': ['-last_update']}
+        """
+        return dict(status=['failed'], order_by=['-last_update'])
+
+    def process_task(self, task):
+        # type: (Task) -> ()
+        """
+        Abstract method: override to handle matched Tasks.
+
+        Called on every Task that matches the monitored query. For example, when monitoring
+        failed Tasks, this is called the first time a Task is detected as failed.
+ + :return: None + """ + pass + + def _get_projects_ids(self): + # type: () -> Optional[Sequence[str]] + """ + Convert project names / regular expressions into project IDs + + :return: list of project ids (strings) + """ + if not self._project_ids and not self._project_names_re: + return None + + # refresh project ids every 5 minutes + if self._projects_refresh_timestamp and self._projects is not None and \ + time() - self._projects_refresh_timestamp < 60. * 5: + return self._projects + + # collect specific selected IDs + project_ids = self._project_ids or [] + + # select project id based on name matching + for name_re in self._project_names_re: + results = self._get_api_client().projects.get_all(name=name_re) + project_ids += [r.id for r in results] + + self._projects_refresh_timestamp = time() + self._projects = project_ids + return self._projects + + def _get_api_client(self): + # type: () -> APIClient + """ + Return an APIClient object to directly query the trains-server + + :return: APIClient object + """ + if not self._trains_apiclient: + self._trains_apiclient = APIClient() + return self._trains_apiclient
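
Usage note (not part of the patch): the new trains.automation.monitor.Monitor base class is meant to be subclassed, where get_query_parameters() selects which Tasks to query and process_task() handles each match, exactly as SlackMonitor does above. The example service itself can be launched with something like:

    python examples/services/monitoring/slack_alerts.py --local --channel <channel_name> --slack_api "<api_token>"

Below is a rough, hypothetical sketch of a minimal custom monitor built on the same base class, assuming the API lands as in this patch (the ConsoleMonitor name and the 'examples.*' project filter are illustrative only, not part of this PR):

    from trains.automation.monitor import Monitor


    class ConsoleMonitor(Monitor):
        """Minimal Monitor subclass: print an alert line for every failed Task."""

        def get_query_parameters(self):
            # same query shape the base class documents: failed experiments, newest first
            return dict(status=['failed'], order_by=['-last_update'])

        def process_task(self, task):
            # called once for every newly detected matching Task
            print('ALERT: experiment {} ({}) in project "{}" is {}'.format(
                task.id, task.name, task.get_project_name(), task.status))


    if __name__ == '__main__':
        monitor = ConsoleMonitor()
        monitor.set_projects(project_names_re=['examples.*'])  # optional: limit monitoring to matching projects
        monitor.monitor(pool_period=15.0)  # blocking call, never returns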