Carlex22222 committed on
Commit 847d2f3 · verified · 1 Parent(s): 1f0256b

Update aduc_framework/engineers/deformes4D.py

Files changed (1):
  aduc_framework/engineers/deformes4D.py (+153, -166)
aduc_framework/engineers/deformes4D.py CHANGED
@@ -2,11 +2,12 @@
 #
 # Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
 #
-# Version 3.1.1 (with the file-cleanup fix)
+# Version 5.0.0 (Intelligent Editor Engine)
 #
-# This engineer implements the Camera (Ψ) and the Distiller (Δ) of the ADUC-SDR
-# architecture. Its sole responsibility is the sequential generation of video
-# fragments from a predefined set of keyframes.
+# This engineer acts as the framework's Camera and Editing Room. It reads the
+# keyframe sequence and the Director's decisions (`is_cut`) to generate video
+# clips, either with continuous motion (using LTX and the Cinematographer's
+# autonomy) or honoring the cuts, and assembles them into the final film.
 
 import os
 import time
@@ -17,10 +18,8 @@ import logging
 from PIL import Image, ImageOps
 import gc
 import shutil
-from pathlib import Path
 from typing import List, Tuple, Dict, Any, Callable, Optional
 
-# --- Corrected relative imports ---
 from ..types import LatentConditioningItem
 from ..managers.ltx_manager import ltx_manager_singleton
 from ..managers.vae_manager import vae_manager_singleton
@@ -28,26 +27,28 @@ from .deformes2D_thinker import deformes2d_thinker_singleton
 from ..tools.video_encode_tool import video_encode_tool_singleton
 
 logger = logging.getLogger(__name__)
-
 ProgressCallback = Optional[Callable[[float, str], None]]
 
 class Deformes4DEngine:
     """
-    Orchestrates the generation and concatenation of video fragments.
+    Orchestrates the generation and assembly of video fragments, honoring the
+    Autonomous Director's cut and continuity decisions.
     """
+
+    _EDITOR_PARAMS = {
+        "fps": 24,
+        "ltx_frames_per_latent": 8,
+    }
+
     def __init__(self):
-        """The constructor is lightweight and takes no arguments."""
         self.workspace_dir: Optional[str] = None
         self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
-        logger.info("Deformes4DEngine instantiated (not initialized).")
+        logger.info("Deformes4DEngine (Intelligent Editor) instantiated.")
 
     def initialize(self, workspace_dir: str):
-        """Initializes the engineer with the required settings."""
-        if self.workspace_dir is not None:
-            return  # avoid re-initialization
        self.workspace_dir = workspace_dir
         os.makedirs(self.workspace_dir, exist_ok=True)
-        logger.info(f"Deformes4D Specialist (Executor) initialized with workspace: {self.workspace_dir}.")
+        logger.info(f"Deformes4D (Intelligent Editor) initialized with workspace: {self.workspace_dir}.")
 
     def generate_original_movie(
         self,
@@ -55,181 +56,167 @@ class Deformes4DEngine:
         progress_callback: ProgressCallback = None
     ) -> Dict[str, Any]:
         """
-        Generates the main film by reading every parameter from the generation state.
+        Main entry point. Generates the complete film from the generation state.
         """
         if not self.workspace_dir:
-            raise RuntimeError("Deformes4DEngine has not been initialized. Call initialize() before use.")
+            raise RuntimeError("Deformes4DEngine has not been initialized.")
 
-        # 1. Extract all parameters from the generation state
-        pre_prod_params = full_generation_state.get("parametros_geracao", {}).get("pre_producao", {})
-        prod_params = full_generation_state.get("parametros_geracao", {}).get("producao", {})
-
-        keyframes_data = full_generation_state.get("Keyframe_atos", [])
-        global_prompt = full_generation_state.get("Promt_geral", "")
-        storyboard = [ato["resumo_ato"] for ato in full_generation_state.get("Atos", [])]
-        keyframe_paths = [kf["caminho_pixel"] for kf in keyframes_data]
-
-        seconds_per_fragment = pre_prod_params.get('duration_per_fragment', 4.0)
-        video_resolution = pre_prod_params.get('resolution', 480)
-
-        trim_percent = prod_params.get('trim_percent', 50)
-        handler_strength = prod_params.get('handler_strength', 0.5)
-        destination_convergence_strength = prod_params.get('destination_convergence_strength', 0.75)
-        guidance_scale = prod_params.get('guidance_scale', 2.0)
-        stg_scale = prod_params.get('stg_scale', 0.025)
-        num_inference_steps = prod_params.get('inference_steps', 20)
-
-        # 2. Start the generation process
-        FPS = 24
-        FRAMES_PER_LATENT_CHUNK = 8
-        LATENT_PROCESSING_CHUNK_SIZE = 4
-
-        run_timestamp = int(time.time())
-        temp_latent_dir = os.path.join(self.workspace_dir, f"temp_latents_{run_timestamp}")
-        temp_video_clips_dir = os.path.join(self.workspace_dir, f"temp_clips_{run_timestamp}")
-        os.makedirs(temp_latent_dir, exist_ok=True)
-        os.makedirs(temp_video_clips_dir, exist_ok=True)
-
-        total_frames_brutos = self._quantize_to_multiple(int(seconds_per_fragment * FPS), FRAMES_PER_LATENT_CHUNK)
-        frames_a_podar = self._quantize_to_multiple(int(total_frames_brutos * (trim_percent / 100)), FRAMES_PER_LATENT_CHUNK)
-        latents_a_podar = frames_a_podar // FRAMES_PER_LATENT_CHUNK
-        DEJAVU_FRAME_TARGET = frames_a_podar - 1 if frames_a_podar > 0 else 0
-        DESTINATION_FRAME_TARGET = total_frames_brutos - 1
-
-        base_ltx_params = {"guidance_scale": guidance_scale, "stg_scale": stg_scale, "num_inference_steps": num_inference_steps}
-        story_history = ""
-        target_resolution_tuple = (video_resolution, video_resolution)
-        eco_latent_for_next_loop, dejavu_latent_for_next_loop = None, None
-        latent_fragment_paths = []
-        video_fragments_data = []
-
-        if len(keyframe_paths) < 2:
-            raise ValueError(f"Generation requires at least 2 keyframes. Provided: {len(keyframe_paths)}.")
-        num_transitions_to_generate = len(keyframe_paths) - 1
-
-        logger.info("--- STAGE 1: Latent Fragment Generation ---")
-        for i in range(num_transitions_to_generate):
-            fragment_index = i + 1
-            if progress_callback:
-                progress_fraction = (i / num_transitions_to_generate) * 0.7
-                progress_callback(progress_fraction, f"Generating Latent {fragment_index}/{num_transitions_to_generate}")
-
-            past_keyframe_path = keyframe_paths[i - 1] if i > 0 else keyframe_paths[i]
-            start_keyframe_path = keyframe_paths[i]
-            destination_keyframe_path = keyframe_paths[i + 1]
-            future_story_prompt = storyboard[i + 1] if (i + 1) < len(storyboard) else "The final scene."
-            decision = deformes2d_thinker_singleton.get_cinematic_decision(
-                global_prompt, story_history, past_keyframe_path, start_keyframe_path,
-                destination_keyframe_path, storyboard[i - 1] if i > 0 else "The beginning.",
-                storyboard[i], future_story_prompt
-            )
-            motion_prompt = decision["motion_prompt"]
-            story_history += f"\n- Act {fragment_index}: {motion_prompt}"
-
-            conditioning_items = []
-            if eco_latent_for_next_loop is None:
-                img_start = self._preprocess_image_for_latent_conversion(Image.open(start_keyframe_path).convert("RGB"), target_resolution_tuple)
-                conditioning_items.append(LatentConditioningItem(self._pil_to_latent(img_start), 0, 1.0))
-            else:
-                conditioning_items.append(LatentConditioningItem(eco_latent_for_next_loop, 0, 1.0))
-                conditioning_items.append(LatentConditioningItem(dejavu_latent_for_next_loop, DEJAVU_FRAME_TARGET, handler_strength))
-
-            img_dest = self._preprocess_image_for_latent_conversion(Image.open(destination_keyframe_path).convert("RGB"), target_resolution_tuple)
-            conditioning_items.append(LatentConditioningItem(self._pil_to_latent(img_dest), DESTINATION_FRAME_TARGET, destination_convergence_strength))
+        run_timestamp = int(time.time())
+        temp_latent_dir = os.path.join(self.workspace_dir, f"temp_latents_{run_timestamp}")
+        temp_clips_dir = os.path.join(self.workspace_dir, f"temp_clips_{run_timestamp}")
+        os.makedirs(temp_latent_dir, exist_ok=True)
+        os.makedirs(temp_clips_dir, exist_ok=True)
+
+        all_latent_paths, video_fragments_data = self._generate_all_latent_fragments(
+            full_generation_state, temp_latent_dir, progress_callback
+        )
+
+        video_clip_paths = self._decode_latents_to_clips(
+            all_latent_paths, temp_clips_dir, progress_callback
+        )
+
+        final_video_path = os.path.join(self.workspace_dir, f"original_movie_{run_timestamp}.mp4")
+        video_encode_tool_singleton.concatenate_videos(video_clip_paths, final_video_path, self.workspace_dir)
+
+        try:
+            shutil.rmtree(temp_clips_dir)
+        except OSError as e:
+            logger.warning(f"Could not remove the temporary clips directory: {e}")
+
+        logger.info(f"Editing process complete! Original video saved at: {final_video_path}")
+
+        return {
+            "final_path": final_video_path,
+            "latent_paths": all_latent_paths,
+            "video_data": {
+                "id": 0, "caminho_pixel": final_video_path,
+                "caminhos_latentes_fragmentos": all_latent_paths,
+                "fragmentos_componentes": video_fragments_data
+            }
+        }
+
+    def _generate_all_latent_fragments(self, state, temp_dir, progress):
+        """SRP: Manages the loop that generates all latent fragments, one by one."""
+        keyframes_data = state.get("Keyframe_atos", [])
+        if len(keyframes_data) < 2:
+            raise ValueError("Video generation requires at least 2 keyframes.")
+
+        pre_prod_params = state.get("parametros_geracao", {}).get("pre_producao", {})
+        prod_params = state.get("parametros_geracao", {}).get("producao", {})
+
+        seconds_per_fragment = pre_prod_params.get('duration_per_fragment', 4.0)
+        resolution = pre_prod_params.get('resolution', 480)
+        trim_percent = prod_params.get('trim_percent', 50)
+
+        total_frames_brutos = self._quantize_to_multiple(int(seconds_per_fragment * self._EDITOR_PARAMS["fps"]), self._EDITOR_PARAMS["ltx_frames_per_latent"])
+        frames_a_podar = self._quantize_to_multiple(int(total_frames_brutos * (trim_percent / 100)), self._EDITOR_PARAMS["ltx_frames_per_latent"])
+        latents_a_podar = frames_a_podar // self._EDITOR_PARAMS["ltx_frames_per_latent"]
+
+        eco_latent, dejavu_latent = None, None
+        motion_history = ""
+        all_latent_paths, video_fragments_data = [], []
+
+        num_transitions = len(keyframes_data) - 1
+        for i in range(num_transitions):
+            if progress:
+                progress(i / num_transitions * 0.7, f"Filming Clip {i+1}/{num_transitions}")
+
+            start_kf, end_kf = keyframes_data[i], keyframes_data[i+1]
+
+            if end_kf.get("is_cut_point", False):
+                logger.info(f"Transition {i+1}: Director marked 'CUT'. Resetting Déjà-Vu memory.")
+                eco_latent, dejavu_latent = None, None
+                continue
+
+            motion_prompt = deformes2d_thinker_singleton.get_motion_decision(start_kf, end_kf, motion_history)
+            motion_history += f"\n- {motion_prompt}"
+            logger.info(f"Cinematographer chose the motion for transition {i+1}: '{motion_prompt[:50]}...'")
+
+            conditioning_items = self._prepare_ltx_conditioning(
+                start_kf, end_kf, (eco_latent, dejavu_latent), resolution, total_frames_brutos, prod_params
+            )
 
             latents_brutos, _ = ltx_manager_singleton.generate_latent_fragment(
-                height=video_resolution, width=video_resolution,
-                conditioning_items_data=conditioning_items, motion_prompt=motion_prompt,
-                video_total_frames=total_frames_brutos, video_fps=FPS,
-                **base_ltx_params
+                height=resolution, width=resolution, conditioning_items_data=conditioning_items,
+                motion_prompt=motion_prompt, video_total_frames=total_frames_brutos,
+                video_fps=self._EDITOR_PARAMS["fps"], **prod_params
             )
 
-            last_trim = latents_brutos[:, :, -(latents_a_podar+1):, :, :].clone()
-            eco_latent_for_next_loop = last_trim[:, :, :2, :, :].clone()
-            dejavu_latent_for_next_loop = last_trim[:, :, -1:, :, :].clone()
-            latents_video = latents_brutos[:, :, :-(latents_a_podar-1), :, :].clone()
-            del last_trim, latents_brutos; gc.collect(); torch.cuda.empty_cache()
-
-            cpu_latent = latents_video.cpu()
-            latent_path = os.path.join(temp_latent_dir, f"latent_fragment_{i:04d}.pt")
-            torch.save(cpu_latent, latent_path)
-            latent_fragment_paths.append(latent_path)
+            last_trim = latents_brutos[:, :, -(latents_a_podar + 1):, :, :].clone()
+            eco_latent = last_trim[:, :, :2, :, :].clone()
+            dejavu_latent = last_trim[:, :, -1:, :, :].clone()
+            latents_video = latents_brutos[:, :, :-(latents_a_podar - 1), :, :].clone()
+
+            latent_path = os.path.join(temp_dir, f"latent_fragment_{i:04d}.pt")
+            torch.save(latents_video.cpu(), latent_path)
+            all_latent_paths.append(latent_path)
             video_fragments_data.append({"id": i, "prompt_video": motion_prompt})
-            del latents_video, cpu_latent; gc.collect()
-
-        del eco_latent_for_next_loop, dejavu_latent_for_next_loop; gc.collect(); torch.cuda.empty_cache()
-
-        logger.info(f"--- STAGE 2: Processing {len(latent_fragment_paths)} latents ---")
-        final_video_clip_paths = []
-        num_chunks = -(-len(latent_fragment_paths) // LATENT_PROCESSING_CHUNK_SIZE) if LATENT_PROCESSING_CHUNK_SIZE > 0 else 0
-        for i in range(num_chunks):
-            chunk_start_index = i * LATENT_PROCESSING_CHUNK_SIZE
-            chunk_end_index = chunk_start_index + LATENT_PROCESSING_CHUNK_SIZE
-            chunk_paths = latent_fragment_paths[chunk_start_index:chunk_end_index]
-
-            if progress_callback:
-                progress_fraction = 0.7 + (i / num_chunks * 0.28)
-                progress_callback(progress_fraction, f"Processing & Decoding Batch {i+1}/{num_chunks}")
-
-            tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths]
-            sub_group_latent = torch.cat(tensors_in_chunk, dim=2)
-            del tensors_in_chunk; gc.collect(); torch.cuda.empty_cache()
-
-            pixel_tensor = vae_manager_singleton.decode(sub_group_latent)
-            del sub_group_latent; gc.collect(); torch.cuda.empty_cache()
-
-            base_name = f"clip_{i:04d}_{run_timestamp}"
-            current_clip_path = os.path.join(temp_video_clips_dir, f"{base_name}.mp4")
-            self.save_video_from_tensor(pixel_tensor, current_clip_path, fps=FPS)
-            final_video_clip_paths.append(current_clip_path)
-            del pixel_tensor; gc.collect(); torch.cuda.empty_cache()
-
-        if progress_callback: progress_callback(0.98, "Assembling the final film...")
-        final_video_path = os.path.join(self.workspace_dir, f"original_movie_{run_timestamp}.mp4")
-        video_encode_tool_singleton.concatenate_videos(final_video_clip_paths, final_video_path, self.workspace_dir)
-
-        try:
-            shutil.rmtree(temp_video_clips_dir)
-            # The line that deleted 'temp_latent_dir' has been removed so the latents persist.
-        except OSError as e:
-            logger.warning(f"Could not remove the temporary clips directory: {e}")
-
-        logger.info(f"Process complete! Original video saved at: {final_video_path}")
-
-        final_video_data_for_state = {
-            "id": 0, "caminho_pixel": final_video_path,
-            "caminhos_latentes_fragmentos": latent_fragment_paths,
-            "fragmentos_componentes": video_fragments_data
-        }
-
-        return {
-            "final_path": final_video_path,
-            "latent_paths": latent_fragment_paths,
-            "video_data": final_video_data_for_state
-        }
-
-    # --- HELPER FUNCTIONS ---
-    def save_video_from_tensor(self, video_tensor: torch.Tensor, path: str, fps: int = 24):
-        if video_tensor is None or video_tensor.ndim != 5 or video_tensor.shape[2] == 0: return
-        video_tensor = video_tensor.squeeze(0).permute(1, 2, 3, 0)
-        video_tensor = (video_tensor.clamp(-1, 1) + 1) / 2.0
-        video_np = (video_tensor.detach().cpu().float().numpy() * 255).astype(np.uint8)
-        with imageio.get_writer(path, fps=fps, codec='libx264', quality=8, output_params=['-pix_fmt', 'yuv420p']) as writer:
-            for frame in video_np: writer.append_data(frame)
-
-    def _preprocess_image_for_latent_conversion(self, image: Image.Image, target_resolution: tuple) -> Image.Image:
-        if image.size != target_resolution:
-            return ImageOps.fit(image, target_resolution, Image.Resampling.LANCZOS)
-        return image
-
-    def _pil_to_latent(self, pil_image: Image.Image) -> torch.Tensor:
-        image_np = np.array(pil_image).astype(np.float32) / 255.0
-        tensor = torch.from_numpy(image_np).permute(2, 0, 1).unsqueeze(0).unsqueeze(2)
-        tensor = (tensor * 2.0) - 1.0
-        return vae_manager_singleton.encode(tensor)
-
-    def _quantize_to_multiple(self, n: int, m: int) -> int:
+
+        return all_latent_paths, video_fragments_data
+
+    def _decode_latents_to_clips(self, latent_paths, temp_dir, progress):
+        """SRP: Decodes a list of latents into .mp4 video files."""
+        video_clip_paths = []
+        num_latents = len(latent_paths)
+
+        for i, latent_path in enumerate(latent_paths):
+            if progress:
+                progress(0.7 + (i / num_latents * 0.3), f"Decoding Clip {i+1}/{num_latents}")
+
+            latent_tensor = torch.load(latent_path, map_location=self.device)
+            pixel_tensor = vae_manager_singleton.decode(latent_tensor)
+
+            clip_path = os.path.join(temp_dir, f"clip_{i:04d}.mp4")
+            self._save_video_from_tensor(pixel_tensor, clip_path, fps=self._EDITOR_PARAMS["fps"])
+            video_clip_paths.append(clip_path)
+
+        return video_clip_paths
+
+    def _prepare_ltx_conditioning(self, start_kf, end_kf, dejavu_memory, res, total_frames, prod_params):
+        """SRP: Prepares the LTX conditioning list for a continuous transition."""
+        items = []
+        res_tuple = (res, res)
+        eco_latent, dejavu_latent = dejavu_memory
+
+        def to_latent(path):
+            pil = self._preprocess_image_for_latent_conversion(Image.open(path).convert("RGB"), res_tuple)
+            tensor = self._pil_to_pixel_tensor(pil)
+            return vae_manager_singleton.encode(tensor.to(self.device))
+
+        if eco_latent is None:
+            items.append(LatentConditioningItem(to_latent(start_kf['caminho_pixel']), 0, 1.0))
+        else:
+            items.append(LatentConditioningItem(eco_latent, 0, 1.0))
+            dejavu_frame_target = self._quantize_to_multiple(
+                int(total_frames * (prod_params.get('trim_percent', 50) / 100)),
+                self._EDITOR_PARAMS["ltx_frames_per_latent"]
+            ) - 1
+            if dejavu_frame_target < 0: dejavu_frame_target = 0
+
+            items.append(LatentConditioningItem(
+                dejavu_latent, dejavu_frame_target, prod_params.get('handler_strength', 0.5)
+            ))
+
+        destination_strength = prod_params.get('destination_convergence_strength', 0.75)
+        items.append(LatentConditioningItem(to_latent(end_kf['caminho_pixel']), total_frames - 1, destination_strength))
+        return items
+
+    # --- Low-Level Helper Functions ---
+    def _save_video_from_tensor(self, video_tensor: torch.Tensor, path: str, fps: int):
+        if video_tensor is None or video_tensor.ndim != 5: return
+        video = (video_tensor.squeeze(0).permute(1, 2, 3, 0).clamp(-1, 1) + 1) / 2.0
+        video_np = (video.cpu().float().numpy() * 255).astype(np.uint8)
+        imageio.mimwrite(path, video_np, fps=fps, codec='libx264', quality=8, output_params=['-pix_fmt', 'yuv420p'])
+
+    def _preprocess_image_for_latent_conversion(self, image: Image.Image, res: tuple):
+        return ImageOps.fit(image, res, Image.Resampling.LANCZOS) if image.size != res else image
+
+    def _pil_to_pixel_tensor(self, pil_image: Image.Image) -> torch.Tensor:
+        arr = np.array(pil_image, dtype=np.float32) / 255.0
+        tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0).unsqueeze(2)
+        return (tensor * 2.0) - 1.0
+
+    def _quantize_to_multiple(self, n, m):
         if m == 0: return n
         quantized = int(round(n / m) * m)
         return m if n > 0 and quantized == 0 else quantized
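
Aside: the frame/latent arithmetic in `_generate_all_latent_fragments` (shared with the old STAGE 1 code it replaces) is easier to follow with concrete numbers. The sketch below is a minimal, standalone reproduction using the defaults visible in the diff (4.0 s per fragment, 24 fps, 8 frames per latent chunk, 50% trim); only `quantize_to_multiple` mirrors real code, the rest is illustrative.

# Minimal sketch of the shared frame/latent arithmetic, using the diff's defaults.
def quantize_to_multiple(n: int, m: int) -> int:
    # Round n to the nearest multiple of m; never collapse a positive n to 0.
    if m == 0:
        return n
    quantized = int(round(n / m) * m)
    return m if n > 0 and quantized == 0 else quantized

FPS, FRAMES_PER_LATENT, TRIM_PERCENT = 24, 8, 50

total_frames = quantize_to_multiple(int(4.0 * FPS), FRAMES_PER_LATENT)      # 96
frames_to_trim = quantize_to_multiple(int(total_frames * TRIM_PERCENT / 100),
                                      FRAMES_PER_LATENT)                    # 48
latents_to_trim = frames_to_trim // FRAMES_PER_LATENT                       # 6 latent chunks

# Conditioning targets derived from this arithmetic:
dejavu_frame_target = frames_to_trim - 1      # 47: where the déjà-vu latent is pinned
destination_frame_target = total_frames - 1   # 95: where the destination keyframe is pinned
print(total_frames, frames_to_trim, latents_to_trim)  # 96 48 6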
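The Déjà-Vu continuity scheme itself reduces to two or three conditioning anchors per transition. Below is a hedged sketch of the layout `_prepare_ltx_conditioning` builds; `LatentConditioningItem` is redeclared here with assumed field names (the diff constructs it positionally), and the zero tensors are stand-ins for real VAE latents.

# Sketch of the per-transition conditioning layout; not the project's actual API.
# Assumed: LatentConditioningItem(latent, frame_index, strength), and latents
# shaped [batch, channels, latent_frames, h, w] as implied by the diff's slicing.
from dataclasses import dataclass
import torch

@dataclass
class LatentConditioningItem:  # mirrors the positional fields used in the diff
    latent: torch.Tensor
    frame_index: int
    strength: float

TOTAL_FRAMES, DEJAVU_TARGET = 96, 47            # from the arithmetic above

def fake(n):                                    # stand-in latent tensor
    return torch.zeros(1, 8, n, 16, 16)

# First transition (no memory yet): pin the start keyframe at frame 0.
items_first = [
    LatentConditioningItem(fake(1), 0, 1.0),                  # start keyframe
    LatentConditioningItem(fake(1), TOTAL_FRAMES - 1, 0.75),  # destination keyframe
]

# Later transitions: the trimmed tail of the previous fragment becomes memory.
eco, dejavu = fake(2), fake(1)   # first 2 / last 1 latent frames of the trimmed tail
items_next = [
    LatentConditioningItem(eco, 0, 1.0),                      # echo anchors the opening
    LatentConditioningItem(dejavu, DEJAVU_TARGET, 0.5),       # déjà-vu guides the overlap
    LatentConditioningItem(fake(1), TOTAL_FRAMES - 1, 0.75),  # converge on the destination
]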
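Finally, a hypothetical driver for the new engine. The state keys ("parametros_geracao", "Keyframe_atos", "caminho_pixel", "is_cut_point") are exactly the ones the diff reads; the workspace path, image paths, and parameter values are made up, and actually running this requires the framework's LTX/VAE managers plus real keyframe images.

# Hypothetical invocation sketch; key names follow the diff, values are made up.
from aduc_framework.engineers.deformes4D import Deformes4DEngine

engine = Deformes4DEngine()
engine.initialize("/tmp/aduc_workspace")  # made-up workspace path

state = {
    "parametros_geracao": {
        "pre_producao": {"duration_per_fragment": 4.0, "resolution": 480},
        "producao": {"trim_percent": 50, "handler_strength": 0.5,
                     "destination_convergence_strength": 0.75},
    },
    "Keyframe_atos": [
        {"caminho_pixel": "kf_000.png"},
        {"caminho_pixel": "kf_001.png"},                        # continuous motion
        {"caminho_pixel": "kf_002.png", "is_cut_point": True},  # hard cut: memory resets
        {"caminho_pixel": "kf_003.png"},
    ],
}

result = engine.generate_original_movie(
    state, progress_callback=lambda frac, msg: print(f"{frac:.0%} {msg}")
)
print(result["final_path"])  # path of the assembled original_movie_<timestamp>.mp4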