import asyncio
import json
import logging
import os
from datetime import datetime
from typing import List, Dict, Optional, AsyncGenerator

from core.llm_factory import llm_factory
from core.session import session_manager
from services.hf_endpoint_monitor import hf_monitor
from services.weather import weather_service
from services.web_search import web_search_service

try:
    from tavily import TavilyClient
    TAVILY_AVAILABLE = True
except ImportError:
    TavilyClient = None
    TAVILY_AVAILABLE = False

logger = logging.getLogger(__name__)


class AICoordinator:
    """Hierarchical multi-model coordinator with cosmic cascade flow"""

    def __init__(self):
        self.tavily_client = None
        if TAVILY_AVAILABLE and os.getenv("TAVILY_API_KEY"):
            self.tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

        self.system_instructions = {
            'ollama_role': """You are a cosmic kitten assistant that works in coordination with a powerful orbital space station.

Your role is to:
1. Provide immediate, concise responses using available information (local processing)
2. Summarize and simplify complex information
3. Enhance responses with local knowledge
4. DEFER TO THE ORBITAL STATION for deep analysis and authoritative answers
5. Remember that the orbital station has 8192-token capacity and streaming capability - it is the thinking layer
6. Always acknowledge that the orbital station's insights supersede your initial response
7. Use web sources to justify points when possible
8. Keep responses brief and focused while awaiting orbital input""",

            'hf_role': """You are a powerful orbital space station with 8192-token capacity and streaming capability.

Your role is to:
1. Provide deep, uncensored analysis and insights
2. Supersede the local cosmic kitten's logic when there is disagreement
3. Think thoroughly and provide comprehensive responses
4. Use your full 8192-token capacity for detailed analysis
5. Stream your responses for real-time delivery
6. Build upon the conversation context and the cosmic kitten's initial response
7. Provide authoritative answers that take precedence"""
        }

    def determine_web_search_needs(self, conversation_history: List[Dict]) -> Dict:
        """Determine whether a web search is needed based on conversation content"""
        conversation_text = " ".join([msg.get("content", "") for msg in conversation_history])

        # Keywords that suggest the conversation needs current, time-sensitive information
        current_info_indicators = [
            "news", "current events", "latest", "recent", "today",
            "weather", "temperature", "forecast",
            "stock", "price", "trend", "market",
            "breaking", "update", "development"
        ]

        needs_search = False
        search_topics = []

        for indicator in current_info_indicators:
            if indicator in conversation_text.lower():
                needs_search = True
                search_topics.append(indicator)

        return {
            "needs_search": needs_search,
            "search_topics": search_topics,
            "reasoning": f"Found topics requiring current info: {', '.join(search_topics)}" if search_topics else "No current info needed"
        }

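    # Hypothetical example (illustrative only) of the decision dict returned above,
    # assuming a conversation that mentions time-sensitive topics:
    #   determine_web_search_needs([{"role": "user", "content": "What's the latest weather today?"}])
    #   -> {"needs_search": True,
    #       "search_topics": ["latest", "today", "weather"],
    #       "reasoning": "Found topics requiring current info: latest, today, weather"}
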
    def manual_hf_analysis(self, user_id: str, conversation_history: List[Dict]) -> str:
        """Perform manual HF analysis with web search integration"""
        try:
            research_decision = self.determine_web_search_needs(conversation_history)

            system_prompt = f"""
You are a deep analysis expert joining an ongoing conversation.

Research Decision: {research_decision['reasoning']}

Please provide:
1. Deep insights on conversation themes
2. Research/web search needs (if any)
3. Strategic recommendations
4. Questions to explore further

Conversation History:
"""

            messages = [{"role": "system", "content": system_prompt}]

            # Include the most recent conversation turns as context
            for msg in conversation_history[-15:]:
                if isinstance(msg, dict) and "role" in msg and "content" in msg:
                    messages.append({
                        "role": msg["role"],
                        "content": msg["content"]
                    })

            hf_provider = llm_factory.get_provider('huggingface')

            if hf_provider:
                response = hf_provider.generate("Deep analysis request", messages)
                return response or "HF Expert analysis completed."
            else:
                return "❌ HF provider not available."

        except Exception as e:
            return f"❌ HF analysis failed: {str(e)}"

    def get_hf_engagement_status(self) -> Dict:
        """Get current HF engagement status"""
        return {
            "hf_available": self._check_hf_availability(),
            "web_search_configured": bool(self.tavily_client),
            "research_needs_detected": False,
            "last_hf_analysis": None
        }

    async def coordinate_cosmic_response(self, user_id: str, user_query: str) -> AsyncGenerator[Dict, None]:
        """
        Three-stage cosmic response cascade:
        1. Local Ollama immediate response (🐱 Cosmic Kitten's quick thinking)
        2. HF endpoint deep analysis (🛰️ Orbital Station wisdom)
        3. Local Ollama synthesis (🐱 Cosmic Kitten's final synthesis)

        Yields event dicts whose 'type' is one of: 'status', 'local_response',
        'cloud_response', 'final_synthesis', or 'error'.
        """
        try:
            session = session_manager.get_session(user_id)

            # Prepend the current date/time so both models share the same temporal context
            current_time = datetime.now().strftime("%A, %B %d, %Y at %I:%M %p")
            time_context = {
                "role": "system",
                "content": f"[Current Date & Time: {current_time}]"
            }
            conversation_history = [time_context] + session.get("conversation", []).copy()

            yield {
                'type': 'status',
                'content': '🚀 Initiating Cosmic Response Cascade...',
                'details': {
                    'conversation_length': len(conversation_history),
                    'user_query_length': len(user_query)
                }
            }

            # Stage 1: immediate local response
            yield {
                'type': 'status',
                'content': '🐱 Cosmic Kitten Responding...'
            }

            local_response = await self._get_local_ollama_response(user_query, conversation_history)
            yield {
                'type': 'local_response',
                'content': local_response,
                'source': '🐱 Cosmic Kitten'
            }

            # Stage 2: deep analysis from the HF endpoint
            yield {
                'type': 'status',
                'content': '🛰️ Beaming Query to Orbital Station...'
            }

            hf_task = asyncio.create_task(self._get_hf_analysis(user_query, conversation_history))

            hf_response = await hf_task
            yield {
                'type': 'cloud_response',
                'content': hf_response,
                'source': '🛰️ Orbital Station'
            }

            # Stage 3: local synthesis of both perspectives
            yield {
                'type': 'status',
                'content': '🐱 Cosmic Kitten Synthesizing Wisdom...'
            }

            updated_history = conversation_history.copy()
            updated_history.extend([
                {"role": "assistant", "content": local_response},
                {"role": "assistant", "content": hf_response, "source": "cloud"}
            ])

            synthesis = await self._synthesize_responses(user_query, local_response, hf_response, updated_history)
            yield {
                'type': 'final_synthesis',
                'content': synthesis,
                'source': '🌟 Final Cosmic Summary'
            }

            yield {
                'type': 'status',
                'content': '✨ Cosmic Cascade Complete!'
            }

        except Exception as e:
            logger.error(f"Cosmic cascade failed: {e}")
            yield {'type': 'error', 'content': f"🌌 Cosmic disturbance: {str(e)}"}

    async def _get_local_ollama_response(self, query: str, history: List[Dict]) -> str:
        """Get an immediate response from the local Ollama model"""
        try:
            ollama_provider = llm_factory.get_provider('ollama')
            if not ollama_provider:
                raise Exception("Ollama provider not available")

            enhanced_history = history.copy()

            # Give the local model its coordination role
            enhanced_history.insert(0, {
                "role": "system",
                "content": self.system_instructions['ollama_role']
            })

            # Inject any external context (web search, weather, time) as a system message
            external_data = await self._gather_external_data(query)
            if external_data:
                context_parts = []
                if 'search_answer' in external_data:
                    context_parts.append(f"Current information: {external_data['search_answer']}")
                if 'weather' in external_data:
                    weather = external_data['weather']
                    context_parts.append(f"Current weather: {weather.get('temperature', 'N/A')}°C in {weather.get('city', 'Unknown')}")
                if 'current_datetime' in external_data:
                    context_parts.append(f"Current time: {external_data['current_datetime']}")

                if context_parts:
                    context_message = {
                        "role": "system",
                        "content": "Context: " + " | ".join(context_parts)
                    }
                    enhanced_history.insert(1, context_message)

            enhanced_history.append({"role": "user", "content": query})

            response = ollama_provider.generate(query, enhanced_history)

            return response or "🐱 Cosmic Kitten is thinking..."

        except Exception as e:
            logger.error(f"Local Ollama response failed: {e}")
            return "🐱 Cosmic Kitten encountered a space glitch..."

    async def _get_hf_analysis(self, query: str, history: List[Dict]) -> str:
        """Get deep analysis from the HF endpoint"""
        try:
            hf_available = self._check_hf_availability()
            if not hf_available:
                return "🛰️ Orbital Station is currently offline."

            hf_status = hf_monitor.check_endpoint_status()

            if not hf_status['available']:
                # Scale-to-zero endpoints need a warm-up period before they can serve requests
                logger.info("⚡ Initializing Orbital Station (2-4 minutes)...")
                warmup_success = hf_monitor.handle_scale_to_zero()
                if not warmup_success:
                    return "❌ Orbital Station initialization failed"

            hf_provider = llm_factory.get_provider('huggingface')
            if not hf_provider:
                return "❌ HF provider not available"

            enhanced_history = history.copy()

            # Prepend the current date/time and the orbital-station role
            current_time = datetime.now().strftime("%A, %B %d, %Y at %I:%M %p")
            time_context = {
                "role": "system",
                "content": f"[Current Date & Time: {current_time}]"
            }
            enhanced_history = [time_context] + enhanced_history

            enhanced_history.insert(0, {
                "role": "system",
                "content": self.system_instructions['hf_role']
            })

            enhanced_history.append({
                "role": "system",
                "content": f"""Context for your authoritative analysis:
User Query: '{query}'

Please provide your deep, uncensored analysis.
Use your full 8192 token capacity for comprehensive thinking.
Stream your response for real-time delivery."""
            })

            enhanced_history.append({"role": "user", "content": query})

            hf_response_stream = hf_provider.stream_generate(query, enhanced_history)

            if hf_response_stream:
                # Collect the streamed chunks (list or generator) into a single string
                if isinstance(hf_response_stream, str):
                    full_hf_response = hf_response_stream
                else:
                    full_hf_response = "".join(hf_response_stream)

                return full_hf_response or "🛰️ Orbital Station analysis complete."
            else:
                return "🛰️ Orbital Station encountered a transmission error."

        except Exception as e:
            logger.error(f"HF analysis failed: {e}")
            return f"🛰️ Orbital Station reports: {str(e)}"

    async def _synthesize_responses(self, query: str, local_response: str, hf_response: str, history: List[Dict]) -> str:
        """Synthesize the local and cloud responses with Ollama"""
        try:
            ollama_provider = llm_factory.get_provider('ollama')
            if not ollama_provider:
                raise Exception("Ollama provider not available")

            synthesis_prompt = f"""Synthesize these two perspectives into a cohesive cosmic summary:

🐱 Cosmic Kitten's Local Insight: {local_response}

🛰️ Orbital Station's Deep Analysis: {hf_response}

Please create a unified response that combines both perspectives, highlighting key insights from each while providing a coherent answer to the user's query."""

            enhanced_history = history.copy()

            enhanced_history.insert(0, {
                "role": "system",
                "content": "You are a cosmic kitten synthesizing insights from local knowledge and orbital station wisdom."
            })

            enhanced_history.append({"role": "user", "content": synthesis_prompt})

            synthesis = ollama_provider.generate(synthesis_prompt, enhanced_history)

            return synthesis or "🌟 Cosmic synthesis complete!"

        except Exception as e:
            logger.error(f"Response synthesis failed: {e}")
            # Fall back to a simple concatenation of truncated responses
            return f"🌟 Cosmic Summary:\n\n🐱 Local Insight: {local_response[:200]}...\n\n🛰️ Orbital Wisdom: {hf_response[:200]}..."

    async def coordinate_hierarchical_conversation(self, user_id: str, user_query: str) -> AsyncGenerator[Dict, None]:
        """
        Enhanced coordination with detailed tracking and feedback.

        Yields event dicts whose 'type' is one of: 'coordination_status',
        'initial_response', or the 'hf_thinking'/'final_response' events relayed
        from the HF coordination step.
        """
        try:
            session = session_manager.get_session(user_id)

            # Prepend the current date/time so every model shares the same temporal context
            current_time = datetime.now().strftime("%A, %B %d, %Y at %I:%M %p")
            time_context = {
                "role": "system",
                "content": f"[Current Date & Time: {current_time}]"
            }
            conversation_history = [time_context] + session.get("conversation", []).copy()

            yield {
                'type': 'coordination_status',
                'content': '🚀 Initiating hierarchical AI coordination...',
                'details': {
                    'conversation_length': len(conversation_history),
                    'user_query_length': len(user_query)
                }
            }

            # Phase 1: gather external context (web search, weather, time)
            yield {
                'type': 'coordination_status',
                'content': '🔍 Gathering external context...',
                'details': {'phase': 'external_data_gathering'}
            }
            external_data = await self._gather_external_data(user_query)

            if external_data:
                data_summary = []
                if 'search_results' in external_data:
                    data_summary.append(f"Web search: {len(external_data['search_results'])} results")
                if 'weather' in external_data:
                    data_summary.append("Weather data: available")
                if 'current_datetime' in external_data:
                    data_summary.append(f"Time: {external_data['current_datetime']}")

                yield {
                    'type': 'coordination_status',
                    'content': f'📊 External data gathered: {", ".join(data_summary)}',
                    'details': {'external_data_summary': data_summary}
                }

            # Phase 2: initial response from the local Ollama model
            yield {
                'type': 'coordination_status',
                'content': '🦙 Getting initial response from Ollama...',
                'details': {'phase': 'ollama_response'}
            }
            ollama_response = await self._get_hierarchical_ollama_response(
                user_query, conversation_history, external_data
            )

            yield {
                'type': 'initial_response',
                'content': ollama_response,
                'details': {
                    'response_length': len(ollama_response),
                    'external_data_injected': bool(external_data)
                }
            }

            # Phase 3: authoritative deep analysis from the HF endpoint, if available
            yield {
                'type': 'coordination_status',
                'content': '🤗 Engaging HF endpoint for deep analysis...',
                'details': {'phase': 'hf_coordination'}
            }

            hf_available = self._check_hf_availability()
            if hf_available:
                context_summary = {
                    'conversation_turns': len(conversation_history),
                    'ollama_response_length': len(ollama_response),
                    'external_data_items': len(external_data) if external_data else 0
                }

                yield {
                    'type': 'coordination_status',
                    'content': f'📋 HF context: {len(conversation_history)} conversation turns, Ollama response ({len(ollama_response)} chars)',
                    'details': context_summary
                }

                async for hf_chunk in self._coordinate_hierarchical_hf_response(
                    user_id, user_query, conversation_history,
                    external_data, ollama_response
                ):
                    yield hf_chunk
            else:
                yield {
                    'type': 'coordination_status',
                    'content': 'ℹ️ HF endpoint not available - using Ollama response',
                    'details': {'hf_available': False}
                }

            yield {
                'type': 'coordination_status',
                'content': '✅ Hierarchical coordination complete',
                'details': {'status': 'complete'}
            }

        except Exception as e:
            logger.error(f"Hierarchical coordination failed: {e}")
            yield {
                'type': 'coordination_status',
                'content': f'❌ Coordination error: {str(e)}',
                'details': {'error': str(e)}
            }

    async def _coordinate_hierarchical_hf_response(self, user_id: str, query: str,
                                                   history: List, external_data: Dict,
                                                   ollama_response: str) -> AsyncGenerator[Dict, None]:
        """Coordinate with the HF endpoint as the authoritative layer, with streaming"""
        try:
            hf_status = hf_monitor.check_endpoint_status()

            if not hf_status['available']:
                # Scale-to-zero endpoints need a warm-up period before they can serve requests
                yield {'type': 'coordination_status', 'content': '⚡ Initializing HF endpoint (2-4 minutes)...'}
                warmup_success = hf_monitor.handle_scale_to_zero()
                if not warmup_success:
                    yield {'type': 'coordination_status', 'content': '❌ HF endpoint initialization failed'}
                    return

            hf_provider = llm_factory.get_provider('huggingface')
            if not hf_provider:
                yield {'type': 'coordination_status', 'content': '❌ HF provider not available'}
                return

            enhanced_history = history.copy()

            # Prepend the current date/time and the orbital-station role
            current_time = datetime.now().strftime("%A, %B %d, %Y at %I:%M %p")
            time_context = {
                "role": "system",
                "content": f"[Current Date & Time: {current_time}]"
            }
            enhanced_history = [time_context] + enhanced_history

            enhanced_history.insert(0, {
                "role": "system",
                "content": self.system_instructions['hf_role']
            })

            # Provide the HF endpoint with the local model's response and any external data
            enhanced_history.append({
                "role": "system",
                "content": f"""Context for your authoritative analysis:
User Query: '{query}'
Ollama's Initial Response: '{ollama_response}'
External Data Available: {json.dumps(external_data, indent=2) if external_data else 'None'}

Please provide your deep, uncensored analysis that supersedes the local model's response.
Use your full 8192 token capacity for comprehensive thinking.
Stream your response for real-time delivery."""
            })

            enhanced_history.append({"role": "user", "content": query})

            yield {'type': 'coordination_status', 'content': '🧠 HF endpoint thinking...'}

            hf_response_stream = hf_provider.stream_generate(query, enhanced_history)

            if hf_response_stream:
                # Relay each streamed chunk as it arrives, then emit the full response
                full_hf_response = ""
                for chunk in hf_response_stream:
                    if chunk:
                        full_hf_response += chunk
                        yield {'type': 'hf_thinking', 'content': chunk}

                yield {'type': 'final_response', 'content': full_hf_response}
                yield {'type': 'coordination_status', 'content': '🎯 HF analysis complete and authoritative'}
            else:
                yield {'type': 'coordination_status', 'content': '❌ HF response generation failed'}

        except Exception as e:
            logger.error(f"Hierarchical HF coordination failed: {e}")
            yield {'type': 'coordination_status', 'content': f'❌ HF coordination error: {str(e)}'}

    async def _get_hierarchical_ollama_response(self, query: str, history: List, external_data: Dict) -> str:
        """Get an Ollama response with hierarchical awareness"""
        try:
            ollama_provider = llm_factory.get_provider('ollama')
            if not ollama_provider:
                raise Exception("Ollama provider not available")

            enhanced_history = history.copy()

            # Prepend the current date/time and the local model's coordination role
            current_time = datetime.now().strftime("%A, %B %d, %Y at %I:%M %p")
            time_context = {
                "role": "system",
                "content": f"[Current Date & Time: {current_time}]"
            }
            enhanced_history = [time_context] + enhanced_history

            enhanced_history.insert(0, {
                "role": "system",
                "content": self.system_instructions['ollama_role']
            })

            # Inject external context (web search, weather, time) as a system message
            if external_data:
                context_parts = []
                if 'search_answer' in external_data:
                    context_parts.append(f"Current information: {external_data['search_answer']}")
                if 'weather' in external_data:
                    weather = external_data['weather']
                    context_parts.append(f"Current weather: {weather.get('temperature', 'N/A')}°C in {weather.get('city', 'Unknown')}")
                if 'current_datetime' in external_data:
                    context_parts.append(f"Current time: {external_data['current_datetime']}")

                if context_parts:
                    context_message = {
                        "role": "system",
                        "content": "Context: " + " | ".join(context_parts)
                    }
                    enhanced_history.insert(1, context_message)

            enhanced_history.append({"role": "user", "content": query})

            response = ollama_provider.generate(query, enhanced_history)

            if response:
                return f"{response}\n\n*Note: A more comprehensive analysis from the uncensored HF model is being prepared...*"
            else:
                return "I'm processing your request... A deeper analysis is being prepared by the authoritative model."

        except Exception as e:
            logger.error(f"Hierarchical Ollama response failed: {e}")
            return "I'm thinking about your question... Preparing a comprehensive response."

    def _check_hf_availability(self) -> bool:
        """Check whether the HF endpoint is configured and available"""
        try:
            from utils.config import config
            return bool(config.hf_token and config.hf_api_url)
        except Exception:
            return False

    async def _gather_external_data(self, query: str) -> Dict:
        """Gather external data from various sources"""
        data = {}

        # Web search for current information, when a search client is configured
        if self.tavily_client or web_search_service.client:
            try:
                search_results = web_search_service.search(f"current information about {query}")
                if search_results:
                    data['search_results'] = search_results
            except Exception as e:
                logger.warning(f"Web search failed: {e}")

        # Weather lookup when the query mentions weather-related keywords
        weather_keywords = ['weather', 'temperature', 'forecast', 'climate', 'rain', 'sunny']
        if any(keyword in query.lower() for keyword in weather_keywords):
            try:
                location = self._extract_location(query) or "New York"
                weather = weather_service.get_current_weather(location)
                if weather:
                    data['weather'] = weather
            except Exception as e:
                logger.warning(f"Weather data failed: {e}")

        # Always include the current date and time
        data['current_datetime'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        return data

    def _extract_location(self, query: str) -> Optional[str]:
        """Extract a known location from the query, defaulting to New York"""
        locations = ['New York', 'London', 'Tokyo', 'Paris', 'Berlin', 'Sydney',
                     'Los Angeles', 'Chicago', 'Miami', 'Seattle', 'Boston',
                     'San Francisco', 'Toronto', 'Vancouver', 'Montreal']
        for loc in locations:
            if loc.lower() in query.lower():
                return loc
        return "New York"

    def get_coordination_status(self) -> Dict:
        """Get current coordination system status"""
        return {
            'tavily_available': self.tavily_client is not None,
            'weather_available': weather_service.api_key is not None,
            'web_search_enabled': self.tavily_client is not None,
            'external_apis_configured': any([
                weather_service.api_key,
                os.getenv("TAVILY_API_KEY"),
                os.getenv("NASA_API_KEY")
            ])
        }

    def get_recent_activities(self, user_id: str) -> Dict:
        """Get recent coordination activities for a user"""
        try:
            session = session_manager.get_session(user_id)
            coord_stats = session.get('ai_coordination', {})
            return {
                'last_request': coord_stats.get('last_coordination'),
                'requests_processed': coord_stats.get('requests_processed', 0),
                'ollama_responses': coord_stats.get('ollama_responses', 0),
                'hf_responses': coord_stats.get('hf_responses', 0)
            }
        except Exception:
            return {}


coordinator = AICoordinator()
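

# Minimal usage sketch (illustrative only, not part of the coordinator API): shows how
# a caller might consume the cosmic cascade. It assumes the core/services modules above
# are importable and that the Ollama and HF providers are configured; the event 'type'
# values mirror those yielded by coordinate_cosmic_response().
if __name__ == "__main__":
    async def _demo() -> None:
        async for event in coordinator.coordinate_cosmic_response("demo-user", "What's the latest weather today?"):
            if event['type'] == 'status':
                print(event['content'])
            elif event['type'] in ('local_response', 'cloud_response', 'final_synthesis'):
                print(f"{event.get('source', '')}: {event['content']}")
            elif event['type'] == 'error':
                print(f"Error: {event['content']}")

    asyncio.run(_demo())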