File size: 8,703 Bytes
66dbebd ae20ff2 7efa4f1 66dbebd ae20ff2 66dbebd ae20ff2 66dbebd ae20ff2 66dbebd f809b88 7efa4f1 ae20ff2 f809b88 ae20ff2 f809b88 ae20ff2 f809b88 ae20ff2 f809b88 ae20ff2 f809b88 ae20ff2 f809b88 ae20ff2 7efa4f1 f809b88 7efa4f1 ae20ff2 7efa4f1 ae20ff2 7efa4f1 ae20ff2 66dbebd 7efa4f1 66dbebd 2bb821d 7efa4f1 66dbebd 2bb821d f809b88 66dbebd 7efa4f1 66dbebd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# orchestrator_engine.py
import uuid
import logging
import time
from datetime import datetime
# Module-level logger shared by every MVPOrchestrator instance.
logger = logging.getLogger(__name__)


class MVPOrchestrator:
    """Drive one user request through the agent pipeline.

    The pipeline is: context management -> intent recognition -> execution
    planning -> agent execution -> response synthesis -> safety check.

    ``agents`` is indexed with the keys ``'intent_recognition'``,
    ``'response_synthesis'`` and ``'safety_check'``; each value must expose an
    awaitable ``execute(**kwargs)`` that returns a dict. ``context_manager``
    must expose an awaitable ``manage_context(session_id, user_input)``.

    NOTE: ``execution_trace`` is per-instance state that is reset at the start
    of every ``process_request`` call, so overlapping calls on the same
    instance share (and reset) one trace.
    """

    def __init__(self, llm_router, context_manager, agents):
        self.llm_router = llm_router
        self.context_manager = context_manager
        self.agents = agents
        # Step records for the most recent request (list of dicts).
        self.execution_trace = []
        logger.info("MVPOrchestrator initialized")

    async def process_request(self, session_id: str, user_input: str) -> dict:
        """Run the full orchestration flow for a single user message.

        Args:
            session_id: Identifier passed to the context manager and embedded
                in the generated interaction id.
            user_input: Raw user text forwarded to the agents.

        Returns:
            On success, the dict built by ``_format_final_output``. On any
            exception raised inside the pipeline, an error dict with the keys
            ``response``, ``error``, ``interaction_id``, ``agent_trace``,
            ``timestamp`` and ``metadata``; the exception is logged and not
            re-raised.
        """
        logger.info("Processing request for session %s", session_id)
        logger.info("User input: %s", user_input[:100])

        # Clear previous trace for new request
        self.execution_trace = []
        start_time = time.time()
        # Kept outside the try so the error path can reuse the id that was
        # already logged instead of inventing an unrelated one.
        interaction_id = None

        try:
            # Step 1: Generate unique interaction ID
            interaction_id = self._generate_interaction_id(session_id)
            logger.info("Generated interaction ID: %s", interaction_id)

            # Step 2: Context management
            logger.info("Step 2: Managing context...")
            context = await self.context_manager.manage_context(session_id, user_input)
            logger.info(
                "Context retrieved: %s interactions",
                len(context.get('interactions', [])),
            )

            # Step 3: Intent recognition with CoT
            logger.info("Step 3: Recognizing intent...")
            intent_result = await self._run_agent_step(
                "intent_recognition",
                lambda res: {"primary_intent": res.get('primary_intent', 'unknown')},
                user_input=user_input,
                context=context,
            )
            logger.info(
                "Intent detected: %s",
                intent_result.get('primary_intent', 'unknown'),
            )

            # Step 4: Agent execution planning
            logger.info("Step 4: Creating execution plan...")
            execution_plan = await self._create_execution_plan(intent_result, context)

            # Step 5: Parallel agent execution
            logger.info("Step 5: Executing agents...")
            agent_results = await self._execute_agents(execution_plan, user_input, context)
            logger.info("Agent execution complete: %s results", len(agent_results))

            # Step 6: Response synthesis
            logger.info("Step 6: Synthesizing response...")
            final_response = await self._run_agent_step(
                "response_synthesis",
                lambda res: {"synthesis_method": res.get('synthesis_method', 'unknown')},
                agent_outputs=agent_results,
                user_input=user_input,
                context=context,
            )

            # Step 7: Safety and bias check
            logger.info("Step 7: Safety check...")
            safety_checked = await self._run_agent_step(
                "safety_check",
                lambda res: {"warnings": res.get('warnings', [])},
                response=final_response,
                context=context,
            )

            processing_time = time.time() - start_time
            result = self._format_final_output(safety_checked, interaction_id, {
                'intent': intent_result.get('primary_intent', 'unknown'),
                'execution_plan': execution_plan,
                'processing_steps': [
                    'Context management',
                    'Intent recognition',
                    'Execution planning',
                    'Agent execution',
                    'Response synthesis',
                    'Safety check'
                ],
                'processing_time': processing_time,
                'agents_used': list(self.agents.keys()),
                'intent_result': intent_result,
                'synthesis_result': final_response
            })
            logger.info(
                "Request processing complete. Response length: %s",
                len(str(result.get('response', ''))),
            )
            return result
        except Exception as e:
            # Top-level boundary: log with traceback and degrade to an error
            # payload rather than propagating to the caller.
            logger.error("Error in process_request: %s", e, exc_info=True)
            processing_time = time.time() - start_time
            return {
                "response": f"Error processing request: {str(e)}",
                "error": str(e),
                # Reuse the id generated for this request when there is one.
                "interaction_id": interaction_id or str(uuid.uuid4())[:8],
                # Return the partial trace so callers can see which step failed.
                "agent_trace": list(self.execution_trace),
                "timestamp": datetime.now().isoformat(),
                "metadata": {
                    "agents_used": [],
                    "processing_time": processing_time,
                    "token_count": 0,
                    "warnings": []
                }
            }

    async def _run_agent_step(self, agent_name: str, summarize, **agent_kwargs) -> dict:
        """Execute one named agent and record the step in ``execution_trace``.

        Args:
            agent_name: Key into ``self.agents``; also used as the trace
                entry's ``step`` and ``agent`` values.
            summarize: Callable mapping the agent's result to the value stored
                under the trace entry's ``result`` key.
            **agent_kwargs: Forwarded verbatim to the agent's ``execute``.

        Returns:
            Whatever the agent's ``execute`` returned.

        Raises:
            Any exception from the agent lookup, ``execute`` or ``summarize``;
            the trace entry is marked ``"failed"`` before re-raising.
        """
        entry = {"step": agent_name, "agent": agent_name, "status": "executing"}
        self.execution_trace.append(entry)
        try:
            result = await self.agents[agent_name].execute(**agent_kwargs)
            summary = summarize(result)
        except Exception:
            entry["status"] = "failed"
            raise
        # Update the entry we created rather than trace[-1], so the record
        # stays correct even if something else appended in the meantime.
        entry.update({"status": "completed", "result": summary})
        return result

    def _generate_interaction_id(self, session_id: str) -> str:
        """Return ``"<session_id>_<8 uuid chars>_<unix seconds>"``."""
        unique_id = str(uuid.uuid4())[:8]
        return f"{session_id}_{unique_id}_{int(datetime.now().timestamp())}"

    async def _create_execution_plan(self, intent_result: dict, context: dict) -> dict:
        """Create execution plan based on intent recognition.

        Currently a stub: both arguments are ignored and a fixed plan with no
        agents is returned.
        """
        # TODO: Implement agent selection and sequencing logic
        return {
            "agents_to_execute": [],
            "execution_order": "parallel",
            "priority": "normal"
        }

    async def _execute_agents(self, execution_plan: dict, user_input: str, context: dict) -> dict:
        """Execute the agents named by the plan.

        Currently a stub: all arguments are ignored and an empty dict is
        returned.
        """
        # TODO: Implement parallel/sequential agent execution
        return {}

    def _format_final_output(self, response: dict, interaction_id: str, additional_metadata: dict = None) -> dict:
        """Build the caller-facing payload from the final agent result.

        Args:
            response: Result dict of the last pipeline step. The response text
                is taken from the first truthy value among ``final_response``,
                ``safety_checked_response``, ``original_response``,
                ``response`` and ``str(result)``.
            interaction_id: Identifier echoed back in the payload.
            additional_metadata: Optional extra metadata; its keys override
                the base metadata keys on collision.

        Returns:
            Dict with ``interaction_id``, ``response``, ``final_response``,
            ``confidence_score``, ``agent_trace``, ``timestamp`` and
            ``metadata``.
        """
        # A present-but-None "result" must not be stringified to "None" and
        # shown to the user; treat it like a missing value.
        raw_result = response.get("result")
        response_text = (
            response.get("final_response")
            or response.get("safety_checked_response")
            or response.get("original_response")
            or response.get("response")
            or (str(raw_result) if raw_result is not None else "")
        )
        if not response_text:
            response_text = "I apologize, but I'm having trouble generating a response right now. Please try again."

        # Only accept warnings when they are actually a list.
        warnings = response.get("warnings")
        if not isinstance(warnings, list):
            warnings = []

        metadata = {
            "agents_used": response.get("agents_used", []),
            "processing_time": response.get("processing_time", 0),
            "token_count": response.get("token_count", 0),
            "warnings": warnings
        }
        if additional_metadata:
            metadata.update(additional_metadata)

        return {
            "interaction_id": interaction_id,
            "response": response_text,
            "final_response": response_text,  # Also provide as final_response for compatibility
            "confidence_score": response.get("confidence_score", 0.7),
            "agent_trace": self.execution_trace if self.execution_trace else [
                {"step": "complete", "agent": "orchestrator", "status": "completed"}
            ],
            "timestamp": datetime.now().isoformat(),
            "metadata": metadata
        }

    def get_execution_trace(self) -> list:
        """Return the execution trace of the most recent request."""
        return self.execution_trace

    def clear_execution_trace(self):
        """Reset the execution trace to an empty list."""
        self.execution_trace = []
|