import json import time from typing import Dict, Any, Optional from core.memory import load_user_state, save_user_state import logging from datetime import datetime # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SessionManager: """Manages user sessions and conversation context""" def __init__(self, session_timeout: int = 3600): """Initialize session manager Args: session_timeout: Session timeout in seconds (default: 1 hour) """ self.session_timeout = session_timeout def get_session(self, user_id: str) -> Dict[str, Any]: """Retrieve user session data Args: user_id: Unique identifier for the user Returns: Dictionary containing session data """ try: state = load_user_state(user_id) if not state: logger.info(f"Creating new session for user {user_id}") return self._create_new_session() # Check if session has expired last_activity = float(state.get('last_activity', 0)) if time.time() - last_activity > self.session_timeout: logger.info(f"Session expired for user {user_id}, creating new session") return self._create_new_session() # Deserialize complex data types for key, value in state.items(): if isinstance(value, str): try: # Try to parse as JSON for lists/dicts if value.startswith('[') or value.startswith('{'): state[key] = json.loads(value) except (json.JSONDecodeError, TypeError): # Keep as string if not valid JSON pass return state except Exception as e: logger.error(f"Error retrieving session for user {user_id}: {e}") return self._create_new_session() def update_session(self, user_id: str, data: Dict[str, Any]) -> bool: """Update user session data Args: user_id: Unique identifier for the user data: Data to update in the session Returns: Boolean indicating success """ try: # Get existing session session = self.get_session(user_id) # Update with new data session.update(data) session['last_activity'] = time.time() # Convert complex data types to strings for Redis redis_data = {} for key, value in session.items(): if isinstance(value, (list, dict)): redis_data[key] = json.dumps(value) elif isinstance(value, (int, float, str, bool)): redis_data[key] = value else: redis_data[key] = str(value) # Save updated session result = save_user_state(user_id, redis_data) if result: logger.debug(f"Successfully updated session for user {user_id}") else: logger.warning(f"Failed to save session for user {user_id}") return result except Exception as e: logger.error(f"Error updating session for user {user_id}: {e}") return False def update_session_with_ai_coordination(self, user_id: str, ai_data: Dict) -> bool: """Update session with AI coordination data""" try: # Get existing session session = self.get_session(user_id) # Add AI coordination tracking if 'ai_coordination' not in session: session['ai_coordination'] = { 'requests_processed': 0, 'ollama_responses': 0, 'hf_responses': 0, 'last_coordination': None } coord_data = session['ai_coordination'] coord_data['requests_processed'] += 1 coord_data['last_coordination'] = datetime.now().isoformat() # Track response types if 'immediate_response' in ai_data: coord_data['ollama_responses'] += 1 if ai_data.get('hf_response'): coord_data['hf_responses'] += 1 # Convert complex data to JSON strings for Redis redis_data = {} for key, value in session.items(): if isinstance(value, (dict, list)): redis_data[key] = json.dumps(value) else: redis_data[key] = value # Save updated session result = save_user_state(user_id, redis_data) if result: logger.debug(f"Successfully updated coordination session for user {user_id}") else: logger.warning(f"Failed to save coordination session for user {user_id}") return result except Exception as e: logger.error(f"Error updating coordination session for user {user_id}: {e}") return False def clear_session(self, user_id: str) -> bool: """Clear user session data Args: user_id: Unique identifier for the user Returns: Boolean indicating success """ try: result = save_user_state(user_id, {}) if result: logger.info(f"Cleared session for user {user_id}") return result except Exception as e: logger.error(f"Error clearing session for user {user_id}: {e}") return False def _create_new_session(self) -> Dict[str, Any]: """Create a new session with default values Returns: Dictionary containing new session data """ session = { 'conversation': [], 'preferences': {}, 'last_activity': time.time(), 'created_at': time.time() } logger.debug("Created new session") return session # Global session manager instance session_manager = SessionManager()