mirror of https://github.com/matatonic/openedai-speech
synced 2025-06-26 18:16:32 +00:00

0.16.0 +Multi-client safe

This commit is contained in:
parent edc14301fd
commit 1d144a12e0
@@ -3,14 +3,14 @@ FROM python:3.11-slim
RUN --mount=type=cache,target=/root/.cache/pip pip install -U pip

ARG TARGETPLATFORM

-RUN apt-get update && apt-get install --no-install-recommends -y curl ffmpeg libaio-dev
+RUN apt-get update && apt-get install --no-install-recommends -y curl ffmpeg
RUN if [ "$TARGETPLATFORM" != "linux/amd64" ]; then apt-get install --no-install-recommends -y build-essential ; fi
RUN if [ "$TARGETPLATFORM" != "linux/amd64" ]; then curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y ; fi
ENV PATH="/root/.cargo/bin:${PATH}"
# for deepspeed support - doesn't seem worth it: image +7.5GB, over the 10GB ghcr.io limit, and no noticeable gain in speed or VRAM usage?
#RUN curl -O https://developer.download.nvidia.com/compute/cuda/repos/debian11/x86_64/cuda-keyring_1.1-1_all.deb
#RUN dpkg -i cuda-keyring_1.1-1_all.deb && rm cuda-keyring_1.1-1_all.deb
-#RUN apt-get update && apt-get install --no-install-recommends -y build-essential cuda-toolkit
+#RUN apt-get update && apt-get install --no-install-recommends -y libaio-dev build-essential cuda-toolkit
#ENV CUDA_HOME=/usr/local/cuda
RUN apt-get clean && rm -rf /var/lib/apt/lists/*
@@ -23,15 +23,17 @@ Details:
* Configurable [generation parameters](#generation-parameters)
* Streamed output while generating
* Occasionally, certain words or symbols may sound incorrect; you can fix them with regex via `pre_process_map.yaml`
* Tested with python 3.9-3.11, piper does not install on python 3.12 yet

If you find a better voice match for `tts-1` or `tts-1-hd`, please let me know so I can update the defaults.

## Recent Changes

-Version 0.15.2, 2024-06-28
+Version 0.16.0, 2024-06-29

-* Thread safe version, with audio generation synchronized at the sentence level. The estimated 'realtime' factor of XTTS is roughly 1/3; this means that multiple simultaneous streams, or `speed` over 2, may experience audio underrun (i.e. delays in playback)
+* Multi-client safe version. Audio generation is synchronized in a single process. The estimated 'realtime' factor of XTTS on a GPU is roughly 1/3; this means that multiple simultaneous streams, or `speed` over 2, may experience audio underrun (delays or pauses in playback). This makes multiple clients possible and safe, but in practice 2 or 3 simultaneous streams is the maximum without audio underrun.

Version 0.15.1, 2024-06-27
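A rough way to read the underrun numbers in the new 0.16.0 entry (a back-of-the-envelope sketch, not code from this repo): with a 'realtime' factor near 1/3, one second of audio costs about a third of a second of GPU time, and since generation is serialized, the combined playback demand of all streams, including any `speed` multiplier, has to stay under the GPU's capacity.

```python
# Illustrative capacity estimate only; assumes fully serialized generation and
# a GPU 'realtime' factor of ~1/3 (1 second of audio takes ~0.33s to generate).
def may_underrun(num_streams: int, speed: float = 1.0, realtime_factor: float = 1 / 3) -> bool:
    # Each stream consumes `speed` seconds of audio per wall-clock second,
    # costing speed * realtime_factor seconds of serialized GPU time.
    gpu_load = num_streams * speed * realtime_factor
    return gpu_load >= 1.0  # at or beyond 1.0 the GPU can no longer keep up

print(may_underrun(2))           # False: ~0.67 load, some headroom left
print(may_underrun(3))           # True: right at the limit
print(may_underrun(1, speed=3))  # True: one fast stream can also outrun the GPU
```

The 1/3 factor is only an estimate, so real headroom is smaller; this is consistent with the guidance above that 2 or 3 simultaneous streams is the practical maximum.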
speech.py (169 lines changed)
@@ -1,9 +1,7 @@
#!/usr/bin/env python3
import argparse
-import asyncio
import contextlib
import gc
-import io
import os
import queue
import re
@@ -73,12 +71,8 @@ class xtts_wrapper():

        if self.unload_timer:
            logger.info(f"Setting unload timer to {self.unload_timer} seconds")
-            self.not_idle()
+            self.check_idle()

-    def not_idle(self):
-        with self.lock:
-            self.last_used = time.time()
-            self.check_idle()
-
    def check_idle(self):
        with self.lock:
@@ -92,22 +86,28 @@ class xtts_wrapper():
            self.timer.start()

    def tts(self, text, language, speaker_wav, **hf_generate_kwargs):
-        logger.debug(f"waiting lock")
-        with self.lock, torch.no_grad(): # I wish this could be another way, but it seems that inference_stream cannot be accessed async reliably
-            logger.debug(f"grabbed lock, tts text: {text}")
+        with torch.no_grad():
+            self.last_used = time.time()
+            tokens = 0
            try:
-                gpt_cond_latent, speaker_embedding = self.xtts.get_conditioning_latents(audio_path=[speaker_wav]) # XXX TODO: allow multiple wav
-
-                for wav in self.xtts.inference_stream(text, language, gpt_cond_latent, speaker_embedding, **hf_generate_kwargs):
-                    yield wav.cpu().numpy().tobytes()
+                with self.lock:
+                    gpt_cond_latent, speaker_embedding = self.xtts.get_conditioning_latents(audio_path=[speaker_wav]) # not worth caching calls, it's < 0.001s after model is loaded
+                    pcm_stream = self.xtts.inference_stream(text, language, gpt_cond_latent, speaker_embedding, **hf_generate_kwargs)
+                    self.last_used = time.time()
+
+                while True:
+                    with self.lock:
+                        yield next(pcm_stream).cpu().numpy().tobytes()
+                        self.last_used = time.time()
+                        tokens += 1
+
+            except StopIteration:
+                pass

            finally:
-                logger.debug(f"held lock for {time.time() - self.last_used:0.1f} sec")
+                logger.debug(f"Generated {tokens} tokens in {time.time() - self.last_used:.2f}s @ {tokens / (time.time() - self.last_used):.2f} T/s")
                self.last_used = time.time()


def default_exists(filename: str):
    if not os.path.exists(filename):
        fpath, ext = os.path.splitext(filename)
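The reworked `tts()` above takes the model lock around each streaming step rather than for the whole request, which is what lets several clients' generators share one model. A minimal, standalone sketch of that chunk-level locking pattern (hypothetical names, not the repo's code):

```python
import threading

model_lock = threading.Lock()  # stands in for xtts_wrapper's self.lock

def stream_chunks(chunks):
    """Yield chunks one at a time, taking the shared lock only around each step."""
    it = iter(chunks)
    while True:
        with model_lock:          # another client's generator can run between steps
            try:
                chunk = next(it)  # stands in for next(pcm_stream) on the GPU
            except StopIteration:
                return
        yield chunk               # the lock is not held while the caller consumes this

# Two "clients" interleaving on the same lock:
a = stream_chunks(["a1", "a2"])
b = stream_chunks(["b1", "b2"])
print(next(a), next(b), next(a), next(b))  # a1 b1 a2 b2
```

Each stream still waits for every other stream's chunks in wall-clock time, which is where the 2-3 stream practical limit above comes from.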
@@ -203,13 +203,12 @@ async def generate_speech(request: GenerateSpeechRequest):
    elif response_format == "pcm":
        if model == 'tts-1': # piper
            media_type = "audio/pcm;rate=22050"
-        elif model == 'tts-1-hd':
+        elif model == 'tts-1-hd': # xtts
            media_type = "audio/pcm;rate=24000"
    else:
        raise BadRequestError(f"Invalid response_format: '{response_format}'", param='response_format')

    ffmpeg_args = None
-    tts_io_out = None

    # Use piper for tts-1, and if xtts_device == none use for all models.
    if model == 'tts-1' or args.xtts_device == 'none':
@@ -289,77 +288,9 @@ async def generate_speech(request: GenerateSpeechRequest):

        ffmpeg_proc = subprocess.Popen(ffmpeg_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

-        # before the xtts lock, it was:
-        #def generator():
-        #    for chunk in xtts.tts(text=input_text, language=language, speaker_wav=speaker, **hf_generate_kwargs):
-        #        ffmpeg_proc.stdin.write(chunk) # <-- but this blocks forever and holds the xtts lock if a client disconnects
-        #worker = threading.Thread(target=generator)
-        #worker.daemon = True
-        #worker.start()
-        #return StreamingResponse(content=ffmpeg_proc.stdout, media_type=media_type)
-        #
-        # What follows is stupidly overcomplicated, but there is no other way I can find (yet) that detects client disconnects and doesn't get blocked up
-
-        os.set_blocking(ffmpeg_proc.stdout.fileno(), False) # this doesn't work on windows until python 3.12
-        os.set_blocking(ffmpeg_proc.stdin.fileno(), False) # this doesn't work on windows until python 3.12
-        ffmpeg_in = io.FileIO(ffmpeg_proc.stdin.fileno(), 'wb')

        in_q = queue.Queue() # speech pcm
-        out_q = queue.Queue() # ffmpeg audio out
        ex_q = queue.Queue() # exceptions

-        def ffmpeg_io():
-            # in_q -> ffmpeg -> out_q
-            while not (ffmpeg_proc.stdout.closed and ffmpeg_proc.stdin.closed):
-                try:
-                    while not ffmpeg_proc.stdout.closed:
-                        chunk = ffmpeg_proc.stdout.read()
-                        if chunk is None:
-                            break
-
-                        if len(chunk) == 0: # real end
-                            out_q.put(None)
-                            ffmpeg_proc.stdout.close()
-                            break
-
-                        out_q.put(chunk)
-                        continue # consume audio without delay
-
-                except Exception as e:
-                    logger.debug(f"ffmpeg stdout read: {repr(e)}")
-                    out_q.put(None)
-                    ex_q.put(e)
-                    return
-
-                try:
-                    while not ffmpeg_proc.stdin.closed:
-                        chunk = in_q.get_nowait()
-                        if chunk is None:
-                            ffmpeg_proc.stdin.close()
-                            break
-                        n = ffmpeg_in.write(chunk) # BrokenPipeError from here on client disconnect
-                        if n is None:
-                            in_q.queue.appendleft(chunk)
-                            break
-                        if n != len(chunk):
-                            in_q.queue.appendleft(chunk[n:])
-                            break
-
-                except queue.Empty:
-                    pass
-
-                except BrokenPipeError as e:
-                    ex_q.put(e) # we need to get this exception into the generation loop, which holds the lock
-                    ffmpeg_proc.kill()
-                    return
-
-                except Exception as e:
-                    ex_q.put(e)
-                    ffmpeg_proc.kill()
-                    return
-
-                time.sleep(0.01)

        def exception_check(exq: queue.Queue):
            try:
                e = exq.get_nowait()
@@ -376,56 +307,45 @@ async def generate_speech(request: GenerateSpeechRequest):
                    exception_check(ex_q)
                    in_q.put(chunk)

-                in_q.put(None)
-
            except BrokenPipeError as e: # client disconnect lands here
-                #logger.debug(f"{repr(e)}")
-                logger.info("Client disconnected")
-
-            except asyncio.CancelledError as e:
-                logger.debug(f"{repr(e)}")
-                pass
+                logger.info("Client disconnected - 'Broken pipe'")

            except Exception as e:
                logger.error(f"Exception: {repr(e)}")
                raise e

+            finally:
+                in_q.put(None) # sentinel
+
-        worker = threading.Thread(target=generator, daemon = True)
-        worker.start()
-
-        worker2 = threading.Thread(target=ffmpeg_io, daemon = True)
-        worker2.start()
-
-        async def audio_out():
-            # out_q -> client
-            while True:
-                try:
-                    audio = out_q.get_nowait()
-                    if audio is None:
-                        return
-                    yield audio
-
-                except queue.Empty:
-                    pass
-
-                except asyncio.CancelledError as e:
-                    logger.debug("{repr(e)}")
-                    ex_q.put(e)
-                    return
-
-                except Exception as e:
-                    logger.debug("{repr(e)}")
-                    ex_q.put(e)
-                    return
-
-                await asyncio.sleep(0.01)
+        def out_writer():
+            # in_q -> ffmpeg
+            try:
+                while True:
+                    chunk = in_q.get()
+                    if chunk is None: # sentinel
+                        break
+                    ffmpeg_proc.stdin.write(chunk) # BrokenPipeError from here on client disconnect
+
+            except Exception as e: # BrokenPipeError
+                ex_q.put(e) # we need to get this exception into the generation loop
+                ffmpeg_proc.kill()
+                return
+
+            finally:
+                ffmpeg_proc.stdin.close()
+
+        generator_worker = threading.Thread(target=generator, daemon=True)
+        generator_worker.start()
+
+        out_writer_worker = threading.Thread(target=out_writer, daemon=True)
+        out_writer_worker.start()

        def cleanup():
            ffmpeg_proc.kill()
-            del worker
-            del worker2
+            del generator_worker
+            del out_writer_worker

-        return StreamingResponse(audio_out(), media_type=media_type, background=cleanup)
+        return StreamingResponse(content=ffmpeg_proc.stdout, media_type=media_type, background=cleanup)
    else:
        raise BadRequestError("No such model, must be tts-1 or tts-1-hd.", param='model')
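Stripped of the exception plumbing, the new flow above is: a `generator` thread pushes PCM into `in_q`, an `out_writer` thread drains `in_q` into ffmpeg's stdin (closing it on the `None` sentinel, killing ffmpeg on error), and the response streams ffmpeg's stdout directly. A simplified, hypothetical sketch of that shape (`stream_through_ffmpeg` is not a function in the repo):

```python
import queue
import subprocess
import threading

def stream_through_ffmpeg(pcm_chunks, ffmpeg_args):
    """Return a readable pipe of encoded audio, suitable as a streaming response body."""
    proc = subprocess.Popen(ffmpeg_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    in_q: queue.Queue = queue.Queue()

    def generator():
        try:
            for chunk in pcm_chunks:   # e.g. the chunk-level xtts.tts(...) generator
                in_q.put(chunk)
        finally:
            in_q.put(None)             # sentinel: no more audio is coming

    def out_writer():
        try:
            while True:
                chunk = in_q.get()
                if chunk is None:      # sentinel reached
                    break
                proc.stdin.write(chunk)  # raises BrokenPipeError if ffmpeg is gone
            proc.stdin.close()         # normal end: let ffmpeg flush and exit
        except Exception:
            proc.kill()                # on error or disconnect, stop ffmpeg so nothing blocks

    threading.Thread(target=generator, daemon=True).start()
    threading.Thread(target=out_writer, daemon=True).start()
    return proc.stdout                 # e.g. StreamingResponse(content=proc.stdout, ...)
```

Streaming ffmpeg's stdout directly, instead of polling an intermediate queue as the removed `audio_out()` did, leaves disconnect handling to the writer thread and to the `background=cleanup` hook on the response.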
@@ -448,6 +368,7 @@ if __name__ == "__main__":
    parser.add_argument('--preload', action='store', default=None, help="Preload a model (Ex. 'xtts' or 'xtts_v2.0.2'). By default it's loaded on first use.")
    parser.add_argument('--unload-timer', action='store', default=None, type=int, help="Idle unload timer for the XTTS model in seconds, Ex. 900 for 15 minutes")
    parser.add_argument('--use-deepspeed', action='store_true', default=False, help="Use deepspeed with xtts (this option is unsupported)")
+    parser.add_argument('--no-cache-speaker', action='store_true', default=False, help="Don't use the speaker wav embeddings cache")
    parser.add_argument('-P', '--port', action='store', default=8000, type=int, help="Server tcp port")
    parser.add_argument('-H', '--host', action='store', default='0.0.0.0', help="Host to listen on, Ex. 0.0.0.0")
    parser.add_argument('-L', '--log-level', default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], help="Set the log level")