From ccc8e83c58336928424ed14b176306b149258512 Mon Sep 17 00:00:00 2001 From: Idan Tene <12184618+idantene@users.noreply.github.com> Date: Thu, 6 Jan 2022 22:57:15 +0200 Subject: [PATCH] Fix and upgrade the SlackMonitor (#533) * Update to latest Slack import statements * Add filter callables to SlackMonitor * Add include_users and exclude_users filters * Add include_archived --- examples/services/monitoring/slack_alerts.py | 212 ++++++++++++++----- 1 file changed, 156 insertions(+), 56 deletions(-) diff --git a/examples/services/monitoring/slack_alerts.py b/examples/services/monitoring/slack_alerts.py index fd720bd3..d65b65c8 100644 --- a/examples/services/monitoring/slack_alerts.py +++ b/examples/services/monitoring/slack_alerts.py @@ -26,22 +26,54 @@ We are done! import argparse import os +from pathlib import Path from time import sleep -from typing import Optional +from typing import Optional, Callable, List, Union -from slack import WebClient -from slack.errors import SlackApiError +from slack_sdk import WebClient +from slack_sdk.errors import SlackApiError from clearml import Task from clearml.automation.monitor import Monitor +class UserFilter: + def __init__(self, include=None, exclude=None): + # type: (Optional[Union[str, List[str]]], Optional[Union[str, List[str]]]) -> () + # Either `include` or `exclude` should be specified, but not both + if include is not None and exclude is not None: + raise ValueError("Specify either 'include' or 'exclude', not both!") + include = include or list() + if isinstance(include, str): + include = [include] + exclude = exclude or list() + if isinstance(exclude, str): + exclude = [exclude] + res = Task._get_default_session().send_request("users", "get_all") + if not res.ok: + raise RuntimeError("Cannot get list of all users!") + all_users = {d["name"]: d["id"] for d in res.json()["data"]["users"]} + for user in include + exclude: + if user not in all_users: + print(f"Cannot translate user '{user}' to any known user ID - " + f"will use it verbatim") + self.include = [all_users.get(user, user) for user in include] # Map usernames to user IDs + self.exclude = [all_users.get(user, user) for user in exclude] + + def __call__(self, task): + # type: (Task) -> bool + if self.include: + return task.data.user not in self.include + return task.data.user in self.exclude + + class SlackMonitor(Monitor): """ Create a monitoring service that alerts on Task failures / completion in a Slack channel """ - def __init__(self, slack_api_token, channel, message_prefix=None): - # type: (str, str, Optional[str]) -> () + + def __init__(self, slack_api_token, channel, message_prefix=None, filters=None): + # type: (str, str, Optional[str], Optional[List[Callable[[Task], bool]]]) -> () """ Create a Slack Monitoring object. It will alert on any Task/Experiment that failed or completed @@ -50,15 +82,22 @@ class SlackMonitor(Monitor): :param channel: Name of the channel to post alerts to :param message_prefix: optional message prefix to add before any message posted For example: message_prefix="Hey ," + :param filters: An optional collection of callables that will be passed a Task + object and return True/False if it should be filtered away """ super(SlackMonitor, self).__init__() - self.channel = '{}'.format(channel[1:] if channel[0] == '#' else channel) + self.channel = "{}".format(channel[1:] if channel[0] == "#" else channel) self.slack_client = WebClient(token=slack_api_token) self.min_num_iterations = 0 - self.status_alerts = ["failed", ] + self.filters = filters or list() + self.status_alerts = [ + "failed", + ] self.include_manual_experiments = False + self.include_archived = False + self.verbose = False self._channel_id = None - self._message_prefix = '{} '.format(message_prefix) if message_prefix else '' + self._message_prefix = "{} ".format(message_prefix) if message_prefix else "" self.check_credentials() def check_credentials(self): @@ -73,20 +112,19 @@ class SlackMonitor(Monitor): cursor = None while True: response = self.slack_client.conversations_list(cursor=cursor) - channels.extend(response.data['channels']) + channels.extend(response.data["channels"]) cursor = response.data["response_metadata"].get("next_cursor") if not cursor: break - channel_id = [channel_info.get('id') for channel_info in channels - if channel_info.get('name') == self.channel] + channel_id = [channel_info.get("id") for channel_info in channels if channel_info.get("name") == self.channel] if not channel_id: - raise ValueError('Error: Could not locate channel name \'{}\''.format(self.channel)) + raise ValueError("Error: Could not locate channel name '{}'".format(self.channel)) # test bot permission (join channel) self._channel_id = channel_id[0] self.slack_client.conversations_join(channel=self._channel_id) - def post_message(self, message, retries=1, wait_period=10.): + def post_message(self, message, retries=1, wait_period=10.0): # type: (str, int, float) -> () """ Post message on our slack channel @@ -106,8 +144,7 @@ class SlackMonitor(Monitor): ) return except SlackApiError as e: - print("While trying to send message: \"\n{}\n\"\nGot an error: {}".format( - message, e.response['error'])) + print('While trying to send message: "\n{}\n"\nGot an error: {}'.format(message, e.response["error"])) def get_query_parameters(self): # type: () -> dict @@ -116,8 +153,10 @@ class SlackMonitor(Monitor): :return dict: Example dictionary: {'status': ['failed'], 'order_by': ['-last_update']} """ - filter_tags = ['-archived'] + (['-development'] if not self.include_manual_experiments else []) - return dict(status=self.status_alerts, order_by=['-last_update'], system_tags=filter_tags) + filter_tags = list() if self.include_archived else ["-archived"] + if not self.include_manual_experiments: + filter_tags.append("-development") + return dict(status=self.status_alerts, order_by=["-last_update"], system_tags=filter_tags) def process_task(self, task): """ @@ -129,70 +168,131 @@ class SlackMonitor(Monitor): """ # skipping failed tasks with low number of iterations if self.min_num_iterations and task.get_last_iteration() < self.min_num_iterations: - print('Skipping {} experiment id={}, number of iterations {} < {}'.format( - task.status, task.id, task.get_last_iteration(), self.min_num_iterations)) + print( + "Skipping {} experiment id={}, number of iterations {} < {}".format( + task.status, task.id, task.get_last_iteration(), self.min_num_iterations + ) + ) + return + if any(f(task) for f in self.filters): + if self.verbose: + print("Experiment id={} {} did not pass all filters".format(task.id, task.status)) return - print('Experiment id={} {}, raising alert on channel \"{}\"'.format(task.id, task.status, self.channel)) + print('Experiment id={} {}, raising alert on channel "{}"'.format(task.id, task.status, self.channel)) console_output = task.get_reported_console_output(number_of_reports=3) - message = \ - '{}Experiment ID <{}|{}> *{}*\nProject: *{}* - Name: *{}*\n' \ - '```\n{}\n```'.format( - self._message_prefix, - task.get_output_log_web_page(), task.id, - task.status, - task.get_project_name(), task.name, - ('\n'.join(console_output))[-2048:]) + message = "{}Experiment ID <{}|{}> *{}*\nProject: *{}* - Name: *{}*\n" "```\n{}\n```".format( + self._message_prefix, + task.get_output_log_web_page(), + task.id, + task.status, + task.get_project_name(), + task.name, + ("\n".join(console_output))[-2048:], + ) self.post_message(message, retries=5) def main(): - print('ClearML experiment monitor Slack service\n') + print("ClearML experiment monitor Slack service\n") # Slack Monitor arguments - parser = argparse.ArgumentParser(description='ClearML monitor experiments and post Slack Alerts') - parser.add_argument('--channel', type=str, - help='Set the channel to post the Slack alerts') - parser.add_argument('--slack_api', type=str, default=os.environ.get('SLACK_API_TOKEN', None), - help='Slack API key for sending messages') - parser.add_argument('--message_prefix', type=str, - help='Add message prefix (For example, to alert all channel members use: "Hey ,")') - parser.add_argument('--project', type=str, default='', - help='The name (or partial name) of the project to monitor, use empty for all projects') - parser.add_argument('--min_num_iterations', type=int, default=0, - help='Minimum number of iterations of failed/completed experiment to alert. ' - 'This will help eliminate unnecessary debug sessions that crashed right after starting ' - '(default:0 alert on all)') - parser.add_argument('--include_manual_experiments', action="store_true", default=False, - help='Include experiments running manually (i.e. not by clearml-agent)') - parser.add_argument('--include_completed_experiments', action="store_true", default=False, - help='Include completed experiments (i.e. not just failed experiments)') - parser.add_argument('--refresh_rate', type=float, default=10., - help='Set refresh rate of the monitoring service, default every 10.0 sec') - parser.add_argument('--service_queue', type=str, default='services', - help='Queue name to use when running as a service (default: \'services\'') - parser.add_argument('--local', action="store_true", default=False, - help='Run service locally instead of as a service ' - '(Default: Automatically launch itself on the services queue)') + parser = argparse.ArgumentParser(description="ClearML monitor experiments and post Slack Alerts") + parser.add_argument("--channel", type=str, help="Set the channel to post the Slack alerts") + parser.add_argument( + "--slack_api", + type=str, + default=os.environ.get("SLACK_API_TOKEN", None), + help="Slack API key for sending messages", + ) + parser.add_argument( + "--message_prefix", + type=str, + help='Add message prefix (For example, to alert all channel members use: "Hey ,")', + ) + parser.add_argument( + "--project", + type=str, + default="", + help="The name (or partial name) of the project to monitor, use empty for all projects", + ) + parser.add_argument( + "--min_num_iterations", + type=int, + default=0, + help="Minimum number of iterations of failed/completed experiment to alert. " + "This will help eliminate unnecessary debug sessions that crashed right after starting " + "(default:0 alert on all)", + ) + parser.add_argument( + "--include_manual_experiments", + action="store_true", + default=False, + help="Include experiments running manually (i.e. not by clearml-agent)", + ) + parser.add_argument( + "--include_completed_experiments", + action="store_true", + default=False, + help="Include completed experiments (i.e. not just failed experiments)", + ) + parser.add_argument("--include_archived", action="store_true", default=False, help="Include archived experiments") + parser.add_argument( + "--refresh_rate", + type=float, + default=10.0, + help="Set refresh rate of the monitoring service, default every 10.0 sec", + ) + parser.add_argument( + "--service_queue", + type=str, + default="services", + help="Queue name to use when running as a service (default: 'services'", + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + default=False, + help="Increase standard output verbosity for SlackMonitor", + ) + users_group = parser.add_mutually_exclusive_group() + users_group.add_argument("--include_users", type=str, nargs="+", help="Only report tasks from these users") + users_group.add_argument("--exclude_users", type=str, nargs="+", help="Only report tasks not from these users") + parser.add_argument( + "--local", + action="store_true", + default=False, + help="Run service locally instead of as a service " + "(Default: Automatically launch itself on the services queue)", + ) args = parser.parse_args() if not args.slack_api: - print('Slack API key was not provided, please run with --slack_api ') + print("Slack API key was not provided, please run with --slack_api ") exit(1) if not args.channel: - print('Slack channel was not provided, please run with --channel ') + print("Slack channel was not provided, please run with --channel ") exit(1) + filters = list() + # create the user filter if needed + if args.include_users or args.exclude_users: + filters.append(UserFilter(include=args.include_users, exclude=args.exclude_users)) + # create the slack monitoring object slack_monitor = SlackMonitor( - slack_api_token=args.slack_api, channel=args.channel, message_prefix=args.message_prefix) + slack_api_token=args.slack_api, channel=args.channel, message_prefix=args.message_prefix, filters=filters + ) # configure the monitoring filters slack_monitor.min_num_iterations = args.min_num_iterations slack_monitor.include_manual_experiments = args.include_manual_experiments + slack_monitor.include_archived = args.include_archived + slack_monitor.verbose = args.verbose if args.project: slack_monitor.set_projects(project_names_re=[args.project]) if args.include_completed_experiments: