import os
import io
import sys
import json
import time
import base64
import hashlib
import sqlite3
import logging
import requests
from datetime import datetime
from typing import Optional, Tuple, Set, List, Dict
from PIL import Image
from tqdm import tqdm
from gradio_client import Client
from transformers import VisionEncoderDecoderModel, TrOCRProcessor
import torch
# =========================
# CONFIGURATION
# =========================

# --- Core Settings ---
API_CONFIGS = [
    {
        "url": "https://sarthaksidhant-epic-id-chek.hf.space",
        "output_file": "/home/sofiyan/Desktop/Sarthak/voter_details_successful.jsonl",
        "name": "API-1"
    },
    {
        "url": "https://sarthaksidhant-kjrkejrke.hf.space",
        "output_file": "/home/sofiyan/Desktop/Sarthak/voter_details_successful2.jsonl",
        "name": "API-2"
    },
    {
        "url": "https://sarthaksidhant-shugma.hf.space",
        "output_file": "/home/sofiyan/Desktop/Sarthak/voter_details_successful3.jsonl",
        "name": "API-3"
    }
]

EPIC_INPUT_FILE = "/home/sofiyan/Desktop/Sarthak/main_file_all_epics.txt"      # One EPIC per line
FAILED_OUTPUT_FILE = "/home/sofiyan/Desktop/Sarthak/voter_details_failed.txt"  # For EPICs that failed all retries

# --- Processing Controls ---
MAX_ATTEMPTS_PER_EPIC = 10    # Maximum attempts per EPIC before giving up
INITIAL_DELAY_SEC = 1         # Initial delay between attempts
MAX_DELAY_SEC = 30            # Maximum delay between attempts
TIMEOUT_PER_ATTEMPT_SEC = 60  # Intended timeout for a single attempt (not enforced below; the client timeout is set at connection time)

# --- Caching and Assets ---
CACHE_DB = "captcha_cache.db"  # Cache DB for solved captcha images
CAPTCHA_SAVE_DIR = "/home/sofiyan/Desktop/Sarthak/captchas"  # Directory to save captcha images
# =========================
# INITIALIZATION & LOGGING
# =========================
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
os.makedirs(CAPTCHA_SAVE_DIR, exist_ok=True)

# Initialize clients for each API
clients = []
for config in API_CONFIGS:
    try:
        client = Client(config["url"], httpx_kwargs={"timeout": 120.0})
        clients.append({
            "client": client,
            "config": config
        })
        logging.info("Successfully connected to %s at %s", config["name"], config["url"])
    except Exception as e:
        logging.error("Could not connect to %s at %s: %s", config["name"], config["url"], e)

if not clients:
    logging.error("No clients could be initialized. Exiting.")
    sys.exit(1)

# Initialize the SQLite cache for solved captchas (check_same_thread=False only
# permits use from other threads; access in this script is sequential)
conn = sqlite3.connect(CACHE_DB, check_same_thread=False)
c = conn.cursor()
c.execute("""
    CREATE TABLE IF NOT EXISTS captcha_cache (
        md5_hash TEXT PRIMARY KEY,
        captcha_text TEXT
    )
""")
conn.commit()

def cache_get(md5_hash: str) -> Optional[str]:
    c.execute("SELECT captcha_text FROM captcha_cache WHERE md5_hash = ?", (md5_hash,))
    row = c.fetchone()
    return row[0] if row else None

def cache_put(md5_hash: str, captcha_text: str) -> None:
    c.execute("INSERT OR REPLACE INTO captcha_cache(md5_hash, captcha_text) VALUES (?,?)", (md5_hash, captcha_text))
    conn.commit()
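# Solved captchas are cached keyed by the MD5 of the PNG-encoded image bytes,
# so an identical captcha image served again is answered without re-running OCR.
# Illustrative round trip (hash value hypothetical):
#   cache_put("9e107d9d372bb6826bd81d3542a419d6", "AB12CD")
#   cache_get("9e107d9d372bb6826bd81d3542a419d6")  # -> "AB12CD"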
# Initialize separate OCR models (one per API)
ocr_models = []
try:
    logging.info("Loading %d OCR captcha solver instances (anuashok/ocr-captcha-v3)...", len(API_CONFIGS))
    for i in range(len(API_CONFIGS)):
        processor = TrOCRProcessor.from_pretrained("anuashok/ocr-captcha-v3", use_fast=True)
        model = VisionEncoderDecoderModel.from_pretrained("anuashok/ocr-captcha-v3")
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model.to(device)
        ocr_models.append({
            "processor": processor,
            "model": model,
            "device": device
        })
        logging.info("OCR model %d ready on %s", i + 1, device)
except Exception as e:
    logging.error("Failed to load OCR models. Please check transformers installation and model name. Error: %s", e)
    sys.exit(1)
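# Each entry in ocr_models is paired by position with the API at the same index
# in API_CONFIGS, so the round-robin dispatch below always uses one dedicated
# OCR instance per API.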
# =========================
# PROCESSED EPICS TRACKING
# =========================
def load_processed_epics() -> Set[str]:
    """Load already processed EPICs from all success output files to avoid re-work."""
    processed = set()
    for config in API_CONFIGS:
        output_file = config["output_file"]
        if os.path.exists(output_file):
            try:
                with open(output_file, "r", encoding="utf-8") as f:
                    for line in f:
                        try:
                            data = json.loads(line.strip())
                            if "epic" in data and data.get("success"):
                                processed.add(data["epic"])
                        except json.JSONDecodeError:
                            continue
            except Exception as e:
                logging.warning("Could not load processed EPICs from %s: %s", output_file, e)
    return processed

# =========================
# CORE API & CAPTCHA FUNCTIONS
# =========================
def md5_of_image_bytes(img: Image.Image) -> str:
    """Calculates the MD5 hash of a PIL Image."""
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    return hashlib.md5(buf.getvalue()).hexdigest()

def validate_captcha(captcha_text: str) -> bool:
    """Validate that the captcha is exactly 6 alphanumeric characters."""
    return len(captcha_text) == 6 and captcha_text.isalnum()

def get_captcha_via_api(client_info: dict) -> Tuple[Image.Image, str]:
    """
    Calls the API-friendly captcha endpoint to get a new captcha.
    Uses /api_get_captcha, which returns a base64-encoded image.
    """
    try:
        logging.info(f"Getting captcha from {client_info['config']['name']} using /api_get_captcha")
        result = client_info["client"].predict(api_name="/api_get_captcha")
        # The /api_get_captcha endpoint returns a JSON response
        if isinstance(result, dict):
            if not result.get("success"):
                raise ValueError(f"API returned failure: {result}")
            captcha_id = result.get("captcha_id")
            captcha_base64 = result.get("captcha_base64")
            if not captcha_id or not captcha_base64:
                raise ValueError(f"Missing captcha_id or captcha_base64 in response: {result}")
            # Decode the base64 image
            img_bytes = base64.b64decode(captcha_base64)
            img = Image.open(io.BytesIO(img_bytes)).convert("RGB")
            # Save the image locally as a backup
            timestamp = int(time.time())
            save_path = os.path.join(CAPTCHA_SAVE_DIR, f"captcha_{client_info['config']['name']}_{captcha_id}_{timestamp}.png")
            img.save(save_path)
            logging.info(f"Captcha received: ID={captcha_id}, saved to {save_path}")
            return img, captcha_id
        else:
            raise ValueError(f"Unexpected response format from /api_get_captcha: {result}")
    except Exception as e:
        logging.error(f"Failed to get captcha from {client_info['config']['name']}: {e}")
        raise
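# Assumed shape of the /api_get_captcha payload handled above (illustrative;
# the Space itself defines the actual contract):
#   {"success": true, "captcha_id": "<id>", "captcha_base64": "<base64-encoded PNG>"}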
# =========================
# IMAGE PREPROCESSING FOR OCR
# =========================
def preprocess_image_for_ocr(img: Image.Image) -> Image.Image:
    """Preprocess image for better OCR accuracy."""
    try:
        # Convert to RGB if not already
        if img.mode != 'RGB':
            img = img.convert('RGB')
        # Resize to a standard size for OCR
        img = img.resize((200, 80), Image.Resampling.LANCZOS)
        # Enhance contrast
        from PIL import ImageEnhance
        enhancer = ImageEnhance.Contrast(img)
        img = enhancer.enhance(1.5)  # Increase contrast by 50%
        # Enhance sharpness
        enhancer = ImageEnhance.Sharpness(img)
        img = enhancer.enhance(1.2)  # Increase sharpness by 20%
        return img
    except Exception as e:
        logging.warning(f"Image preprocessing failed: {e}, using original image")
        return img

def solve_captcha_with_model(img: Image.Image, ocr_model_index: int) -> str:
    """Uses a specific OCR model to solve the captcha, with caching and validation."""
    md5_key = md5_of_image_bytes(img)
    cached = cache_get(md5_key)
    if cached and validate_captcha(cached):
        logging.info(f"[CACHE] Using cached captcha text: {cached}")
        return cached
    # Get the specific OCR model
    ocr_model = ocr_models[ocr_model_index]
    try:
        # Preprocess image for better OCR
        processed_img = preprocess_image_for_ocr(img)
        # Save processed image for debugging
        debug_filename = f"debug_captcha_{ocr_model_index}_{int(time.time())}.png"
        debug_path = os.path.join(CAPTCHA_SAVE_DIR, debug_filename)
        processed_img.save(debug_path)
        # Run OCR
        pixel_values = ocr_model["processor"](processed_img, return_tensors="pt").pixel_values.to(ocr_model["device"])
        generated_ids = ocr_model["model"].generate(pixel_values, num_beams=4, max_length=12)
        generated_text = ocr_model["processor"].batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
        # Validate the generated text
        if not validate_captcha(generated_text):
            raise ValueError(f"OCR produced invalid text: '{generated_text}' (not 6 alphanumeric chars)")
        logging.info(f"[OCR-{ocr_model_index}] Predicted captcha: {generated_text}")
        cache_put(md5_key, generated_text)
        return generated_text
    except Exception as e:
        logging.error(f"[OCR-{ocr_model_index}-ERROR] {e}")
        raise

def submit_search(client_info: dict, epic_number: str, captcha_text: str, captcha_id: str):
    """
    Submits the EPIC and solved captcha to the API using the /api_search_epic endpoint.
    """
    try:
        logging.info(f"Submitting search for EPIC {epic_number} to {client_info['config']['name']} with captcha: {captcha_text}")
        # Use the API-friendly search endpoint
        result = client_info["client"].predict(
            epic_number=epic_number,
            captcha_data=captcha_text,
            captcha_id=captcha_id,
            api_name="/api_search_epic"
        )
        logging.info(f"Received response from {client_info['config']['name']} for EPIC {epic_number}")
        # The /api_search_epic endpoint returns a single JSON response
        if isinstance(result, dict):
            return result
        else:
            return {"success": False, "error": f"Unexpected response format: {result}"}
    except Exception as e:
        logging.error(f"Failed to submit search to {client_info['config']['name']}: {e}")
        raise
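# Fields assumed in the /api_search_epic response and consumed below
# (illustrative, not an authoritative schema):
#   success           -> bool, True when the lookup returned voter details
#   captcha_incorrect -> bool, True when the captcha answer was rejected
#   error             -> str, human-readable failure reason
# plus whatever voter-detail fields the Space includes on success.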
def write_success_result(client_info: dict, result: dict):
    """Write successful result to the appropriate output file."""
    output_file = client_info["config"]["output_file"]
    with open(output_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(result, ensure_ascii=False) + "\n")
        f.flush()  # Ensure immediate write to disk

# =========================
# SINGLE ATTEMPT PIPELINE
# =========================
def run_single_attempt(client_info: dict, epic_number: str, ocr_model_index: int, attempt_num: int) -> Dict:
    """
    Executes one full attempt to fetch details for a single EPIC.
    Returns a dictionary with the outcome.
    """
    result = {"epic": epic_number, "success": False, "captcha_incorrect": False, "error": None, "data": None, "api": client_info["config"]["name"]}
    try:
        # 1. Get captcha using API-friendly endpoint
        img, captcha_id = get_captcha_via_api(client_info)
        # 2. Solve captcha with specific OCR model
        captcha_text = solve_captcha_with_model(img, ocr_model_index)
        # 3. Submit search using API-friendly endpoint
        search_result = submit_search(client_info, epic_number, captcha_text, captcha_id)
        # 4. Process result
        if isinstance(search_result, dict):
            result["data"] = search_result
            if search_result.get("success"):
                result["success"] = True
                logging.info(f"EPIC {epic_number} ({client_info['config']['name']}): Attempt {attempt_num} - SUCCESS")
            else:
                result["error"] = search_result.get("error", "Unknown error from API")
                result["captcha_incorrect"] = bool(search_result.get("captcha_incorrect"))
                if result["captcha_incorrect"]:
                    logging.warning(f"EPIC {epic_number} ({client_info['config']['name']}): Attempt {attempt_num} - FAILED (Incorrect Captcha)")
                else:
                    logging.warning(f"EPIC {epic_number} ({client_info['config']['name']}): Attempt {attempt_num} - FAILED ({result['error']})")
        else:
            result["error"] = "Unexpected response format from API"
            result["data"] = {"raw": search_result}
            logging.error(f"EPIC {epic_number} ({client_info['config']['name']}): Attempt {attempt_num} - FAILED ({result['error']})")
    except Exception as e:
        result["error"] = f"{type(e).__name__}: {e}"
        logging.error(f"EPIC {epic_number} ({client_info['config']['name']}): Attempt {attempt_num} - FAILED with exception: {e}")
    return result

# =========================
# SEQUENTIAL PROCESSING WORKFLOW
# =========================
def process_single_epic_with_api(client_info: dict, epic_number: str, ocr_model_index: int) -> Dict:
    """
    Processes a single EPIC with a specific API and OCR model, with retry logic.
    """
    delay = INITIAL_DELAY_SEC
    for attempt in range(1, MAX_ATTEMPTS_PER_EPIC + 1):
        try:
            result = run_single_attempt(client_info, epic_number, ocr_model_index, attempt)
            if result["success"]:
                # Write result immediately
                write_success_result(client_info, result)
                logging.info(f"SUCCESS: EPIC {epic_number} completed via {client_info['config']['name']} (OCR-{ocr_model_index})")
                return result
            # If captcha was incorrect, retry immediately (no delay)
            if result["captcha_incorrect"]:
                logging.info(f"EPIC {epic_number} ({client_info['config']['name']}): Captcha incorrect, retrying immediately... (Attempt {attempt + 1})")
                continue
            # For other errors, use exponential backoff
            if attempt < MAX_ATTEMPTS_PER_EPIC:
                logging.info(f"EPIC {epic_number} ({client_info['config']['name']}): Waiting {delay}s before retry {attempt + 1}")
                time.sleep(delay)
                delay = min(delay * 2, MAX_DELAY_SEC)  # Exponential backoff
        except Exception as e:
            logging.error(f"EPIC {epic_number} ({client_info['config']['name']}): Unhandled exception in attempt {attempt}: {e}")
            if attempt < MAX_ATTEMPTS_PER_EPIC:
                time.sleep(delay)
                delay = min(delay * 2, MAX_DELAY_SEC)
    # If all attempts fail
    logging.error(f"EPIC {epic_number} ({client_info['config']['name']}): FAILED after {MAX_ATTEMPTS_PER_EPIC} attempts")
    return {"epic": epic_number, "success": False, "error": f"Failed all {MAX_ATTEMPTS_PER_EPIC} attempts", "api": client_info['config']['name']}
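# With INITIAL_DELAY_SEC=1 and MAX_DELAY_SEC=30, the backoff schedule for
# non-captcha failures is 1, 2, 4, 8, 16, 30, 30, ... seconds; incorrect-captcha
# failures skip the delay and retry immediately.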
def process_epics_sequentially(epics_to_process: List[str]):
    """
    Process EPICs sequentially - one EPIC at a time, rotating through APIs.
    EPIC1 -> API1, EPIC2 -> API2, EPIC3 -> API3, EPIC4 -> API1, etc.
    """
    logging.info(f"Processing {len(epics_to_process)} EPICs sequentially with 3 APIs and 3 OCR models")
    failed_epics = []
    successful_count = 0
    # Create progress bar
    pbar = tqdm(total=len(epics_to_process), desc="Processing EPICs", unit="epic",
                bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]')
    try:
        for i, epic_number in enumerate(epics_to_process):
            # Select API and OCR model based on round-robin
            api_index = i % len(clients)
            client_info = clients[api_index]
            ocr_model_index = api_index  # Each API uses its own OCR model
            logging.info(f"[{i+1}/{len(epics_to_process)}] Processing EPIC {epic_number} with {client_info['config']['name']} (OCR-{ocr_model_index})")
            # Process single EPIC
            result = process_single_epic_with_api(client_info, epic_number, ocr_model_index)
            if result["success"]:
                successful_count += 1
            else:
                failed_epics.append(epic_number)
            pbar.set_description(f"Success: {successful_count}, Failed: {len(failed_epics)}")
            pbar.update(1)
            # Small delay between EPICs to be polite to APIs
            time.sleep(0.5)
    finally:
        pbar.close()
    return failed_epics

def main():
    """Main function to orchestrate the entire process."""
    if not os.path.exists(EPIC_INPUT_FILE):
        logging.critical(f"Input file not found: {EPIC_INPUT_FILE}")
        logging.critical("Create a text file with one EPIC per line.")
        sys.exit(1)

    # Create output files if they don't exist
    for config in API_CONFIGS:
        output_file = config["output_file"]
        if not os.path.exists(output_file):
            with open(output_file, "w", encoding="utf-8") as f:
                pass  # Create empty file
            logging.info(f"Created output file: {output_file}")

    with open(EPIC_INPUT_FILE, "r", encoding="utf-8") as f:
        epic_numbers = [ln.strip() for ln in f if ln.strip()]

    # Remove duplicates while preserving order
    unique_epics = list(dict.fromkeys(epic_numbers))
    processed_epics = load_processed_epics()
    epics_to_process = [e for e in unique_epics if e not in processed_epics]

    logging.info(f"Loaded {len(unique_epics)} unique EPIC(s) from input file.")
    logging.info(f"Found {len(processed_epics)} already processed EPICs in success files.")
    logging.info(f"Starting process for {len(epics_to_process)} remaining EPICs.")

    if not epics_to_process:
        logging.info("All EPICs already processed successfully!")
        return

    # Process all EPICs sequentially
    logging.info("--- Starting Sequential Processing ---")
    logging.info("Processing: EPIC1->API1, EPIC2->API2, EPIC3->API3, EPIC4->API1, etc.")
    failed_epics = process_epics_sequentially(epics_to_process)

    # Write final failures to their own file
    if failed_epics:
        logging.info(f"Writing {len(failed_epics)} failed EPICs to {FAILED_OUTPUT_FILE}")
        with open(FAILED_OUTPUT_FILE, "w", encoding="utf-8") as ffail:
            for epic in failed_epics:
                ffail.write(f"{epic}\n")

    # --- FINAL SUMMARY ---
    logging.info("=" * 50)
    logging.info("Processing complete.")
    logging.info(f"Total Successful: {len(epics_to_process) - len(failed_epics)}")
    logging.info(f"Total Failed: {len(failed_epics)}")
    for config in API_CONFIGS:
        logging.info(f"Results for {config['name']}: {config['output_file']}")
    if failed_epics:
        logging.info(f"Persistently failed EPICs are in: {FAILED_OUTPUT_FILE}")
    logging.info("=" * 50)

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        logging.info("\nProcess interrupted by user. Exiting.")
        sys.exit(0)
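# A record appended to one of the *_successful.jsonl files looks roughly like
# this (illustrative; "data" holds whatever the Space returned for the EPIC):
#   {"epic": "ABC1234567", "success": true, "captcha_incorrect": false,
#    "error": null, "data": {...}, "api": "API-1"}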