Spaces:
Runtime error
Runtime error
| import os | |
| import random | |
| import sys | |
| from typing import Sequence, Mapping, Any, Union | |
| import torch | |
| import gradio as gr | |
| from huggingface_hub import hf_hub_download | |
| import subprocess | |
| import logging | |
| import spaces | |
# Initialize logging
logging.basicConfig(level=logging.INFO)

# Git repositories of the ComfyUI custom-node packages this app clones at startup.
custom_nodes = [
    "https://github.com/rgthree/rgthree-comfy",
    "https://github.com/yolain/ComfyUI-Easy-Use",
    "https://github.com/Kosinkadink/ComfyUI-VideoHelperSuite",
    "https://github.com/chrisgoringe/cg-use-everywhere",
    "https://github.com/alt-key-project/comfyui-dream-project",
    "https://github.com/giriss/comfy-image-saver",
    "https://github.com/facok/ComfyUI-HunyuanVideoMultiLora",
]
custom_nodes_dir = "models/custom_nodes"
os.makedirs(custom_nodes_dir, exist_ok=True)

# Clone each repository, or update it when a checkout is already present.
for repo in custom_nodes:
    repo_name = repo.split("/")[-1]
    repo_path = os.path.join(custom_nodes_dir, repo_name)
    if os.path.exists(repo_path):
        logging.info(f"Updating {repo_name}...")
        try:
            subprocess.run(["git", "-C", repo_path, "pull"], check=True)
        except subprocess.CalledProcessError as err:
            # A checkout already exists, so a failed update (offline, local
            # changes, ...) should not abort startup; keep using what is there.
            logging.warning(f"Could not update {repo_name}, keeping existing checkout: {err}")
    else:
        logging.info(f"Cloning {repo_name}...")
        # A failed clone stays fatal: without the checkout the node is missing.
        subprocess.run(["git", "clone", repo, repo_path], check=True)
logging.info("All custom nodes downloaded successfully!")

# Create a symlink if missing.
# lexists() is also True for a dangling symlink, for which exists() would
# return False and os.symlink() would then fail with FileExistsError.
if not os.path.lexists("custom_nodes"):
    os.symlink(custom_nodes_dir, "custom_nodes")
    logging.info("Symlink created: custom_nodes -> models/custom_nodes")

# Model folders. "loras" (plural) matches the directory the LoRA file is
# downloaded into further down in this file.
for model_subdir in ("diffusion_models", "text_encoders", "vae", "loras"):
    os.makedirs(os.path.join("models", model_subdir), exist_ok=True)
| import shutil | |
| from huggingface_hub import hf_hub_download | |
| from pathlib import Path | |
def download_and_move(repo_id, filename, local_dir):
    """Fetch one file from the Hugging Face Hub and place it directly in *local_dir*.

    ``hf_hub_download`` is called with ``local_dir=local_dir``; the returned file
    is then moved to ``local_dir/<basename of filename>`` so that any
    intermediate folders contained in *filename* are dropped.

    Args:
        repo_id: Hub repository identifier, passed through to ``hf_hub_download``.
        filename: Path of the file inside the repository.
        local_dir: Directory that should end up containing the file.

    Returns:
        pathlib.Path: Location of the file inside *local_dir*.
    """
    dest_path = Path(local_dir) / Path(filename).name

    # The file is moved away from where hf_hub_download put it, so without this
    # check every start of the app would download the (large) file again.
    if dest_path.exists():
        print(f"Model already present: {dest_path}")
        return dest_path

    # Download the file
    src_path = Path(hf_hub_download(repo_id=repo_id, filename=filename, local_dir=local_dir))

    # Nothing to move when the file has no intermediate folders in its name.
    if src_path.resolve() != dest_path.resolve():
        shutil.move(str(src_path), str(dest_path))
    print(f"Model moved to: {dest_path}")
    return dest_path
# Files fetched at startup, as (repo_id, filename, local_dir) triples, in order:
# VAE, diffusion model, CLIP text encoder, LLaVA text encoder, LoRA.
_MODEL_DOWNLOADS = (
    (
        "Comfy-Org/HunyuanVideo_repackaged",
        "split_files/vae/hunyuan_video_vae_bf16.safetensors",
        "models/vae",
    ),
    (
        "Comfy-Org/HunyuanVideo_repackaged",
        "split_files/diffusion_models/hunyuan_video_t2v_720p_bf16.safetensors",
        "models/diffusion_models",
    ),
    (
        "Comfy-Org/HunyuanVideo_repackaged",
        "split_files/text_encoders/clip_l.safetensors",
        "models/text_encoders",
    ),
    (
        "Comfy-Org/HunyuanVideo_repackaged",
        "split_files/text_encoders/llava_llama3_fp8_scaled.safetensors",
        "models/text_encoders",
    ),
    (
        "alexShangeeth/huny_lora",
        "boreal-hl-v1.safetensors",
        "models/loras",
    ),
)

for _repo_id, _filename, _local_dir in _MODEL_DOWNLOADS:
    download_and_move(repo_id=_repo_id, filename=_filename, local_dir=_local_dir)
def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Look up ``obj[index]``, falling back to ``obj["result"][index]``.

    The plain subscript ``obj[index]`` is tried first. Only when that raises
    ``KeyError`` (i.e. *obj* is a mapping without *index* as a key) is the
    value taken from the sequence stored under the ``"result"`` key.

    Args:
        obj (Union[Sequence, Mapping]): The object to retrieve the value from.
        index (int): The index (or key) to look up.

    Returns:
        Any: The value found at *index*.

    Raises:
        IndexError: If *index* is out of bounds for the sequence being indexed.
        KeyError: If *obj* is a mapping with neither *index* nor ``"result"``.
    """
    try:
        value = obj[index]
    except KeyError:
        value = obj["result"][index]
    return value
def find_path(name: str, path: str = None) -> str:
    """
    Walk upwards from *path* until a directory containing an entry called *name* is found.

    Starts at the current working directory when *path* is None. Returns the
    joined path (a string) of the first match and prints it, or None once the
    filesystem root has been checked without a match.
    """
    current = os.getcwd() if path is None else path

    while True:
        # Does this directory contain the entry we are looking for?
        if name in os.listdir(current):
            located = os.path.join(current, name)
            print(f"{name} found: {located}")
            return located

        parent = os.path.dirname(current)
        # dirname() of the root is the root itself: nothing left to search.
        if parent == current:
            return None
        current = parent
def add_comfyui_directory_to_sys_path() -> None:
    """
    Locate a directory named 'ComfyUI' via find_path() and append it to sys.path.

    Does nothing when no such entry is found or when it is not a directory.
    """
    located = find_path("ComfyUI")
    if located is None or not os.path.isdir(located):
        return

    sys.path.append(located)
    print(f"'{located}' added to sys.path")
def add_extra_model_paths() -> None:
    """
    Hand the optional extra_model_paths.yaml file to ComfyUI's load_extra_path_config.

    The loader is imported from ``main`` and, if that import fails, from
    ``utils.extra_config``. When no config file is found a message is printed
    and nothing is loaded.
    """
    try:
        from main import load_extra_path_config
    except ImportError:
        print(
            "Could not import load_extra_path_config from main.py. Looking in utils.extra_config instead."
        )
        from utils.extra_config import load_extra_path_config

    config_file = find_path("extra_model_paths.yaml")
    if config_file is None:
        print("Could not find the extra_model_paths config file.")
        return

    load_extra_path_config(config_file)
# Run at import time: extend sys.path with the ComfyUI directory (if found) and
# load the optional extra model paths. Presumably this must happen before the
# `nodes` import below can be resolved - confirm against the ComfyUI layout.
add_comfyui_directory_to_sys_path()
add_extra_model_paths()
def import_custom_nodes() -> None:
    """Initialise ComfyUI's extra nodes so they are available for lookup.

    Creates a fresh asyncio event loop and installs it as the current loop,
    builds a ``PromptServer`` on that loop, attaches a ``PromptQueue`` to the
    server, and finally calls ``init_extra_nodes()``.
    """
    # Imported here rather than at module level; import order kept as-is.
    import asyncio
    import execution
    from nodes import init_extra_nodes
    import server

    # A new loop becomes the default one for this thread.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    # The server is bound to that loop and the queue is bound to the server.
    prompt_server = server.PromptServer(event_loop)
    execution.PromptQueue(prompt_server)

    # Register the custom nodes.
    init_extra_nodes()
| from nodes import NODE_CLASS_MAPPINGS | |
def _get_latest_video(directory="output"):
    """Return the most recently modified ``.mp4`` file inside *directory*.

    Raises:
        FileNotFoundError: If *directory* contains no ``.mp4`` file.
    """
    candidates = [
        os.path.join(directory, entry)
        for entry in os.listdir(directory)
        if entry.endswith(".mp4")
    ]
    if not candidates:
        raise FileNotFoundError("No video files found in the output directory.")
    # Newest file by modification time.
    return max(candidates, key=os.path.getmtime)


# NOTE(review): `spaces` is imported at the top of this file but never used.
# If this app is meant to run on ZeroGPU hardware, this function presumably
# needs the `@spaces.GPU` decorator - confirm against the Space's hardware.
def generate_image(prompt, frames, lora_strenth, width, height, frame_rate):
    """Run the HunyuanVideo text-to-video node pipeline and return the video path.

    The pipeline loads the VAE, the dual CLIP text encoders and the diffusion
    model, applies the ``boreal-hl-v1`` LoRA, samples a latent video with 40
    scheduler steps, decodes it tiled, sharpens the frames and hands them to
    the ``VHS_VideoCombine`` node (h264 mp4, prefix ``HunyuanVideo``).

    Args:
        prompt: Text prompt passed to the CLIP text encoder.
        frames: Number of frames (length of the latent video); coerced to int.
        lora_strenth: Strength passed to the LoRA loader.
        width: Latent video width; coerced to int.
        height: Latent video height; coerced to int.
        frame_rate: Frame rate passed to the video combine node.

    Returns:
        str: Path of the newest ``.mp4`` file in the ``output`` directory
        (assumed to be where the combine node writes - TODO confirm).

    Raises:
        FileNotFoundError: If no ``.mp4`` file exists in ``output`` afterwards.
    """
    # The UI's Number inputs set no precision, so these may arrive as floats;
    # frame count and frame dimensions are integral quantities.
    frames = int(frames)
    width = int(width)
    height = int(height)

    import_custom_nodes()
    with torch.inference_mode():
        vaeloader = NODE_CLASS_MAPPINGS["VAELoader"]()
        vaeloader_10 = vaeloader.load_vae(vae_name="hunyuan_video_vae_bf16.safetensors")

        dualcliploader = NODE_CLASS_MAPPINGS["DualCLIPLoader"]()
        dualcliploader_11 = dualcliploader.load_clip(
            clip_name1="clip_l.safetensors",
            clip_name2="llava_llama3_fp8_scaled.safetensors",
            type="hunyuan_video",
            device="default",
        )

        unetloader = NODE_CLASS_MAPPINGS["UNETLoader"]()
        unetloader_12 = unetloader.load_unet(
            unet_name="hunyuan_video_t2v_720p_bf16.safetensors",
            weight_dtype="fp8_e4m3fn",
        )

        ksamplerselect = NODE_CLASS_MAPPINGS["KSamplerSelect"]()
        ksamplerselect_16 = ksamplerselect.get_sampler(
            sampler_name="gradient_estimation"
        )

        randomnoise = NODE_CLASS_MAPPINGS["RandomNoise"]()
        # randint() includes both bounds; 2**64 itself would not fit in 64 bits.
        randomnoise_25 = randomnoise.get_noise(
            noise_seed=random.randint(1, 2**64 - 1)
        )

        cliptextencode = NODE_CLASS_MAPPINGS["CLIPTextEncode"]()
        cliptextencode_44 = cliptextencode.encode(
            text=prompt,
            clip=get_value_at_index(dualcliploader_11, 0),
        )

        int_literal = NODE_CLASS_MAPPINGS["Int Literal"]()
        int_literal_295 = int_literal.get_int(int=frames)

        hunyuanvideoloraloader = NODE_CLASS_MAPPINGS["HunyuanVideoLoraLoader"]()
        modelsamplingsd3 = NODE_CLASS_MAPPINGS["ModelSamplingSD3"]()
        fluxguidance = NODE_CLASS_MAPPINGS["FluxGuidance"]()
        basicguider = NODE_CLASS_MAPPINGS["BasicGuider"]()
        basicscheduler = NODE_CLASS_MAPPINGS["BasicScheduler"]()
        emptyhunyuanlatentvideo = NODE_CLASS_MAPPINGS["EmptyHunyuanLatentVideo"]()
        samplercustomadvanced = NODE_CLASS_MAPPINGS["SamplerCustomAdvanced"]()
        big_latent_switch_dream = NODE_CLASS_MAPPINGS["Big Latent Switch [Dream]"]()
        vaedecodetiled = NODE_CLASS_MAPPINGS["VAEDecodeTiled"]()
        imagesharpen = NODE_CLASS_MAPPINGS["ImageSharpen"]()
        vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]()
        anything_everywhere3 = NODE_CLASS_MAPPINGS["Anything Everywhere3"]()
        easy_cleangpuused = NODE_CLASS_MAPPINGS["easy cleanGpuUsed"]()

        hunyuanvideoloraloader_255 = hunyuanvideoloraloader.load_lora(
            lora_name="boreal-hl-v1.safetensors",
            strength=lora_strenth,
            blocks_type="all",
            model=get_value_at_index(unetloader_12, 0),
        )

        modelsamplingsd3_67 = modelsamplingsd3.patch(
            shift=9, model=get_value_at_index(hunyuanvideoloraloader_255, 0)
        )

        fluxguidance_26 = fluxguidance.append(
            guidance=12, conditioning=get_value_at_index(cliptextencode_44, 0)
        )

        basicguider_22 = basicguider.get_guider(
            model=get_value_at_index(modelsamplingsd3_67, 0),
            conditioning=get_value_at_index(fluxguidance_26, 0),
        )

        basicscheduler_17 = basicscheduler.get_sigmas(
            scheduler="simple",
            steps=40,
            denoise=1,
            model=get_value_at_index(hunyuanvideoloraloader_255, 0),
        )

        emptyhunyuanlatentvideo_232 = emptyhunyuanlatentvideo.generate(
            width=width,
            height=height,
            length=get_value_at_index(int_literal_295, 0),
            batch_size=1,
        )

        samplercustomadvanced_13 = samplercustomadvanced.sample(
            noise=get_value_at_index(randomnoise_25, 0),
            guider=get_value_at_index(basicguider_22, 0),
            sampler=get_value_at_index(ksamplerselect_16, 0),
            sigmas=get_value_at_index(basicscheduler_17, 0),
            latent_image=get_value_at_index(emptyhunyuanlatentvideo_232, 0),
        )

        # Output index 1 of the sampler is what gets decoded below.
        big_latent_switch_dream_243 = big_latent_switch_dream.pick(
            select=0,
            on_missing="next",
            input_2=get_value_at_index(samplercustomadvanced_13, 1),
        )

        vaedecodetiled_73 = vaedecodetiled.decode(
            tile_size=128,
            overlap=64,
            temporal_size=64,
            temporal_overlap=8,
            samples=get_value_at_index(big_latent_switch_dream_243, 0),
            vae=get_value_at_index(vaeloader_10, 0),
        )

        imagesharpen_106 = imagesharpen.sharpen(
            sharpen_radius=1,
            sigma=0.43,
            alpha=0.5,
            image=get_value_at_index(vaedecodetiled_73, 0),
        )

        # The return values of the remaining nodes are not used; they are
        # called for whatever effects they have (writing the video, etc.).
        vhs_videocombine.combine_video(
            frame_rate=frame_rate,
            loop_count=0,
            filename_prefix="HunyuanVideo",
            format="video/h264-mp4",
            pix_fmt="yuv420p",
            crf=10,
            save_metadata=True,
            trim_to_audio=False,
            pingpong=False,
            save_output=True,
            images=get_value_at_index(imagesharpen_106, 0),
            vae=get_value_at_index(vaeloader_10, 0),
            unique_id=3348895206324303610,
        )

        anything_everywhere3.func(
            CLIP=get_value_at_index(dualcliploader_11, 0),
            VAE=get_value_at_index(vaeloader_10, 0),
        )

        easy_cleangpuused.empty_cache(
            anything=get_value_at_index(big_latent_switch_dream_243, 0),
            unique_id=16583500820061639415,
        )

    # Get the latest video file based on modification time
    return _get_latest_video()
| import gradio as gr | |
# Placeholder function for demonstration purposes
def generate_video(prompt, frames, lora_strength, width, height, frame_rate):
    """Stub that ignores all of its arguments and returns a fixed placeholder string."""
    # Integrate with your generation logic here
    placeholder = "Generated Video Placeholder"
    return placeholder
# Gradio UI: generation parameters on the left, resulting video on the right.
with gr.Blocks(theme="soft") as app:
    gr.Markdown("# HunyuanVideo Text-to-Video (LoRA)")
    with gr.Row():
        with gr.Column():
            prompt_input = gr.Textbox(label="Prompt", placeholder="Enter your prompt here...")
            # precision=0 makes the component round to and deliver an int;
            # frame count and frame dimensions are integral quantities.
            frames_input = gr.Number(label="Frames", value=30, minimum=1, precision=0)
            lora_strength_input = gr.Slider(label="LoRA Strength", minimum=0.0, maximum=1.0, value=0.5, step=0.01)
            width_input = gr.Number(label="Width", value=512, minimum=256, step=64, precision=0)
            height_input = gr.Number(label="Height", value=512, minimum=256, step=64, precision=0)
            frame_rate_input = gr.Number(label="Frame Rate", value=24, minimum=1)
            generate_btn = gr.Button("Generate Video")
        with gr.Column():
            output_video = gr.Video(label="Generated Video", interactive=False)

    # The input order must match the positional parameters of generate_image.
    generate_btn.click(
        fn=generate_image,
        inputs=[prompt_input, frames_input, lora_strength_input, width_input, height_input, frame_rate_input],
        outputs=[output_video],
    )

app.launch(share=True)