# ============================================ # AgriCopilot AI Backend — Optimized Stable Release # ============================================ import os import logging import io import base64 from fastapi import FastAPI, Request, Header, HTTPException, UploadFile, File from fastapi.responses import JSONResponse from pydantic import BaseModel from openai import OpenAI from duckduckgo_search import DDGS from vector import query_vector # ============================== # Logging Setup # ============================== logging.basicConfig(level=logging.INFO) logger = logging.getLogger("AgriCopilot") # ============================== # FastAPI App Init # ============================== app = FastAPI(title="AgriCopilot") @app.get("/") async def root(): return {"status": "✅ AgriCopilot AI Backend is running and stable."} # ============================== # Auth Config # ============================== PROJECT_API_KEY = os.getenv("PROJECT_API_KEY", "agricopilot404") def check_auth(authorization: str | None): """Verifies Bearer token for all requests.""" if not PROJECT_API_KEY: return if not authorization or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="Missing bearer token") token = authorization.split(" ", 1)[1] if token != PROJECT_API_KEY: raise HTTPException(status_code=403, detail="Invalid token") # ============================== # Exception Handler # ============================== @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): logger.error(f"Unhandled error: {exc}") return JSONResponse(status_code=500, content={"error": str(exc)}) # ============================== # Request Schemas # ============================== class ChatRequest(BaseModel): query: str class DisasterRequest(BaseModel): report: str class MarketRequest(BaseModel): product: str class VectorRequest(BaseModel): query: str # ============================== # OpenRouter Config # ============================== OPENROUTER_API_KEY = os.getenv("AgriCopilot_OPENROUTER_MODEL_KEY") if not OPENROUTER_API_KEY: logger.error("🛑 Missing AgriCopilot_OPENROUTER_MODEL_KEY") client = OpenAI( base_url="https://openrouter.ai/api/v1", api_key=OPENROUTER_API_KEY, ) MODEL_NAME = "qwen/qwen3-vl-235b-a22b-thinking" # ============================== # Helper Functions # ============================== def format_ai_response(text: str) -> str: return f"{text}\n\n---\n**AI: AgriCopilot | Provider: Team Astra**" def run_with_fallback(system_prompt: str, user_prompt: str) -> str: try: # First pass response = client.chat.completions.create( model=MODEL_NAME, messages=[ {"role": "system", "content": system_prompt + " If you do not know the answer and lack sufficient knowledge, explicitly reply with only 'WEB_SEARCH_REQUIRED'."}, {"role": "user", "content": user_prompt} ], temperature=0.7, max_tokens=800 ) reply = response.choices[0].message.content.strip() if "WEB_SEARCH_REQUIRED" in reply.upper(): logger.info("🌍 Web search triggered!") ddgs = DDGS() search_results = list(ddgs.text(user_prompt, max_results=3)) context = " ".join([r['body'] for r in search_results]) if search_results else "No relevant search results found online." # Second pass with search context second_prompt = f"Using this web search context: {context}\n\nAnswer the user: {user_prompt}" response2 = client.chat.completions.create( model=MODEL_NAME, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": second_prompt} ], temperature=0.7, max_tokens=800 ) reply = response2.choices[0].message.content.strip() reply += "\n\n*(Information augmented with real-time web search)*" return format_ai_response(reply) except Exception as e: logger.error(f"Conversational pipeline error: {e}") return f"⚠️ Model error: {str(e)}" def run_crop_doctor(image_bytes: bytes, symptoms: str): try: # Extract base64 encoded image base64_image = base64.b64encode(image_bytes).decode('utf-8') image_url = f"data:image/jpeg;base64,{base64_image}" vector_matches = query_vector(symptoms) related_knowledge = " ".join(vector_matches[:3]) if isinstance(vector_matches, list) else str(vector_matches) prompt = ( "Analyze this crop image. " f"The farmer reported these symptoms: {symptoms}. " f"Knowledge base reference: {related_knowledge}. " "Generate a structured diagnostic report with:\n" "1. Disease Name\n2. Cause\n3. Treatment\n4. Prevention Tips\n" "Keep the explanation short and easy for farmers to understand." ) response = client.chat.completions.create( model=MODEL_NAME, messages=[ {"role": "system", "content": "You are AgriCopilot, an expert agricultural AI assistant."}, {"role": "user", "content": [ {"type": "text", "text": prompt}, {"type": "image_url", "image_url": {"url": image_url}} ]} ], temperature=0.6, max_tokens=800 ) reply = response.choices[0].message.content.strip() if not reply: return "⚠️ No response generated. Try again with clearer image or symptoms." return format_ai_response(reply) except Exception as e: logger.error(f"Crop Doctor error: {e}") return f"⚠️ Crop Doctor encountered an issue: {str(e)}" # ============================== # Endpoints # ============================== @app.post("/crop-doctor") async def crop_doctor( symptoms: str = Header(...), image: UploadFile = File(...), authorization: str | None = Header(None) ): """Diagnose crop disease from image and text.""" check_auth(authorization) image_bytes = await image.read() diagnosis = run_crop_doctor(image_bytes, symptoms) return {"diagnosis": diagnosis} @app.post("/multilingual-chat") async def multilingual_chat(req: ChatRequest, authorization: str | None = Header(None)): check_auth(authorization) sys_prompt = "You are AgriCopilot, an expert agricultural AI assistant. Answer the farmer's question helpfully." reply = run_with_fallback(sys_prompt, req.query) return {"reply": reply} @app.post("/disaster-summarizer") async def disaster_summarizer(req: DisasterRequest, authorization: str | None = Header(None)): check_auth(authorization) sys_prompt = "You are AgriCopilot. Summarize the following disaster report for a farmer and provide actionable immediate next steps." summary = run_with_fallback(sys_prompt, req.report) return {"summary": summary} @app.post("/marketplace") async def marketplace(req: MarketRequest, authorization: str | None = Header(None)): check_auth(authorization) sys_prompt = "You are AgriCopilot. Provide marketplace recommendations, pricing strategies, and selling tips for the specified agricultural product." recommendation = run_with_fallback(sys_prompt, req.product) return {"recommendation": recommendation} @app.post("/vector-search") async def vector_search(req: VectorRequest, authorization: str | None = Header(None)): check_auth(authorization) try: results = query_vector(req.query) return {"results": results} except Exception as e: logger.error(f"Vector search error: {e}") return {"error": f"Vector search error: {str(e)}"} # ============================================ # END OF FILE # ============================================