"""
Space Keeper v4 - Keeps HuggingFace Spaces alive during hackathon evaluation
"""

import os
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
import threading

import gradio as gr
import requests
from huggingface_hub import HfApi
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

# =============================================================================
# CONFIGURATION
# =============================================================================

ORG_NAME = os.environ.get("ORG_NAME", "MCP-1st-Birthday")
PING_INTERVAL_HOURS = int(os.environ.get("PING_INTERVAL_HOURS", "12"))
REQUEST_TIMEOUT = int(os.environ.get("REQUEST_TIMEOUT", "30"))
PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "10"))
HF_TOKEN = os.environ.get("HF_TOKEN", None)

LOG_FILE = Path("run_logs.json")
MAX_LOG_ENTRIES = 100

# =============================================================================
# GLOBAL STATE
# =============================================================================

scheduler = BackgroundScheduler()
api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi()
APP_START_TIME = datetime.now(timezone.utc)
# Guards every access to LOG_FILE. It is a plain (non-reentrant) Lock, so the
# "_unlocked" helpers below must only be called while it is already held.
log_lock = threading.Lock()

# =============================================================================
# LOGGING FUNCTIONS
# =============================================================================


def _read_logs_unlocked() -> list:
    """Read the run history from LOG_FILE; caller must hold ``log_lock``.

    Returns an empty list when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON list.
    """
    try:
        with open(LOG_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # FileNotFoundError is an OSError; JSON and decoding errors are
        # ValueErrors. A broken log file must not break the app.
        return []
    return data if isinstance(data, list) else []


def _write_logs_unlocked(logs: list) -> None:
    """Write the newest MAX_LOG_ENTRIES entries; caller must hold ``log_lock``."""
    with open(LOG_FILE, "w", encoding="utf-8") as f:
        json.dump(logs[-MAX_LOG_ENTRIES:], f, indent=2)


def load_logs() -> list:
    """Return the stored run history (oldest first), or ``[]`` if unavailable."""
    with log_lock:
        return _read_logs_unlocked()


def save_logs(logs: list):
    """Persist ``logs``, keeping only the most recent MAX_LOG_ENTRIES entries."""
    with log_lock:
        _write_logs_unlocked(logs)


def append_log(entry: dict) -> None:
    """Append one run record to the history as a single atomic operation.

    The lock is held across the whole read-modify-write so that a scheduled
    run and a manual run finishing at the same time cannot overwrite each
    other's entry (which separate load_logs()/save_logs() calls would allow).
    """
    with log_lock:
        logs = _read_logs_unlocked()
        logs.append(entry)
        _write_logs_unlocked(logs)


# =============================================================================
# PING FUNCTIONS
# =============================================================================


def ping_space(space_id: str) -> dict:
    """Ping a single Space and return the result.

    The Space's app URL is requested first. On a timeout the result is
    reported as ``"timeout"``. On any other failure the Space's page on
    huggingface.co is requested as a fallback; if that also fails the result
    is ``"error"`` carrying the message of the *first* failure.

    Args:
        space_id: Space identifier in ``"org/name"`` form.

    Returns:
        A dict with keys ``space_id``, ``status`` (``"success"``,
        ``"timeout"`` or ``"error"``), ``status_code``, ``url_pinged`` and
        ``error``.
    """
    org, name = space_id.split("/")
    app_url = f"https://{org.lower()}-{name.lower()}.hf.space"
    hf_page_url = f"https://huggingface.co/spaces/{space_id}"

    try:
        response = requests.get(app_url, timeout=REQUEST_TIMEOUT)
        # NOTE(review): any HTTP response counts as success here, including
        # 4xx/5xx status codes - confirm this is intended.
        return {
            "space_id": space_id,
            "status": "success",
            "status_code": response.status_code,
            "url_pinged": app_url,
            "error": None,
        }
    except requests.Timeout:
        return {
            "space_id": space_id,
            "status": "timeout",
            "status_code": None,
            "url_pinged": app_url,
            "error": f"Timed out after {REQUEST_TIMEOUT}s",
        }
    except Exception as e:
        # Deliberately broad: one bad Space must not abort the whole run.
        try:
            response = requests.get(hf_page_url, timeout=REQUEST_TIMEOUT)
            return {
                "space_id": space_id,
                "status": "success",
                "status_code": response.status_code,
                "url_pinged": hf_page_url,
                "error": None,
            }
        except Exception:
            return {
                "space_id": space_id,
                "status": "error",
                "status_code": None,
                "url_pinged": app_url,
                "error": str(e),
            }


def _short_name(space_id: str) -> str:
    """Return the Space name without its ``org/`` prefix."""
    return space_id.split("/")[-1]


def _iter_ping_results(space_ids: list):
    """Ping all Spaces in parallel, yielding each result as it completes."""
    with ThreadPoolExecutor(max_workers=PARALLEL_REQUESTS) as executor:
        futures = [executor.submit(ping_space, sid) for sid in space_ids]
        for future in as_completed(futures):
            yield future.result()


def _build_run_record(
    start_time: datetime,
    triggered_by: str,
    *,
    error=None,
    results=None,
    private_spaces=None,
    duration: float = 0,
) -> dict:
    """Build the log record for one run.

    A non-None ``error`` marks the run as ``"error"``; otherwise it is
    ``"completed"`` and the success/failure counts are derived from
    ``results``.
    """
    results = results or []
    successful = sum(1 for r in results if r["status"] == "success")
    return {
        "timestamp": start_time.isoformat(),
        "triggered_by": triggered_by,
        "status": "completed" if error is None else "error",
        "error": error,
        "total_spaces": len(results),
        "successful": successful,
        "failed": len(results) - successful,
        "duration_seconds": round(duration, 2),
        "private_spaces": private_spaces or [],
        "results": results,
    }


def scheduled_job():
    """Wrapper for the scheduled job.

    Lists the organization's Spaces, pings them all in parallel and appends
    a run record (or an error record if listing fails) to the log file.
    """
    print(f"[{datetime.now(timezone.utc).isoformat()}] Running scheduled ping job...")
    start_time = datetime.now(timezone.utc)

    try:
        spaces = list(api.list_spaces(author=ORG_NAME))
    except Exception as e:
        append_log(
            _build_run_record(
                start_time,
                "scheduler",
                error=f"Failed to list Spaces: {str(e)}",
            )
        )
        return

    # Separate private and public spaces
    private_space_names = [
        s.id.split("/")[1] for s in spaces if getattr(s, "private", False)
    ]
    space_ids = [space.id for space in spaces]

    results = list(_iter_ping_results(space_ids))

    end_time = datetime.now(timezone.utc)
    duration = (end_time - start_time).total_seconds()

    run_result = _build_run_record(
        start_time,
        "scheduler",
        results=results,
        private_spaces=private_space_names,
        duration=duration,
    )
    append_log(run_result)

    successful = run_result["successful"]
    print(
        f"[{end_time.isoformat()}] Completed: {successful}/{len(results)} "
        f"successful in {duration:.1f}s"
    )


# =============================================================================
# UI HELPER FUNCTIONS
# =============================================================================


def get_uptime() -> str:
    """Return the time since app start as a short human-readable string."""
    delta = datetime.now(timezone.utc) - APP_START_TIME
    hours, remainder = divmod(int(delta.total_seconds()), 3600)
    minutes, seconds = divmod(remainder, 60)
    if hours > 0:
        return f"{hours}h {minutes}m"
    elif minutes > 0:
        return f"{minutes}m {seconds}s"
    return f"{seconds}s"


def get_next_run_time() -> str:
    """Return the next scheduled run time in UTC, or ``"Not scheduled"``."""
    jobs = scheduler.get_jobs()
    # A job added before the scheduler starts may not have next_run_time yet.
    next_run = getattr(jobs[0], "next_run_time", None) if jobs else None
    if next_run:
        # The scheduler works in its own (by default local) timezone, so
        # convert before printing the "UTC" label.
        return next_run.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    return "Not scheduled"


def get_stats() -> dict:
    """Summarize the run history: run count plus details of the last run."""
    logs = load_logs()
    if not logs:
        return {"total_runs": 0, "last_run": "Never", "last_run_status": "N/A"}

    last_run = logs[-1]
    failed = last_run.get("failed", 0)
    if last_run.get("status") == "error":
        # An errored run has failed == 0 and must not be reported as success.
        last_run_status = "❌ Error"
    elif failed == 0:
        last_run_status = "✅ Success"
    else:
        last_run_status = f"⚠️ {failed} failed"

    return {
        "total_runs": len(logs),
        "last_run": last_run["timestamp"],
        "last_run_status": last_run_status,
        "last_run_spaces": last_run.get("total_spaces", 0),
    }


def format_status_panel() -> str:
    """Render the configuration / statistics / next-run panel as Markdown."""
    stats = get_stats()
    token_status = (
        "✅ Set (can see private Spaces)"
        if HF_TOKEN
        else "❌ Not set (only public Spaces visible)"
    )

    return f"""## ⚙️ Configuration

| Setting | Value |
|---------|-------|
| **Organization** | `{ORG_NAME}` |
| **Ping Interval** | Every {PING_INTERVAL_HOURS} hours |
| **Parallel Requests** | {PARALLEL_REQUESTS} at once |
| **Request Timeout** | {REQUEST_TIMEOUT}s |
| **HF Token** | {token_status} |

## 📊 Statistics

| Metric | Value |
|--------|-------|
| **App Uptime** | {get_uptime()} |
| **Total Runs** | {stats['total_runs']} |
| **Last Run** | {stats['last_run']} |

## ⏰ Next Scheduled Run

**{get_next_run_time()}**
"""


def format_logs_panel() -> str:
    """Render the 15 most recent runs (newest first) as Markdown."""
    logs = load_logs()
    if not logs:
        return "## 📜 Run History\n\n*No runs yet. Click **🚀 Run Now** to start!*"

    parts = ["## 📜 Run History (most recent first)\n\n"]

    for run in reversed(logs[-15:]):
        ts = run["timestamp"]
        by = run["triggered_by"]

        if run["status"] == "error":
            parts.append(f"❌ **{ts}** | {by} | Error: {run['error']}\n\n")
            continue

        emoji = "✅" if run["failed"] == 0 else "⚠️"
        private_count = len(run.get("private_spaces", []))
        private_info = f" | 🔒 {private_count} private" if private_count > 0 else ""

        parts.append(f"{emoji} **{ts}** | {by} | ")
        parts.append(f"**{run['total_spaces']}** spaces | ")
        parts.append(
            f"{run['successful']} ok, {run['failed']} failed{private_info} | "
        )
        parts.append(f"{run['duration_seconds']}s\n")

        if run["failed"] > 0:
            # Name at most five failed Spaces and summarize the rest.
            failed_names = [
                r["space_id"].split("/")[1]
                for r in run.get("results", [])
                if r["status"] != "success"
            ][:5]
            parts.append(f" ↳ Failed: `{'`, `'.join(failed_names)}`")
            if run["failed"] > 5:
                parts.append(f" +{run['failed'] - 5} more")
            parts.append("\n")

        parts.append("\n")

    return "".join(parts)


def format_private_spaces_accordion(private_spaces: list) -> str:
    """Format the private spaces list for the accordion."""
    if not private_spaces:
        return "*No private Spaces found in this run.*"

    header = f"**{len(private_spaces)} private Spaces** were pinged:\n\n"
    return header + "".join(f"- `{name}`\n" for name in sorted(private_spaces))


# =============================================================================
# MAIN RUN FUNCTION WITH LIVE PROGRESS
# =============================================================================


def manual_trigger_with_progress():
    """Manually trigger a ping run with live progress updates.

    Generator used as a Gradio event handler. Every yielded value is a
    4-tuple of Markdown strings: (live output, status panel, logs panel,
    private Spaces list).
    """
    yield (
        "## 🔄 Starting...\n\nFetching Space list from HuggingFace API...",
        format_status_panel(),
        format_logs_panel(),
        "*Run in progress...*",
    )

    start_time = datetime.now(timezone.utc)

    # Get spaces from API
    try:
        spaces = list(api.list_spaces(author=ORG_NAME))
    except Exception as e:
        error_msg = f"## ❌ Failed\n\nCould not list Spaces: {str(e)}"
        append_log(_build_run_record(start_time, "manual", error=str(e)))
        yield error_msg, format_status_panel(), format_logs_panel(), "*Error during run*"
        return

    # Separate private and public spaces
    private_spaces = []
    public_spaces = []
    for s in spaces:
        name = s.id.split("/")[1]
        if getattr(s, "private", False):
            private_spaces.append(name)
        else:
            public_spaces.append(name)

    space_ids = [space.id for space in spaces]
    total = len(space_ids)
    counts = f"({len(public_spaces)} public, {len(private_spaces)} private)"

    info_msg = "## 🔄 Running...\n\n"
    info_msg += f"**Found {total} Spaces** {counts}\n\n"
    info_msg += f"Pinging {PARALLEL_REQUESTS} Spaces in parallel...\n\n"
    info_msg += f"`[0/{total}]` ░░░░░░░░░░ 0%"

    yield (
        info_msg,
        format_status_panel(),
        format_logs_panel(),
        format_private_spaces_accordion(private_spaces),
    )

    # Run pings
    results = []
    successful = 0
    failed = 0

    for completed, result in enumerate(_iter_ping_results(space_ids), start=1):
        results.append(result)
        ok = result["status"] == "success"
        if ok:
            successful += 1
        else:
            failed += 1

        # Only refresh the UI every fifth result (and on the last one).
        if completed % 5 != 0 and completed != total:
            continue

        pct = int((completed / total) * 100)
        filled = pct // 10
        bar = "█" * filled + "░" * (10 - filled)
        status_emoji = "✅" if ok else "❌"
        space_name = _short_name(result["space_id"])
        error_note = "" if ok else "— " + (result.get("error") or "")

        live_status = f"""## 🔄 Running...

**Progress:** `[{completed}/{total}]` {bar} {pct}%

**Last:** {status_emoji} `{space_name}` {error_note}

**Running total:** ✅ {successful} successful, ❌ {failed} failed
"""
        yield (
            live_status,
            format_status_panel(),
            format_logs_panel(),
            format_private_spaces_accordion(private_spaces),
        )

    # Save results
    end_time = datetime.now(timezone.utc)
    duration = (end_time - start_time).total_seconds()

    append_log(
        _build_run_record(
            start_time,
            "manual",
            results=results,
            private_spaces=private_spaces,
            duration=duration,
        )
    )

    # Final summary
    if failed == 0:
        final = f"""## ✅ Completed Successfully!

**Pinged {total} Spaces** {counts} in {duration:.1f}s

All Spaces responded! 🎉
"""
    else:
        failed_spaces = [r for r in results if r["status"] != "success"]
        failed_list = "\n".join(
            f"- `{_short_name(r['space_id'])}` — {r.get('error', 'Unknown error')}"
            for r in failed_spaces[:10]
        )
        if len(failed_spaces) > 10:
            failed_list += f"\n- ... and {len(failed_spaces) - 10} more"

        final = f"""## ⚠️ Completed with {failed} issue(s)

**Pinged {total} Spaces** {counts} in {duration:.1f}s

✅ {successful} successful

❌ {failed} failed

### Failed Spaces:

{failed_list}

---

*Failures usually mean the Space is paused, has an error, or uses a non-standard SDK.*
"""

    yield (
        final,
        format_status_panel(),
        format_logs_panel(),
        format_private_spaces_accordion(private_spaces),
    )


def refresh_all():
    """Re-render the status panel, logs panel and private Spaces list."""
    # Get private spaces from last run if available
    logs = load_logs()
    private_spaces = logs[-1].get("private_spaces", []) if logs else []

    return (
        format_status_panel(),
        format_logs_panel(),
        format_private_spaces_accordion(private_spaces),
    )


# =============================================================================
# GRADIO UI
# =============================================================================

with gr.Blocks() as demo:
    gr.Markdown("""# 🔄 Space Keeper

Keeps HuggingFace Spaces alive by pinging them to prevent sleeping during hackathon evaluation.
""")

    with gr.Row():
        with gr.Column(scale=1):
            status_panel = gr.Markdown(format_status_panel())

            with gr.Row():
                run_btn = gr.Button("🚀 Run Now", variant="primary", size="lg")
                refresh_btn = gr.Button("🔄 Refresh", size="lg")

        with gr.Column(scale=1):
            live_output = gr.Markdown(
                "## Ready\n\nClick **🚀 Run Now** to ping all Spaces.\n\n"
                f"Auto-runs every {PING_INTERVAL_HOURS} hours when this Space is awake."
            )

    gr.Markdown("---")

    with gr.Accordion("🔒 Private Spaces", open=False):
        private_spaces_display = gr.Markdown("*Run a ping to see private Spaces list.*")

    logs_panel = gr.Markdown(format_logs_panel())

    run_btn.click(
        fn=manual_trigger_with_progress,
        outputs=[live_output, status_panel, logs_panel, private_spaces_display],
    )

    refresh_btn.click(
        fn=refresh_all,
        outputs=[status_panel, logs_panel, private_spaces_display],
    )

    # The panels above are rendered once at import time, before the scheduler
    # job exists; re-render them on every page load so the uptime, history and
    # next-run time are current.
    demo.load(
        fn=lambda: (format_status_panel(), format_logs_panel()),
        outputs=[status_panel, logs_panel],
    )

# =============================================================================
# START SCHEDULER
# =============================================================================

scheduler.add_job(
    scheduled_job,
    trigger=IntervalTrigger(hours=PING_INTERVAL_HOURS),
    id="ping_job",
    name="Ping all Spaces",
    replace_existing=True,
)
scheduler.start()

print(f"[{APP_START_TIME.isoformat()}] Space Keeper started for {ORG_NAME}")
print(f"  Ping interval: {PING_INTERVAL_HOURS}h | Next run: {get_next_run_time()}")

if __name__ == "__main__":
    demo.launch()