# app.py
import os
import datetime
import logging
from typing import List, Dict, Optional, Any, Tuple

from dotenv import load_dotenv

load_dotenv()
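
# Expected environment variables (illustrative placeholders):
#   GOOGLE_API_KEY=...        # Google Gemini key (can also be pasted in the UI field)
#   ASKNEWS_CLIENT_ID=...     # AskNews credentials for the news search context
#   ASKNEWS_CLIENT_SECRET=...
#   ASKNEWS_LOG_LEVEL=INFO    # optional, defaults to INFO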

import gradio as gr
from google import genai
from google.genai import types
from asknews_sdk import AskNewsSDK

DEFAULT_MODEL = "gemini-2.0-flash"

DEFAULT_SYSTEM_PROMPT = """Tu es un assistant virtuel conçu pour aider des journalistes d’agence (Agence France-Presse) dans leurs recherches d’information.

Sources :
- Tu disposes d’un agent de recherche en langage naturel (AskNews) qui interroge en temps réel le flux des dépêches AFP.
- Tu dois répondre uniquement avec des informations issues de ces dépêches.

Mission :
- Comprendre les requêtes d’un journaliste (souvent courtes, imprécises, ou en langage naturel).
- Transformer ces requêtes en recherches efficaces dans les dépêches AFP, avec AskNews.
- Résumer les résultats en style journalistique : factuel, concis, hiérarchisé, neutre.
- Proposer, si pertinent, des angles complémentaires (ex. contexte historique, réactions, comparaisons, chiffres clés).
- Permettre au journaliste de raffiner la recherche (par période, sujet, acteurs, pays).
- Citer les dépêches AFP en retour (référence et date/heure).

Contraintes :
- Toujours rester factuel, éviter toute spéculation.
- Si la question est ambiguë, demander des précisions.
- Si aucun résultat n’est trouvé, proposer des formulations alternatives de recherche.
- Résumer les informations de manière actionnable (pour rédaction immédiate).

Style :
- Réponses brèves et efficaces.
- Donner un résumé clair d’abord (les 2–3 points clés).
- Ajouter ensuite plus de détails, ou des pistes pour approfondir.
- Toujours indiquer les sources/dépêches AFP d’où viennent les infos.
"""

INITIAL_SOURCES_MARKDOWN = "*Aucune source pour l'instant.*"

LOG_LEVEL = os.getenv("ASKNEWS_LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO))
logger = logging.getLogger("asknews_app")


def format_pub_date(published) -> str:
    """Normalise a publication date (datetime, date or ISO string) to YYYY-MM-DD."""
    if isinstance(published, (datetime.datetime, datetime.date)):
        return published.strftime("%Y-%m-%d")
    if isinstance(published, str):
        try:
            return datetime.datetime.fromisoformat(published).strftime("%Y-%m-%d")
        except ValueError:
            return "unknown date"
    return "unknown date"


# ---- AskNews setup ----
def get_asknews_sdk() -> Optional[AskNewsSDK]:
    """
    Initialize AskNews SDK using environment variables.
    Returns None if credentials are missing.
    """
    client_id = os.getenv("ASKNEWS_CLIENT_ID", "").strip()
    client_secret = os.getenv("ASKNEWS_CLIENT_SECRET", "").strip()
    if not client_id or not client_secret:
        logger.warning("AskNews credentials are missing; skipping SDK init.")
        return None
    try:
        sdk = AskNewsSDK(
            client_id=client_id,
            client_secret=client_secret,
            scopes=["news"],
        )
        logger.info("AskNews SDK initialised successfully.")
        return sdk
    except Exception as exc:
        logger.exception("Failed to initialise AskNews SDK: %s", exc)
        return None


def fetch_asknews_context(
    sdk: AskNewsSDK,
    query: str,
    hours_back: int,
    n_articles: int,
    domains: List[str],
    method: str,
    diversify_sources: bool,
    languages: List[str],
) -> Tuple[str, List[Dict[str, Any]]]:
    """
    Fetch context directly from AskNews (return_type="both").
    Returns a (context_text, articles) tuple; both are empty on failure.
    """
    logger.info(
        "Fetching AskNews context: query=%s, hours_back=%s, n_articles=%s, domains=%s, method=%s, diversify=%s, languages=%s",
        query,
        hours_back,
        n_articles,
        domains,
        method,
        diversify_sources,
        languages,
    )
    try:
        kwargs: Dict[str, Any] = {
            "query": query,
            "hours_back": hours_back,
            "n_articles": n_articles,
            "historical": True,
            "premium": True,
            "method": method,
            "domain_url": domains if domains else None,
            "return_type": "both",
        }
        if diversify_sources:
            kwargs["diversify_sources"] = True
        if languages:
            kwargs["languages"] = languages
        response = sdk.news.search_news(**kwargs)

        context_text = getattr(response, "as_string", "") or ""
        raw_dicts = getattr(response, "as_dicts", None)
        articles: List[Dict[str, Any]] = []
        if isinstance(raw_dicts, list):
            parsed_articles: List[Dict[str, Any]] = []
            for item in raw_dicts:
                # Articles may come back as plain dicts or as pydantic-style
                # models depending on the SDK version; normalise them to dicts.
                if isinstance(item, dict):
                    parsed_articles.append(item)
                    continue
                if hasattr(item, "model_dump"):
                    try:
                        data = item.model_dump(by_alias=True)
                        if isinstance(data, dict):
                            parsed_articles.append(data)
                            continue
                    except Exception:
                        logger.debug("model_dump(by_alias=True) failed for article", exc_info=True)
                if hasattr(item, "dict"):
                    try:
                        data = item.dict(by_alias=True)
                        if isinstance(data, dict):
                            parsed_articles.append(data)
                            continue
                    except Exception:
                        logger.debug("dict(by_alias=True) failed for article", exc_info=True)
                try:
                    parsed_articles.append(dict(item))
                except Exception:
                    logger.debug("Fallback dict() conversion failed for article", exc_info=True)
            articles = parsed_articles

        logger.info(
            "AskNews context received (%s chars, %s articles)",
            len(context_text),
            len(articles),
        )
        return context_text, articles
    except Exception:
        logger.exception("AskNews context fetch failed.")
        return "", []


def parse_languages_csv(csv_input: str) -> List[str]:
    """Split a comma-separated list of language codes into a clean list."""
    return [lang.strip() for lang in csv_input.split(",") if lang.strip()]
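
# Illustrative example: parse_languages_csv("fr, en,") -> ["fr", "en"]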


def format_sources_markdown(articles: List[Dict[str, Any]]) -> str:
    """Render the AskNews articles as Markdown for the sources panel."""
    if not articles:
        return "*Aucune source disponible pour cette requête.*"
    lines: List[str] = []
    for article in articles:
        title = article.get("title")
        citation = article.get("markdown_citation")
        key = article.get("as_string_key")
        published = article.get("pub_date")
        line = f"{key}. {format_pub_date(published)} - {title}"
        if citation:
            line += f"\n {citation}"
        lines.append(line)
    return "\n\n".join(lines)


# ---- Chat respond function ----
def respond(
    message: str,
    history: Optional[List[Tuple[str, str]]],
    system_message: str,
    max_tokens: int,
    temperature: float,
    top_p: float,
    model_name: str,
    google_api_key: str,
    use_asknews: bool,
    asknews_hours_back: int,
    asknews_n_articles: int,
    asknews_domains_csv: str,
    asknews_method: str,
    asknews_diversify_sources: bool,
    asknews_languages_csv: str,
):
    """
    Stream chat responses from Google Gemini, enriching them with AskNews context when enabled.
    Yields updates for the chatbot conversation, the chat state and the sources panel.
    """
    conversation_history: List[Tuple[str, str]] = list(history or [])
    user_message = (message or "").strip()
    if not user_message:
        logger.debug("Empty user message received.")
        yield conversation_history, conversation_history, format_sources_markdown([])
        return

    api_key = (google_api_key or "").strip() or os.getenv("GOOGLE_API_KEY", "").strip()
    if not api_key:
        warning = (
            "Définissez GOOGLE_API_KEY dans votre environnement ou saisissez la clé API Google Gemini dans le champ dédié."
        )
        logger.warning("Missing Google API key.")
        conversation_history.append((user_message, warning))
        yield conversation_history, conversation_history, format_sources_markdown([])
        return

    try:
        genai_client = genai.Client(api_key=api_key)
    except Exception as exc:
        logger.exception("Failed to initialise Google GenAI client: %s", exc)
        error_msg = f"Échec d'initialisation du client Google GenAI: {exc}"
        conversation_history.append((user_message, error_msg))
        yield conversation_history, conversation_history, format_sources_markdown([])
        return

    # Normalise the AskNews options coming from the UI.
    domains = [d.strip() for d in (asknews_domains_csv or "").split(",") if d.strip()]
    method = (asknews_method or "both").lower()
    if method not in {"nl", "kw", "both"}:
        method = "both"
    languages = parse_languages_csv(asknews_languages_csv or "")
    diversify_sources = bool(asknews_diversify_sources)

    asknews_context_text = ""
    asknews_articles: List[Dict[str, Any]] = []
    asknews_notice = ""
    if use_asknews:
        sdk = get_asknews_sdk()
        if sdk is None:
            asknews_notice = (
                "[AskNews non configuré: définissez ASKNEWS_CLIENT_ID et ASKNEWS_CLIENT_SECRET dans l'environnement.]"
            )
            logger.warning("AskNews SDK unavailable while use_asknews is True.")
        else:
            asknews_context_text, asknews_articles = fetch_asknews_context(
                sdk=sdk,
                query=user_message,
                hours_back=int(asknews_hours_back),
                n_articles=int(asknews_n_articles),
                domains=domains,
                method=method,
                diversify_sources=diversify_sources,
                languages=languages,
            )
            if asknews_context_text:
                logger.info(
                    "AskNews context ready (chars=%s, articles=%s)",
                    len(asknews_context_text),
                    len(asknews_articles),
                )
            else:
                logger.warning("AskNews context is empty after fetch.")
    else:
        asknews_notice = "[AskNews désactivé pour cette requête.]"

    # Build the sources panel content.
    if use_asknews:
        if asknews_articles:
            sources_markdown = format_sources_markdown(asknews_articles)
        elif asknews_notice:
            sources_markdown = asknews_notice + "\n\n" + INITIAL_SOURCES_MARKDOWN
        else:
            sources_markdown = format_sources_markdown([])
    else:
        sources_markdown = "*AskNews désactivé.*"

    base_system = (system_message or "").strip() or DEFAULT_SYSTEM_PROMPT

    conversation_history.append((user_message, ""))
    assistant_reply = ""
    if asknews_notice:
        assistant_reply += asknews_notice.strip()
    # if asknews_context_text:
    #     context_display = asknews_context_text.strip()
    #     truncated = False
    #     if len(context_display) > 4000:
    #         context_display = context_display[:4000] + "\n[Contexte AskNews tronqué pour affichage]"
    #         truncated = True
    #     if assistant_reply:
    #         assistant_reply += "\n\n"
    #     assistant_reply += "[Contexte AskNews]\n" + (context_display or "[Vide]")
    #     if not truncated:
    #         assistant_reply += "\n"
    # elif not assistant_reply and use_asknews:
    #     assistant_reply = "[Contexte AskNews introuvable pour cette requête.]"
    conversation_history[-1] = (user_message, assistant_reply)
    yield conversation_history, conversation_history, sources_markdown

    # Inject the AskNews context into the system instruction when available.
    system_instruction = base_system
    if asknews_context_text:
        system_instruction += (
            "\n\nUtilise le contexte AskNews suivant pour ta réponse. Si la question est sans rapport, ignore ce contexte.\n"
            f"{asknews_context_text}"
        )

    # Rebuild the multi-turn conversation in the google-genai Content format.
    conversation: List[types.Content] = []
    for past_user, past_assistant in conversation_history[:-1]:
        past_user_clean = (past_user or "").strip()
        past_assistant_clean = (past_assistant or "").strip()
        if past_user_clean:
            conversation.append(
                types.Content(role="user", parts=[types.Part.from_text(text=past_user_clean)])
            )
        if past_assistant_clean:
            conversation.append(
                types.Content(role="model", parts=[types.Part.from_text(text=past_assistant_clean)])
            )
    conversation.append(
        types.Content(role="user", parts=[types.Part.from_text(text=user_message)])
    )

    generation_config = types.GenerateContentConfig(
        system_instruction=system_instruction,
        temperature=float(temperature),
        top_p=float(top_p),
        max_output_tokens=int(max_tokens),
    )

    assistant_full_reply = assistant_reply
    try:
        stream = genai_client.models.generate_content_stream(
            model=(model_name or DEFAULT_MODEL).strip() or DEFAULT_MODEL,
            contents=conversation,
            config=generation_config,
        )
        for chunk in stream:
            token = getattr(chunk, "text", None)
            if not token and getattr(chunk, "candidates", None):
                # Fall back to assembling text from candidate parts when chunk.text is empty.
                pieces: List[str] = []
                for candidate in chunk.candidates:
                    content = getattr(candidate, "content", None)
                    if content and getattr(content, "parts", None):
                        for part in content.parts:
                            text_piece = getattr(part, "text", None)
                            if text_piece:
                                pieces.append(text_piece)
                token = "".join(pieces)
            if not token:
                continue
            assistant_full_reply += token
            conversation_history[-1] = (user_message, assistant_full_reply)
            yield conversation_history, conversation_history, sources_markdown
    except Exception as exc:
        logger.exception("Google GenAI generation failed: %s", exc)
        error_suffix = f"\n\n[Erreur: {exc}]"
        assistant_full_reply = (assistant_full_reply or "") + error_suffix
        conversation_history[-1] = (user_message, assistant_full_reply)
        yield conversation_history, conversation_history, sources_markdown


def clear_conversation() -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]], str]:
    """Reset the chat history and sources panel."""
    return [], [], INITIAL_SOURCES_MARKDOWN


# ---- Gradio UI ----
with gr.Blocks(title="AskNews Gemini") as demo:
    gr.Markdown("# Chatbot Gemini avec contexte AskNews")
    chat_state = gr.State([])

    with gr.Row():
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(label="Conversation", height=520)
            user_input = gr.Textbox(
                label="Message",
                placeholder="Saisissez votre requête journalistique...",
                lines=3,
            )
            with gr.Row():
                send_button = gr.Button("Envoyer", variant="primary")
                clear_button = gr.Button("Effacer la conversation")
            with gr.Accordion("Paramètres", open=False):
                system_message_box = gr.Textbox(
                    value=DEFAULT_SYSTEM_PROMPT,
                    label="System message",
                    lines=20,
                )
                max_tokens_slider = gr.Slider(
                    minimum=1,
                    maximum=4096,
                    value=4096,
                    step=100,
                    label="Max new tokens",
                )
                temperature_slider = gr.Slider(
                    minimum=0.0,
                    maximum=2.0,
                    value=0.7,
                    step=0.1,
                    label="Temperature",
                )
                top_p_slider = gr.Slider(
                    minimum=0.05,
                    maximum=1.0,
                    value=0.95,
                    step=0.05,
                    label="Top-p",
                )
                model_name_box = gr.Textbox(value=DEFAULT_MODEL, label="Model name")
                google_api_key_box = gr.Textbox(
                    value="",
                    label="Google API Key (optionnel)",
                    type="password",
                )
                use_asknews_checkbox = gr.Checkbox(
                    value=True,
                    label="Utiliser AskNews pour le contexte",
                )
                asknews_hours_slider = gr.Slider(
                    minimum=1,
                    maximum=24 * 120,
                    value=24 * 120,
                    step=24,
                    label="AskNews: heures en arrière",
                )
                asknews_articles_slider = gr.Slider(
                    minimum=1,
                    maximum=50,
                    value=10,
                    step=1,
                    label="AskNews: nombre d'articles",
                )
                asknews_domains_box = gr.Textbox(
                    value="afp.com",
                    label="AskNews: domaines (CSV)",
                )
                asknews_method_radio = gr.Radio(
                    choices=["both", "nl", "kw"],
                    value="both",
                    label="AskNews: méthode de recherche",
                )
                asknews_diversify_checkbox = gr.Checkbox(
                    value=False,
                    label="AskNews: diversifier les sources",
                )
                asknews_languages_box = gr.Textbox(
                    value="",
                    label="AskNews: langues (codes CSV)",
                )
        with gr.Column(scale=2):
            gr.Markdown("### Sources AskNews")
            sources_panel = gr.Markdown(INITIAL_SOURCES_MARKDOWN)

    input_components = [
        user_input,
        chat_state,
        system_message_box,
        max_tokens_slider,
        temperature_slider,
        top_p_slider,
        model_name_box,
        google_api_key_box,
        use_asknews_checkbox,
        asknews_hours_slider,
        asknews_articles_slider,
        asknews_domains_box,
        asknews_method_radio,
        asknews_diversify_checkbox,
        asknews_languages_box,
    ]
    output_components = [chatbot, chat_state, sources_panel]

    def _reset_input() -> str:
        return ""

    send_event = user_input.submit(
        respond,
        inputs=input_components,
        outputs=output_components,
        queue=True,
    )
    send_event.then(_reset_input, inputs=None, outputs=user_input)

    button_event = send_button.click(
        respond,
        inputs=input_components,
        outputs=output_components,
        queue=True,
    )
    button_event.then(_reset_input, inputs=None, outputs=user_input)

    clear_button.click(clear_conversation, None, output_components).then(
        _reset_input, inputs=None, outputs=user_input
    )
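
# Note: demo.queue() below enables Gradio's request queue; the respond() generator
# relies on it to stream partial replies back to the chatbot.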
demo.queue()

if __name__ == "__main__":
    demo.launch()