#!/usr/bin/env python3
"""
ExposureGPT - Simplified MCP OSINT Tool
Single tool for intelligence gathering using Shodan + OpenAI
"""

import argparse
import ipaddress
import json
import logging
import os
import re
import sys
import time
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional

# Every third-party package is optional at import time: the CLI path and the
# pure parsing/formatting helpers must keep working in a partial install.
try:
    import gradio as gr
except ImportError:
    gr = None

try:
    import shodan
except ImportError:
    shodan = None

try:
    from openai import OpenAI
except ImportError:
    OpenAI = None

try:
    from dotenv import load_dotenv
except ImportError:
    load_dotenv = None

# Load environment variables
if load_dotenv is not None:
    load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize services
SHODAN_API_KEY = os.getenv('SHODAN_API_KEY')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

# Initialize clients
shodan_client = None
openai_client = None

if shodan and SHODAN_API_KEY and SHODAN_API_KEY != 'your_shodan_key':
    try:
        shodan_client = shodan.Shodan(SHODAN_API_KEY)
        logger.info("✅ Shodan API connected")
    except Exception as e:
        logger.error(f"❌ Shodan connection failed: {e}")

if OpenAI and OPENAI_API_KEY:
    try:
        openai_client = OpenAI(api_key=OPENAI_API_KEY)
        logger.info("✅ OpenAI API connected")
    except Exception as e:
        logger.error(f"❌ OpenAI connection failed: {e}")


# --- Internal constants ------------------------------------------------------

_OPENAI_MODEL = "gpt-4o-mini"
_CLARIFICATION_PREFIX = "CLARIFICATION_NEEDED:"
_SHODAN_REQUEST_DELAY = 1.2  # seconds to wait before each Shodan request
_HIGH_RISK_PORTS = frozenset({21, 23, 3389, 5900, 3306, 5432, 27017})
_RISK_LEVELS = ("CRITICAL", "HIGH", "MEDIUM", "LOW")
# Must be a list: the Shodan client iterates this value, so a single
# comma-separated string would be split into one "facet" per character.
_STATS_FACETS = ["country", "org", "port"]
_DOMAIN_PATTERN = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9.-]*\.[a-zA-Z]{2,}$')
# Divider widths are cosmetic; they only shape the briefing layout.
_TITLE_RULE = "=" * 120
_SECTION_RULE = "━" * 99

_INTERPRETATION_PROMPT = """
You are an OSINT intelligence assistant helping users search for internet-exposed infrastructure using Shodan.

USER INPUT: "{user_input}"

Your job is to interpret this input and return the BEST target for Shodan analysis. Choose ONE of these response formats:

FORMAT 1 - DIRECT SEARCH (when input is clear):
Return ONLY the exact domain/IP to search, nothing else.
Examples:
- Input: "google.com" → Output: "google.com"
- Input: "8.8.8.8" → Output: "8.8.8.8"
- Input: "tesla" → Output: "tesla.com"
- Input: "Microsoft Corporation" → Output: "microsoft.com"

FORMAT 2 - CLARIFICATION NEEDED (when input is ambiguous):
Start with "CLARIFICATION_NEEDED:" then ask for clarification.
Examples:
- Input: "that email company" → Output: "CLARIFICATION_NEEDED: I need clarification. Do you want to search:\n• gmail.com (Google's email service)\n• outlook.com (Microsoft's email service)\n• yahoo.com (Yahoo's email service)\n\nPlease specify which email service you'd like to analyze."

GUIDELINES:
- Prefer .com domains for companies (tesla → tesla.com)
- For clear company names, use their main domain
- For ambiguous inputs, ask for clarification with specific options
- Never include explanations in Format 1 responses
- Always provide 2-3 specific options in clarification requests

What should I search for?"""

_ANALYSIS_SYSTEM_PROMPT = (
    "You are a senior cybersecurity consultant writing URGENT executive briefings. "
    "Use dramatic, action-oriented language that creates immediate urgency. "
    "Include specific technical details, financial impact estimates, and real-world breach examples. "
    "Make executives understand the gravity of security threats through compelling, fear-based messaging."
)

# Minimal CSS - just background and center content
_APP_CSS = """
.gradio-container {
    background: linear-gradient(135deg, #0f172a 0%, #1e293b 100%) !important;
    max-width: 1200px !important;
    margin: 0 auto !important;
    padding: 20px !important;
}
"""

_INTRO_MARKDOWN = """
# 🎯 ExposureGPT - Simplified OSINT Intelligence

**🤖 Smart OSINT analysis using LLM + Shodan + OpenAI**

Enter anything - the LLM will interpret your input and ask for clarification if needed!

Try: "Tesla", "that social media company", "google.com", or "8.8.8.8"
"""

_MCP_DETAILS_MARKDOWN = """
### 🤖 Model Context Protocol (MCP) Server Details

This application automatically serves as an **MCP server** that AI assistants can connect to for real-time OSINT intelligence gathering.

**🔗 MCP Endpoint**: `https://acloudcenter-exposuregpt.hf.space/gradio_api/mcp/sse`

**📊 Available Tool**: `intelligence_gathering(target: str)`
- **Input**: Domain name, IP address, or organization name
- **Output**: Comprehensive security intelligence report including:
  - Exact IP addresses and geographic locations
  - Exposed services, ports, and product versions
  - CVE vulnerabilities with severity scores
  - Risk assessment with actionable recommendations
  - Network infrastructure and hosting details

**🔧 Claude Desktop Configuration**:
```json
{
  "mcpServers": {
    "exposuregpt": {
      "command": "npx",
      "args": ["mcp-remote", "https://acloudcenter-exposuregpt.hf.space/gradio_api/mcp/sse"]
    }
  }
}
```

**⚔ Powered by**: Shodan Internet Intelligence + OpenAI GPT-4o-mini + Gradio Framework
"""


# --- Public tool -------------------------------------------------------------

def intelligence_gathering(target: str) -> str:
    """
    Comprehensive OSINT intelligence gathering for domains, IPs, or organizations.

    Uses LLM to interpret user input, then Shodan for infrastructure discovery and OpenAI for
    intelligent analysis. Provides security assessment, risk analysis, and actionable recommendations.

    Args:
        target: Domain, IP address, organization name, or natural language query to analyze

    Returns:
        Comprehensive intelligence report with AI-powered insights
    """
    target = (target or "").strip()
    if not target:
        # Nothing to interpret or search; avoid spending API calls on an empty query.
        return "❌ Please enter a domain, IP address, or organization name to analyze."

    try:
        logger.info(f"🎯 Starting intelligence gathering for: {target}")

        # Step 1: LLM interprets and clarifies user input
        interpreted_target = _interpret_user_input(target)

        # The LLM asks a question instead of naming a target: show it to the user as-is.
        if interpreted_target.startswith(_CLARIFICATION_PREFIX):
            return interpreted_target[len(_CLARIFICATION_PREFIX):].strip()

        was_reinterpreted = interpreted_target != target
        if was_reinterpreted:
            logger.info(f"🤖 LLM interpreted '{target}' as '{interpreted_target}'")

        # Step 2: Gather raw intelligence data
        shodan_data = _gather_shodan_intelligence(interpreted_target)

        # An error with no devices means there is nothing to analyze.
        if shodan_data.get('error') and not shodan_data.get('devices'):
            return (
                f"❌ Cannot analyze {interpreted_target}: {shodan_data['error']}"
                "\n\nPlease configure API keys and try again."
            )

        # Step 3: Generate AI-powered analysis
        ai_analysis = _generate_ai_analysis(interpreted_target, shodan_data)

        # Step 4: Format comprehensive report
        report = _format_intelligence_report(interpreted_target, shodan_data, ai_analysis)

        # Tell the user which target was actually searched when it differs from their input.
        if was_reinterpreted:
            report = (
                f"🤖 **LLM Interpretation**: Analyzed '{interpreted_target}' "
                f"based on your query: '{target}'\n\n" + report
            )

        logger.info(f"✅ Intelligence gathering completed for {interpreted_target}")
        return report

    except Exception as e:
        # Top-level boundary for the UI/MCP tool: always answer with text, never a traceback.
        logger.error(f"❌ Error in intelligence gathering: {e}")
        return f"❌ Intelligence gathering failed for {target}: {str(e)}"


# --- LLM input interpretation ------------------------------------------------

def _clean_llm_target(raw: Optional[str]) -> str:
    """Strip whitespace and wrapping quotes/backticks from an LLM reply.

    The interpretation prompt shows its example outputs in double quotes, so the
    model may echo them; a quoted reply would otherwise fail domain/IP detection.
    """
    return (raw or "").strip().strip("\"'`").strip()


def _interpret_user_input(user_input: str) -> str:
    """Use LLM to interpret and clarify user input before Shodan search.

    Returns either a concrete search target or a string starting with
    ``CLARIFICATION_NEEDED:``. Falls back to ``user_input`` unchanged when
    OpenAI is not configured, the request fails, or the reply is empty.
    """
    if not openai_client:
        # If no OpenAI, just return the input as-is
        return user_input

    try:
        response = openai_client.chat.completions.create(
            model=_OPENAI_MODEL,
            messages=[
                {
                    "role": "system",
                    "content": "You are an expert at interpreting user queries for OSINT analysis. "
                               "Be concise and precise.",
                },
                {"role": "user", "content": _INTERPRETATION_PROMPT.format(user_input=user_input)},
            ],
            max_tokens=200,
            temperature=0.1,
        )
        result = _clean_llm_target(response.choices[0].message.content)
    except Exception as e:
        logger.warning(f"LLM interpretation failed: {e}")
        # Fallback to original input
        return user_input

    if not result:
        return user_input

    logger.info(f"🤖 LLM interpreted '{user_input}' → '{result}'")
    return result


# --- Shodan access -----------------------------------------------------------

def _respect_rate_limit() -> None:
    """Pause before a Shodan request so consecutive calls stay spaced out."""
    time.sleep(_SHODAN_REQUEST_DELAY)


def _gather_shodan_intelligence(target: str) -> Dict:
    """Gather intelligence using Shodan API.

    Returns a dict with ``devices``, ``stats``, ``query`` and ``target_type``
    on success, or ``error`` plus empty ``devices``/``stats`` on failure.
    """
    if not shodan_client:
        return {
            "error": "Shodan API not configured. Please set SHODAN_API_KEY environment variable.",
            "devices": [],
            "stats": {},
        }

    try:
        # Determine search strategy based on target type
        if _is_ip_address(target):
            # Direct IP lookup
            query = f"ip:{target}"
            devices = [_get_host_info(target)]
        elif _is_domain(target):
            # Domain-based search
            query = f"hostname:{target}"
            devices = _search_shodan(query)
        else:
            # Organization search; embedded quotes would break the quoted filter value.
            org_name = target.replace('"', '')
            query = f'org:"{org_name}"'
            devices = _search_shodan(query)

        stats = _get_shodan_stats(query)

        return {
            "devices": [d for d in devices if d],  # Filter out None results
            "stats": stats,
            "query": query,
            "target_type": _classify_target_type(target),
        }

    except Exception as e:
        logger.error(f"Shodan intelligence gathering error: {e}")
        return {"error": str(e), "devices": [], "stats": {}}


def _search_shodan(query: str, limit: int = 20) -> List[Dict]:
    """Search Shodan with rate limiting; returns parsed matches, or [] on any failure."""
    if not shodan_client:
        return []

    try:
        _respect_rate_limit()
        results = shodan_client.search(query, limit=limit)
    except shodan.APIError as e:
        logger.warning(f"Shodan API error: {e}")
        return []
    except Exception as e:
        logger.error(f"Shodan search error: {e}")
        return []

    parsed = (_parse_shodan_result(match) for match in results.get('matches', []))
    return [device for device in parsed if device]


def _get_host_info(ip: str) -> Optional[Dict]:
    """Get detailed host information, or None when the lookup or parsing fails."""
    if not shodan_client:
        return None

    try:
        _respect_rate_limit()
        host_info = shodan_client.host(ip)
    except Exception as e:
        logger.warning(f"Could not get host info for {ip}: {e}")
        return None
    return _parse_host_result(host_info)


def _get_shodan_stats(query: str) -> Dict:
    """Get the total match count and top country/org/port facets for a query.

    Uses a single ``count`` request. If the faceted request is rejected the
    total is still fetched without facets. Returns ``{"total_results": 0}``
    when nothing can be retrieved.
    """
    if not shodan_client:
        return {"total_results": 0}

    try:
        _respect_rate_limit()
        try:
            summary = shodan_client.count(query, facets=_STATS_FACETS)
        except Exception as facet_error:
            logger.warning(f"Shodan facet stats unavailable: {facet_error}")
            _respect_rate_limit()
            summary = shodan_client.count(query)

        facets = summary.get('facets') or {}
        return {
            "total_results": summary.get('total', 0),
            "countries": facets.get('country', [])[:5],
            "organizations": facets.get('org', [])[:5],
            "ports": facets.get('port', [])[:10],
        }

    except Exception as e:
        logger.warning(f"Could not get Shodan stats: {e}")
        return {"total_results": 0}


# --- Shodan result parsing ---------------------------------------------------

def _normalize_vulns(raw) -> List[str]:
    """Return vulnerability identifiers as a sorted list of unique strings.

    Accepts a dict keyed by CVE id (the shape Shodan banners use), any iterable
    of ids, a single id string, or None.
    """
    if not raw:
        return []
    if isinstance(raw, str):
        return [raw]
    return sorted({str(vuln_id) for vuln_id in raw})


def _service_name(banner: Dict) -> str:
    """Best available service label for a banner.

    Prefers ``service_name``; falls back to the crawler module name stored
    under ``_shodan.module``, then to 'Unknown'.
    """
    name = banner.get('service_name')
    if not name:
        name = (banner.get('_shodan') or {}).get('module')
    return name or 'Unknown'


def _risk_level(score: int, critical_at: Optional[int] = None) -> str:
    """Map a numeric risk score to a level name.

    ``critical_at`` enables the CRITICAL tier; single-service search matches
    do not use it, whole-host assessments do.
    """
    if critical_at is not None and score >= critical_at:
        return "CRITICAL"
    if score >= 6:
        return "HIGH"
    if score >= 3:
        return "MEDIUM"
    return "LOW"


def _parse_shodan_result(result: Dict) -> Optional[Dict]:
    """Parse one Shodan search match into a flat device record (None on failure)."""
    try:
        port = result.get('port', 80)
        vulns = _normalize_vulns(result.get('vulns'))

        # Two points per known vulnerability, three more for a commonly attacked port.
        risk_score = len(vulns) * 2
        if port in _HIGH_RISK_PORTS:
            risk_score += 3

        return {
            "ip": result.get('ip_str', 'Unknown'),
            "port": port,
            "product": result.get('product', 'Unknown'),
            "service": _service_name(result),
            "vulns": vulns,
            "risk_level": _risk_level(risk_score),
            "risk_score": risk_score,
            "banner": (result.get('data') or '')[:200],
            "location": result.get('location', {}),
            "org": result.get('org', 'Unknown'),
            "timestamp": result.get('timestamp', ''),
        }

    except Exception as e:
        logger.error(f"Error parsing Shodan result: {e}")
        return None


def _parse_host_result(host_info: Dict) -> Optional[Dict]:
    """Parse a Shodan host lookup into a device record with a ``services`` list.

    Returns None on failure so the caller's ``if d`` filter drops the record
    instead of reporting a placeholder device.
    """
    try:
        services = []
        vuln_occurrences = []

        for banner in host_info.get('data', []):
            banner_vulns = _normalize_vulns(banner.get('vulns'))
            services.append({
                "port": banner.get('port'),
                "service": _service_name(banner),
                "product": banner.get('product', 'Unknown'),
                "version": banner.get('version', ''),
                "vulns": banner_vulns,
            })
            vuln_occurrences.extend(banner_vulns)

        # Each vulnerable service counts separately; every open service adds one.
        risk_score = len(vuln_occurrences) * 2 + len(services)

        return {
            "ip": host_info.get('ip_str', 'Unknown'),
            "services": services,
            "total_services": len(services),
            "all_vulns": _normalize_vulns(vuln_occurrences),  # Unique vulns
            "risk_level": _risk_level(risk_score, critical_at=10),
            "risk_score": risk_score,
            "hostnames": host_info.get('hostnames', []),
            "org": host_info.get('org', 'Unknown'),
            "location": host_info.get('location', {}),
            "last_update": host_info.get('last_update', ''),
        }

    except Exception as e:
        logger.error(f"Error parsing host result: {e}")
        return None


def _join_unique(values: Iterable) -> str:
    """Join distinct, meaningful values with ', ' in first-seen order ('Unknown' if none)."""
    seen: List[str] = []
    for value in values:
        if value is None or value == '' or value == 'Unknown':
            continue
        text = str(value)
        if text not in seen:
            seen.append(text)
    return ", ".join(seen) if seen else 'Unknown'


def _summarize_device(device: Dict) -> Dict:
    """Flatten a device record to the fields shown in the report and AI prompt.

    Search matches describe one service (``service``/``port``/``product``/``vulns``);
    host lookups carry a ``services`` list plus ``all_vulns``. Both shapes are
    reduced to the same keys so display code does not need to care.
    """
    services = device.get('services') or []
    if services:
        service = _join_unique(s.get('service') for s in services)
        port = _join_unique(s.get('port') for s in services)
        product = _join_unique(s.get('product') for s in services)
    else:
        service = device.get('service', 'Unknown')
        port = device.get('port', 'Unknown')
        product = device.get('product', 'Unknown')

    return {
        "ip": device.get('ip', 'Unknown'),
        "service": service,
        "port": port,
        "product": product,
        "vulns": _normalize_vulns(device.get('vulns') or device.get('all_vulns')),
        "org": device.get('org', 'Unknown'),
    }


# --- AI analysis -------------------------------------------------------------

def _briefing_section(title: str, *body: str) -> List[str]:
    """Lines for one titled section of the briefing template, followed by a blank line."""
    return [f"**{title}**", _SECTION_RULE, *body, ""]


def _build_analysis_prompt(target: str, shodan_data: Dict) -> str:
    """Assemble the executive-briefing prompt from the gathered Shodan data."""
    devices = shodan_data.get('devices', [])
    stats = shodan_data.get('stats', {})

    lines = [
        "",
        f"You are writing a CRITICAL SECURITY BRIEFING for executives about: {target}",
        "",
        "Write in an URGENT, DRAMATIC tone that demands immediate action. Use the format below EXACTLY:",
        "",
        "SHODAN INTELLIGENCE DATA:",
        f"- Total results found: {stats.get('total_results', 0)}",
        f"- Devices analyzed: {len(devices)}",
        f"- Target type: {shodan_data.get('target_type', 'Unknown')}",
        "",
        "DEVICE DETAILS:",
        "",
    ]

    # Limit to 5 devices to keep the prompt small
    for i, device in enumerate(devices[:5], 1):
        summary = _summarize_device(device)
        vulns = summary["vulns"]
        lines += [
            f"Device {i}:",
            f"- IP: {summary['ip']}",
            f"- Service: {summary['service']} on port {summary['port']}",
            f"- Product: {summary['product']}",
            f"- Risk Level: {device.get('risk_level', 'UNKNOWN')}",
            f"- Vulnerabilities: {len(vulns)} found",
        ]
        if vulns:
            # Supply the real identifiers: the template asks for specific CVE numbers.
            lines.append(f"- CVE IDs: {', '.join(vulns[:10])}")
        lines += [f"- Organization: {summary['org']}", ""]

    assessment_date = datetime.now(timezone.utc).strftime('%B %d, %Y at %H:%M UTC')

    lines += [
        "Write a DRAMATIC SECURITY BRIEFING using this EXACT format:",
        "",
        f"**CRITICAL SECURITY ASSESSMENT - {target.upper()}**",
        _TITLE_RULE,
        "",
    ]
    lines += _briefing_section(
        "EXECUTIVE ALERT: ACTIVE SECURITY THREATS DETECTED",
        f"**Target:** {target}",
        f"**Assessment Date:** {assessment_date}",
        "**Overall Risk:** [CRITICAL/HIGH/MEDIUM/LOW]",
        "**Immediate Action Required:** [YES/NO]",
    )
    lines += _briefing_section(
        "THE BOTTOM LINE",
        "• [Write 3-5 bullet points about immediate threats]",
        "• [Include specific numbers of vulnerabilities found]",
        "• [Mention financial risk estimates]",
        "• [Reference real-world breach examples]",
    )
    lines += _briefing_section(
        "THREAT LANDSCAPE - WHAT ATTACKERS SEE",
        "[List specific IPs, services, vulnerabilities found with dramatic language]",
        "[Include specific CVE numbers if any vulnerabilities found]",
        "[Mention exposed services and their risks]",
    )
    lines += _briefing_section(
        "ACTIVE ATTACK VECTORS - READY TO EXPLOIT",
        "[Describe how attackers could exploit the findings]",
        "[Include timeframes and success rates]",
        "[Reference tools attackers might use]",
    )
    lines += _briefing_section(
        "FINANCIAL EXPOSURE ANALYSIS",
        "[Estimate breach costs, downtime costs, compliance fines]",
        "[Compare prevention costs vs breach costs]",
    )
    lines += _briefing_section(
        "UNCOMFORTABLE TRUTHS",
        "[List harsh realities about their security posture]",
        "[Include industry statistics and examples]",
    )
    lines += _briefing_section(
        "FINAL VERDICT",
        "**Security Posture:** [Assessment]",
        "**Exploitation Difficulty:** [Easy/Medium/Hard]",
        "**Financial Risk:** [Amount range]",
        "**Action Required:** [Immediate/Urgent/Soon]",
    )
    lines.append(
        "Write this in an URGENT, DRAMATIC tone that will make executives take immediate action. "
        "Use specific details and create a sense of urgency."
    )
    return "\n".join(lines)


def _generate_ai_analysis(target: str, shodan_data: Dict) -> str:
    """Generate AI-powered analysis using OpenAI.

    Returns the model's briefing text, or a plain explanatory message when
    OpenAI is not configured or the request fails.
    """
    if not openai_client:
        return ("AI analysis not available. Please set OPENAI_API_KEY environment variable "
                "to enable AI-powered insights.")

    try:
        response = openai_client.chat.completions.create(
            model=_OPENAI_MODEL,
            messages=[
                {"role": "system", "content": _ANALYSIS_SYSTEM_PROMPT},
                {"role": "user", "content": _build_analysis_prompt(target, shodan_data)},
            ],
            max_tokens=2500,
            temperature=0.3,
        )
        return response.choices[0].message.content

    except Exception as e:
        logger.error(f"AI analysis error: {e}")
        return f"AI analysis failed: {str(e)}"


# --- Report formatting -------------------------------------------------------

def _format_intelligence_report(target: str, shodan_data: Dict, ai_analysis: str) -> str:
    """Format comprehensive intelligence report as Markdown."""
    devices = shodan_data.get('devices', [])
    stats = shodan_data.get('stats', {})
    target_type = shodan_data.get('target_type', 'Unknown')

    # Count risk levels; tolerate a level outside the known set instead of raising.
    risk_counts = dict.fromkeys(_RISK_LEVELS, 0)
    for device in devices:
        level = device.get('risk_level', 'LOW')
        risk_counts[level] = risk_counts.get(level, 0) + 1

    parts = [
        f"# 🎯 Intelligence Report: {target}\n\n",
        "## 📊 Executive Summary\n",
        f"- **Target**: {target} ({target_type})\n",
        f"- **Shodan Results**: {stats.get('total_results', 0)} total matches\n",
        f"- **Devices Analyzed**: {len(devices)}\n",
        f"- **Risk Distribution**: {risk_counts['CRITICAL']} Critical, {risk_counts['HIGH']} High, "
        f"{risk_counts['MEDIUM']} Medium, {risk_counts['LOW']} Low\n\n",
        "## 🤖 AI Security Analysis\n",
        f"{ai_analysis}\n\n",
        "## 🔍 Technical Intelligence\n\n",
        "### Infrastructure Overview\n",
    ]

    if devices:
        parts.append(f"Found {len(devices)} internet-exposed devices:\n\n")
        for i, device in enumerate(devices[:10], 1):  # Show top 10 devices
            summary = _summarize_device(device)
            vulns = summary["vulns"]
            parts.append(
                f"**Device {i}: {summary['ip']}**\n"
                f"- Service: {summary['service']} (Port {summary['port']})\n"
                f"- Risk Level: {device.get('risk_level', 'LOW')}\n"
                f"- Vulnerabilities: {len(vulns)} found\n"
                f"- Organization: {summary['org']}\n"
            )
            if vulns:
                more = '...' if len(vulns) > 3 else ''
                parts.append(f"- CVEs: {', '.join(vulns[:3])}{more}\n")
            parts.append("\n")
    else:
        parts.append("No exposed devices found in Shodan database.\n\n")

    # Add statistics if available
    if stats.get('total_results', 0) > 0:
        parts.append("### 📈 Global Statistics\n")
        parts.append(f"- **Total Shodan Results**: {stats.get('total_results', 0)}\n")
        facet_lines = (
            ("Top Countries", stats.get('countries'), 3),
            ("Top Organizations", stats.get('organizations'), 3),
            ("Common Ports", stats.get('ports'), 5),
        )
        for label, entries, top_n in facet_lines:
            if entries:
                values = ', '.join(str(entry['value']) for entry in entries[:top_n])
                parts.append(f"- **{label}**: {values}\n")

    # Add metadata
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    parts.append(
        "\n## ⚔ Analysis Metadata\n"
        f"- **Timestamp**: {timestamp}\n"
        "- **Sources**: Shodan Internet Intelligence, OpenAI GPT-4o-mini\n"
        f"- **Query**: {shodan_data.get('query', 'N/A')}\n"
        "- **Analysis Type**: Comprehensive OSINT Assessment\n\n"
        "---\n"
        "*Generated by ExposureGPT - Simplified OSINT Intelligence Platform*\n"
    )

    return "".join(parts)


# --- Target classification ---------------------------------------------------

def _is_ip_address(target: str) -> bool:
    """Check if target is a valid IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(target.strip())
    except ValueError:
        return False
    return True


def _is_domain(target: str) -> bool:
    """Check if target looks like a domain name (labels plus an alphabetic TLD)."""
    return bool(_DOMAIN_PATTERN.fullmatch(target))


def _classify_target_type(target: str) -> str:
    """Classify the type of target as 'IP Address', 'Domain' or 'Organization'."""
    if _is_ip_address(target):
        return "IP Address"
    if _is_domain(target):
        return "Domain"
    return "Organization"


# --- Web interface and entry point -------------------------------------------

def create_interface():
    """Create simple Gradio interface.

    Raises:
        RuntimeError: if the ``gradio`` package is not installed.
    """
    if gr is None:
        raise RuntimeError(
            "Gradio is not installed; install 'gradio' for the web interface or use --cli TARGET."
        )

    # Use proper Gradio theme with Google Fonts to avoid 404 errors
    theme = gr.themes.Soft(
        primary_hue=gr.themes.colors.pink,
        secondary_hue=gr.themes.colors.cyan,
        neutral_hue=gr.themes.colors.slate,
        font=[gr.themes.GoogleFont("Inter"), "sans-serif"],
    )

    with gr.Blocks(
        title="ExposureGPT - Simplified OSINT Intelligence",
        theme=theme,
        css=_APP_CSS,
    ) as demo:
        gr.Markdown(_INTRO_MARKDOWN)

        target_input = gr.Textbox(
            label="🎯 Target to Analyze",
            placeholder="Enter anything: 'Tesla', 'that social media company', 'google.com', '8.8.8.8'",
            value="Tesla",
        )

        analyze_btn = gr.Button(
            "🚀 Run Intelligence Gathering",
            variant="primary",
            size="lg",
        )

        output_report = gr.Markdown(
            value="Enter a target above and click the button to begin analysis..."
        )

        analyze_btn.click(
            fn=intelligence_gathering,
            inputs=[target_input],
            outputs=[output_report],
        )

        gr.Markdown(_MCP_DETAILS_MARKDOWN)

    return demo


def main():
    """Main entry point: run a one-off CLI analysis or launch the web/MCP server."""
    parser = argparse.ArgumentParser(description="ExposureGPT - Simplified OSINT Intelligence")
    parser.add_argument("--cli", metavar="TARGET", help="Run CLI analysis")
    parser.add_argument("--port", type=int, default=7860, help="Port for web interface")
    parser.add_argument("--share", action="store_true", help="Create public link")

    args = parser.parse_args()

    # CLI mode
    if args.cli:
        print(f"\n🎯 ExposureGPT CLI Analysis: {args.cli}")
        print("=" * 60)
        result = intelligence_gathering(args.cli)
        print(result)
        return

    # Web interface
    demo = create_interface()

    print(f"🚀 ExposureGPT launching on port {args.port}")
    print(f"🌐 Interface: http://localhost:{args.port}")
    print(f"🤖 MCP Endpoint: http://localhost:{args.port}/gradio_api/mcp/sse")

    # Enable MCP server via environment variable (alternative to mcp_server=True)
    os.environ['GRADIO_MCP_SERVER'] = 'True'

    demo.launch(
        server_port=args.port,
        share=args.share,
        server_name="0.0.0.0",
        ssr_mode=False,  # Fix font loading 404 errors
    )


if __name__ == "__main__":
    main()