"""Batch-look up EPIC numbers through Gradio-hosted search APIs.

For each EPIC read from ``EPIC_INPUT_FILE`` the script fetches a captcha from
one of the configured APIs, solves it with a TrOCR model, submits the search
and appends successful responses to that API's JSONL output file.  EPICs that
exhaust all retries are written to ``FAILED_OUTPUT_FILE``.

NOTE: importing this module has side effects -- it connects to every API in
``API_CONFIGS``, opens the captcha cache database and loads the OCR models,
and it exits the process if any of those steps fails completely.
"""

import os
import io
import sys
import json
import time
import base64
import hashlib
import sqlite3
import logging
from datetime import datetime
from typing import Optional, Tuple, Set, List, Dict

import requests
import torch
from gradio_client import Client
from PIL import Image, ImageEnhance
from tqdm import tqdm
from transformers import VisionEncoderDecoderModel, TrOCRProcessor

# =========================
# CONFIGURATION
# =========================

# --- Core Settings ---
API_CONFIGS = [
    {
        "url": "https://sarthaksidhant-epic-id-chek.hf.space",
        "output_file": "/home/sofiyan/Desktop/Sarthak/voter_details_successful.jsonl",
        "name": "API-1",
    },
    {
        "url": "https://sarthaksidhant-kjrkejrke.hf.space",
        "output_file": "/home/sofiyan/Desktop/Sarthak/voter_details_successful2.jsonl",
        "name": "API-2",
    },
    {
        "url": "https://sarthaksidhant-shugma.hf.space",
        "output_file": "/home/sofiyan/Desktop/Sarthak/voter_details_successful3.jsonl",
        "name": "API-3",
    },
]

EPIC_INPUT_FILE = "/home/sofiyan/Desktop/Sarthak/main_file_all_epics.txt"  # One EPIC per line
FAILED_OUTPUT_FILE = "/home/sofiyan/Desktop/Sarthak/voter_details_failed.txt"  # For EPICs that failed all retries

# --- Processing Controls ---
MAX_ATTEMPTS_PER_EPIC = 10  # Maximum attempts per EPIC before giving up
INITIAL_DELAY_SEC = 1  # Initial delay between attempts
MAX_DELAY_SEC = 30  # Maximum delay between attempts
TIMEOUT_PER_ATTEMPT_SEC = 60  # Timeout for a single attempt

# --- Caching and Assets ---
CACHE_DB = "captcha_cache.db"  # Cache DB for solved captcha images
CAPTCHA_SAVE_DIR = "/home/sofiyan/Desktop/Sarthak/captchas"  # Directory to save captcha images

# --- OCR ---
OCR_MODEL_NAME = "anuashok/ocr-captcha-v3"

# =========================
# INITIALIZATION & LOGGING
# =========================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

os.makedirs(CAPTCHA_SAVE_DIR, exist_ok=True)

# Initialize clients for each API.  An API that cannot be reached is skipped;
# the script only aborts when none of them could be connected.
clients = []
for config in API_CONFIGS:
    try:
        client = Client(config["url"], httpx_kwargs={"timeout": 120.0})
        clients.append({"client": client, "config": config})
        logging.info("Successfully connected to %s at %s", config["name"], config["url"])
    except Exception as e:
        logging.error("Could not connect to %s at %s: %s", config["name"], config["url"], e)

if not clients:
    logging.error("No clients could be initialized. Exiting.")
    sys.exit(1)

# Initialize the SQLite cache for solved captchas.
# check_same_thread=False only disables sqlite3's same-thread check; it adds
# no locking, so the shared cursor below is safe only because processing in
# this script is sequential.
conn = sqlite3.connect(CACHE_DB, check_same_thread=False)
c = conn.cursor()
c.execute("""
    CREATE TABLE IF NOT EXISTS captcha_cache (
        md5_hash TEXT PRIMARY KEY,
        captcha_text TEXT
    )
""")
conn.commit()


def cache_get(md5_hash: str) -> Optional[str]:
    """Return the cached captcha text for *md5_hash*, or None if absent."""
    c.execute("SELECT captcha_text FROM captcha_cache WHERE md5_hash = ?", (md5_hash,))
    row = c.fetchone()
    return row[0] if row else None


def cache_put(md5_hash: str, captcha_text: str) -> None:
    """Insert or overwrite the cached captcha text for *md5_hash* and commit."""
    c.execute(
        "INSERT OR REPLACE INTO captcha_cache(md5_hash, captcha_text) VALUES (?,?)",
        (md5_hash, captcha_text),
    )
    conn.commit()


# Initialize one OCR model instance per connected API.  The round-robin in
# process_epics_sequentially() uses the client index as the OCR model index,
# so the two lists must have the same length.
ocr_models = []
try:
    logging.info(
        "Loading %d OCR captcha solver instances (%s)...", len(clients), OCR_MODEL_NAME
    )
    device = "cuda" if torch.cuda.is_available() else "cpu"
    for i in range(len(clients)):
        processor = TrOCRProcessor.from_pretrained(OCR_MODEL_NAME, use_fast=True)
        model = VisionEncoderDecoderModel.from_pretrained(OCR_MODEL_NAME)
        model.to(device)
        ocr_models.append({"processor": processor, "model": model, "device": device})
        logging.info("OCR model %d ready on %s", i + 1, device)
except Exception as e:
    logging.error(
        "Failed to load OCR models. Please check transformers installation and model name. Error: %s",
        e,
    )
    sys.exit(1)


# =========================
# PROCESSED EPICS TRACKING
# =========================
def load_processed_epics() -> Set[str]:
    """Load already processed EPICs from all success output files to avoid re-work.

    Each output file is read as JSON Lines.  Lines that are blank, are not
    valid JSON, or are not an object with a string ``"epic"`` and a truthy
    ``"success"`` are skipped individually, so one stray line cannot hide the
    remaining records of that file.

    Returns:
        The set of EPIC strings recorded as successful in any output file.
    """
    processed: Set[str] = set()
    for config in API_CONFIGS:
        output_file = config["output_file"]
        if not os.path.exists(output_file):
            continue
        try:
            with open(output_file, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(data, dict) or not data.get("success"):
                        continue
                    epic = data.get("epic")
                    # EPICs from the input file are strings; anything else
                    # could never match (and might not even be hashable).
                    if isinstance(epic, str):
                        processed.add(epic)
        except (OSError, UnicodeError) as e:
            logging.warning("Could not load processed EPICs from %s: %s", output_file, e)
    return processed


# =========================
# CORE API & CAPTCHA FUNCTIONS
# =========================
def md5_of_image_bytes(img: Image.Image) -> str:
    """Return the hex MD5 digest of *img* serialized as PNG."""
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    return hashlib.md5(buf.getvalue()).hexdigest()


def validate_captcha(captcha_text: str) -> bool:
    """Validate that captcha is exactly 6 alphanumeric characters."""
    return len(captcha_text) == 6 and captcha_text.isalnum()


def get_captcha_via_api(client_info: dict) -> Tuple[Image.Image, str]:
    """Fetch a new captcha from the API's ``/api_get_captcha`` endpoint.

    The endpoint is expected to return a dict with ``success``,
    ``captcha_id`` and ``captcha_base64``.  The decoded image is also saved
    under ``CAPTCHA_SAVE_DIR`` as a backup.

    Args:
        client_info: Entry from ``clients`` (keys ``"client"`` and ``"config"``).

    Returns:
        ``(image, captcha_id)`` where *image* is an RGB PIL image.

    Raises:
        ValueError: If the response is not a dict, reports failure, or lacks
            the id / image fields.
        Exception: Any error from the client call, decoding or saving is
            logged and re-raised.
    """
    api_label = client_info["config"]["name"]
    try:
        logging.info(f"Getting captcha from {api_label} using /api_get_captcha")
        result = client_info["client"].predict(api_name="/api_get_captcha")

        # The /api_get_captcha returns a JSON response
        if not isinstance(result, dict):
            raise ValueError(f"Unexpected response format from /api_get_captcha: {result}")
        if not result.get("success"):
            raise ValueError(f"API returned failure: {result}")

        captcha_id = result.get("captcha_id")
        captcha_base64 = result.get("captcha_base64")
        if not captcha_id or not captcha_base64:
            raise ValueError(f"Missing captcha_id or captcha_base64 in response: {result}")

        # Decode base64 image
        img_bytes = base64.b64decode(captcha_base64)
        img = Image.open(io.BytesIO(img_bytes)).convert("RGB")

        # Save image locally as backup
        timestamp = int(time.time())
        save_path = os.path.join(
            CAPTCHA_SAVE_DIR, f"captcha_{api_label}_{captcha_id}_{timestamp}.png"
        )
        img.save(save_path)

        logging.info(f"Captcha received: ID={captcha_id}, saved to {save_path}")
        return img, captcha_id
    except Exception as e:
        logging.error(f"Failed to get captcha from {api_label}: {e}")
        raise


# =========================
# IMAGE PREPROCESSING FOR OCR
# =========================
def preprocess_image_for_ocr(img: Image.Image) -> Image.Image:
    """Preprocess image for better OCR accuracy.

    Converts to RGB, resizes to 200x80, then raises contrast by 50% and
    sharpness by 20%.  If any step fails, a warning is logged and the
    untouched input image is returned.
    """
    try:
        # Work on a separate name so the fallback below really is the original.
        processed = img if img.mode == 'RGB' else img.convert('RGB')

        # Resize to a standard size for OCR
        processed = processed.resize((200, 80), Image.Resampling.LANCZOS)

        processed = ImageEnhance.Contrast(processed).enhance(1.5)  # Increase contrast by 50%
        processed = ImageEnhance.Sharpness(processed).enhance(1.2)  # Increase sharpness by 20%
        return processed
    except Exception as e:
        logging.warning(f"Image preprocessing failed: {e}, using original image")
        return img


def solve_captcha_with_model(img: Image.Image, ocr_model_index: int) -> str:
    """Solve a captcha image with one OCR model, using the SQLite cache.

    The cache is keyed by the MD5 of the unprocessed image.  A cached value
    is reused only if it still passes ``validate_captcha``.  Otherwise the
    image is preprocessed, saved to ``CAPTCHA_SAVE_DIR`` for debugging, run
    through the model, validated and cached.

    Args:
        img: Captcha image.
        ocr_model_index: Index into ``ocr_models``.

    Returns:
        The 6-character captcha text.

    Raises:
        IndexError: If *ocr_model_index* is out of range.
        ValueError: If the OCR output fails validation.
        Exception: Any other OCR error is logged and re-raised.
    """
    md5_key = md5_of_image_bytes(img)
    cached = cache_get(md5_key)
    if cached and validate_captcha(cached):
        logging.info(f"[CACHE] Using cached captcha text: {cached}")
        return cached

    # Get the specific OCR model
    ocr_model = ocr_models[ocr_model_index]

    try:
        # Preprocess image for better OCR
        processed_img = preprocess_image_for_ocr(img)

        # Save processed image for debugging
        debug_filename = f"debug_captcha_{ocr_model_index}_{int(time.time())}.png"
        debug_path = os.path.join(CAPTCHA_SAVE_DIR, debug_filename)
        processed_img.save(debug_path)

        # Run OCR
        pixel_values = ocr_model["processor"](
            processed_img, return_tensors="pt"
        ).pixel_values.to(ocr_model["device"])
        generated_ids = ocr_model["model"].generate(pixel_values, num_beams=4, max_length=12)
        generated_text = ocr_model["processor"].batch_decode(
            generated_ids, skip_special_tokens=True
        )[0].strip()

        # Validate the generated text
        if not validate_captcha(generated_text):
            raise ValueError(
                f"OCR produced invalid text: '{generated_text}' (not 6 alphanumeric chars)"
            )

        logging.info(f"[OCR-{ocr_model_index}] Predicted captcha: {generated_text}")
        cache_put(md5_key, generated_text)
        return generated_text
    except Exception as e:
        logging.error(f"[OCR-{ocr_model_index}-ERROR] {e}")
        raise


def submit_search(client_info: dict, epic_number: str, captcha_text: str, captcha_id: str):
    """Submit the EPIC and solved captcha to the ``/api_search_epic`` endpoint.

    Args:
        client_info: Entry from ``clients``.
        epic_number: EPIC to look up.
        captcha_text: Solved captcha text.
        captcha_id: Id returned together with the captcha image.

    Returns:
        The API's dict response, or ``{"success": False, "error": ...}`` when
        the response is not a dict.

    Raises:
        Exception: Any error from the client call is logged and re-raised.
    """
    api_label = client_info["config"]["name"]
    try:
        logging.info(
            f"Submitting search for EPIC {epic_number} to {api_label} with captcha: {captcha_text}"
        )

        # Use the API-friendly search endpoint
        result = client_info["client"].predict(
            epic_number=epic_number,
            captcha_data=captcha_text,
            captcha_id=captcha_id,
            api_name="/api_search_epic",
        )

        logging.info(f"Received response from {api_label} for EPIC {epic_number}")

        # The /api_search_epic returns a single JSON response
        if isinstance(result, dict):
            return result
        return {"success": False, "error": f"Unexpected response format: {result}"}
    except Exception as e:
        logging.error(f"Failed to submit search to {api_label}: {e}")
        raise


def write_success_result(client_info: dict, result: dict):
    """Append *result* as one JSON line to the API's output file."""
    output_file = client_info["config"]["output_file"]
    # Leaving the with-block closes the file, which also flushes it.
    with open(output_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(result, ensure_ascii=False) + "\n")


# =========================
# SINGLE ATTEMPT PIPELINE
# =========================
def run_single_attempt(client_info: dict, epic_number: str, ocr_model_index: int,
                       attempt_num: int) -> Dict:
    """Execute one full attempt to fetch details for a single EPIC.

    Never raises for ordinary errors: exceptions from the captcha, OCR or
    search steps are captured in the returned dict.

    Returns:
        Dict with keys ``epic``, ``success``, ``captcha_incorrect``,
        ``error``, ``data`` (the raw API response, if any) and ``api``.
    """
    api_label = client_info["config"]["name"]
    prefix = f"EPIC {epic_number} ({api_label}): Attempt {attempt_num}"
    result = {
        "epic": epic_number,
        "success": False,
        "captcha_incorrect": False,
        "error": None,
        "data": None,
        "api": api_label,
    }

    try:
        # 1. Get captcha using API-friendly endpoint
        img, captcha_id = get_captcha_via_api(client_info)

        # 2. Solve captcha with specific OCR model
        captcha_text = solve_captcha_with_model(img, ocr_model_index)

        # 3. Submit search using API-friendly endpoint
        search_result = submit_search(client_info, epic_number, captcha_text, captcha_id)

        # 4. Process result
        if isinstance(search_result, dict):
            result["data"] = search_result
            if search_result.get("success"):
                result["success"] = True
                logging.info(f"{prefix} - SUCCESS")
            else:
                result["error"] = search_result.get("error", "Unknown error from API")
                result["captcha_incorrect"] = bool(search_result.get("captcha_incorrect"))
                if result["captcha_incorrect"]:
                    logging.warning(f"{prefix} - FAILED (Incorrect Captcha)")
                else:
                    logging.warning(f"{prefix} - FAILED ({result['error']})")
        else:
            # Defensive: submit_search() normally wraps non-dict responses.
            result["error"] = "Unexpected response format from API"
            result["data"] = {"raw": search_result}
            logging.error(f"{prefix} - FAILED ({result['error']})")
    except Exception as e:
        result["error"] = f"{type(e).__name__}: {e}"
        logging.error(f"{prefix} - FAILED with exception: {e}")

    return result


# =========================
# SEQUENTIAL PROCESSING WORKFLOW
# =========================
def process_single_epic_with_api(client_info: dict, epic_number: str,
                                 ocr_model_index: int) -> Dict:
    """Process a single EPIC with a specific API and OCR model, with retries.

    Makes up to ``MAX_ATTEMPTS_PER_EPIC`` attempts.  A wrong captcha is
    retried immediately; any other failure waits with exponential backoff
    (starting at ``INITIAL_DELAY_SEC``, capped at ``MAX_DELAY_SEC``).  A
    successful result is written to the API's output file before returning.

    Returns:
        The successful attempt's result dict, or a failure dict with keys
        ``epic``, ``success``, ``error`` and ``api``.
    """
    api_label = client_info['config']['name']
    delay = INITIAL_DELAY_SEC

    for attempt in range(1, MAX_ATTEMPTS_PER_EPIC + 1):
        has_retry_left = attempt < MAX_ATTEMPTS_PER_EPIC
        try:
            result = run_single_attempt(client_info, epic_number, ocr_model_index, attempt)

            if result["success"]:
                # Write result immediately
                write_success_result(client_info, result)
                logging.info(
                    f"SUCCESS: EPIC {epic_number} completed via {api_label} (OCR-{ocr_model_index})"
                )
                return result

            # If captcha was incorrect, retry immediately (no delay)
            if result["captcha_incorrect"]:
                # Only announce a retry when one will actually happen.
                if has_retry_left:
                    logging.info(
                        f"EPIC {epic_number} ({api_label}): Captcha incorrect, "
                        f"retrying immediately... (Attempt {attempt + 1})"
                    )
                continue

            # For other errors, use exponential backoff
            if has_retry_left:
                logging.info(
                    f"EPIC {epic_number} ({api_label}): Waiting {delay}s before retry {attempt + 1}"
                )
                time.sleep(delay)
                delay = min(delay * 2, MAX_DELAY_SEC)  # Exponential backoff
        except Exception as e:
            # run_single_attempt() captures its own errors, so this mainly
            # covers failures while writing the success record.
            logging.error(
                f"EPIC {epic_number} ({api_label}): Unhandled exception in attempt {attempt}: {e}"
            )
            if has_retry_left:
                time.sleep(delay)
                delay = min(delay * 2, MAX_DELAY_SEC)

    # If all attempts fail
    logging.error(
        f"EPIC {epic_number} ({api_label}): FAILED after {MAX_ATTEMPTS_PER_EPIC} attempts"
    )
    return {
        "epic": epic_number,
        "success": False,
        "error": f"Failed all {MAX_ATTEMPTS_PER_EPIC} attempts",
        "api": api_label,
    }


def process_epics_sequentially(epics_to_process: List[str]):
    """Process EPICs one at a time, rotating through the connected APIs.

    EPIC1 -> API1, EPIC2 -> API2, EPIC3 -> API3, EPIC4 -> API1, etc.  Each API
    uses the OCR model with the same index.

    Args:
        epics_to_process: EPICs still to be looked up.

    Returns:
        The list of EPICs that failed all attempts, in processing order.
    """
    total = len(epics_to_process)
    logging.info(
        f"Processing {total} EPICs sequentially with "
        f"{len(clients)} APIs and {len(ocr_models)} OCR models"
    )

    failed_epics = []
    successful_count = 0

    # Create progress bar
    pbar = tqdm(
        total=total,
        desc="Processing EPICs",
        unit="epic",
        bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]',
    )

    try:
        for i, epic_number in enumerate(epics_to_process):
            # Select API and OCR model based on round-robin
            api_index = i % len(clients)
            client_info = clients[api_index]
            ocr_model_index = api_index  # Each API uses its own OCR model

            logging.info(
                f"[{i+1}/{total}] Processing EPIC {epic_number} "
                f"with {client_info['config']['name']} (OCR-{ocr_model_index})"
            )

            # Process single EPIC
            result = process_single_epic_with_api(client_info, epic_number, ocr_model_index)

            if result["success"]:
                successful_count += 1
            else:
                failed_epics.append(epic_number)
            pbar.set_description(f"Success: {successful_count}, Failed: {len(failed_epics)}")
            pbar.update(1)

            # Small delay between EPICs to be polite to APIs
            time.sleep(0.5)
    finally:
        pbar.close()

    return failed_epics


def main():
    """Orchestrate the whole run: load EPICs, process them, report results.

    Exits with status 1 if ``EPIC_INPUT_FILE`` does not exist.  EPICs already
    recorded as successful in any output file are skipped.
    """
    if not os.path.exists(EPIC_INPUT_FILE):
        logging.critical(f"Input file not found: {EPIC_INPUT_FILE}")
        logging.critical("Create a text file with one EPIC per line.")
        sys.exit(1)

    # Create output files if they don't exist
    for config in API_CONFIGS:
        output_file = config["output_file"]
        if not os.path.exists(output_file):
            with open(output_file, "w", encoding="utf-8"):
                pass  # Create empty file
            logging.info(f"Created output file: {output_file}")

    with open(EPIC_INPUT_FILE, "r", encoding="utf-8") as f:
        epic_numbers = [ln.strip() for ln in f if ln.strip()]

    # Remove duplicates while preserving order
    unique_epics = list(dict.fromkeys(epic_numbers))

    processed_epics = load_processed_epics()
    epics_to_process = [e for e in unique_epics if e not in processed_epics]

    logging.info(f"Loaded {len(unique_epics)} unique EPIC(s) from input file.")
    logging.info(f"Found {len(processed_epics)} already processed EPICs in success files.")
    logging.info(f"Starting process for {len(epics_to_process)} remaining EPICs.")

    if not epics_to_process:
        logging.info("All EPICs already processed successfully!")
        return

    # Process all EPICs sequentially
    logging.info("--- Starting Sequential Processing ---")
    logging.info("Processing: EPIC1->API1, EPIC2->API2, EPIC3->API3, EPIC4->API1, etc.")

    failed_epics = process_epics_sequentially(epics_to_process)

    # Write final failures to their own file
    if failed_epics:
        logging.info(f"Writing {len(failed_epics)} failed EPICs to {FAILED_OUTPUT_FILE}")
        with open(FAILED_OUTPUT_FILE, "w", encoding="utf-8") as ffail:
            for epic in failed_epics:
                ffail.write(f"{epic}\n")

    # --- FINAL SUMMARY ---
    logging.info("=" * 50)
    logging.info("Processing complete.")
    logging.info(f"Total Successful: {len(epics_to_process) - len(failed_epics)}")
    logging.info(f"Total Failed: {len(failed_epics)}")
    for config in API_CONFIGS:
        logging.info(f"Results for {config['name']}: {config['output_file']}")
    if failed_epics:
        logging.info(f"Persistently failed EPICs are in: {FAILED_OUTPUT_FILE}")
    logging.info("=" * 50)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        logging.info("\nProcess interrupted by user. Exiting.")
        sys.exit(0)