Spaces:
Running
Running
| """ | |
| Professional WhatsApp Bot using Green-API | |
| Author: Assistant | |
| Description: A comprehensive WhatsApp bot with a professional, class-based structure. | |
| Features include image generation, image editing, voice replies, | |
| and various utility functions, all handled by an asynchronous task queue. | |
| Includes request logging and chat ID filtering. | |
| Version 2.3.0: Fixed image generation ID mismatch issue. | |
| """ | |
| import os | |
| import threading | |
| import requests | |
| import logging | |
| import queue | |
| import json | |
| import base64 | |
| from typing import List, Optional, Union, Literal, Dict, Any, Tuple, Set | |
| from collections import defaultdict, deque | |
| from concurrent.futures import ThreadPoolExecutor | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel, Field, ValidationError | |
| import uvicorn | |
| # Assume these are your custom libraries for AI functionalities | |
| from FLUX import generate_image | |
| from VoiceReply import generate_voice_reply | |
| from polLLM import generate_llm, LLMBadRequestError | |
| import gemini_flash_lib | |
| # --- Configuration --------------------------------------------------------- | |
| class BotConfig: | |
| """Manages all bot configuration from environment variables.""" | |
| GREEN_API_URL: str | |
| GREEN_API_TOKEN: str | |
| GREEN_API_ID_INSTANCE: str | |
| WEBHOOK_AUTH_TOKEN: str | |
| IMAGE_DIR: str = "/tmp/whatsapp_images" | |
| AUDIO_DIR: str = "/tmp/whatsapp_audio" | |
| TEMP_DIR: str = "/tmp/whatsapp_edit" | |
| DEFAULT_IMAGE_COUNT: int = 4 | |
| MAX_HISTORY_SIZE: int = 20 | |
| WORKER_THREADS: int = 4 | |
| # Set log level to DEBUG to capture all incoming requests | |
| LOG_LEVEL: str = "DEBUG" | |
| # Whitelisted chat IDs. The bot will only respond to these chats. | |
| ALLOWED_CHATS: Set[str] = {"[email protected]", "[email protected]"} | |
| def __init__(self): | |
| """Initializes configuration from environment variables.""" | |
| self.GREEN_API_URL = os.getenv("GREEN_API_URL") | |
| self.GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN") | |
| self.GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE") | |
| self.WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN") | |
| # Allow overriding log level from environment | |
| self.LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper() | |
| self._validate() | |
| def _validate(self): | |
| """Ensures all required environment variables are set.""" | |
| missing = [ | |
| var for var in ("GREEN_API_URL", "GREEN_API_TOKEN", | |
| "GREEN_API_ID_INSTANCE", "WEBHOOK_AUTH_TOKEN") | |
| if not getattr(self, var) | |
| ] | |
| if missing: | |
| raise ValueError(f"Missing required environment variables: {', '.join(missing)}") | |
| # --- Logging Setup --------------------------------------------------------- | |
| class LoggerSetup: | |
| """Sets up and manages structured logging for the application.""" | |
| def setup(level: str) -> logging.Logger: | |
| """ | |
| Configures the root logger for the application. | |
| Args: | |
| level: The logging level (e.g., "DEBUG", "INFO"). | |
| Returns: | |
| A configured logger instance. | |
| """ | |
| logger = logging.getLogger("whatsapp_bot") | |
| logger.setLevel(level) | |
| # Avoid adding duplicate handlers | |
| if logger.hasHandlers(): | |
| logger.handlers.clear() | |
| handler = logging.StreamHandler() | |
| # Added a more detailed formatter | |
| formatter = logging.Formatter( | |
| "%(asctime)s - %(name)s - [%(levelname)s] - [%(chat_id)s] - %(funcName)s:%(lineno)d - %(message)s" | |
| ) | |
| handler.setFormatter(formatter) | |
| class ContextFilter(logging.Filter): | |
| """Injects contextual information into log records.""" | |
| def filter(self, record): | |
| record.chat_id = ThreadContext.get_context().get("chat_id", "NO_CONTEXT") | |
| return True | |
| handler.addFilter(ContextFilter()) | |
| logger.addHandler(handler) | |
| return logger | |
| # --- Thread Context Management --------------------------------------------- | |
| class ThreadContext: | |
| """Manages thread-local context for chat and message IDs.""" | |
| _context = threading.local() | |
| def set_context(cls, chat_id: str, message_id: str): | |
| """Sets the context for the current thread.""" | |
| cls._context.chat_id = chat_id | |
| cls._context.message_id = message_id | |
| def get_context(cls) -> Dict[str, Optional[str]]: | |
| """Retrieves the context for the current thread.""" | |
| return { | |
| "chat_id": getattr(cls._context, "chat_id", None), | |
| "message_id": getattr(cls._context, "message_id", None), | |
| } | |
| # --- Conversation History ------------------------------------------------- | |
| class ConversationManager: | |
| """Manages conversation history for each chat.""" | |
| def __init__(self, max_size: int): | |
| self.history = defaultdict(lambda: deque(maxlen=max_size)) | |
| def add_user_message(self, chat_id: str, message: str): | |
| self.history[chat_id].append(f"User: {message}") | |
| def add_bot_message(self, chat_id: str, message: str): | |
| self.history[chat_id].append(f"Assistant: {message}") | |
| def get_history_text(self, chat_id: str) -> str: | |
| return "\n".join(self.history[chat_id]) | |
| def clear_history(self, chat_id: str): | |
| self.history[chat_id].clear() | |
| # --- Green-API Client ----------------------------------------------------- | |
| class GreenApiClient: | |
| """A client for interacting with the Green-API for WhatsApp.""" | |
| def __init__(self, config: BotConfig, logger: logging.Logger): | |
| self.config = config | |
| self.logger = logger | |
| self.session = requests.Session() | |
| self.base_url = ( | |
| f"{self.config.GREEN_API_URL}/waInstance" | |
| f"{self.config.GREEN_API_ID_INSTANCE}" | |
| ) | |
| def _request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]: | |
| """Makes a request to the Green-API with retries.""" | |
| url = f"{self.base_url}/{endpoint}/{self.config.GREEN_API_TOKEN}" | |
| for attempt in range(3): | |
| try: | |
| self.logger.debug(f"Sending API request to {url} with payload: {kwargs.get('json', kwargs.get('data'))}") | |
| response = self.session.request(method, url, timeout=20, **kwargs) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.RequestException as e: | |
| self.logger.warning( | |
| f"API request to {endpoint} failed (attempt {attempt + 1}): {e}" | |
| ) | |
| self.logger.error(f"API request to {endpoint} failed after all retries.") | |
| return None | |
| def send_message(self, chat_id: str, text: str, quoted_message_id: str = None): | |
| payload = {"chatId": chat_id, "message": text} | |
| if quoted_message_id: | |
| payload["quotedMessageId"] = quoted_message_id | |
| return self._request("POST", "sendMessage", json=payload) | |
| def send_file(self, chat_id: str, file_path: str, caption: str = "", quoted_message_id: str = None): | |
| """Uploads and sends a file (image or audio).""" | |
| filename = os.path.basename(file_path) | |
| payload = {"chatId": chat_id, "caption": caption} | |
| if quoted_message_id: | |
| payload["quotedMessageId"] = quoted_message_id | |
| with open(file_path, "rb") as f: | |
| files = {"file": (filename, f)} | |
| return self._request("POST", "sendFileByUpload", data=payload, files=files) | |
| def download_file(self, url: str) -> Optional[bytes]: | |
| """Downloads a file from a given URL.""" | |
| try: | |
| response = self.session.get(url, timeout=30) | |
| response.raise_for_status() | |
| return response.content | |
| except requests.RequestException as e: | |
| self.logger.error(f"Failed to download file from {url}: {e}") | |
| return None | |
| # --- Pydantic Models for Intent Recognition -------------------------------- | |
| class BaseIntent(BaseModel): | |
| action: str | |
| class SummarizeIntent(BaseIntent): action: Literal["summarize"]; text: str | |
| class TranslateIntent(BaseIntent): action: Literal["translate"]; lang: str; text: str | |
| class JokeIntent(BaseIntent): action: Literal["joke"] | |
| class WeatherIntent(BaseIntent): action: Literal["weather"]; location: str | |
| class InspireIntent(BaseIntent): action: Literal["inspire"] | |
| class MemeIntent(BaseIntent): action: Literal["meme"]; text: str | |
| class EditImageIntent(BaseIntent): action: Literal["edit_image"]; prompt: str | |
| class GenerateImageIntent(BaseModel): | |
| action: Literal["generate_image"] | |
| prompt: str | |
| count: int = Field(default=1, ge=1, le=10) | |
| width: Optional[int] = Field(default=None, ge=512, le=2048) | |
| height: Optional[int] = Field(default=None, ge=512, le=2048) | |
| class SendTextIntent(BaseIntent): | |
| action: Literal["send_text"] | |
| message: str | |
| # --- Intent Router -------------------------------------------------------- | |
| class IntentRouter: | |
| """Recognizes user intent using an LLM and routes to appropriate actions.""" | |
| INTENT_MODELS = [ | |
| SummarizeIntent, TranslateIntent, JokeIntent, WeatherIntent, | |
| InspireIntent, MemeIntent, GenerateImageIntent, EditImageIntent, SendTextIntent | |
| ] | |
| def __init__(self, conv_manager: ConversationManager, logger: logging.Logger): | |
| self.conv_manager = conv_manager | |
| self.logger = logger | |
| def get_intent(self, user_input: str, chat_id: str) -> BaseIntent: | |
| history_text = self.conv_manager.get_history_text(chat_id) | |
| system_prompt = self._build_system_prompt(history_text, user_input) | |
| try: | |
| raw_response = generate_llm(system_prompt) | |
| except LLMBadRequestError: | |
| self.logger.warning(f"LLM request failed due to bad request for chat {chat_id}. Clearing history.") | |
| self.conv_manager.clear_history(chat_id) | |
| return SendTextIntent(action="send_text", message="Oops! Let's start fresh! π") | |
| return self._parse_response(raw_response) | |
| def _build_system_prompt(self, history: str, user_input: str) -> str: | |
| return ( | |
| "You are a function dispatcher. You only invoke functions by returning a single JSON object.\n" | |
| "Available functions:\n" | |
| "- summarize(text): Summarize given text\n" | |
| "- translate(lang, text): Translate text to a language\n" | |
| "- joke(): Tell a random joke\n" | |
| "- weather(location): Get weather for a location\n" | |
| "- inspire(): Get an inspirational quote\n" | |
| "- meme(text): Generate a meme from text\n" | |
| "- generate_image(prompt, count, width, height): Generate images\n" | |
| "- edit_image(prompt): Edit an image (requires replying to an image)\n" | |
| "- send_text(message): Send a plain text response\n\n" | |
| "Return only raw JSON. Examples:\n" | |
| '{"action":"generate_image","prompt":"a red fox","count":2}\n' | |
| '{"action":"edit_image","prompt":"make the sky purple"}\n' | |
| '{"action":"send_text","message":"Hello there!"}\n\n' | |
| f"Conversation history:\n{history}\n\n" | |
| f"Current message: User: {user_input}" | |
| ) | |
| def _parse_response(self, raw_response: str) -> BaseIntent: | |
| try: | |
| # Clean the response to ensure it's valid JSON | |
| cleaned_response = raw_response.strip().replace("`json", "").replace("`", "") | |
| parsed = json.loads(cleaned_response) | |
| for model in self.INTENT_MODELS: | |
| try: | |
| return model.model_validate(parsed) | |
| except ValidationError: | |
| continue | |
| except json.JSONDecodeError: | |
| self.logger.warning(f"Could not decode LLM response to JSON: {raw_response}") | |
| # Fallback for non-JSON or unparsable responses | |
| return SendTextIntent(action="send_text", message=raw_response) | |
| # --- Main Application Class ------------------------------------------------ | |
| class WhatsAppBot: | |
| def __init__(self, config: BotConfig): | |
| self.config = config | |
| self.logger = LoggerSetup.setup(config.LOG_LEVEL) | |
| self.api_client = GreenApiClient(config, self.logger) | |
| self.conv_manager = ConversationManager(config.MAX_HISTORY_SIZE) | |
| self.intent_router = IntentRouter(self.conv_manager, self.logger) | |
| self.task_queue = queue.Queue() | |
| # Cache for user-sent image message IDs and their download URLs | |
| self.image_cache = defaultdict(lambda: deque(maxlen=50)) | |
| # NEW: Cache for bot-sent image message IDs and their local file paths | |
| self.sent_image_path_cache = defaultdict(lambda: deque(maxlen=50)) | |
| self.fastapi_app = FastAPI(title="WhatsApp Eve Bot", version="2.3.0") | |
| self._setup_routes() | |
| self._start_workers() | |
| def _setup_routes(self): | |
| async def webhook(request: Request): | |
| if request.headers.get("Authorization") != f"Bearer {self.config.WEBHOOK_AUTH_TOKEN}": | |
| self.logger.warning("Unauthorized webhook access attempt.") | |
| raise HTTPException(403, "Unauthorized") | |
| try: | |
| payload = await request.json() | |
| self.logger.debug(f"Incoming webhook payload: {json.dumps(payload, indent=2)}") | |
| if payload.get("typeWebhook") == "incomingMessageReceived": | |
| executor.submit(self._process_incoming_message, payload) | |
| else: | |
| self.logger.info(f"Received non-message webhook type: {payload.get('typeWebhook')}") | |
| except json.JSONDecodeError: | |
| self.logger.error("Failed to decode JSON from webhook request.") | |
| raise HTTPException(400, "Invalid JSON payload.") | |
| return JSONResponse(content={"status": "received"}) | |
| def health_check(): | |
| return JSONResponse(content={"status": "healthy"}) | |
| def _start_workers(self): | |
| for i in range(self.config.WORKER_THREADS): | |
| threading.Thread(target=self._worker, name=f"Worker-{i}", daemon=True).start() | |
| self.logger.info(f"Started {self.config.WORKER_THREADS} worker threads.") | |
| def _worker(self): | |
| """Worker thread to process tasks from the queue.""" | |
| while True: | |
| task = self.task_queue.get() | |
| try: | |
| ThreadContext.set_context(task["chat_id"], task["message_id"]) | |
| handler = getattr(self, f"_task_{task['type']}", None) | |
| if handler: | |
| self.logger.debug(f"Processing task: {task['type']} for chat {task['chat_id']}") | |
| handler(task) | |
| else: | |
| self.logger.warning(f"Unknown task type received: {task['type']}") | |
| except Exception as e: | |
| self.logger.error(f"Error processing task {task.get('type', 'N/A')}: {e}", exc_info=True) | |
| finally: | |
| self.task_queue.task_done() | |
| def _process_incoming_message(self, payload: Dict[str, Any]): | |
| """Main logic for handling an incoming message payload.""" | |
| try: | |
| chat_id = payload["senderData"]["chatId"] | |
| message_id = payload["idMessage"] | |
| if chat_id not in self.config.ALLOWED_CHATS: | |
| self.logger.warning(f"Ignoring message from unauthorized chat ID: {chat_id}") | |
| return | |
| ThreadContext.set_context(chat_id, message_id) | |
| message_data = payload.get("messageData", {}) | |
| type_message = message_data.get("typeMessage") | |
| text, download_url, is_image_file = "", None, False | |
| if type_message in ["imageMessage", "documentMessage"]: | |
| file_data = message_data.get("fileMessageData", {}) | |
| mime_type = file_data.get("mimeType", "") | |
| if type_message == "imageMessage" or mime_type.startswith("image/"): | |
| is_image_file = True | |
| download_url = file_data.get("downloadUrl") | |
| text = file_data.get("caption", "").strip() | |
| if download_url: | |
| self.logger.debug(f"Caching incoming image message {message_id} from {chat_id}") | |
| self.image_cache[chat_id].append((message_id, download_url)) | |
| elif type_message == "quotedMessage": | |
| text = message_data.get("extendedTextMessageData", {}).get("text", "").strip() | |
| elif type_message == "textMessage": | |
| text = message_data.get("textMessageData", {}).get("textMessage", "").strip() | |
| elif type_message == "extendedTextMessage": | |
| text = message_data.get("extendedTextMessageData", {}).get("text", "").strip() | |
| if not text: | |
| self.logger.debug(f"Received message type '{type_message}' without actionable text from {chat_id}.") | |
| return | |
| self.logger.info(f"Processing text from {chat_id}: '{text}'") | |
| self.conv_manager.add_user_message(chat_id, text) | |
| if is_image_file and text: | |
| intent = self.intent_router.get_intent(text, chat_id) | |
| if isinstance(intent, EditImageIntent): | |
| self.logger.info("Detected direct edit command in image caption.") | |
| self.task_queue.put({ | |
| "type": "edit_image", "chat_id": chat_id, "message_id": message_id, | |
| "prompt": intent.prompt, "input_source": download_url, "source_type": 'url', | |
| }) | |
| return | |
| if text.startswith('/'): | |
| self._handle_command(chat_id, message_id, text, payload) | |
| else: | |
| self._handle_natural_language(chat_id, message_id, text, payload) | |
| except KeyError as e: | |
| self.logger.error(f"Missing expected key in message payload: {e}. Payload: {payload}") | |
| except Exception as e: | |
| self.logger.error(f"Failed to process message payload: {e}", exc_info=True) | |
| def _handle_command(self, chat_id, message_id, text, payload): | |
| """Processes direct slash commands.""" | |
| parts = text.lower().split() | |
| command = parts[0] | |
| args = text.split(maxsplit=1)[1] if len(parts) > 1 else "" | |
| if command == "/help": | |
| help_text = ( | |
| "*π€ Eve's Command Center:*\n\n" | |
| "πΉ `/help` - Show this help message\n" | |
| "πΉ `/gen <prompt>` - Generate an image\n" | |
| "πΉ `/edit <prompt>` - Reply to an image to edit it\n" | |
| "πΉ `/joke` - Get a random joke\n" | |
| "πΉ `/inspire` - Receive an inspirational quote\n" | |
| "πΉ `/weather <location>` - Check the weather\n\n" | |
| "You can also just chat with me naturally!" | |
| ) | |
| self.api_client.send_message(chat_id, help_text, message_id) | |
| elif command == "/gen": | |
| self.task_queue.put({"type": "generate_image", "chat_id": chat_id, "message_id": message_id, "prompt": args, "count": self.config.DEFAULT_IMAGE_COUNT}) | |
| elif command == "/edit": | |
| self._dispatch_edit_image(chat_id, message_id, args, payload) | |
| elif command == "/joke": | |
| self.task_queue.put({"type": "joke", "chat_id": chat_id, "message_id": message_id}) | |
| elif command == "/inspire": | |
| self.task_queue.put({"type": "inspire", "chat_id": chat_id, "message_id": message_id}) | |
| elif command == "/weather": | |
| self.task_queue.put({"type": "weather", "chat_id": chat_id, "message_id": message_id, "location": args}) | |
| else: | |
| self.api_client.send_message(chat_id, "Unknown command. Type /help for options.", message_id) | |
| def _handle_natural_language(self, chat_id, message_id, text, payload): | |
| """Processes natural language using the intent router.""" | |
| intent = self.intent_router.get_intent(text, chat_id) | |
| task_data = {"chat_id": chat_id, "message_id": message_id, **intent.model_dump()} | |
| if intent.action == "edit_image": | |
| self._dispatch_edit_image(chat_id, message_id, intent.prompt, payload) | |
| elif hasattr(self, f"_task_{intent.action}"): | |
| self.task_queue.put({"type": intent.action, **task_data}) | |
| else: | |
| self.logger.warning(f"No handler found for intent action: {intent.action}") | |
| self.api_client.send_message(chat_id, "Sorry, I'm not sure how to handle that.", message_id) | |
| def _dispatch_edit_image(self, chat_id, message_id, prompt, payload): | |
| """Finds a replied-to image from cache and dispatches the edit task.""" | |
| message_data = payload.get("messageData", {}) | |
| if message_data.get("typeMessage") != "quotedMessage": | |
| self.api_client.send_message(chat_id, "To edit an image, please reply to an image with your instructions.", message_id) | |
| return | |
| quoted_info = message_data.get("quotedMessage", {}) | |
| quoted_message_id = quoted_info.get("stanzaId") | |
| if not quoted_message_id: | |
| self.logger.error(f"Could not find stanzaId in quoted message: {quoted_info}") | |
| self.api_client.send_message(chat_id, "Sorry, I couldn't identify which image to edit.", message_id) | |
| return | |
| input_source, source_type = None, None | |
| # 1. Check cache for bot-sent images (local paths) | |
| if chat_id in self.sent_image_path_cache: | |
| for msg_id, path in reversed(self.sent_image_path_cache[chat_id]): | |
| if msg_id == quoted_message_id: | |
| if os.path.exists(path): | |
| input_source, source_type = path, 'path' | |
| self.logger.info(f"Found replied-to image in local path cache: {path}") | |
| else: | |
| self.logger.warning(f"Found path for message {msg_id} but file missing: {path}") | |
| break | |
| # 2. If not found, check cache for user-sent images (URLs) | |
| if not input_source and chat_id in self.image_cache: | |
| for msg_id, url in reversed(self.image_cache[chat_id]): | |
| if msg_id == quoted_message_id: | |
| input_source, source_type = url, 'url' | |
| self.logger.info(f"Found replied-to image in URL cache: {url}") | |
| break | |
| if not input_source: | |
| self.logger.warning(f"Could not find any image source for message ID {quoted_message_id} in cache for chat {chat_id}.") | |
| self.api_client.send_message(chat_id, "I couldn't find the original image. It might be too old. Please send it again before editing.", message_id) | |
| return | |
| self.logger.info(f"Dispatching edit task for message {quoted_message_id} with source type '{source_type}'.") | |
| self.task_queue.put({ | |
| "type": "edit_image", "chat_id": chat_id, "message_id": message_id, | |
| "prompt": prompt, "input_source": input_source, "source_type": source_type, | |
| }) | |
| def _manage_cached_file(self, chat_id: str, message_id: str, file_path: str): | |
| """Adds a file to the cache and cleans up the oldest file if the cache is full.""" | |
| # Check if the cache is full before adding a new item | |
| if len(self.sent_image_path_cache[chat_id]) == self.sent_image_path_cache[chat_id].maxlen: | |
| old_id, old_path = self.sent_image_path_cache[chat_id][0] # Oldest item is at the left | |
| if os.path.exists(old_path): | |
| self.logger.debug(f"Cache full. Removing oldest cached image file: {old_path}") | |
| try: | |
| os.remove(old_path) | |
| except OSError as e: | |
| self.logger.error(f"Error removing old cached file {old_path}: {e}") | |
| self.sent_image_path_cache[chat_id].append((message_id, file_path)) | |
| # --- Task Handler Methods --- | |
| def _task_send_text(self, task: Dict[str, Any]): | |
| chat_id, message_id, message = task["chat_id"], task["message_id"], task["message"] | |
| self.api_client.send_message(chat_id, message, message_id) | |
| self.conv_manager.add_bot_message(chat_id, message) | |
| self.task_queue.put({"type": "voice_reply", "chat_id": chat_id, "message_id": message_id, "text": message}) | |
| def _task_generate_image(self, task: Dict[str, Any]): | |
| chat_id, mid, prompt = task["chat_id"], task["message_id"], task["prompt"] | |
| count = task.get("count", self.config.DEFAULT_IMAGE_COUNT) | |
| self.api_client.send_message(chat_id, f"π¨ Generating {count} image(s) for: \"{prompt}\"...", mid) | |
| for i in range(count): | |
| path = None | |
| try: | |
| # Create a unique ID for each image in the batch to avoid mismatches. | |
| unique_image_id = f"{mid}_{i}" | |
| # Call the image generation library with matching IDs | |
| generation_result = generate_image( | |
| prompt, | |
| unique_image_id, | |
| unique_image_id, | |
| self.config.IMAGE_DIR, | |
| width=task.get("width"), | |
| height=task.get("height") | |
| ) | |
| # If generation fails, the result will be None. Handle this gracefully. | |
| if not generation_result: | |
| self.logger.error(f"Image generation {i+1} failed because the generator returned None.") | |
| self.api_client.send_message(chat_id, f"π’ Failed to generate image {i+1}.", mid) | |
| continue # Move to the next image in the loop | |
| _, path, _, _ = generation_result | |
| caption = f"β¨ Image {i+1}/{count}: {prompt}" | |
| response = self.api_client.send_file(chat_id, path, caption, mid) | |
| if response and response.get("idMessage"): | |
| sent_message_id = response["idMessage"] | |
| self.logger.info(f"Caching sent image path for message {sent_message_id}: {path}") | |
| self._manage_cached_file(chat_id, sent_message_id, path) | |
| else: | |
| # If we can't get the message ID, we can't cache the image for later editing. | |
| # It's better to delete it to avoid filling up the disk. | |
| self.logger.warning(f"Could not get sent message ID for {path}. Deleting file.") | |
| if path and os.path.exists(path): | |
| os.remove(path) | |
| except Exception as e: | |
| # Catch any other unexpected errors during the process | |
| self.logger.error(f"Image generation {i+1} failed with an exception: {e}", exc_info=True) | |
| self.api_client.send_message(chat_id, f"π’ An error occurred while generating image {i+1}.", mid) | |
| if path and os.path.exists(path): | |
| os.remove(path) # Clean up partially created files | |
| def _task_edit_image(self, task: Dict[str, Any]): | |
| chat_id, mid, prompt = task["chat_id"], task["message_id"], task["prompt"] | |
| input_source, source_type = task["input_source"], task["source_type"] | |
| self.api_client.send_message(chat_id, f"π¨ Editing image with prompt: \"{prompt}\"...", mid) | |
| input_path_temp, output_path = None, None | |
| try: | |
| os.makedirs(self.config.TEMP_DIR, exist_ok=True) | |
| output_path = os.path.join(self.config.TEMP_DIR, f"output_{mid}.jpg") | |
| if source_type == 'url': | |
| image_data = self.api_client.download_file(input_source) | |
| if not image_data: raise ValueError(f"Failed to download image from URL: {input_source}") | |
| input_path_temp = os.path.join(self.config.TEMP_DIR, f"input_{mid}.jpg") | |
| with open(input_path_temp, 'wb') as f: f.write(image_data) | |
| edit_input_path = input_path_temp | |
| elif source_type == 'path': | |
| edit_input_path = input_source | |
| else: | |
| raise ValueError(f"Unknown source_type: {source_type}") | |
| gemini_flash_lib.generate_image(prompt, edit_input_path, download_path=output_path) | |
| if os.path.exists(output_path): | |
| caption = f"β¨ Edited: {prompt}" | |
| response = self.api_client.send_file(chat_id, output_path, caption, mid) | |
| if response and response.get("idMessage"): | |
| sent_message_id = response["idMessage"] | |
| self.logger.info(f"Caching sent image path for message {sent_message_id}: {output_path}") | |
| self._manage_cached_file(chat_id, sent_message_id, output_path) | |
| else: | |
| self.logger.warning(f"Could not get sent message ID for {output_path}. Deleting file.") | |
| os.remove(output_path) | |
| else: | |
| raise ValueError("Edited image file not found.") | |
| except Exception as e: | |
| self.logger.error(f"Image editing task failed: {e}", exc_info=True) | |
| self.api_client.send_message(chat_id, "π’ Sorry, I failed to edit the image.", mid) | |
| if output_path and os.path.exists(output_path): os.remove(output_path) | |
| finally: | |
| if input_path_temp and os.path.exists(input_path_temp): os.remove(input_path_temp) | |
| def _task_voice_reply(self, task: Dict[str, Any]): | |
| text = task["text"] | |
| prompt = f"Say this in a friendly, playful, and slightly clumsy-cute way: {text}" | |
| try: | |
| result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=self.config.AUDIO_DIR) | |
| if result and result[0]: | |
| path, _ = result | |
| self.api_client.send_file(task["chat_id"], path, quoted_message_id=task["message_id"]) | |
| os.remove(path) | |
| except Exception as e: | |
| self.logger.warning(f"Voice reply generation failed: {e}", exc_info=True) | |
| def _task_joke(self, task: Dict[str, Any]): | |
| try: | |
| j = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json() | |
| joke = f"{j['setup']}\n\n{j['punchline']}" | |
| except Exception: | |
| self.logger.warning("Joke API failed, falling back to LLM.") | |
| joke = generate_llm("Tell me a short, clean joke.") | |
| self._task_send_text({"type": "send_text", **task, "message": f"π {joke}"}) | |
| def _task_inspire(self, task: Dict[str, Any]): | |
| quote = generate_llm("Give me a unique, short, uplifting inspirational quote with attribution.") | |
| self._task_send_text({"type": "send_text", **task, "message": f"β¨ {quote}"}) | |
| def _task_weather(self, task: Dict[str, Any]): | |
| location = task.get("location") | |
| if not location: | |
| self.api_client.send_message(task["chat_id"], "Please provide a location for the weather.", task["message_id"]) | |
| return | |
| try: | |
| raw = requests.get(f"http://wttr.in/{location.replace(' ', '+')}?format=4", timeout=10).text | |
| report = generate_llm(f"Create a friendly weather report in Celsius from this data:\n\n{raw}") | |
| self._task_send_text({"type": "send_text", **task, "message": f"π€οΈ Weather for {location}:\n{report}"}) | |
| except Exception as e: | |
| self.logger.error(f"Weather task failed: {e}", exc_info=True) | |
| self.api_client.send_message(task["chat_id"], "Sorry, I couldn't get the weather.", task["message_id"]) | |
| def run(self): | |
| """Starts the bot and FastAPI server.""" | |
| self.logger.info("Starting Eve WhatsApp Bot...") | |
| for d in [self.config.IMAGE_DIR, self.config.AUDIO_DIR, self.config.TEMP_DIR]: | |
| os.makedirs(d, exist_ok=True) | |
| self.logger.debug(f"Ensured directory exists: {d}") | |
| self.api_client.send_message( | |
| "[email protected]", | |
| "π Eve is online and ready to help! Type /help to see commands." | |
| ) | |
| uvicorn.run(self.fastapi_app, host="0.0.0.0", port=7860, log_config=None) | |
| if __name__ == "__main__": | |
| try: | |
| bot_config = BotConfig() | |
| executor = ThreadPoolExecutor(max_workers=bot_config.WORKER_THREADS * 2) | |
| bot = WhatsAppBot(bot_config) | |
| bot.run() | |
| except ValueError as e: | |
| logging.basicConfig() | |
| logging.getLogger().critical(f"β CONFIGURATION ERROR: {e}") | |
| except KeyboardInterrupt: | |
| print("\nπ Bot stopped by user.") | |
| except Exception as e: | |
| logging.getLogger().critical(f"β A fatal error occurred: {e}", exc_info=True) | |