diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py index b856fb5..3227e67 100644 --- a/trains_agent/commands/worker.py +++ b/trains_agent/commands/worker.py @@ -398,6 +398,7 @@ class Worker(ServiceCommandSection): self._redirected_stdout_file_no = None self._uptime_config = self._session.config.get("agent.uptime", None) self._downtime_config = self._session.config.get("agent.downtime", None) + self._suppress_cr = self._session.config.get("agent.suppress_carriage_return", True) # True - supported # None - not initialized @@ -984,21 +985,26 @@ class Worker(ServiceCommandSection): **kwargs # type: Any ): # type: (...) -> Tuple[Optional[int], TaskStopReason] - def _print_file(file_path, prev_line_count): + def _print_file(file_path, prev_pos=0): with open(file_path, "rb") as f: + f.seek(prev_pos) binary_text = f.read() - if not binary_text: - return [] + pos = f.tell() # skip the previously printed lines, - blines = binary_text.split(b'\n')[prev_line_count:] + blines = binary_text.split(b'\n') if binary_text else [] if not blines: - return blines - return decode_binary_lines(blines if blines[-1] else blines[:-1]) + return blines, pos + return ( + decode_binary_lines(blines if blines[-1] else blines[:-1], + replace_cr=not self._suppress_cr, + overwrite_cr=self._suppress_cr), + pos + ) stdout = open(stdout_path, "wt") stderr = open(stderr_path, "wt") if stderr_path else stdout - stdout_line_count, stdout_last_lines = 0, [] - stderr_line_count, stderr_last_lines = 0, [] + stdout_line_count, stdout_pos_count, stdout_last_lines = 0, 0, [] + stderr_line_count, stderr_pos_count, stderr_last_lines = 0, 0, [] service_mode_internal_agent_started = None stopping = False status = None @@ -1037,7 +1043,7 @@ class Worker(ServiceCommandSection): stderr.flush() # get diff from previous poll - printed_lines = _print_file(stdout_path, stdout_line_count) + printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count) if self._services_mode and not stopping and not status: # if the internal agent started, we stop logging, it will take over logging. # if the internal agent started running the task itself, it will return status==0, @@ -1047,13 +1053,10 @@ class Worker(ServiceCommandSection): if status is not None: stop_reason = 'Service started' - stdout_line_count += self.send_logs( - task_id, printed_lines - ) + stdout_line_count += self.send_logs(task_id, printed_lines) if stderr_path: - stderr_line_count += self.send_logs( - task_id, _print_file(stderr_path, stderr_line_count) - ) + printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count) + stderr_line_count += self.send_logs(task_id, printed_lines) except subprocess.CalledProcessError as ex: # non zero return code @@ -1064,9 +1067,11 @@ class Worker(ServiceCommandSection): raise except Exception: # we should not get here, but better safe than sorry - stdout_line_count += self.send_logs(task_id, _print_file(stdout_path, stdout_line_count)) + printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count) + stdout_line_count += self.send_logs(task_id, printed_lines) if stderr_path: - stderr_line_count += self.send_logs(task_id, _print_file(stderr_path, stderr_line_count)) + printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count) + stderr_line_count += self.send_logs(task_id, printed_lines) stop_reason = 'Exception occurred' status = -1 @@ -1080,13 +1085,11 @@ class Worker(ServiceCommandSection): stderr.close() # Send last lines - stdout_line_count += self.send_logs( - task_id, _print_file(stdout_path, stdout_line_count) - ) + printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count) + stdout_line_count += self.send_logs(task_id, printed_lines) if stderr_path: - stderr_line_count += self.send_logs( - task_id, _print_file(stderr_path, stderr_line_count) - ) + printed_lines, stderr_pos_count = _print_file(stderr_path, stderr_pos_count) + stderr_line_count += self.send_logs(task_id, printed_lines) return status, stop_reason diff --git a/trains_agent/helper/console.py b/trains_agent/helper/console.py index d424f65..290ae5a 100644 --- a/trains_agent/helper/console.py +++ b/trains_agent/helper/console.py @@ -9,7 +9,7 @@ from attr import attrs, attrib import six from six import binary_type, text_type -from trains_agent.helper.base import nonstrict_in_place_sort, create_tree +from trains_agent.helper.base import nonstrict_in_place_sort def print_text(text, newline=True): @@ -22,15 +22,21 @@ def print_text(text, newline=True): sys.stdout.write(data) -def decode_binary_lines(binary_lines, encoding='utf-8'): +def decode_binary_lines(binary_lines, encoding='utf-8', replace_cr=False, overwrite_cr=False): # decode per line, if we failed decoding skip the line lines = [] for b in binary_lines: + # noinspection PyBroadException try: - l = b.decode(encoding=encoding, errors='replace').replace('\r', '\n') - except: - l = '' - lines.append(l + '\n' if l and l[-1] != '\n' else l) + line = b.decode(encoding=encoding, errors='replace') + if replace_cr: + line = line.replace('\r', '\n') + elif overwrite_cr: + cr_lines = line.split('\r') + line = cr_lines[-1] if cr_lines[-1] or len(cr_lines) < 2 else cr_lines[-2] + except Exception: + line = '' + lines.append(line + '\n' if not line or line[-1] != '\n' else line) return lines