Support all reporting using subprocesses instead of threads. Configure using sdk.development.report_use_subprocess

This commit is contained in:
allegroai 2021-01-24 09:17:02 +02:00
parent 4b4fa3eff0
commit 828af76ffe
12 changed files with 810 additions and 402 deletions

View File

@ -16,6 +16,7 @@ from ...backend_api.services import events
from ...config import config
from ...storage.util import quote_url
from ...utilities.attrs import attrs
from ...utilities.process.mp import SingletonLock
@six.add_metaclass(abc.ABCMeta)
@ -202,7 +203,7 @@ class UploadEvent(MetricsEventAdapter):
_upload_retries = 3
_metric_counters = {}
_metric_counters_lock = Lock()
_metric_counters_lock = SingletonLock()
_file_history_size = int(config.get('metrics.file_history_size', 5))
@staticmethod

View File

@ -16,6 +16,7 @@ from ...debugging import get_logger
from ...storage.helper import StorageHelper
from .events import MetricsEventAdapter
from ...utilities.process.mp import SingletonLock
log = get_logger('metrics')
@ -23,7 +24,7 @@ log = get_logger('metrics')
class Metrics(InterfaceBase):
""" Metrics manager and batch writer """
_storage_lock = Lock()
_storage_lock = SingletonLock()
_file_upload_starvation_warning_sec = config.get('network.metrics.file_upload_starvation_warning_sec', None)
_file_upload_retries = 3
_upload_pool = None

View File

@ -2,27 +2,120 @@ import datetime
import json
import logging
import math
from time import time
from multiprocessing import Semaphore
from threading import Event as TrEvent
import numpy as np
import six
from six.moves.queue import Queue as TrQueue, Empty
from .events import (
ScalarEvent, VectorEvent, ImageEvent, PlotEvent, ImageEventNoUpload,
UploadEvent, MediaEvent, ConsoleEvent, )
from ..base import InterfaceBase
from ..setupuploadmixin import SetupUploadMixin
from ...config import config
from ...utilities.async_manager import AsyncManagerMixin
from ...utilities.plotly_reporter import (
create_2d_histogram_plot, create_value_matrix, create_3d_surface,
create_2d_scatter_series, create_3d_scatter_series, create_line_plot, plotly_scatter3d_layout_dict,
create_image_plot, create_plotly_table, )
from ...utilities.process.mp import BackgroundMonitor
from ...utilities.py3_interop import AbstractContextManager
from ...utilities.process.mp import SafeQueue as PrQueue, SafeEvent
try:
from collections.abc import Iterable # noqa
except ImportError:
from collections import Iterable
import six
import numpy as np
from threading import Thread, Event
from ..base import InterfaceBase
from ..setupuploadmixin import SetupUploadMixin
from ...utilities.async_manager import AsyncManagerMixin
from ...utilities.plotly_reporter import create_2d_histogram_plot, create_value_matrix, create_3d_surface, \
create_2d_scatter_series, create_3d_scatter_series, create_line_plot, plotly_scatter3d_layout_dict, \
create_image_plot, create_plotly_table
from ...utilities.py3_interop import AbstractContextManager
from .events import ScalarEvent, VectorEvent, ImageEvent, PlotEvent, ImageEventNoUpload, \
UploadEvent, MediaEvent, ConsoleEvent
from ...config import config
class BackgroundReportService(BackgroundMonitor, AsyncManagerMixin):
    """
    Background worker that batches report events and writes them through a Metrics manager.

    Events are queued by ``add_event`` and drained by ``daemon``. The queue and the exit event are
    either thread based or process-safe, depending on ``use_subprocess`` / ``set_subprocess_mode``.
    """

    def __init__(self, use_subprocess, async_enable, metrics, flush_frequency, flush_threshold):
        """
        :param use_subprocess: If True create the process-safe queue and exit event
            (``PrQueue`` / ``SafeEvent``), otherwise the threading ones (``TrQueue`` / ``TrEvent``).
        :param async_enable: Forwarded to ``metrics.write_events``; when True the returned value is
            registered through ``_add_async_result``.
        :param metrics: Object exposing ``write_events(events, async_enable=..., storage_uri=...)``.
        :param flush_frequency: Passed to the base class as ``wait_period``
            (presumably surfaced there as ``_wait_timeout``, which ``daemon`` waits on - confirm).
        :param flush_threshold: Number of queued events after which ``add_event`` calls ``flush``.
        """
        super(BackgroundReportService, self).__init__(wait_period=flush_frequency)
        self._subprocess = use_subprocess
        self._flush_threshold = flush_threshold
        self._exit_event = SafeEvent() if self._subprocess else TrEvent()
        self._queue = PrQueue() if self._subprocess else TrQueue()
        self._queue_size = 0
        # held by the daemon while it writes events / waits for results, see events_waiting()
        self._res_waiting = Semaphore()
        self._metrics = metrics
        self._storage_uri = None
        self._async_enable = async_enable

    def set_storage_uri(self, uri):
        """Store the storage URI that is passed to ``write_events`` on the next write."""
        self._storage_uri = uri

    def set_subprocess_mode(self):
        """
        Switch to process-safe primitives before the base class enters subprocess mode.

        If the service was created in thread mode, pending events are written first and both the
        queue and the exit event are replaced by their process-safe counterparts.
        """
        if isinstance(self._queue, TrQueue):
            self._write()
            self._queue = PrQueue()
            # __init__ creates the queue and the exit event from the same flag, so a thread queue
            # implies a thread exit event; replace it as well, otherwise stop() would set an event
            # the daemon cannot observe from another process
            self._exit_event = SafeEvent()
        super(BackgroundReportService, self).set_subprocess_mode()

    def stop(self):
        """Signal the daemon loop to exit (skipped for a subprocess that is not alive), then stop the base."""
        if not self.is_subprocess() or self.is_subprocess_alive():
            self._exit_event.set()
        super(BackgroundReportService, self).stop()

    def flush(self):
        """Reset the pending-event counter and wake the daemon so it writes the queued events."""
        self._queue_size = 0
        if not self.is_subprocess() or self.is_subprocess_alive():
            self._event.set()

    def add_event(self, ev):
        """
        Queue a single event and flush once ``flush_threshold`` events were queued.

        :param ev: The event object to report.
        """
        self._queue.put(ev)
        self._queue_size += 1
        if self._queue_size >= self._flush_threshold:
            self.flush()

    def daemon(self):
        """
        Main loop: wait for a flush request (or the wait timeout), write the queued events, repeat
        until the exit event is set, then perform one last synchronous write.
        """
        while not self._exit_event.wait(0):
            self._event.wait(self._wait_timeout)
            self._event.clear()
            self._write_and_wait_for_results()
        # make sure we flushed everything
        self._async_enable = False
        self._write_and_wait_for_results()

    def _write_and_wait_for_results(self):
        """Write queued events and wait for pending results while holding ``_res_waiting``."""
        self._res_waiting.acquire()
        try:
            self._write()
            # wait for all reports
            if self.get_num_results() > 0:
                self.wait_for_results()
        finally:
            # always release, otherwise events_waiting() would report pending work forever
            self._res_waiting.release()

    def _write(self):
        """Drain the queue and send everything in one ``write_events`` call (no-op on an empty queue)."""
        if self._queue.empty():
            return
        events = []
        while not self._queue.empty():
            try:
                # NOTE(review): get() is called in blocking mode, so Empty is only expected from
                # queue implementations that raise it on their own - confirm against SafeQueue
                events.append(self._queue.get())
            except Empty:
                break
        res = self._metrics.write_events(
            events, async_enable=self._async_enable, storage_uri=self._storage_uri)
        if self._async_enable:
            self._add_async_result(res)

    def send_all_events(self, wait=True):
        """
        Write all queued events from the calling context.

        :param wait: If True also wait for pending results.
        """
        self._write()
        if wait and self.get_num_results() > 0:
            self.wait_for_results()

    def events_waiting(self):
        """
        :return: True if events are still queued, or the (alive) daemon currently holds
            ``_res_waiting``; False otherwise.
        """
        if not self._queue.empty():
            return True
        if not self.is_alive():
            return False
        try:
            return not self._res_waiting.get_value()
        except NotImplementedError:
            # get_value() is unavailable on platforms lacking sem_getvalue() (e.g. macOS),
            # fall back to the pending results counter
            return self.get_num_results() > 0

    def post_execution(self):
        """Hook forwarded unchanged to the base class."""
        super(BackgroundReportService, self).post_execution()
class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncManagerMixin):
@ -40,7 +133,7 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
reporter.flush()
"""
def __init__(self, metrics, flush_threshold=10, async_enable=False):
def __init__(self, metrics, flush_threshold=10, async_enable=False, use_subprocess=False):
"""
Create a reporter
:param metrics: A Metrics manager instance that handles actual reporting, uploads etc.
@ -53,35 +146,24 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
log.setLevel(log.level)
super(Reporter, self).__init__(session=metrics.session, log=log)
self._metrics = metrics
self._flush_threshold = flush_threshold
self._events = []
self._bucket_config = None
self._storage_uri = None
self._async_enable = async_enable
self._flush_frequency = 30.0
self._exit_flag = False
self._flush_event = Event()
self._flush_event.clear()
self._thread = Thread(target=self._daemon)
self._thread.daemon = True
self._thread.start()
self._max_iteration = 0
self._report_service = BackgroundReportService(
use_subprocess=use_subprocess, async_enable=async_enable, metrics=metrics,
flush_frequency=self._flush_frequency, flush_threshold=flush_threshold)
self._report_service.start()
def _set_storage_uri(self, value):
value = '/'.join(x for x in (value.rstrip('/'), self._metrics.storage_key_prefix) if x)
self._storage_uri = value
self._report_service.set_storage_uri(self._storage_uri)
storage_uri = property(None, _set_storage_uri)
max_float_num_digits = config.get('metrics.plot_max_num_digits', None)
@property
def flush_threshold(self):
return self._flush_threshold
@flush_threshold.setter
def flush_threshold(self, value):
self._flush_threshold = max(0, value)
@property
def async_enable(self):
return self._async_enable
@ -94,55 +176,38 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
def max_iteration(self):
return self._max_iteration
def _daemon(self):
while not self._exit_flag:
self._flush_event.wait(self._flush_frequency)
self._flush_event.clear()
self._write()
# wait for all reports
if self.get_num_results() > 0:
self.wait_for_results()
# make sure we flushed everything
self._async_enable = False
self._write()
if self.get_num_results() > 0:
self.wait_for_results()
def _report(self, ev):
ev_iteration = ev.get_iteration()
if ev_iteration is not None:
# we have to manually add get_iteration_offset() because event hasn't reached the Metric manager
self._max_iteration = max(self._max_iteration, ev_iteration + self._metrics.get_iteration_offset())
self._events.append(ev)
if len(self._events) >= self._flush_threshold:
self.flush()
def _write(self):
if not self._events:
return
# print('reporting %d events' % len(self._events))
res = self._metrics.write_events(self._events, async_enable=self._async_enable, storage_uri=self._storage_uri)
if self._async_enable:
self._add_async_result(res)
self._events = []
self._report_service.add_event(ev)
def flush(self):
"""
Flush cached reports to backend.
"""
self._flush_event.set()
self._report_service.flush()
def stop(self):
self._exit_flag = True
self._flush_event.set()
self._thread.join()
if not self._report_service:
return
report_service = self._report_service
self._report_service = None
if not report_service.is_subprocess() or report_service.is_alive():
report_service.stop()
report_service.wait()
else:
report_service.send_all_events()
def wait_for_events(self, timeout=None, step=2.0):
tic = time()
while self._events or self.get_num_results():
self.wait_for_results(timeout=step)
if timeout and time() - tic >= timeout:
break
def get_num_results(self):
    """
    Return the number of pending results reported by the background report service.

    :return: The service's ``get_num_results()`` value, or 0 when the report service reference
        is None (``stop`` resets it).
    """
    # read the attribute once, stop() may reset it while we are running
    report_service = self._report_service
    if report_service is None:
        return 0
    return report_service.get_num_results()
def events_waiting(self):
    """
    Tell whether the background report service still has events to send.

    :return: The service's ``events_waiting()`` value, or False when the report service
        reference is None (``stop`` resets it).
    """
    # read the attribute once, stop() may reset it while we are running
    report_service = self._report_service
    if report_service is None:
        return False
    return report_service.events_waiting()
def wait_for_results(self, *args, **kwargs):
    """
    Forward to the background report service's ``wait_for_results``.

    All positional and keyword arguments are passed through unchanged.

    :return: Whatever the service returns, or None when the report service reference is None
        (``stop`` resets it), in which case there is nothing to wait for.
    """
    # read the attribute once, stop() may reset it while we are running
    report_service = self._report_service
    if report_service is None:
        return None
    return report_service.wait_for_results(*args, **kwargs)
def report_scalar(self, title, series, value, iter):
"""
@ -254,7 +319,7 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
plot = json.dumps(plot, default=default)
elif not isinstance(plot, six.string_types):
raise ValueError('Plot should be a string or a dict')
ev = PlotEvent(metric=self._normalize_name(title), variant=self._normalize_name(series),
plot_str=plot, iter=iter)
self._report(ev)
@ -734,7 +799,7 @@ class Reporter(InterfaceBase, AbstractContextManager, SetupUploadMixin, AsyncMan
image_src=url,
title=title + '/' + series,
width=640,
height=int(640*float(height or 480)/float(width or 640)),
height=int(640 * float(height or 480) / float(width or 640)),
)
return self.report_plot(

View File

@ -1,99 +1,112 @@
import json
import sys
import time
from pathlib2 import Path
from logging import LogRecord, getLogger, basicConfig, getLevelName, INFO, WARNING, Formatter, makeLogRecord, warning
from logging.handlers import BufferingHandler
from threading import Thread, Event
from six.moves.queue import Queue
from six.moves.queue import Queue as TrQueue
from threading import Event as TrEvent
from .development.worker import DevWorker
from ...backend_api.services import events
from ...backend_api.session.session import MaxRequestSizeError
from ...config import config
buffer_capacity = config.get('log.task_log_buffer_capacity', 100)
from ...utilities.process.mp import BackgroundMonitor
from ...utilities.process.mp import SafeQueue as PrQueue, SafeEvent
class TaskHandler(BufferingHandler):
__flush_max_history_seconds = 30.
__wait_for_flush_timeout = 10.
class BackgroundLogService(BackgroundMonitor):
__max_event_size = 1024 * 1024
__once = False
__offline_filename = 'log.jsonl'
@property
def task_id(self):
return self._task_id
@task_id.setter
def task_id(self, value):
self._task_id = value
def __init__(self, task, capacity=buffer_capacity, connect_logger=True):
super(TaskHandler, self).__init__(capacity)
self.task_id = task.id
self.session = task.session
self.last_timestamp = 0
self.counter = 1
def __init__(self, session, wait_period, worker=None, task_id=None, offline_log_filename=None):
super(BackgroundLogService, self).__init__(wait_period=wait_period)
self._worker = worker
self._task_id = task_id
self._queue = TrQueue()
self._flush = TrEvent()
self._last_event = None
self._exit_event = None
self._queue = None
self._thread = None
self._pending = 0
self._offline_log_filename = None
self._connect_logger = connect_logger
if task.is_offline():
offline_folder = Path(task.get_offline_mode_folder())
offline_folder.mkdir(parents=True, exist_ok=True)
self._offline_log_filename = offline_folder / self.__offline_filename
self._offline_log_filename = offline_log_filename
self.session = session
self.counter = 1
self._last_timestamp = 0
def shouldFlush(self, record):
"""
Should the handler flush its buffer
def stop(self):
super(BackgroundLogService, self).stop()
self.flush()
Returns true if the buffer is up to capacity. This method can be
overridden to implement custom flushing strategies.
"""
if self._task_id is None:
return False
def daemon(self):
# multiple daemons are supported
while not self._event.wait(0):
self._flush.wait(self._wait_timeout)
self._flush.clear()
self.send_all_records()
# if we need to add handlers to the base_logger,
# it will not automatically create stream one when first used, so we must manually configure it.
if self._connect_logger and not TaskHandler.__once:
base_logger = getLogger()
if len(base_logger.handlers) == 1 and isinstance(base_logger.handlers[0], TaskHandler):
if record.name != 'console' and not record.name.startswith('clearml.'):
base_logger.removeHandler(self)
basicConfig()
base_logger.addHandler(self)
TaskHandler.__once = True
else:
TaskHandler.__once = True
# flush all leftover events
self.send_all_records()
# if we passed the max buffer
if len(self.buffer) >= self.capacity:
return True
def _send_events(self, a_request):
if not a_request or not a_request.requests:
return
# if the first entry in the log was too long ago.
# noinspection PyBroadException
try:
if len(self.buffer) and (time.time() - self.buffer[0].created) > self.__flush_max_history_seconds:
return True
except Exception:
pass
if self._offline_log_filename:
with open(self._offline_log_filename.as_posix(), 'at') as f:
f.write(json.dumps([b.to_dict() for b in a_request.requests]) + '\n')
return
return False
# if self._thread is None:
# self._log_stderr('Task.close() flushing remaining logs ({})'.format(self.pending))
res = self.session.send(a_request)
if res and not res.ok():
# noinspection PyProtectedMember
TaskHandler._log_stderr("failed logging task to backend ({:d} lines, {})".format(
len(a_request.requests), str(res.meta)), level=WARNING)
except MaxRequestSizeError:
# noinspection PyProtectedMember
TaskHandler._log_stderr("failed logging task to backend ({:d} lines) log size exceeded limit".format(
len(a_request.requests)), level=WARNING)
except Exception as ex:
# noinspection PyProtectedMember
TaskHandler._log_stderr("Retrying, failed logging task to backend ({:d} lines): {}".format(
len(a_request.requests), ex))
# we should push ourselves back into the thread pool
if self._queue:
self._queue.put(a_request)
def set_subprocess_mode(self):
    """
    Switch the log service to process-safe primitives.

    Records still held by a thread queue are sent first, then the queue is replaced by a
    ``PrQueue``. After the base class switch, a fresh ``SafeEvent`` becomes the flush event.
    """
    thread_queue_in_use = isinstance(self._queue, TrQueue)
    if thread_queue_in_use:
        # drain before swapping, the replacement queue starts empty
        self.send_all_records()
        self._queue = PrQueue()

    super(BackgroundLogService, self).set_subprocess_mode()
    self._flush = SafeEvent()
def add_to_queue(self, record):
    """
    Append a record to the service queue.

    :param record: The object to enqueue (consumed later by ``send_all_records``).
    """
    target_queue = self._queue
    target_queue.put(record)
def empty(self):
    """
    :return: True when there is no queue (falsy ``_queue``) or the queue reports it is empty.
    """
    if not self._queue:
        return True
    return self._queue.empty()
def send_all_records(self):
    """
    Drain the queue and pass every truthy item, as one list, to ``_send_records``.

    Draining stops when the queue is missing/empty or when fetching an item raises.
    ``_send_records`` is not called if nothing was collected.
    """
    collected = []
    while True:
        if not self._queue or self._queue.empty():
            break
        # noinspection PyBroadException
        try:
            # item = self._queue.get(block=False)
            item = self._queue.get()
            if item:
                collected.append(item)
        except Exception:
            break

    if not collected:
        return
    self._send_records(collected)
def _record_to_event(self, record):
# type: (LogRecord) -> events.TaskLogEvent
if self._task_id is None:
return None
timestamp = int(record.created * 1000)
if timestamp == self.last_timestamp:
if timestamp == self._last_timestamp:
timestamp += self.counter
self.counter += 1
else:
self.last_timestamp = timestamp
self._last_timestamp = timestamp
self.counter = 1
# ignore backspaces (they are often used)
@ -113,10 +126,10 @@ class TaskHandler(BufferingHandler):
# if we have a previous event and it timed out, return it.
new_event = events.TaskLogEvent(
task=self.task_id,
task=self._task_id,
timestamp=timestamp,
level=record.levelname.lower(),
worker=self.session.worker,
worker=self._worker,
msg=msg
)
if self._last_event:
@ -126,135 +139,141 @@ class TaskHandler(BufferingHandler):
return return_events
def flush(self):
if self._task_id is None:
return
if not self.buffer:
return
buffer = None
self.acquire()
if self.buffer:
buffer = self.buffer
self.buffer = []
self.release()
if not buffer:
return
def _send_records(self, records):
# if we have previous batch requests first send them
buffer = []
for r in records:
if isinstance(r, events.AddBatchRequest):
self._send_events(r)
else:
buffer.append(r)
# noinspection PyBroadException
try:
record_events = [r for record in buffer for r in self._record_to_event(record)] + [self._last_event]
self._last_event = None
batch_requests = events.AddBatchRequest(requests=[events.AddRequest(e) for e in record_events if e])
except Exception:
self.__log_stderr("WARNING: clearml.log - Failed logging task to backend ({:d} lines)".format(len(buffer)))
batch_requests = None
self._send_events(batch_requests)
except Exception as ex:
# noinspection PyProtectedMember
TaskHandler._log_stderr(
"{}\nWARNING: trains.log - Failed logging task to backend ({:d} lines)".format(ex, len(buffer)))
import traceback
traceback.print_stack()
if batch_requests and batch_requests.requests:
self._pending += 1
self._add_to_queue(batch_requests)
def flush(self):
    """Set the flush event, but only while the service reports it is alive."""
    if not self.is_alive():
        return
    self._flush.set()
def _create_thread_queue(self):
if self._queue:
class TaskHandler(BufferingHandler):
__flush_max_history_seconds = 30.
__wait_for_flush_timeout = 10.
__once = False
__offline_filename = 'log.jsonl'
@property
def task_id(self):
    """The task id stored in ``_task_id``."""
    current_id = self._task_id
    return current_id

@task_id.setter
def task_id(self, value):
    """Replace the stored task id with ``value``."""
    self._task_id = value
def __init__(self, task, capacity=None, use_subprocess=False):
    """
    Create the handler and start its background log service.

    :param task: Task object; its ``id``, ``session``, ``is_offline()`` and
        ``get_offline_mode_folder()`` are used here.
    :param capacity: Buffer capacity; a falsy value falls back to the
        ``log.task_log_buffer_capacity`` configuration entry (default 100).
    :param use_subprocess: If True the background log service is switched to subprocess mode
        before it is started.
    """
    if not capacity:
        capacity = config.get('log.task_log_buffer_capacity', 100)
    super(TaskHandler, self).__init__(capacity)

    self.task_id = task.id
    self.worker = task.session.worker
    self.counter = 0

    # offline tasks log into a file inside the task's offline folder
    self._offline_log_filename = None
    if task.is_offline():
        offline_dir = Path(task.get_offline_mode_folder())
        offline_dir.mkdir(parents=True, exist_ok=True)
        self._offline_log_filename = offline_dir / self.__offline_filename

    self._background_log = BackgroundLogService(
        worker=task.session.worker,
        task_id=task.id,
        session=task.session,
        wait_period=DevWorker.report_period,
        offline_log_filename=self._offline_log_filename,
    )
    self._background_log_size = 0

    if use_subprocess:
        self._background_log.set_subprocess_mode()
    self._background_log.start()
def emit(self, record):
    """
    Count the record and hand it to the background log service, if one is attached.

    :param record: The log record to forward.
    """
    self.counter = self.counter + 1
    if not self._background_log:
        return
    self._background_log.add_to_queue(record)
    self._background_log_size = self._background_log_size + 1
def shouldFlush(self, record):
    """
    Should the handler flush its buffer

    Also configures the root logger once: when this handler is the root logger's only handler,
    ``basicConfig()`` is invoked (with this handler temporarily removed) so a stream handler
    is created as well.

    :param record: The record being handled; only its ``name`` is inspected.
    :return: True when both the handler counter and the background log counter reached
        ``capacity`` and a background log service is attached; False otherwise
        (always False when ``_task_id`` is None).
    """
    if self._task_id is None:
        return False

    # if we need to add handlers to the base_logger,
    # it will not automatically create stream one when first used, so we must manually configure it.
    if not TaskHandler.__once:
        base_logger = getLogger()
        if len(base_logger.handlers) == 1 and isinstance(base_logger.handlers[0], TaskHandler):
            # records of the 'console' logger and of the package's own 'clearml.*' loggers
            # do not trigger the configuration
            if record.name != 'console' and not record.name.startswith('clearml.'):
                base_logger.removeHandler(self)
                basicConfig()
                base_logger.addHandler(self)
                TaskHandler.__once = True
        else:
            # NOTE(review): this `else` is paired with the handlers check above (pairing it with
            # the record-name check would make both branches identical) - confirm
            TaskHandler.__once = True

    # if we passed the max buffer
    return bool(
        self.counter >= self.capacity and self._background_log and
        self._background_log_size >= self.capacity)
def flush(self):
import traceback
if self._task_id is None:
return
self._queue = Queue()
self._exit_event = Event()
self._exit_event.clear()
# multiple workers could be supported as well
self._thread = Thread(target=self._daemon)
self._thread.daemon = True
self._thread.start()
def _add_to_queue(self, request):
self._create_thread_queue()
self._queue.put(request)
self.counter = 0
if self._background_log:
self._background_log.flush()
self._background_log_size = 0
def close(self, wait=False):
# self.__log_stderr('Closing {} wait={}'.format(os.getpid(), wait))
# self._log_stderr('Closing {} wait={}'.format(os.getpid(), wait))
# flush pending logs
if not self._task_id:
return
# avoid deadlocks just skip the lock, we are shutting down anyway
self.lock = None
self.flush()
# shut down the TaskHandler, from this point onwards. No events will be logged
_thread = self._thread
self._thread = None
if self._queue:
self._exit_event.set()
self._queue.put(None)
self._task_id = None
if wait and _thread:
# noinspection PyBroadException
try:
timeout = 1. if self._queue.empty() else self.__wait_for_flush_timeout
_thread.join(timeout=timeout)
if not self._queue.empty():
self.__log_stderr('Flush timeout {}s exceeded, dropping last {} lines'.format(
timeout, self._queue.qsize()))
# self.__log_stderr('Closing {} wait done'.format(os.getpid()))
except Exception:
pass
# shut down the TaskHandler, from this point onwards. No events will be logged
_background_log = self._background_log
self._background_log = None
if _background_log:
if not _background_log.is_subprocess() or _background_log.is_alive():
_background_log.stop()
if wait:
# noinspection PyBroadException
try:
timeout = 1. if _background_log.empty() else self.__wait_for_flush_timeout
_background_log.wait(timeout=timeout)
if not _background_log.empty():
self._log_stderr('Flush timeout {}s exceeded, dropping last {} lines'.format(
timeout, self._background_log_size))
# self._log_stderr('Closing {} wait done'.format(os.getpid()))
except Exception:
pass
else:
_background_log.send_all_records()
# call super and remove the handler
super(TaskHandler, self).close()
def _send_events(self, a_request):
try:
self._pending -= 1
if self._offline_log_filename:
with open(self._offline_log_filename.as_posix(), 'at') as f:
f.write(json.dumps([b.to_dict() for b in a_request.requests]) + '\n')
return
# if self._thread is None:
# self.__log_stderr('Task.close() flushing remaining logs ({})'.format(self._pending))
res = self.session.send(a_request)
if res and not res.ok():
self.__log_stderr("failed logging task to backend ({:d} lines, {})".format(
len(a_request.requests), str(res.meta)), level=WARNING)
except MaxRequestSizeError:
self.__log_stderr("failed logging task to backend ({:d} lines) log size exceeded limit".format(
len(a_request.requests)), level=WARNING)
except Exception as ex:
self.__log_stderr("Retrying, failed logging task to backend ({:d} lines): {}".format(
len(a_request.requests), ex))
# we should push ourselves back into the thread pool
if self._queue:
self._pending += 1
self._queue.put(a_request)
def _daemon(self):
# multiple daemons are supported
leave = self._exit_event.wait(0)
request = True
while not leave or request:
# pull from queue
request = None
if self._queue:
# noinspection PyBroadException
try:
request = self._queue.get(block=not leave)
except Exception:
pass
if request:
self._send_events(request)
leave = self._exit_event.wait(0)
# self.__log_stderr('leaving {}'.format(os.getpid()))
@staticmethod
def __log_stderr(msg, level=INFO):
# output directly to stderr, make sure we do not catch it.
write = sys.stderr._original_write if hasattr(sys.stderr, '_original_write') else sys.stderr.write
write('{asctime} - {name} - {levelname} - {message}\n'.format(
asctime=Formatter().formatTime(makeLogRecord({})),
name='clearml.log', levelname=getLevelName(level), message=msg))
@classmethod
def report_offline_session(cls, task, folder):
filename = Path(folder) / cls.__offline_filename
@ -283,3 +302,12 @@ class TaskHandler(BufferingHandler):
warning("failed logging task to backend ({:d} lines, {})".format(
len(batch_requests.requests), str(res.meta)))
return True
@staticmethod
def _log_stderr(msg, level=INFO):
    """
    Write a single formatted log line straight to stderr.

    The line is ``<asctime> - clearml.log - <levelname> - <msg>`` followed by a newline.

    :param msg: The message text.
    :param level: Logging level used for the level name (default INFO).
    """
    # output directly to stderr, make sure we do not catch it.
    # noinspection PyProtectedMember
    write = sys.stderr._original_write if hasattr(sys.stderr, '_original_write') else sys.stderr.write
    write('{asctime} - {name} - {levelname} - {message}\n'.format(
        asctime=Formatter().formatTime(makeLogRecord({})),
        name='clearml.log', levelname=getLevelName(level), message=msg))

View File

@ -53,6 +53,7 @@ from .access import AccessMixin
from .repo import ScriptInfo, pip_freeze
from .hyperparams import HyperParams
from ...config import config, PROC_MASTER_ID_ENV_VAR, SUPPRESS_UPDATE_MESSAGE_ENV_VAR
from ...utilities.process.mp import SingletonLock
class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
@ -69,6 +70,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
_store_diff = config.get('development.store_uncommitted_code_diff', False)
_store_remote_diff = config.get('development.store_code_diff_from_remote', False)
_report_use_subprocess = bool(config.get('development.report_use_subprocess', True))
_offline_filename = 'task.json'
class TaskTypes(Enum):
@ -138,6 +140,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
:param force_create: If True a new task will always be created (task_id, if provided, will be ignored)
:type force_create: bool
"""
SingletonLock.instantiate()
task_id = self._resolve_task_id(task_id, log=log) if not force_create else None
self.__edit_lock = None
super(Task, self).__init__(id=task_id, session=session, log=log)
@ -501,7 +504,8 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
storage_uri = self.get_output_destination(log_on_error=False)
except ValueError:
storage_uri = None
self.__reporter = Reporter(self._get_metrics_manager(storage_uri=storage_uri))
self.__reporter = Reporter(
self._get_metrics_manager(storage_uri=storage_uri), use_subprocess=self._report_use_subprocess)
return self.__reporter
def _get_output_destination_suffix(self, extra_path=None):

View File

@ -342,6 +342,19 @@ def end_of_program():
pass
def stdout_print(*args, **kwargs):
    """
    Write one line to stdout, using ``sys.stdout._original_write`` when that attribute exists.

    A single positional argument (and no keyword arguments) is converted with ``str`` and
    terminated with a newline if it does not already end with one. Any other combination is
    rendered as ``'{args} {kwargs}'`` (empty containers become empty strings) plus a newline.
    """
    only_one_value = len(args) == 1 and not kwargs
    if only_one_value:
        text = str(args[0])
        text = text if text.endswith('\n') else text + '\n'
    else:
        text = '{} {}\n'.format(args or '', kwargs or '')

    # presumably `_original_write` is the writer saved when stdout was wrapped - confirm
    emit_line = sys.stdout._original_write if hasattr(sys.stdout, '_original_write') else sys.stdout.write
    emit_line(text)
if __name__ == '__main__':
# from clearml import Task
# task = Task.init(project_name="examples", task_name="trace test")

View File

@ -1081,22 +1081,25 @@ class Logger(object):
"""
Set the logger flush period.
Deprecated - Use ``sdk.development.worker.report_period_sec`` to externally control the flush period.
:param float period: The period to flush the logger in seconds. To set no periodic flush,
specify ``None`` or ``0``.
"""
if self._task.is_main_task() and self._task_handler and DevWorker.report_period and \
not self._skip_console_log() and period is not None:
period = min(period or DevWorker.report_period, DevWorker.report_period)
if not period:
if self._flusher:
self._flusher.exit()
self._flusher = None
elif self._flusher:
self._flusher.set_period(period)
else:
self._flusher = LogFlusher(self, period)
self._flusher.start()
# if self._task.is_main_task() and self._task_handler and DevWorker.report_period and \
# not self._skip_console_log() and period is not None:
# period = min(period or DevWorker.report_period, DevWorker.report_period)
#
# if not period:
# if self._flusher:
# self._flusher.exit()
# self._flusher = None
# elif self._flusher:
# self._flusher.set_period(period)
# else:
# self._flusher = LogFlusher(self, period)
# self._flusher.start()
pass
def report_image_and_upload(
self,

View File

@ -62,6 +62,7 @@ from .utilities.proxy_object import ProxyDictPreWrite, ProxyDictPostWrite, flatt
from .utilities.resource_monitor import ResourceMonitor
from .utilities.seed import make_deterministic
from .utilities.lowlevel.threads import get_current_thread_id
from .utilities.process.mp import BackgroundMonitor
# noinspection PyProtectedMember
from .backend_interface.task.args import _Arguments
@ -519,8 +520,9 @@ class Task(_Task):
Task.__main_task = task
# register the main task for at exit hooks (there should only be one)
task.__register_at_exit(task._at_exit)
# patch OS forking
PatchOsFork.patch_fork()
# patch OS forking if we are not logging with a subprocess
if not cls._report_use_subprocess:
PatchOsFork.patch_fork()
if auto_connect_frameworks:
is_auto_connect_frameworks_bool = not isinstance(auto_connect_frameworks, dict)
if is_auto_connect_frameworks_bool or auto_connect_frameworks.get('hydra', True):
@ -586,6 +588,10 @@ class Task(_Task):
# something to the log.
task._dev_mode_task_start()
# start monitoring in background process or background threads
# monitoring are: Resource monitoring and Dev Worker monitoring classes
BackgroundMonitor.start_all(execute_in_subprocess=task._report_use_subprocess)
return task
@classmethod
@ -1282,8 +1288,8 @@ class Task(_Task):
self._logger._flush_stdout_handler()
if self.__reporter:
self.__reporter.flush()
if wait_for_uploads:
self.__reporter.wait_for_events()
# if wait_for_uploads:
# self.__reporter.wait_for_events()
LoggerRoot.flush()
@ -2707,7 +2713,7 @@ class Task(_Task):
""" Called when we suspect the task has started running """
self._dev_mode_setup_worker(model_updated=model_updated)
def _dev_mode_stop_task(self, stop_reason):
def _dev_mode_stop_task(self, stop_reason, pid=None):
# make sure we do not get called (by a daemon thread) after at_exit
if self._at_exit_called:
return
@ -2726,30 +2732,45 @@ class Task(_Task):
# NOTICE! This will end the entire execution tree!
if self.__exit_hook:
self.__exit_hook.remote_user_aborted = True
self._kill_all_child_processes(send_kill=False)
self._kill_all_child_processes(send_kill=False, pid=pid, allow_kill_calling_pid=False)
time.sleep(2.0)
self._kill_all_child_processes(send_kill=True)
self._kill_all_child_processes(send_kill=True, pid=pid, allow_kill_calling_pid=True)
os._exit(1) # noqa
@staticmethod
def _kill_all_child_processes(send_kill=False):
def _kill_all_child_processes(send_kill=False, pid=None, allow_kill_calling_pid=True):
# get current process if pid not provided
pid = os.getpid()
current_pid = os.getpid()
kill_ourselves = None
pid = pid or current_pid
try:
parent = psutil.Process(pid)
except psutil.Error:
# could not find parent process id
return
for child in parent.children(recursive=True):
# kill ourselves last (if we need to)
if child.pid == current_pid:
kill_ourselves = child
continue
if send_kill:
child.kill()
else:
child.terminate()
# kill ourselves
if send_kill:
parent.kill()
else:
parent.terminate()
# parent ourselves
if allow_kill_calling_pid or parent.pid != current_pid:
if send_kill:
parent.kill()
else:
parent.terminate()
# kill ourselves if we need to:
if allow_kill_calling_pid and kill_ourselves:
if send_kill:
kill_ourselves.kill()
else:
kill_ourselves.terminate()
def _dev_mode_setup_worker(self, model_updated=False):
if running_remotely() or not self.is_main_task() or self._at_exit_called or self._offline_mode:
@ -2840,115 +2861,122 @@ class Task(_Task):
is_sub_process = self.__is_subprocess()
# noinspection PyBroadException
try:
wait_for_uploads = True
# first thing mark task as stopped, so we will not end up with "running" on lost tasks
# if we are running remotely, the daemon will take care of it
task_status = None
wait_for_std_log = True
if (not running_remotely() or DEBUG_SIMULATE_REMOTE_TASK.get()) \
and self.is_main_task() and not is_sub_process:
# check if we crashed, or the signal is not interrupt (manual break)
task_status = ('stopped', )
if self.__exit_hook:
is_exception = self.__exit_hook.exception
# check if we are running inside a debugger
if not is_exception and sys.modules.get('pydevd'):
if not is_sub_process:
# noinspection PyBroadException
try:
wait_for_uploads = True
# first thing mark task as stopped, so we will not end up with "running" on lost tasks
# if we are running remotely, the daemon will take care of it
task_status = None
wait_for_std_log = True
if (not running_remotely() or DEBUG_SIMULATE_REMOTE_TASK.get()) \
and self.is_main_task() and not is_sub_process:
# check if we crashed, or the signal is not interrupt (manual break)
task_status = ('stopped', )
if self.__exit_hook:
is_exception = self.__exit_hook.exception
# check if we are running inside a debugger
if not is_exception and sys.modules.get('pydevd'):
# noinspection PyBroadException
try:
is_exception = sys.last_type
except Exception:
pass
# only if we have an exception (and not ctrl-break) or signal is not SIGTERM / SIGINT
if (is_exception and not isinstance(self.__exit_hook.exception, KeyboardInterrupt)) \
or (not self.__exit_hook.remote_user_aborted and
self.__exit_hook.signal not in (None, 2, 15)):
task_status = (
'failed',
'Exception {}'.format(is_exception) if is_exception else
'Signal {}'.format(self.__exit_hook.signal))
wait_for_uploads = False
else:
wait_for_uploads = (self.__exit_hook.remote_user_aborted or self.__exit_hook.signal is None)
if not self.__exit_hook.remote_user_aborted and self.__exit_hook.signal is None and \
not is_exception:
task_status = ('completed', )
else:
task_status = ('stopped', )
# user aborted. do not bother flushing the stdout logs
wait_for_std_log = self.__exit_hook.signal is not None
# wait for repository detection (if we didn't crash)
if wait_for_uploads and self._logger:
# we should print summary here
self._summary_artifacts()
# make sure that if we crashed the thread we are not waiting forever
if not is_sub_process:
self._wait_for_repo_detection(timeout=10.)
# kill the repo thread (negative timeout, do not wait), if it hasn't finished yet.
self._wait_for_repo_detection(timeout=-1)
# wait for uploads
print_done_waiting = False
if wait_for_uploads and (BackendModel.get_num_results() > 0 or
(self.__reporter and self.__reporter.events_waiting())):
self.log.info('Waiting to finish uploads')
print_done_waiting = True
# from here, do not send log in background thread
if wait_for_uploads:
self.flush(wait_for_uploads=True)
# wait until the reporter flush everything
if self.__reporter:
self.__reporter.stop()
if self.is_main_task():
# notice: this will close the reporting for all the Tasks in the system
Metrics.close_async_threads()
# notice: this will close the jupyter monitoring
ScriptInfo.close()
if self.is_main_task():
# noinspection PyBroadException
try:
is_exception = sys.last_type
from .storage.helper import StorageHelper
StorageHelper.close_async_threads()
except Exception:
pass
if (is_exception and not isinstance(self.__exit_hook.exception, KeyboardInterrupt)) \
or (not self.__exit_hook.remote_user_aborted and self.__exit_hook.signal not in (None, 2)):
task_status = ('failed', 'Exception')
wait_for_uploads = False
else:
wait_for_uploads = (self.__exit_hook.remote_user_aborted or self.__exit_hook.signal is None)
if not self.__exit_hook.remote_user_aborted and self.__exit_hook.signal is None and \
not is_exception:
task_status = ('completed', )
else:
task_status = ('stopped', )
# user aborted. do not bother flushing the stdout logs
wait_for_std_log = self.__exit_hook.signal is not None
if print_done_waiting:
self.log.info('Finished uploading')
# elif self._logger:
# # noinspection PyProtectedMember
# self._logger._flush_stdout_handler()
# from here, do not check worker status
if self._dev_worker:
self._dev_worker.unregister()
self._dev_worker = None
# stop resource monitoring
if self._resource_monitor:
self._resource_monitor.stop()
self._resource_monitor = None
# wait for repository detection (if we didn't crash)
if wait_for_uploads and self._logger:
# we should print summary here
self._summary_artifacts()
# make sure that if we crashed the thread we are not waiting forever
if not is_sub_process:
self._wait_for_repo_detection(timeout=10.)
# kill the repo thread (negative timeout, do not wait), if it hasn't finished yet.
self._wait_for_repo_detection(timeout=-1)
# wait for uploads
print_done_waiting = False
if wait_for_uploads and (BackendModel.get_num_results() > 0 or
(self.__reporter and self.__reporter.get_num_results() > 0)):
self.log.info('Waiting to finish uploads')
print_done_waiting = True
# from here, do not send log in background thread
if wait_for_uploads:
self.flush(wait_for_uploads=True)
# wait until the reporter flush everything
if self.__reporter:
self.__reporter.stop()
if self.is_main_task():
# notice: this will close the reporting for all the Tasks in the system
Metrics.close_async_threads()
# notice: this will close the jupyter monitoring
ScriptInfo.close()
if self.is_main_task():
# noinspection PyBroadException
try:
from .storage.helper import StorageHelper
StorageHelper.close_async_threads()
except Exception:
# change task status
if not task_status:
pass
elif task_status[0] == 'failed':
self.mark_failed(status_reason=task_status[1])
elif task_status[0] == 'completed':
self.completed()
elif task_status[0] == 'stopped':
self.stopped()
if print_done_waiting:
self.log.info('Finished uploading')
elif self._logger:
# noinspection PyProtectedMember
self._logger._flush_stdout_handler()
if self._logger:
self._logger.set_flush_period(None)
# noinspection PyProtectedMember
self._logger._close_stdout_handler(wait=wait_for_uploads or wait_for_std_log)
# from here, do not check worker status
if self._dev_worker:
self._dev_worker.unregister()
self._dev_worker = None
# stop resource monitoring
if self._resource_monitor:
self._resource_monitor.stop()
self._resource_monitor = None
if not is_sub_process:
# change task status
if not task_status:
pass
elif task_status[0] == 'failed':
self.mark_failed(status_reason=task_status[1])
elif task_status[0] == 'completed':
self.completed()
elif task_status[0] == 'stopped':
self.stopped()
if self._logger:
self._logger.set_flush_period(None)
# noinspection PyProtectedMember
self._logger._close_stdout_handler(wait=wait_for_uploads or wait_for_std_log)
# this is so in theory we can close a main task and start a new one
if self.is_main_task():
Task.__main_task = None
except Exception:
# make sure we do not interrupt the exit process
pass
# this is so in theory we can close a main task and start a new one
if self.is_main_task():
Task.__main_task = None
except Exception as ex:
import traceback
# make sure we do not interrupt the exit process
pass
# make sure we store last task state
if self._offline_mode and not is_sub_process:
@ -3095,6 +3123,7 @@ class Task(_Task):
except Exception:
pass
# noinspection PyUnresolvedReferences
os.kill(os.getpid(), sig)
self._signal_recursion_protection_flag = False
@ -3127,6 +3156,10 @@ class Task(_Task):
else:
cls.__exit_hook.update_callback(exit_callback)
@classmethod
def _remove_at_exit_callbacks(cls):
cls.__register_at_exit(None, only_remove_signal_and_exception_hooks=True)
@classmethod
def __get_task(cls, task_id=None, project_name=None, task_name=None):
if task_id:

View File

@ -1,12 +1,12 @@
import os
import time
from multiprocessing import Lock
import six
from .process.mp import SingletonLock
class AsyncManagerMixin(object):
_async_results_lock = Lock()
_async_results_lock = SingletonLock()
# per pid (process) list of async jobs (support for sub-processes forking)
_async_results = {}

View File

View File

@ -0,0 +1,273 @@
import os
import psutil
import sys
from multiprocessing import Process, Lock, Event as ProcessEvent
from multiprocessing.pool import ThreadPool
from threading import Thread, Event as TrEvent
from time import sleep
from typing import List
from ..py3_interop import AbstractContextManager
try:
from multiprocessing import SimpleQueue
except ImportError: # noqa
from multiprocessing.queues import SimpleQueue
class SingletonThreadPool(object):
    """
    Holder of a single-worker ``ThreadPool`` that is re-created per process.

    The pool is cached together with the pid of the process that created it;
    when ``get()`` is called from a process with a different pid a new pool
    is created and cached instead.
    """
    __lock = None
    __thread_pool = None
    __thread_pool_pid = None

    @classmethod
    def get(cls):
        """
        Return the cached thread pool of the calling process.

        :return: a ``ThreadPool`` with one worker, created on first use in this process.
        """
        current_pid = os.getpid()
        # fast path: the cached pool was created by this very process
        if cls.__thread_pool_pid == current_pid:
            return cls.__thread_pool

        # first call in this process (or the pid changed): build a new pool
        cls.__thread_pool = ThreadPool(1)
        cls.__thread_pool_pid = current_pid
        return cls.__thread_pool
class SafeQueue(object):
    """
    Thin wrapper around ``SimpleQueue`` whose ``put`` is dispatched to a
    background thread pool, so the calling thread does not block on it.
    """
    __thread_pool = SingletonThreadPool()

    def __init__(self, *args, **kwargs):
        # all arguments are forwarded untouched to the underlying queue
        self._q = SimpleQueue(*args, **kwargs)

    def empty(self):
        """Return the underlying queue's ``empty()`` result."""
        return self._q.empty()

    def get(self):
        """Return the next object from the underlying queue (``SimpleQueue.get``)."""
        return self._q.get()

    def put(self, obj):
        """
        Schedule ``obj`` to be put on the queue.

        The actual (possibly blocking) put is executed by the per-process
        thread pool, i.e. in the background; nothing is returned.
        """
        background_pool = SafeQueue.__thread_pool.get()
        background_pool.apply_async(self._q.put, args=(obj, ))
class SafeEvent(object):
    """
    Wrapper around a ``multiprocessing.Event`` whose ``set`` is skipped when
    ``BackgroundMonitor`` reports subprocess mode with a dead subprocess.
    """
    __thread_pool = SingletonThreadPool()

    def __init__(self):
        self._event = ProcessEvent()

    def is_set(self):
        """Return True if the underlying event is set."""
        return self._event.is_set()

    def set(self):
        """
        Set the underlying event.

        Nothing is done when running in subprocess mode and the background
        subprocess is no longer alive.
        """
        if BackgroundMonitor.is_subprocess() and not BackgroundMonitor.is_subprocess_alive():
            return
        # the set is done synchronously (it is not dispatched to the thread pool)
        self._event.set()

    def clear(self):
        """Clear the underlying event."""
        return self._event.clear()

    def wait(self, timeout=None):
        """
        Wait on the underlying event.

        :param timeout: maximum time to wait, passed as-is to the event (None means no timeout).
        :return: whatever the underlying event's ``wait`` returns.
        """
        return self._event.wait(timeout=timeout)
class SingletonLock(AbstractContextManager):
    """
    Lazily created ``multiprocessing.Lock`` usable as a context manager.

    The underlying lock is only created on the first ``acquire()`` /
    ``create()`` call. Every instance registers itself in ``_instances`` so
    that ``instantiate()`` can create all pending locks in one call.
    """
    # every SingletonLock ever constructed (used by instantiate())
    _instances = []

    def __init__(self):
        # the real lock is created lazily, see create()
        self._lock = None
        SingletonLock._instances.append(self)

    def acquire(self, *args, **kwargs):
        """
        Acquire the lock, creating it first if needed.

        All arguments are forwarded to the underlying lock's ``acquire``.

        :return: the underlying ``acquire`` result.
        """
        self.create()
        return self._lock.acquire(*args, **kwargs)

    def release(self, *args, **kwargs):
        """
        Release the lock.

        :return: None if the lock was never created, otherwise the underlying ``release`` result.
        """
        if self._lock is None:
            return None
        return self._lock.release(*args, **kwargs)

    def create(self):
        """Create the underlying lock if it does not exist yet."""
        if self._lock is None:
            self._lock = Lock()

    @classmethod
    def instantiate(cls):
        """Create the underlying lock of every registered instance."""
        for i in cls._instances:
            i.create()

    def __enter__(self):
        """Acquire the lock and return `self` upon entering the runtime context."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Release the lock when leaving the runtime context.

        :return: False, so an exception raised inside the ``with`` body is
            propagated unchanged by the interpreter.
        """
        self.release()
        # Do not re-raise manually: raising the (type, value, traceback) tuple is a
        # TypeError on Python 3 and would hide the original exception.
        # Returning a false value makes the interpreter re-raise it as-is.
        return False
class BackgroundMonitor(object):
    """
    Base class for periodic background workers.

    Instances register themselves with ``start()`` and are launched together
    by ``start_all()``, either as daemon threads of the calling process or as
    threads inside one daemon subprocess. Subclasses implement
    ``_daemon_step()`` (called every ``wait_period`` seconds) or override
    ``daemon()`` entirely.
    """
    # If we will need multiple monitoring contexts (i.e. subprocesses) this will become a dict
    _main_process = None
    _parent_pid = None
    _sub_process_started = None
    _instances = []  # type: List[BackgroundMonitor]

    def __init__(self, wait_period):
        """
        :param wait_period: timeout (passed to ``Event.wait``) between two ``_daemon_step()`` calls.
        """
        self._event = TrEvent()  # set to request the daemon loop to exit
        self._done_ev = TrEvent()  # set by post_execution() once the daemon finished
        self._start_ev = TrEvent()  # set when the daemon thread starts running
        self._task_pid = os.getpid()
        # None: not started, True: registered (waiting for start_all), Thread: running thread
        self._thread = None
        self._wait_timeout = wait_period
        self._subprocess = None

    def start(self):
        """Mark the monitor as started and register it to be launched by ``start_all()``."""
        if not self._thread:
            self._thread = True
        self._event.clear()
        self._done_ev.clear()
        # append to instances
        if self not in BackgroundMonitor._instances:
            BackgroundMonitor._instances.append(self)

    def wait(self, timeout=None):
        """
        Wait until the daemon signals it is done. Returns immediately if not started.

        :param timeout: maximum time to wait, None means no timeout.
        """
        if not self._thread:
            return
        self._done_ev.wait(timeout=timeout)

    def _start(self):
        """Launch the daemon loop in a new daemon thread of the current process."""
        self._thread = Thread(target=self._daemon)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """Signal the daemon loop to exit. No-op if the monitor was never started."""
        if not self._thread:
            return

        # in subprocess mode only signal if the subprocess is still there
        if not self.is_subprocess() or self.is_subprocess_alive():
            self._event.set()

        # only an actual thread (thread mode / inside the subprocess) is unregistered here
        if isinstance(self._thread, Thread):
            try:
                self._instances.remove(self)
            except ValueError:
                pass
            self._thread = None

    def daemon(self):
        """Main loop: call ``_daemon_step()`` every wait period until the stop event is set."""
        while True:
            if self._event.wait(self._wait_timeout):
                break
            self._daemon_step()

    def _daemon(self):
        """Thread target: flag the start, run the main loop, then run ``post_execution()``."""
        self._start_ev.set()
        self.daemon()
        self.post_execution()

    def post_execution(self):
        """Called after the main loop exits; signals that the monitor is done."""
        self._done_ev.set()

    def set_subprocess_mode(self):
        """Replace the thread events with ``SafeEvent`` objects and flag subprocess mode."""
        # called just before launching the daemon in a subprocess
        self._subprocess = True
        self._done_ev = SafeEvent()
        self._start_ev = SafeEvent()
        self._event = SafeEvent()

    def _daemon_step(self):
        """Single iteration of the monitor, to be implemented by subclasses."""
        pass

    @classmethod
    def start_all(cls, execute_in_subprocess, wait_for_subprocess=False):
        """
        Launch all registered monitors.

        :param execute_in_subprocess: if False, start every registered monitor as a thread
            in the current process. If True (and no background process was created yet),
            switch all monitors to subprocess mode and start one daemon process running them.
        :param wait_for_subprocess: if True, block until the subprocess signals it started
            its monitors (subprocess mode only).
        """
        if not execute_in_subprocess:
            for d in BackgroundMonitor._instances:
                d._start()
        elif not BackgroundMonitor._main_process:
            cls._parent_pid = os.getpid()
            cls._sub_process_started = SafeEvent()
            cls._sub_process_started.clear()
            # setup
            for d in BackgroundMonitor._instances:
                d.set_subprocess_mode()
            BackgroundMonitor._main_process = Process(target=cls._background_process_start)
            BackgroundMonitor._main_process.daemon = True
            BackgroundMonitor._main_process.start()
            # wait until subprocess is up
            if wait_for_subprocess:
                cls._sub_process_started.wait()

    @classmethod
    def _background_process_start(cls):
        """Entry point of the background process: start all monitor threads and join them."""
        is_debugger_running = bool(getattr(sys, 'gettrace', None) and sys.gettrace())
        # restore original signal, this will prevent any deadlocks
        # Do not change the exception we need to catch base exception as well
        # noinspection PyBroadException
        try:
            from ... import Task
            # noinspection PyProtectedMember
            Task.current_task()._remove_at_exit_callbacks()
        except:  # noqa
            pass

        # if a debugger is running, wait for it to attach to the subprocess
        if is_debugger_running:
            sleep(3)

        # launch all the threads
        for d in cls._instances:
            d._start()

        # let the parent process know the monitors are running
        if cls._sub_process_started:
            cls._sub_process_started.set()

        # wait until we are signaled
        for i in BackgroundMonitor._instances:
            # noinspection PyBroadException
            try:
                if i._thread and i._thread.is_alive():
                    # DO Not change, we need to catch base exception, if the process gets killed
                    try:
                        i._thread.join()
                    except:  # noqa
                        break
                else:
                    pass
            except:  # noqa
                pass
        # we are done, leave process
        return

    def is_alive(self):
        """
        Check whether the monitor is currently running.

        In subprocess mode: the subprocess is alive, the monitor was started, and it
        signaled start but not done. In thread mode: the monitor thread is alive.

        :return: a truthy value if the monitor is running, falsy otherwise.
        """
        if self.is_subprocess():
            return self.is_subprocess_alive() and self._thread \
                and self._start_ev.is_set() and not self._done_ev.is_set()
        else:
            return isinstance(self._thread, Thread) and self._thread.is_alive()

    @classmethod
    def is_subprocess_alive(cls):
        """
        Check whether the background process exists and is not a zombie.

        :return: True if the background process is alive, False otherwise
            (including when it was never created or cannot be located).
        """
        if not cls._main_process:
            return False
        # noinspection PyBroadException
        try:
            return \
                cls._main_process.is_alive() and \
                psutil.Process(cls._main_process.pid).status() != psutil.STATUS_ZOMBIE
        except Exception:
            # direct query failed, fall back to looking the subprocess up
            # among the children of the process that launched it
            current_pid = cls._main_process.pid
            if not current_pid:
                return False
            try:
                parent = psutil.Process(cls._parent_pid)
            except psutil.Error:
                # could not find parent process id, always answer with a bool
                return False
            for child in parent.children(recursive=True):
                # look for the background process among the parent's children
                if child.pid == current_pid:
                    return child.status() != psutil.STATUS_ZOMBIE
            return False

    @classmethod
    def is_subprocess(cls):
        """Return True if a background process object was created by ``start_all()``."""
        return bool(cls._main_process)

View File

@ -2,11 +2,12 @@ import logging
import os
import warnings
from time import time
from threading import Thread, Event
import psutil
from pathlib2 import Path
from typing import Text
from .process.mp import BackgroundMonitor
from ..binding.frameworks.tensorflow_bind import IsTensorboardInit
try:
@ -15,13 +16,14 @@ except ImportError:
gpustat = None
class ResourceMonitor(object):
class ResourceMonitor(BackgroundMonitor):
_title_machine = ':monitor:machine'
_title_gpu = ':monitor:gpu'
def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30.,
first_report_sec=None, wait_for_first_iteration_to_start_sec=180.0,
max_wait_for_first_iteration_to_start_sec=1800., report_mem_used_per_process=True):
super(ResourceMonitor, self).__init__(sample_frequency_per_sec)
self._task = task
self._sample_frequency = sample_frequency_per_sec
self._report_frequency = report_frequency_sec
@ -32,8 +34,6 @@ class ResourceMonitor(object):
self._readouts = {}
self._previous_readouts = {}
self._previous_readouts_ts = time()
self._thread = None
self._exit_event = Event()
self._gpustat_fail = 0
self._gpustat = gpustat
self._active_gpus = None
@ -43,6 +43,7 @@ class ResourceMonitor(object):
if not self._gpustat:
self._task.get_logger().report_text('ClearML Monitor: GPU monitoring is not available')
else: # if running_remotely():
# noinspection PyBroadException
try:
active_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES', '') or \
os.environ.get('CUDA_VISIBLE_DEVICES', '')
@ -51,24 +52,7 @@ class ResourceMonitor(object):
except Exception:
pass
def start(self):
self._exit_event.clear()
self._thread = Thread(target=self._run)
self._thread.daemon = True
self._thread.start()
def stop(self):
self._exit_event.set()
# self._thread.join()
def _run(self):
# noinspection PyBroadException
try:
self._daemon()
except Exception:
pass
def _daemon(self):
def daemon(self):
seconds_since_started = 0
reported = 0
last_iteration = 0
@ -76,6 +60,7 @@ class ResourceMonitor(object):
# get max GPU ID, and make sure our active list is within range
if self._active_gpus:
# noinspection PyBroadException
try:
gpu_stat = self._gpustat.new_query()
if max(self._active_gpus) > len(gpu_stat.gpus) - 1:
@ -91,7 +76,7 @@ class ResourceMonitor(object):
current_report_frequency = self._report_frequency if reported != 0 else self._first_report_sec
while (time() - last_report) < current_report_frequency:
# wait for self._sample_frequency seconds, if event set quit
if self._exit_event.wait(1.0 / self._sample_frequency):
if self._event.wait(1.0 / self._sample_frequency):
return
# noinspection PyBroadException
try:
@ -225,6 +210,7 @@ class ResourceMonitor(object):
# check if we can access the gpu statistics
if self._gpustat:
# noinspection PyBroadException
try:
stats.update(self._get_gpu_stats())
except Exception:
@ -243,6 +229,7 @@ class ResourceMonitor(object):
@classmethod
def get_logger_reported_titles(cls, task):
# noinspection PyProtectedMember
titles = list(task.get_logger()._get_used_title_series().keys())
try:
titles.remove(cls._title_machine)