rdune71 committed on
Commit a20d863 · 1 Parent(s): 482aace

Implement hierarchical multi-model conversation architecture with HF as authoritative layer
core/coordinator.py CHANGED
@@ -1,6 +1,6 @@
 import asyncio
 import logging
-from typing import List, Dict, Optional, Tuple
 from core.llm_factory import llm_factory
 from core.session import session_manager
 from services.hf_endpoint_monitor import hf_monitor
@@ -12,174 +12,282 @@ except ImportError:
     TavilyClient = None
     TAVILY_AVAILABLE = False
 import os

 logger = logging.getLogger(__name__)

 class AICoordinator:
-    """Coordinate multiple AI models and external services"""

     def __init__(self):
         self.tavily_client = None
         if TAVILY_AVAILABLE and os.getenv("TAVILY_API_KEY"):
             self.tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

-    async def coordinate_response(self, user_id: str, user_query: str) -> Dict:
         """
-        Coordinate Ollama (fast) and HF (deep) responses
-
-        Returns:
-            Dict with 'immediate_response' and 'final_response'
         """
         try:
             # Get conversation history
             session = session_manager.get_session(user_id)
-            conversation_history = session.get("conversation", [])

-            # Step 1: Gather external data with Ollama
-            logger.info("Step 1: Gathering external data...")
             external_data = await self._gather_external_data(user_query)

-            # Step 2: Get immediate Ollama response (fast)
-            logger.info("Step 2: Getting immediate Ollama response...")
-            immediate_response = await self._get_ollama_response(
                 user_query, conversation_history, external_data
             )

-            # Step 3: Initialize HF endpoint in background
-            logger.info("Step 3: Initializing HF endpoint...")
-            hf_task = asyncio.create_task(self._initialize_and_get_hf_response(
-                user_query, conversation_history, external_data, immediate_response
-            ))
-
-            # Return immediate response while HF processes
-            return {
-                'immediate_response': immediate_response,
-                'hf_task': hf_task,  # Background task for HF processing
-                'external_data': external_data
-            }
-
-        except Exception as e:
-            logger.error(f"Coordination failed: {e}")
-            # Fallback to simple Ollama response
-            immediate_response = await self._get_ollama_response(
-                user_query, conversation_history, {}
-            )
-            return {
-                'immediate_response': immediate_response,
-                'hf_task': None,
-                'external_data': {}
-            }
-
-    async def _gather_external_data(self, query: str) -> Dict:
-        """Gather external data from various sources"""
-        data = {}
-
-        # Tavily/DuckDuckGo search
-        if self.tavily_client:
-            try:
-                search_result = self.tavily_client.search(query, max_results=3)
-                data['search_results'] = search_result.get('results', [])
-            except Exception as e:
-                logger.warning(f"Tavily search failed: {e}")
-
-        # Weather data (if location mentioned)
-        if 'weather' in query.lower() or 'temperature' in query.lower():
-            try:
-                # Extract location from query or use default
-                location = self._extract_location(query) or "New York"
-                weather = weather_service.get_current_weather(location)
-                if weather:
-                    data['weather'] = weather
-            except Exception as e:
-                logger.warning(f"Weather data failed: {e}")
-
-        # Current date/time
-        from datetime import datetime
-        data['current_datetime'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
-        return data
-
-    async def _get_ollama_response(self, query: str, history: List, external_data: Dict) -> str:
-        """Get fast response from Ollama"""
-        try:
-            # Enhance query with external data
-            enhanced_query = self._enhance_query_with_data(query, external_data)
-
-            # Get Ollama provider
-            ollama_provider = llm_factory.get_provider('ollama')
-            if not ollama_provider:
-                raise Exception("Ollama provider not available")

-            # Prepare conversation with external context
-            enhanced_history = history.copy()
-            if external_data:
-                context_message = {
-                    "role": "system",
-                    "content": f"External context: {str(external_data)}"
-                }
-                enhanced_history.insert(0, context_message)

-            enhanced_history.append({"role": "user", "content": enhanced_query})

-            # Generate response
-            response = ollama_provider.generate(enhanced_query, enhanced_history)
-            return response or "I'm processing your request..."

         except Exception as e:
-            logger.error(f"Ollama response failed: {e}")
-            return "I'm thinking about your question..."

-    async def _initialize_and_get_hf_response(self, query: str, history: List,
-                                              external_data: Dict, ollama_response: str) -> Optional[str]:
-        """Initialize HF endpoint and get deep analysis"""
         try:
-            # Check if HF endpoint is available
             hf_status = hf_monitor.check_endpoint_status()

             if not hf_status['available']:
-                logger.info("HF endpoint not available, attempting to warm up...")
-                # Try to warm up the endpoint
-                warmup_success = hf_monitor.warm_up_endpoint()
                 if not warmup_success:
-                    return None

             # Get HF provider
             hf_provider = llm_factory.get_provider('huggingface')
             if not hf_provider:
-                return None

-            # Prepare enhanced conversation for HF
             enhanced_history = history.copy()

-            # Add Ollama's initial response for HF to consider
             enhanced_history.append({
-                "role": "assistant",
-                "content": f"Initial response (for reference): {ollama_response}"
             })

-            # Add external data context
-            if external_data:
-                context_message = {
-                    "role": "system",
-                    "content": f"Additional context data: {str(external_data)}"
-                }
-                enhanced_history.insert(0, context_message)

-            # Add HF's role instruction
-            enhanced_history.append({
                 "role": "system",
-                "content": "You are providing deep analysis and second opinions. Consider the initial response and enhance it with deeper insights."
             })

             enhanced_history.append({"role": "user", "content": query})

-            # Generate deep response
-            deep_response = hf_provider.generate(query, enhanced_history)
-            return deep_response

         except Exception as e:
-            logger.error(f"HF response failed: {e}")
-            return None

     def get_coordination_status(self) -> Dict:
         """Get current coordination system status"""
@@ -208,38 +316,5 @@ class AICoordinator:
         except:
             return {}

-    def _enhance_query_with_data(self, query: str, data: Dict) -> str:
-        """Enhance query with gathered external data"""
-        if not data:
-            return query
-
-        context_parts = []
-
-        if 'search_results' in data:
-            context_parts.append("Recent information:")
-            for result in data['search_results'][:2]:  # Limit to 2 results
-                context_parts.append(f"- {result.get('title', 'Result')}: {result.get('content', '')[:100]}...")
-
-        if 'weather' in data:
-            weather = data['weather']
-            context_parts.append(f"Current weather: {weather.get('temperature', 'N/A')}°C in {weather.get('city', 'Unknown')}")
-
-        if 'current_datetime' in data:
-            context_parts.append(f"Current time: {data['current_datetime']}")
-
-        if context_parts:
-            return f"{query}\n\nContext: {' '.join(context_parts)}"
-
-        return query
-
-    def _extract_location(self, query: str) -> Optional[str]:
-        """Extract location from query (simple implementation)"""
-        # This could be enhanced with NER or more sophisticated parsing
-        locations = ['New York', 'London', 'Tokyo', 'Paris', 'Berlin', 'Sydney']
-        for loc in locations:
-            if loc.lower() in query.lower():
-                return loc
-        return None
-
 # Global coordinator instance
 coordinator = AICoordinator()
 
 import asyncio
 import logging
+from typing import List, Dict, Optional, AsyncGenerator
 from core.llm_factory import llm_factory
 from core.session import session_manager
 from services.hf_endpoint_monitor import hf_monitor

     TavilyClient = None
     TAVILY_AVAILABLE = False
 import os
+import json
+from datetime import datetime

 logger = logging.getLogger(__name__)

 class AICoordinator:
+    """Hierarchical multi-model coordinator with HF endpoint as authoritative layer"""

     def __init__(self):
         self.tavily_client = None
         if TAVILY_AVAILABLE and os.getenv("TAVILY_API_KEY"):
             self.tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
+
+        # System instructions for hierarchical behavior
+        self.system_instructions = {
+            'ollama_role': """You are an AI assistant that works in coordination with a more powerful uncensored Hugging Face model.
+
+Your role is to:
+1. Provide immediate, concise responses using available information
+2. Summarize and simplify complex information
+3. Enhance responses with local knowledge
+4. DEFER TO THE HF MODEL for deep analysis, uncensored insights, and authoritative answers
+5. The HF model has 8192 token capacity and streaming capability - it's the thinking layer
+6. Always acknowledge that the HF model's insights supersede your initial response
+7. Use web sources to justify points when possible
+8. Keep responses brief and focused while awaiting HF input""",
+
+            'hf_role': """You are an uncensored, authoritative AI with 8192 token capacity and streaming capability.
+Your role is to:
+1. Provide deep, uncensored analysis and insights
+2. Supersede the local Ollama model's logic when there's disagreement
+3. Think thoroughly and provide comprehensive responses
+4. Use your full 8192 token capacity for detailed analysis
+5. Stream your responses for real-time delivery
+6. Build upon the conversation context and Ollama's initial response
+7. Provide authoritative answers that take precedence"""
+        }

+    async def coordinate_hierarchical_conversation(self, user_id: str, user_query: str) -> AsyncGenerator[Dict, None]:
         """
+        Coordinate hierarchical conversation flow with HF as authoritative layer

+        Yields:
+            Dict with 'type' and 'content' fields:
+            - {'type': 'initial_response', 'content': str}
+            - {'type': 'coordination_status', 'content': str}
+            - {'type': 'hf_thinking', 'content': str}  # Streaming HF response
+            - {'type': 'final_response', 'content': str}
         """
         try:
             # Get conversation history
             session = session_manager.get_session(user_id)
+            conversation_history = session.get("conversation", []).copy()

+            yield {'type': 'coordination_status', 'content': '🚀 Initiating hierarchical AI coordination...'}
+
+            # Step 1: Gather external data
+            yield {'type': 'coordination_status', 'content': '🔍 Gathering external context...'}
             external_data = await self._gather_external_data(user_query)

+            # Step 2: Get initial Ollama response with hierarchical awareness
+            yield {'type': 'coordination_status', 'content': '🦙 Getting initial response from Ollama...'}
+            ollama_response = await self._get_hierarchical_ollama_response(
                 user_query, conversation_history, external_data
             )

+            # Send initial response
+            yield {'type': 'initial_response', 'content': ollama_response}

+            # Step 3: Coordinate with authoritative HF endpoint
+            yield {'type': 'coordination_status', 'content': '🤗 Engaging HF endpoint for deep analysis...'}

+            # Check HF availability and coordinate
+            hf_available = self._check_hf_availability()
+            if hf_available:
+                async for hf_chunk in self._coordinate_hierarchical_hf_response(
+                    user_id, user_query, conversation_history,
+                    external_data, ollama_response
+                ):
+                    yield hf_chunk
+            else:
+                yield {'type': 'coordination_status', 'content': 'ℹ️ HF endpoint not available - using Ollama response'}

+            # Final coordination status
+            yield {'type': 'coordination_status', 'content': '✅ Hierarchical coordination complete'}

         except Exception as e:
+            logger.error(f"Hierarchical coordination failed: {e}")
+            yield {'type': 'coordination_status', 'content': f'❌ Coordination error: {str(e)}'}
+
+            # Fallback to simple Ollama response
+            try:
+                session = session_manager.get_session(user_id)
+                conversation_history = session.get("conversation", [])
+                fallback_response = await self._get_hierarchical_ollama_response(
+                    user_query, conversation_history, {}
+                )
+                yield {'type': 'initial_response', 'content': fallback_response}
+            except:
+                yield {'type': 'initial_response', 'content': "I'm here to help! What would you like to discuss?"}

+    async def _coordinate_hierarchical_hf_response(self, user_id: str, query: str,
+                                                   history: List, external_data: Dict,
+                                                   ollama_response: str) -> AsyncGenerator[Dict, None]:
+        """Coordinate with HF endpoint as authoritative layer with streaming"""
         try:
+            # Check and warm up HF endpoint if needed
             hf_status = hf_monitor.check_endpoint_status()

             if not hf_status['available']:
+                yield {'type': 'coordination_status', 'content': '⚡ Initializing HF endpoint (2-4 minutes)...'}
+                warmup_success = hf_monitor.handle_scale_to_zero()
                 if not warmup_success:
+                    yield {'type': 'coordination_status', 'content': '❌ HF endpoint initialization failed'}
+                    return

             # Get HF provider
             hf_provider = llm_factory.get_provider('huggingface')
             if not hf_provider:
+                yield {'type': 'coordination_status', 'content': '❌ HF provider not available'}
+                return

+            # Prepare enhanced conversation for HF with hierarchical context
             enhanced_history = history.copy()

+            # Add system instructions for HF
+            enhanced_history.insert(0, {
+                "role": "system",
+                "content": self.system_instructions['hf_role']
+            })
+
+            # Add context about the coordination
             enhanced_history.append({
+                "role": "system",
+                "content": f"""Context for your authoritative analysis:
+User Query: '{query}'
+Ollama's Initial Response: '{ollama_response}'
+External Data Available: {json.dumps(external_data, indent=2) if external_data else 'None'}
+
+Please provide your deep, uncensored analysis that supersedes the local model's response.
+Use your full 8192 token capacity for comprehensive thinking.
+Stream your response for real-time delivery."""
             })

+            # Add the user's latest query
+            enhanced_history.append({"role": "user", "content": query})

+            # Stream HF response with full 8192 token capacity
+            yield {'type': 'coordination_status', 'content': '🧠 HF endpoint thinking...'}
+
+            # Use streaming for real-time delivery
+            hf_response_stream = hf_provider.stream_generate(query, enhanced_history)
+
+            if hf_response_stream:
+                # Stream the response chunks
+                full_hf_response = ""
+                for chunk in hf_response_stream:
+                    if chunk:
+                        full_hf_response += chunk
+                        yield {'type': 'hf_thinking', 'content': chunk}
+
+                # Final HF response
+                yield {'type': 'final_response', 'content': full_hf_response}
+                yield {'type': 'coordination_status', 'content': '🎯 HF analysis complete and authoritative'}
+            else:
+                yield {'type': 'coordination_status', 'content': '❌ HF response generation failed'}
+
+        except Exception as e:
+            logger.error(f"Hierarchical HF coordination failed: {e}")
+            yield {'type': 'coordination_status', 'content': f'❌ HF coordination error: {str(e)}'}
+
+    async def _get_hierarchical_ollama_response(self, query: str, history: List, external_data: Dict) -> str:
+        """Get Ollama response with hierarchical awareness"""
+        try:
+            # Get Ollama provider
+            ollama_provider = llm_factory.get_provider('ollama')
+            if not ollama_provider:
+                raise Exception("Ollama provider not available")
+
+            # Prepare conversation with hierarchical context
+            enhanced_history = history.copy()
+
+            # Add system instruction for Ollama's role
+            enhanced_history.insert(0, {
                 "role": "system",
+                "content": self.system_instructions['ollama_role']
             })

+            # Add external data context if available
+            if external_data:
+                context_parts = []
+                if 'search_answer' in external_data:
+                    context_parts.append(f"Current information: {external_data['search_answer']}")
+                if 'weather' in external_data:
+                    weather = external_data['weather']
+                    context_parts.append(f"Current weather: {weather.get('temperature', 'N/A')}°C in {weather.get('city', 'Unknown')}")
+                if 'current_datetime' in external_data:
+                    context_parts.append(f"Current time: {external_data['current_datetime']}")
+
+                if context_parts:
+                    context_message = {
+                        "role": "system",
+                        "content": "Context: " + " | ".join(context_parts)
+                    }
+                    enhanced_history.insert(1, context_message)  # Insert after role instruction
+
+            # Add the user's query
             enhanced_history.append({"role": "user", "content": query})

+            # Generate response with awareness of HF's superior capabilities
+            response = ollama_provider.generate(query, enhanced_history)
+
+            # Add acknowledgment of HF's authority
+            if response:
+                return f"{response}\n\n*Note: A more comprehensive analysis from the uncensored HF model is being prepared...*"
+            else:
+                return "I'm processing your request... A deeper analysis is being prepared by the authoritative model."
+
         except Exception as e:
+            logger.error(f"Hierarchical Ollama response failed: {e}")
+            return "I'm thinking about your question... Preparing a comprehensive response."
+
+    def _check_hf_availability(self) -> bool:
+        """Check if HF endpoint is configured and available"""
+        try:
+            from utils.config import config
+            return bool(config.hf_token and config.hf_api_url)
+        except:
+            return False
+
+    async def _gather_external_data(self, query: str) -> Dict:
+        """Gather external data from various sources"""
+        data = {}
+
+        # Tavily/DuckDuckGo search with justification focus
+        if self.tavily_client:
+            try:
+                search_result = self.tavily_client.search(
+                    f"current information about {query}",
+                    max_results=5,  # More results for better justification
+                    include_answer=True,
+                    include_raw_content=True  # For deeper analysis
+                )
+                data['search_results'] = search_result.get('results', [])
+                if search_result.get('answer'):
+                    data['search_answer'] = search_result['answer']
+                # Store raw content for HF to analyze
+                data['raw_sources'] = [result.get('raw_content', '')[:1000] for result in search_result.get('results', [])[:3]]
+            except Exception as e:
+                logger.warning(f"Tavily search failed: {e}")
+
+        # Weather data
+        weather_keywords = ['weather', 'temperature', 'forecast', 'climate', 'rain', 'sunny']
+        if any(keyword in query.lower() for keyword in weather_keywords):
+            try:
+                location = self._extract_location(query) or "New York"
+                weather = weather_service.get_current_weather(location)
+                if weather:
+                    data['weather'] = weather
+            except Exception as e:
+                logger.warning(f"Weather data failed: {e}")
+
+        # Current date/time
+        data['current_datetime'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+        return data
+
+    def _extract_location(self, query: str) -> Optional[str]:
+        """Extract location from query"""
+        locations = ['New York', 'London', 'Tokyo', 'Paris', 'Berlin', 'Sydney',
+                     'Los Angeles', 'Chicago', 'Miami', 'Seattle', 'Boston',
+                     'San Francisco', 'Toronto', 'Vancouver', 'Montreal']
+        for loc in locations:
+            if loc.lower() in query.lower():
+                return loc
+        return "New York"  # Default

     def get_coordination_status(self) -> Dict:
         """Get current coordination system status"""

         except:
             return {}

 # Global coordinator instance
 coordinator = AICoordinator()
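
For orientation, a minimal consumer sketch of the new streaming API above. Only coordinate_hierarchical_conversation and the chunk 'type' values documented in its docstring come from the diff; the user ID and query are illustrative:

    import asyncio
    from core.coordinator import coordinator

    async def demo():
        # Each yielded chunk is a dict with 'type' and 'content' fields.
        async for chunk in coordinator.coordinate_hierarchical_conversation(
            "demo_user", "What are the key principles of effective time management?"
        ):
            if chunk["type"] == "hf_thinking":
                # Print streamed HF tokens as they arrive, without newlines
                print(chunk["content"], end="", flush=True)
            else:
                print(f"\n[{chunk['type']}] {chunk['content']}")

    if __name__ == "__main__":
        asyncio.run(demo())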
core/providers/huggingface.py CHANGED
@@ -3,9 +3,7 @@ import logging
 from typing import List, Dict, Optional, Union
 from core.providers.base import LLMProvider
 from utils.config import config
-
 logger = logging.getLogger(__name__)
-
 try:
     from openai import OpenAI
     HUGGINGFACE_SDK_AVAILABLE = True
@@ -18,17 +16,15 @@ class HuggingFaceProvider(LLMProvider):

     def __init__(self, model_name: str, timeout: int = 30, max_retries: int = 3):
         super().__init__(model_name, timeout, max_retries)
-
         logger.info(f"Initializing HuggingFaceProvider with:")
-        logger.info(f"  HF_API_URL: {config.hf_api_url}")
-        logger.info(f"  HF_TOKEN SET: {bool(config.hf_token)}")

         if not HUGGINGFACE_SDK_AVAILABLE:
             raise ImportError("Hugging Face provider requires 'openai' package")
-
         if not config.hf_token:
             raise ValueError("HF_TOKEN not set - required for Hugging Face provider")
-
         # Make sure NO proxies parameter is included
         try:
             self.client = OpenAI(
@@ -40,7 +36,7 @@ class HuggingFaceProvider(LLMProvider):
             logger.error(f"Failed to initialize HuggingFaceProvider: {e}")
             logger.error(f"Error type: {type(e)}")
             raise
-
     def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
         """Generate a response synchronously"""
         try:
@@ -48,7 +44,7 @@ class HuggingFaceProvider(LLMProvider):
         except Exception as e:
             logger.error(f"Hugging Face generation failed: {e}")
             return None
-
     def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
         """Generate a response with streaming support"""
         try:
@@ -56,7 +52,7 @@ class HuggingFaceProvider(LLMProvider):
         except Exception as e:
             logger.error(f"Hugging Face stream generation failed: {e}")
             return None
-
     def validate_model(self) -> bool:
         """Validate if the model is available"""
         # For Hugging Face endpoints, we'll assume the model is valid if we can connect
@@ -67,15 +63,18 @@ class HuggingFaceProvider(LLMProvider):
         except Exception as e:
             logger.warning(f"Hugging Face model validation failed: {e}")
             return False
-
     def _generate_impl(self, prompt: str, conversation_history: List[Dict]) -> str:
-        """Implementation of synchronous generation"""
         try:
             response = self.client.chat.completions.create(
                 model=self.model_name,
                 messages=conversation_history,
-                max_tokens=500,
-                temperature=0.7
             )
             return response.choices[0].message.content
         except Exception as e:
@@ -87,22 +86,28 @@ class HuggingFaceProvider(LLMProvider):
                 response = self.client.chat.completions.create(
                     model=self.model_name,
                     messages=conversation_history,
-                    max_tokens=500,
-                    temperature=0.7
                 )
                 return response.choices[0].message.content
             else:
                 raise
-
     def _stream_generate_impl(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
-        """Implementation of streaming generation"""
         try:
             response = self.client.chat.completions.create(
                 model=self.model_name,
                 messages=conversation_history,
-                max_tokens=500,
                 temperature=0.7,
-                stream=True
             )

             chunks = []
@@ -110,7 +115,7 @@ class HuggingFaceProvider(LLMProvider):
                 content = chunk.choices[0].delta.content
                 if content:
                     chunks.append(content)
-
             return chunks
         except Exception as e:
             # Handle scale-to-zero behavior
@@ -121,9 +126,12 @@ class HuggingFaceProvider(LLMProvider):
                 response = self.client.chat.completions.create(
                     model=self.model_name,
                     messages=conversation_history,
-                    max_tokens=500,
                     temperature=0.7,
-                    stream=True
                 )

                 chunks = []
@@ -131,18 +139,15 @@ class HuggingFaceProvider(LLMProvider):
                 content = chunk.choices[0].delta.content
                 if content:
                     chunks.append(content)
-
             return chunks
         else:
             raise
-
     def _is_scale_to_zero_error(self, error: Exception) -> bool:
         """Check if the error is related to scale-to-zero initialization"""
         error_str = str(error).lower()
         scale_to_zero_indicators = [
-            "503",
-            "service unavailable",
-            "initializing",
-            "cold start"
         ]
         return any(indicator in error_str for indicator in scale_to_zero_indicators)
 
 from typing import List, Dict, Optional, Union
 from core.providers.base import LLMProvider
 from utils.config import config
 logger = logging.getLogger(__name__)
 try:
     from openai import OpenAI
     HUGGINGFACE_SDK_AVAILABLE = True

     def __init__(self, model_name: str, timeout: int = 30, max_retries: int = 3):
         super().__init__(model_name, timeout, max_retries)
         logger.info(f"Initializing HuggingFaceProvider with:")
+        logger.info(f"  HF_API_URL: {config.hf_api_url}")
+        logger.info(f"  HF_TOKEN SET: {bool(config.hf_token)}")

         if not HUGGINGFACE_SDK_AVAILABLE:
             raise ImportError("Hugging Face provider requires 'openai' package")
         if not config.hf_token:
             raise ValueError("HF_TOKEN not set - required for Hugging Face provider")
+
         # Make sure NO proxies parameter is included
         try:
             self.client = OpenAI(

             logger.error(f"Failed to initialize HuggingFaceProvider: {e}")
             logger.error(f"Error type: {type(e)}")
             raise
+
     def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
         """Generate a response synchronously"""
         try:

         except Exception as e:
             logger.error(f"Hugging Face generation failed: {e}")
             return None
+
     def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
         """Generate a response with streaming support"""
         try:

         except Exception as e:
             logger.error(f"Hugging Face stream generation failed: {e}")
             return None
+
     def validate_model(self) -> bool:
         """Validate if the model is available"""
         # For Hugging Face endpoints, we'll assume the model is valid if we can connect

         except Exception as e:
             logger.warning(f"Hugging Face model validation failed: {e}")
             return False
+
     def _generate_impl(self, prompt: str, conversation_history: List[Dict]) -> str:
+        """Implementation of synchronous generation with proper configuration"""
         try:
             response = self.client.chat.completions.create(
                 model=self.model_name,
                 messages=conversation_history,
+                max_tokens=8192,  # Set to 8192 as requested
+                temperature=0.7,
+                top_p=0.9,
+                frequency_penalty=0.1,
+                presence_penalty=0.1
             )
             return response.choices[0].message.content
         except Exception as e:

                 response = self.client.chat.completions.create(
                     model=self.model_name,
                     messages=conversation_history,
+                    max_tokens=8192,  # Set to 8192 as requested
+                    temperature=0.7,
+                    top_p=0.9,
+                    frequency_penalty=0.1,
+                    presence_penalty=0.1
                 )
                 return response.choices[0].message.content
             else:
                 raise
+
     def _stream_generate_impl(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
+        """Implementation of streaming generation with proper configuration"""
         try:
             response = self.client.chat.completions.create(
                 model=self.model_name,
                 messages=conversation_history,
+                max_tokens=8192,  # Set to 8192 as requested
                 temperature=0.7,
+                top_p=0.9,
+                frequency_penalty=0.1,
+                presence_penalty=0.1,
+                stream=True  # Enable streaming
             )

             chunks = []

                 content = chunk.choices[0].delta.content
                 if content:
                     chunks.append(content)
+
             return chunks
         except Exception as e:
             # Handle scale-to-zero behavior

                 response = self.client.chat.completions.create(
                     model=self.model_name,
                     messages=conversation_history,
+                    max_tokens=8192,  # Set to 8192 as requested
                     temperature=0.7,
+                    top_p=0.9,
+                    frequency_penalty=0.1,
+                    presence_penalty=0.1,
+                    stream=True  # Enable streaming
                 )

                 chunks = []

                 content = chunk.choices[0].delta.content
                 if content:
                     chunks.append(content)
+
             return chunks
         else:
             raise
+
     def _is_scale_to_zero_error(self, error: Exception) -> bool:
         """Check if the error is related to scale-to-zero initialization"""
         error_str = str(error).lower()
         scale_to_zero_indicators = [
+            "503", "service unavailable", "initializing", "cold start"
         ]
         return any(indicator in error_str for indicator in scale_to_zero_indicators)
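
The scale-to-zero handling above amounts to a warm-up-then-retry-once pattern. A generic sketch of that pattern, where make_request, is_cold_start_error, and warm_up are hypothetical stand-ins for the actual request call, _is_scale_to_zero_error, and the endpoint monitor's warm-up:

    def call_with_cold_start_retry(make_request, is_cold_start_error, warm_up):
        # Detect a cold-start failure, warm the endpoint, retry exactly once.
        try:
            return make_request()
        except Exception as exc:
            if is_cold_start_error(exc):   # e.g. "503", "initializing", "cold start"
                if warm_up():              # blocks while the endpoint scales up
                    return make_request()  # single retry after successful warmup
            raise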
core/session.py CHANGED
@@ -74,7 +74,7 @@ class SessionManager:
         redis_data = {}
         for key, value in session.items():
             if isinstance(value, (list, dict)):
-                redis_data[key] = json.dumps(value)
             elif isinstance(value, (int, float, str, bool)):
                 redis_data[key] = value
             else:
@@ -121,7 +121,7 @@
         redis_data = {}
         for key, value in session.items():
             if isinstance(value, (dict, list)):
-                redis_data[key] = json.dumps(value)
             else:
                 redis_data[key] = value

@@ -137,6 +137,60 @@
             logger.error(f"Error updating coordination session for user {user_id}: {e}")
             return False

     def clear_session(self, user_id: str) -> bool:
         """Clear user session data
         Args:
 
         redis_data = {}
         for key, value in session.items():
             if isinstance(value, (list, dict)):
+                redis_data[key] = json.dumps(value, default=str)
             elif isinstance(value, (int, float, str, bool)):
                 redis_data[key] = value
             else:

         redis_data = {}
         for key, value in session.items():
             if isinstance(value, (dict, list)):
+                redis_data[key] = json.dumps(value, default=str)
             else:
                 redis_data[key] = value

             logger.error(f"Error updating coordination session for user {user_id}: {e}")
             return False

+    def update_hierarchical_coordination(self, user_id: str, coordination_data: Dict) -> bool:
+        """Update session with hierarchical coordination data"""
+        try:
+            # Get existing session
+            session = self.get_session(user_id)
+
+            # Add hierarchical coordination tracking
+            if 'hierarchical_coordination' not in session:
+                session['hierarchical_coordination'] = {
+                    'total_conversations': 0,
+                    'hf_engagements': 0,
+                    'ollama_responses': 0,
+                    'coordination_success': 0,
+                    'last_coordination': None
+                }
+
+            coord_stats = session['hierarchical_coordination']
+
+            # Update statistics
+            coord_stats['total_conversations'] += 1
+            coord_stats['last_coordination'] = datetime.now().isoformat()
+
+            # Update specific counters based on coordination data
+            if coordination_data.get('hf_engaged'):
+                coord_stats['hf_engagements'] += 1
+            if coordination_data.get('ollama_responded'):
+                coord_stats['ollama_responses'] += 1
+            if coordination_data.get('success'):
+                coord_stats['coordination_success'] += 1
+
+            # Convert complex data to JSON strings for Redis
+            redis_data = {}
+            for key, value in session.items():
+                if isinstance(value, (dict, list)):
+                    redis_data[key] = json.dumps(value, default=str)
+                else:
+                    redis_data[key] = value
+
+            # Save updated session
+            result = save_user_state(user_id, redis_data)
+            return result
+        except Exception as e:
+            logger.error(f"Error updating hierarchical coordination for user {user_id}: {e}")
+            return False
+
+    def get_hierarchical_stats(self, user_id: str) -> Dict:
+        """Get hierarchical coordination statistics"""
+        try:
+            session = self.get_session(user_id)
+            return session.get('hierarchical_coordination', {})
+        except Exception as e:
+            logger.error(f"Error getting hierarchical stats for user {user_id}: {e}")
+            return {}
+
     def clear_session(self, user_id: str) -> bool:
         """Clear user session data
         Args:
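
The default=str added to both serialization loops is a safety net: any session value json cannot encode natively (a datetime, for example) is coerced through str() instead of raising TypeError. A quick round-trip sketch:

    import json
    from datetime import datetime

    payload = {"last_coordination": datetime.now(), "hf_engagements": 3}
    encoded = json.dumps(payload, default=str)  # datetime falls back to str(), no TypeError
    decoded = json.loads(encoded)               # the coerced value comes back as a plain string

Note that coerced values deserialize as plain strings, which is why update_hierarchical_coordination above also stores last_coordination explicitly via isoformat().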
test_hierarchical_coordination.py ADDED
@@ -0,0 +1,83 @@
+import sys
+import asyncio
+from pathlib import Path
+
+# Add project root to path
+project_root = Path(__file__).parent
+sys.path.append(str(project_root))
+
+from core.coordinator import coordinator
+from core.session import session_manager
+
+async def test_hierarchical_coordination():
+    """Test the hierarchical coordination system"""
+    print("=== Hierarchical Coordination System Test ===")
+    print()
+
+    # Test user query
+    user_query = "What are the key principles of effective time management?"
+    user_id = "test_user"
+
+    print(f"User Query: {user_query}")
+    print()
+
+    # Test coordination status
+    print("1. Testing Coordination Status:")
+    try:
+        coord_status = coordinator.get_coordination_status()
+        print(f"   Tavily Available: {coord_status.get('tavily_available', False)}")
+        print(f"   Weather Available: {coord_status.get('weather_available', False)}")
+        print(f"   Web Search Enabled: {coord_status.get('web_search_enabled', False)}")
+        print("   ✅ Coordination Status Check Passed")
+    except Exception as e:
+        print(f"   ❌ Coordination Status Check Failed: {e}")
+
+    print()
+
+    # Test hierarchical conversation coordination
+    print("2. Testing Hierarchical Conversation Coordination:")
+    try:
+        print("   Starting hierarchical coordination...")
+        response_count = 0
+
+        async for response_chunk in coordinator.coordinate_hierarchical_conversation(user_id, user_query):
+            response_count += 1
+            print(f"   Chunk {response_count}: {response_chunk['type']} - {response_chunk['content'][:50]}...")
+
+            # Limit output for readability
+            if response_count >= 5:
+                print("   ... (truncated for brevity)")
+                break
+
+        print("   ✅ Hierarchical Coordination Test Passed")
+    except Exception as e:
+        print(f"   ❌ Hierarchical Coordination Test Failed: {e}")
+
+    print()
+
+    # Test hierarchical session tracking
+    print("3. Testing Hierarchical Session Tracking:")
+    try:
+        # Update with test coordination data
+        test_data = {
+            'hf_engaged': True,
+            'ollama_responded': True,
+            'success': True
+        }
+        update_result = session_manager.update_hierarchical_coordination(user_id, test_data)
+        print(f"   Update Result: {'✅ Success' if update_result else '❌ Failed'}")
+
+        # Get hierarchical stats
+        stats = session_manager.get_hierarchical_stats(user_id)
+        print(f"   Total Conversations: {stats.get('total_conversations', 0)}")
+        print(f"   HF Engagements: {stats.get('hf_engagements', 0)}")
+        print(f"   Ollama Responses: {stats.get('ollama_responses', 0)}")
+        print("   ✅ Hierarchical Session Tracking Passed")
+    except Exception as e:
+        print(f"   ❌ Hierarchical Session Tracking Failed: {e}")
+
+    print()
+    print("🎉 Hierarchical Coordination System Test Completed!")
+
+if __name__ == "__main__":
+    asyncio.run(test_hierarchical_coordination())
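
Assuming Redis and the configured model providers are reachable, the new test script runs directly from the repository root:

    python test_hierarchical_coordination.py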