Carlexxx
feat: ✨ aBINC 2.2
fb56537
raw
history blame
8.03 kB
# aduc_framework/engineers/composer.py
#
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
#
# Versão 4.1.0 (Hub Cognitivo Consolidado)
#
# - A arquitetura está estável e completa. O Composer atua como o hub
# central de comunicação com os LLMs.
# - O método `execute_plan` é usado pelo Planner2D para a criação do roteiro.
# - O método `execute_cognitive_task` é uma ferramenta genérica usada por
# outros especialistas (como Planner4D e Deformes3D) para solicitar
# tarefas de raciocínio pontuais ao LLM, como decidir um movimento de
# câmera ou um plano de composição de imagem.
import logging
import json
import re
import yaml
from pathlib import Path
from PIL import Image
from typing import List, Dict, Any, Generator, Optional, Callable
from .prompt_engine import prompt_engine_singleton
from ..managers.llama_multimodal_manager import llama_multimodal_manager_singleton
from ..managers.gemini_manager import gemini_manager_singleton
logger = logging.getLogger(__name__)
def robust_json_parser(raw_text: str) -> dict:
"""
Analisa um objeto JSON de uma string que pode conter texto extra.
"""
logger.debug(f"COMPOSER(JSON_PARSER): Tentando parsear JSON (primeiros 500 chars):\n---\n{raw_text[:500]}\n---")
match = re.search(r'```json\s*(\{.*?\})\s*```', raw_text, re.DOTALL)
if match:
json_str = match.group(1); logger.debug("JSON explícito encontrado.")
return json.loads(json_str)
try:
start_index = raw_text.find('{'); end_index = raw_text.rfind('}')
if start_index != -1 and end_index != -1 and end_index > start_index:
json_str = raw_text[start_index : end_index + 1]; logger.debug("JSON por delimitadores '{...}' encontrado.")
return json.loads(json_str)
except json.JSONDecodeError: pass
logger.warning("Nenhum JSON válido encontrado nos métodos primários. Tentando parsear o texto inteiro.")
return json.loads(raw_text)
class Composer:
"""
O Composer é o hub central de comunicação com o Large Language Model (LLM).
Ele executa tanto planos de trabalho de várias etapas quanto tarefas cognitivas únicas.
"""
def __init__(self):
logger.info("COMPOSER: Lendo config.yaml para selecionar o LLM Engine...")
with open("config.yaml", 'r') as f:
config = yaml.safe_load(f)
self.provider = config.get('specialists', {}).get('llm_engine', {}).get('provider', 'llama_multimodal')
if self.provider == 'gemini':
self.llm_manager = gemini_manager_singleton
logger.info("COMPOSER: Motor de LLM configurado para usar 'Gemini'.")
else:
self.llm_manager = llama_multimodal_manager_singleton
logger.info("COMPOSER: Motor de LLM configurado para usar 'Llama' (padrão).")
prompt_engine_singleton.set_provider(self.provider)
self.task_templates = self._load_task_templates()
logger.info(f"Composer inicializado com {len(self.task_templates)} templates de tarefa.")
def _load_task_templates(self) -> Dict[str, str]:
templates = {}
template_dir = Path(__file__).resolve().parent.parent / "prompts" / "task_templates"
if not template_dir.is_dir():
raise FileNotFoundError(f"Diretório de templates de tarefa não encontrado: {template_dir}")
for task_file in template_dir.glob("*.txt"):
task_id = task_file.stem
with open(task_file, 'r', encoding='utf-8') as f:
templates[task_id] = f.read()
return templates
def _talk_to_llm(self, generic_prompt: str, images: Optional[List[Image.Image]] = None, expected_format="text") -> Any:
final_model_prompt = prompt_engine_singleton.translate(
generic_prompt_content=generic_prompt, has_image=bool(images)
)
logger.info(f"COMPOSER: PROMPT FINAL SENDO ENVIADO para ({self.provider}):\n--- INÍCIO DO PROMPT ---\n{final_model_prompt}\n--- FIM DO PROMPT ---")
response_raw = self.llm_manager.process_turn(prompt_text=final_model_prompt, image_list=images)
logger.info(f"COMPOSER: RESPOSTA BRUTA RECEBIDA de ({self.provider}):\n--- INÍCIO DA RESPOSTA BRUTA ---\n{response_raw}\n--- FIM DA RESPOSTA BRUTA ---")
if expected_format == "json":
try:
return robust_json_parser(response_raw)
except (json.JSONDecodeError, ValueError) as e:
raise ValueError(f"O LLM ({self.provider}) retornou um formato JSON inválido. Erro: {e}")
return response_raw
def execute_cognitive_task(self, task_id: str, template_data: Dict[str, Any], images: Optional[List[Image.Image]] = None) -> Any:
"""
Executa uma única tarefa cognitiva (de "pensamento") e retorna o resultado.
"""
logger.info(f"COMPOSER: Executando tarefa cognitiva: {task_id}")
generic_template = self.task_templates.get(task_id)
if not generic_template:
raise ValueError(f"Template para a tarefa cognitiva '{task_id}' não foi encontrado.")
prompt_content = generic_template
for key, value in template_data.items():
prompt_content = prompt_content.replace(f"{{{key}}}", str(value))
expected_format = "json" if "JSON REQUIRED" in generic_template.upper() else "text"
response = self._talk_to_llm(generic_prompt=prompt_content, images=images, expected_format=expected_format)
if expected_format == "text":
return response.strip().replace("\"", "")
return response
def execute_plan(self, execution_plan: List[Dict[str, Any]], initial_data: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]:
"""
Executa um plano de trabalho de várias etapas para a criação do roteiro.
"""
dna = {"global_prompt": initial_data["global_prompt"], "initial_media_paths": initial_data["user_media_paths"], "continuous_story": "", "scenes": []}
user_media = [Image.open(p) for p in initial_data["user_media_paths"]]
for i, task in enumerate(execution_plan):
try:
task_id = task['task_id']
yield {"status": "progress", "message": task.get('description', '')}
generic_template = self.task_templates.get(task_id)
if not generic_template:
raise ValueError(f"Template para a tarefa '{task_id}' não foi encontrado.")
prompt_content = generic_template
prompt_content = prompt_content.replace("{global_prompt}", str(dna.get("global_prompt", "")))
prompt_content = prompt_content.replace("{num_scenes}", str(task.get('inputs', {}).get("num_scenes", "")))
prompt_content = prompt_content.replace("{continuous_story}", str(dna.get("continuous_story", "")))
prompt_content = prompt_content.replace("{independent_scenes_json}", json.dumps({"scenes": dna.get("scenes", [])}, indent=2))
is_json_output = task_id in ["STEP_02_CREATE_INDEPENDENT_SCENES", "STEP_03_FRAGMENT_SCENES_INTO_ACTS", "STEP_04_FINAL_REVIEW"]
expected_format = "json" if is_json_output else "text"
response = self._talk_to_llm(prompt_content, user_media if i == 0 else None, expected_format)
if task_id == "STEP_01_CREATE_CONTINUOUS_STORY":
dna["continuous_story"] = response
elif task_id == "STEP_02_CREATE_INDEPENDENT_SCENES":
dna["scenes"] = response.get("scenes", [])
elif task_id == "STEP_03_FRAGMENT_SCENES_INTO_ACTS":
dna["scenes"] = response.get("scenes", [])
except Exception as e:
raise e
yield {"status": "complete", "message": "Execução do Composer concluída.", "dna": dna}
composer_singleton = Composer()