from copy import deepcopy

import numpy as np
import onnxruntime

from config import VAD_MODEL_PATH


class OnnxWrapper:
    """Thin ONNX Runtime wrapper around a Silero-style VAD model."""

    def __init__(self, path, force_onnx_cpu=False):
        opts = onnxruntime.SessionOptions()
        opts.inter_op_num_threads = 1
        opts.intra_op_num_threads = 1

        if force_onnx_cpu and 'CPUExecutionProvider' in onnxruntime.get_available_providers():
            self.session = onnxruntime.InferenceSession(path, providers=['CPUExecutionProvider'],
                                                        sess_options=opts)
        else:
            self.session = onnxruntime.InferenceSession(path, sess_options=opts)

        self.reset_states()
        self.sample_rates = [16000]

    def _validate_input(self, x: np.ndarray, sr: int):
        if x.ndim == 1:
            x = x[None]  # promote mono audio to a batch of one
        if x.ndim > 2:
            raise ValueError(f"Too many dimensions for input audio chunk {x.ndim}")

        # Integer multiples of 16 kHz are decimated down to 16 kHz.
        if sr != 16000 and (sr % 16000 == 0):
            step = sr // 16000
            x = x[:, ::step]
            sr = 16000

        if sr not in self.sample_rates:
            raise ValueError(f"Supported sampling rates: {self.sample_rates} (or multiple of 16000)")
        if sr / x.shape[1] > 31.25:
            raise ValueError("Input audio chunk is too short")

        return x, sr

    def reset_states(self, batch_size=1):
        self._state = np.zeros((2, batch_size, 128), dtype=np.float32)
        self._context = np.zeros(0, dtype=np.float32)
        self._last_sr = 0
        self._last_batch_size = 0

    def __call__(self, x, sr: int):
        x, sr = self._validate_input(x, sr)
        num_samples = 512 if sr == 16000 else 256

        if x.shape[-1] != num_samples:
            raise ValueError(f"Provided number of samples is {x.shape[-1]} "
                             f"(Supported values: 256 for 8000 sample rate, 512 for 16000)")

        batch_size = x.shape[0]
        context_size = 64 if sr == 16000 else 32

        # Reset the recurrent state whenever the batch size or sample rate changes.
        if not self._last_batch_size:
            self.reset_states(batch_size)
        if self._last_sr and self._last_sr != sr:
            self.reset_states(batch_size)
        if self._last_batch_size and self._last_batch_size != batch_size:
            self.reset_states(batch_size)

        if not len(self._context):
            self._context = np.zeros((batch_size, context_size), dtype=np.float32)

        # Prepend the tail of the previous chunk so the model sees continuous
        # audio; cast to float32, which the ONNX graph expects.
        x = np.concatenate([self._context, x], axis=1).astype(np.float32)

        if sr in [8000, 16000]:
            ort_inputs = {'input': x, 'state': self._state, 'sr': np.array(sr, dtype='int64')}
            out, state = self.session.run(None, ort_inputs)
            self._state = state
        else:
            raise ValueError(f"Unsupported sampling rate: {sr}")

        self._context = x[..., -context_size:]
        self._last_sr = sr
        self._last_batch_size = batch_size
        return out

    def audio_forward(self, audio: np.ndarray, sr: int):
        """Run the model over a whole recording, one window at a time."""
        outs = []
        x, sr = self._validate_input(audio, sr)
        self.reset_states()
        num_samples = 512 if sr == 16000 else 256

        # Zero-pad the tail so the length is a whole number of windows.
        if x.shape[1] % num_samples:
            pad_num = num_samples - (x.shape[1] % num_samples)
            x = np.pad(x, ((0, 0), (0, pad_num)), 'constant', constant_values=(0.0, 0.0))

        for i in range(0, x.shape[1], num_samples):
            wavs_batch = x[:, i:i + num_samples]
            out_chunk = self.__call__(wavs_batch, sr)
            outs.append(out_chunk)

        return np.concatenate(outs, axis=1)
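
# A minimal sketch of using OnnxWrapper directly, assuming config.VAD_MODEL_PATH
# points at a Silero-style VAD ONNX file and that the input is 16 kHz float32
# audio. The function name is illustrative, not part of the original module.
def _demo_onnx_wrapper():
    model = OnnxWrapper(VAD_MODEL_PATH, force_onnx_cpu=True)
    chunk = np.zeros(512, dtype=np.float32)  # one 32 ms window at 16 kHz
    prob = model(chunk, 16000)[0, 0]         # per-window speech probability
    print(f"speech probability: {prob:.3f}")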
class VADIteratorOnnx:
    def __init__(self,
                 threshold: float = 0.5,
                 sampling_rate: int = 16000,
                 min_silence_duration_ms: int = 100,
                 max_speech_duration_s: float = float('inf'),
                 ):
        self.model = OnnxWrapper(VAD_MODEL_PATH, True)
        self.threshold = threshold
        self.sampling_rate = sampling_rate

        if sampling_rate not in [8000, 16000]:
            raise ValueError('VADIterator does not support sampling rates other than [8000, 16000]')

        self.min_silence_samples = sampling_rate * min_silence_duration_ms / 1000
        self.max_speech_samples = int(sampling_rate * max_speech_duration_s)
        self.reset_states()

    def reset_states(self):
        self.model.reset_states()
        self.triggered = False
        self.temp_end = 0
        self.current_sample = 0
        self.start = 0

    def __call__(self, x: np.ndarray, return_seconds=False):
        """
        x: np.ndarray
            audio chunk (see examples in repo)
        return_seconds: bool (default - False)
            whether to return timestamps in seconds (default - samples)
        """
        window_size_samples = 512 if self.sampling_rate == 16000 else 256

        # Truncate long chunks and zero-pad short ones to exactly one window.
        x = x[:window_size_samples]
        if len(x) < window_size_samples:
            x = np.pad(x, (0, window_size_samples - len(x)), 'constant', constant_values=0.0)

        self.current_sample += window_size_samples
        speech_prob = self.model(x, self.sampling_rate)[0, 0]

        # Speech resumed before the silence window elapsed: cancel the tentative end.
        if (speech_prob >= self.threshold) and self.temp_end:
            self.temp_end = 0

        # Rising edge: speech starts.
        if (speech_prob >= self.threshold) and not self.triggered:
            self.triggered = True
            speech_start = max(0, self.current_sample - window_size_samples)
            self.start = speech_start
            return {'start': int(speech_start) if not return_seconds
                    else round(speech_start / self.sampling_rate, 1)}

        # Ongoing speech hit the maximum segment length: force an end and restart.
        if (speech_prob >= self.threshold) and self.current_sample - self.start >= self.max_speech_samples:
            if self.temp_end:
                self.temp_end = 0
            self.start = self.current_sample
            return {'end': int(self.current_sample) if not return_seconds
                    else round(self.current_sample / self.sampling_rate, 1)}

        # Falling edge with hysteresis: only confirm the end after enough silence.
        if (speech_prob < self.threshold - 0.15) and self.triggered:
            if not self.temp_end:
                self.temp_end = self.current_sample
            if self.current_sample - self.temp_end < self.min_silence_samples:
                return None
            speech_end = self.temp_end - window_size_samples
            self.temp_end = 0
            self.triggered = False
            return {'end': int(speech_end) if not return_seconds
                    else round(speech_end / self.sampling_rate, 1)}

        return None
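
# A minimal sketch of the streaming iterator, assuming 16 kHz float32 audio fed
# in 512-sample windows. `wav` is an illustrative placeholder; any 1-D float32
# array works. The iterator emits {'start': ...} and {'end': ...} events on
# edges and None in between.
def _demo_vad_iterator(wav: np.ndarray):
    vad_iterator = VADIteratorOnnx(threshold=0.5, min_silence_duration_ms=100)
    for i in range(0, len(wav), 512):
        event = vad_iterator(wav[i:i + 512], return_seconds=True)
        if event:
            print(event)
    vad_iterator.reset_states()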
class VadV2:
    def __init__(self,
                 threshold: float = 0.5,
                 sampling_rate: int = 16000,
                 min_silence_duration_ms: int = 100,
                 speech_pad_ms: int = 30,
                 max_speech_duration_s: float = float('inf')):
        self.vad_iterator = VADIteratorOnnx(threshold, sampling_rate,
                                            min_silence_duration_ms, max_speech_duration_s)
        self.speech_pad_samples = int(sampling_rate * speech_pad_ms / 1000)
        self.sampling_rate = sampling_rate
        self.audio_buffer = np.array([], dtype=np.float32)
        self.start = 0
        self.end = 0
        self.offset = 0  # absolute sample index of the first buffered sample

        assert speech_pad_ms <= min_silence_duration_ms, \
            "speech_pad_ms should not exceed min_silence_duration_ms"

        self.max_speech_samples = int(sampling_rate * max_speech_duration_s)
        self.silence_chunk_size = 0
        # Flush the buffer after 60 s of uninterrupted silence, counted in
        # 512-sample chunks.
        self.silence_chunk_threshold = 60 / (512 / self.sampling_rate)

    def reset(self):
        self.audio_buffer = np.array([], dtype=np.float32)
        self.start = 0
        self.end = 0
        self.offset = 0
        self.vad_iterator.reset_states()

    def __call__(self, x: np.ndarray = None):
        # Calling with no audio flushes any in-progress segment and resets.
        if x is None:
            if self.start:
                start = max(self.offset, self.start - self.speech_pad_samples)
                end = self.offset + len(self.audio_buffer)
                result = {
                    "start": round(start / self.sampling_rate, 1),
                    "end": round(end / self.sampling_rate, 1),
                    "audio": self.audio_buffer[start - self.offset: end - self.offset],
                }
            else:
                result = None
            self.reset()
            return result

        self.audio_buffer = np.append(self.audio_buffer, deepcopy(x))
        result = self.vad_iterator(x)

        if result is not None:
            self.silence_chunk_size = 0
            if 'start' in result:
                self.start = result['start']
            if 'end' in result:
                self.end = result['end']
        else:
            self.silence_chunk_size += 1
            # No speech yet: keep only the padding tail so the buffer stays small.
            if self.start == 0 and len(self.audio_buffer) > self.speech_pad_samples:
                self.offset += len(self.audio_buffer) - self.speech_pad_samples
                self.audio_buffer = self.audio_buffer[-self.speech_pad_samples:]
            # Long silence after a segment: drop the stale buffer the same way.
            if self.silence_chunk_size >= self.silence_chunk_threshold:
                self.offset += len(self.audio_buffer) - self.speech_pad_samples
                self.audio_buffer = self.audio_buffer[-self.speech_pad_samples:]
                self.silence_chunk_size = 0

        # A completed segment: cut it out of the buffer, padded on both sides.
        if self.end > self.start:
            start = max(self.offset, self.start - self.speech_pad_samples)
            end = self.end + self.speech_pad_samples
            audio_data = self.audio_buffer[start - self.offset: end - self.offset]
            self.audio_buffer = self.audio_buffer[self.end - self.offset:]
            self.offset = self.end
            self.start = self.end
            self.end = 0
            return {
                "start": round(start / self.sampling_rate, 1),
                "end": round(end / self.sampling_rate, 1),
                "audio": audio_data,
            }

        return None


class VadProcessor:
    def __init__(self,
                 prob_threshold=0.5,
                 silence_s=0.2,
                 cache_s=0.15,
                 sr=16000):
        self.prob_threshold = prob_threshold
        self.cache_s = cache_s
        self.sr = sr
        self.silence_s = silence_s
        self.vad = VadV2(self.prob_threshold, self.sr,
                         int(self.silence_s * 1000), int(self.cache_s * 1000),
                         max_speech_duration_s=15)

    def process_audio(self, audio_buffer: np.ndarray):
        """Feed a buffer through the VAD in 512-sample windows and return the
        concatenated speech-only audio."""
        audio = np.array([], np.float32)
        for i in range(0, len(audio_buffer), 512):
            ret = self.vad(audio_buffer[i:i + 512])
            if ret:
                audio = np.append(audio, ret['audio'])
        return audio
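
# A minimal end-to-end sketch, run as a script. It assumes config.VAD_MODEL_PATH
# is valid; the synthetic input is illustrative only (white noise is unlikely to
# trigger the model, so expect little or no extracted speech).
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    wav = (rng.standard_normal(16000 * 3) * 0.05).astype(np.float32)  # 3 s of noise

    processor = VadProcessor(prob_threshold=0.5, silence_s=0.2, cache_s=0.15, sr=16000)
    speech = processor.process_audio(wav)
    print(f"kept {len(speech)} of {len(wav)} samples as speech")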