From a34c612f38af9c5f2c9f53931ed9df35ac834e90 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 19 Jun 2023 13:45:26 +0200 Subject: [PATCH] More robust runner update handler --- .../server/process/shared/common.ts | 18 +++++++++++++----- shared/ffmpeg/ffmpeg-command-wrapper.ts | 12 ++++++++++++ 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts index 88f7c33f1..dbeb9dfc1 100644 --- a/packages/peertube-runner/server/process/shared/common.ts +++ b/packages/peertube-runner/server/process/shared/common.ts @@ -1,5 +1,4 @@ import { remove } from 'fs-extra' -import { throttle } from 'lodash' import { ConfigManager, downloadFile, logger } from 'packages/peertube-runner/shared' import { join } from 'path' import { buildUUID } from '@shared/extra-utils' @@ -60,17 +59,26 @@ export function buildFFmpegVOD (options: { ? 500 : 60000 - const updateJobProgress = throttle((progress: number) => { - if (progress < 0 || progress > 100) progress = undefined + let progress: number + const interval = setInterval(() => { updateTranscodingProgress({ server, job, runnerToken, progress }) .catch(err => logger.error({ err }, 'Cannot send job progress')) - }, updateInterval, { trailing: false }) + }, updateInterval) return new FFmpegVOD({ ...getCommonFFmpegOptions(), - updateJobProgress + onError: () => clearInterval(interval), + onEnd: () => clearInterval(interval), + + updateJobProgress: arg => { + if (arg < 0 || arg > 100) { + progress = undefined + } else { + progress = arg + } + } }) } diff --git a/shared/ffmpeg/ffmpeg-command-wrapper.ts b/shared/ffmpeg/ffmpeg-command-wrapper.ts index 7a8c19d4b..efb75c198 100644 --- a/shared/ffmpeg/ffmpeg-command-wrapper.ts +++ b/shared/ffmpeg/ffmpeg-command-wrapper.ts @@ -21,6 +21,8 @@ export interface FFmpegCommandWrapperOptions { lTags?: { tags: string[] } updateJobProgress?: (progress?: number) => void + onEnd?: () => void + onError?: (err: Error) => void } export class FFmpegCommandWrapper { @@ -37,6 +39,8 @@ export class FFmpegCommandWrapper { private readonly lTags: { tags: string[] } private readonly updateJobProgress: (progress?: number) => void + private readonly onEnd?: () => void + private readonly onError?: (err: Error) => void private command: FfmpegCommand @@ -48,7 +52,11 @@ export class FFmpegCommandWrapper { this.threads = options.threads this.logger = options.logger this.lTags = options.lTags || { tags: [] } + this.updateJobProgress = options.updateJobProgress + + this.onEnd = options.onEnd + this.onError = options.onError } getAvailableEncoders () { @@ -101,12 +109,16 @@ export class FFmpegCommandWrapper { this.command.on('error', (err, stdout, stderr) => { if (silent !== true) this.logger.error('Error in ffmpeg.', { stdout, stderr, shellCommand, ...this.lTags }) + if (this.onError) this.onError(err) + rej(err) }) this.command.on('end', (stdout, stderr) => { this.logger.debug('FFmpeg command ended.', { stdout, stderr, shellCommand, ...this.lTags }) + if (this.onEnd) this.onEnd() + res() })