import asyncio
import collections
import json
import queue
import threading
import time
from dataclasses import dataclass, field
from logging import getLogger

import numpy as np

import config
from api_model import TransResult, Message, DebugResult
from .utils import log_block, save_to_wave, TestDataWriter, filter_words
from .translatepipes import TranslatePipes
from transcribe.helpers.vadprocessor import VadProcessor
from transcribe.pipelines import MetaItem

logger = getLogger("TranscriptionService")


@dataclass
class FullSegment:
    """A complete sentence: its audio samples plus the time it was created."""

    audio_array: np.ndarray
    created_time: float = field(default_factory=time.time)

    @staticmethod
    def merge(*audio_segments: "FullSegment") -> "FullSegment":
        """Concatenate segments in order of creation time.

        The merged segment takes the creation time of the earliest input.

        Raises:
            IndexError: if called without any segment.
        """
        ordered = sorted(audio_segments, key=lambda item: item.created_time)
        return FullSegment(
            created_time=ordered[0].created_time,
            audio_array=np.concatenate([seg.audio_array for seg in ordered], axis=0),
        )

    @property
    def time_duration(self) -> float:
        """Number of samples divided by ``config.SAMPLE_RATE``."""
        return len(self.audio_array) / config.SAMPLE_RATE

    @property
    def start_timestamp(self) -> float:
        """Creation time of the segment."""
        return self.created_time

    @property
    def end_timestamp(self) -> float:
        """Creation time plus ``time_duration``."""
        return self.created_time + self.time_duration


class WhisperTranscriptionService:
    """Whisper transcription service: transcribes and translates an audio stream.

    Two daemon threads are started on construction: one pulls raw frames from
    an internal queue, runs voice-activity detection and accumulates audio;
    the other transcribes/translates the accumulated audio and pushes results
    to the websocket.
    """

    SERVER_READY = "SERVER_READY"
    DISCONNECT = "DISCONNECT"

    def __init__(self, websocket, pipe: TranslatePipes, language=None, dst_lang=None, client_uid=None):
        """Set up buffers and start the worker threads.

        Args:
            websocket: object whose ``send_text`` coroutine delivers results.
            pipe: pipeline providing ``voice_detect``, ``transcrible``,
                ``translate`` and ``translate_large``.
            language: source language code.
            dst_lang: target translation language code.
            client_uid: identifier echoed back as ``request_id`` in messages.
        """
        print('>>>>>>>>>>>>>>>> init service >>>>>>>>>>>>>>>>>>>>>>')
        print('src_lang:', language)
        self.source_language = language  # source language
        self.target_language = dst_lang  # target translation language
        self.client_uid = client_uid

        self.websocket = websocket
        self._translate_pipe = pipe

        # Audio processing
        self.sample_rate = 16000
        self.lock = threading.Lock()

        # Separator used when joining segment texts; depends on the language.
        self.text_separator = self._get_text_separator(language)
        self.loop = asyncio.get_event_loop()

        # Raw incoming audio frames.
        self._frame_queue = queue.Queue()
        # Audio accumulated for the utterance currently being spoken.
        self.frames_np = np.array([], dtype=np.float32)
        self.frames_np_start_timestamp = None
        # Completed audio segments waiting for a final transcription.
        self.full_segments_queue = collections.deque()

        self.row_number = 0
        # for test
        self._transcrible_time_cost = 0.
        self._translate_time_cost = 0.

        self._translate_thread_stop = threading.Event()
        self._frame_processing_thread_stop = threading.Event()
        if config.SAVE_DATA_SAVE:
            self._save_task_stop = threading.Event()
            self._save_queue = queue.Queue()

        # Start the workers last so that every attribute they read
        # (row_number, time costs, queues, ...) already exists.
        self.translate_thread = self._start_thread(self._transcription_processing_loop)
        self.frame_processing_thread = self._start_thread(self._frame_processing_loop)
        if config.SAVE_DATA_SAVE:
            self._save_thread = self._start_thread(self.save_data_loop)

    def save_data_loop(self):
        """Write queued test records with ``TestDataWriter`` until stopped."""
        writer = TestDataWriter()
        while not self._save_task_stop.is_set():
            try:
                # Poll with a timeout so the stop flag is re-checked even
                # when nothing is being queued.
                test_data = self._save_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            writer.write(test_data)

    def _start_thread(self, target_function) -> threading.Thread:
        """Start a daemon thread running ``target_function`` and return it."""
        thread = threading.Thread(target=target_function)
        thread.daemon = True
        thread.start()
        return thread

    def _get_text_separator(self, language: str) -> str:
        """Return the separator for joining texts: none for "zh", a space otherwise."""
        return "" if language == "zh" else " "

    def add_frames(self, frame_np: np.ndarray) -> None:
        """Queue an audio frame for processing."""
        self._frame_queue.put(frame_np)

    def _apply_voice_activity_detection(self, frame_np: np.ndarray):
        """Run voice-activity detection on one frame.

        Returns:
            ``(speech_audio, speech_status)``: the detector's audio decoded as
            float32 samples, and the detector's speech status value.
        """
        processed_audio = self._translate_pipe.voice_detect(frame_np.tobytes())
        speech_audio = np.frombuffer(processed_audio.audio, dtype=np.float32)
        speech_status = processed_audio.speech_status
        return speech_audio, speech_status

    def _frame_processing_loop(self) -> None:
        """Pull frames from the queue and merge them into the audio buffer.

        The buffer is moved to ``full_segments_queue`` when it reaches
        ``config.MAX_SPEECH_DURATION_S`` seconds of samples, or when the
        detector reports "END" and at least ``config.DESIGN_TIME_THREHOLD``
        seconds have passed since the buffer's start timestamp.
        """
        while not self._frame_processing_thread_stop.is_set():
            try:
                frame_np = self._frame_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            frame_np, speech_status = self._apply_voice_activity_detection(frame_np)
            if frame_np is None:
                continue

            with self.lock:
                if speech_status == "START" and self.frames_np_start_timestamp is None:
                    self.frames_np_start_timestamp = time.time()

                # Append the audio to the buffer.
                self.frames_np = np.append(self.frames_np, frame_np)

                if len(self.frames_np) >= self.sample_rate * config.MAX_SPEECH_DURATION_S:
                    # Buffer hit the maximum length: cut it here and restart
                    # the clock for the remainder of the utterance.
                    self.full_segments_queue.appendleft(self.frames_np.copy())
                    self.frames_np_start_timestamp = time.time()
                    self.frames_np = np.array([], dtype=np.float32)
                elif (
                    speech_status == "END"
                    and len(self.frames_np) > 0
                    and self.frames_np_start_timestamp
                ):
                    time_diff = time.time() - self.frames_np_start_timestamp
                    if time_diff >= config.DESIGN_TIME_THREHOLD:
                        # Long enough to stand as its own segment.
                        self.full_segments_queue.appendleft(self.frames_np.copy())
                        self.frames_np_start_timestamp = None
                        self.frames_np = np.array([], dtype=np.float32)
                    else:
                        logger.debug(f"🥳 当前时间与上一句的时间差: {time_diff:.2f}s,继续增加缓冲区")

    def _next_audio_buffer(self, frame_epoch: int):
        """Pick the audio for the next transcription pass.

        Returns:
            ``(audio, partial)``; a queued full segment (``partial=False``)
            takes priority over a prefix of the live buffer
            (``partial=True``). ``None`` when there is nothing to transcribe.
        """
        with self.lock:
            # Check the queue first: the live buffer is emptied at the moment
            # a full segment is queued, so an empty buffer does not mean idle.
            if len(self.full_segments_queue) > 0:
                return self.full_segments_queue.pop(), False
            if len(self.frames_np) == 0:
                return None
            # Take 1.5 s * epoch worth of samples from the live buffer.
            window = int(frame_epoch * 1.5 * self.sample_rate)
            return self.frames_np[:window].copy(), True

    def _pad_to_min_length(self, audio_buffer: np.ndarray) -> np.ndarray:
        """Left-pad with zeros so the buffer holds at least ``sample_rate`` samples."""
        missing = int(self.sample_rate) - len(audio_buffer)
        if missing <= 0:
            return audio_buffer
        padded = np.zeros(self.sample_rate, dtype=np.float32)
        if len(audio_buffer) > 0:
            padded[missing:] = audio_buffer
        return padded

    def _transcription_processing_loop(self) -> None:
        """Main loop: transcribe, translate and send results until stopped.

        Full segments produce final results and advance ``row_number``;
        otherwise a growing prefix of the live buffer produces partial
        results that reuse the current ``row_number``.
        """
        frame_epoch = 1

        while not self._translate_thread_stop.is_set():
            pending = self._next_audio_buffer(frame_epoch)
            if pending is None:
                time.sleep(0.01)
                continue

            audio_buffer, partial = pending
            audio_buffer = self._pad_to_min_length(audio_buffer)

            logger.debug(f"audio buffer size: {len(audio_buffer) / self.sample_rate:.2f}s")
            meta_item = self._transcribe_audio(audio_buffer)
            segments = meta_item.segments
            logger.debug(f"Segments: {segments}")
            segments = filter_words(segments)
            if len(segments) == 0:
                continue

            seg_text = self.text_separator.join(seg.text for seg in segments)
            result = TransResult(
                seg_id=self.row_number,
                context=seg_text,
                from_=self.source_language,
                to=self.target_language,
                tran_content=self._translate_text_large(seg_text),
                partial=partial
            )
            if not partial:
                self.row_number += 1
                frame_epoch = 1
            else:
                frame_epoch += 1
            self._send_result_to_client(result)

    def _transcribe_audio(self, audio_buffer: np.ndarray) -> MetaItem:
        """Transcribe the audio and return the pipeline result with its segments.

        Also records the elapsed time in ``_transcrible_time_cost``.
        """
        log_block("Audio buffer length", f"{audio_buffer.shape[0]/self.sample_rate:.2f}", "s")
        start_time = time.perf_counter()

        result = self._translate_pipe.transcrible(audio_buffer.tobytes(), self.source_language)
        segments = result.segments
        time_diff = (time.perf_counter() - start_time)
        logger.debug(f"📝 Transcrible Segments: {segments} ")
        log_block("📝 Transcrible output", f"{self.text_separator.join(seg.text for seg in segments)}", "")
        log_block("📝 Transcrible time", f"{time_diff:.3f}", "s")
        self._transcrible_time_cost = round(time_diff, 3)
        return result

    def _translate_text(self, text: str) -> str:
        """Translate ``text`` into the target language via ``translate``.

        Returns "" for blank input; records the elapsed time in
        ``_translate_time_cost``.
        """
        if not text.strip():
            return ""

        log_block("🐧 Translation input ", f"{text}")
        start_time = time.perf_counter()

        result = self._translate_pipe.translate(text, self.source_language, self.target_language)
        translated_text = result.translate_content
        time_diff = (time.perf_counter() - start_time)
        log_block("🐧 Translation time ", f"{time_diff:.3f}", "s")
        log_block("🐧 Translation out ", f"{translated_text}")
        self._translate_time_cost = round(time_diff, 3)
        return translated_text

    def _translate_text_large(self, text: str) -> str:
        """Translate ``text`` into the target language via ``translate_large``.

        Returns "" for blank input; records the elapsed time in
        ``_translate_time_cost``.
        """
        if not text.strip():
            return ""

        log_block("Translation input", f"{text}")
        start_time = time.perf_counter()

        result = self._translate_pipe.translate_large(text, self.source_language, self.target_language)
        translated_text = result.translate_content
        time_diff = (time.perf_counter() - start_time)
        log_block("Translation large model time ", f"{time_diff:.3f}", "s")
        log_block("Translation large model output", f"{translated_text}")
        self._translate_time_cost = round(time_diff, 3)
        return translated_text

    def _on_send_done(self, future) -> None:
        """Stop the service when a scheduled send finished with an exception."""
        # Future.exception() raises CancelledError on a cancelled future,
        # so cancellation has to be ruled out before asking for it.
        if future.cancelled():
            logger.debug(f"Send cancelled for client: {self.client_uid}")
            return
        if future.exception() is not None:
            self.stop()

    def _send_result_to_client(self, result: TransResult) -> None:
        """Serialize the result and schedule its delivery on the event loop.

        A ``RuntimeError`` while scheduling stops the service; any other
        error is logged and swallowed.
        """
        try:
            message = Message(result=result, request_id=self.client_uid).model_dump_json(by_alias=True)
            coro = self.websocket.send_text(message)
            future = asyncio.run_coroutine_threadsafe(coro, self.loop)
            future.add_done_callback(self._on_send_done)
        except RuntimeError:
            self.stop()
        except Exception as e:
            logger.error(f"Error sending result to client: {e}")

    def stop(self) -> None:
        """Signal all worker threads to stop."""
        self._translate_thread_stop.set()
        self._frame_processing_thread_stop.set()
        if config.SAVE_DATA_SAVE:
            self._save_task_stop.set()
        logger.info(f"Stopping transcription service for client: {self.client_uid}")