Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| ExposureGPT - Simplified MCP OSINT Tool | |
| Single tool for intelligence gathering using Shodan + OpenAI | |
| """ | |
| import gradio as gr | |
| import json | |
| import logging | |
| import os | |
| import sys | |
| try: | |
| import shodan | |
| except ImportError: | |
| shodan = None | |
| try: | |
| from openai import OpenAI | |
| except ImportError: | |
| OpenAI = None | |
| from typing import Dict, List, Optional | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Initialize services | |
| SHODAN_API_KEY = os.getenv('SHODAN_API_KEY') | |
| OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') | |
| # Initialize clients | |
| shodan_client = None | |
| openai_client = None | |
| if shodan and SHODAN_API_KEY and SHODAN_API_KEY != 'your_shodan_key': | |
| try: | |
| shodan_client = shodan.Shodan(SHODAN_API_KEY) | |
| logger.info("β Shodan API connected") | |
| except Exception as e: | |
| logger.error(f"β Shodan connection failed: {e}") | |
| if OpenAI and OPENAI_API_KEY: | |
| try: | |
| openai_client = OpenAI(api_key=OPENAI_API_KEY) | |
| logger.info("β OpenAI API connected") | |
| except Exception as e: | |
| logger.error(f"β OpenAI connection failed: {e}") | |
| def intelligence_gathering(target: str) -> str: | |
| """ | |
| Comprehensive OSINT intelligence gathering for domains, IPs, or organizations. | |
| Uses LLM to interpret user input, then Shodan for infrastructure discovery and OpenAI for intelligent analysis. | |
| Provides security assessment, risk analysis, and actionable recommendations. | |
| Args: | |
| target: Domain, IP address, organization name, or natural language query to analyze | |
| Returns: | |
| Comprehensive intelligence report with AI-powered insights | |
| """ | |
| try: | |
| logger.info(f"π― Starting intelligence gathering for: {target}") | |
| # Step 1: LLM interprets and clarifies user input | |
| interpreted_target = _interpret_user_input(target) | |
| # Check if LLM needs clarification | |
| if interpreted_target.startswith("CLARIFICATION_NEEDED:"): | |
| return interpreted_target.replace("CLARIFICATION_NEEDED:", "") | |
| # If LLM interpreted the input, show what we're searching | |
| if interpreted_target != target: | |
| logger.info(f"π€ LLM interpreted '{target}' as '{interpreted_target}'") | |
| # Step 2: Gather raw intelligence data | |
| shodan_data = _gather_shodan_intelligence(interpreted_target) | |
| # Check if we have any data to work with | |
| if shodan_data.get('error') and not shodan_data.get('devices'): | |
| return f"β Cannot analyze {interpreted_target}: {shodan_data['error']}\n\nPlease configure API keys and try again." | |
| # Step 3: Generate AI-powered analysis | |
| ai_analysis = _generate_ai_analysis(interpreted_target, shodan_data) | |
| # Step 4: Format comprehensive report | |
| report = _format_intelligence_report(interpreted_target, shodan_data, ai_analysis) | |
| # Add interpretation note if target was changed | |
| if interpreted_target != target: | |
| report = f"π€ **LLM Interpretation**: Analyzed '{interpreted_target}' based on your query: '{target}'\n\n" + report | |
| logger.info(f"β Intelligence gathering completed for {interpreted_target}") | |
| return report | |
| except Exception as e: | |
| logger.error(f"β Error in intelligence gathering: {e}") | |
| return f"β Intelligence gathering failed for {target}: {str(e)}" | |
| def _interpret_user_input(user_input: str) -> str: | |
| """Use LLM to interpret and clarify user input before Shodan search""" | |
| if not openai_client: | |
| # If no OpenAI, just return the input as-is | |
| return user_input | |
| try: | |
| # Create interpretation prompt | |
| prompt = f""" | |
| You are an OSINT intelligence assistant helping users search for internet-exposed infrastructure using Shodan. | |
| USER INPUT: "{user_input}" | |
| Your job is to interpret this input and return the BEST target for Shodan analysis. Choose ONE of these response formats: | |
| FORMAT 1 - DIRECT SEARCH (when input is clear): | |
| Return ONLY the exact domain/IP to search, nothing else. | |
| Examples: | |
| - Input: "google.com" β Output: "google.com" | |
| - Input: "8.8.8.8" β Output: "8.8.8.8" | |
| - Input: "tesla" β Output: "tesla.com" | |
| - Input: "Microsoft Corporation" β Output: "microsoft.com" | |
| FORMAT 2 - CLARIFICATION NEEDED (when input is ambiguous): | |
| Start with "CLARIFICATION_NEEDED:" then ask for clarification. | |
| Examples: | |
| - Input: "that email company" β Output: "CLARIFICATION_NEEDED: I need clarification. Do you want to search:\nβ’ gmail.com (Google's email service)\nβ’ outlook.com (Microsoft's email service)\nβ’ yahoo.com (Yahoo's email service)\n\nPlease specify which email service you'd like to analyze." | |
| GUIDELINES: | |
| - Prefer .com domains for companies (tesla β tesla.com) | |
| - For clear company names, use their main domain | |
| - For ambiguous inputs, ask for clarification with specific options | |
| - Never include explanations in Format 1 responses | |
| - Always provide 2-3 specific options in clarification requests | |
| What should I search for?""" | |
| # Get LLM response | |
| response = openai_client.chat.completions.create( | |
| model="gpt-4o-mini", | |
| messages=[ | |
| {"role": "system", "content": "You are an expert at interpreting user queries for OSINT analysis. Be concise and precise."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| max_tokens=200, | |
| temperature=0.1 | |
| ) | |
| result = response.choices[0].message.content.strip() | |
| logger.info(f"π€ LLM interpreted '{user_input}' β '{result}'") | |
| return result | |
| except Exception as e: | |
| logger.warning(f"LLM interpretation failed: {e}") | |
| # Fallback to original input | |
| return user_input | |
| def _gather_shodan_intelligence(target: str) -> Dict: | |
| """Gather intelligence using Shodan API""" | |
| if not shodan_client: | |
| return {"error": "Shodan API not configured. Please set SHODAN_API_KEY environment variable.", "devices": [], "stats": {}} | |
| try: | |
| # Determine search strategy based on target type | |
| if _is_ip_address(target): | |
| # Direct IP lookup | |
| devices = [_get_host_info(target)] | |
| query = f"ip:{target}" | |
| elif _is_domain(target): | |
| # Domain-based search | |
| query = f"hostname:{target}" | |
| devices = _search_shodan(query) | |
| else: | |
| # Organization search | |
| query = f'org:"{target}"' | |
| devices = _search_shodan(query) | |
| # Get statistics | |
| stats = _get_shodan_stats(query) | |
| return { | |
| "devices": [d for d in devices if d], # Filter out None results | |
| "stats": stats, | |
| "query": query, | |
| "target_type": _classify_target_type(target) | |
| } | |
| except Exception as e: | |
| logger.error(f"Shodan intelligence gathering error: {e}") | |
| return {"error": str(e), "devices": [], "stats": {}} | |
| def _search_shodan(query: str, limit: int = 20) -> List[Dict]: | |
| """Search Shodan with rate limiting""" | |
| try: | |
| import time | |
| time.sleep(1.2) # Rate limiting | |
| results = shodan_client.search(query, limit=limit) | |
| devices = [] | |
| for result in results.get('matches', []): | |
| device = _parse_shodan_result(result) | |
| if device: | |
| devices.append(device) | |
| return devices | |
| except shodan.APIError as e: | |
| logger.warning(f"Shodan API error: {e}") | |
| return [] | |
| except Exception as e: | |
| logger.error(f"Shodan search error: {e}") | |
| return [] | |
| def _get_host_info(ip: str) -> Optional[Dict]: | |
| """Get detailed host information""" | |
| try: | |
| import time | |
| time.sleep(1.2) # Rate limiting | |
| host_info = shodan_client.host(ip) | |
| return _parse_host_result(host_info) | |
| except Exception as e: | |
| logger.warning(f"Could not get host info for {ip}: {e}") | |
| return None | |
| def _parse_shodan_result(result: Dict) -> Optional[Dict]: | |
| """Parse Shodan search result""" | |
| try: | |
| ip = result.get('ip_str', 'Unknown') | |
| port = result.get('port', 80) | |
| product = result.get('product', 'Unknown') | |
| # Risk assessment | |
| vulns = result.get('vulns', []) | |
| risk_score = len(vulns) * 2 | |
| # High-risk ports | |
| high_risk_ports = [21, 23, 3389, 5900, 3306, 5432, 27017] | |
| if port in high_risk_ports: | |
| risk_score += 3 | |
| # Determine risk level | |
| if risk_score >= 6: | |
| risk_level = "HIGH" | |
| elif risk_score >= 3: | |
| risk_level = "MEDIUM" | |
| else: | |
| risk_level = "LOW" | |
| return { | |
| "ip": ip, | |
| "port": port, | |
| "product": product, | |
| "service": result.get('service_name', 'Unknown'), | |
| "vulns": vulns, | |
| "risk_level": risk_level, | |
| "risk_score": risk_score, | |
| "banner": result.get('data', '')[:200], | |
| "location": result.get('location', {}), | |
| "org": result.get('org', 'Unknown'), | |
| "timestamp": result.get('timestamp', '') | |
| } | |
| except Exception as e: | |
| logger.error(f"Error parsing Shodan result: {e}") | |
| return None | |
| def _parse_host_result(host_info: Dict) -> Dict: | |
| """Parse detailed host information""" | |
| try: | |
| ip = host_info.get('ip_str', 'Unknown') | |
| # Collect all services | |
| services = [] | |
| all_vulns = [] | |
| for service in host_info.get('data', []): | |
| services.append({ | |
| "port": service.get('port'), | |
| "service": service.get('service_name', 'Unknown'), | |
| "product": service.get('product', 'Unknown'), | |
| "version": service.get('version', ''), | |
| "vulns": service.get('vulns', []) | |
| }) | |
| all_vulns.extend(service.get('vulns', [])) | |
| # Calculate overall risk | |
| risk_score = len(all_vulns) * 2 + len(services) | |
| if risk_score >= 10: | |
| risk_level = "CRITICAL" | |
| elif risk_score >= 6: | |
| risk_level = "HIGH" | |
| elif risk_score >= 3: | |
| risk_level = "MEDIUM" | |
| else: | |
| risk_level = "LOW" | |
| return { | |
| "ip": ip, | |
| "services": services, | |
| "total_services": len(services), | |
| "all_vulns": list(set(all_vulns)), # Unique vulns | |
| "risk_level": risk_level, | |
| "risk_score": risk_score, | |
| "hostnames": host_info.get('hostnames', []), | |
| "org": host_info.get('org', 'Unknown'), | |
| "location": host_info.get('location', {}), | |
| "last_update": host_info.get('last_update', '') | |
| } | |
| except Exception as e: | |
| logger.error(f"Error parsing host result: {e}") | |
| return {"error": str(e)} | |
| def _get_shodan_stats(query: str) -> Dict: | |
| """Get search statistics""" | |
| try: | |
| import time | |
| time.sleep(1.2) # Rate limiting | |
| # Get basic count | |
| results = shodan_client.search(query, limit=0) | |
| total = results.get('total', 0) | |
| # Try to get facets for more stats | |
| try: | |
| facet_results = shodan_client.search(query, limit=0, facets='country,org,port') | |
| facets = facet_results.get('facets', {}) | |
| except: | |
| facets = {} | |
| return { | |
| "total_results": total, | |
| "countries": facets.get('country', [])[:5], | |
| "organizations": facets.get('org', [])[:5], | |
| "ports": facets.get('port', [])[:10] | |
| } | |
| except Exception as e: | |
| logger.warning(f"Could not get Shodan stats: {e}") | |
| return {"total_results": 0} | |
| def _generate_ai_analysis(target: str, shodan_data: Dict) -> str: | |
| """Generate AI-powered analysis using OpenAI""" | |
| if not openai_client: | |
| return "AI analysis not available. Please set OPENAI_API_KEY environment variable to enable AI-powered insights." | |
| try: | |
| # Prepare data for AI analysis | |
| devices = shodan_data.get('devices', []) | |
| stats = shodan_data.get('stats', {}) | |
| # Create enhanced dramatic analysis prompt | |
| prompt = f""" | |
| You are writing a CRITICAL SECURITY BRIEFING for executives about: {target} | |
| Write in an URGENT, DRAMATIC tone that demands immediate action. Use the format below EXACTLY: | |
| SHODAN INTELLIGENCE DATA: | |
| - Total results found: {stats.get('total_results', 0)} | |
| - Devices analyzed: {len(devices)} | |
| - Target type: {shodan_data.get('target_type', 'Unknown')} | |
| DEVICE DETAILS: | |
| """ | |
| # Add device summaries | |
| for i, device in enumerate(devices[:5], 1): # Limit to 5 devices | |
| risk_level = device.get('risk_level', 'UNKNOWN') | |
| vulns = device.get('vulns', []) | |
| prompt += f""" | |
| Device {i}: | |
| - IP: {device.get('ip', 'Unknown')} | |
| - Service: {device.get('service', 'Unknown')} on port {device.get('port', 'Unknown')} | |
| - Product: {device.get('product', 'Unknown')} | |
| - Risk Level: {risk_level} | |
| - Vulnerabilities: {len(vulns)} found | |
| - Organization: {device.get('org', 'Unknown')} | |
| """ | |
| prompt += f""" | |
| Write a DRAMATIC SECURITY BRIEFING using this EXACT format: | |
| **CRITICAL SECURITY ASSESSMENT - {target.upper()}** | |
| ======================================================================================================================== | |
| **EXECUTIVE ALERT: ACTIVE SECURITY THREATS DETECTED** | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| **Target:** {target} | |
| **Assessment Date:** {datetime.now().strftime('%B %d, %Y at %H:%M UTC')} | |
| **Overall Risk:** [CRITICAL/HIGH/MEDIUM/LOW] | |
| **Immediate Action Required:** [YES/NO] | |
| **THE BOTTOM LINE** | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| β’ [Write 3-5 bullet points about immediate threats] | |
| β’ [Include specific numbers of vulnerabilities found] | |
| β’ [Mention financial risk estimates] | |
| β’ [Reference real-world breach examples] | |
| **THREAT LANDSCAPE - WHAT ATTACKERS SEE** | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| [List specific IPs, services, vulnerabilities found with dramatic language] | |
| [Include specific CVE numbers if any vulnerabilities found] | |
| [Mention exposed services and their risks] | |
| **ACTIVE ATTACK VECTORS - READY TO EXPLOIT** | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| [Describe how attackers could exploit the findings] | |
| [Include timeframes and success rates] | |
| [Reference tools attackers might use] | |
| **FINANCIAL EXPOSURE ANALYSIS** | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| [Estimate breach costs, downtime costs, compliance fines] | |
| [Compare prevention costs vs breach costs] | |
| **UNCOMFORTABLE TRUTHS** | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| [List harsh realities about their security posture] | |
| [Include industry statistics and examples] | |
| **FINAL VERDICT** | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| **Security Posture:** [Assessment] | |
| **Exploitation Difficulty:** [Easy/Medium/Hard] | |
| **Financial Risk:** [Amount range] | |
| **Action Required:** [Immediate/Urgent/Soon] | |
| Write this in an URGENT, DRAMATIC tone that will make executives take immediate action. Use specific details and create a sense of urgency.""" | |
| # Generate AI response | |
| response = openai_client.chat.completions.create( | |
| model="gpt-4o-mini", | |
| messages=[ | |
| {"role": "system", "content": "You are a senior cybersecurity consultant writing URGENT executive briefings. Use dramatic, action-oriented language that creates immediate urgency. Include specific technical details, financial impact estimates, and real-world breach examples. Make executives understand the gravity of security threats through compelling, fear-based messaging."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| max_tokens=2500, | |
| temperature=0.3 | |
| ) | |
| return response.choices[0].message.content | |
| except Exception as e: | |
| logger.error(f"AI analysis error: {e}") | |
| return f"AI analysis failed: {str(e)}" | |
| def _format_intelligence_report(target: str, shodan_data: Dict, ai_analysis: str) -> str: | |
| """Format comprehensive intelligence report""" | |
| devices = shodan_data.get('devices', []) | |
| stats = shodan_data.get('stats', {}) | |
| target_type = shodan_data.get('target_type', 'Unknown') | |
| # Count risk levels | |
| risk_counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0} | |
| for device in devices: | |
| risk_level = device.get('risk_level', 'LOW') | |
| risk_counts[risk_level] += 1 | |
| # Create report | |
| report = f"""# π― Intelligence Report: {target} | |
| ## π Executive Summary | |
| - **Target**: {target} ({target_type}) | |
| - **Shodan Results**: {stats.get('total_results', 0)} total matches | |
| - **Devices Analyzed**: {len(devices)} | |
| - **Risk Distribution**: {risk_counts['CRITICAL']} Critical, {risk_counts['HIGH']} High, {risk_counts['MEDIUM']} Medium, {risk_counts['LOW']} Low | |
| ## π€ AI Security Analysis | |
| {ai_analysis} | |
| ## π Technical Intelligence | |
| ### Infrastructure Overview | |
| """ | |
| if devices: | |
| report += f"Found {len(devices)} internet-exposed devices:\n\n" | |
| for i, device in enumerate(devices[:10], 1): # Show top 10 devices | |
| ip = device.get('ip', 'Unknown') | |
| service = device.get('service', 'Unknown') | |
| port = device.get('port', 'Unknown') | |
| risk = device.get('risk_level', 'LOW') | |
| vulns = device.get('vulns', []) | |
| org = device.get('org', 'Unknown') | |
| report += f"""**Device {i}: {ip}** | |
| - Service: {service} (Port {port}) | |
| - Risk Level: {risk} | |
| - Vulnerabilities: {len(vulns)} found | |
| - Organization: {org} | |
| """ | |
| if vulns: | |
| report += f"- CVEs: {', '.join(vulns[:3])}{'...' if len(vulns) > 3 else ''}\n" | |
| report += "\n" | |
| else: | |
| report += "No exposed devices found in Shodan database.\n\n" | |
| # Add statistics if available | |
| if stats.get('total_results', 0) > 0: | |
| report += f"### π Global Statistics\n" | |
| report += f"- **Total Shodan Results**: {stats.get('total_results', 0)}\n" | |
| if stats.get('countries'): | |
| report += f"- **Top Countries**: {', '.join([c['value'] for c in stats['countries'][:3]])}\n" | |
| if stats.get('organizations'): | |
| report += f"- **Top Organizations**: {', '.join([o['value'] for o in stats['organizations'][:3]])}\n" | |
| if stats.get('ports'): | |
| report += f"- **Common Ports**: {', '.join([str(p['value']) for p in stats['ports'][:5]])}\n" | |
| # Add metadata | |
| report += f""" | |
| ## β‘ Analysis Metadata | |
| - **Timestamp**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | |
| - **Sources**: Shodan Internet Intelligence, OpenAI GPT-4o-mini | |
| - **Query**: {shodan_data.get('query', 'N/A')} | |
| - **Analysis Type**: Comprehensive OSINT Assessment | |
| --- | |
| *Generated by ExposureGPT - Simplified OSINT Intelligence Platform* | |
| """ | |
| return report | |
| def _is_ip_address(target: str) -> bool: | |
| """Check if target is an IP address""" | |
| import re | |
| ip_pattern = r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$' | |
| return bool(re.match(ip_pattern, target)) | |
| def _is_domain(target: str) -> bool: | |
| """Check if target is a domain""" | |
| import re | |
| # More flexible domain pattern that handles subdomains and various TLDs | |
| domain_pattern = r'^[a-zA-Z0-9][a-zA-Z0-9.-]*\.[a-zA-Z]{2,}$' | |
| return bool(re.match(domain_pattern, target)) and '.' in target | |
| def _classify_target_type(target: str) -> str: | |
| """Classify the type of target""" | |
| if _is_ip_address(target): | |
| return "IP Address" | |
| elif _is_domain(target): | |
| return "Domain" | |
| else: | |
| return "Organization" | |
| def create_interface(): | |
| """Create simple Gradio interface""" | |
| # Use proper Gradio theme with Google Fonts to avoid 404 errors | |
| theme = gr.themes.Soft( | |
| primary_hue=gr.themes.colors.pink, | |
| secondary_hue=gr.themes.colors.cyan, | |
| neutral_hue=gr.themes.colors.slate, | |
| font=[gr.themes.GoogleFont("Inter"), "sans-serif"] | |
| ) | |
| # Minimal CSS - just background and center content | |
| css = """ | |
| .gradio-container { | |
| background: linear-gradient(135deg, #0f172a 0%, #1e293b 100%) !important; | |
| max-width: 1200px !important; | |
| margin: 0 auto !important; | |
| padding: 20px !important; | |
| } | |
| """ | |
| with gr.Blocks( | |
| title="ExposureGPT - Simplified OSINT Intelligence", | |
| theme=theme, | |
| css=css | |
| ) as demo: | |
| gr.Markdown(""" | |
| # π― ExposureGPT - Simplified OSINT Intelligence | |
| **π€ Smart OSINT analysis using LLM + Shodan + OpenAI** | |
| Enter anything - the LLM will interpret your input and ask for clarification if needed! | |
| Try: "Tesla", "that social media company", "google.com", or "8.8.8.8" | |
| """) | |
| target_input = gr.Textbox( | |
| label="π― Target to Analyze", | |
| placeholder="Enter anything: 'Tesla', 'that social media company', 'google.com', '8.8.8.8'", | |
| value="Tesla" | |
| ) | |
| analyze_btn = gr.Button( | |
| "π Run Intelligence Gathering", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| output_report = gr.Markdown( | |
| value="Enter a target above and click the button to begin analysis..." | |
| ) | |
| analyze_btn.click( | |
| fn=intelligence_gathering, | |
| inputs=[target_input], | |
| outputs=[output_report] | |
| ) | |
| gr.Markdown(""" | |
| ### π€ Model Context Protocol (MCP) Server Details | |
| This application automatically serves as an **MCP server** that AI assistants can connect to for real-time OSINT intelligence gathering. | |
| **π MCP Endpoint**: `https://acloudcenter-exposuregpt.hf.space/gradio_api/mcp/sse` | |
| **π Available Tool**: `intelligence_gathering(target: str)` | |
| - **Input**: Domain name, IP address, or organization name | |
| - **Output**: Comprehensive security intelligence report including: | |
| - Exact IP addresses and geographic locations | |
| - Exposed services, ports, and product versions | |
| - CVE vulnerabilities with severity scores | |
| - Risk assessment with actionable recommendations | |
| - Network infrastructure and hosting details | |
| **π§ Claude Desktop Configuration**: | |
| ```json | |
| { | |
| "mcpServers": { | |
| "exposuregpt": { | |
| "command": "npx", | |
| "args": ["mcp-remote", "https://acloudcenter-exposuregpt.hf.space/gradio_api/mcp/sse"] | |
| } | |
| } | |
| } | |
| ``` | |
| **β‘ Powered by**: Shodan Internet Intelligence + OpenAI GPT-4o-mini + Gradio Framework | |
| """) | |
| return demo | |
| def main(): | |
| """Main entry point""" | |
| import argparse | |
| parser = argparse.ArgumentParser(description="ExposureGPT - Simplified OSINT Intelligence") | |
| parser.add_argument("--cli", metavar="TARGET", help="Run CLI analysis") | |
| parser.add_argument("--port", type=int, default=7860, help="Port for web interface") | |
| parser.add_argument("--share", action="store_true", help="Create public link") | |
| args = parser.parse_args() | |
| # CLI mode | |
| if args.cli: | |
| print(f"\nπ― ExposureGPT CLI Analysis: {args.cli}") | |
| print("=" * 60) | |
| result = intelligence_gathering(args.cli) | |
| print(result) | |
| return | |
| # Web interface | |
| demo = create_interface() | |
| print(f"π ExposureGPT launching on port {args.port}") | |
| print(f"π Interface: http://localhost:{args.port}") | |
| print(f"π€ MCP Endpoint: http://localhost:{args.port}/gradio_api/mcp/sse") | |
| # Enable MCP server via environment variable (alternative to mcp_server=True) | |
| os.environ['GRADIO_MCP_SERVER'] = 'True' | |
| demo.launch( | |
| server_port=args.port, | |
| share=args.share, | |
| server_name="0.0.0.0", | |
| ssr_mode=False # Fix font loading 404 errors | |
| ) | |
| if __name__ == "__main__": | |
| main() |