#!/usr/bin/env python3
"""
services/vincie.py

VincieService — setup and CLI execution of the upstream VINCIE project.

- Ensures the repository is intact (clones/repairs it when main.py or .git
  is missing).
- Downloads the full model snapshot into HF_HUB_CACHE.
- Creates an idempotent ckpt/VINCIE-3B symlink (inside the repo and under
  /app/ckpt) pointing at the snapshot (expected to contain dit.pth, vae.pth
  and llm14b).
- Validates the artifacts expected by generate.yaml.
- Runs the upstream main.py with generation overrides (ckpt.path is left
  untouched).
- Runs a light, best-effort VRAM cleanup after each job.

Notes:
- For minimum latency, prefer the in-process vince_server (warm pipeline).
- This subprocess-based service stays faithful to upstream and is useful as
  a fallback / diagnostic path.
"""

import os
import json
import shutil
import subprocess
from pathlib import Path
from typing import List, Optional

from huggingface_hub import snapshot_download

# Snippet executed with ``<python> -c`` after each job. Every CUDA call is
# guarded so the snippet itself does not fail on machines without a GPU.
_GPU_CLEANUP_CODE = r"""
import torch, gc
try:
    torch.cuda.synchronize()
except Exception:
    pass
gc.collect()
try:
    torch.cuda.empty_cache()
    torch.cuda.memory.reset_peak_memory_stats()
except Exception:
    pass
"""


class VincieService:
    """Prepare the upstream VINCIE checkout/model and run it as a subprocess."""

    def __init__(
        self,
        repo_dir: str = "/app/VINCIE",
        python_bin: str = "python",
        repo_url: str = "https://github.com/ByteDance-Seed/VINCIE",
        model_repo: str = "ByteDance-Seed/VINCIE-3B",
        output_root: str = "/app/outputs",
    ):
        """Store configuration and create the output root directory.

        Args:
            repo_dir: Local directory of the VINCIE git checkout.
            python_bin: Interpreter used to launch ``main.py``.
            repo_url: Git URL cloned when the checkout is missing or broken.
            model_repo: Hugging Face repo id of the model snapshot.
            output_root: Directory under which per-job output dirs are created.
        """
        self.repo_dir = Path(repo_dir)
        self.python = python_bin
        self.repo_url = repo_url
        self.model_repo = model_repo
        self.output_root = Path(output_root)
        self.output_root.mkdir(parents=True, exist_ok=True)

        self.generate_yaml = self.repo_dir / "configs" / "generate.yaml"
        # Two links to the same snapshot: one relative to the repo, one under /app.
        self.ckpt_link_repo = self.repo_dir / "ckpt" / "VINCIE-3B"
        self.ckpt_link_app = Path("/app/ckpt") / "VINCIE-3B"
        # Filled in by ensure_model() with the downloaded snapshot path.
        self.ckpt_dir: Optional[Path] = None
        # Environment snapshot handed to every child process.
        self._env = os.environ.copy()

    # ---------- util ----------
    @staticmethod
    def _run(cmd: List[str], cwd: Optional[Path] = None, env=None):
        """Run ``cmd`` and raise ``subprocess.CalledProcessError`` on failure."""
        subprocess.run(cmd, cwd=str(cwd) if cwd else None, check=True, env=env)

    @staticmethod
    def _remove_path(path: Path) -> None:
        """Delete ``path`` whether it is a file, a symlink or a directory tree.

        A missing path is a no-op, mirroring ``rm -rf``. A symlink is removed
        itself; its target is never followed.
        """
        if path.is_symlink() or path.is_file():
            path.unlink()
        elif path.exists():
            shutil.rmtree(path)

    @staticmethod
    def _ensure_symlink(link: Path, target: Path):
        """Make ``link`` a directory symlink to ``target``, idempotently.

        An existing symlink that already resolves to the same location is left
        alone; anything else occupying ``link`` is removed and replaced.
        """
        link.parent.mkdir(parents=True, exist_ok=True)
        if link.is_symlink():
            try:
                # Resolve both sides: the target may itself sit behind a
                # symlink, and comparing against the raw path would force a
                # relink on every call.
                if link.resolve() == target.resolve():
                    return
            except (OSError, RuntimeError):
                # Unresolvable link (e.g. a symlink loop): recreate it below.
                pass
            link.unlink(missing_ok=True)
        elif link.exists():
            VincieService._remove_path(link)
        link.symlink_to(target, target_is_directory=True)

    # ---------- repo/model ----------
    def ensure_repo(self) -> None:
        """Clone the upstream repo unless ``main.py`` and ``.git`` both exist.

        The clone goes into a sibling ``<name>.tmp`` directory first and is
        then renamed over ``repo_dir``, so a failed clone never leaves a
        half-populated checkout in place.

        Raises:
            subprocess.CalledProcessError: if ``git clone`` fails.
        """
        self.repo_dir.mkdir(parents=True, exist_ok=True)
        main_py = self.repo_dir / "main.py"
        git_dir = self.repo_dir / ".git"
        if main_py.exists() and git_dir.exists():
            return

        tmp = self.repo_dir.with_name(self.repo_dir.name + ".tmp")
        self._remove_path(tmp)  # drop leftovers from an interrupted run
        self._run(["git", "clone", self.repo_url, str(tmp)])
        # Simple swap: remove the broken checkout, then move the fresh one in.
        self._remove_path(self.repo_dir)
        tmp.rename(self.repo_dir)

    def ensure_model(self, revision: Optional[str] = None, token: Optional[str] = None) -> None:
        """Download the model snapshot and point both ckpt symlinks at it.

        Args:
            revision: Optional branch, tag or commit of the model repo to
                download; ``None`` uses the hub default.
            token: Hugging Face token; falls back to the ``HF_TOKEN`` and
                ``HUGGINGFACE_TOKEN`` environment variables.
        """
        cache_dir = os.environ.get("HF_HUB_CACHE")
        snapshot_path = snapshot_download(
            repo_id=self.model_repo,
            revision=revision,  # previously accepted but never forwarded
            token=token or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN"),
            cache_dir=cache_dir,
        )
        self.ckpt_dir = Path(snapshot_path)
        # Idempotent symlinks.
        self._ensure_symlink(self.ckpt_link_repo, self.ckpt_dir)
        self._ensure_symlink(self.ckpt_link_app, self.ckpt_dir)

    def validate_assets(self) -> None:
        """Check that the repo and the model snapshot contain what is needed.

        Raises:
            RuntimeError: if ``generate.yaml`` or ``main.py`` is missing, if
                the snapshot lacks ``dit.pth``, ``vae.pth`` or ``llm14b``, or
                if the repo-local ckpt link does not exist.
        """
        if not self.generate_yaml.exists() or not (self.repo_dir / "main.py").exists():
            raise RuntimeError("VINCIE repo inválido (faltando generate.yaml ou main.py)")

        target = self.ckpt_dir or self.ckpt_link_repo
        need = [target / "dit.pth", target / "vae.pth", target / "llm14b"]
        missing = [str(p) for p in need if not p.exists()]
        if missing:
            raise RuntimeError(f"Snapshot incompleto: {missing}")

        # The repo-local link must exist too (the config uses a relative ckpt/ path).
        if not self.ckpt_link_repo.exists():
            raise RuntimeError("ckpt link ausente no repo: ckpt/VINCIE-3B")

    # ---------- execution ----------
    def _build_overrides(
        self,
        extra_overrides: Optional[List[str]] = None,
        cfg_scale: Optional[float] = None,
        resolution_input: Optional[int] = None,
        aspect_ratio_input: Optional[str] = None,
        steps: Optional[int] = None,
    ) -> List[str]:
        """Return CLI overrides: ``extra_overrides`` plus any non-``None`` knobs.

        ``ckpt.path`` is never overridden; the YAML value is respected.
        """
        overrides = list(extra_overrides or [])
        optional_knobs = (
            ("cfg_scale", cfg_scale),
            ("resolution_input", resolution_input),
            ("aspect_ratio_input", aspect_ratio_input),
            ("steps", steps),
        )
        overrides.extend(
            f"generation.{key}={value}"
            for key, value in optional_knobs
            if value is not None
        )
        return overrides

    def _clean_gpu_memory(self) -> None:
        """Run the CUDA cleanup snippet in a helper interpreter, best-effort.

        A non-zero exit of the helper is ignored so that a cleanup problem
        cannot turn an already-successful generation job into a failure.

        NOTE(review): the snippet runs in a fresh process, so it presumably
        only affects that process's own CUDA allocator — confirm it has the
        intended effect.
        """
        subprocess.run([self.python, "-c", _GPU_CLEANUP_CODE], env=self._env, check=False)

    def _prepare(self) -> None:
        """Ensure the repo, model and required assets are ready for a job."""
        self.ensure_repo()
        self.ensure_model()
        self.validate_assets()

    def _run_generation(
        self,
        base_overrides: List[str],
        out_dir: Path,
        *,
        cfg_scale: Optional[float] = None,
        resolution_input: Optional[int] = None,
        aspect_ratio_input: Optional[str] = None,
        steps: Optional[int] = None,
    ) -> str:
        """Run upstream ``main.py`` with the given overrides; return ``out_dir``.

        Raises:
            subprocess.CalledProcessError: if ``main.py`` exits non-zero.
        """
        out_dir.mkdir(parents=True, exist_ok=True)
        overrides = self._build_overrides(
            extra_overrides=base_overrides,
            cfg_scale=cfg_scale,
            resolution_input=resolution_input,
            aspect_ratio_input=aspect_ratio_input,
            steps=steps,
        )
        cmd = [
            self.python,
            "main.py",
            str(self.generate_yaml),
            *overrides,
            f"generation.output.dir={out_dir}",
        ]
        # cwd is the repo so that "main.py" and relative ckpt/ paths resolve.
        self._run(cmd, cwd=self.repo_dir, env=self._env)
        self._clean_gpu_memory()
        return str(out_dir)

    # ---------- APIs ----------
    def multi_turn_edit(
        self,
        input_image: str,
        turns: List[str],
        out_dir_name: Optional[str] = None,
        *,
        cfg_scale: Optional[float] = None,
        resolution_input: Optional[int] = None,
        aspect_ratio_input: Optional[str] = None,
        steps: Optional[int] = None,
    ) -> str:
        """Apply a sequence of edit prompts to one image.

        Args:
            input_image: Path of the image to edit.
            turns: Edit prompts, one per turn.
            out_dir_name: Output sub-directory name; defaults to
                ``multi_turn_<input image stem>``.
            cfg_scale, resolution_input, aspect_ratio_input, steps: Optional
                ``generation.*`` overrides; ``None`` keeps the YAML value.

        Returns:
            The output directory path as a string.
        """
        self._prepare()
        out_dir = self.output_root / (out_dir_name or f"multi_turn_{Path(input_image).stem}")

        image_json = json.dumps([str(input_image)])
        prompts_json = json.dumps(turns)
        base_overrides = [
            f"generation.positive_prompt.image_path={image_json}",
            f"generation.positive_prompt.prompts={prompts_json}",
        ]
        return self._run_generation(
            base_overrides,
            out_dir,
            cfg_scale=cfg_scale,
            resolution_input=resolution_input,
            aspect_ratio_input=aspect_ratio_input,
            steps=steps,
        )

    def multi_concept_compose(
        self,
        concept_images: List[str],
        concept_prompts: List[str],
        final_prompt: str,
        out_dir_name: Optional[str] = None,
        *,
        cfg_scale: Optional[float] = None,
        resolution_input: Optional[int] = None,
        aspect_ratio_input: Optional[str] = None,
        steps: Optional[int] = None,
    ) -> str:
        """Compose several concept images under a final prompt.

        Args:
            concept_images: Paths of the concept images.
            concept_prompts: Prompts accompanying the concept images.
            final_prompt: Prompt appended after ``concept_prompts``.
            out_dir_name: Output sub-directory name; defaults to
                ``multi_concept``.
            cfg_scale, resolution_input, aspect_ratio_input, steps: Optional
                ``generation.*`` overrides; ``None`` keeps the YAML value.

        Returns:
            The output directory path as a string.
        """
        self._prepare()
        out_dir = self.output_root / (out_dir_name or "multi_concept")

        imgs_json = json.dumps([str(p) for p in concept_images])
        # Unpacking accepts any sequence (list or tuple) without mutating it.
        prompts_json = json.dumps([*concept_prompts, final_prompt])
        base_overrides = [
            f"generation.positive_prompt.image_path={imgs_json}",
            f"generation.positive_prompt.prompts={prompts_json}",
            # Key spelling is reproduced as-is; presumably it matches the
            # upstream config key — verify before "fixing" the typo.
            "generation.pad_img_placehoder=False",
        ]
        return self._run_generation(
            base_overrides,
            out_dir,
            cfg_scale=cfg_scale,
            resolution_input=resolution_input,
            aspect_ratio_input=aspect_ratio_input,
            steps=steps,
        )