Add support for truncating task log file after reporting to server

This commit is contained in:
allegroai 2021-10-21 12:02:31 +03:00
parent a890e36a36
commit 92e3f00435

View File

@ -570,6 +570,7 @@ class Worker(ServiceCommandSection):
self._downtime_config = self._session.config.get("agent.downtime", None)
self._suppress_cr = self._session.config.get("agent.suppress_carriage_return", True)
self._host_ssh_cache = None
self._truncate_task_output_files = bool(self._session.config.get("agent.truncate_task_output_files", False))
# True - supported
# None - not initialized
@ -1547,10 +1548,16 @@ class Worker(ServiceCommandSection):
):
# type: (...) -> Tuple[Optional[int], Optional[TaskStopReason]]
def _print_file(file_path, prev_pos=0):
with open(file_path, "rb") as f:
with open(file_path, "ab+") as f:
f.seek(prev_pos)
binary_text = f.read()
pos = f.tell()
if not self._truncate_task_output_files:
# non-buffered behavior
pos = f.tell()
else:
# buffered - read everything and truncate
f.truncate(0)
pos = 0
# skip the previously printed lines,
blines = binary_text.split(b'\n') if binary_text else []
if not blines:
@ -1566,6 +1573,21 @@ class Worker(ServiceCommandSection):
stderr = open(stderr_path, "wt") if stderr_path else stdout
stdout_line_count, stdout_pos_count, stdout_last_lines = 0, 0, []
stderr_line_count, stderr_pos_count, stderr_last_lines = 0, 0, []
lines_buffer = defaultdict(list)
def report_lines(lines, source):
if not self._truncate_task_output_files:
# non-buffered
return self.send_logs(task_id, lines, session=session)
buffer = lines_buffer[source]
buffer += lines
sent = self.send_logs(task_id, buffer, session=session)
if sent > 0:
lines_buffer[source] = buffer[sent:]
return sent
service_mode_internal_agent_started = None
stopping = False
status = None
@ -1616,10 +1638,11 @@ class Worker(ServiceCommandSection):
if status is not None:
stop_reason = 'Service started'
stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
stdout_line_count += report_lines(printed_lines, "stdout")
if stderr_path:
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
stderr_line_count += report_lines(printed_lines, "stderr")
except subprocess.CalledProcessError as ex:
# non zero return code
@ -1633,10 +1656,10 @@ class Worker(ServiceCommandSection):
except Exception:
# we should not get here, but better safe than sorry
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
stdout_line_count += report_lines(printed_lines, "stdout")
if stderr_path:
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
stderr_line_count += report_lines(printed_lines, "stderr")
stop_reason = TaskStopReason.exception
status = -1
@ -1655,10 +1678,10 @@ class Worker(ServiceCommandSection):
# Send last lines
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
stdout_line_count += self.send_logs(task_id, printed_lines, session=session)
stdout_line_count += report_lines(printed_lines, "stdout")
if stderr_path:
printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count)
stderr_line_count += self.send_logs(task_id, printed_lines, session=session)
stderr_line_count += report_lines(printed_lines, "stderr")
return status, stop_reason