#!/usr/bin/env python3
"""
ExposureGPT - Simplified MCP OSINT Tool
Single tool for intelligence gathering using Shodan + OpenAI
"""
import gradio as gr
import json
import logging
import os
import sys
try:
import shodan
except ImportError:
shodan = None
try:
from openai import OpenAI
except ImportError:
OpenAI = None
from typing import Dict, List, Optional
from datetime import datetime, timezone
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
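# Example .env contents (both keys are optional, but each missing key disables that integration):
#   SHODAN_API_KEY=<your Shodan API key>
#   OPENAI_API_KEY=<your OpenAI API key>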
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize services
SHODAN_API_KEY = os.getenv('SHODAN_API_KEY')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
# Initialize clients
shodan_client = None
openai_client = None
if shodan and SHODAN_API_KEY and SHODAN_API_KEY != 'your_shodan_key':
try:
shodan_client = shodan.Shodan(SHODAN_API_KEY)
logger.info("βœ… Shodan API connected")
except Exception as e:
logger.error(f"❌ Shodan connection failed: {e}")
if OpenAI and OPENAI_API_KEY:
try:
openai_client = OpenAI(api_key=OPENAI_API_KEY)
logger.info("βœ… OpenAI API connected")
except Exception as e:
logger.error(f"❌ OpenAI connection failed: {e}")
def intelligence_gathering(target: str) -> str:
"""
Comprehensive OSINT intelligence gathering for domains, IPs, or organizations.
Uses LLM to interpret user input, then Shodan for infrastructure discovery and OpenAI for intelligent analysis.
Provides security assessment, risk analysis, and actionable recommendations.
Args:
target: Domain, IP address, organization name, or natural language query to analyze
Returns:
Comprehensive intelligence report with AI-powered insights
"""
try:
logger.info(f"🎯 Starting intelligence gathering for: {target}")
# Step 1: LLM interprets and clarifies user input
interpreted_target = _interpret_user_input(target)
# Check if LLM needs clarification
if interpreted_target.startswith("CLARIFICATION_NEEDED:"):
return interpreted_target.replace("CLARIFICATION_NEEDED:", "")
# If LLM interpreted the input, show what we're searching
if interpreted_target != target:
logger.info(f"🤖 LLM interpreted '{target}' as '{interpreted_target}'")
# Step 2: Gather raw intelligence data
shodan_data = _gather_shodan_intelligence(interpreted_target)
# Check if we have any data to work with
if shodan_data.get('error') and not shodan_data.get('devices'):
return f"❌ Cannot analyze {interpreted_target}: {shodan_data['error']}\n\nPlease configure API keys and try again."
# Step 3: Generate AI-powered analysis
ai_analysis = _generate_ai_analysis(interpreted_target, shodan_data)
# Step 4: Format comprehensive report
report = _format_intelligence_report(interpreted_target, shodan_data, ai_analysis)
# Add interpretation note if target was changed
if interpreted_target != target:
report = f"πŸ€– **LLM Interpretation**: Analyzed '{interpreted_target}' based on your query: '{target}'\n\n" + report
logger.info(f"βœ… Intelligence gathering completed for {interpreted_target}")
return report
except Exception as e:
logger.error(f"❌ Error in intelligence gathering: {e}")
return f"❌ Intelligence gathering failed for {target}: {str(e)}"
def _interpret_user_input(user_input: str) -> str:
"""Use LLM to interpret and clarify user input before Shodan search"""
if not openai_client:
# If no OpenAI, just return the input as-is
return user_input
try:
# Create interpretation prompt
prompt = f"""
You are an OSINT intelligence assistant helping users search for internet-exposed infrastructure using Shodan.
USER INPUT: "{user_input}"
Your job is to interpret this input and return the BEST target for Shodan analysis. Choose ONE of these response formats:
FORMAT 1 - DIRECT SEARCH (when input is clear):
Return ONLY the exact domain/IP to search, nothing else.
Examples:
- Input: "google.com" β†’ Output: "google.com"
- Input: "8.8.8.8" β†’ Output: "8.8.8.8"
- Input: "tesla" β†’ Output: "tesla.com"
- Input: "Microsoft Corporation" β†’ Output: "microsoft.com"
FORMAT 2 - CLARIFICATION NEEDED (when input is ambiguous):
Start with "CLARIFICATION_NEEDED:" then ask for clarification.
Examples:
- Input: "that email company" β†’ Output: "CLARIFICATION_NEEDED: I need clarification. Do you want to search:\nβ€’ gmail.com (Google's email service)\nβ€’ outlook.com (Microsoft's email service)\nβ€’ yahoo.com (Yahoo's email service)\n\nPlease specify which email service you'd like to analyze."
GUIDELINES:
- Prefer .com domains for companies (tesla β†’ tesla.com)
- For clear company names, use their main domain
- For ambiguous inputs, ask for clarification with specific options
- Never include explanations in Format 1 responses
- Always provide 2-3 specific options in clarification requests
What should I search for?"""
# Get LLM response
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are an expert at interpreting user queries for OSINT analysis. Be concise and precise."},
{"role": "user", "content": prompt}
],
max_tokens=200,
temperature=0.1
)
result = response.choices[0].message.content.strip()
logger.info(f"🤖 LLM interpreted '{user_input}' → '{result}'")
return result
except Exception as e:
logger.warning(f"LLM interpretation failed: {e}")
# Fallback to original input
return user_input
def _gather_shodan_intelligence(target: str) -> Dict:
"""Gather intelligence using Shodan API"""
if not shodan_client:
return {"error": "Shodan API not configured. Please set SHODAN_API_KEY environment variable.", "devices": [], "stats": {}}
try:
# Determine search strategy based on target type
if _is_ip_address(target):
# Direct IP lookup
devices = [_get_host_info(target)]
query = f"ip:{target}"
elif _is_domain(target):
# Domain-based search
query = f"hostname:{target}"
devices = _search_shodan(query)
else:
# Organization search
query = f'org:"{target}"'
devices = _search_shodan(query)
# Get statistics
stats = _get_shodan_stats(query)
return {
"devices": [d for d in devices if d], # Filter out None results
"stats": stats,
"query": query,
"target_type": _classify_target_type(target)
}
except Exception as e:
logger.error(f"Shodan intelligence gathering error: {e}")
return {"error": str(e), "devices": [], "stats": {}}
def _search_shodan(query: str, limit: int = 20) -> List[Dict]:
"""Search Shodan with rate limiting"""
try:
import time
time.sleep(1.2) # Rate limiting
results = shodan_client.search(query, limit=limit)
devices = []
for result in results.get('matches', []):
device = _parse_shodan_result(result)
if device:
devices.append(device)
return devices
except shodan.APIError as e:
logger.warning(f"Shodan API error: {e}")
return []
except Exception as e:
logger.error(f"Shodan search error: {e}")
return []
def _get_host_info(ip: str) -> Optional[Dict]:
"""Get detailed host information"""
try:
import time
time.sleep(1.2) # Rate limiting
host_info = shodan_client.host(ip)
return _parse_host_result(host_info)
except Exception as e:
logger.warning(f"Could not get host info for {ip}: {e}")
return None
def _parse_shodan_result(result: Dict) -> Optional[Dict]:
"""Parse Shodan search result"""
try:
ip = result.get('ip_str', 'Unknown')
port = result.get('port', 80)
product = result.get('product', 'Unknown')
# Risk assessment
vulns = list(result.get('vulns', []))  # normalize to a list; Shodan may return vulns as a dict keyed by CVE ID
risk_score = len(vulns) * 2
# High-risk ports
high_risk_ports = [21, 23, 3389, 5900, 3306, 5432, 27017]
if port in high_risk_ports:
risk_score += 3
# Determine risk level
if risk_score >= 6:
risk_level = "HIGH"
elif risk_score >= 3:
risk_level = "MEDIUM"
else:
risk_level = "LOW"
return {
"ip": ip,
"port": port,
"product": product,
"service": result.get('service_name', 'Unknown'),
"vulns": vulns,
"risk_level": risk_level,
"risk_score": risk_score,
"banner": result.get('data', '')[:200],
"location": result.get('location', {}),
"org": result.get('org', 'Unknown'),
"timestamp": result.get('timestamp', '')
}
except Exception as e:
logger.error(f"Error parsing Shodan result: {e}")
return None
def _parse_host_result(host_info: Dict) -> Dict:
"""Parse detailed host information"""
try:
ip = host_info.get('ip_str', 'Unknown')
# Collect all services
services = []
all_vulns = []
for service in host_info.get('data', []):
services.append({
"port": service.get('port'),
"service": service.get('service_name', 'Unknown'),
"product": service.get('product', 'Unknown'),
"version": service.get('version', ''),
"vulns": service.get('vulns', [])
})
all_vulns.extend(service.get('vulns', []))
# Calculate overall risk
risk_score = len(all_vulns) * 2 + len(services)
if risk_score >= 10:
risk_level = "CRITICAL"
elif risk_score >= 6:
risk_level = "HIGH"
elif risk_score >= 3:
risk_level = "MEDIUM"
else:
risk_level = "LOW"
return {
"ip": ip,
"services": services,
"total_services": len(services),
"all_vulns": list(set(all_vulns)), # Unique vulns
"risk_level": risk_level,
"risk_score": risk_score,
"hostnames": host_info.get('hostnames', []),
"org": host_info.get('org', 'Unknown'),
"location": host_info.get('location', {}),
"last_update": host_info.get('last_update', '')
}
except Exception as e:
logger.error(f"Error parsing host result: {e}")
return {"error": str(e)}
def _get_shodan_stats(query: str) -> Dict:
"""Get search statistics"""
try:
import time
time.sleep(1.2) # Rate limiting
# Get basic count
results = shodan_client.search(query, limit=0)
total = results.get('total', 0)
# Try to get facets for more stats
try:
facet_results = shodan_client.search(query, limit=0, facets='country,org,port')
facets = facet_results.get('facets', {})
except Exception:
facets = {}
return {
"total_results": total,
"countries": facets.get('country', [])[:5],
"organizations": facets.get('org', [])[:5],
"ports": facets.get('port', [])[:10]
}
except Exception as e:
logger.warning(f"Could not get Shodan stats: {e}")
return {"total_results": 0}
def _generate_ai_analysis(target: str, shodan_data: Dict) -> str:
"""Generate AI-powered analysis using OpenAI"""
if not openai_client:
return "AI analysis not available. Please set OPENAI_API_KEY environment variable to enable AI-powered insights."
try:
# Prepare data for AI analysis
devices = shodan_data.get('devices', [])
stats = shodan_data.get('stats', {})
# Create enhanced dramatic analysis prompt
prompt = f"""
You are writing a CRITICAL SECURITY BRIEFING for executives about: {target}
Write in an URGENT, DRAMATIC tone that demands immediate action. Use the format below EXACTLY:
SHODAN INTELLIGENCE DATA:
- Total results found: {stats.get('total_results', 0)}
- Devices analyzed: {len(devices)}
- Target type: {shodan_data.get('target_type', 'Unknown')}
DEVICE DETAILS:
"""
# Add device summaries
for i, device in enumerate(devices[:5], 1): # Limit to 5 devices
risk_level = device.get('risk_level', 'UNKNOWN')
vulns = device.get('vulns', [])
prompt += f"""
Device {i}:
- IP: {device.get('ip', 'Unknown')}
- Service: {device.get('service', 'Unknown')} on port {device.get('port', 'Unknown')}
- Product: {device.get('product', 'Unknown')}
- Risk Level: {risk_level}
- Vulnerabilities: {len(vulns)} found
- Organization: {device.get('org', 'Unknown')}
"""
prompt += f"""
Write a DRAMATIC SECURITY BRIEFING using this EXACT format:
**CRITICAL SECURITY ASSESSMENT - {target.upper()}**
========================================================================================================================
**EXECUTIVE ALERT: ACTIVE SECURITY THREATS DETECTED**
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
**Target:** {target}
**Assessment Date:** {datetime.now(timezone.utc).strftime('%B %d, %Y at %H:%M UTC')}
**Overall Risk:** [CRITICAL/HIGH/MEDIUM/LOW]
**Immediate Action Required:** [YES/NO]
**THE BOTTOM LINE**
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
• [Write 3-5 bullet points about immediate threats]
• [Include specific numbers of vulnerabilities found]
• [Mention financial risk estimates]
• [Reference real-world breach examples]
**THREAT LANDSCAPE - WHAT ATTACKERS SEE**
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[List specific IPs, services, vulnerabilities found with dramatic language]
[Include specific CVE numbers if any vulnerabilities found]
[Mention exposed services and their risks]
**ACTIVE ATTACK VECTORS - READY TO EXPLOIT**
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[Describe how attackers could exploit the findings]
[Include timeframes and success rates]
[Reference tools attackers might use]
**FINANCIAL EXPOSURE ANALYSIS**
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[Estimate breach costs, downtime costs, compliance fines]
[Compare prevention costs vs breach costs]
**UNCOMFORTABLE TRUTHS**
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[List harsh realities about their security posture]
[Include industry statistics and examples]
**FINAL VERDICT**
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
**Security Posture:** [Assessment]
**Exploitation Difficulty:** [Easy/Medium/Hard]
**Financial Risk:** [Amount range]
**Action Required:** [Immediate/Urgent/Soon]
Write this in an URGENT, DRAMATIC tone that will make executives take immediate action. Use specific details and create a sense of urgency."""
# Generate AI response
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a senior cybersecurity consultant writing URGENT executive briefings. Use dramatic, action-oriented language that creates immediate urgency. Include specific technical details, financial impact estimates, and real-world breach examples. Make executives understand the gravity of security threats through compelling, fear-based messaging."},
{"role": "user", "content": prompt}
],
max_tokens=2500,
temperature=0.3
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"AI analysis error: {e}")
return f"AI analysis failed: {str(e)}"
def _format_intelligence_report(target: str, shodan_data: Dict, ai_analysis: str) -> str:
"""Format comprehensive intelligence report"""
devices = shodan_data.get('devices', [])
stats = shodan_data.get('stats', {})
target_type = shodan_data.get('target_type', 'Unknown')
# Count risk levels
risk_counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
for device in devices:
risk_level = device.get('risk_level', 'LOW')
risk_counts[risk_level] += 1
# Create report
report = f"""# 🎯 Intelligence Report: {target}
## 📊 Executive Summary
- **Target**: {target} ({target_type})
- **Shodan Results**: {stats.get('total_results', 0)} total matches
- **Devices Analyzed**: {len(devices)}
- **Risk Distribution**: {risk_counts['CRITICAL']} Critical, {risk_counts['HIGH']} High, {risk_counts['MEDIUM']} Medium, {risk_counts['LOW']} Low
## 🤖 AI Security Analysis
{ai_analysis}
## 🔍 Technical Intelligence
### Infrastructure Overview
"""
if devices:
report += f"Found {len(devices)} internet-exposed devices:\n\n"
for i, device in enumerate(devices[:10], 1): # Show top 10 devices
ip = device.get('ip', 'Unknown')
service = device.get('service', 'Unknown')
port = device.get('port', 'Unknown')
risk = device.get('risk_level', 'LOW')
vulns = device.get('vulns') or device.get('all_vulns', [])  # direct host lookups store CVEs under 'all_vulns'
org = device.get('org', 'Unknown')
report += f"""**Device {i}: {ip}**
- Service: {service} (Port {port})
- Risk Level: {risk}
- Vulnerabilities: {len(vulns)} found
- Organization: {org}
"""
if vulns:
report += f"- CVEs: {', '.join(vulns[:3])}{'...' if len(vulns) > 3 else ''}\n"
report += "\n"
else:
report += "No exposed devices found in Shodan database.\n\n"
# Add statistics if available
if stats.get('total_results', 0) > 0:
report += f"### πŸ“ˆ Global Statistics\n"
report += f"- **Total Shodan Results**: {stats.get('total_results', 0)}\n"
if stats.get('countries'):
report += f"- **Top Countries**: {', '.join([c['value'] for c in stats['countries'][:3]])}\n"
if stats.get('organizations'):
report += f"- **Top Organizations**: {', '.join([o['value'] for o in stats['organizations'][:3]])}\n"
if stats.get('ports'):
report += f"- **Common Ports**: {', '.join([str(p['value']) for p in stats['ports'][:5]])}\n"
# Add metadata
report += f"""
## ⚡ Analysis Metadata
- **Timestamp**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- **Sources**: Shodan Internet Intelligence, OpenAI GPT-4o-mini
- **Query**: {shodan_data.get('query', 'N/A')}
- **Analysis Type**: Comprehensive OSINT Assessment
---
*Generated by ExposureGPT - Simplified OSINT Intelligence Platform*
"""
return report
def _is_ip_address(target: str) -> bool:
"""Check if target is an IP address"""
import re
ip_pattern = r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$'
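# Note: this only checks the dotted-quad shape; octet ranges are not validated
# (e.g. "999.999.999.999" would also match).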
return bool(re.match(ip_pattern, target))
def _is_domain(target: str) -> bool:
"""Check if target is a domain"""
import re
# More flexible domain pattern that handles subdomains and various TLDs
domain_pattern = r'^[a-zA-Z0-9][a-zA-Z0-9.-]*\.[a-zA-Z]{2,}$'
return bool(re.match(domain_pattern, target)) and '.' in target
def _classify_target_type(target: str) -> str:
"""Classify the type of target"""
if _is_ip_address(target):
return "IP Address"
elif _is_domain(target):
return "Domain"
else:
return "Organization"
def create_interface():
"""Create simple Gradio interface"""
# Use proper Gradio theme with Google Fonts to avoid 404 errors
theme = gr.themes.Soft(
primary_hue=gr.themes.colors.pink,
secondary_hue=gr.themes.colors.cyan,
neutral_hue=gr.themes.colors.slate,
font=[gr.themes.GoogleFont("Inter"), "sans-serif"]
)
# Minimal CSS - just background and center content
css = """
.gradio-container {
background: linear-gradient(135deg, #0f172a 0%, #1e293b 100%) !important;
max-width: 1200px !important;
margin: 0 auto !important;
padding: 20px !important;
}
"""
with gr.Blocks(
title="ExposureGPT - Simplified OSINT Intelligence",
theme=theme,
css=css
) as demo:
gr.Markdown("""
# 🎯 ExposureGPT - Simplified OSINT Intelligence
**🤖 Smart OSINT analysis using LLM + Shodan + OpenAI**
Enter anything - the LLM will interpret your input and ask for clarification if needed!
Try: "Tesla", "that social media company", "google.com", or "8.8.8.8"
""")
target_input = gr.Textbox(
label="🎯 Target to Analyze",
placeholder="Enter anything: 'Tesla', 'that social media company', 'google.com', '8.8.8.8'",
value="Tesla"
)
analyze_btn = gr.Button(
"πŸš€ Run Intelligence Gathering",
variant="primary",
size="lg"
)
output_report = gr.Markdown(
value="Enter a target above and click the button to begin analysis..."
)
analyze_btn.click(
fn=intelligence_gathering,
inputs=[target_input],
outputs=[output_report]
)
gr.Markdown("""
### 🤖 Model Context Protocol (MCP) Server Details
This application automatically serves as an **MCP server** that AI assistants can connect to for real-time OSINT intelligence gathering.
**🔗 MCP Endpoint**: `https://acloudcenter-exposuregpt.hf.space/gradio_api/mcp/sse`
**📊 Available Tool**: `intelligence_gathering(target: str)`
- **Input**: Domain name, IP address, or organization name
- **Output**: Comprehensive security intelligence report including:
- Exact IP addresses and geographic locations
- Exposed services, ports, and product versions
- CVE vulnerabilities with severity scores
- Risk assessment with actionable recommendations
- Network infrastructure and hosting details
**🔧 Claude Desktop Configuration**:
```json
{
"mcpServers": {
"exposuregpt": {
"command": "npx",
"args": ["mcp-remote", "https://acloudcenter-exposuregpt.hf.space/gradio_api/mcp/sse"]
}
}
}
```
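For a locally running instance, the same configuration can point at the endpoint printed on startup, e.g. `http://localhost:7860/gradio_api/mcp/sse` with the default port.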
**⚡ Powered by**: Shodan Internet Intelligence + OpenAI GPT-4o-mini + Gradio Framework
""")
return demo
def main():
"""Main entry point"""
import argparse
parser = argparse.ArgumentParser(description="ExposureGPT - Simplified OSINT Intelligence")
parser.add_argument("--cli", metavar="TARGET", help="Run CLI analysis")
parser.add_argument("--port", type=int, default=7860, help="Port for web interface")
parser.add_argument("--share", action="store_true", help="Create public link")
args = parser.parse_args()
# CLI mode
if args.cli:
print(f"\n🎯 ExposureGPT CLI Analysis: {args.cli}")
print("=" * 60)
result = intelligence_gathering(args.cli)
print(result)
return
# Web interface
demo = create_interface()
print(f"πŸš€ ExposureGPT launching on port {args.port}")
print(f"🌐 Interface: http://localhost:{args.port}")
print(f"πŸ€– MCP Endpoint: http://localhost:{args.port}/gradio_api/mcp/sse")
# Enable MCP server via environment variable (alternative to mcp_server=True)
os.environ['GRADIO_MCP_SERVER'] = 'True'
demo.launch(
server_port=args.port,
share=args.share,
server_name="0.0.0.0",
ssr_mode=False # Fix font loading 404 errors
)
if __name__ == "__main__":
main()