Spaces:
Paused
Paused
| # aduc_framework/managers/wan_manager_s2v.py (CORRIGIDO) | |
| import os | |
| import math | |
| import shutil | |
| import tempfile | |
| import subprocess | |
| import random | |
| from typing import Tuple, Union | |
| import torch | |
| import numpy as np | |
| from PIL import Image | |
| import librosa # <<< NOVA IMPORTAÇÃO para carregar áudio | |
| # Importações necessárias do Diffusers e Transformers | |
| from transformers import Wav2Vec2ForCTC | |
| from diffusers.models import AutoencoderKLWan | |
| from diffusers.utils.export_utils import export_to_video | |
| from diffusers.pipelines.wan.pipeline_wan_s2v import WanSpeechToVideoPipeline | |
| # ============================================================================== | |
| # <<< FUNÇÃO ADICIONADA PARA CORRIGIR O IMPORT ERROR >>> | |
| # Esta função substitui a `load_audio` que não existe na sua versão do diffusers. | |
| # ============================================================================== | |
| def load_audio(audio_path: str, target_sr: int = 16000) -> Tuple[np.ndarray, int]: | |
| """ | |
| Carrega um arquivo de áudio de um caminho, o converte para mono e | |
| reamostra para a taxa de amostragem alvo. | |
| Args: | |
| audio_path (str): Caminho para o arquivo de áudio. | |
| target_sr (int): A taxa de amostragem desejada. | |
| Returns: | |
| Tuple[np.ndarray, int]: Uma tupla contendo o waveform como um array numpy | |
| e a taxa de amostragem final. | |
| """ | |
| try: | |
| # Carrega, converte para mono e reamostra | |
| waveform, sr = librosa.load(audio_path, sr=target_sr, mono=True) | |
| return waveform, target_sr | |
| except Exception as e: | |
| raise IOError(f"Não foi possível carregar o arquivo de áudio em {audio_path}. Erro: {e}") | |
| class WanManagerS2V: | |
| """ | |
| Wan S2V Manager: | |
| - Gerencia a pipeline WanSpeechToVideoPipeline. | |
| - Carrega os modelos necessários (VAE, Audio Encoder, Pipeline). | |
| - Processa uma imagem inicial e um áudio para gerar um vídeo. | |
| - Mescla o áudio original ao vídeo gerado usando FFmpeg. | |
| """ | |
| # Usamos o modelo original compatível com Diffusers, NÃO o GGUF. | |
| MODEL_ID = "Wan-AI/Wan2.2-S2V-14B" | |
| def __init__(self): | |
| print("Loading S2V models into memory. This may take a few minutes...") | |
| audio_encoder = Wav2Vec2ForCTC.from_pretrained(self.MODEL_ID, subfolder="audio_encoder", torch_dtype=torch.float32) | |
| vae = AutoencoderKLWan.from_pretrained(self.MODEL_ID, subfolder="vae", torch_dtype=torch.float32) | |
| self.pipe = WanSpeechToVideoPipeline.from_pretrained( | |
| self.MODEL_ID, vae=vae, audio_encoder=audio_encoder, torch_dtype=torch.bfloat16 | |
| ) | |
| self.pipe.to("cuda") | |
| print("All S2V models loaded. Service is ready.") | |
| def _get_size_for_target_area(self, height: int, width: int, target_area: int = 480 * 832, divisor: int = 64) -> Tuple[int, int]: | |
| aspect_ratio = width / height | |
| target_width = int((target_area * aspect_ratio) ** 0.5 // divisor * divisor) | |
| target_height = int((target_area / aspect_ratio) ** 0.5 // divisor * divisor) | |
| return target_height, target_width | |
| def _merge_audio_to_video(self, video_path: str, audio_path: str) -> str: | |
| if not shutil.which("ffmpeg"): | |
| print("[WanManagerS2V] AVISO: FFmpeg não encontrado. O vídeo será retornado sem áudio.") | |
| return video_path | |
| with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_final: | |
| output_path = tmp_final.name | |
| try: | |
| command = [ | |
| "ffmpeg", "-y", "-i", video_path, "-i", audio_path, | |
| "-c:v", "copy", "-c:a", "aac", "-shortest", output_path, | |
| ] | |
| subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| os.remove(video_path) | |
| return output_path | |
| except Exception as e: | |
| print(f"[WanManagerS2V] ERRO ao mesclar áudio: {e}. Retornando vídeo silencioso.") | |
| if os.path.exists(output_path): os.remove(output_path) | |
| return video_path | |
| def generate_video( | |
| self, | |
| start_image: Image.Image, | |
| audio_path: str, | |
| prompt: str, | |
| negative_prompt: str, | |
| steps: int, | |
| guidance_scale: float, | |
| seed: int, | |
| randomize_seed: bool, | |
| ) -> Tuple[str, int]: | |
| if start_image is None: | |
| raise ValueError("A imagem inicial não pode ser vazia.") | |
| if not os.path.exists(audio_path): | |
| raise FileNotFoundError(f"O arquivo de áudio não foi encontrado em: {audio_path}") | |
| current_seed = random.randint(0, np.iinfo(np.int32).max) if randomize_seed else int(seed) | |
| generator = torch.Generator(device="cuda").manual_seed(current_seed) | |
| # Usa a nossa nova função load_audio | |
| audio_waveform, sampling_rate = load_audio(audio_path) | |
| height, width = self._get_size_for_target_area(start_image.height, start_image.width) | |
| print("Generating video from speech...") | |
| # A pipeline S2V espera a taxa de amostragem em 16000Hz, que nossa função já garante. | |
| output = self.pipe( | |
| prompt=prompt, | |
| negative_prompt=negative_prompt, | |
| image=start_image, | |
| audio=audio_waveform, | |
| sampling_rate=sampling_rate, | |
| height=height, | |
| width=width, | |
| num_inference_steps=steps, | |
| guidance_scale=guidance_scale, | |
| generator=generator, | |
| ).frames[0] | |
| with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp: | |
| silent_video_path = tmp.name | |
| export_to_video(output, silent_video_path, fps=16) | |
| final_video_path = self._merge_audio_to_video(silent_video_path, audio_path) | |
| return final_video_path, current_seed |