|
|
|
|
|
import sqlite3 |
|
|
import json |
|
|
import logging |
|
|
import uuid |
|
|
from datetime import datetime, timedelta |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class EfficientContextManager: |
|
|
def __init__(self, llm_router=None): |
|
|
self.session_cache = {} |
|
|
self.cache_config = { |
|
|
"max_session_size": 10, |
|
|
"ttl": 3600, |
|
|
"compression": "gzip", |
|
|
"eviction_policy": "LRU" |
|
|
} |
|
|
self.db_path = "sessions.db" |
|
|
self.llm_router = llm_router |
|
|
logger.info(f"Initializing ContextManager with DB path: {self.db_path}") |
|
|
self._init_database() |
|
|
|
|
|
def _init_database(self): |
|
|
"""Initialize database and create tables""" |
|
|
try: |
|
|
logger.info("Initializing database...") |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
cursor = conn.cursor() |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS sessions ( |
|
|
session_id TEXT PRIMARY KEY, |
|
|
user_id TEXT DEFAULT 'Test_Any', |
|
|
created_at TIMESTAMP, |
|
|
last_activity TIMESTAMP, |
|
|
context_data TEXT, |
|
|
user_metadata TEXT |
|
|
) |
|
|
""") |
|
|
|
|
|
|
|
|
try: |
|
|
cursor.execute("ALTER TABLE sessions ADD COLUMN user_id TEXT DEFAULT 'Test_Any'") |
|
|
logger.info("✓ Added user_id column to sessions table") |
|
|
except sqlite3.OperationalError: |
|
|
|
|
|
pass |
|
|
|
|
|
logger.info("✓ Sessions table ready") |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS interactions ( |
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
session_id TEXT REFERENCES sessions(session_id), |
|
|
user_input TEXT, |
|
|
context_snapshot TEXT, |
|
|
created_at TIMESTAMP, |
|
|
FOREIGN KEY(session_id) REFERENCES sessions(session_id) |
|
|
) |
|
|
""") |
|
|
logger.info("✓ Interactions table ready") |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS user_contexts ( |
|
|
user_id TEXT PRIMARY KEY, |
|
|
persona_summary TEXT, |
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP |
|
|
) |
|
|
""") |
|
|
logger.info("✓ User contexts table ready") |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS session_contexts ( |
|
|
session_id TEXT PRIMARY KEY, |
|
|
user_id TEXT, |
|
|
session_summary TEXT, |
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
|
|
FOREIGN KEY(session_id) REFERENCES sessions(session_id), |
|
|
FOREIGN KEY(user_id) REFERENCES user_contexts(user_id) |
|
|
) |
|
|
""") |
|
|
logger.info("✓ Session contexts table ready") |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
CREATE TABLE IF NOT EXISTS interaction_contexts ( |
|
|
interaction_id TEXT PRIMARY KEY, |
|
|
session_id TEXT, |
|
|
user_input TEXT, |
|
|
system_response TEXT, |
|
|
interaction_summary TEXT, |
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
|
|
FOREIGN KEY(session_id) REFERENCES sessions(session_id) |
|
|
) |
|
|
""") |
|
|
logger.info("✓ Interaction contexts table ready") |
|
|
|
|
|
conn.commit() |
|
|
conn.close() |
|
|
logger.info("Database initialization complete") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Database initialization error: {e}", exc_info=True) |
|
|
|
|
|
async def manage_context(self, session_id: str, user_input: str, user_id: str = "Test_Any") -> dict: |
|
|
""" |
|
|
Efficient context management with user-based context system |
|
|
STEP 1: Fetch User Context (if available) |
|
|
STEP 2: Get Previous Interaction Contexts |
|
|
STEP 3: Combine for workflow use |
|
|
""" |
|
|
|
|
|
cache_key = f"{session_id}_{user_id}" |
|
|
context = self._get_from_memory_cache(cache_key) |
|
|
|
|
|
if not context: |
|
|
|
|
|
context = await self._retrieve_from_db(session_id, user_input, user_id) |
|
|
|
|
|
|
|
|
if not context.get("user_context_loaded"): |
|
|
user_context = await self.get_user_context(user_id) |
|
|
context["user_context"] = user_context |
|
|
context["user_context_loaded"] = True |
|
|
|
|
|
|
|
|
self._warm_memory_cache(cache_key, context) |
|
|
|
|
|
|
|
|
updated_context = self._update_context(context, user_input, user_id=user_id) |
|
|
|
|
|
return self._optimize_context(updated_context) |
|
|
|
|
|
async def get_user_context(self, user_id: str) -> str: |
|
|
""" |
|
|
STEP 1: Fetch or generate User Context (500-token persona summary) |
|
|
Available for all interactions except first time per user |
|
|
""" |
|
|
try: |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
cursor = conn.cursor() |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT persona_summary FROM user_contexts WHERE user_id = ? |
|
|
""", (user_id,)) |
|
|
|
|
|
row = cursor.fetchone() |
|
|
if row and row[0]: |
|
|
|
|
|
conn.close() |
|
|
logger.info(f"✓ User context loaded for {user_id}") |
|
|
return row[0] |
|
|
|
|
|
|
|
|
logger.info(f"Generating new user context for {user_id}") |
|
|
|
|
|
|
|
|
all_session_summaries = [] |
|
|
all_interaction_summaries = [] |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT session_summary FROM session_contexts WHERE user_id = ? |
|
|
ORDER BY created_at DESC LIMIT 50 |
|
|
""", (user_id,)) |
|
|
for row in cursor.fetchall(): |
|
|
if row[0]: |
|
|
all_session_summaries.append(row[0]) |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT ic.interaction_summary |
|
|
FROM interaction_contexts ic |
|
|
JOIN sessions s ON ic.session_id = s.session_id |
|
|
WHERE s.user_id = ? |
|
|
ORDER BY ic.created_at DESC LIMIT 100 |
|
|
""", (user_id,)) |
|
|
for row in cursor.fetchall(): |
|
|
if row[0]: |
|
|
all_interaction_summaries.append(row[0]) |
|
|
|
|
|
conn.close() |
|
|
|
|
|
if not all_session_summaries and not all_interaction_summaries: |
|
|
|
|
|
logger.info(f"No historical data for {user_id} - first time user") |
|
|
return "" |
|
|
|
|
|
|
|
|
historical_data = "\n\n".join(all_session_summaries + all_interaction_summaries[:20]) |
|
|
|
|
|
if self.llm_router: |
|
|
prompt = f"""Generate a concise 500-token persona summary for user {user_id} based on their interaction history: |
|
|
|
|
|
Historical Context: |
|
|
{historical_data} |
|
|
|
|
|
Create a persona summary that captures: |
|
|
- Communication style and preferences |
|
|
- Common topics and interests |
|
|
- Interaction patterns |
|
|
- Key information shared across sessions |
|
|
|
|
|
Keep the summary concise and focused (approximately 500 tokens).""" |
|
|
|
|
|
try: |
|
|
persona_summary = await self.llm_router.route_inference( |
|
|
task_type="general_reasoning", |
|
|
prompt=prompt, |
|
|
max_tokens=500, |
|
|
temperature=0.7 |
|
|
) |
|
|
|
|
|
if persona_summary and isinstance(persona_summary, str) and persona_summary.strip(): |
|
|
|
|
|
conn = sqlite3.connect(self.db_path) |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(""" |
|
|
INSERT OR REPLACE INTO user_contexts (user_id, persona_summary, updated_at) |
|
|
VALUES (?, ?, ?) |
|
|
""", (user_id, persona_summary.strip(), datetime.now().isoformat())) |
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
logger.info(f"✓ Generated and stored user context for {user_id}") |
|
|
return persona_summary.strip() |
|
|
except Exception as e: |
|
|
logger.error(f"Error generating user context: {e}", exc_info=True) |
|
|
|
|
|
|
|
|
logger.warning(f"Could not generate user context for {user_id} - using empty") |
|
|
return "" |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error getting user context: {e}", exc_info=True) |
|
|
return "" |
|
|
|
|
|
async def generate_interaction_context(self, interaction_id: str, session_id: str, |
|
|
user_input: str, system_response: str, |
|
|
user_id: str = "Test_Any") -> str: |
|
|
""" |
|
|
STEP 2: Generate Interaction Context (50-token summary) |
|
|
Called after each response |
|
|
""" |
|
|
try: |
|
|
if not self.llm_router: |
|
|
return "" |
|
|
|
|
|
prompt = f"""Summarize this interaction in approximately 50 tokens: |
|
|
|
|
|
User Input: {user_input[:200]} |
|
|
System Response: {system_response[:300]} |
|
|
|
|
|
Provide a brief summary capturing the key exchange.""" |
|
|
|
|
|
try: |
|
|
summary = await self.llm_router.route_inference( |
|
|
task_type="general_reasoning", |
|
|
prompt=prompt, |
|
|
max_tokens=50, |
|
|
temperature=0.7 |
|
|
) |
|
|
|
|
|
if summary and isinstance(summary, str) and summary.strip(): |
|
|
|
|
|
conn = sqlite3.connect(self.db_path) |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(""" |
|
|
INSERT OR REPLACE INTO interaction_contexts |
|
|
(interaction_id, session_id, user_input, system_response, interaction_summary, created_at) |
|
|
VALUES (?, ?, ?, ?, ?, ?) |
|
|
""", ( |
|
|
interaction_id, |
|
|
session_id, |
|
|
user_input[:500], |
|
|
system_response[:1000], |
|
|
summary.strip(), |
|
|
datetime.now().isoformat() |
|
|
)) |
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
logger.info(f"✓ Generated interaction context for {interaction_id}") |
|
|
return summary.strip() |
|
|
except Exception as e: |
|
|
logger.error(f"Error generating interaction context: {e}", exc_info=True) |
|
|
|
|
|
|
|
|
return "" |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in generate_interaction_context: {e}", exc_info=True) |
|
|
return "" |
|
|
|
|
|
async def generate_session_context(self, session_id: str, user_id: str = "Test_Any") -> str: |
|
|
""" |
|
|
FINAL STEP: Generate Session Context (100-token summary) |
|
|
Called at session end |
|
|
""" |
|
|
try: |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
cursor = conn.cursor() |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT interaction_summary FROM interaction_contexts |
|
|
WHERE session_id = ? |
|
|
ORDER BY created_at ASC |
|
|
""", (session_id,)) |
|
|
|
|
|
interaction_summaries = [row[0] for row in cursor.fetchall() if row[0]] |
|
|
conn.close() |
|
|
|
|
|
if not interaction_summaries: |
|
|
logger.info(f"No interactions to summarize for session {session_id}") |
|
|
return "" |
|
|
|
|
|
|
|
|
if self.llm_router: |
|
|
combined_context = "\n".join(interaction_summaries) |
|
|
|
|
|
prompt = f"""Summarize this session's interactions in approximately 100 tokens: |
|
|
|
|
|
Interaction Summaries: |
|
|
{combined_context} |
|
|
|
|
|
Create a concise session summary capturing: |
|
|
- Main topics discussed |
|
|
- Key outcomes or information shared |
|
|
- User's focus areas |
|
|
|
|
|
Keep the summary concise (approximately 100 tokens).""" |
|
|
|
|
|
try: |
|
|
session_summary = await self.llm_router.route_inference( |
|
|
task_type="general_reasoning", |
|
|
prompt=prompt, |
|
|
max_tokens=100, |
|
|
temperature=0.7 |
|
|
) |
|
|
|
|
|
if session_summary and isinstance(session_summary, str) and session_summary.strip(): |
|
|
|
|
|
conn = sqlite3.connect(self.db_path) |
|
|
cursor = conn.cursor() |
|
|
cursor.execute(""" |
|
|
INSERT OR REPLACE INTO session_contexts |
|
|
(session_id, user_id, session_summary, created_at) |
|
|
VALUES (?, ?, ?, ?) |
|
|
""", (session_id, user_id, session_summary.strip(), datetime.now().isoformat())) |
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
logger.info(f"✓ Generated session context for {session_id}") |
|
|
return session_summary.strip() |
|
|
except Exception as e: |
|
|
logger.error(f"Error generating session context: {e}", exc_info=True) |
|
|
|
|
|
|
|
|
return "" |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in generate_session_context: {e}", exc_info=True) |
|
|
return "" |
|
|
|
|
|
async def end_session(self, session_id: str, user_id: str = "Test_Any"): |
|
|
""" |
|
|
FINAL STEP: Generate Session Context and clear cache |
|
|
""" |
|
|
try: |
|
|
|
|
|
await self.generate_session_context(session_id, user_id) |
|
|
|
|
|
|
|
|
cache_key = f"{session_id}_{user_id}" |
|
|
if cache_key in self.session_cache: |
|
|
del self.session_cache[cache_key] |
|
|
logger.info(f"✓ Cleared cache for session {session_id}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error ending session: {e}", exc_info=True) |
|
|
|
|
|
def _optimize_context(self, context: dict) -> dict: |
|
|
""" |
|
|
Optimize context for LLM consumption |
|
|
Format: [Interaction Context #N, #N-1, ...] + User Context |
|
|
""" |
|
|
user_context = context.get("user_context", "") |
|
|
interaction_contexts = context.get("interaction_contexts", []) |
|
|
|
|
|
|
|
|
formatted_interactions = [] |
|
|
for idx, ic in enumerate(interaction_contexts[:10]): |
|
|
formatted_interactions.append(f"[Interaction Context #{len(interaction_contexts) - idx}]\n{ic.get('summary', '')}") |
|
|
|
|
|
|
|
|
combined_context = "" |
|
|
if user_context: |
|
|
combined_context += f"[User Context]\n{user_context}\n\n" |
|
|
if formatted_interactions: |
|
|
combined_context += "\n\n".join(formatted_interactions) |
|
|
|
|
|
return { |
|
|
"session_id": context.get("session_id"), |
|
|
"user_id": context.get("user_id", "Test_Any"), |
|
|
"user_context": user_context, |
|
|
"interaction_contexts": interaction_contexts, |
|
|
"combined_context": combined_context, |
|
|
"preferences": context.get("preferences", {}), |
|
|
"active_tasks": context.get("active_tasks", []), |
|
|
"last_activity": context.get("last_activity") |
|
|
} |
|
|
|
|
|
def _get_from_memory_cache(self, cache_key: str) -> dict: |
|
|
""" |
|
|
Retrieve context from in-memory session cache |
|
|
""" |
|
|
return self.session_cache.get(cache_key) |
|
|
|
|
|
async def _retrieve_from_db(self, session_id: str, user_input: str, user_id: str = "Test_Any") -> dict: |
|
|
""" |
|
|
Retrieve context from database with semantic search |
|
|
""" |
|
|
try: |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
cursor = conn.cursor() |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT context_data, user_metadata, last_activity, user_id |
|
|
FROM sessions |
|
|
WHERE session_id = ? |
|
|
""", (session_id,)) |
|
|
|
|
|
row = cursor.fetchone() |
|
|
|
|
|
if row: |
|
|
context_data = json.loads(row[0]) if row[0] else {} |
|
|
user_metadata = json.loads(row[1]) if row[1] else {} |
|
|
last_activity = row[2] |
|
|
session_user_id = row[3] if len(row) > 3 else user_id |
|
|
|
|
|
|
|
|
if session_user_id != user_id: |
|
|
cursor.execute(""" |
|
|
UPDATE sessions SET user_id = ? WHERE session_id = ? |
|
|
""", (user_id, session_id)) |
|
|
conn.commit() |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
SELECT interaction_summary, created_at |
|
|
FROM interaction_contexts |
|
|
WHERE session_id = ? |
|
|
ORDER BY created_at DESC |
|
|
LIMIT 20 |
|
|
""", (session_id,)) |
|
|
|
|
|
interaction_contexts = [] |
|
|
for ic_row in cursor.fetchall(): |
|
|
if ic_row[0]: |
|
|
interaction_contexts.append({ |
|
|
"summary": ic_row[0], |
|
|
"timestamp": ic_row[1] |
|
|
}) |
|
|
|
|
|
context = { |
|
|
"session_id": session_id, |
|
|
"user_id": user_id, |
|
|
"interaction_contexts": interaction_contexts, |
|
|
"preferences": user_metadata.get("preferences", {}), |
|
|
"active_tasks": user_metadata.get("active_tasks", []), |
|
|
"last_activity": last_activity, |
|
|
"user_context_loaded": False |
|
|
} |
|
|
|
|
|
conn.close() |
|
|
return context |
|
|
else: |
|
|
|
|
|
cursor.execute(""" |
|
|
INSERT INTO sessions (session_id, user_id, created_at, last_activity, context_data, user_metadata) |
|
|
VALUES (?, ?, ?, ?, ?, ?) |
|
|
""", (session_id, user_id, datetime.now().isoformat(), datetime.now().isoformat(), "{}", "{}")) |
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
return { |
|
|
"session_id": session_id, |
|
|
"user_id": user_id, |
|
|
"interaction_contexts": [], |
|
|
"preferences": {}, |
|
|
"active_tasks": [], |
|
|
"user_context_loaded": False |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Database retrieval error: {e}", exc_info=True) |
|
|
|
|
|
return { |
|
|
"session_id": session_id, |
|
|
"user_id": user_id, |
|
|
"interaction_contexts": [], |
|
|
"preferences": {}, |
|
|
"active_tasks": [], |
|
|
"user_context_loaded": False |
|
|
} |
|
|
|
|
|
def _warm_memory_cache(self, cache_key: str, context: dict): |
|
|
""" |
|
|
Warm the in-memory cache with retrieved context |
|
|
""" |
|
|
self.session_cache[cache_key] = context |
|
|
|
|
|
def _update_context(self, context: dict, user_input: str, response: str = None, user_id: str = "Test_Any") -> dict: |
|
|
""" |
|
|
Update context with new user interaction and persist to database |
|
|
Note: Interaction context generation happens separately after response is generated |
|
|
""" |
|
|
try: |
|
|
|
|
|
conn = sqlite3.connect(self.db_path) |
|
|
cursor = conn.cursor() |
|
|
|
|
|
|
|
|
cursor.execute(""" |
|
|
UPDATE sessions |
|
|
SET last_activity = ?, user_id = ? |
|
|
WHERE session_id = ? |
|
|
""", (datetime.now().isoformat(), user_id, context["session_id"])) |
|
|
|
|
|
|
|
|
session_context = { |
|
|
"preferences": context.get("preferences", {}), |
|
|
"active_tasks": context.get("active_tasks", []) |
|
|
} |
|
|
|
|
|
cursor.execute(""" |
|
|
INSERT INTO interactions (session_id, user_input, context_snapshot, created_at) |
|
|
VALUES (?, ?, ?, ?) |
|
|
""", (context["session_id"], user_input, json.dumps(session_context), datetime.now().isoformat())) |
|
|
|
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Context update error: {e}", exc_info=True) |
|
|
|
|
|
return context |
|
|
|
|
|
def _extract_entities(self, context: dict) -> list: |
|
|
""" |
|
|
Extract essential entities from context |
|
|
""" |
|
|
|
|
|
return [] |
|
|
|
|
|
def _generate_summary(self, context: dict) -> str: |
|
|
""" |
|
|
Generate conversation summary |
|
|
""" |
|
|
|
|
|
return "" |
|
|
|