Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import json | |
| import multiprocessing | |
| import os | |
| from copy import copy | |
| from logging import getLogger | |
| from pathlib import Path | |
| import PySimpleGUI as sg | |
| import sounddevice as sd | |
| import soundfile as sf | |
| import torch | |
| from pebble import ProcessFuture, ProcessPool | |
| from . import __version__ | |
| from .utils import get_optimal_device | |
| GUI_DEFAULT_PRESETS_PATH = Path(__file__).parent / "default_gui_presets.json" | |
| GUI_PRESETS_PATH = Path("./user_gui_presets.json").absolute() | |
| LOG = getLogger(__name__) | |
| def play_audio(path: Path | str): | |
| if isinstance(path, Path): | |
| path = path.as_posix() | |
| data, sr = sf.read(path) | |
| sd.play(data, sr) | |
| def load_presets() -> dict: | |
| defaults = json.loads(GUI_DEFAULT_PRESETS_PATH.read_text("utf-8")) | |
| users = ( | |
| json.loads(GUI_PRESETS_PATH.read_text("utf-8")) | |
| if GUI_PRESETS_PATH.exists() | |
| else {} | |
| ) | |
| # prioriy: defaults > users | |
| # order: defaults -> users | |
| return {**defaults, **users, **defaults} | |
| def add_preset(name: str, preset: dict) -> dict: | |
| presets = load_presets() | |
| presets[name] = preset | |
| with GUI_PRESETS_PATH.open("w") as f: | |
| json.dump(presets, f, indent=2) | |
| return load_presets() | |
| def delete_preset(name: str) -> dict: | |
| presets = load_presets() | |
| if name in presets: | |
| del presets[name] | |
| else: | |
| LOG.warning(f"Cannot delete preset {name} because it does not exist.") | |
| with GUI_PRESETS_PATH.open("w") as f: | |
| json.dump(presets, f, indent=2) | |
| return load_presets() | |
| def get_output_path(input_path: Path) -> Path: | |
| # Default output path | |
| output_path = input_path.parent / f"{input_path.stem}.out{input_path.suffix}" | |
| # Increment file number in path if output file already exists | |
| file_num = 1 | |
| while output_path.exists(): | |
| output_path = ( | |
| input_path.parent / f"{input_path.stem}.out_{file_num}{input_path.suffix}" | |
| ) | |
| file_num += 1 | |
| return output_path | |
| def get_supported_file_types() -> tuple[tuple[str, str], ...]: | |
| res = tuple( | |
| [ | |
| (extension, f".{extension.lower()}") | |
| for extension in sf.available_formats().keys() | |
| ] | |
| ) | |
| # Sort by popularity | |
| common_file_types = ["WAV", "MP3", "FLAC", "OGG", "M4A", "WMA"] | |
| res = sorted( | |
| res, | |
| key=lambda x: common_file_types.index(x[0]) | |
| if x[0] in common_file_types | |
| else len(common_file_types), | |
| ) | |
| return res | |
| def get_supported_file_types_concat() -> tuple[tuple[str, str], ...]: | |
| return (("Audio", " ".join(sf.available_formats().keys())),) | |
| def validate_output_file_type(output_path: Path) -> bool: | |
| supported_file_types = sorted( | |
| [f".{extension.lower()}" for extension in sf.available_formats().keys()] | |
| ) | |
| if not output_path.suffix: | |
| sg.popup_ok( | |
| "Error: Output path missing file type extension, enter " | |
| + "one of the following manually:\n\n" | |
| + "\n".join(supported_file_types) | |
| ) | |
| return False | |
| if output_path.suffix.lower() not in supported_file_types: | |
| sg.popup_ok( | |
| f"Error: {output_path.suffix.lower()} is not a supported " | |
| + "extension; use one of the following:\n\n" | |
| + "\n".join(supported_file_types) | |
| ) | |
| return False | |
| return True | |
| def get_devices( | |
| update: bool = True, | |
| ) -> tuple[list[str], list[str], list[int], list[int]]: | |
| if update: | |
| sd._terminate() | |
| sd._initialize() | |
| devices = sd.query_devices() | |
| hostapis = sd.query_hostapis() | |
| for hostapi in hostapis: | |
| for device_idx in hostapi["devices"]: | |
| devices[device_idx]["hostapi_name"] = hostapi["name"] | |
| input_devices = [ | |
| f"{d['name']} ({d['hostapi_name']})" | |
| for d in devices | |
| if d["max_input_channels"] > 0 | |
| ] | |
| output_devices = [ | |
| f"{d['name']} ({d['hostapi_name']})" | |
| for d in devices | |
| if d["max_output_channels"] > 0 | |
| ] | |
| input_devices_indices = [d["index"] for d in devices if d["max_input_channels"] > 0] | |
| output_devices_indices = [ | |
| d["index"] for d in devices if d["max_output_channels"] > 0 | |
| ] | |
| return input_devices, output_devices, input_devices_indices, output_devices_indices | |
| def after_inference(window: sg.Window, path: Path, auto_play: bool, output_path: Path): | |
| try: | |
| LOG.info(f"Finished inference for {path.stem}{path.suffix}") | |
| window["infer"].update(disabled=False) | |
| if auto_play: | |
| play_audio(output_path) | |
| except Exception as e: | |
| LOG.exception(e) | |
| def main(): | |
| LOG.info(f"version: {__version__}") | |
| # sg.theme("Dark") | |
| sg.theme_add_new( | |
| "Very Dark", | |
| { | |
| "BACKGROUND": "#111111", | |
| "TEXT": "#FFFFFF", | |
| "INPUT": "#444444", | |
| "TEXT_INPUT": "#FFFFFF", | |
| "SCROLL": "#333333", | |
| "BUTTON": ("white", "#112233"), | |
| "PROGRESS": ("#111111", "#333333"), | |
| "BORDER": 2, | |
| "SLIDER_DEPTH": 2, | |
| "PROGRESS_DEPTH": 2, | |
| }, | |
| ) | |
| sg.theme("Very Dark") | |
| model_candidates = list(sorted(Path("./logs/44k/").glob("G_*.pth"))) | |
| frame_contents = { | |
| "Paths": [ | |
| [ | |
| sg.Text("Model path"), | |
| sg.Push(), | |
| sg.InputText( | |
| key="model_path", | |
| default_text=model_candidates[-1].absolute().as_posix() | |
| if model_candidates | |
| else "", | |
| enable_events=True, | |
| ), | |
| sg.FileBrowse( | |
| initial_folder=Path("./logs/44k/").absolute | |
| if Path("./logs/44k/").exists() | |
| else Path(".").absolute().as_posix(), | |
| key="model_path_browse", | |
| file_types=( | |
| ("PyTorch", "G_*.pth G_*.pt"), | |
| ("Pytorch", "*.pth *.pt"), | |
| ), | |
| ), | |
| ], | |
| [ | |
| sg.Text("Config path"), | |
| sg.Push(), | |
| sg.InputText( | |
| key="config_path", | |
| default_text=Path("./configs/44k/config.json").absolute().as_posix() | |
| if Path("./configs/44k/config.json").exists() | |
| else "", | |
| enable_events=True, | |
| ), | |
| sg.FileBrowse( | |
| initial_folder=Path("./configs/44k/").as_posix() | |
| if Path("./configs/44k/").exists() | |
| else Path(".").absolute().as_posix(), | |
| key="config_path_browse", | |
| file_types=(("JSON", "*.json"),), | |
| ), | |
| ], | |
| [ | |
| sg.Text("Cluster model path (Optional)"), | |
| sg.Push(), | |
| sg.InputText( | |
| key="cluster_model_path", | |
| default_text=Path("./logs/44k/kmeans.pt").absolute().as_posix() | |
| if Path("./logs/44k/kmeans.pt").exists() | |
| else "", | |
| enable_events=True, | |
| ), | |
| sg.FileBrowse( | |
| initial_folder="./logs/44k/" | |
| if Path("./logs/44k/").exists() | |
| else ".", | |
| key="cluster_model_path_browse", | |
| file_types=(("PyTorch", "*.pt"), ("Pickle", "*.pt *.pth *.pkl")), | |
| ), | |
| ], | |
| ], | |
| "Common": [ | |
| [ | |
| sg.Text("Speaker"), | |
| sg.Push(), | |
| sg.Combo(values=[], key="speaker", size=(20, 1)), | |
| ], | |
| [ | |
| sg.Text("Silence threshold"), | |
| sg.Push(), | |
| sg.Slider( | |
| range=(-60.0, 0), | |
| orientation="h", | |
| key="silence_threshold", | |
| resolution=0.1, | |
| ), | |
| ], | |
| [ | |
| sg.Text( | |
| "Pitch (12 = 1 octave)\n" | |
| "ADJUST THIS based on your voice\n" | |
| "when Auto predict F0 is turned off.", | |
| size=(None, 4), | |
| ), | |
| sg.Push(), | |
| sg.Slider( | |
| range=(-36, 36), | |
| orientation="h", | |
| key="transpose", | |
| tick_interval=12, | |
| ), | |
| ], | |
| [ | |
| sg.Checkbox( | |
| key="auto_predict_f0", | |
| text="Auto predict F0 (Pitch may become unstable when turned on in real-time inference.)", | |
| ) | |
| ], | |
| [ | |
| sg.Text("F0 prediction method"), | |
| sg.Push(), | |
| sg.Combo( | |
| ["crepe", "crepe-tiny", "parselmouth", "dio", "harvest"], | |
| key="f0_method", | |
| ), | |
| ], | |
| [ | |
| sg.Text("Cluster infer ratio"), | |
| sg.Push(), | |
| sg.Slider( | |
| range=(0, 1.0), | |
| orientation="h", | |
| key="cluster_infer_ratio", | |
| resolution=0.01, | |
| ), | |
| ], | |
| [ | |
| sg.Text("Noise scale"), | |
| sg.Push(), | |
| sg.Slider( | |
| range=(0.0, 1.0), | |
| orientation="h", | |
| key="noise_scale", | |
| resolution=0.01, | |
| ), | |
| ], | |
| [ | |
| sg.Text("Pad seconds"), | |
| sg.Push(), | |
| sg.Slider( | |
| range=(0.0, 1.0), | |
| orientation="h", | |
| key="pad_seconds", | |
| resolution=0.01, | |
| ), | |
| ], | |
| [ | |
| sg.Text("Chunk seconds"), | |
| sg.Push(), | |
| sg.Slider( | |
| range=(0.0, 3.0), | |
| orientation="h", | |
| key="chunk_seconds", | |
| resolution=0.01, | |
| ), | |
| ], | |
| [ | |
| sg.Text("Max chunk seconds (set lower if Out Of Memory, 0 to disable)"), | |
| sg.Push(), | |
| sg.Slider( | |
| range=(0.0, 240.0), | |
| orientation="h", | |
| key="max_chunk_seconds", | |
| resolution=1.0, | |
| ), | |
| ], | |
| [ | |
| sg.Checkbox( | |
| key="absolute_thresh", | |
| text="Absolute threshold (ignored (True) in realtime inference)", | |
| ) | |
| ], | |
| ], | |
| "File": [ | |
| [ | |
| sg.Text("Input audio path"), | |
| sg.Push(), | |
| sg.InputText(key="input_path", enable_events=True), | |
| sg.FileBrowse( | |
| initial_folder=".", | |
| key="input_path_browse", | |
| file_types=get_supported_file_types_concat(), | |
| ), | |
| sg.FolderBrowse( | |
| button_text="Browse(Folder)", | |
| initial_folder=".", | |
| key="input_path_folder_browse", | |
| target="input_path", | |
| ), | |
| sg.Button("Play", key="play_input"), | |
| ], | |
| [ | |
| sg.Text("Output audio path"), | |
| sg.Push(), | |
| sg.InputText(key="output_path"), | |
| sg.FileSaveAs( | |
| initial_folder=".", | |
| key="output_path_browse", | |
| file_types=get_supported_file_types(), | |
| ), | |
| ], | |
| [sg.Checkbox(key="auto_play", text="Auto play", default=True)], | |
| ], | |
| "Realtime": [ | |
| [ | |
| sg.Text("Crossfade seconds"), | |
| sg.Push(), | |
| sg.Slider( | |
| range=(0, 0.6), | |
| orientation="h", | |
| key="crossfade_seconds", | |
| resolution=0.001, | |
| ), | |
| ], | |
| [ | |
| sg.Text( | |
| "Block seconds", # \n(big -> more robust, slower, (the same) latency)" | |
| tooltip="Big -> more robust, slower, (the same) latency", | |
| ), | |
| sg.Push(), | |
| sg.Slider( | |
| range=(0, 3.0), | |
| orientation="h", | |
| key="block_seconds", | |
| resolution=0.001, | |
| ), | |
| ], | |
| [ | |
| sg.Text( | |
| "Additional Infer seconds (before)", # \n(big -> more robust, slower)" | |
| tooltip="Big -> more robust, slower, additional latency", | |
| ), | |
| sg.Push(), | |
| sg.Slider( | |
| range=(0, 2.0), | |
| orientation="h", | |
| key="additional_infer_before_seconds", | |
| resolution=0.001, | |
| ), | |
| ], | |
| [ | |
| sg.Text( | |
| "Additional Infer seconds (after)", # \n(big -> more robust, slower, additional latency)" | |
| tooltip="Big -> more robust, slower, additional latency", | |
| ), | |
| sg.Push(), | |
| sg.Slider( | |
| range=(0, 2.0), | |
| orientation="h", | |
| key="additional_infer_after_seconds", | |
| resolution=0.001, | |
| ), | |
| ], | |
| [ | |
| sg.Text("Realtime algorithm"), | |
| sg.Push(), | |
| sg.Combo( | |
| ["2 (Divide by speech)", "1 (Divide constantly)"], | |
| default_value="1 (Divide constantly)", | |
| key="realtime_algorithm", | |
| ), | |
| ], | |
| [ | |
| sg.Text("Input device"), | |
| sg.Push(), | |
| sg.Combo( | |
| key="input_device", | |
| values=[], | |
| size=(60, 1), | |
| ), | |
| ], | |
| [ | |
| sg.Text("Output device"), | |
| sg.Push(), | |
| sg.Combo( | |
| key="output_device", | |
| values=[], | |
| size=(60, 1), | |
| ), | |
| ], | |
| [ | |
| sg.Checkbox( | |
| "Passthrough original audio (for latency check)", | |
| key="passthrough_original", | |
| default=False, | |
| ), | |
| sg.Push(), | |
| sg.Button("Refresh devices", key="refresh_devices"), | |
| ], | |
| [ | |
| sg.Frame( | |
| "Notes", | |
| [ | |
| [ | |
| sg.Text( | |
| "In Realtime Inference:\n" | |
| " - Setting F0 prediction method to 'crepe` may cause performance degradation.\n" | |
| " - Auto Predict F0 must be turned off.\n" | |
| "If the audio sounds mumbly and choppy:\n" | |
| " Case: The inference has not been made in time (Increase Block seconds)\n" | |
| " Case: Mic input is low (Decrease Silence threshold)\n" | |
| ) | |
| ] | |
| ], | |
| ), | |
| ], | |
| ], | |
| "Presets": [ | |
| [ | |
| sg.Text("Presets"), | |
| sg.Push(), | |
| sg.Combo( | |
| key="presets", | |
| values=list(load_presets().keys()), | |
| size=(40, 1), | |
| enable_events=True, | |
| ), | |
| sg.Button("Delete preset", key="delete_preset"), | |
| ], | |
| [ | |
| sg.Text("Preset name"), | |
| sg.Stretch(), | |
| sg.InputText(key="preset_name", size=(26, 1)), | |
| sg.Button("Add current settings as a preset", key="add_preset"), | |
| ], | |
| ], | |
| } | |
| # frames | |
| frames = {} | |
| for name, items in frame_contents.items(): | |
| frame = sg.Frame(name, items) | |
| frame.expand_x = True | |
| frames[name] = [frame] | |
| bottoms = [ | |
| [ | |
| sg.Checkbox( | |
| key="use_gpu", | |
| default=get_optimal_device() != torch.device("cpu"), | |
| text="Use GPU" | |
| + ( | |
| " (not available; if your device has GPU, make sure you installed PyTorch with CUDA support)" | |
| if get_optimal_device() == torch.device("cpu") | |
| else "" | |
| ), | |
| disabled=get_optimal_device() == torch.device("cpu"), | |
| ) | |
| ], | |
| [ | |
| sg.Button("Infer", key="infer"), | |
| sg.Button("(Re)Start Voice Changer", key="start_vc"), | |
| sg.Button("Stop Voice Changer", key="stop_vc"), | |
| sg.Push(), | |
| # sg.Button("ONNX Export", key="onnx_export"), | |
| ], | |
| ] | |
| column1 = sg.Column( | |
| [ | |
| frames["Paths"], | |
| frames["Common"], | |
| ], | |
| vertical_alignment="top", | |
| ) | |
| column2 = sg.Column( | |
| [ | |
| frames["File"], | |
| frames["Realtime"], | |
| frames["Presets"], | |
| ] | |
| + bottoms | |
| ) | |
| # columns | |
| layout = [[column1, column2]] | |
| # get screen size | |
| screen_width, screen_height = sg.Window.get_screen_size() | |
| if screen_height < 720: | |
| layout = [ | |
| [ | |
| sg.Column( | |
| layout, | |
| vertical_alignment="top", | |
| scrollable=False, | |
| expand_x=True, | |
| expand_y=True, | |
| vertical_scroll_only=True, | |
| key="main_column", | |
| ) | |
| ] | |
| ] | |
| window = sg.Window( | |
| f"{__name__.split('.')[0].replace('_', '-')} v{__version__}", | |
| layout, | |
| grab_anywhere=True, | |
| finalize=True, | |
| scaling=1, | |
| font=("Yu Gothic UI", 11) if os.name == "nt" else None, | |
| # resizable=True, | |
| # size=(1280, 720), | |
| # Below disables taskbar, which may be not useful for some users | |
| # use_custom_titlebar=True, no_titlebar=False | |
| # Keep on top | |
| # keep_on_top=True | |
| ) | |
| # event, values = window.read(timeout=0.01) | |
| # window["main_column"].Scrollable = True | |
| # make slider height smaller | |
| try: | |
| for v in window.element_list(): | |
| if isinstance(v, sg.Slider): | |
| v.Widget.configure(sliderrelief="flat", width=10, sliderlength=20) | |
| except Exception as e: | |
| LOG.exception(e) | |
| # for n in ["input_device", "output_device"]: | |
| # window[n].Widget.configure(justify="right") | |
| event, values = window.read(timeout=0.01) | |
| def update_speaker() -> None: | |
| from . import utils | |
| config_path = Path(values["config_path"]) | |
| if config_path.exists() and config_path.is_file(): | |
| hp = utils.get_hparams(values["config_path"]) | |
| LOG.debug(f"Loaded config from {values['config_path']}") | |
| window["speaker"].update( | |
| values=list(hp.__dict__["spk"].keys()), set_to_index=0 | |
| ) | |
| def update_devices() -> None: | |
| ( | |
| input_devices, | |
| output_devices, | |
| input_device_indices, | |
| output_device_indices, | |
| ) = get_devices() | |
| input_device_indices_reversed = { | |
| v: k for k, v in enumerate(input_device_indices) | |
| } | |
| output_device_indices_reversed = { | |
| v: k for k, v in enumerate(output_device_indices) | |
| } | |
| window["input_device"].update( | |
| values=input_devices, value=values["input_device"] | |
| ) | |
| window["output_device"].update( | |
| values=output_devices, value=values["output_device"] | |
| ) | |
| input_default, output_default = sd.default.device | |
| if values["input_device"] not in input_devices: | |
| window["input_device"].update( | |
| values=input_devices, | |
| set_to_index=input_device_indices_reversed.get(input_default, 0), | |
| ) | |
| if values["output_device"] not in output_devices: | |
| window["output_device"].update( | |
| values=output_devices, | |
| set_to_index=output_device_indices_reversed.get(output_default, 0), | |
| ) | |
| PRESET_KEYS = [ | |
| key | |
| for key in values.keys() | |
| if not any(exclude in key for exclude in ["preset", "browse"]) | |
| ] | |
| def apply_preset(name: str) -> None: | |
| for key, value in load_presets()[name].items(): | |
| if key in PRESET_KEYS: | |
| window[key].update(value) | |
| values[key] = value | |
| default_name = list(load_presets().keys())[0] | |
| apply_preset(default_name) | |
| window["presets"].update(default_name) | |
| del default_name | |
| update_speaker() | |
| update_devices() | |
| # with ProcessPool(max_workers=1) as pool: | |
| # to support Linux | |
| with ProcessPool( | |
| max_workers=min(2, multiprocessing.cpu_count()), | |
| context=multiprocessing.get_context("spawn"), | |
| ) as pool: | |
| future: None | ProcessFuture = None | |
| infer_futures: set[ProcessFuture] = set() | |
| while True: | |
| event, values = window.read(200) | |
| if event == sg.WIN_CLOSED: | |
| break | |
| if not event == sg.EVENT_TIMEOUT: | |
| LOG.info(f"Event {event}, values {values}") | |
| if event.endswith("_path"): | |
| for name in window.AllKeysDict: | |
| if str(name).endswith("_browse"): | |
| browser = window[name] | |
| if isinstance(browser, sg.Button): | |
| LOG.info( | |
| f"Updating browser {browser} to {Path(values[event]).parent}" | |
| ) | |
| browser.InitialFolder = Path(values[event]).parent | |
| browser.update() | |
| else: | |
| LOG.warning(f"Browser {browser} is not a FileBrowse") | |
| window["transpose"].update( | |
| disabled=values["auto_predict_f0"], | |
| visible=not values["auto_predict_f0"], | |
| ) | |
| input_path = Path(values["input_path"]) | |
| output_path = Path(values["output_path"]) | |
| if event == "add_preset": | |
| presets = add_preset( | |
| values["preset_name"], {key: values[key] for key in PRESET_KEYS} | |
| ) | |
| window["presets"].update(values=list(presets.keys())) | |
| elif event == "delete_preset": | |
| presets = delete_preset(values["presets"]) | |
| window["presets"].update(values=list(presets.keys())) | |
| elif event == "presets": | |
| apply_preset(values["presets"]) | |
| update_speaker() | |
| elif event == "refresh_devices": | |
| update_devices() | |
| elif event == "config_path": | |
| update_speaker() | |
| elif event == "input_path": | |
| # Don't change the output path if it's already set | |
| # if values["output_path"]: | |
| # continue | |
| # Set a sensible default output path | |
| window.Element("output_path").Update(str(get_output_path(input_path))) | |
| elif event == "infer": | |
| if "Default VC" in values["presets"]: | |
| window["presets"].update( | |
| set_to_index=list(load_presets().keys()).index("Default File") | |
| ) | |
| apply_preset("Default File") | |
| if values["input_path"] == "": | |
| LOG.warning("Input path is empty.") | |
| continue | |
| if not input_path.exists(): | |
| LOG.warning(f"Input path {input_path} does not exist.") | |
| continue | |
| # if not validate_output_file_type(output_path): | |
| # continue | |
| try: | |
| from so_vits_svc_fork.inference.main import infer | |
| LOG.info("Starting inference...") | |
| window["infer"].update(disabled=True) | |
| infer_future = pool.schedule( | |
| infer, | |
| kwargs=dict( | |
| # paths | |
| model_path=Path(values["model_path"]), | |
| output_path=output_path, | |
| input_path=input_path, | |
| config_path=Path(values["config_path"]), | |
| recursive=True, | |
| # svc config | |
| speaker=values["speaker"], | |
| cluster_model_path=Path(values["cluster_model_path"]) | |
| if values["cluster_model_path"] | |
| else None, | |
| transpose=values["transpose"], | |
| auto_predict_f0=values["auto_predict_f0"], | |
| cluster_infer_ratio=values["cluster_infer_ratio"], | |
| noise_scale=values["noise_scale"], | |
| f0_method=values["f0_method"], | |
| # slice config | |
| db_thresh=values["silence_threshold"], | |
| pad_seconds=values["pad_seconds"], | |
| chunk_seconds=values["chunk_seconds"], | |
| absolute_thresh=values["absolute_thresh"], | |
| max_chunk_seconds=values["max_chunk_seconds"], | |
| device="cpu" | |
| if not values["use_gpu"] | |
| else get_optimal_device(), | |
| ), | |
| ) | |
| infer_future.add_done_callback( | |
| lambda _future: after_inference( | |
| window, input_path, values["auto_play"], output_path | |
| ) | |
| ) | |
| infer_futures.add(infer_future) | |
| except Exception as e: | |
| LOG.exception(e) | |
| elif event == "play_input": | |
| if Path(values["input_path"]).exists(): | |
| pool.schedule(play_audio, args=[Path(values["input_path"])]) | |
| elif event == "start_vc": | |
| _, _, input_device_indices, output_device_indices = get_devices( | |
| update=False | |
| ) | |
| from so_vits_svc_fork.inference.main import realtime | |
| if future: | |
| LOG.info("Canceling previous task") | |
| future.cancel() | |
| future = pool.schedule( | |
| realtime, | |
| kwargs=dict( | |
| # paths | |
| model_path=Path(values["model_path"]), | |
| config_path=Path(values["config_path"]), | |
| speaker=values["speaker"], | |
| # svc config | |
| cluster_model_path=Path(values["cluster_model_path"]) | |
| if values["cluster_model_path"] | |
| else None, | |
| transpose=values["transpose"], | |
| auto_predict_f0=values["auto_predict_f0"], | |
| cluster_infer_ratio=values["cluster_infer_ratio"], | |
| noise_scale=values["noise_scale"], | |
| f0_method=values["f0_method"], | |
| # slice config | |
| db_thresh=values["silence_threshold"], | |
| pad_seconds=values["pad_seconds"], | |
| chunk_seconds=values["chunk_seconds"], | |
| # realtime config | |
| crossfade_seconds=values["crossfade_seconds"], | |
| additional_infer_before_seconds=values[ | |
| "additional_infer_before_seconds" | |
| ], | |
| additional_infer_after_seconds=values[ | |
| "additional_infer_after_seconds" | |
| ], | |
| block_seconds=values["block_seconds"], | |
| version=int(values["realtime_algorithm"][0]), | |
| input_device=input_device_indices[ | |
| window["input_device"].widget.current() | |
| ], | |
| output_device=output_device_indices[ | |
| window["output_device"].widget.current() | |
| ], | |
| device=get_optimal_device() if values["use_gpu"] else "cpu", | |
| passthrough_original=values["passthrough_original"], | |
| ), | |
| ) | |
| elif event == "stop_vc": | |
| if future: | |
| future.cancel() | |
| future = None | |
| elif event == "onnx_export": | |
| try: | |
| raise NotImplementedError("ONNX export is not implemented yet.") | |
| from so_vits_svc_fork.modules.onnx._export import onnx_export | |
| onnx_export( | |
| input_path=Path(values["model_path"]), | |
| output_path=Path(values["model_path"]).with_suffix(".onnx"), | |
| config_path=Path(values["config_path"]), | |
| device="cpu", | |
| ) | |
| except Exception as e: | |
| LOG.exception(e) | |
| if future is not None and future.done(): | |
| try: | |
| future.result() | |
| except Exception as e: | |
| LOG.error("Error in realtime: ") | |
| LOG.exception(e) | |
| future = None | |
| for future in copy(infer_futures): | |
| if future.done(): | |
| try: | |
| future.result() | |
| except Exception as e: | |
| LOG.error("Error in inference: ") | |
| LOG.exception(e) | |
| infer_futures.remove(future) | |
| if future: | |
| future.cancel() | |
| window.close() | |