# context_manager.py import sqlite3 import json from datetime import datetime, timedelta class EfficientContextManager: def __init__(self): self.session_cache = {} # In-memory for active sessions self.cache_config = { "max_session_size": 10, # MB per session "ttl": 3600, # 1 hour "compression": "gzip", "eviction_policy": "LRU" } self.db_path = "sessions.db" self._init_database() def _init_database(self): """Initialize database and create tables""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Create sessions table if not exists cursor.execute(""" CREATE TABLE IF NOT EXISTS sessions ( session_id TEXT PRIMARY KEY, created_at TIMESTAMP, last_activity TIMESTAMP, context_data TEXT, user_metadata TEXT ) """) # Create interactions table cursor.execute(""" CREATE TABLE IF NOT EXISTS interactions ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT REFERENCES sessions(session_id), user_input TEXT, context_snapshot TEXT, created_at TIMESTAMP, FOREIGN KEY(session_id) REFERENCES sessions(session_id) ) """) conn.commit() conn.close() except Exception as e: print(f"Database initialization warning: {e}") async def manage_context(self, session_id: str, user_input: str) -> dict: """ Efficient context management with multi-level caching """ # Level 1: In-memory session cache context = self._get_from_memory_cache(session_id) if not context: # Level 2: Database retrieval with embeddings context = await self._retrieve_from_db(session_id, user_input) # Cache warming self._warm_memory_cache(session_id, context) # Update context with new interaction updated_context = self._update_context(context, user_input) return self._optimize_context(updated_context) def _optimize_context(self, context: dict) -> dict: """ Optimize context for LLM consumption """ return { "essential_entities": self._extract_entities(context), "conversation_summary": self._generate_summary(context), "recent_interactions": context.get("interactions", [])[-3:], "user_preferences": context.get("preferences", {}), "active_tasks": context.get("active_tasks", []) } def _get_from_memory_cache(self, session_id: str) -> dict: """ Retrieve context from in-memory session cache """ # TODO: Implement in-memory cache retrieval return self.session_cache.get(session_id) async def _retrieve_from_db(self, session_id: str, user_input: str) -> dict: """ Retrieve context from database with semantic search """ try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Get session data cursor.execute(""" SELECT context_data, user_metadata, last_activity FROM sessions WHERE session_id = ? """, (session_id,)) row = cursor.fetchone() if row: context_data = json.loads(row[0]) if row[0] else {} user_metadata = json.loads(row[1]) if row[1] else {} last_activity = row[2] # Get recent interactions cursor.execute(""" SELECT user_input, context_snapshot, created_at FROM interactions WHERE session_id = ? ORDER BY created_at DESC LIMIT 10 """, (session_id,)) recent_interactions = [] for interaction_row in cursor.fetchall(): recent_interactions.append({ "user_input": interaction_row[0], "context": json.loads(interaction_row[1]) if interaction_row[1] else {}, "timestamp": interaction_row[2] }) context = { "session_id": session_id, "interactions": recent_interactions, "preferences": user_metadata.get("preferences", {}), "active_tasks": user_metadata.get("active_tasks", []), "last_activity": last_activity } conn.close() return context else: # Create new session cursor.execute(""" INSERT INTO sessions (session_id, created_at, last_activity, context_data, user_metadata) VALUES (?, ?, ?, ?, ?) """, (session_id, datetime.now().isoformat(), datetime.now().isoformat(), "{}", "{}")) conn.commit() conn.close() return { "session_id": session_id, "interactions": [], "preferences": {}, "active_tasks": [] } except Exception as e: print(f"Database retrieval error: {e}") # Fallback to empty context return { "session_id": session_id, "interactions": [], "preferences": {}, "active_tasks": [] } def _warm_memory_cache(self, session_id: str, context: dict): """ Warm the in-memory cache with retrieved context """ # TODO: Implement cache warming with LRU eviction self.session_cache[session_id] = context def _update_context(self, context: dict, user_input: str) -> dict: """ Update context with new user interaction and persist to database """ try: # Add new interaction to context if "interactions" not in context: context["interactions"] = [] new_interaction = { "user_input": user_input, "timestamp": datetime.now().isoformat(), "context": context } # Keep only last 10 interactions in memory context["interactions"] = [new_interaction] + context["interactions"][:9] # Persist to database conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # Update session cursor.execute(""" UPDATE sessions SET last_activity = ?, context_data = ? WHERE session_id = ? """, (datetime.now().isoformat(), json.dumps(context), context["session_id"])) # Insert interaction cursor.execute(""" INSERT INTO interactions (session_id, user_input, context_snapshot, created_at) VALUES (?, ?, ?, ?) """, (context["session_id"], user_input, json.dumps(context), datetime.now().isoformat())) conn.commit() conn.close() except Exception as e: print(f"Context update error: {e}") return context def _extract_entities(self, context: dict) -> list: """ Extract essential entities from context """ # TODO: Implement entity extraction return [] def _generate_summary(self, context: dict) -> str: """ Generate conversation summary """ # TODO: Implement summary generation return ""