epic-id-chek / app.py
SarthakSidhant's picture
Update app.py
ed7355e verified
import os
import io
import sys
import json
import time
import base64
import hashlib
import sqlite3
import logging
import requests
from datetime import datetime
from typing import Optional, Tuple, Set, List, Dict
from PIL import Image
from tqdm import tqdm
from gradio_client import Client
from transformers import VisionEncoderDecoderModel, TrOCRProcessor
import torch
# =========================
# CONFIGURATION
# =========================
# --- Core Settings ---
API_CONFIGS = [
{
"url": "https://sarthaksidhant-epic-id-chek.hf.space",
"output_file": "/home/sofiyan/Desktop/Sarthak/voter_details_successful.jsonl",
"name": "API-1"
},
{
"url": "https://sarthaksidhant-kjrkejrke.hf.space",
"output_file": "/home/sofiyan/Desktop/Sarthak/voter_details_successful2.jsonl",
"name": "API-2"
},
{
"url": "https://sarthaksidhant-shugma.hf.space",
"output_file": "/home/sofiyan/Desktop/Sarthak/voter_details_successful3.jsonl",
"name": "API-3"
}
]
EPIC_INPUT_FILE = "/home/sofiyan/Desktop/Sarthak/main_file_all_epics.txt" # One EPIC per line
FAILED_OUTPUT_FILE = "/home/sofiyan/Desktop/Sarthak/voter_details_failed.txt" # For EPICs that failed all retries
# --- Processing Controls ---
MAX_ATTEMPTS_PER_EPIC = 10 # Maximum attempts per EPIC before giving up
INITIAL_DELAY_SEC = 1 # Initial delay between attempts
MAX_DELAY_SEC = 30 # Maximum delay between attempts
TIMEOUT_PER_ATTEMPT_SEC = 60 # Timeout for a single attempt
# --- Caching and Assets ---
CACHE_DB = "captcha_cache.db" # Cache DB for solved captcha images
CAPTCHA_SAVE_DIR = "/home/sofiyan/Desktop/Sarthak/captchas" # Directory to save captcha images
# =========================
# INITIALIZATION & LOGGING
# =========================
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
os.makedirs(CAPTCHA_SAVE_DIR, exist_ok=True)
# Initialize clients for each API
clients = []
for config in API_CONFIGS:
try:
client = Client(config["url"], httpx_kwargs={"timeout": 120.0})
clients.append({
"client": client,
"config": config
})
logging.info("Successfully connected to %s at %s", config["name"], config["url"])
except Exception as e:
logging.error("Could not connect to %s at %s: %s", config["name"], config["url"], e)
if not clients:
logging.error("No clients could be initialized. Exiting.")
sys.exit(1)
# Initialize thread-safe SQLite cache for solved captchas
conn = sqlite3.connect(CACHE_DB, check_same_thread=False)
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS captcha_cache (
md5_hash TEXT PRIMARY KEY,
captcha_text TEXT
)
""")
conn.commit()
def cache_get(md5_hash: str) -> Optional[str]:
c.execute("SELECT captcha_text FROM captcha_cache WHERE md5_hash = ?", (md5_hash,))
row = c.fetchone()
return row[0] if row else None
def cache_put(md5_hash: str, captcha_text: str) -> None:
c.execute("INSERT OR REPLACE INTO captcha_cache(md5_hash, captcha_text) VALUES (?,?)", (md5_hash, captcha_text))
conn.commit()
# Initialize 3 separate OCR models (one per API)
ocr_models = []
try:
logging.info("Loading 3 OCR captcha solver instances (anuashok/ocr-captcha-v3)...")
for i in range(3):
processor = TrOCRProcessor.from_pretrained("anuashok/ocr-captcha-v3", use_fast=True)
model = VisionEncoderDecoderModel.from_pretrained("anuashok/ocr-captcha-v3")
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
ocr_models.append({
"processor": processor,
"model": model,
"device": device
})
logging.info("OCR model %d ready on %s", i+1, device)
except Exception as e:
logging.error("Failed to load OCR models. Please check transformers installation and model name. Error: %s", e)
sys.exit(1)
# =========================
# PROCESSED EPICS TRACKING
# =========================
def load_processed_epics() -> Set[str]:
"""Load already processed EPICs from all success output files to avoid re-work."""
processed = set()
for config in API_CONFIGS:
output_file = config["output_file"]
if os.path.exists(output_file):
try:
with open(output_file, "r", encoding="utf-8") as f:
for line in f:
try:
data = json.loads(line.strip())
if "epic" in data and data.get("success"):
processed.add(data["epic"])
except json.JSONDecodeError:
continue
except Exception as e:
logging.warning("Could not load processed EPICs from %s: %s", output_file, e)
return processed
# =========================
# CORE API & CAPTCHA FUNCTIONS
# =========================
def md5_of_image_bytes(img: Image.Image) -> str:
"""Calculates the MD5 hash of a PIL Image."""
buf = io.BytesIO()
img.save(buf, format="PNG")
return hashlib.md5(buf.getvalue()).hexdigest()
def validate_captcha(captcha_text: str) -> bool:
"""Validate that captcha is exactly 6 alphanumeric characters."""
return len(captcha_text) == 6 and captcha_text.isalnum()
def get_captcha_via_api(client_info: dict) -> Tuple[Image.Image, str]:
"""
Calls the API-friendly captcha endpoint to get a new captcha.
Uses /api_get_captcha which returns base64 encoded image.
"""
try:
logging.info(f"Getting captcha from {client_info['config']['name']} using /api_get_captcha")
result = client_info["client"].predict(api_name="/api_get_captcha")
# The /api_get_captcha returns a JSON response
if isinstance(result, dict):
if not result.get("success"):
raise ValueError(f"API returned failure: {result}")
captcha_id = result.get("captcha_id")
captcha_base64 = result.get("captcha_base64")
if not captcha_id or not captcha_base64:
raise ValueError(f"Missing captcha_id or captcha_base64 in response: {result}")
# Decode base64 image
img_bytes = base64.b64decode(captcha_base64)
img = Image.open(io.BytesIO(img_bytes)).convert("RGB")
# Save image locally as backup
timestamp = int(time.time())
save_path = os.path.join(CAPTCHA_SAVE_DIR, f"captcha_{client_info['config']['name']}_{captcha_id}_{timestamp}.png")
img.save(save_path)
logging.info(f"Captcha received: ID={captcha_id}, saved to {save_path}")
return img, captcha_id
else:
raise ValueError(f"Unexpected response format from /api_get_captcha: {result}")
except Exception as e:
logging.error(f"Failed to get captcha from {client_info['config']['name']}: {e}")
raise
# =========================
# IMAGE PREPROCESSING FOR OCR
# =========================
def preprocess_image_for_ocr(img: Image.Image) -> Image.Image:
"""Preprocess image for better OCR accuracy."""
try:
# Convert to RGB if not already
if img.mode != 'RGB':
img = img.convert('RGB')
# Resize to a standard size for OCR
img = img.resize((200, 80), Image.Resampling.LANCZOS)
# Enhance contrast
from PIL import ImageEnhance
enhancer = ImageEnhance.Contrast(img)
img = enhancer.enhance(1.5) # Increase contrast by 50%
# Enhance sharpness
enhancer = ImageEnhance.Sharpness(img)
img = enhancer.enhance(1.2) # Increase sharpness by 20%
return img
except Exception as e:
logging.warning(f"Image preprocessing failed: {e}, using original image")
return img
def solve_captcha_with_model(img: Image.Image, ocr_model_index: int) -> str:
"""Uses a specific OCR model to solve the captcha, with caching and validation."""
md5_key = md5_of_image_bytes(img)
cached = cache_get(md5_key)
if cached and validate_captcha(cached):
logging.info(f"[CACHE] Using cached captcha text: {cached}")
return cached
# Get the specific OCR model
ocr_model = ocr_models[ocr_model_index]
try:
# Preprocess image for better OCR
processed_img = preprocess_image_for_ocr(img)
# Save processed image for debugging
debug_filename = f"debug_captcha_{ocr_model_index}_{int(time.time())}.png"
debug_path = os.path.join(CAPTCHA_SAVE_DIR, debug_filename)
processed_img.save(debug_path)
# Run OCR
pixel_values = ocr_model["processor"](processed_img, return_tensors="pt").pixel_values.to(ocr_model["device"])
generated_ids = ocr_model["model"].generate(pixel_values, num_beams=4, max_length=12)
generated_text = ocr_model["processor"].batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
# Validate the generated text
if not validate_captcha(generated_text):
raise ValueError(f"OCR produced invalid text: '{generated_text}' (not 6 alphanumeric chars)")
logging.info(f"[OCR-{ocr_model_index}] Predicted captcha: {generated_text}")
cache_put(md5_key, generated_text)
return generated_text
except Exception as e:
logging.error(f"[OCR-{ocr_model_index}-ERROR] {e}")
raise
def submit_search(client_info: dict, epic_number: str, captcha_text: str, captcha_id: str):
"""
Submits the EPIC and solved captcha to the API using /api_search_epic endpoint.
"""
try:
logging.info(f"Submitting search for EPIC {epic_number} to {client_info['config']['name']} with captcha: {captcha_text}")
# Use the API-friendly search endpoint
result = client_info["client"].predict(
epic_number=epic_number,
captcha_data=captcha_text,
captcha_id=captcha_id,
api_name="/api_search_epic"
)
logging.info(f"Received response from {client_info['config']['name']} for EPIC {epic_number}")
# The /api_search_epic returns a single JSON response
if isinstance(result, dict):
return result
else:
return {"success": False, "error": f"Unexpected response format: {result}"}
except Exception as e:
logging.error(f"Failed to submit search to {client_info['config']['name']}: {e}")
raise
def write_success_result(client_info: dict, result: dict):
"""Write successful result to the appropriate output file."""
output_file = client_info["config"]["output_file"]
with open(output_file, "a", encoding="utf-8") as f:
f.write(json.dumps(result, ensure_ascii=False) + "\n")
f.flush() # Ensure immediate write to disk
# =========================
# SINGLE ATTEMPT PIPELINE
# =========================
def run_single_attempt(client_info: dict, epic_number: str, ocr_model_index: int, attempt_num: int) -> Dict:
"""
Executes one full attempt to fetch details for a single EPIC.
Returns a dictionary with the outcome.
"""
result = {"epic": epic_number, "success": False, "captcha_incorrect": False, "error": None, "data": None, "api": client_info["config"]["name"]}
try:
# 1. Get captcha using API-friendly endpoint
img, captcha_id = get_captcha_via_api(client_info)
# 2. Solve captcha with specific OCR model
captcha_text = solve_captcha_with_model(img, ocr_model_index)
# 3. Submit search using API-friendly endpoint
search_result = submit_search(client_info, epic_number, captcha_text, captcha_id)
# 4. Process result
if isinstance(search_result, dict):
result["data"] = search_result
if search_result.get("success"):
result["success"] = True
logging.info(f"EPIC {epic_number} ({client_info['config']['name']}): Attempt {attempt_num} - SUCCESS")
else:
result["error"] = search_result.get("error", "Unknown error from API")
result["captcha_incorrect"] = bool(search_result.get("captcha_incorrect"))
if result["captcha_incorrect"]:
logging.warning(f"EPIC {epic_number} ({client_info['config']['name']}): Attempt {attempt_num} - FAILED (Incorrect Captcha)")
else:
logging.warning(f"EPIC {epic_number} ({client_info['config']['name']}): Attempt {attempt_num} - FAILED ({result['error']})")
else:
result["error"] = "Unexpected response format from API"
result["data"] = {"raw": search_result}
logging.error(f"EPIC {epic_number} ({client_info['config']['name']}): Attempt {attempt_num} - FAILED ({result['error']})")
except Exception as e:
result["error"] = f"{type(e).__name__}: {e}"
logging.error(f"EPIC {epic_number} ({client_info['config']['name']}): Attempt {attempt_num} - FAILED with exception: {e}")
return result
# =========================
# SEQUENTIAL PROCESSING WORKFLOW
# =========================
def process_single_epic_with_api(client_info: dict, epic_number: str, ocr_model_index: int) -> Dict:
"""
Processes a single EPIC with a specific API and OCR model with retry logic.
"""
delay = INITIAL_DELAY_SEC
for attempt in range(1, MAX_ATTEMPTS_PER_EPIC + 1):
try:
result = run_single_attempt(client_info, epic_number, ocr_model_index, attempt)
if result["success"]:
# Write result immediately
write_success_result(client_info, result)
logging.info(f"SUCCESS: EPIC {epic_number} completed via {client_info['config']['name']} (OCR-{ocr_model_index})")
return result
# If captcha was incorrect, retry immediately (no delay)
if result["captcha_incorrect"]:
logging.info(f"EPIC {epic_number} ({client_info['config']['name']}): Captcha incorrect, retrying immediately... (Attempt {attempt + 1})")
continue
# For other errors, use exponential backoff
if attempt < MAX_ATTEMPTS_PER_EPIC:
logging.info(f"EPIC {epic_number} ({client_info['config']['name']}): Waiting {delay}s before retry {attempt + 1}")
time.sleep(delay)
delay = min(delay * 2, MAX_DELAY_SEC) # Exponential backoff
except Exception as e:
logging.error(f"EPIC {epic_number} ({client_info['config']['name']}): Unhandled exception in attempt {attempt}: {e}")
if attempt < MAX_ATTEMPTS_PER_EPIC:
time.sleep(delay)
delay = min(delay * 2, MAX_DELAY_SEC)
# If all attempts fail
logging.error(f"EPIC {epic_number} ({client_info['config']['name']}): FAILED after {MAX_ATTEMPTS_PER_EPIC} attempts")
return {"epic": epic_number, "success": False, "error": f"Failed all {MAX_ATTEMPTS_PER_EPIC} attempts", "api": client_info['config']['name']}
def process_epics_sequentially(epics_to_process: List[str]):
"""
Process EPICs sequentially - one EPIC at a time, rotating through APIs.
EPIC1 -> API1, EPIC2 -> API2, EPIC3 -> API3, EPIC4 -> API1, etc.
"""
logging.info(f"Processing {len(epics_to_process)} EPICs sequentially with 3 APIs and 3 OCR models")
failed_epics = []
successful_count = 0
# Create progress bar
pbar = tqdm(total=len(epics_to_process), desc="Processing EPICs", unit="epic",
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]')
try:
for i, epic_number in enumerate(epics_to_process):
# Select API and OCR model based on round-robin
api_index = i % len(clients)
client_info = clients[api_index]
ocr_model_index = api_index # Each API uses its own OCR model
logging.info(f"[{i+1}/{len(epics_to_process)}] Processing EPIC {epic_number} with {client_info['config']['name']} (OCR-{ocr_model_index})")
# Process single EPIC
result = process_single_epic_with_api(client_info, epic_number, ocr_model_index)
if result["success"]:
successful_count += 1
pbar.set_description(f"Success: {successful_count}, Failed: {len(failed_epics)}")
else:
failed_epics.append(epic_number)
pbar.set_description(f"Success: {successful_count}, Failed: {len(failed_epics)}")
pbar.update(1)
# Small delay between EPICs to be polite to APIs
time.sleep(0.5)
finally:
pbar.close()
return failed_epics
def main():
"""Main function to orchestrate the entire process."""
if not os.path.exists(EPIC_INPUT_FILE):
logging.critical(f"Input file not found: {EPIC_INPUT_FILE}")
logging.critical("Create a text file with one EPIC per line.")
sys.exit(1)
# Create output files if they don't exist
for config in API_CONFIGS:
output_file = config["output_file"]
if not os.path.exists(output_file):
with open(output_file, "w", encoding="utf-8") as f:
pass # Create empty file
logging.info(f"Created output file: {output_file}")
with open(EPIC_INPUT_FILE, "r", encoding="utf-8") as f:
epic_numbers = [ln.strip() for ln in f if ln.strip()]
# Remove duplicates while preserving order
unique_epics = list(dict.fromkeys(epic_numbers))
processed_epics = load_processed_epics()
epics_to_process = [e for e in unique_epics if e not in processed_epics]
logging.info(f"Loaded {len(unique_epics)} unique EPIC(s) from input file.")
logging.info(f"Found {len(processed_epics)} already processed EPICs in success files.")
logging.info(f"Starting process for {len(epics_to_process)} remaining EPICs.")
if not epics_to_process:
logging.info("All EPICs already processed successfully!")
return
# Process all EPICs sequentially
logging.info(f"--- Starting Sequential Processing ---")
logging.info(f"Processing: EPIC1->API1, EPIC2->API2, EPIC3->API3, EPIC4->API1, etc.")
failed_epics = process_epics_sequentially(epics_to_process)
# Write final failures to their own file
if failed_epics:
logging.info(f"Writing {len(failed_epics)} failed EPICs to {FAILED_OUTPUT_FILE}")
with open(FAILED_OUTPUT_FILE, "w", encoding="utf-8") as ffail:
for epic in failed_epics:
ffail.write(f"{epic}\n")
# --- FINAL SUMMARY ---
logging.info("=" * 50)
logging.info("Processing complete.")
logging.info(f"Total Successful: {len(epics_to_process) - len(failed_epics)}")
logging.info(f"Total Failed: {len(failed_epics)}")
for config in API_CONFIGS:
logging.info(f"Results for {config['name']}: {config['output_file']}")
if failed_epics:
logging.info(f"Persistently failed EPICs are in: {FAILED_OUTPUT_FILE}")
logging.info("=" * 50)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logging.info("\nProcess interrupted by user. Exiting.")
sys.exit(0)