# app.py - Mobile-First Implementation
import gradio as gr
import uuid
import logging
import traceback
from typing import Optional, Tuple, List, Dict, Any
import os

# Configure comprehensive logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log')
    ]
)
logger = logging.getLogger(__name__)

# Try to import orchestration components
orchestrator = None
orchestrator_available = False

# Import Process Flow Visualization
try:
    from process_flow_visualizer import (
        create_process_flow_tab,
        update_process_flow_visualization,
        clear_flow_history,
        export_flow_data
    )
    process_flow_available = True
    logger.info("✓ Process Flow Visualization available")
except ImportError as e:
    logger.warning(f"Process Flow Visualization not available: {e}")
    process_flow_available = False

try:
    logger.info("Attempting to import orchestration components...")
    import sys
    sys.path.insert(0, '.')
    sys.path.insert(0, 'src')

    from src.agents.intent_agent import create_intent_agent
    from src.agents.synthesis_agent import create_synthesis_agent
    from src.agents.safety_agent import create_safety_agent
    from src.agents.skills_identification_agent import create_skills_identification_agent
    from src.llm_router import LLMRouter
    from src.orchestrator_engine import MVPOrchestrator
    from src.context_manager import EfficientContextManager
    from src.config import settings

    logger.info("✓ Successfully imported orchestration components")
    orchestrator_available = True
except ImportError as e:
    logger.warning(f"Could not import orchestration components: {e}")
    logger.info("Will use placeholder mode")

try:
    from spaces import GPU
    SPACES_GPU_AVAILABLE = True
    logger.info("HF Spaces GPU available")
except ImportError:
    # Not running on HF Spaces or spaces module not available
    SPACES_GPU_AVAILABLE = False
    GPU = None
    logger.info("Running without HF Spaces GPU")
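# NOTE: `process_flow_visualizer` and the `src/` package are optional imports.
# For local development without them, a no-op stub is one way to exercise the
# full UI path. Illustrative sketch only -- the real module's API may differ;
# the four names below are simply the ones this file imports, and the return
# shapes are guessed from how they are wired further down:
#
#   # process_flow_visualizer.py (hypothetical stub)
#   def create_process_flow_tab(components): return None
#   def update_process_flow_visualization(**kwargs): return {}
#   def clear_flow_history(): return "", {}, {}, {}, {}, {}   # six wired outputs
#   def export_flow_data(): return None                        # feeds a gr.File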
def create_mobile_optimized_interface():
    """Create the mobile-optimized Gradio interface and return demo with components"""

    # Store components for wiring
    interface_components = {}

    with gr.Blocks(
        title="AI Research Assistant MVP",
        theme=gr.themes.Soft(
            primary_hue="blue",
            secondary_hue="gray",
            font=("Inter", "system-ui", "sans-serif")
        ),
        css="""
        /* Mobile-first responsive CSS */
        .mobile-container {
            max-width: 100vw;
            margin: 0 auto;
            padding: 0 12px;
        }

        /* Touch-friendly button sizing */
        .gradio-button {
            min-height: 44px !important;
            min-width: 44px !important;
            font-size: 16px !important; /* Prevents zoom on iOS */
        }

        /* Mobile-optimized chat interface */
        .chatbot-container {
            height: 60vh !important;
            max-height: 60vh !important;
            overflow-y: auto !important;
            -webkit-overflow-scrolling: touch !important;
        }

        /* Mobile input enhancements */
        .textbox-input {
            font-size: 16px !important; /* Prevents zoom */
            min-height: 44px !important;
            padding: 12px !important;
        }

        /* Responsive grid adjustments */
        @media (max-width: 768px) {
            .gradio-row {
                flex-direction: column !important;
                gap: 8px !important;
            }
            .gradio-column {
                width: 100% !important;
            }
            .chatbot-container {
                height: 50vh !important;
            }
        }

        /* Dark mode support */
        @media (prefers-color-scheme: dark) {
            body {
                background: #1a1a1a;
                color: #ffffff;
            }
        }

        /* Hide scrollbars but maintain functionality */
        .chatbot-container::-webkit-scrollbar {
            width: 4px;
        }

        /* Loading states */
        .loading-indicator {
            display: flex;
            align-items: center;
            justify-content: center;
            padding: 20px;
        }

        /* Mobile menu enhancements */
        .accordion-content {
            max-height: 200px !important;
            overflow-y: auto !important;
        }

        /* Skills Tags Styling */
        #skills_tags_container {
            padding: 8px 12px;
            background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
            border-radius: 8px;
            border: 1px solid #dee2e6;
            margin: 8px 0;
            min-height: 40px;
            display: flex;
            flex-wrap: wrap;
            align-items: center;
            gap: 6px;
        }

        .skill-tag {
            display: inline-block;
            background: linear-gradient(135deg, #007bff 0%, #0056b3 100%);
            color: white;
            padding: 4px 8px;
            border-radius: 12px;
            font-size: 12px;
            font-weight: 500;
            margin: 2px;
            box-shadow: 0 2px 4px rgba(0,123,255,0.2);
            transition: all 0.2s ease;
            cursor: pointer;
        }

        .skill-tag:hover {
            transform: translateY(-1px);
            box-shadow: 0 4px 8px rgba(0,123,255,0.3);
        }

        .skill-tag.high-confidence {
            background: linear-gradient(135deg, #28a745 0%, #1e7e34 100%);
        }

        .skill-tag.medium-confidence {
            background: linear-gradient(135deg, #ffc107 0%, #e0a800 100%);
            color: #212529;
        }

        .skill-tag.low-confidence {
            background: linear-gradient(135deg, #6c757d 0%, #495057 100%);
        }

        .skills-header {
            font-size: 11px;
            color: #6c757d;
            margin-right: 8px;
            font-weight: 600;
        }

        /* Dark mode support for skills */
        @media (prefers-color-scheme: dark) {
            #skills_tags_container {
                background: linear-gradient(135deg, #2d3748 0%, #1a202c 100%);
                border-color: #4a5568;
            }
            .skills-header {
                color: #a0aec0;
            }
        }
        """
    ) as demo:

        # Session Management (Mobile-Optimized)
        with gr.Column(elem_classes="mobile-container"):
            gr.Markdown("""
            # 🧠 Research Assistant
            *Academic AI with transparent reasoning*
            """)

            # Session Header Bar (Mobile-Friendly)
            with gr.Row():
                session_info = gr.Textbox(
                    label="Session ID",
                    value=str(uuid.uuid4())[:8],  # Shortened for mobile
                    max_lines=1,
                    show_label=False,
                    container=False,
                    scale=3
                )
                interface_components['session_info'] = session_info

                new_session_btn = gr.Button(
                    "🔄 New",
                    size="sm",
                    variant="secondary",
                    scale=1,
                    min_width=60
                )
                interface_components['new_session_btn'] = new_session_btn

                menu_toggle = gr.Button(
                    "⚙️",
                    size="sm",
                    variant="secondary",
                    scale=1,
                    min_width=60
                )
                interface_components['menu_toggle'] = menu_toggle

        # Main Chat Area (Mobile-Optimized)
        with gr.Tabs() as main_tabs:
            with gr.TabItem("💬 Chat", id="chat_tab"):
                chatbot = gr.Chatbot(
                    label="",
                    show_label=False,
                    height="60vh",
                    elem_classes="chatbot-container",
                    type="messages"
                )
                interface_components['chatbot'] = chatbot

                # Skills Identification Display (between chat and input)
                with gr.Row(visible=False, elem_id="skills_display") as skills_display_row:
                    skills_tags = gr.HTML(
                        value="",
                        show_label=False,
                        elem_id="skills_tags_container"
                    )
                    interface_components['skills_tags'] = skills_tags
                # Store the row so its visibility can be toggled with the tags
                interface_components['skills_display_row'] = skills_display_row

                # Mobile Input Area
                with gr.Row():
                    message_input = gr.Textbox(
                        placeholder="Ask me anything...",
                        show_label=False,
                        max_lines=3,
                        container=False,
                        scale=4,
                        autofocus=True
                    )
                    interface_components['message_input'] = message_input

                    send_btn = gr.Button(
                        "↑ Send",
                        variant="primary",
                        scale=1,
                        min_width=80
                    )
                    interface_components['send_btn'] = send_btn

            # Technical Details Tab (Collapsible for Mobile)
            with gr.TabItem("🔍 Details", id="details_tab"):
                with gr.Accordion("Reasoning Chain", open=False):
                    reasoning_display = gr.JSON(
                        label="",
                        show_label=False
                    )
                    interface_components['reasoning_display'] = reasoning_display

                with gr.Accordion("Agent Performance", open=False):
                    performance_display = gr.JSON(
                        label="",
                        show_label=False
                    )
                    interface_components['performance_display'] = performance_display

                with gr.Accordion("Session Context", open=False):
                    context_display = gr.JSON(
                        label="",
                        show_label=False
                    )
                    interface_components['context_display'] = context_display
            # Process Flow Tab (if available)
            if process_flow_available:
                process_flow_tab = create_process_flow_tab(interface_components)
                interface_components['process_flow_tab'] = process_flow_tab

        # Mobile Bottom Navigation (placeholder buttons; not yet wired to tab switching)
        with gr.Row(visible=False, elem_id="mobile_nav") as mobile_navigation:
            chat_nav_btn = gr.Button("💬 Chat", variant="secondary", size="sm", min_width=0)
            details_nav_btn = gr.Button("🔍 Details", variant="secondary", size="sm", min_width=0)
            if process_flow_available:
                flow_nav_btn = gr.Button("🔄 Flow", variant="secondary", size="sm", min_width=0)
            settings_nav_btn = gr.Button("⚙️ Settings", variant="secondary", size="sm", min_width=0)

        # Settings Panel (Modal for Mobile)
        with gr.Column(visible=False, elem_id="settings_panel") as settings:
            interface_components['settings_panel'] = settings

            with gr.Accordion("Display Options", open=True):
                show_reasoning = gr.Checkbox(
                    label="Show reasoning chain",
                    value=True,
                    info="Display step-by-step reasoning"
                )
                interface_components['show_reasoning'] = show_reasoning

                show_agent_trace = gr.Checkbox(
                    label="Show agent execution trace",
                    value=False,
                    info="Display which agents processed your request"
                )
                interface_components['show_agent_trace'] = show_agent_trace

                compact_mode = gr.Checkbox(
                    label="Compact mode",
                    value=False,
                    info="Optimize for smaller screens"
                )
                interface_components['compact_mode'] = compact_mode

            with gr.Accordion("Performance Options", open=False):
                response_speed = gr.Radio(
                    choices=["Fast", "Balanced", "Thorough"],
                    value="Balanced",
                    label="Response Speed Preference"
                )
                interface_components['response_speed'] = response_speed

                cache_enabled = gr.Checkbox(
                    label="Enable context caching",
                    value=True,
                    info="Faster responses using session memory"
                )
                interface_components['cache_enabled'] = cache_enabled

            save_prefs_btn = gr.Button("Save Preferences", variant="primary")
            interface_components['save_prefs_btn'] = save_prefs_btn

        # Wire up the submit handler INSIDE the gr.Blocks context
        if ('send_btn' in interface_components
                and 'message_input' in interface_components
                and 'chatbot' in interface_components):
            # Store interface components globally for dynamic return values
            global _interface_components
            _interface_components = interface_components

            # Build outputs list dynamically
            outputs = _build_outputs_list(interface_components, process_flow_available)

            # Include session_info in inputs to pass session ID
            inputs = [interface_components['message_input'], interface_components['chatbot']]
            if 'session_info' in interface_components:
                inputs.append(interface_components['session_info'])

            interface_components['send_btn'].click(
                fn=chat_handler_fn,
                inputs=inputs,
                outputs=outputs
            )

        # Wire up New Session button
        if 'new_session_btn' in interface_components and 'session_info' in interface_components:
            interface_components['new_session_btn'].click(
                fn=lambda: str(uuid.uuid4())[:8],
                outputs=[interface_components['session_info']]
            )

        # Wire up Settings button to toggle the settings panel.
        # A layout Column carries no value, so it cannot be used as an event
        # input; track visibility in a gr.State instead.
        if 'menu_toggle' in interface_components and 'settings_panel' in interface_components:
            settings_visible = gr.State(False)

            def toggle_settings(visible):
                return gr.update(visible=not visible), not visible

            interface_components['menu_toggle'].click(
                fn=toggle_settings,
                inputs=[settings_visible],
                outputs=[interface_components['settings_panel'], settings_visible]
            )

        # Wire up Save Preferences button
        if 'save_prefs_btn' in interface_components:
            def save_preferences(*args):
                logger.info("Preferences saved")
                gr.Info("Preferences saved successfully!")

            interface_components['save_prefs_btn'].click(
                fn=save_preferences,
                inputs=[
                    interface_components.get('show_reasoning'),
                    interface_components.get('show_agent_trace'),
                    interface_components.get('response_speed'),
                    interface_components.get('cache_enabled')
                ]
            )

        # Wire up Process Flow event handlers if available
        if process_flow_available:
            # Clear flow history button
            if 'clear_flow_btn' in interface_components:
                interface_components['clear_flow_btn'].click(
                    fn=clear_flow_history,
                    outputs=[
                        interface_components.get('flow_display'),
                        interface_components.get('flow_stats'),
                        interface_components.get('performance_metrics'),
                        interface_components.get('intent_details'),
                        interface_components.get('synthesis_details'),
                        interface_components.get('safety_details')
                    ]
                )

            # Export flow data button. Note the gr.File is instantiated here,
            # inside the Blocks context, so it renders at this point in the layout.
            if 'export_flow_btn' in interface_components:
                interface_components['export_flow_btn'].click(
                    fn=export_flow_data,
                    outputs=[gr.File(label="Download Flow Data")]
                )

            # Share flow button (placeholder)
            if 'share_flow_btn' in interface_components:
                interface_components['share_flow_btn'].click(
                    fn=lambda: gr.Info("Flow sharing feature coming soon!"),
                    outputs=[]
                )

    return demo, interface_components


def setup_event_handlers(demo, event_handlers):
    """Setup event handlers for the interface.

    Legacy helper kept for reference; the live wiring now happens inside
    create_mobile_optimized_interface().
    """
    # Find components by their labels or types
    components = {}
    for block in demo.blocks:
        if hasattr(block, 'label'):
            if block.label == 'Session ID':
                components['session_info'] = block
        elif hasattr(block, 'value') and 'session' in str(block.value).lower():
            components['session_id'] = block

    # Setup message submission handler
    try:
        # This is a simplified version - adapt it to your actual component structure
        if hasattr(demo, 'submit'):
            demo.submit(
                fn=event_handlers.handle_message_submit,
                inputs=[components.get('message_input'), components.get('chatbot')],
                outputs=[components.get('message_input'), components.get('chatbot')]
            )
    except Exception as e:
        logger.warning(f"Could not setup event handlers: {e}")
        # Fallback to basic functionality

    return demo
def _generate_skills_html(identified_skills: List[Dict[str, Any]]) -> str:
    """Generate HTML for skills tags display"""
    if not identified_skills:
        return ""

    # Limit to top 8 skills for UI
    top_skills = identified_skills[:8]

    # Generate HTML with confidence-based styling
    skills_html = '<span class="skills-header">🎯 Relevant Skills:</span>'

    for skill in top_skills:
        skill_name = skill.get('skill', 'Unknown Skill')
        probability = skill.get('probability', 0.5)

        # Determine confidence class based on probability
        if probability >= 0.7:
            confidence_class = "high-confidence"
        elif probability >= 0.4:
            confidence_class = "medium-confidence"
        else:
            confidence_class = "low-confidence"

        # Create skill tag (markup reconstructed to match the .skill-tag CSS above)
        skills_html += f'<span class="skill-tag {confidence_class}">{skill_name}</span>'

    return skills_html


def _update_skills_display(skills_html: str) -> Tuple[str, bool]:
    """Update skills display visibility and content"""
    if skills_html and len(skills_html.strip()) > 0:
        return skills_html, True  # Show skills display
    else:
        return "", False  # Hide skills display
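# Example (illustrative, using the reconstructed markup above):
#   _generate_skills_html([{"skill": "Python", "probability": 0.9},
#                          {"skill": "Statistics", "probability": 0.5}])
# returns the skills-header span followed by
#   <span class="skill-tag high-confidence">Python</span>
#   <span class="skill-tag medium-confidence">Statistics</span>
# with classes mapped from the thresholds above (>=0.7 high, >=0.4 medium, else low).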
async def process_message_async(message: str, history: Optional[List], session_id: str) -> Tuple[List, str, dict, dict, dict, str, str]:
    """
    Process message with full orchestration system.
    Returns (updated_history, empty_string, reasoning_data, performance_data, context_data, session_id, skills_html)

    GUARANTEES:
    - Always returns a response (never None or empty)
    - Handles all error cases gracefully
    - Provides fallback responses at every level
    - Returns metadata for Details tab
    - Returns session_id to maintain session continuity
    - Returns skills HTML for display
    """
    global orchestrator

    try:
        logger.info(f"Processing message: {message[:100]}")
        logger.info(f"Session ID: {session_id}")
        logger.info(f"Orchestrator available: {orchestrator is not None}")

        if not message or not message.strip():
            logger.debug("Empty message received")
            return history if history else [], "", {}, {}, {}, session_id, ""

        if history is None:
            history = []
        new_history = list(history) if isinstance(history, list) else []

        # Add user message
        new_history.append({"role": "user", "content": message.strip()})

        # Initialize Details tab data
        reasoning_data = {}
        performance_data = {}
        context_data = {}
        skills_html = ""  # Initialize skills_html

        # GUARANTEE: Always get a response
        response = "Hello! I'm processing your request..."
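        # Shape of `result` this handler probes for (summarizing the lookups
        # below; the orchestrator may supply any subset of these keys):
        #   {
        #     "response" | "final_response" | "safety_checked_response"
        #         | "original_response" | "result": str,
        #     "confidence_score": float,
        #     "agent_trace": list,
        #     "interaction_id": str,
        #     "timestamp": str,
        #     "metadata": {"reasoning_chain": dict, "processing_time": float,
        #                  "token_count": int, "agents_used": list,
        #                  "warnings": list, "skills_result": dict},
        #   }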
        # Try to use orchestrator if available
        if orchestrator is not None:
            try:
                logger.info("Attempting full orchestration...")

                # Use orchestrator to process
                result = await orchestrator.process_request(
                    session_id=session_id,
                    user_input=message.strip()
                )

                # Extract response from result with multiple fallback checks
                if isinstance(result, dict):
                    # Extract the text response (not the dict)
                    response = (
                        result.get('response') or
                        result.get('final_response') or
                        result.get('safety_checked_response') or
                        result.get('original_response') or
                        str(result.get('result', ''))
                    )

                    # Extract metadata for Details tab with enhanced reasoning chain
                    reasoning_data = result.get('metadata', {}).get('reasoning_chain', {
                        "chain_of_thought": {},
                        "alternative_paths": [],
                        "uncertainty_areas": [],
                        "evidence_sources": [],
                        "confidence_calibration": {}
                    })

                    # Ensure we have the enhanced structure even if the orchestrator didn't provide it
                    if not reasoning_data.get('chain_of_thought'):
                        reasoning_data = {
                            "chain_of_thought": {
                                "step_1": {
                                    "hypothesis": "Processing user request",
                                    "evidence": [f"User input: {message[:50]}..."],
                                    "confidence": 0.7,
                                    "reasoning": "Basic request processing"
                                }
                            },
                            "alternative_paths": [],
                            "uncertainty_areas": [],
                            "evidence_sources": [],
                            "confidence_calibration": {"overall_confidence": 0.7}
                        }

                    performance_data = {
                        "agent_trace": result.get('agent_trace', []),
                        "processing_time": result.get('metadata', {}).get('processing_time', 0),
                        "token_count": result.get('metadata', {}).get('token_count', 0),
                        "confidence_score": result.get('confidence_score', 0.7),
                        "agents_used": result.get('metadata', {}).get('agents_used', [])
                    }

                    context_data = {
                        "interaction_id": result.get('interaction_id', 'unknown'),
                        "session_id": session_id,
                        "timestamp": result.get('timestamp', ''),
                        "warnings": result.get('metadata', {}).get('warnings', [])
                    }

                    # Extract skills data for UI display
                    skills_html = ""
                    skills_result = result.get('metadata', {}).get('skills_result', {})
                    if skills_result and skills_result.get('identified_skills'):
                        skills_html = _generate_skills_html(skills_result['identified_skills'])
                else:
                    response = str(result) if result else "Processing complete."

                # Final safety check - ensure response is not empty.
                # Handle both string and dict types.
                if isinstance(response, dict):
                    response = str(response.get('content', response))
                if not response or (isinstance(response, str) and len(response.strip()) == 0):
                    response = f"I understand you said: '{message}'. I'm here to assist you!"

                logger.info(f"Orchestrator returned response (length: {len(response)})")

            except Exception as orch_error:
                logger.error(f"Orchestrator error: {orch_error}", exc_info=True)
                # Fallback response with error info and enhanced reasoning
                response = (
                    f"I'm experiencing some technical difficulties. Your message was: "
                    f"'{message[:100]}...' Please try again or rephrase your question."
                )
                reasoning_data = {
                    "chain_of_thought": {
                        "step_1": {
                            "hypothesis": "System encountered an error during processing",
                            "evidence": [f"Error: {str(orch_error)[:100]}..."],
                            "confidence": 0.3,
                            "reasoning": "Orchestrator failure - fallback mode activated"
                        }
                    },
                    "alternative_paths": [],
                    "uncertainty_areas": [
                        {
                            "aspect": "System reliability",
                            "confidence": 0.3,
                            "mitigation": "Error handling and graceful degradation"
                        }
                    ],
                    "evidence_sources": [],
                    "confidence_calibration": {"overall_confidence": 0.3, "error_mode": True}
                }
        else:
            # System initialization message with enhanced reasoning
            logger.info("Orchestrator not yet available")
            response = (
                f"Hello! I received your message about: '{message}'.\n\n"
                "The orchestration system is initializing. Your question is important "
                "and I'll provide a comprehensive answer shortly."
            )
            reasoning_data = {
                "chain_of_thought": {
                    "step_1": {
                        "hypothesis": "System is initializing and not yet ready",
                        "evidence": ["Orchestrator not available", f"User input: {message[:50]}..."],
                        "confidence": 0.5,
                        "reasoning": "System startup phase - components loading"
                    }
                },
                "alternative_paths": [],
                "uncertainty_areas": [
                    {
                        "aspect": "System readiness",
                        "confidence": 0.5,
                        "mitigation": "Graceful initialization message"
                    }
                ],
                "evidence_sources": [],
                "confidence_calibration": {"overall_confidence": 0.5, "initialization_mode": True}
            }
            skills_html = ""  # Initialize skills_html for the orchestrator-unavailable case

        # Add assistant response
        new_history.append({"role": "assistant", "content": response})

        logger.info("✓ Message processing complete")
        return new_history, "", reasoning_data, performance_data, context_data, session_id, skills_html

    except Exception as e:
        # FINAL FALLBACK: Always return something to the user with enhanced reasoning
        logger.error(f"Error in process_message_async: {e}", exc_info=True)

        # Create error history with helpful message
        error_history = list(history) if history else []
        error_history.append({"role": "user", "content": message})

        # User-friendly error message
        error_message = (
            f"I encountered a technical issue processing your message: '{message[:50]}...'. "
            f"Please try rephrasing your question or contact support if this persists."
        )
        error_history.append({"role": "assistant", "content": error_message})

        # Enhanced reasoning for error case
        reasoning_data = {
            "chain_of_thought": {
                "step_1": {
                    "hypothesis": "Critical system error occurred",
                    "evidence": [f"Exception: {str(e)[:100]}...", f"User input: {message[:50]}..."],
                    "confidence": 0.2,
                    "reasoning": "System error handling - final fallback activated"
                }
            },
            "alternative_paths": [],
            "uncertainty_areas": [
                {
                    "aspect": "System stability",
                    "confidence": 0.2,
                    "mitigation": "Error logging and user notification"
                }
            ],
            "evidence_sources": [],
            "confidence_calibration": {"overall_confidence": 0.2, "critical_error": True}
        }

        return error_history, "", reasoning_data, {}, {}, session_id, ""


# Global variable to store interface components for dynamic return values
_interface_components = {}


def _build_outputs_list(interface_components: dict, process_flow_available: bool) -> list:
    """
    Build outputs list dynamically based on available interface components
    """
    outputs = [interface_components['chatbot'], interface_components['message_input']]

    # Add Details tab components
    if 'reasoning_display' in interface_components:
        outputs.append(interface_components['reasoning_display'])
    if 'performance_display' in interface_components:
        outputs.append(interface_components['performance_display'])
    if 'context_display' in interface_components:
        outputs.append(interface_components['context_display'])
    if 'session_info' in interface_components:
        outputs.append(interface_components['session_info'])
    if 'skills_tags' in interface_components:
        outputs.append(interface_components['skills_tags'])
    if 'skills_display_row' in interface_components:
        # Row visibility toggles alongside the tags content
        outputs.append(interface_components['skills_display_row'])

    # Add Process Flow outputs if available
    if process_flow_available:
        if 'flow_display' in interface_components:
            outputs.append(interface_components['flow_display'])
        if 'flow_stats' in interface_components:
            outputs.append(interface_components['flow_stats'])
        if 'performance_metrics' in interface_components:
            outputs.append(interface_components['performance_metrics'])
        if 'intent_details' in interface_components:
            outputs.append(interface_components['intent_details'])
        if 'synthesis_details' in interface_components:
            outputs.append(interface_components['synthesis_details'])
        if 'safety_details' in interface_components:
            outputs.append(interface_components['safety_details'])

    return outputs
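# Positional contract: _build_outputs_list and _build_dynamic_return_values
# walk interface_components in the same order, so element i of the handler's
# return tuple always feeds component i of the outputs list. When adding a
# component, mirror the change in both functions.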
def _build_dynamic_return_values(result: tuple, skills_content: str, interface_components: dict, process_flow_available: bool = False, flow_updates: dict = None) -> tuple:
    """
    Build return values dynamically based on available interface components.
    This ensures the return values match the outputs list exactly.
    """
    return_values = []

    # Base components (always present)
    return_values.extend([
        result[0],  # chatbot (history)
        result[1],  # message_input (empty string to clear the box)
    ])

    # Add Details tab components
    if 'reasoning_display' in interface_components:
        return_values.append(result[2])  # reasoning_data
    if 'performance_display' in interface_components:
        return_values.append(result[3])  # performance_data
    if 'context_display' in interface_components:
        return_values.append(result[4])  # context_data
    if 'session_info' in interface_components:
        return_values.append(result[5])  # session_id
    if 'skills_tags' in interface_components:
        return_values.append(skills_content)  # skills_content
    if 'skills_display_row' in interface_components:
        # Show the skills row only when there are tags to display
        return_values.append(gr.update(visible=bool(skills_content)))

    # Add Process Flow outputs if available
    if process_flow_available:
        if 'flow_display' in interface_components:
            return_values.append(flow_updates.get("flow_display", "") if flow_updates else "")
        if 'flow_stats' in interface_components:
            return_values.append(flow_updates.get("flow_stats", {}) if flow_updates else {})
        if 'performance_metrics' in interface_components:
            return_values.append(flow_updates.get("performance_metrics", {}) if flow_updates else {})
        if 'intent_details' in interface_components:
            return_values.append(flow_updates.get("intent_details", {}) if flow_updates else {})
        if 'synthesis_details' in interface_components:
            return_values.append(flow_updates.get("synthesis_details", {}) if flow_updates else {})
        if 'safety_details' in interface_components:
            return_values.append(flow_updates.get("safety_details", {}) if flow_updates else {})

    return tuple(return_values)


def process_message(message: str, history: Optional[List], session_id: Optional[str] = None) -> tuple:
    """
    Synchronous wrapper for async processing.
    Returns the raw 7-tuple from process_message_async; the chat handlers
    below expand it into the dynamically-sized output tuple. (Previously this
    function also called _build_dynamic_return_values, which made the handlers
    re-wrap an already-built tuple.)
    """
    import asyncio

    # Use provided session_id or generate a new one
    if not session_id:
        session_id = str(uuid.uuid4())[:8]

    try:
        # Run async processing on a fresh event loop, and close it when done
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(process_message_async(message, history, session_id))
        finally:
            loop.close()

    except Exception as e:
        logger.error(f"Error in process_message: {e}", exc_info=True)
        error_history = list(history) if history else []
        error_history.append({"role": "user", "content": message})
        error_history.append({"role": "assistant", "content": f"Error: {str(e)}"})

        # Enhanced reasoning for sync error case
        reasoning_data = {
            "chain_of_thought": {
                "step_1": {
                    "hypothesis": "Synchronous processing error",
                    "evidence": [f"Sync error: {str(e)[:100]}...", f"User input: {message[:50]}..."],
                    "confidence": 0.2,
                    "reasoning": "Synchronous wrapper error handling"
                }
            },
            "alternative_paths": [],
            "uncertainty_areas": [
                {
                    "aspect": "Processing reliability",
                    "confidence": 0.2,
                    "mitigation": "Error logging and fallback response"
                }
            ],
            "evidence_sources": [],
            "confidence_calibration": {"overall_confidence": 0.2, "sync_error": True}
        }

        # Raw 7-tuple for the error case, matching process_message_async's shape
        return (error_history, "", reasoning_data, {}, {}, session_id, "")
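# Note (assumption about the runtime): Gradio typically invokes synchronous
# handlers on worker threads that have no running event loop, which is why
# process_message spins up a fresh loop per call. asyncio.run(...) would be an
# equivalent, slightly more idiomatic alternative, since it also closes the
# loop automatically.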
# Decorate the chat handler with GPU if available
if SPACES_GPU_AVAILABLE and GPU is not None:
    @GPU  # This decorator is detected by HF Spaces for ZeroGPU allocation
    def gpu_chat_handler(message, history, session_id=None):
        """Handle chat messages with GPU support"""
        # Use provided session_id or generate new one
        if not session_id:
            session_id = str(uuid.uuid4())[:8]
        result = process_message(message, history, session_id)
        # Expand the raw 7-tuple into the dynamically-sized output tuple
        skills_content, _ = _update_skills_display(result[6])
        return _build_dynamic_return_values(result, skills_content, _interface_components, process_flow_available)

    chat_handler_fn = gpu_chat_handler
else:
    def chat_handler_wrapper(message, history, session_id=None):
        """Wrapper to handle session ID with Process Flow Visualization"""
        if not session_id:
            session_id = str(uuid.uuid4())[:8]

        result = process_message(message, history, session_id)

        # Extract skills_html from the raw result and determine visibility
        skills_html = result[6]
        skills_content, skills_visible = _update_skills_display(skills_html)

        # Prepare process flow updates if available
        flow_updates = {}
        if process_flow_available:
            try:
                # Extract data for process flow visualization
                reasoning_data = result[2]
                performance_data = result[3]
                context_data = result[4]

                # Create dynamic agent results for visualization. These are
                # synthesized placeholders (e.g. the fixed 0.85 coherence and
                # 0.9 safety scores below), not real agent telemetry.
                step_results = {}

                # Intent result
                step_results["intent_result"] = {
                    "primary_intent": reasoning_data.get("chain_of_thought", {}).get("step_1", {}).get("hypothesis", "unknown"),
                    "confidence_scores": {"overall": reasoning_data.get("confidence_calibration", {}).get("overall_confidence", 0.7)},
                    "secondary_intents": [],
                    "reasoning_chain": list(reasoning_data.get("chain_of_thought", {}).keys()),
                    "context_tags": ["general"],
                    "processing_time": performance_data.get("processing_time", 0.5),
                    "agent_id": "INTENT_REC_001"
                }

                # Skills result (if skills data is present in reasoning_data)
                if "skills_result" in reasoning_data:
                    step_results["skills_result"] = reasoning_data["skills_result"]
                else:
                    step_results["skills_result"] = {
                        "identified_skills": [],
                        "confidence_score": 0.7,
                        "processing_time": performance_data.get("processing_time", 0.5) * 0.2,
                        "agent_id": "SKILLS_ID_001"
                    }

                # Synthesis result
                step_results["synthesis_result"] = {
                    "final_response": result[0][-1]["content"] if result[0] else "",
                    "draft_response": "",
                    "source_references": ["INTENT_REC_001"],
                    "coherence_score": 0.85,
                    "synthesis_method": "llm_enhanced",
                    "intent_alignment": {"intent_detected": step_results["intent_result"]["primary_intent"], "alignment_score": 0.8},
                    "processing_time": performance_data.get("processing_time", 0.5) - 0.15,
                    "agent_id": "RESP_SYNTH_001"
                }

                # Safety result
                step_results["safety_result"] = {
                    "original_response": result[0][-1]["content"] if result[0] else "",
                    "safety_checked_response": result[0][-1]["content"] if result[0] else "",
                    "warnings": [],
                    "safety_analysis": {
                        "toxicity_score": 0.1,
                        "bias_indicators": [],
                        "privacy_concerns": [],
                        "overall_safety_score": 0.9,
                        "confidence_scores": {"safety": 0.9}
                    },
                    "blocked": False,
                    "processing_time": 0.1,
                    "agent_id": "SAFETY_BIAS_001"
                }

                # Final response
                step_results["final_response"] = result[0][-1]["content"] if result[0] else ""

                # Update process flow visualization dynamically
                flow_updates = update_process_flow_visualization(
                    user_input=message,
                    session_id=session_id,
                    processing_time=performance_data.get("processing_time", 1.0),
                    **step_results
                )
            except Exception as e:
                logger.error(f"Error updating process flow: {e}")
                flow_updates = {}

        # Return dynamic values including process flow
        return _build_dynamic_return_values(result, skills_content, _interface_components, process_flow_available, flow_updates)

    chat_handler_fn = chat_handler_wrapper
# Initialize orchestrator on module load
def initialize_orchestrator():
    """Initialize the orchestration system with logging"""
    global orchestrator

    if not orchestrator_available:
        logger.info("Orchestrator components not available, skipping initialization")
        return

    try:
        logger.info("=" * 60)
        logger.info("INITIALIZING ORCHESTRATION SYSTEM")
        logger.info("=" * 60)

        # Get HF token
        hf_token = os.getenv('HF_TOKEN', '')
        if not hf_token:
            logger.warning("HF_TOKEN not found in environment")

        # Initialize LLM Router
        logger.info("Step 1/4: Initializing LLM Router...")
        llm_router = LLMRouter(hf_token)
        logger.info("✓ LLM Router initialized")

        # Initialize Agents
        logger.info("Step 2/4: Initializing Agents...")
        agents = {
            'intent_recognition': create_intent_agent(llm_router),
            'response_synthesis': create_synthesis_agent(llm_router),
            'safety_check': create_safety_agent(llm_router),
        }

        # Add skills identification agent
        skills_agent = create_skills_identification_agent(llm_router)
        agents['skills_identification'] = skills_agent
        logger.info("✓ Skills identification agent initialized")
        logger.info(f"✓ Initialized {len(agents)} agents")

        # Initialize Context Manager
        logger.info("Step 3/4: Initializing Context Manager...")
        context_manager = EfficientContextManager()
        logger.info("✓ Context Manager initialized")

        # Initialize Orchestrator
        logger.info("Step 4/4: Initializing Orchestrator...")
        orchestrator = MVPOrchestrator(llm_router, context_manager, agents)
        logger.info("✓ Orchestrator initialized")

        logger.info("=" * 60)
        logger.info("ORCHESTRATION SYSTEM READY")
        logger.info("=" * 60)
    except Exception as e:
        logger.error(f"Failed to initialize orchestrator: {e}", exc_info=True)
        orchestrator = None


# Try to initialize orchestrator
initialize_orchestrator()


if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("STARTING APP")
    logger.info("=" * 60)

    demo, components = create_mobile_optimized_interface()
    logger.info("✓ Interface created")
    logger.info(f"Orchestrator available: {orchestrator is not None}")

    # Launch the app
    logger.info("=" * 60)
    logger.info("LAUNCHING GRADIO APP")
    logger.info("=" * 60)
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )