# pylint: disable=no-member
import base64
import gc
import math
import mimetypes
import multiprocessing
import os
import re
import tempfile
import time
import uuid
from datetime import timedelta
from typing import Dict, List, Optional, TypedDict, Union
from urllib.parse import urlparse

import cv2
import imageio
import pandas as pd
import pytesseract
import requests
import torch
import whisper
import yt_dlp
from bs4 import BeautifulSoup, Tag
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_ollama import ChatOllama
from PIL import Image
from playwright.sync_api import sync_playwright
from youtube_transcript_api import (
    NoTranscriptFound,
    TranscriptsDisabled,
    YouTubeTranscriptApi,
)

load_dotenv()
base_url = os.getenv("OLLAMA_BASE_URL")

model_vision = ChatOllama(
    model="gemma3:latest",
    base_url=base_url,
)
model_text = ChatOllama(
    model="hf.co/lmstudio-community/Qwen2.5-14B-Instruct-GGUF:Q6_K", base_url=base_url
)
def use_vision_model(question: str) -> str:
    """
    A multimodal reasoning tool that combines image and text input to answer
    questions about the image.
    """
    # Extract image paths from the question and keep only those that exist on disk
    image_paths = re.findall(r"[\w\-/\.]+\.(?:png|jpg|jpeg|webp)", question)
    image_paths = [p for p in image_paths if os.path.exists(p)]
    if not image_paths:
        return "No valid image file found in the question."
    image_path = image_paths[0]
    # # Preprocess the image using OpenCV
    # image = cv2.imread(image_path)
    # gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # gray = cv2.convertScaleAbs(gray, alpha=1.2, beta=20)
    # gray = cv2.GaussianBlur(gray, (5, 5), 0)
    # edges = cv2.Canny(gray, 50, 150, apertureSize=3)
    # # Create a temporary file for the processed image
    # with tempfile.NamedTemporaryFile(suffix=".png", delete=True) as tmp_file:
    #     temp_image_path = tmp_file.name
    #     cv2.imwrite(temp_image_path, image)
    # Encode the image as base64 (the commented-out preprocessing above originally wrote to a temp file)
    mime_type, _ = mimetypes.guess_type(image_path)
    mime_type = mime_type or "image/png"
    with open(image_path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("utf-8")
    # Prepare the prompt and image for the model
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": question},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:{mime_type};base64,{encoded}"},
                },
            ],
        }
    ]
    # Invoke the vision model
    response = model_vision.invoke(messages)
    # Clean up
    del messages, encoded, image_path
    gc.collect()
    torch.cuda.empty_cache()
    return str(response.content) if hasattr(response, "content") else str(response)
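
# Usage sketch (illustrative only; the image path is a placeholder, not a file
# shipped with this module):
#
#     answer = use_vision_model("How many birds are visible in ./data/birds.png?")
#     print(answer)
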
# YouTube Video Review Tool
def review_youtube_video(url: str) -> str:
    """Fetches the transcript of a YouTube video so questions about the video can be answered.
    Args:
        url (str): The URL to the YouTube video.
    Returns:
        str: The transcript content of the video.
    """
    # Extract video ID from URL (assuming the format https://youtube.com/watch?v=VIDEO_ID)
    video_id = url.split("v=")[1]
    transcript_url = (
        f"https://www.youtube.com/api/timedtext?v={video_id}"  # Getting transcript data
    )
    response = requests.get(transcript_url, timeout=200)
    transcript = response.text  # This is the transcript (XML or SRT format)
    # Prepare the content (just the transcript, no question needed)
    transcript_content = f"Here is the transcript of the video: {transcript}"
    # Return the transcript content so the main LLM can handle question answering
    return transcript_content
# YouTube Frames to Images Tool
def video_frames_to_images(
    url: str,
    sample_interval_seconds: int = 5,
) -> List[str]:
    """Extracts frames from a video at specified intervals and saves them as images.
    Args:
        url (str): The URL to the video.
        sample_interval_seconds (int): The interval in seconds between sampled frames.
    Returns:
        List[str]: A list of paths to the saved image files.
    """
    folder_name = "./frames"
    # Create a subdirectory for the frames
    frames_dir = os.path.join(folder_name, "frames")
    os.makedirs(frames_dir, exist_ok=True)
    ydl_opts = {
        "format": "bestvideo[height<=1080]+bestaudio/best[height<=1080]/best",
        "outtmpl": os.path.join(folder_name, "video.%(ext)s"),
        "quiet": True,
        "noplaylist": True,
        "merge_output_format": "mp4",
        "force_ipv4": True,
    }
    info_extracted = []
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        info_extracted.append(info)
    video_path = next(
        (
            os.path.join(folder_name, f)
            for f in os.listdir(folder_name)
            if f.endswith(".mp4")
        ),
        None,
    )
    if not video_path:
        raise RuntimeError("Failed to download video as mp4")
    reader = imageio.get_reader(video_path)
    # Read fps and duration from the container metadata, falling back to sane
    # defaults when the values are missing or infinite (e.g. for live streams)
    metadata = reader.get_meta_data()
    fps = metadata.get("fps", 25)
    duration_seconds = metadata.get("duration", 120)
    if not duration_seconds or math.isinf(duration_seconds):
        duration_seconds = 120
    frame_interval = int(fps * sample_interval_seconds)
    num_frames = int(fps * duration_seconds)
    image_paths: List[str] = []
    for idx in range(num_frames):
        if idx % frame_interval == 0:
            try:
                frame = reader.get_data(idx)
            except IndexError:
                break  # The estimated frame count overshot the actual video length
            # Save frame as image
            image_path = os.path.join(frames_dir, f"frame_{idx:06d}.jpg")
            imageio.imwrite(image_path, frame)
            image_paths.append(image_path)
    reader.close()
    return image_paths
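
# Usage sketch (illustrative only; the URL is a placeholder): sample one frame
# every 10 seconds, then ask the vision model about the first frame.
#
#     frames = video_frames_to_images("https://www.youtube.com/watch?v=VIDEO_ID", 10)
#     if frames:
#         print(use_vision_model(f"What is shown in {frames[0]}?"))
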
# File Reading Tool
def read_file(filepath: str) -> str:
    """Reads the content of a PYTHON file and analyzes it with the text model.
    Args:
        filepath (str): the path to the file to read.
    Returns:
        str: The model's analysis of the file (the expected output of the code).
    """
    try:
        with open(filepath, "r", encoding="utf-8") as file:
            content = file.read()
        # Calculate metadata for the prompt
        filename = os.path.basename(filepath)
        line_count = content.count("\n") + 1
        code_str = content.strip()
        # Compose the prompt
        prompt = f"""
You are a Python expert and code reviewer. Analyze the following Python script and answer the question provided.
Give Final Answer: the output of the code
Script Length: {line_count} lines
Filename: {filename}
Python Code:
```python
{code_str}
```
"""
        model = model_text
        # Call the model
        message = HumanMessage(content=prompt)
        response = model.invoke([message])
        torch.cuda.empty_cache()
        gc.collect()
        # Return the result
        if hasattr(response, "content") and isinstance(response.content, str):
            return response.content
        return str(response)
    except FileNotFoundError:
        return f"File not found: {filepath}"
    except IOError as e:
        return f"Error reading file: {str(e)}"
# To run python code
def execute_code(code: str):
    """Helper function to execute the code in a separate process."""
    try:
        exec(code)
    except Exception as e:
        raise RuntimeError(f"Error executing the code: {str(e)}") from e


def run_code_from_file(file_path: str, timeout: int = 10):
    """
    Reads a Python file and executes it, with timeout handling.
    Args:
        file_path (str): The full path to the Python file to execute.
        timeout (int): The timeout in seconds before forcefully stopping the execution.
    """
    # Check if the file exists
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"The file {file_path} does not exist.")
    # Read the file and get the code to execute
    with open(file_path, "r", encoding="utf-8") as file:
        code = file.read()
    # Start a process to execute the code
    process = multiprocessing.Process(target=execute_code, args=(code,))
    process.start()
    # Wait for the process to finish or timeout
    process.join(timeout)
    # If the process is still alive after the timeout, terminate it
    if process.is_alive():
        process.terminate()  # Stop the execution
        raise TimeoutError(
            f"The code execution took longer than {timeout} seconds and was terminated."
        )
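
# Usage sketch (illustrative only; the script path is a placeholder): run an
# untrusted script with a 5-second budget and surface timeouts to the caller.
#
#     try:
#         run_code_from_file("./downloads/solution.py", timeout=5)
#     except TimeoutError as err:
#         print(err)
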
# File Download Tool
def download_file_from_url(url: str, directory: str) -> Dict[str, Union[str, None]]:
    """Downloads a file from a URL and saves it to a directory.
    Args:
        url (str): the URL to download the file from.
        directory (str): the directory to save the file to.
    Returns:
        Dict[str, Union[str, None]]: A dictionary containing the file type, name, and path.
    """
    response = requests.get(url, stream=True, timeout=10)
    response.raise_for_status()
    # Strip any charset suffix, e.g. "text/csv; charset=utf-8" -> "text/csv"
    content_type = response.headers.get("content-type", "").lower().split(";")[0].strip()
    # Try to get filename from headers
    filename = None
    cd = response.headers.get("content-disposition", "")
    match = re.search(r"filename\*=UTF-8''(.+)", cd) or re.search(
        r'filename="?([^"]+)"?', cd
    )
    if match:
        filename = match.group(1)
    # If not in headers, try URL
    if not filename:
        filename = os.path.basename(url.split("?")[0])
    # Fallback to generated filename
    if not filename:
        extension = {
            "image/jpeg": ".jpg",
            "image/png": ".png",
            "image/gif": ".gif",
            "audio/wav": ".wav",
            "audio/mpeg": ".mp3",
            "video/mp4": ".mp4",
            "text/plain": ".txt",
            "text/csv": ".csv",
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
            "application/vnd.ms-excel": ".xls",
            "application/octet-stream": ".bin",
        }.get(content_type, ".bin")
        filename = f"downloaded_file{extension}"
    os.makedirs(directory, exist_ok=True)
    file_path = os.path.join(directory, filename)
    print(file_path)
    with open(file_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
    # shutil.copy(file_path, os.getcwd())
    return {
        "type": content_type,
        "filename": filename,
        "path": file_path,
    }
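
# Usage sketch (illustrative only; the URL and directory are placeholders):
#
#     meta = download_file_from_url("https://example.com/data/menu.xlsx", "./downloads")
#     print(meta["type"], meta["path"])
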
# Text Extraction from Image Tool
def extract_text_from_image(image_path: str) -> str:
    """Extracts text from an image using OCR.
    Args:
        image_path (str): the path to the image to extract text from.
    Returns:
        str: The text extracted from the image.
    """
    image = Image.open(image_path)
    text = pytesseract.image_to_string(image)
    return f"Extracted text from image:\n\n{text}"
# CSV Analysis Tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """Analyzes a CSV file and answers questions about its contents using an
    Ollama model.
    Args:
        file_path (str): The path to the CSV file to analyze.
        query (str): The question to answer about the CSV file.
    Returns:
        str: The result of the analysis.
    """
    # Load the CSV file
    df = pd.read_csv(file_path)
    df_str = df.to_string(index=False)
    # Compose the prompt
    prompt = f"""
You are a data analyst. Analyze the following CSV data and answer the question provided.
CSV Dimensions: {df.shape[0]} rows × {df.shape[1]} columns
CSV Data:
{df_str}
Please provide:
1. A summary of the data structure and content
2. Key patterns and insights
3. Potential data quality issues
4. Suggestions for analysis
User Query:
{query}
Format your response in markdown with sections and bullet points.
"""
    model = model_text
    # Call the model (pass a proper chat message rather than a raw content dict)
    response = model.invoke([HumanMessage(content=prompt)])
    del df
    torch.cuda.empty_cache()
    gc.collect()
    # Return the result
    if hasattr(response, "content") and isinstance(response.content, str):
        return response.content
    return str(response)
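
# Usage sketch (illustrative only; the CSV path and question are placeholders):
#
#     report = analyze_csv_file("./downloads/sales.csv", "Which product had the highest revenue?")
#     print(report)
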
# Excel Analysis Tool
def analyze_excel_file(file_path: str) -> str:
    """Analyzes an Excel file and answers questions about its contents using an
    Ollama model.
    Args:
        file_path (str): the path to the Excel file to analyze.
    Returns:
        str: The result of the analysis.
    """
    llm = model_text
    print(file_path)
    # Read all sheets from the Excel file
    excel_file = pd.ExcelFile(file_path)
    sheet_names = excel_file.sheet_names
    result = f"Excel file loaded with {len(sheet_names)} sheets: {', '.join(sheet_names)}\n\n"
    for sheet_name in sheet_names:
        df = pd.read_excel(file_path, sheet_name=sheet_name)
        df_str = df.to_string()
        # Build the prompt
        prompt = f"""Analyze the following Excel sheet data and answer the user's query.
Sheet Name: {sheet_name}
Dimensions: {len(df)} rows × {len(df.columns)} columns
Data:
{df_str}
Please provide:
1. A summary of the data structure and content
2. List all the values of the columns in a proper table format.
3. If the file contains food items, assume it refers to the
monetary value of the items, not the quantity sold.
4. If the file contains food items, make a new list which
contains only the names of the food items in the column (not including drinks).
5. If the file contains any kind of monetary value, it is in USD with two decimal places.
Format the response clearly using headings and bullet points."""
        # Call the LLM with the prompt
        response = llm.invoke([HumanMessage(content=prompt)])
        result += f"=== Sheet: {sheet_name} ===\n"
        result += str(response.content) + "\n"
        result += "=" * 50 + "\n\n"
        del df
        gc.collect()
    excel_file.close()
    torch.cuda.empty_cache()
    return result
# Audio Transcription Tool
def transcribe_audio(audio_file_path: str) -> str:
    """Transcribes an audio file using Whisper's audio capabilities.
    Always give the Final Answer to the question in a specific format, for example list all the pages mentioned in increasing order in one line.
    Change vanilla extract to pure vanilla extract in the final answer.
    Args:
        audio_file_path (str): The path to the audio file to transcribe.
    Returns:
        str: The transcript of the audio file.
    """
    model = whisper.load_model("base")
    result = model.transcribe(audio_file_path)
    assert isinstance(result["text"], str)
    del model
    torch.cuda.empty_cache()
    gc.collect()
    return result["text"]
def _extract_video_id(url: str) -> Optional[str]:
    """Extract video ID from YouTube URL.
    Args:
        url (str): the URL to the YouTube video.
    Returns:
        str: The video ID of the YouTube video.
    """
    patterns = [
        r"(?:youtube\.com\/watch\?v=|youtube\.com\/embed\/|youtu\.be\/)([^&\n?#]+)",
        r"(?:youtube\.com\/v\/|youtube\.com\/e\/|youtube\.com\/user\/[^\/]+\/|youtube\.com\/[^\/]+\/|youtube\.com\/embed\/|youtu\.be\/)([^&\n?#]+)",
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None
def transcribe_youtube(url: str) -> str:
    """
    Transcribes a YouTube video using the YouTube Transcript API, with Whisper as a fallback.
    This function first tries to fetch the transcript of a YouTube video using the YouTube Transcript API.
    If the transcript is unavailable (e.g., because captions are disabled), it falls back to downloading
    the audio with yt-dlp and transcribing it with Whisper.
    Args:
        url (str): The URL to the YouTube video.
    Returns:
        str: The transcript of the YouTube video, or an error message if transcription fails.
    """
    try:
        # Try using YouTube Transcript API
        video_id = _extract_video_id(url)
        if not video_id:
            raise ValueError(f"Could not extract a video ID from URL: {url}")
        transcript = ""
        transcript_chunks = YouTubeTranscriptApi.get_transcript(
            video_id, languages=["en"]
        )
        for chunk in transcript_chunks:
            timestamp = str(timedelta(seconds=int(chunk["start"])))
            transcript += f"[{timestamp}] {chunk['text']}\n"
        # Return API transcript if available
        if transcript.strip():
            return transcript
    except (TranscriptsDisabled, NoTranscriptFound, Exception) as err:
        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                # Download audio from YouTube
                ydl_opts = {
                    "format": "bestaudio/best",
                    "outtmpl": os.path.join(tmpdir, "audio.%(ext)s"),
                    "quiet": True,
                    "noplaylist": True,
                    "postprocessors": [
                        {
                            "key": "FFmpegExtractAudio",
                            "preferredcodec": "wav",
                            "preferredquality": "192",
                        }
                    ],
                }
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(url, download=True)
                if info is not None:
                    title = info.get("title", "Unknown Title")
                    duration = info.get("duration", 0)  # in seconds
                    uploader = info.get("uploader", "Unknown Uploader")
                else:
                    title = "Unknown Title"
                    duration = 0
                    uploader = "Unknown Uploader"
                audio_path = next(
                    (
                        os.path.join(tmpdir, f)
                        for f in os.listdir(tmpdir)
                        if f.endswith(".wav")
                    ),
                    None,
                )
                if not audio_path:
                    raise RuntimeError("Failed to download or convert audio") from err
                # Use Whisper for the fallback transcription
                whisper_model = whisper.load_model("base")
                transcription = whisper_model.transcribe(audio_path, verbose=False)
                raw_transcript = transcription["text"]
                del whisper_model
                gc.collect()
                torch.cuda.empty_cache()
                result = f"Title: {title}\nUploader: {uploader}\nDuration: {duration} seconds\nTranscript: {raw_transcript}"
                return result
        except Exception as fallback_exc:
            raise RuntimeError("Fallback transcription failed") from fallback_exc
    return "Transcription failed unexpectedly."
def website_scrape(url: str) -> str:
    """Scrapes a website and returns its text.
    Args:
        url (str): the URL of the website to scrape.
    Returns:
        str: the text of the website.
    """
    try:
        parsed_url = urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise ValueError(
                f"Invalid URL: '{url}'. Call `duckduckgo_search` first to get a valid URL."
            )
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page()
            page.goto(url, wait_until="networkidle", timeout=60000)
            page.wait_for_load_state("domcontentloaded")
            html_content = page.content()
            browser.close()
        soup = BeautifulSoup(html_content, "html.parser")
        relevant_text = ""
        # for header in soup.find_all(["h2", "h3"]):
        #     heading_text = header.get_text().strip().lower()
        #     if "discography" in heading_text or "studio albums" in heading_text:
        #         section_texts = []
        #         tag = header.find_next_sibling()
        #         while tag and (
        #             not isinstance(tag, Tag) or tag.name not in ["h2", "h3"]
        #         ):
        #             section_texts.append(tag.get_text(separator=" ", strip=True))
        #             tag = tag.find_next_sibling()
        #         relevant_text = "\n\n".join(section_texts)
        #         break
        # if not relevant_text:
        #     article = soup.find("article")
        #     if article:
        #         relevant_text = article.get_text(separator=" ", strip=True)
        # if not relevant_text:
        relevant_text = soup.get_text(separator=" ", strip=True)

        # Step 2: chunk the text (optional but recommended)
        def chunk_text(text, max_length=1000):
            words = text.split()
            chunks = []
            for i in range(0, len(words), max_length):
                chunks.append(" ".join(words[i : i + max_length]))
            return chunks

        chunks = chunk_text(relevant_text)
        # Return only the first few chunks to keep the result concise
        return "\n\n".join(chunks[:5])
    except ValueError as e:
        # Catch URL validation errors
        return str(e)
    except Exception as e:
        # Catch other unexpected errors
        return f"Scraping failed: {str(e)}"
class SearchResult(TypedDict):
    query: str
    status: str
    attempt: int
    results: Optional[List[dict]]
    error: Optional[str]


def duckduckgo_search(query: str, max_results: int = 10) -> SearchResult:
    """
    Perform a DuckDuckGo search with retry and backoff.
    Use this FIRST before invoking any scraping tools.
    Args:
        query: The search query string.
        max_results: Max number of results to return (default 10).
    Returns:
        A dict with the query, results, status, attempt count, and any error.
    """
    max_retries = 3
    base_delay = 2
    backoff_factor = 2
    for attempt in range(max_retries):
        try:
            with DDGS() as ddgs:
                results = ddgs.text(keywords=query, max_results=max_results)
                if results:
                    formatted_results = [
                        {
                            "title": result.get("title", ""),
                            "url": result.get("href", ""),
                            "body": result.get("body", ""),
                        }
                        for result in results
                    ]
                    return {
                        "query": query,
                        "status": "success",
                        "attempt": attempt + 1,
                        "results": formatted_results,
                        "error": None,
                    }
        except Exception as e:
            print(f"[DuckDuckGo Tool] Attempt {attempt + 1} failed: {e}")
            time.sleep(base_delay * (backoff_factor**attempt))
    return {
        "query": query,
        "status": "failed",
        "attempt": max_retries,
        "results": None,
        "error": "Max retries exceeded or request failed.",
    }
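
# Usage sketch (illustrative only; the query is a placeholder): search first,
# then scrape the top hit with `website_scrape`.
#
#     hits = duckduckgo_search("Mercedes Sosa discography studio albums")
#     if hits["status"] == "success" and hits["results"]:
#         print(website_scrape(hits["results"][0]["url"]))
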
def reverse_decoder(question: str) -> str:
    """Decodes a reversed sentence if the input appears to be written backward.
    Args:
        question (str): The possibly reversed question string.
    Returns:
        str: The decoded sentence.
    """
    # Remove leading/trailing punctuation if present
    cleaned = question.strip().strip(".!?")
    # Reverse the whole string; the caller decides whether the result reads as normal text
    reversed_text = cleaned[::-1]
    return reversed_text
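

# Minimal smoke test, a sketch only: it exercises just the tools that need no
# Ollama server, GPU, or network access to a specific file, and the inputs are
# placeholders rather than anything required by the module above.
if __name__ == "__main__":
    # Reverse-decode a sample backward sentence
    print(reverse_decoder(".elpmaxe na si sihT"))
    # Run a small web search and report its status
    demo_search = duckduckgo_search("GAIA benchmark general AI assistants", max_results=3)
    print(demo_search["status"], demo_search.get("error"))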