# Translator/transcribe/helpers/vadprocessor.py
from copy import deepcopy
from config import VAD_MODEL_PATH
# from silero_vad import load_silero_vad
import numpy as np
import onnxruntime


class OnnxWrapper:
    """Thin ONNX Runtime wrapper around the VAD model, keeping the recurrent
    state and trailing audio context between calls."""

def __init__(self, path, force_onnx_cpu=False):
        opts = onnxruntime.SessionOptions()
        # run the session single-threaded; the VAD model is small, and this
        # keeps it from competing with the rest of the pipeline for threads
        opts.inter_op_num_threads = 1
        opts.intra_op_num_threads = 1
if force_onnx_cpu and 'CPUExecutionProvider' in onnxruntime.get_available_providers():
self.session = onnxruntime.InferenceSession(path, providers=['CPUExecutionProvider'], sess_options=opts)
else:
self.session = onnxruntime.InferenceSession(path, sess_options=opts)
self.reset_states()
self.sample_rates = [16000]

    def _validate_input(self, x: np.ndarray, sr: int):
        if x.ndim == 1:
            x = x[None]  # add a batch dimension
        if x.ndim > 2:
            raise ValueError(f"Too many dimensions for input audio chunk {x.ndim}")
        if sr != 16000 and (sr % 16000 == 0):
            # crude decimation: keep every (sr // 16000)-th sample
            step = sr // 16000
            x = x[:, ::step]
            sr = 16000
        if sr not in self.sample_rates:
            raise ValueError(f"Supported sampling rates: {self.sample_rates} (or a multiple of 16000)")
        if sr / x.shape[1] > 31.25:
            # 16000 / 512 = 31.25, i.e. the chunk is shorter than one window
            raise ValueError("Input audio chunk is too short")
        return x, sr

    def reset_states(self, batch_size=1):
        # recurrent state expected by the ONNX graph: shape (2, batch, 128)
        self._state = np.zeros((2, batch_size, 128)).astype(np.float32)
        self._context = np.zeros(0)
        self._last_sr = 0
        self._last_batch_size = 0

    def __call__(self, x, sr: int):
        x, sr = self._validate_input(x, sr)
        num_samples = 512 if sr == 16000 else 256
        if x.shape[-1] != num_samples:
            raise ValueError(
                f"Provided number of samples is {x.shape[-1]} (Supported values: 256 for 8000 sample rate, 512 for 16000)")
        batch_size = x.shape[0]
        context_size = 64 if sr == 16000 else 32
        # reset the recurrent state whenever the stream parameters change
        if not self._last_batch_size:
            self.reset_states(batch_size)
        if (self._last_sr) and (self._last_sr != sr):
            self.reset_states(batch_size)
        if (self._last_batch_size) and (self._last_batch_size != batch_size):
            self.reset_states(batch_size)
        if not len(self._context):
            self._context = np.zeros((batch_size, context_size)).astype(np.float32)
        # prepend the tail of the previous chunk so the model sees some context
        x = np.concatenate([self._context, x], axis=1).astype(np.float32)  # model expects float32
        if sr in [8000, 16000]:
            ort_inputs = {'input': x, 'state': self._state, 'sr': np.array(sr, dtype='int64')}
            ort_outs = self.session.run(None, ort_inputs)
            out, state = ort_outs
            self._state = state
        else:
            raise ValueError(f"Unsupported sampling rate: {sr}")
        self._context = x[..., -context_size:]
        self._last_sr = sr
        self._last_batch_size = batch_size
        return out

    def audio_forward(self, audio: np.ndarray, sr: int):
        outs = []
        x, sr = self._validate_input(audio, sr)
        self.reset_states()
        num_samples = 512 if sr == 16000 else 256
        if x.shape[1] % num_samples:
            # zero-pad so the audio splits into whole windows
            pad_num = num_samples - (x.shape[1] % num_samples)
            x = np.pad(x, ((0, 0), (0, pad_num)), 'constant', constant_values=(0.0, 0.0))
        for i in range(0, x.shape[1], num_samples):
            wavs_batch = x[:, i:i + num_samples]
            out_chunk = self.__call__(wavs_batch, sr)
            outs.append(out_chunk)
        stacked = np.concatenate(outs, axis=1)
        return stacked
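

# A minimal offline-scoring sketch (assumption: `wav` is a 1-D, 16 kHz, mono
# float32 waveform; the variable name is illustrative):
#
#     model = OnnxWrapper(VAD_MODEL_PATH, force_onnx_cpu=True)
#     probs = model.audio_forward(wav, sr=16000)  # shape (1, n_windows)
#     # probs[0, i] is the speech probability of the i-th 512-sample window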


class VADIteratorOnnx:
    """Streaming iterator that turns per-window speech probabilities into
    'start'/'end' events, with hysteresis and a minimum-silence debounce."""

    def __init__(self,
                 threshold: float = 0.5,
                 sampling_rate: int = 16000,
                 min_silence_duration_ms: int = 100,
                 max_speech_duration_s: float = float('inf'),
                 ):
        self.model = OnnxWrapper(VAD_MODEL_PATH, True)
        self.threshold = threshold
        self.sampling_rate = sampling_rate
        if sampling_rate not in [8000, 16000]:
            raise ValueError('VADIterator does not support sampling rates other than [8000, 16000]')
self.min_silence_samples = sampling_rate * min_silence_duration_ms / 1000
self.max_speech_samples = int(sampling_rate * max_speech_duration_s)
# self.speech_pad_samples = sampling_rate * speech_pad_ms / 1000
self.reset_states()

    def reset_states(self):
self.model.reset_states()
self.triggered = False
self.temp_end = 0
self.current_sample = 0
self.start = 0

    def __call__(self, x: np.ndarray, return_seconds=False):
        """
        x: np.ndarray
            1-D audio chunk of at most one window (512 samples at 16000 Hz)
        return_seconds: bool (default - False)
            whether to return timestamps in seconds (default - samples)
        """
        window_size_samples = 512 if self.sampling_rate == 16000 else 256
        x = x[:window_size_samples]
        if len(x) < window_size_samples:
            # pad the (1-D) chunk up to a full window
            x = np.pad(x, (0, window_size_samples - len(x)), 'constant', constant_values=0.0)
        self.current_sample += window_size_samples
        speech_prob = self.model(x, self.sampling_rate)[0, 0]
        # speech resumed before min_silence elapsed; cancel the tentative end
        if (speech_prob >= self.threshold) and self.temp_end:
            self.temp_end = 0
        if (speech_prob >= self.threshold) and not self.triggered:
            self.triggered = True
            speech_start = max(0, self.current_sample - window_size_samples)
            self.start = speech_start
            return {'start': int(speech_start) if not return_seconds else round(speech_start / self.sampling_rate, 1)}
        # force a boundary once one stretch of speech exceeds max_speech_samples
        if (speech_prob >= self.threshold) and self.current_sample - self.start >= self.max_speech_samples:
            if self.temp_end:
                self.temp_end = 0
            self.start = self.current_sample
            return {'end': int(self.current_sample) if not return_seconds else round(self.current_sample / self.sampling_rate, 1)}
        # hysteresis: only end the segment once the probability falls well
        # below the trigger threshold and stays there for min_silence_samples
        if (speech_prob < self.threshold - 0.15) and self.triggered:
            if not self.temp_end:
                self.temp_end = self.current_sample
            if self.current_sample - self.temp_end < self.min_silence_samples:
                return None
            else:
                speech_end = self.temp_end - window_size_samples
                self.temp_end = 0
                self.triggered = False
                return {'end': int(speech_end) if not return_seconds else round(speech_end / self.sampling_rate, 1)}
        return None
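

# A minimal streaming sketch (assumption: `wav` is a 1-D, 16 kHz float32
# array; the loop and names are illustrative):
#
#     it = VADIteratorOnnx(threshold=0.5)
#     for i in range(0, len(wav), 512):
#         event = it(wav[i:i + 512], return_seconds=True)
#         if event:  # either {'start': ...} or {'end': ...}
#             print(event)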


class VadV2:
    """Buffers audio around VADIteratorOnnx events and emits padded speech
    segments as {'start', 'end', 'audio'} dicts."""

    def __init__(self,
                 threshold: float = 0.5,
                 sampling_rate: int = 16000,
                 min_silence_duration_ms: int = 100,
                 speech_pad_ms: int = 30,
                 max_speech_duration_s: float = float('inf')):
        # self.vad_iterator = VADIterator(threshold, sampling_rate, min_silence_duration_ms)
        self.vad_iterator = VADIteratorOnnx(threshold, sampling_rate, min_silence_duration_ms, max_speech_duration_s)
        self.speech_pad_samples = int(sampling_rate * speech_pad_ms / 1000)
        self.sampling_rate = sampling_rate
        self.audio_buffer = np.array([], dtype=np.float32)
        self.start = 0
        self.end = 0
        self.offset = 0  # absolute sample index of audio_buffer[0]
        assert speech_pad_ms <= min_silence_duration_ms, "speech_pad_ms should be at most min_silence_duration_ms"
        self.max_speech_samples = int(sampling_rate * max_speech_duration_s)
        self.silence_chunk_size = 0
        # number of 512-sample windows in 60 s; after this much uninterrupted
        # silence the buffer is trimmed down to the pad tail
        self.silence_chunk_threshold = 60 / (512 / self.sampling_rate)

    def reset(self):
self.audio_buffer = np.array([], dtype=np.float32)
self.start = 0
self.end = 0
self.offset = 0
self.vad_iterator.reset_states()

    def __call__(self, x: np.ndarray = None):
        # x=None flushes the stream: emit any speech still open, then reset
        if x is None:
            if self.start:
                start = max(self.offset, self.start - self.speech_pad_samples)
                end = self.offset + len(self.audio_buffer)
                start_ts = round(start / self.sampling_rate, 1)
                end_ts = round(end / self.sampling_rate, 1)
                audio_data = self.audio_buffer[start - self.offset: end - self.offset]
                result = {
                    "start": start_ts,
                    "end": end_ts,
                    "audio": audio_data,
                }
            else:
                result = None
            self.reset()
            return result
        self.audio_buffer = np.append(self.audio_buffer, deepcopy(x))
        result = self.vad_iterator(x)
        if result is not None:
            self.silence_chunk_size = 0
            if 'start' in result:
                self.start = result['start']
            if 'end' in result:
                self.end = result['end']
        else:
            self.silence_chunk_size += 1
            # no speech yet: keep only the pad tail so the buffer stays bounded
            if self.start == 0 and len(self.audio_buffer) > self.speech_pad_samples:
                self.offset += len(self.audio_buffer) - self.speech_pad_samples
                self.audio_buffer = self.audio_buffer[-self.speech_pad_samples:]
            # prolonged silence after speech: trim the buffer the same way
            if self.silence_chunk_size >= self.silence_chunk_threshold:
                self.offset += len(self.audio_buffer) - self.speech_pad_samples
                self.audio_buffer = self.audio_buffer[-self.speech_pad_samples:]
                self.silence_chunk_size = 0
        # a complete segment is available: cut it out, padded on both sides
        if self.end > self.start:
            start = max(self.offset, self.start - self.speech_pad_samples)
            end = self.end + self.speech_pad_samples
            start_ts = round(start / self.sampling_rate, 1)
            end_ts = round(end / self.sampling_rate, 1)
            audio_data = self.audio_buffer[start - self.offset: end - self.offset]
            self.audio_buffer = self.audio_buffer[self.end - self.offset:]
            self.offset = self.end
            self.start = self.end
            self.end = 0
            result = {
                "start": start_ts,
                "end": end_ts,
                "audio": audio_data,
            }
            return result
        return None
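

# A minimal end-to-end sketch (assumption: `wav` is a 1-D, 16 kHz float32
# array; the names are illustrative):
#
#     vad = VadV2(threshold=0.5, sampling_rate=16000,
#                 min_silence_duration_ms=100, speech_pad_ms=30)
#     for i in range(0, len(wav), 512):
#         seg = vad(wav[i:i + 512])
#         if seg:
#             print(seg['start'], seg['end'], len(seg['audio']))
#     tail = vad(None)  # flush any segment still open at end of stream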


class VadProcessor:
    """Convenience front-end: feeds raw audio through VadV2 in fixed-size
    windows and concatenates the returned speech segments."""

    def __init__(
            self,
            prob_threshold=0.5,
            silence_s=0.2,
            cache_s=0.15,
            sr=16000
    ):
        self.prob_threshold = prob_threshold
        self.cache_s = cache_s
        self.sr = sr
        self.silence_s = silence_s
        # segments are force-split after 15 s (see max_speech_duration_s)
        self.vad = VadV2(self.prob_threshold, self.sr, self.silence_s * 1000, self.cache_s * 1000, max_speech_duration_s=15)

    def process_audio(self, audio_buffer: np.ndarray):
        # run the buffer through the VAD in 512-sample windows and
        # concatenate whatever speech segments come back
        audio = np.array([], np.float32)
        for i in range(0, len(audio_buffer), 512):
            chunk = audio_buffer[i:i + 512]
            ret = self.vad(chunk)
            if ret:
                audio = np.append(audio, ret['audio'])
        return audio
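

if __name__ == "__main__":
    # Smoke-test sketch (assumption: one second of quiet synthetic noise
    # stands in for real 16 kHz microphone audio, so it will normally yield
    # no speech; requires VAD_MODEL_PATH to point at the model file).
    rng = np.random.default_rng(0)
    wav = (rng.standard_normal(16000) * 0.01).astype(np.float32)
    proc = VadProcessor(prob_threshold=0.5, silence_s=0.2, cache_s=0.15, sr=16000)
    speech = proc.process_audio(wav)
    print(f"extracted {speech.size} speech samples")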