Spaces:
Running
Running
| """ | |
| Shared utilities for HuggingFace Space Generator | |
| Consolidated common functionality following Gradio best practices | |
| """ | |
| import os | |
| import re | |
| import json | |
| import requests | |
| import tempfile | |
| from datetime import datetime | |
| from typing import List, Dict, Tuple, Optional, Any | |
| from urllib.parse import urlparse | |
| from bs4 import BeautifulSoup | |
| import gradio as gr | |
# Theme management following Gradio 5.x best practices.
# Maps a theme name to an instance of the ``gr.themes`` class of the same
# name; every instance is created once, when this module is imported.
AVAILABLE_THEMES = {
    theme_class_name: getattr(gr.themes, theme_class_name)()
    for theme_class_name in ("Default", "Soft", "Glass", "Monochrome", "Base")
}
def get_theme(theme_name: str) -> gr.themes.Base:
    """Return the Gradio theme registered under ``theme_name``.

    Args:
        theme_name: Key to look up in ``AVAILABLE_THEMES``.

    Returns:
        The registered theme instance, or a freshly constructed
        ``gr.themes.Default()`` when the name is not registered.
    """
    theme = AVAILABLE_THEMES.get(theme_name)
    if theme is not None:
        return theme
    # Build the fallback only on a miss: passing it as the ``dict.get``
    # default would construct a throwaway theme object on every call.
    return gr.themes.Default()
def validate_url_domain(url: str) -> bool:
    """Return True when ``url`` parses to a network location containing a dot.

    This is a cheap syntactic check only; it does not resolve the host or
    restrict the scheme.

    Args:
        url: Candidate URL string.

    Returns:
        True if ``urlparse(url).netloc`` is non-empty and contains ``'.'``;
        False otherwise, including when the value cannot be parsed.
    """
    try:
        netloc = urlparse(url).netloc
        return bool(netloc and '.' in netloc)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed URL (e.g. unbalanced IPv6 bracket).
        # TypeError / AttributeError: non-string input.
        # Deliberately not a bare ``except`` so KeyboardInterrupt and
        # SystemExit still propagate.
        return False
def extract_urls_from_text(text: str) -> List[str]:
    """Extract http(s) URLs from free text, cleaned and validated.

    Trailing sentence punctuation is removed from each match, as is a
    closing parenthesis that has no matching opener inside the URL (the
    common ``(see https://example.com)`` and Markdown ``[x](https://...)``
    cases). URLs with balanced parentheses are kept intact.

    Args:
        text: Arbitrary text that may contain URLs.

    Returns:
        URLs in order of appearance that pass ``validate_url_domain`` and
        are longer than 10 characters. Empty list for empty input.
    """
    if not text:
        return []

    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'

    validated_urls = []
    for url in re.findall(url_pattern, text):
        # Alternate between stripping punctuation and an unbalanced ')'
        # until nothing changes, so "...com)." and "...com.)" both clean up.
        while True:
            trimmed = url.rstrip('.,!?;:')
            if trimmed.endswith(')') and trimmed.count(')') > trimmed.count('('):
                trimmed = trimmed[:-1]
            if trimmed == url:
                break
            url = trimmed

        # Basic validation
        if validate_url_domain(url) and len(url) > 10:
            validated_urls.append(url)

    return validated_urls
def _collapse_page_text(text: str) -> str:
    """Collapse extracted page text into one space-separated string."""
    lines = (line.strip() for line in text.splitlines())
    # Split on double spaces so each chunk is a phrase rather than a single
    # word. Splitting on one space and then applying the length filter below
    # would delete every short word ("a", "is", "to") from the content.
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # Drop empty chunks and tiny fragments of one or two characters.
    return ' '.join(chunk for chunk in chunks if chunk and len(chunk) > 2)


def _truncate_at_sentence(text: str, max_chars: int) -> str:
    """Cut ``text`` to ``max_chars``, preferring to end on a period."""
    if len(text) <= max_chars:
        return text
    truncated = text[:max_chars]
    last_period = truncated.rfind('.')
    # Only cut at the period if it keeps more than 80% of the budget.
    if last_period > max_chars * 0.8:
        return truncated[:last_period + 1]
    return truncated + "..."


def fetch_url_content(url: str, timeout: int = 15, max_chars: int = 4000) -> str:
    """Fetch a web page and return its readable text.

    The page is downloaded with ``requests``, parsed with BeautifulSoup, and
    stripped of script/style/navigation elements. Text is taken from the
    first ``<main>``, ``<article>``, or ``<div>`` whose class contains
    "content", falling back to the whole document.

    Args:
        url: Address to fetch; must pass ``validate_url_domain``.
        timeout: Timeout in seconds passed to ``requests.get``.
        max_chars: Maximum length of returned text before truncation (an
            ellipsis may be appended beyond this limit).

    Returns:
        The cleaned text, or a human-readable error message. This function
        does not raise; every failure is reported in the returned string.
    """
    if not validate_url_domain(url):
        return f"Invalid URL format: {url}"

    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive'
        }

        response = requests.get(url, timeout=timeout, headers=headers)
        response.raise_for_status()

        # Parse content
        soup = BeautifulSoup(response.content, 'html.parser')

        # Remove non-content elements
        for element in soup(["script", "style", "nav", "header", "footer", "aside", "form"]):
            element.decompose()

        # Extract main content, most specific container first
        main_content = (
            soup.find('main') or
            soup.find('article') or
            soup.find('div', class_=lambda x: bool(x and 'content' in x.lower())) or
            soup
        )

        text = _collapse_page_text(main_content.get_text())
        text = _truncate_at_sentence(text, max_chars)

        return text if text.strip() else "No readable content found at this URL"

    except requests.exceptions.Timeout:
        return f"Timeout error fetching {url} ({timeout}s limit exceeded)"
    except requests.exceptions.RequestException as e:
        return f"Error fetching {url}: {str(e)}"
    except Exception as e:
        # Boundary handler: parsing errors are reported, not raised.
        return f"Error processing content from {url}: {str(e)}"
def _sanitize_filename_part(text: str) -> str:
    """Reduce ``text`` to word characters joined by single underscores."""
    cleaned = re.sub(r'[^\w\s-]', '', text)
    # Trim separators AFTER removing punctuation, so "AI ?" does not leave a
    # dangling space that would become a trailing underscore.
    cleaned = re.sub(r'^[-\s]+|[-\s]+$', '', cleaned)
    return re.sub(r'[-\s]+', '_', cleaned)


def create_safe_filename(base_name: str, suffix: str = ".md", prefix: str = "",
                         include_timestamp: bool = True) -> str:
    """Build a filesystem-friendly filename from user-supplied text.

    Characters other than word characters, whitespace, and hyphens are
    removed; runs of whitespace/hyphens become a single underscore.

    Args:
        base_name: Main part of the name. Empty/None becomes ``"export"``.
        suffix: Appended verbatim (not sanitized), e.g. ``".md"``.
        prefix: Optional text placed before the base name, sanitized the
            same way. Ignored if nothing remains after sanitizing.
        include_timestamp: When True, append ``_YYYYMMDD_HHMMSS`` (local
            time).

    Returns:
        The assembled filename; the stem is never empty (falls back to
        ``"export"``).
    """
    safe_name = _sanitize_filename_part(base_name) if base_name else "export"

    if prefix:
        safe_prefix = _sanitize_filename_part(prefix)
        # Join only non-empty parts so an all-punctuation prefix or base
        # name cannot produce a leading or trailing underscore.
        safe_name = "_".join(part for part in (safe_prefix, safe_name) if part)

    if include_timestamp:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        safe_name = f"{safe_name}_{timestamp}" if safe_name else f"export_{timestamp}"

    # Ensure we have a valid name
    if not safe_name:
        safe_name = "export"

    return f"{safe_name}{suffix}"
def export_conversation_to_markdown(history: List[Dict[str, str]],
                                    config: Optional[Dict[str, Any]] = None) -> str:
    """Render a chat history as a Markdown document.

    Args:
        history: Messages as dicts with ``role`` and ``content`` keys.
            Entries that are not dicts, or whose role is neither ``user``
            nor ``assistant``, are skipped.
        config: Optional settings; ``name`` and ``model`` are shown in the
            header when a non-empty config is given.

    Returns:
        The Markdown text, or ``"No conversation to export."`` when the
        history is empty.
    """
    if not history:
        return "No conversation to export."

    generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    parts = [f"# Conversation Export\nGenerated on: {generated_at}\n"]

    # Optional configuration summary
    if config:
        assistant_name = config.get('name', 'AI Assistant')
        model_name = config.get('model', 'Unknown')
        parts.append(f"\nAssistant: {assistant_name}\nModel: {model_name}\n")

    parts.append("\n---\n\n")

    # Each user message starts a new numbered exchange; an assistant reply
    # reuses the number of the most recent user message.
    exchange_no = 0
    for entry in history:
        if not isinstance(entry, dict):
            continue
        speaker = entry.get('role', 'unknown')
        body = entry.get('content', '')
        if speaker == 'user':
            exchange_no += 1
            parts.append(f"## User Message {exchange_no}\n\n{body}\n\n")
        elif speaker == 'assistant':
            parts.append(f"## Assistant Response {exchange_no}\n\n{body}\n\n---\n\n")

    return "".join(parts)
def process_file_upload(file_path: str, max_chars: int = 8000) -> str:
    """Summarize an uploaded file as a Markdown snippet.

    Files with a known text extension are read (UTF-8, undecodable bytes
    ignored) and returned in a fenced code block, cut to ``max_chars``
    characters. Other files are described by type and size only.

    Args:
        file_path: Path of the uploaded file.
        max_chars: Maximum number of characters of text content to include.
            A negative value reads the whole file.

    Returns:
        A Markdown string describing the file, or an error message string
        if the path is missing or the file cannot be processed.
    """
    if not file_path or not os.path.exists(file_path):
        return "❌ File not found"

    try:
        file_size = os.path.getsize(file_path)
        file_name = os.path.basename(file_path)
        # Lower-case so the extension match is case-insensitive.
        _, ext = os.path.splitext(file_path.lower())

        # Text file extensions
        text_extensions = {
            '.txt', '.md', '.markdown', '.rst',
            '.py', '.js', '.jsx', '.ts', '.tsx', '.json', '.yaml', '.yml',
            '.html', '.htm', '.xml', '.css', '.scss',
            '.java', '.c', '.cpp', '.h', '.cs', '.go', '.rs',
            '.sh', '.bash', '.log', '.csv', '.sql'
        }

        if ext in text_extensions:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                if max_chars < 0:
                    content = f.read()
                    truncated = False
                else:
                    # Read one extra character: a file that is exactly
                    # max_chars long must not be reported as truncated.
                    content = f.read(max_chars + 1)
                    truncated = len(content) > max_chars
                    content = content[:max_chars]
            if truncated:
                content += "\n... [truncated]"
            return f"📄 **{file_name}** ({file_size:,} bytes)\n```{ext[1:]}\n{content}\n```"

        # Special file types: reported by category, content is not read.
        if ext == '.pdf':
            return f"📑 **{file_name}** (PDF, {file_size:,} bytes)\n⚠️ PDF support requires additional libraries"
        if ext in {'.jpg', '.jpeg', '.png', '.gif', '.webp'}:
            return f"🖼️ **{file_name}** (Image, {file_size:,} bytes)"
        if ext in {'.xlsx', '.xls'}:
            return f"📊 **{file_name}** (Spreadsheet, {file_size:,} bytes)"
        if ext in {'.zip', '.tar', '.gz', '.rar'}:
            return f"🗜️ **{file_name}** (Archive, {file_size:,} bytes)"
        return f"📎 **{file_name}** ({ext or 'no extension'}, {file_size:,} bytes)"

    except Exception as e:
        # Boundary handler: e.g. the path is a directory or is unreadable.
        return f"❌ Error processing file: {str(e)}"
class ConfigurationManager:
    """Load, validate, and persist a JSON configuration file with backups.

    The configuration is stored at ``config_path`` (``config.json``,
    relative to the working directory). Before each successful save, the
    previous file is copied into ``backup_dir`` under a timestamped name.
    """

    # Number of backup files kept by _cleanup_old_backups.
    MAX_BACKUPS = 10

    def __init__(self, default_config: Dict[str, Any]):
        """Store the defaults used when no valid config file is available."""
        self.default_config = default_config
        self.config_path = "config.json"
        self.backup_dir = "config_backups"

    def load_config(self) -> Dict[str, Any]:
        """Load the configuration, falling back to the defaults.

        If the file does not exist, the defaults are written to disk and a
        shallow copy of them is returned. If the file cannot be read or
        parsed, a shallow copy of the defaults is returned and the file is
        left untouched.
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except FileNotFoundError:
            print("ℹ️ No config.json found, using defaults")
            self.save_config(self.default_config)
            return self.default_config.copy()
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"⚠️ Error loading config: {e}")
            return self.default_config.copy()
        print("✅ Loaded configuration from config.json")
        return config

    def save_config(self, config: Dict[str, Any]) -> bool:
        """Write ``config`` to disk, backing up the previous file first.

        Returns:
            True on success; False if the config cannot be serialized or
            the file cannot be written (the error is printed).
        """
        # Serialize before touching the file: opening it for writing
        # truncates it, so a serialization error after that point would
        # destroy the existing configuration.
        try:
            serialized = json.dumps(config, indent=2)
        except (TypeError, ValueError, RecursionError) as e:
            print(f"❌ Error saving config: {e}")
            return False

        self._create_backup()

        try:
            with open(self.config_path, 'w', encoding='utf-8') as f:
                f.write(serialized)
        except OSError as e:
            print(f"❌ Error saving config: {e}")
            return False
        return True

    def _create_backup(self):
        """Copy the current config file to a timestamped backup (best effort)."""
        if not os.path.exists(self.config_path):
            return
        try:
            os.makedirs(self.backup_dir, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_path = os.path.join(self.backup_dir, f"config_{timestamp}.json")
            # Binary copy keeps the backup byte-identical to the source.
            with open(self.config_path, 'rb') as source, open(backup_path, 'wb') as dest:
                dest.write(source.read())
        except OSError as e:
            # A failed backup must not prevent the save itself.
            print(f"Warning: Could not create backup: {e}")
            return
        self._cleanup_old_backups()

    def _cleanup_old_backups(self):
        """Delete all but the newest ``MAX_BACKUPS`` backup files."""
        try:
            # Only consider files this class created (config_<timestamp>.json);
            # the timestamp format sorts chronologically as plain text.
            backups = sorted(
                name for name in os.listdir(self.backup_dir)
                if name.startswith('config_') and name.endswith('.json')
            )
            surplus = max(len(backups) - self.MAX_BACKUPS, 0)
            for old_backup in backups[:surplus]:
                os.remove(os.path.join(self.backup_dir, old_backup))
        except OSError as e:
            print(f"Warning: Could not clean up old backups: {e}")

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values, excluding bool."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def validate_config(self, config: Dict[str, Any]) -> Tuple[bool, str]:
        """Check that ``config`` has the required fields and sane values.

        Returns:
            ``(True, "Configuration is valid")`` or ``(False, reason)``.
            Non-numeric ``temperature`` / ``max_tokens`` values are reported
            as out of range rather than raising.
        """
        if not isinstance(config, dict):
            return False, "Configuration must be a dictionary"

        required_fields = ['name', 'description', 'system_prompt', 'model',
                           'temperature', 'max_tokens', 'api_key_var']

        # Check required fields
        for field in required_fields:
            if field not in config:
                return False, f"Missing required field: {field}"

        # Validate temperature
        temperature = config['temperature']
        if not (self._is_number(temperature) and 0 <= temperature <= 2):
            return False, "Temperature must be between 0 and 2"

        # Validate max_tokens
        max_tokens = config['max_tokens']
        if not (self._is_number(max_tokens) and 50 <= max_tokens <= 4096):
            return False, "Max tokens must be between 50 and 4096"

        return True, "Configuration is valid"
# Model definitions with metadata.
# Each entry is a dict with "id", "name", and "description" keys, built from
# the (id, name, description) rows below; order is preserved.
AVAILABLE_MODELS = [
    {"id": model_id, "name": display_name, "description": summary}
    for model_id, display_name, summary in (
        # Google models
        ("google/gemini-2.0-flash-001", "Gemini 2.0 Flash",
         "Fast, reliable for general tasks"),
        ("google/gemma-3-27b-it", "Gemma 3 27B",
         "High-performance open model"),
        # Anthropic models
        ("anthropic/claude-3.5-sonnet", "Claude 3.5 Sonnet",
         "Superior reasoning and analysis"),
        ("anthropic/claude-3.5-haiku", "Claude 3.5 Haiku",
         "Fast, efficient Claude model"),
        # OpenAI models
        ("openai/gpt-4o-mini", "GPT-4o Mini",
         "Fast, cost-effective GPT-4o variant"),
        ("openai/gpt-4o-mini-search-preview", "GPT-4o Mini Search",
         "GPT-4o with search capabilities"),
        ("openai/gpt-oss-120b", "GPT-OSS 120B",
         "Open source GPT model"),
        # MistralAI models
        ("mistralai/mistral-medium-3", "Mistral Medium 3",
         "Balanced performance and efficiency"),
        # DeepSeek models
        ("deepseek/deepseek-r1-distill-qwen-32b", "DeepSeek R1 Distill Qwen 32B",
         "Efficient distilled model"),
        # NVIDIA models
        ("nvidia/llama-3.1-nemotron-70b-instruct", "Llama 3.1 Nemotron 70B",
         "NVIDIA-optimized Llama"),
        # Qwen models
        ("qwen/qwen3-30b-a3b-instruct-2507", "Qwen3 30B",
         "Alibaba's multilingual model"),
    )
]
def get_model_choices() -> List[str]:
    """Return the ``id`` of every entry in ``AVAILABLE_MODELS``, in order."""
    model_ids: List[str] = []
    for entry in AVAILABLE_MODELS:
        model_ids.append(entry["id"])
    return model_ids
def get_model_info(model_id: str) -> Dict[str, str]:
    """Return the metadata dict for ``model_id``.

    Args:
        model_id: Model identifier to look up in ``AVAILABLE_MODELS``.

    Returns:
        A new dict with ``id``, ``name``, and ``description`` keys. For an
        unregistered id, the name is the id itself and the description is
        ``"Custom model"``.
    """
    for model in AVAILABLE_MODELS:
        if model["id"] == model_id:
            # Return a copy so callers cannot mutate the shared registry
            # entry; the fallback below is likewise a fresh dict.
            return dict(model)
    return {"id": model_id, "name": model_id, "description": "Custom model"}