# ohamlab_agent_full.py
"""
Ohamlab - PhD Specialist Multi-Agent Shell (Refactored & Fixed)
See header of assistant response for features & notes.
"""
import os
import time
import json
import textwrap
import traceback
import re
import tempfile
import subprocess
from typing import Dict, List, Tuple, Optional, Any
import io
import logging
from logging.handlers import RotatingFileHandler

import gradio as gr
from openai import OpenAI
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
# ---------------------------
# Logging configuration
# ---------------------------
LOG_DIR = os.environ.get("OHAMLAB_LOG_DIR", ".")
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "ohamlab.log")

logger = logging.getLogger("ohamlab")
logger.setLevel(logging.DEBUG)

# Console handler (INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch_formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s - %(message)s", "%Y-%m-%d %H:%M:%S")
ch.setFormatter(ch_formatter)
logger.addHandler(ch)

# Rotating file handler (DEBUG)
fh = RotatingFileHandler(LOG_FILE, maxBytes=5 * 1024 * 1024, backupCount=3)
fh.setLevel(logging.DEBUG)
fh_formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s [%(filename)s:%(lineno)d] - %(message)s", "%Y-%m-%d %H:%M:%S")
fh.setFormatter(fh_formatter)
logger.addHandler(fh)

logger.info("Starting Ohamlab (logging initialized).")
# ---------------------------
# CONFIG
# ---------------------------
# Accept HF token env var in multiple common names
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("OPENAI_API_KEY") or os.environ.get("HUGGINGFACE_TOKEN")
if not HF_TOKEN:
    logger.critical("Missing HF_TOKEN / OPENAI_API_KEY / HUGGINGFACE_TOKEN environment variable.")
    raise RuntimeError("ERROR: set env var HF_TOKEN or OPENAI_API_KEY with your Hugging Face / Router token.")

MODEL_ID = "openai/gpt-oss-20b"        # chat model via HF router
EMBED_MODEL = "text-embedding-3-small"
HF_REPO = "rahul7star/OhamLab-LLM"
HF_REPO_DIR = "./hf_capsules"          # local download folder
ALLOWED_EXT = (".md",)
MAX_CONTEXT_CHARS = 5000               # increased to allow larger capsules
CHUNK_SIZE = 1600
CHUNK_OVERLAP = 200
TOP_K_PASSAGES = 3
CHAT_MAX_TOKENS = 1400
RESEARCH_MAX_TOKENS = 4000
CODING_MAX_TOKENS = 3000
PY_SANDBOX_TIMEOUT = 16                # seconds
SUMMARY_CACHE_FILE = ".capsule_summaries.json"

telemetry = {
    "model_calls": 0,
    "tokens_used_est": 0,
    "last_call_ms": None,
    "calls": [],
}
# ---------------------------
# Client (OpenAI router via HF)
# ---------------------------
try:
    client = OpenAI(base_url="https://router.huggingface.co/v1", api_key=HF_TOKEN)
    logger.info("OpenAI client initialized via HF router.")
except Exception as e:
    logger.exception("Failed initializing OpenAI client: %s", e)
    raise

# ---------------------------
# Utilities
# ---------------------------
def now_ms() -> int:
    return int(time.time() * 1000)

def normalize_text(s: str) -> str:
    s = s.lower()
    s = re.sub(r"\s+", " ", s)
    s = re.sub(r"[^0-9a-zA-Z\s\-_/\.]", " ", s)
    return s.strip()

def tokenize_terms(s: str) -> List[str]:
    return [t for t in normalize_text(s).split() if len(t) > 1]

def simple_overlap_score(query: str, passage: str) -> float:
    q_terms = set(tokenize_terms(query))
    p_terms = tokenize_terms(passage)
    if not q_terms or not p_terms:
        return 0.0
    overlap = sum(1 for t in p_terms if t in q_terms)
    score = overlap / (len(p_terms) ** 0.5 + 1e-6)
    return float(score)
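# Illustrative sketch (defined but never called by the app): how simple_overlap_score
# behaves. The score is overlap_count / sqrt(len(passage_terms)), so longer passages
# need proportionally more matching terms to rank highly. The strings are hypothetical.
def _example_overlap_score() -> float:
    query = "how do I install diffusers"
    passage = "To install diffusers run pip install diffusers and restart the kernel"
    # tokenize_terms(passage) yields 11 terms (single-character tokens are dropped);
    # "install" (x2) and "diffusers" (x2) are in the query's term set, so the score
    # is roughly 4 / sqrt(11) ~= 1.2.
    return simple_overlap_score(query, passage)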
# ---------------------------
# Hugging Face fetch
# ---------------------------
def fetch_md_from_hf(local_dir: str = HF_REPO_DIR) -> List[str]:
    """
    Download all .md files in HF_REPO into local_dir.
    Returns list of local file paths.
    """
    os.makedirs(local_dir, exist_ok=True)
    api = HfApi()
    try:
        repo_files = list_repo_files(HF_REPO)
        logger.debug("Listed files in HF repo '%s': %d entries.", HF_REPO, len(repo_files))
    except Exception as e:
        logger.warning("list_repo_files failed for %s: %s", HF_REPO, e)
        repo_files = []
    md_files = [f for f in repo_files if f.lower().endswith(".md")]
    downloaded = []
    for f in md_files:
        try:
            local_path = hf_hub_download(repo_id=HF_REPO, filename=f, repo_type="model", local_dir=local_dir, token=HF_TOKEN)
            downloaded.append(local_path)
            logger.info("Downloaded %s -> %s", f, local_path)
        except Exception as e:
            # continue but report
            logger.warning("Failed to download %s: %s", f, e)
    # If list_repo_files returned nothing (private repo?), rely on local directory scanning.
    if not md_files:
        logger.debug("No markdown files discovered via HF list; will rely on local directory scanning.")
    return downloaded
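# Illustrative sketch (defined but never called): fetch_md_from_hf can also be used
# standalone to mirror the repo's markdown capsules into a scratch folder. The
# directory name below is hypothetical.
def _example_fetch_capsules() -> None:
    paths = fetch_md_from_hf("./capsules_scratch")
    for p in paths:
        print(os.path.basename(p), os.path.getsize(p), "bytes")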
# ---------------------------
# Knowledge Store
# ---------------------------
class KnowledgeStore:
    def __init__(self, root: str = HF_REPO_DIR, exts: Tuple[str, ...] = ALLOWED_EXT):
        self.root = root
        self.exts = exts
        self.capsules: Dict[str, Dict[str, Any]] = {}
        self.summary_cache: Dict[str, Dict] = {}
        os.makedirs(self.root, exist_ok=True)
        self._load_summary_cache()
        self._load_all()

    def _read_file(self, path: str) -> str:
        try:
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                return f.read()
        except Exception as e:
            logger.error("Error reading %s: %s", path, e)
            return f"[Error reading {os.path.basename(path)}: {e}]"

    def _chunk_text(self, text: str) -> List[str]:
        chunks = []
        i = 0
        length = len(text)
        while i < length:
            chunk = text[i:i + CHUNK_SIZE]
            chunks.append(chunk)
            i += CHUNK_SIZE - CHUNK_OVERLAP
        return chunks or [text]
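    # Chunking note (illustrative arithmetic): windows are CHUNK_SIZE characters wide
    # with a stride of CHUNK_SIZE - CHUNK_OVERLAP = 1400, so a capsule truncated to
    # MAX_CONTEXT_CHARS (5000 chars) yields 4 overlapping chunks starting at offsets
    # 0, 1400, 2800 and 4200.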
    def _scan_local_md_files(self) -> List[str]:
        paths = []
        for root, _, files in os.walk(self.root):
            for name in files:
                if name.lower().endswith(self.exts):
                    paths.append(os.path.join(root, name))
        logger.debug("Scanned %d local markdown files under %s", len(paths), self.root)
        return sorted(paths)

    def _load_all(self):
        # first attempt: download from HF into local dir (safe to call repeatedly)
        try:
            fetch_md_from_hf(self.root)
        except Exception as e:
            logger.warning("fetch_md_from_hf failed during load_all: %s", e)
        files = self._scan_local_md_files()
        total_len = 0
        for path in files:
            name = os.path.basename(path)
            try:
                mtime = os.path.getmtime(path)
            except Exception:
                mtime = 0
            key = os.path.splitext(name)[0].lower()
            text = self._read_file(path)
            if len(text) > MAX_CONTEXT_CHARS:
                text = text[:MAX_CONTEXT_CHARS] + "\n\n[TRUNCATED]\n"
                logger.debug("Truncated %s to MAX_CONTEXT_CHARS", name)
            chunks = self._chunk_text(text)
            self.capsules[key] = {"filename": name, "text": text, "chunks": chunks, "mtime": mtime, "path": path}
            total_len += len(text)
            logger.info("Loaded capsule '%s' (file=%s, chars=%d, chunks=%d)", key, name, len(text), len(chunks))
        logger.info("[KnowledgeStore] Loaded %d capsule(s). total_chars=%d", len(self.capsules), total_len)

    def maybe_reload(self):
        # Reload any file that changed
        changed = False
        for key, meta in list(self.capsules.items()):
            path = meta.get("path")
            if not path or not os.path.exists(path):
                continue
            try:
                mtime = os.path.getmtime(path)
            except Exception:
                mtime = None
            if mtime and mtime != meta.get("mtime"):
                logger.info("Detected change in capsule %s; reloading.", key)
                text = self._read_file(path)
                if len(text) > MAX_CONTEXT_CHARS:
                    text = text[:MAX_CONTEXT_CHARS] + "\n\n[TRUNCATED]\n"
                chunks = self._chunk_text(text)
                self.capsules[key].update({"text": text, "chunks": chunks, "mtime": mtime})
                if key in self.summary_cache:
                    del self.summary_cache[key]
                    self._persist_summary_cache()
                changed = True
        if changed:
            logger.debug("maybe_reload: changes detected in knowledge store.")
        return changed

    def refresh_all(self):
        logger.info("Refreshing all capsules from HF and local dir.")
        fetch_md_from_hf(self.root)
        self.capsules = {}
        self._load_all()
        self.summary_cache = {}
        self._persist_summary_cache()

    def list_agents(self) -> List[str]:
        return sorted(list(self.capsules.keys()))

    def get_capsule(self, key: str) -> Optional[Dict]:
        return self.capsules.get(key)

    # Embeddings (lazy)
    def _embed_text(self, text: str) -> Optional[List[float]]:
        try:
            r = client.embeddings.create(model=EMBED_MODEL, input=[text])
            emb = r.data[0].embedding
            logger.debug("Generated embedding (len=%d) for text snippet (chars=%d)", len(emb), len(text))
            return emb
        except Exception as e:
            logger.warning("Embedding generation failed: %s", e)
            return None

    def build_embeddings_for_capsule(self, key: str):
        meta = self.get_capsule(key)
        if not meta:
            logger.warning("build_embeddings_for_capsule: missing capsule %s", key)
            return
        if meta.get("embeddings"):
            logger.debug("build_embeddings_for_capsule: embeddings already exist for %s", key)
            return
        embeddings = []
        for i, chunk in enumerate(meta["chunks"], 1):
            emb = self._embed_text(chunk)
            embeddings.append(emb)
            logger.debug("Embedding chunk %d/%d for capsule %s", i, len(meta["chunks"]), key)
        meta["embeddings"] = embeddings
        logger.info("Built embeddings for capsule %s (%d chunks)", key, len(embeddings))
    @staticmethod
    def _cosine(a: List[float], b: List[float]) -> float:
        # Static helper so calls via self._cosine(query_emb, emb) pass the two
        # vectors (not self) as the arguments.
        if a is None or b is None:
            return -1.0
        import math
        dot = sum(x * y for x, y in zip(a, b))
        na = math.sqrt(sum(x * x for x in a))
        nb = math.sqrt(sum(x * x for x in b))
        if na == 0 or nb == 0:
            return -1.0
        return dot / (na * nb)
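    # Illustrative arithmetic (hypothetical vectors): _cosine([1.0, 0.0], [1.0, 1.0])
    # = 1 / (1 * sqrt(2)) ~= 0.707, while _cosine(v, None) returns -1.0 so missing
    # embeddings never outrank real matches.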
    # Find relevant capsules (embedding fallback to overlap)
    def find_relevant_capsules(self, query: str, top_n: int = 3) -> List[Tuple[str, float]]:
        query_emb = None
        try:
            query_emb = self._embed_text(query)
        except Exception as e:
            query_emb = None
            logger.debug("Embedding query failed during find_relevant_capsules: %s", e)
        scores = []
        for key, meta in self.capsules.items():
            best_score = 0.0
            if meta.get("embeddings") and query_emb:
                for emb in meta["embeddings"]:
                    sc = self._cosine(query_emb, emb)
                    if sc > best_score:
                        best_score = sc
            else:
                for chunk in meta.get("chunks", []):
                    sc = simple_overlap_score(query, chunk)
                    if sc > best_score:
                        best_score = sc
            if key in normalize_text(query):
                best_score += 0.15
            if best_score > 0:
                scores.append((key, best_score))
        scores.sort(key=lambda x: x[1], reverse=True)
        logger.debug("find_relevant_capsules for query '%s' -> top: %s", query[:80], scores[:top_n])
        return scores[:top_n]

    def get_top_passages_for_capsule(self, capsule_key: str, query: str, k: int = TOP_K_PASSAGES) -> List[Tuple[str, float]]:
        meta = self.get_capsule(capsule_key) or {}
        chunks = meta.get("chunks", [])
        scored = []
        query_emb = None
        try:
            query_emb = self._embed_text(query)
        except Exception as e:
            query_emb = None
            logger.debug("Query embedding failed for top passages: %s", e)
        if meta.get("embeddings") and query_emb:
            for chunk, emb in zip(chunks, meta.get("embeddings", [])):
                score = self._cosine(query_emb, emb) or 0.0
                scored.append((chunk, float(score)))
        else:
            scored = [(chunk, simple_overlap_score(query, chunk)) for chunk in chunks]
        scored = [s for s in scored if s[1] > 0]
        scored.sort(key=lambda x: x[1], reverse=True)
        logger.debug("Top passages for capsule %s (query=%s): %d hits", capsule_key, query[:80], len(scored))
        return scored[:k]
    # Summaries
    def _load_summary_cache(self):
        if os.path.exists(SUMMARY_CACHE_FILE):
            try:
                with open(SUMMARY_CACHE_FILE, "r", encoding="utf-8") as f:
                    self.summary_cache = json.load(f)
                logger.info("Loaded summary cache (%d items).", len(self.summary_cache))
            except Exception as e:
                self.summary_cache = {}
                logger.warning("Failed to load summary cache: %s", e)
        else:
            self.summary_cache = {}
            logger.debug("No summary cache file found; starting fresh.")

    def _persist_summary_cache(self):
        try:
            with open(SUMMARY_CACHE_FILE, "w", encoding="utf-8") as f:
                json.dump(self.summary_cache, f, indent=2)
            logger.debug("Persisted summary cache (%d items).", len(self.summary_cache))
        except Exception as e:
            logger.warning("Failed to persist summary cache: %s", e)

    def get_or_build_summary(self, key: str, force: bool = False) -> str:
        if not force and key in self.summary_cache:
            logger.debug("Returning cached summary for %s", key)
            return self.summary_cache[key]["summary"]
        meta = self.get_capsule(key)
        if not meta:
            logger.debug("get_or_build_summary: no capsule for key %s", key)
            return ""
        top_chunks = meta.get("chunks", [])[:6]
        combined = "\n\n".join(top_chunks)
        prompt = textwrap.dedent(f"""
Summarize the following markdown content into a concise (4-8 sentence) technical summary.
----- BEGIN CONTENT -----
{combined}
----- END CONTENT -----
""").strip()
        try:
            messages = [{"role": "system", "content": "You are a succinct summarizer."},
                        {"role": "user", "content": prompt}]
            logger.debug("Requesting summary for capsule %s (chars=%d)", key, len(combined))
            resp = client.chat.completions.create(model=MODEL_ID, messages=messages, max_tokens=400, temperature=0.0)
            summary = resp.choices[0].message.content.strip()
            # if usage is available, log it
            usage = getattr(resp, "usage", None)
            if usage:
                try:
                    telemetry["tokens_used_est"] += int(usage.total_tokens)
                except Exception:
                    pass
            logger.info("Summary built for %s (usage=%s)", key, usage)
        except Exception as e:
            logger.warning("Failed to build summary via model for %s: %s", key, e)
            summary = combined[:800] + ("\n\n[TRUNCATED]" if len(combined) > 800 else "")
        self.summary_cache[key] = {"summary": summary, "updated": int(time.time())}
        self._persist_summary_cache()
        return summary
    # PDF -> capsule helper
    def add_pdf_capsule(self, capsule_name: str, filepath: str) -> None:
        """
        Create a .md capsule from PDF content at filepath.
        """
        if not os.path.exists(filepath):
            logger.error("add_pdf_capsule: filepath does not exist: %s", filepath)
            raise FileNotFoundError(filepath)
        # Extract text bytes
        try:
            b = open(filepath, "rb").read()
        except Exception as e:
            logger.exception("Failed to read uploaded pdf bytes: %s", e)
            raise
        txt = _extract_text_from_pdf_bytes(b)
        md_name = f"{capsule_name}.md"
        out_path = os.path.join(self.root, md_name)
        try:
            with open(out_path, "w", encoding="utf-8") as f:
                f.write(f"# {capsule_name}\n\n")
                f.write(txt)
            logger.info("Wrote PDF capsule to %s", out_path)
        except Exception as e:
            logger.exception("Failed to write capsule file %s: %s", out_path, e)
            raise
        # reload into store
        self._load_all()


store = KnowledgeStore(HF_REPO_DIR)
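# Illustrative sketch (defined but never called): the typical retrieval flow against
# the loaded store. The query string and the assumption that at least one capsule
# exists are hypothetical.
def _example_retrieval_flow() -> None:
    print("Loaded capsules:", store.list_agents())
    query = "how do I fine-tune a diffusion model"
    for key, score in store.find_relevant_capsules(query, top_n=2):
        print(f"capsule={key} score={score:.3f}")
        for passage, p_score in store.get_top_passages_for_capsule(key, query, k=2):
            print(f"  passage score={p_score:.3f}: {passage[:80]!r}")
        print("  summary:", store.get_or_build_summary(key)[:200])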
# ---------------------------
# Conversation manager
# ---------------------------
class ConversationManager:
    def __init__(self):
        self.histories: Dict[str, List[Dict]] = {}

    def get_history(self, key):
        if key not in self.histories:
            self.histories[key] = []
        return self.histories[key]

    def append(self, key, role, content):
        logger.debug("Appending to history: agent=%s role=%s chars=%d", key, role, len(content or ""))
        self.get_history(key).append({"role": role, "content": content})

    def reset_agent(self, key):
        logger.info("Resetting history for agent %s", key)
        self.histories[key] = []

    def reset_all(self):
        logger.info("Resetting all conversation histories")
        self.histories = {}


conv_manager = ConversationManager()
# ---------------------------
# Python sandbox helpers
# ---------------------------
def run_user_code_simple(code: str) -> str:
    """
    Simple local exec (not safe for untrusted code). Kept for quick tests only.
    Prefer run_python_sandbox below for actual user-run code paths.
    """
    try:
        exec_globals = {}
        exec(code, exec_globals)
        logger.info("run_user_code_simple executed code successfully (chars=%d)", len(code))
        return "✅ Code ran without errors."
    except Exception:
        tb = traceback.format_exc()
        logger.error("run_user_code_simple error: %s", tb)
        return f"❌ Error:\n{tb}"

def run_python_sandbox(code: str, timeout: int = PY_SANDBOX_TIMEOUT) -> Tuple[str, str]:
    # Coarse block-list: refuse code that mentions obviously dangerous APIs.
    blocked = ["os.system", "subprocess", "socket", "pty", "multiprocessing", "shutil", "requests", "urllib", "open("]
    lowered = code.lower()
    for b in blocked:
        if b in lowered:
            logger.warning("Refused to run sandboxed code due to blocked pattern: %s", b)
            return "", f"Refused to run: code contains blocked pattern '{b}'."
    with tempfile.TemporaryDirectory() as d:
        script_path = os.path.join(d, "script.py")
        with open(script_path, "w", encoding="utf-8") as f:
            f.write(code)
        try:
            proc = subprocess.Popen(
                ["python", script_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                cwd=d,
                text=True,
                env={"PYTHONPATH": d}
            )
            try:
                out, err = proc.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                proc.kill()
                logger.warning("Sandbox code timed out after %ds", timeout)
                return "", f"Execution timed out after {timeout}s."
            logger.info("Sandbox executed successfully (stdout=%d bytes, stderr=%d bytes)", len(out or ""), len(err or ""))
            return out, err
        except Exception as e:
            logger.exception("Sandbox execution failed: %s", e)
            return "", f"Execution failed: {e}"
# ---------------------------
# Diffusers / HF codeblock helpers
# ---------------------------
CODEBLOCK_RE = re.compile(r"```(?:python)?\n(.*?)```", re.S | re.I)

def extract_codeblocks_from_text(text: str) -> List[str]:
    return [m.group(1).strip() for m in CODEBLOCK_RE.finditer(text)]
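# Illustrative example (hypothetical markdown input): a fenced block such as
#   "Intro\n```python\nprint('hi')\n```\n"
# yields ["print('hi')"], since the regex captures everything between the opening
# fence's newline and the closing fence.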
def diffusers_helper_for_capsule(key: str) -> str:
    cap = store.get_capsule(key)
    if not cap:
        logger.debug("diffusers_helper_for_capsule: no capsule found for %s", key)
        return "No capsule selected or capsule not found."
    codeblocks = extract_codeblocks_from_text(cap["text"])
    relevant = [cb for cb in codeblocks if "diffusers" in cb or "huggingface" in cb or "transformers" in cb]
    if not relevant:
        logger.debug("No diffusers/transformers blocks in capsule %s", key)
        return "No diffusers/transformers code blocks found in capsule."
    instructions = "Found Diffusers-related code blocks:\n\n"
    for i, cb in enumerate(relevant, 1):
        header = f"--- Code Block {i} ---\n"
        instructions += header + cb[:2000] + ("\n\n[TRUNCATED]\n" if len(cb) > 2000 else "\n\n")
    instructions += "Ensure `pip install diffusers accelerate transformers torch` before running."
    logger.info("diffusers helper produced %d blocks for %s", len(relevant), key)
    return instructions
# ---------------------------
# PDF extraction helper
# ---------------------------
def _extract_text_from_pdf_bytes(b: bytes) -> str:
    try:
        from PyPDF2 import PdfReader
    except Exception:
        logger.warning("PyPDF2 not installed; PDF extraction unavailable.")
        return "[PDF text extraction not available: PyPDF2 not installed]"
    try:
        reader = PdfReader(io.BytesIO(b))
        texts = []
        for i, page in enumerate(reader.pages):
            try:
                t = page.extract_text()
                if t:
                    texts.append(t)
            except Exception as e:
                logger.debug("PDF page %d extraction error: %s", i, e)
                continue
        result = "\n\n".join(texts) if texts else "[No extractable text in PDF pages]"
        logger.info("Extracted text from PDF (pages=%d, chars=%d)", len(reader.pages), len(result))
        return result
    except Exception as e:
        logger.exception("PDF extraction failed: %s", e)
        return f"[PDF extraction failed: {e}]"
# ---------------------------
# Routing & chat helpers
# ---------------------------
def pick_agent_and_passages(user_message: str) -> Tuple[str, List[Tuple[str, float]]]:
    logger.debug("pick_agent_and_passages for message (chars=%d): %s", len(user_message or ""), (user_message or "")[:120])
    store.maybe_reload()
    # explicit "agent: <name>" pattern
    m = re.search(r"agent[:=]\s*([a-z0-9_\-]+)", user_message.lower())
    if m:
        candidate = m.group(1)
        if candidate in store.capsules:
            passages = store.get_top_passages_for_capsule(candidate, user_message, k=TOP_K_PASSAGES)
            logger.info("User requested explicit agent '%s' -> %d passages", candidate, len(passages))
            return candidate, passages
    lowered = normalize_text(user_message)
    for key in store.list_agents():
        if key in lowered:
            passages = store.get_top_passages_for_capsule(key, user_message, k=TOP_K_PASSAGES)
            logger.info("Agent key found in message: %s", key)
            return key, passages
    relevant = store.find_relevant_capsules(user_message, top_n=3)
    if relevant:
        best_key, best_score = relevant[0]
        logger.debug("find_relevant_capsules top: %s score=%.3f", best_key, best_score)
        if best_score < 0.03:
            logger.debug("Best score below threshold (%.3f) -> general", best_score)
            return "general", []
        passages = store.get_top_passages_for_capsule(best_key, user_message, k=TOP_K_PASSAGES)
        return best_key, passages
    return "general", []
def build_system_prompt_for_agent(agent_key: str, mode: str, passages: List[Tuple[str, float]]) -> str:
    agent_name = agent_key.capitalize() if agent_key else "General"
    meta = store.get_capsule(agent_key) or {}
    filename = meta.get("filename", "unknown")
    passage_texts = []
    for i, (p, score) in enumerate(passages, 1):
        excerpt = p.strip()
        if len(excerpt) > 1400:
            excerpt = excerpt[:1400] + "\n\n[TRUNCATED]"
        passage_texts.append(f"--- Passage {i} (score={score:.3f}) from {filename} ---\n{excerpt}")
    summary = store.get_or_build_summary(agent_key) if agent_key in store.capsules else ""
    context_block = "\n\n".join(passage_texts) or f"[No direct passages matched. Capsule summary:]\n{summary or '[No summary available]'}"
    if mode == "chat":
        mask = f"""
You are Ohamlab - PhD-level specialist in **{agent_name}**.
MODE: Conversational.
--- CONTEXT ---
{context_block}
"""
    elif mode == "research":
        mask = f"""
You are Ohamlab - PhD-level research AI in **{agent_name}**.
MODE: Research / Analytical.
--- CONTEXT ---
{context_block}
"""
    elif mode == "coding":
        mask = f"""
You are Ohamlab - an expert **coding assistant**.
MODE: Debugging / Code Fixer.
User may provide: (a) code, (b) error messages, (c) questions about programming.
Your job:
- Carefully analyze the code and error.
- Explain what went wrong in simple terms.
- Show the corrected code (with a Python/JS block if possible).
- Give step-by-step reasoning why your fix works.
- If multiple issues, list them clearly.
--- CONTEXT ---
{context_block}
"""
    else:
        mask = f"You are Ohamlab - General Specialist.\n--- CONTEXT ---\n{context_block}"
    logger.debug("Built system prompt for agent=%s mode=%s (context_chars=%d)", agent_key, mode, len(context_block or ""))
    return textwrap.dedent(mask).strip()
def call_model_get_response(model_id: str, messages: list, max_tokens: int = 1200, temperature: float = 0.2):
    start = now_ms()
    telemetry["model_calls"] += 1
    logger.info("Calling model %s (max_tokens=%d, temp=%s). Messages=%d", model_id, max_tokens, temperature, len(messages))
    try:
        resp = client.chat.completions.create(model=model_id, messages=messages, max_tokens=max_tokens, temperature=temperature)
        choice = resp.choices[0]
        elapsed = now_ms() - start
        telemetry["last_call_ms"] = elapsed
        telemetry["calls"].append({"time_ms": elapsed, "max_tokens": max_tokens, "temperature": temperature})
        # Prefer provider-reported usage; otherwise fall back to a rough chars/4 estimate.
        usage = getattr(resp, "usage", None)
        if usage:
            try:
                telemetry["tokens_used_est"] += int(usage.total_tokens)
                logger.debug("Model usage reported: %s", usage)
            except Exception:
                pass
        else:
            chars = sum(len(m.get("content", "")) for m in messages)
            telemetry["tokens_used_est"] += int(chars / 4)
        response = getattr(choice.message, "content", None)
        if not response:
            logger.warning("LLM returned no response for model %s", model_id)
            return "⚠️ LLM returned no response."
        logger.info("Model call completed in %d ms; tokens_est=%d", elapsed, telemetry.get("tokens_used_est", 0))
        return response.strip()
    except Exception as e:
        tb = traceback.format_exc()
        logger.exception("Model call failed: %s", e)
        raise RuntimeError(f"Model call failed: {e}\n{tb}")
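# Illustrative sketch (defined but never called): a minimal direct call through the
# same helper, outside the Gradio flow. The question is hypothetical.
def _example_direct_model_call() -> None:
    msgs = [
        {"role": "system", "content": "You are a terse assistant."},
        {"role": "user", "content": "In one sentence, what is retrieval-augmented generation?"},
    ]
    print(call_model_get_response(MODEL_ID, msgs, max_tokens=120, temperature=0.0))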
def chat_with_model(user_message: str, chat_history_ui: List[Tuple[str, str]], selected_agent: str, mode: str):
    """
    Primary chat function used by Gradio callbacks.
    - user_message: text
    - chat_history_ui: current Chatbot tuples
    - selected_agent: value of agent_selector dropdown (e.g. 'general' or capsule key)
    - mode: 'chat' | 'research' | 'coding'
    Returns (cleared_input, updated_chat_history).
    """
    if not user_message or not str(user_message).strip():
        logger.debug("chat_with_model called with empty message.")
        return "", chat_history_ui
    try:
        # If user explicitly selected an agent via UI, prefer it. Otherwise pick via content.
        if selected_agent and selected_agent != "general":
            agent_key = selected_agent
            passages = store.get_top_passages_for_capsule(agent_key, user_message, k=TOP_K_PASSAGES)
            logger.debug("User selected agent %s", agent_key)
        else:
            agent_key, passages = pick_agent_and_passages(user_message)
            logger.debug("Auto-picked agent %s", agent_key)
        system_prompt = build_system_prompt_for_agent(agent_key, mode, passages)
        history_msgs_struct = conv_manager.get_history(agent_key)
        # Convert our stored history (list of dicts) into model messages
        msgs_for_model = [{"role": "system", "content": system_prompt}]
        msgs_for_model += history_msgs_struct
        msgs_for_model.append({"role": "user", "content": user_message})
        max_toks = RESEARCH_MAX_TOKENS if mode == "research" else (CODING_MAX_TOKENS if mode == "coding" else CHAT_MAX_TOKENS)
        reply = call_model_get_response(MODEL_ID, msgs_for_model, max_tokens=max_toks, temperature=0.15 if mode != "research" else 0.0)
        # Append to conversation manager
        conv_manager.append(agent_key, "user", user_message)
        conv_manager.append(agent_key, "assistant", reply)
        chat_history_ui = chat_history_ui + [(user_message, reply)]
        logger.info("chat_with_model: responded for agent=%s mode=%s", agent_key, mode)
        return "", chat_history_ui
    except Exception as e:
        tb = traceback.format_exc()
        logger.exception("chat_with_model ERROR: %s", e)
        return f"[ERROR] {e}\n{tb}", chat_history_ui
# ---------------------------
# UI & callbacks
# ---------------------------
def reset_agent_ui(selected_agent: str):
    conv_manager.reset_agent(selected_agent)
    return []

def reset_all_ui():
    conv_manager.reset_all()
    return []

def refresh_capsules_ui():
    logger.info("UI requested refresh_capsules_ui")
    store.refresh_all()
    return gr.update(choices=["general"] + store.list_agents())

def add_uploaded_pdf_as_capsule(filepath) -> Tuple[str, dict]:
    """
    Add uploaded PDF as a new capsule and return updated agent selector choices.
    """
    if not filepath:
        logger.warning("add_uploaded_pdf_as_capsule: no file provided")
        return "⚠️ No file uploaded", gr.update()
    try:
        capsule_name = os.path.splitext(os.path.basename(filepath))[0]
        store.add_pdf_capsule(capsule_name, filepath)
        new_choices = ["general"] + store.list_agents()
        logger.info("Added PDF capsule via UI: %s", capsule_name)
        return f"✅ Added PDF Capsule: {capsule_name}", gr.update(choices=new_choices, value=capsule_name)
    except Exception as e:
        logger.exception("Failed to add PDF capsule: %s", e)
        return f"❌ Failed to add PDF: {str(e)}", gr.update()

def copy_all_chat(chat_list):
    return "\n\n".join([f"{u}:\n{a}" for u, a in chat_list])

def chat_to_markdown(chat_list):
    md_lines = []
    for u, a in chat_list:
        md_lines.append(f"**{u}**\n```\n{a}\n```")
    return "\n\n".join(md_lines)
# ---------------------------
# Build Gradio app
# ---------------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("<h2 style='text-align:center;'>🤖 Ohamlab AI - PhD Specialist Multi-Agent Shell</h2>")

    # Top controls
    with gr.Row():
        agent_selector = gr.Dropdown(choices=["general"] + store.list_agents(), value="general", label="Select Agent", interactive=True)
        mode_selector = gr.Radio(choices=["chat", "research", "coding"], value="chat", label="Conversation Mode", interactive=True)
        refresh_btn = gr.Button("Refresh Capsules")
        summary_btn = gr.Button("Show Capsule Summary")
        diff_btn = gr.Button("Show Diffusers Helper")

    # PDF Upload
    with gr.Row():
        pdf_upload = gr.File(file_types=[".pdf"], type="filepath", label="Upload a PDF (file path)", interactive=True)
        upload_status = gr.Textbox(label="Upload Status", interactive=False)

    # Chat window
    chat_window = gr.Chatbot([], label="Conversation", height=600, type="tuples", show_copy_button=True)

    # Input row
    with gr.Row():
        user_input = gr.Textbox(placeholder="Type your message here...", lines=3)
        send_btn = gr.Button("Send")

    # Bottom controls
    with gr.Row():
        clear_agent_btn = gr.Button("🧹 Clear Agent History")
        clear_all_btn = gr.Button("🧼 Reset All Histories")
        copy_btn = gr.Button("Copy All")
        share_md_btn = gr.Button("Share as Markdown")

    # Summary & diff output boxes (static)
    summary_output = gr.Textbox(lines=10, interactive=False, label="Capsule Summary")
    diff_output = gr.Textbox(lines=12, interactive=False, label="Diffusers Helper")
    copied_output = gr.Textbox(label="Copied Chat", interactive=True)
    md_export_output = gr.Textbox(label="Markdown Export", interactive=True)

    # Callbacks wiring
    refresh_btn.click(fn=refresh_capsules_ui, outputs=[agent_selector])
    pdf_upload.upload(fn=add_uploaded_pdf_as_capsule, inputs=[pdf_upload], outputs=[upload_status, agent_selector])

    # Send (pass agent and mode)
    send_btn.click(fn=chat_with_model, inputs=[user_input, chat_window, agent_selector, mode_selector], outputs=[user_input, chat_window])
    user_input.submit(fn=chat_with_model, inputs=[user_input, chat_window, agent_selector, mode_selector], outputs=[user_input, chat_window])

    clear_agent_btn.click(fn=reset_agent_ui, inputs=[agent_selector], outputs=[chat_window])
    clear_all_btn.click(fn=reset_all_ui, inputs=None, outputs=[chat_window])
    summary_btn.click(fn=lambda a: store.get_or_build_summary(a) if a in store.capsules else "No capsule selected.", inputs=[agent_selector], outputs=[summary_output])
    diff_btn.click(fn=diffusers_helper_for_capsule, inputs=[agent_selector], outputs=[diff_output])
    copy_btn.click(fn=copy_all_chat, inputs=[chat_window], outputs=[copied_output])
    share_md_btn.click(fn=chat_to_markdown, inputs=[chat_window], outputs=[md_export_output])

    # Show loaded capsules
    capsule_md = "### Loaded Agents / Capsules\n\n" + "\n".join([f"- **{k}** (file: `{v['filename']}`)" for k, v in store.capsules.items()])
    gr.Markdown(capsule_md)
# ---------------------------
# Run
# ---------------------------
if __name__ == "__main__":
    logger.info("Launching Gradio app.")
    demo.launch(server_name="0.0.0.0", share=False)