# Number of seconds in a minute; the *_min settings are multiplied by this
# to convert them to seconds.
MINUTE = 60.0


# NOTE: method of AutoScaler (takes ``self``).
def stale_workers(self, spun_workers):
    """Yield the ids of spun-up workers that exceeded the spin-up time limit.

    :param spun_workers: mapping of worker id -> (resource, spin_time), where
        spin_time is a ``time()`` timestamp
    :return: generator of worker ids for which more than
        ``self.max_spin_up_time_min`` minutes passed since spin_time
    """
    checked_at = time()
    # Iterate over a snapshot of the items; presumably this lets callers
    # modify the mapping while consuming the generator - verify against caller.
    for worker_id, spin_info in list(spun_workers.items()):
        resource, spin_time = spin_info
        elapsed = checked_at - spin_time
        if elapsed > self.max_spin_up_time_min * MINUTE:
            self.logger.info('Stuck spun instance %s of type %s', worker_id, resource)
            yield worker_id
# NOTE: monitor_startup and instance_log_thread are methods of AutoScaler
# (they take ``self``); latest_lines is a module-level helper.

def monitor_startup(self, instance_id):
    """Start a background thread that relays the startup log of an instance.

    :param instance_id: cloud instance id, passed on to ``instance_log_thread``
    """
    thr = Thread(target=self.instance_log_thread, args=(instance_id,))
    # Daemon thread, so a still-running monitor does not block interpreter exit
    thr.daemon = True
    thr.start()


def instance_log_thread(self, instance_id):
    """Poll the driver for the instance console log and log its new lines.

    Polls ``self.driver.console_log(instance_id)`` once every MINUTE seconds
    for as long as no more than ``self.max_spin_up_time_min`` minutes have
    elapsed since this method started.

    :param instance_id: cloud instance id whose console log is fetched
    """
    start = time()
    # The driver will return the log content from the start on every call,
    # we keep record to avoid logging the same line twice
    # TODO: Find a cross cloud way to get incremental logs
    last_lnum = 0
    while time() - start <= self.max_spin_up_time_min * MINUTE:
        self.logger.info('getting startup logs for %r', instance_id)
        data = self.driver.console_log(instance_id)
        lines = data.splitlines()
        if not lines:
            self.logger.info('no startup logs for %r', instance_id)
        else:
            last_lnum, lines = latest_lines(lines, last_lnum)
            for line in lines:
                self.logger.info('%r STARTUP LOG: %s', instance_id, line)
        sleep(MINUTE)


def latest_lines(lines, last):
    """Return the new line count and the non-blank lines after line ``last``.

    Lines are numbered from 1; a line is kept when its number is greater than
    ``last`` and it contains something other than whitespace.

    :param lines: list of log lines (the whole log, from its start)
    :param last: number of lines that were already handled
    :return: tuple of (total number of lines, list of new non-blank lines)

    >>> latest_lines(['a', 'b', '', 'c', '', 'd'], 2)
    (6, ['c', 'd'])
    """
    last_lnum = len(lines)
    latest = [
        line for lnum, line in enumerate(lines, 1)
        if lnum > last and line.strip()
    ]
    return last_lnum, latest
# NOTE: method of AWSDriver (takes ``self``).
def console_log(self, instance_id):
    """Return the console output of an EC2 instance.

    :param instance_id: id of the EC2 instance to query
    :return: the 'Output' field of the ``get_console_output`` response
        ('' when the field is missing); if the call raises ``ClientError``,
        an 'error: ...' message is returned instead of raising
    """
    client = boto3.client("ec2", **self.creds())
    try:
        response = client.get_console_output(InstanceId=instance_id)
    except ClientError as err:
        return 'error: cannot get logs for {}:\n{}'.format(instance_id, err)
    return response.get('Output', '')