Add support for obtaining cloud console logs

This commit is contained in:
allegroai 2022-03-24 19:37:01 +02:00
parent 8142796e15
commit b1120195df
3 changed files with 55 additions and 5 deletions

View File

@ -2,7 +2,7 @@ import re
from collections import defaultdict, deque from collections import defaultdict, deque
from enum import Enum from enum import Enum
from itertools import chain from itertools import chain
from threading import Event from threading import Event, Thread
from time import sleep, time from time import sleep, time
import attr import attr
@ -29,6 +29,8 @@ _workers_pattern = re.compile(
""", re.VERBOSE """, re.VERBOSE
) )
MINUTE = 60.0
class WorkerId: class WorkerId:
def __init__(self, worker_id): def __init__(self, worker_id):
@ -181,7 +183,7 @@ class AutoScaler(object):
def stale_workers(self, spun_workers): def stale_workers(self, spun_workers):
now = time() now = time()
for worker_id, (resource, spin_time) in list(spun_workers.items()): for worker_id, (resource, spin_time) in list(spun_workers.items()):
if now - spin_time > self.max_spin_up_time_min*60: if now - spin_time > self.max_spin_up_time_min * MINUTE:
self.logger.info('Stuck spun instance %s of type %s', worker_id, resource) self.logger.info('Stuck spun instance %s of type %s', worker_id, resource)
yield worker_id yield worker_id
@ -305,6 +307,7 @@ class AutoScaler(object):
resource_conf = self.resource_configurations[resource] resource_conf = self.resource_configurations[resource]
worker_prefix = self.gen_worker_prefix(resource, resource_conf) worker_prefix = self.gen_worker_prefix(resource, resource_conf)
instance_id = self.driver.spin_up_worker(resource_conf, worker_prefix, queue, task_id=task_id) instance_id = self.driver.spin_up_worker(resource_conf, worker_prefix, queue, task_id=task_id)
self.monitor_startup(instance_id)
worker_id = '{}:{}'.format(worker_prefix, instance_id) worker_id = '{}:{}'.format(worker_prefix, instance_id)
self.logger.info('New instance ID: %s', instance_id) self.logger.info('New instance ID: %s', instance_id)
spun_workers[worker_id] = (resource, time()) spun_workers[worker_id] = (resource, time())
@ -319,7 +322,7 @@ class AutoScaler(object):
if resources in required_idle_resources: if resources in required_idle_resources:
continue continue
# Remove from both cloud and clearml all instances that are idle for longer than MAX_IDLE_TIME_MIN # Remove from both cloud and clearml all instances that are idle for longer than MAX_IDLE_TIME_MIN
if time() - timestamp > self.max_idle_time_min * 60.0: if time() - timestamp > self.max_idle_time_min * MINUTE:
wid = WorkerId(worker_id) wid = WorkerId(worker_id)
cloud_id = wid.cloud_id cloud_id = wid.cloud_id
self.driver.spin_down_worker(cloud_id) self.driver.spin_down_worker(cloud_id)
@ -331,8 +334,8 @@ class AutoScaler(object):
self.report_app_stats(task_logger, queue_id_to_name, up_machines, idle_workers) self.report_app_stats(task_logger, queue_id_to_name, up_machines, idle_workers)
# Nothing else to do # Nothing else to do
self.logger.info("Idle for %.2f seconds", self.polling_interval_time_min * 60.0) self.logger.info("Idle for %.2f seconds", self.polling_interval_time_min * MINUTE)
sleep(self.polling_interval_time_min * 60.0) sleep(self.polling_interval_time_min * MINUTE)
def update_idle_workers(self, all_workers, idle_workers): def update_idle_workers(self, all_workers, idle_workers):
if not all_workers: if not all_workers:
@ -371,6 +374,40 @@ class AutoScaler(object):
self.logger.info('initial state: %s', value) self.logger.info('initial state: %s', value)
self._state = value self._state = value
def monitor_startup(self, instance_id):
thr = Thread(target=self.instance_log_thread, args=(instance_id,))
thr.daemon = True
thr.start()
def instance_log_thread(self, instance_id):
start = time()
# The driver will return the log content from the start on every call,
# we keep record to avoid logging the same line twice
# TODO: Find a cross cloud way to get incremental logs
last_lnum = 0
while time() - start <= self.max_spin_up_time_min * MINUTE:
self.logger.info('getting startup logs for %r', instance_id)
data = self.driver.console_log(instance_id)
lines = data.splitlines()
if not lines:
self.logger.info('not startup logs for %r', instance_id)
else:
last_lnum, lines = latest_lines(lines, last_lnum)
for line in lines:
self.logger.info('%r STARTUP LOG: %s', instance_id, line)
sleep(MINUTE)
def latest_lines(lines, last):
"""Return lines after last and not empty
>>> latest_lines(['a', 'b', '', 'c', '', 'd'], 1)
6, ['c', 'd']
"""
last_lnum = len(lines)
latest = [l for n, l in enumerate(lines, 1) if n > last and l.strip()]
return last_lnum, latest
def get_task_logger(): def get_task_logger():
task = Task.current_task() task = Task.current_task()

View File

@ -11,6 +11,7 @@ from .cloud_driver import parse_tags
try: try:
# noinspection PyPackageRequirements # noinspection PyPackageRequirements
import boto3 import boto3
from botocore.exceptions import ClientError
Task.add_requirements("boto3") Task.add_requirements("boto3")
except ImportError as err: except ImportError as err:
@ -160,3 +161,11 @@ class AWSDriver(CloudDriver):
def kind(self): def kind(self):
return 'AWS' return 'AWS'
def console_log(self, instance_id):
ec2 = boto3.client("ec2", **self.creds())
try:
out = ec2.get_console_output(InstanceId=instance_id)
return out.get('Output', '')
except ClientError as err:
return 'error: cannot get logs for {}:\n{}'.format(instance_id, err)

View File

@ -118,6 +118,10 @@ class CloudDriver(ABC):
def instance_type_key(self): def instance_type_key(self):
"""Return key in configuration for instance type""" """Return key in configuration for instance type"""
@abstractmethod
def console_log(self, instance_id):
"""Return log for instance"""
def gen_user_data(self, worker_prefix, queue_name, task_id, cpu_only=False): def gen_user_data(self, worker_prefix, queue_name, task_id, cpu_only=False):
return bash_script_template.format( return bash_script_template.format(
queue=queue_name, queue=queue_name,