import os
import atexit
import asyncio
import inspect
import base64
import mimetypes
from pathlib import Path

import gradio as gr
from openai import OpenAI
from dotenv import load_dotenv
from langsmith import Client as LangSmithClient
from langsmith.run_trees import RunTree

load_dotenv()

INFERENCE_GEMINI = "Gemini"
INFERENCE_QWEN3_VL = "Qwen3-VL"
INFERENCE = INFERENCE_GEMINI

# Configure Gemini via OpenAI-compatible endpoint
GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"
GEMINI_MODEL = "gemini-2.5-flash"

# Configure Qwen3-VL via OpenAI-compatible endpoint
QWEN3_VL_BASE_URL = "https://router.huggingface.co/v1"
QWEN3_VL_MODEL = "Qwen/Qwen3-VL-235B-A22B-Thinking:novita"

if INFERENCE == INFERENCE_GEMINI:
    _api_key = os.getenv("GEMINI_API_KEY")
    _client = OpenAI(api_key=_api_key, base_url=GEMINI_BASE_URL) if _api_key else None
elif INFERENCE == INFERENCE_QWEN3_VL:
    _api_key = os.getenv("HUGGINGFACE_INFERENCE_PROVIDERS_API_KEY")
    _client = OpenAI(api_key=_api_key, base_url=QWEN3_VL_BASE_URL) if _api_key else None
else:
    # Unknown INFERENCE value: leave the client unset so respond() can report it
    _api_key = None
    _client = None

# Optional LangSmith client for guaranteed flush
_ls_api_key_env = os.getenv("LANGSMITH_API_KEY")
_ls_client = LangSmithClient() if _ls_api_key_env else None


def _flush_langsmith():
    """Ensure LangSmith traces are sent before process exit or between runs."""
    if not _ls_client:
        return
    try:
        result = _ls_client.flush()
        if inspect.isawaitable(result):
            try:
                asyncio.run(result)
            except RuntimeError:
                # If an event loop is already running (e.g., in some servers), fall back
                loop = asyncio.get_event_loop()
                loop.create_task(result)
    except Exception:
        # Best-effort flush; do not break the app
        pass


if _ls_client:
    try:
        atexit.register(_flush_langsmith)
    except Exception:
        pass

# Load system prompt from external file (fall back to an empty prompt if missing)
system_prompt_file = Path(__file__).parent / "system_prompt.md"
system_prompt = ""
if system_prompt_file.exists():
    with open(system_prompt_file, "r") as f:
        system_prompt = f.read()

# Load CSS from external file
css_file = Path(__file__).parent / "style.css"
with open(css_file, "r") as f:
    style = f.read()


def _extract_text_and_files(message):
    """Extract user text and attached files from a multimodal message value."""
    if isinstance(message, str):
        return message, []
    # Common multimodal shapes: dict with keys, or list of parts
    files = []
    text_parts = []
    try:
        if isinstance(message, dict):
            if "text" in message:
                text_parts.append(message.get("text") or "")
            if "files" in message and message["files"]:
                files = message["files"] or []
        elif isinstance(message, (list, tuple)):
            for part in message:
                if isinstance(part, str):
                    text_parts.append(part)
                elif isinstance(part, dict):
                    # Heuristic: file-like dicts may have 'path' or 'name'
                    if any(k in part for k in ("path", "name", "mime_type")):
                        files.append(part)
                    elif "text" in part:
                        text_parts.append(part.get("text") or "")
    except Exception:
        pass
    text_combined = " ".join([t for t in text_parts if t])
    return text_combined, files


def _build_image_parts(files):
    """Convert local image files into OpenAI 'image_url' parts using base64 data URLs."""
    image_parts = []
    for f in files or []:
        path = None
        if isinstance(f, str):
            path = f
        elif isinstance(f, dict):
            path = f.get("path") or f.get("name")
        if not path or not os.path.exists(path):
            continue
        mime, _ = mimetypes.guess_type(path)
        if not mime or not mime.startswith("image/"):
            continue
        try:
            with open(path, "rb") as fp:
                b64 = base64.b64encode(fp.read()).decode("utf-8")
            data_url = f"data:{mime};base64,{b64}"
            image_parts.append({
                "type": "image_url",
                "image_url": {"url": data_url},
            })
        except Exception:
            continue
    return image_parts


def _value_to_user_content(value):
    """Normalize any Gradio message value to an OpenAI user 'content' value."""
    text, files = _extract_text_and_files(value)
    final_user_text = (text or "").strip() or "Describe el contenido de la(s) imagen(es)."
    image_parts = _build_image_parts(files)
    if image_parts:
        return [{"type": "text", "text": final_user_text}] + image_parts
    return final_user_text


def _value_preview(value, limit: int = 600) -> str:
    """Safe preview string for any kind of message value."""
    if isinstance(value, str):
        return _preview_text(value, limit)
    text, files = _extract_text_and_files(value)
    suffix = ""
    if files:
        suffix = f" [images:{len(files)}]"
    return _preview_text((text or "").strip() + suffix, limit)


def _preview_text(text: str | None, limit: int = 600) -> str:
    """Truncate text to `limit` characters for tracing previews."""
    if not text:
        return ""
    if len(text) <= limit:
        return text
    return text[:limit] + "…"


def _history_preview(history: list[tuple[str, str]] | None, max_turns: int = 3, max_chars: int = 1200) -> str:
    """Build a short preview of the last few chat turns for tracing."""
    if not history:
        return ""
    tail = history[-max_turns:]
    parts: list[str] = []
    for user_turn, assistant_turn in tail:
        if user_turn:
            parts.append(f"User 👤: {_preview_text(user_turn, 300)}")
        if assistant_turn:
            parts.append(f"Assistant 🤖: {_preview_text(assistant_turn, 300)}")
    joined = "\n".join(parts)
    return _preview_text(joined, max_chars)


def respond(message, history: list[tuple[str, str]]):
    """Stream the assistant reply via the configured OpenAI-compatible provider.

    Yields partial text chunks so the UI shows a live stream.
    """
    user_text, files = _extract_text_and_files(message)
    if not _client:
        if INFERENCE == INFERENCE_GEMINI:
            yield (
                "Gemini API key not configured. Set environment variable GEMINI_API_KEY "
                "and restart the app."
            )
        elif INFERENCE == INFERENCE_QWEN3_VL:
            yield (
                "Qwen3-VL API key not configured. Set environment variable "
                "HUGGINGFACE_INFERENCE_PROVIDERS_API_KEY and restart the app."
            )
        else:
            yield (
                "Inference engine not configured. Set INFERENCE to 'Gemini' or "
                "'Qwen3-VL' and restart the app."
            )
        return

    # Build OpenAI-style messages from history
    messages = [
        {
            "role": "system",
            "content": system_prompt,
        }
    ]
    for user_turn, assistant_turn in history or []:
        if user_turn:
            messages.append({"role": "user", "content": _value_to_user_content(user_turn)})
        if assistant_turn:
            messages.append({"role": "assistant", "content": assistant_turn})

    # Build user content with optional inline images (data URLs)
    final_user_text = (user_text or "").strip() or "Describe el contenido de la(s) imagen(es)."
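    # When images are attached, the user content is sent as an OpenAI-style
    # multimodal "parts" list, e.g.
    #   [{"type": "text", "text": "..."},
    #    {"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}]
    # Otherwise it is sent as a plain string.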
    # Collect image parts using helper
    image_parts = _build_image_parts(files)
    if image_parts:
        user_content = [{"type": "text", "text": final_user_text}] + image_parts
    else:
        user_content = final_user_text
    messages.append({"role": "user", "content": user_content})

    # Optional RunTree instrumentation (does not require LANGSMITH_TRACING)
    _ls_api_key = os.getenv("LANGSMITH_API_KEY")
    pipeline = None
    child_build = None
    child_llm = None
    if _ls_api_key:
        try:
            pipeline = RunTree(
                name="Chat Session",
                run_type="chain",
                inputs={
                    "user_text": _value_preview(message, 600),
                    "has_images": bool(image_parts),
                    "history_preview": _history_preview(history),
                },
            )
            pipeline.post()

            child_build = pipeline.create_child(
                name="BuildMessages",
                run_type="chain",
                inputs={
                    "system_prompt_preview": _preview_text(system_prompt, 400),
                    "user_content_type": "multimodal" if image_parts else "text",
                    "history_turns": len(history or []),
                },
            )
            child_build.post()
            child_build.end(
                outputs={
                    "messages_count": len(messages),
                }
            )
            child_build.patch()
        except Exception:
            pipeline = None

    try:
        if pipeline:
            try:
                if INFERENCE == INFERENCE_GEMINI:
                    child_llm = pipeline.create_child(
                        name="LLMCall",
                        run_type="llm",
                        inputs={
                            "model": GEMINI_MODEL,
                            "provider": "gemini-openai",
                            "messages_preview": _preview_text(str(messages[-1]), 600),
                        },
                    )
                elif INFERENCE == INFERENCE_QWEN3_VL:
                    child_llm = pipeline.create_child(
                        name="LLMCall",
                        run_type="llm",
                        inputs={
                            "model": QWEN3_VL_MODEL,
                            "provider": "qwen3-vl-openai",
                            "messages_preview": _preview_text(str(messages[-1]), 600),
                        },
                    )
                child_llm.post()
            except Exception:
                child_llm = None

        if INFERENCE == INFERENCE_GEMINI:
            stream = _client.chat.completions.create(
                model=GEMINI_MODEL,
                messages=messages,
                stream=True,
            )
        elif INFERENCE == INFERENCE_QWEN3_VL:
            stream = _client.chat.completions.create(
                model=QWEN3_VL_MODEL,
                messages=messages,
                stream=True,
            )

        accumulated = ""
        for chunk in stream:
            try:
                choice = chunk.choices[0]
                delta_text = None
                # OpenAI v1: delta.content
                if getattr(choice, "delta", None) is not None:
                    delta_text = getattr(choice.delta, "content", None)
                # Fallback: some providers emit message.content in chunks
                if delta_text is None and getattr(choice, "message", None) is not None:
                    delta_text = choice.message.get("content") if isinstance(choice.message, dict) else None
                if not delta_text:
                    continue
                accumulated += delta_text
                yield accumulated
            except Exception:
                continue

        if not accumulated:
            yield "(Sin contenido de respuesta)"

        if child_llm:
            try:
                child_llm.end(outputs={"content": _preview_text(accumulated, 5000)})
                child_llm.patch()
            except Exception:
                pass
        if pipeline:
            try:
                pipeline.end(outputs={"answer": _preview_text(accumulated, 5000)})
                pipeline.patch()
            except Exception:
                pass

        # Ensure traces are flushed between requests
        _flush_langsmith()
    except Exception as e:
        if child_llm:
            try:
                child_llm.end(outputs={"error": str(e)})
                child_llm.patch()
            except Exception:
                pass
        if pipeline:
            try:
                pipeline.end(outputs={"error": str(e)})
                pipeline.patch()
            except Exception:
                pass
        yield f"Ocurrió un error al llamar a {INFERENCE}: {e}"
        _flush_langsmith()


# Create the Gradio app with Blocks for better control
with gr.Blocks(theme=gr.themes.Monochrome(), css=style, fill_height=True) as demo:
    # Title component
    title = gr.Markdown(
        value="# Gmail & Outlook API Helper",
        visible=True
    )

    # Description component that can be hidden
    description = gr.HTML(
        value='