import os
import atexit
import asyncio
import inspect
import base64
import mimetypes
from pathlib import Path

import gradio as gr
from openai import OpenAI
from dotenv import load_dotenv
from langsmith import Client as LangSmithClient
from langsmith.run_trees import RunTree

load_dotenv()
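
# Example .env contents read by load_dotenv() above (illustrative; values are
# placeholders and only the key for the selected INFERENCE backend is required):
#   GEMINI_API_KEY=<your Gemini key>
#   HUGGINGFACE_INFERENCE_PROVIDERS_API_KEY=<your HF Inference Providers key>
#   LANGSMITH_API_KEY=<your LangSmith key>   # optional, enables tracing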
INFERENCE_GEMINI = "Gemini"
INFERENCE_QWEN3_VL = "Qwen3-VL"
INFERENCE = INFERENCE_GEMINI

# Configure Gemini via OpenAI-compatible endpoint
GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"
GEMINI_MODEL = "gemini-2.5-flash"

# Configure Qwen3-VL via OpenAI-compatible endpoint
QWEN3_VL_BASE_URL = "https://router.huggingface.co/v1"
QWEN3_VL_MODEL = "Qwen/Qwen3-VL-235B-A22B-Thinking:novita"

if INFERENCE == INFERENCE_GEMINI:
    _api_key = os.getenv("GEMINI_API_KEY")
    _client = OpenAI(api_key=_api_key, base_url=GEMINI_BASE_URL) if _api_key else None
elif INFERENCE == INFERENCE_QWEN3_VL:
    _api_key = os.getenv("HUGGINGFACE_INFERENCE_PROVIDERS_API_KEY")
    _client = OpenAI(api_key=_api_key, base_url=QWEN3_VL_BASE_URL) if _api_key else None
else:
    # Unknown inference backend: leave the client unset so respond() can report it
    _api_key = None
    _client = None

# Optional LangSmith client for guaranteed flush
_ls_api_key_env = os.getenv("LANGSMITH_API_KEY")
_ls_client = LangSmithClient() if _ls_api_key_env else None


def _flush_langsmith():
    """Ensure LangSmith traces are sent before process exit or between runs."""
    if not _ls_client:
        return
    try:
        result = _ls_client.flush()
        if inspect.isawaitable(result):
            try:
                asyncio.run(result)
            except RuntimeError:
                # If an event loop is already running (e.g., in some servers),
                # fall back to scheduling the flush on the current loop
                loop = asyncio.get_event_loop()
                loop.create_task(result)
    except Exception:
        # Best-effort flush; do not break the app
        pass


if _ls_client:
    try:
        atexit.register(_flush_langsmith)
    except Exception:
        pass

# Load system prompt from external file
system_prompt_file = Path(__file__).parent / "system_prompt.md"
if system_prompt_file.exists():
    with open(system_prompt_file, "r") as f:
        system_prompt = f.read()
else:
    # Fallback so the app still starts if the prompt file is missing
    system_prompt = ""

# Load CSS from external file
css_file = Path(__file__).parent / "style.css"
with open(css_file, "r") as f:
    style = f.read()

def _extract_text_and_files(message):
    """Extract user text and attached files from a multimodal message value."""
    if isinstance(message, str):
        return message, []
    # Common multimodal shapes: dict with keys, or list of parts
    files = []
    text_parts = []
    try:
        if isinstance(message, dict):
            if "text" in message:
                text_parts.append(message.get("text") or "")
            if "files" in message and message["files"]:
                files = message["files"] or []
        elif isinstance(message, (list, tuple)):
            for part in message:
                if isinstance(part, str):
                    text_parts.append(part)
                elif isinstance(part, dict):
                    # Heuristic: file-like dicts may have 'path' or 'name'
                    if any(k in part for k in ("path", "name", "mime_type")):
                        files.append(part)
                    elif "text" in part:
                        text_parts.append(part.get("text") or "")
    except Exception:
        pass
    text_combined = " ".join([t for t in text_parts if t])
    return text_combined, files

def _build_image_parts(files):
    image_parts = []
    for f in files or []:
        path = None
        if isinstance(f, str):
            path = f
        elif isinstance(f, dict):
            path = f.get("path") or f.get("name")
        if not path or not os.path.exists(path):
            continue
        mime, _ = mimetypes.guess_type(path)
        if not mime or not mime.startswith("image/"):
            continue
        try:
            with open(path, "rb") as fp:
                b64 = base64.b64encode(fp.read()).decode("utf-8")
            data_url = f"data:{mime};base64,{b64}"
            image_parts.append({
                "type": "image_url",
                "image_url": {"url": data_url},
            })
        except Exception:
            continue
    return image_parts

def _value_to_user_content(value):
    """Normalize any gradio message value to OpenAI user 'content'."""
    text, files = _extract_text_and_files(value)
    final_user_text = (text or "").strip() or "Describe el contenido de la(s) imagen(es)."
    image_parts = _build_image_parts(files)
    if image_parts:
        return [{"type": "text", "text": final_user_text}] + image_parts
    return final_user_text
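
# Note: _value_to_user_content returns either a plain string (text-only turn) or an
# OpenAI-style content list when images are attached, e.g. (illustrative shape):
#   [{"type": "text", "text": "..."},
#    {"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}]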

def _value_preview(value, limit: int = 600) -> str:
    """Safe preview string for any kind of message value."""
    if isinstance(value, str):
        return _preview_text(value, limit)
    text, files = _extract_text_and_files(value)
    suffix = ""
    if files:
        suffix = f" [images:{len(files)}]"
    return _preview_text((text or "").strip() + suffix, limit)


def _preview_text(text: str | None, limit: int = 600) -> str:
    if not text:
        return ""
    if len(text) <= limit:
        return text
    return text[:limit] + "…"

def _history_preview(history: list[tuple[str, str]] | None, max_turns: int = 3, max_chars: int = 1200) -> str:
    if not history:
        return ""
    tail = history[-max_turns:]
    parts: list[str] = []
    for user_turn, assistant_turn in tail:
        if user_turn:
            parts.append(f"User 👤: {_preview_text(user_turn, 300)}")
        if assistant_turn:
            parts.append(f"Assistant 🤖: {_preview_text(assistant_turn, 300)}")
    joined = "\n".join(parts)
    return _preview_text(joined, max_chars)

def respond(message, history: list[tuple[str, str]]):
    """Stream the assistant reply via the configured OpenAI-compatible backend.

    Yields partial text chunks so the UI shows a live stream.
    """
    user_text, files = _extract_text_and_files(message)
    if not _client:
        if INFERENCE == INFERENCE_GEMINI:
            yield (
                "Gemini API key not configured. Set environment variable GEMINI_API_KEY "
                "and restart the app."
            )
        elif INFERENCE == INFERENCE_QWEN3_VL:
            yield (
                "Qwen3-VL API key not configured. Set environment variable "
                "HUGGINGFACE_INFERENCE_PROVIDERS_API_KEY and restart the app."
            )
        else:
            yield (
                "Inference engine not configured. Set INFERENCE to 'Gemini' or "
                "'Qwen3-VL' and restart the app."
            )
        return

    # Build OpenAI-style messages from history
    messages = [
        {
            "role": "system",
            "content": system_prompt,
        }
    ]
    for user_turn, assistant_turn in history or []:
        if user_turn:
            messages.append({"role": "user", "content": _value_to_user_content(user_turn)})
        if assistant_turn:
            messages.append({"role": "assistant", "content": assistant_turn})

    # Build user content with optional inline images (data URLs)
    final_user_text = (user_text or "").strip() or "Describe el contenido de la(s) imagen(es)."
    # Collect image parts using helper
    image_parts = _build_image_parts(files)
    if image_parts:
        user_content = [{"type": "text", "text": final_user_text}] + image_parts
    else:
        user_content = final_user_text
    messages.append({"role": "user", "content": user_content})

    # Optional RunTree instrumentation (does not require LANGSMITH_TRACING)
    _ls_api_key = os.getenv("LANGSMITH_API_KEY")
    pipeline = None
    child_build = None
    child_llm = None
    if _ls_api_key:
        try:
            pipeline = RunTree(
                name="Chat Session",
                run_type="chain",
                inputs={
                    "user_text": _value_preview(message, 600),
                    "has_images": bool(image_parts),
                    "history_preview": _history_preview(history),
                },
            )
            pipeline.post()

            child_build = pipeline.create_child(
                name="BuildMessages",
                run_type="chain",
                inputs={
                    "system_prompt_preview": _preview_text(system_prompt, 400),
                    "user_content_type": "multimodal" if image_parts else "text",
                    "history_turns": len(history or []),
                },
            )
            child_build.post()
            child_build.end(
                outputs={
                    "messages_count": len(messages),
                }
            )
            child_build.patch()
        except Exception:
            pipeline = None

    try:
        if pipeline:
            try:
                if INFERENCE == INFERENCE_GEMINI:
                    child_llm = pipeline.create_child(
                        name="LLMCall",
                        run_type="llm",
                        inputs={
                            "model": GEMINI_MODEL,
                            "provider": "gemini-openai",
                            "messages_preview": _preview_text(str(messages[-1]), 600),
                        },
                    )
                elif INFERENCE == INFERENCE_QWEN3_VL:
                    child_llm = pipeline.create_child(
                        name="LLMCall",
                        run_type="llm",
                        inputs={
                            "model": QWEN3_VL_MODEL,
                            "provider": "qwen3-vl-openai",
                            "messages_preview": _preview_text(str(messages[-1]), 600),
                        },
                    )
                child_llm.post()
            except Exception:
                child_llm = None

        if INFERENCE == INFERENCE_GEMINI:
            stream = _client.chat.completions.create(
                model=GEMINI_MODEL,
                messages=messages,
                stream=True,
            )
        elif INFERENCE == INFERENCE_QWEN3_VL:
            stream = _client.chat.completions.create(
                model=QWEN3_VL_MODEL,
                messages=messages,
                stream=True,
            )
| accumulated = "" | |
| for chunk in stream: | |
| try: | |
| choice = chunk.choices[0] | |
| delta_text = None | |
| # OpenAI v1: delta.content | |
| if getattr(choice, "delta", None) is not None: | |
| delta_text = getattr(choice.delta, "content", None) | |
| # Fallback: some providers emit message.content in chunks | |
| if delta_text is None and getattr(choice, "message", None) is not None: | |
| delta_text = choice.message.get("content") if isinstance(choice.message, dict) else None | |
| if not delta_text: | |
| continue | |
| accumulated += delta_text | |
| yield accumulated | |
| except Exception: | |
| continue | |
| if not accumulated: | |
| yield "(Sin contenido de respuesta)" | |
| if child_llm: | |
| try: | |
| child_llm.end(outputs={"content": _preview_text(accumulated, 5000)}) | |
| child_llm.patch() | |
| except Exception: | |
| pass | |
| if pipeline: | |
| try: | |
| pipeline.end(outputs={"answer": _preview_text(accumulated, 5000)}) | |
| pipeline.patch() | |
| except Exception: | |
| pass | |
| # Ensure traces are flushed between requests | |
| _flush_langsmith() | |
    except Exception as e:
        if child_llm:
            try:
                child_llm.end(outputs={"error": str(e)})
                child_llm.patch()
            except Exception:
                pass
        if pipeline:
            try:
                pipeline.end(outputs={"error": str(e)})
                pipeline.patch()
            except Exception:
                pass
        yield f"Ocurrió un error al llamar al modelo ({INFERENCE}): {e}"
        _flush_langsmith()

# Create the Gradio app with Blocks for better control
with gr.Blocks(theme=gr.themes.Monochrome(), css=style, fill_height=True) as demo:
    # Title component
    title = gr.Markdown(
        value="# Gmail & Outlook API Helper",
        visible=True,
    )
    # Description component that can be hidden
    description = gr.HTML(
        value=(
            '<div class="app-description">🤖 Este chatbot te guía <strong>paso a paso</strong> '
            'para crear credenciales de API de <strong>Gmail</strong> (Google Cloud) ☁️ o '
            '<strong>OneDrive</strong> (Microsoft Entra ID) 🔑. Puedes enviar 📸 '
            '<strong>capturas de pantalla</strong> para recibir ayuda visual personalizada. '
            'El asistente te dará <strong>una instrucción a la vez</strong> para que no te abrumes ✨</div>'
        ),
        visible=True,
    )
    # State to track if first message has been sent
    first_message_sent = gr.State(False)

    # ChatInterface without title and description (handled separately above)
    chat = gr.ChatInterface(
        fn=respond,
        title="",
        description="",
        textbox=gr.MultimodalTextbox(
            file_types=["image", ".png", ".jpg", ".jpeg", ".webp", ".gif"],
            placeholder="Escribe o pega (⌘/Ctrl+V) una imagen o arrástrala aquí",
            file_count="multiple",
        ),
        multimodal=True,
        fill_height=True,
        examples=[
            "¿Cómo creo una API Key de Gmail?",
            "Guíame para obtener credenciales de OneDrive",
        ],
    )

    # Hide description on first message
    def hide_description_on_first_message(message, is_sent):
        if not is_sent:
            return gr.update(visible=False), True
        return gr.update(), is_sent

    # Connect the event to hide description when user submits first message
    chat.textbox.submit(
        fn=hide_description_on_first_message,
        inputs=[chat.textbox, first_message_sent],
        outputs=[description, first_message_sent],
        queue=False,
    )


if __name__ == "__main__":
    demo.launch()
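    # Note: demo.launch() without arguments is enough on Hugging Face Spaces.
    # If the app must be reachable on a LAN when run locally, Gradio's launch()
    # also accepts host/port options, e.g. (illustrative):
    #   demo.launch(server_name="0.0.0.0", server_port=7860)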