#app.py import streamlit as st import subprocess import os import glob import time import streamlit_antd_components as sac import uuid import shutil import re import threading import psutil from video_processor import process_video, get_status, get_error_details, status_dict, cancel_job, cleanup_cancelled_jobs st.set_page_config(layout="wide") # Cache the check_dependencies function @st.cache_data def check_dependencies(): dependencies_status = {} # Check ffmpeg ffmpeg_path = shutil.which('ffmpeg') if ffmpeg_path: dependencies_status['ffmpeg'] = "installed" try: result = subprocess.run(['ffmpeg', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) dependencies_status['ffmpeg_version'] = result.stdout.split('\n')[0] except subprocess.CalledProcessError: dependencies_status['ffmpeg'] = "error" else: dependencies_status['ffmpeg'] = "not accessible" # Check yt-dlp and auto-editor for dep in ['yt-dlp', 'auto-editor']: try: result = subprocess.run([dep, '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) dependencies_status[dep] = "installed" dependencies_status[f"{dep}_version"] = result.stdout.split('\n')[0] except FileNotFoundError: dependencies_status[dep] = "not installed" except subprocess.CalledProcessError: dependencies_status[dep] = "not accessible" return dependencies_status def display_dependencies_status(): with st.sidebar: st.header("Dependency Check") dependencies_status = check_dependencies() # Display ffmpeg status if dependencies_status['ffmpeg'] == "installed": st.success("ffmpeg is installed and accessible.") st.text(dependencies_status['ffmpeg_version']) elif dependencies_status['ffmpeg'] == "error": st.warning("ffmpeg is found but there was an error running it.") else: st.error("ffmpeg is not accessible.") # Display yt-dlp and auto-editor status for dep in ['yt-dlp', 'auto-editor']: if dependencies_status[dep] == "installed": st.success(f"{dep} is installed and accessible.") st.text(dependencies_status[f"{dep}_version"]) elif dependencies_status[dep] == "not installed": st.error(f"{dep} is not installed.") else: st.error(f"{dep} is not accessible.") # Add button to delete all output folders if st.button("Delete All Output Folders"): delete_all_output_folders() # Display system statistics st.header("System Statistics") cpu_usage = psutil.cpu_percent(interval=1) memory_info = psutil.virtual_memory() disk_info = psutil.disk_usage('/') st.text(f"CPU Usage: {cpu_usage}%") st.text(f"Memory Usage: {memory_info.percent}%") st.text(f"Total Memory: {memory_info.total / (1024 ** 3):.2f} GB") st.text(f"Available Memory: {memory_info.available / (1024 ** 3):.2f} GB") st.text(f"Disk Usage: {disk_info.percent}%") st.text(f"Total Disk Space: {disk_info.total / (1024 ** 3):.2f} GB") st.text(f"Free Disk Space: {disk_info.free / (1024 ** 3):.2f} GB") def delete_all_output_folders(): output_folders = glob.glob("./output_folder_*") for folder in output_folders: shutil.rmtree(folder) st.success("All output folders have been deleted.") # Call this function when your app starts display_dependencies_status() def main(): st.title("⚡️ ISKM Spotify Service 🛎️") sac.alert(label='🪷🌹🪷Hare Krishna 🪷🌹🪷 | 🙌dandwat 🙇 pranam🙌 | 🙏please accept my humble obeisances | 🫡 🪷🌹All glories Srila Prabhupada🌹🪷🫡 | 🎵 Welcome to ISKM Spotify Service! 🎶 | 🎧 Enter a YouTube URL to transform it into divine format 🙏 ✅ v0.0.2 Nanda Gopa 👶🏻 Stage | 🚧 Maintaince Mode', size='xs', radius=55, color='orange', banner=sac.Banner(play=True, direction='right', speed=51, pauseOnHover=True), icon=True, closable=True) # Add instruction guide in an expander with st.expander("📚 Instructions"): st.markdown(""" ### How to Use ISKM Spotify Service 1. **Enter YouTube URL**: - Paste a valid YouTube video or playlist URL in the text box. - The URL should start with https://www.youtube.com/ or https://youtu.be/. 2. **Process Video**: - Click the "Process Video" button to start the conversion. - The tool will download, trim, and process the video(s). 3. **Download**: - Once processing is complete, download buttons will appear. - For single videos, you'll see one download button. - For playlists, you'll see multiple download buttons (one for each video). 4. **Dependency Check**: - Check the sidebar to ensure all required dependencies are installed and accessible. ### Notes: - Processing may take some time, especially for longer videos or playlists. - Ensure you have a stable internet connection throughout the process. - The tool will trim silence and unnecessary parts from the videos. - Final output will be in MP4 format, optimized for audio quality. If you encounter any issues, please check the dependency status in the sidebar or contact support. """) tabs = st.tabs(["Submit Job", "Check Status", "Cancel Job", "Job Table"]) with tabs[0]: st.header("Submit Job") url = st.text_input("Enter the YouTube URL:") if st.button("Process Video"): if url: session_id = str(uuid.uuid4()) status_dict[session_id] = {"status": "Processing", "started_time": time.strftime("%Y-%m-%d %H:%M:%S"), "video_url": url, "link_submitted": url} threading.Thread(target=process_video, args=(url, session_id)).start() st.success(f"{session_id}") st.info(f"Video processing started! Your session ID is ☝️ Use this ID to check the status.") else: st.warning("Please enter a YouTube URL.") with tabs[1]: st.header("Check Status") session_id_input = st.text_input("Enter your session ID:") if st.button("Check Status"): if session_id_input: status = get_status(session_id_input) st.info(f"Status: {status}") # Check if processing is completed and provide a download link if "Processing completed!" in status: file_path = status.split("File: ")[-1] with open(file_path, "rb") as file: btn = st.download_button( label="Download Processed Video", data=file, file_name=os.path.basename(file_path), mime="video/mp4" ) elif "Error occurred" in status: error_details = get_error_details(session_id_input) st.error(f"Error Details: {error_details}") else: st.warning("Please enter a session ID.") with tabs[2]: st.header("Cancel Job") cancel_session_id = st.text_input("Enter the session ID to cancel:") if st.button("Cancel Job"): if cancel_job(cancel_session_id): st.success(f"Job {cancel_session_id} has been cancelled.") else: st.warning("Invalid session ID or job already completed.") with tabs[3]: st.header("Job Table") cleanup_cancelled_jobs() # Clean up cancelled jobs before displaying the table job_data = [{"Job ID": job_id, **details} for job_id, details in status_dict.items()] st.table(job_data) if __name__ == "__main__": main()