Spaces:
Paused
Paused
| # aduc_framework/engineers/composer.py | |
| # | |
| # Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos | |
| # | |
| # Versão 4.1.0 (Hub Cognitivo Consolidado) | |
| # | |
| # - A arquitetura está estável e completa. O Composer atua como o hub | |
| # central de comunicação com os LLMs. | |
| # - O método `execute_plan` é usado pelo Planner2D para a criação do roteiro. | |
| # - O método `execute_cognitive_task` é uma ferramenta genérica usada por | |
| # outros especialistas (como Planner4D e Deformes3D) para solicitar | |
| # tarefas de raciocínio pontuais ao LLM, como decidir um movimento de | |
| # câmera ou um plano de composição de imagem. | |
| import logging | |
| import json | |
| import re | |
| import yaml | |
| from pathlib import Path | |
| from PIL import Image | |
| from typing import List, Dict, Any, Generator, Optional, Callable | |
| from .prompt_engine import prompt_engine_singleton | |
| from ..managers.llama_multimodal_manager import llama_multimodal_manager_singleton | |
| from ..managers.gemini_manager import gemini_manager_singleton | |
| logger = logging.getLogger(__name__) | |
| def robust_json_parser(raw_text: str) -> dict: | |
| """ | |
| Analisa um objeto JSON de uma string que pode conter texto extra. | |
| """ | |
| logger.debug(f"COMPOSER(JSON_PARSER): Tentando parsear JSON (primeiros 500 chars):\n---\n{raw_text[:500]}\n---") | |
| match = re.search(r'```json\s*(\{.*?\})\s*```', raw_text, re.DOTALL) | |
| if match: | |
| json_str = match.group(1); logger.debug("JSON explícito encontrado.") | |
| return json.loads(json_str) | |
| try: | |
| start_index = raw_text.find('{'); end_index = raw_text.rfind('}') | |
| if start_index != -1 and end_index != -1 and end_index > start_index: | |
| json_str = raw_text[start_index : end_index + 1]; logger.debug("JSON por delimitadores '{...}' encontrado.") | |
| return json.loads(json_str) | |
| except json.JSONDecodeError: pass | |
| logger.warning("Nenhum JSON válido encontrado nos métodos primários. Tentando parsear o texto inteiro.") | |
| return json.loads(raw_text) | |
| class Composer: | |
| """ | |
| O Composer é o hub central de comunicação com o Large Language Model (LLM). | |
| Ele executa tanto planos de trabalho de várias etapas quanto tarefas cognitivas únicas. | |
| """ | |
| def __init__(self): | |
| logger.info("COMPOSER: Lendo config.yaml para selecionar o LLM Engine...") | |
| with open("config.yaml", 'r') as f: | |
| config = yaml.safe_load(f) | |
| self.provider = config.get('specialists', {}).get('llm_engine', {}).get('provider', 'llama_multimodal') | |
| if self.provider == 'gemini': | |
| self.llm_manager = gemini_manager_singleton | |
| logger.info("COMPOSER: Motor de LLM configurado para usar 'Gemini'.") | |
| else: | |
| self.llm_manager = llama_multimodal_manager_singleton | |
| logger.info("COMPOSER: Motor de LLM configurado para usar 'Llama' (padrão).") | |
| prompt_engine_singleton.set_provider(self.provider) | |
| self.task_templates = self._load_task_templates() | |
| logger.info(f"Composer inicializado com {len(self.task_templates)} templates de tarefa.") | |
| def _load_task_templates(self) -> Dict[str, str]: | |
| templates = {} | |
| template_dir = Path(__file__).resolve().parent.parent / "prompts" / "task_templates" | |
| if not template_dir.is_dir(): | |
| raise FileNotFoundError(f"Diretório de templates de tarefa não encontrado: {template_dir}") | |
| for task_file in template_dir.glob("*.txt"): | |
| task_id = task_file.stem | |
| with open(task_file, 'r', encoding='utf-8') as f: | |
| templates[task_id] = f.read() | |
| return templates | |
| def _talk_to_llm(self, generic_prompt: str, images: Optional[List[Image.Image]] = None, expected_format="text") -> Any: | |
| final_model_prompt = prompt_engine_singleton.translate( | |
| generic_prompt_content=generic_prompt, has_image=bool(images) | |
| ) | |
| logger.info(f"COMPOSER: PROMPT FINAL SENDO ENVIADO para ({self.provider}):\n--- INÍCIO DO PROMPT ---\n{final_model_prompt}\n--- FIM DO PROMPT ---") | |
| response_raw = self.llm_manager.process_turn(prompt_text=final_model_prompt, image_list=images) | |
| logger.info(f"COMPOSER: RESPOSTA BRUTA RECEBIDA de ({self.provider}):\n--- INÍCIO DA RESPOSTA BRUTA ---\n{response_raw}\n--- FIM DA RESPOSTA BRUTA ---") | |
| if expected_format == "json": | |
| try: | |
| return robust_json_parser(response_raw) | |
| except (json.JSONDecodeError, ValueError) as e: | |
| raise ValueError(f"O LLM ({self.provider}) retornou um formato JSON inválido. Erro: {e}") | |
| return response_raw | |
| def execute_cognitive_task(self, task_id: str, template_data: Dict[str, Any], images: Optional[List[Image.Image]] = None) -> Any: | |
| """ | |
| Executa uma única tarefa cognitiva (de "pensamento") e retorna o resultado. | |
| """ | |
| logger.info(f"COMPOSER: Executando tarefa cognitiva: {task_id}") | |
| generic_template = self.task_templates.get(task_id) | |
| if not generic_template: | |
| raise ValueError(f"Template para a tarefa cognitiva '{task_id}' não foi encontrado.") | |
| prompt_content = generic_template | |
| for key, value in template_data.items(): | |
| prompt_content = prompt_content.replace(f"{{{key}}}", str(value)) | |
| expected_format = "json" if "JSON REQUIRED" in generic_template.upper() else "text" | |
| response = self._talk_to_llm(generic_prompt=prompt_content, images=images, expected_format=expected_format) | |
| if expected_format == "text": | |
| return response.strip().replace("\"", "") | |
| return response | |
| def execute_plan(self, execution_plan: List[Dict[str, Any]], initial_data: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]: | |
| """ | |
| Executa um plano de trabalho de várias etapas para a criação do roteiro. | |
| """ | |
| dna = {"global_prompt": initial_data["global_prompt"], "initial_media_paths": initial_data["user_media_paths"], "continuous_story": "", "scenes": []} | |
| user_media = [Image.open(p) for p in initial_data["user_media_paths"]] | |
| for i, task in enumerate(execution_plan): | |
| try: | |
| task_id = task['task_id'] | |
| yield {"status": "progress", "message": task.get('description', '')} | |
| generic_template = self.task_templates.get(task_id) | |
| if not generic_template: | |
| raise ValueError(f"Template para a tarefa '{task_id}' não foi encontrado.") | |
| prompt_content = generic_template | |
| prompt_content = prompt_content.replace("{global_prompt}", str(dna.get("global_prompt", ""))) | |
| prompt_content = prompt_content.replace("{num_scenes}", str(task.get('inputs', {}).get("num_scenes", ""))) | |
| prompt_content = prompt_content.replace("{continuous_story}", str(dna.get("continuous_story", ""))) | |
| prompt_content = prompt_content.replace("{independent_scenes_json}", json.dumps({"scenes": dna.get("scenes", [])}, indent=2)) | |
| is_json_output = task_id in ["STEP_02_CREATE_INDEPENDENT_SCENES", "STEP_03_FRAGMENT_SCENES_INTO_ACTS", "STEP_04_FINAL_REVIEW"] | |
| expected_format = "json" if is_json_output else "text" | |
| response = self._talk_to_llm(prompt_content, user_media if i == 0 else None, expected_format) | |
| if task_id == "STEP_01_CREATE_CONTINUOUS_STORY": | |
| dna["continuous_story"] = response | |
| elif task_id == "STEP_02_CREATE_INDEPENDENT_SCENES": | |
| dna["scenes"] = response.get("scenes", []) | |
| elif task_id == "STEP_03_FRAGMENT_SCENES_INTO_ACTS": | |
| dna["scenes"] = response.get("scenes", []) | |
| except Exception as e: | |
| raise e | |
| yield {"status": "complete", "message": "Execução do Composer concluída.", "dna": dna} | |
| composer_singleton = Composer() |