import os import json import tempfile from datetime import datetime, timezone from huggingface_hub import hf_hub_download, upload_file from huggingface_hub.utils import HfHubHTTPError # Constant for the dataset repository, configurable via environment variable DATASET_REPO = os.getenv("HF_DATASET_REPO", "Jofthomas/geoguessr_game_of_the_day") BLOCK_MULTIPLE_GAMES = os.getenv("Block_Multiple_games", "False") == "true" def get_todays_records_path() -> str: """Gets the path for today's game records file, e.g., 'records/2025-10-03.json'.""" date_str = datetime.now(timezone.utc).strftime('%Y-%m-%d') return f"records/{date_str}.json" def get_todays_games(token: str) -> list: """ Downloads and reads the game records for the current day from the HF Hub. Returns an empty list if the file for today doesn't exist yet. """ filepath = get_todays_records_path() try: # Use the provided token for read access local_path = hf_hub_download( repo_id=DATASET_REPO, filename=filepath, repo_type="dataset", token=token, ) with open(local_path, "r", encoding="utf-8") as f: return json.load(f) except HfHubHTTPError as e: if e.response.status_code == 404: return [] # No games played today yet, which is normal. else: print(f"Error downloading daily records: {e}") raise # Re-raise other HTTP errors except Exception as e: print(f"An unexpected error occurred while getting today's games: {e}") return [] def has_user_played_today(username: str, todays_games: list) -> bool: """Checks if a user has completed a game today.""" today_date = datetime.now(timezone.utc).date().isoformat() for game in todays_games: if game.get("username") == username and game.get("completed", False): # Additional check: ensure the game's timestamp is from today game_timestamp = game.get("timestamp", "") if game_timestamp: try: game_date = datetime.fromisoformat(game_timestamp.replace('Z', '+00:00')).date().isoformat() if game_date == today_date: return True except: # If we can't parse the timestamp, assume it's from today (since it's in todays_games) return True else: # If no timestamp, assume it's from today (since it's in todays_games) return True return False def get_user_game_today(username: str, todays_games: list, game_id: str = None) -> dict: """ Gets the user's game record for today, if it exists. If game_id is provided, returns the specific game with that ID. Otherwise, returns the first game found for the user. """ for game in todays_games: if game.get("username") == username: if game_id and game.get("game_id") == game_id: return game elif not game_id: return game return None def update_game_record(username: str, game_id: str, round_data: dict = None, final_score: float = None, final_ai_score: float = None): """ Updates or creates a game record for a user after each round. This ensures data is recorded incrementally and prevents abuse. Args: username: The player's username game_id: Unique identifier for this game session round_data: Single round details to append to the record final_score: Final total human score (only set when game is complete) final_ai_score: Final total AI score (only set when game is complete) """ write_token = os.getenv("HF_TOKEN", "") if not write_token: print("Warning: Server HF_TOKEN not set. Cannot record game data.") return try: # Fetch the latest records todays_games = get_todays_games(token=write_token) # Find existing game record for this user today with this specific game_id existing_game = get_user_game_today(username, todays_games, game_id) if existing_game: # Update existing record if round_data: if "rounds" not in existing_game: existing_game["rounds"] = [] existing_game["rounds"].append(round_data) if final_score is not None: existing_game["human_score"] = float(round(final_score)) existing_game["completed"] = True if final_ai_score is not None: existing_game["ai_score"] = float(round(final_ai_score)) else: # Create new game record game_record = { "username": username, "game_id": game_id, "human_score": 0.0, # Will be updated when game completes "ai_score": 0.0, # Will be updated when game completes "timestamp": datetime.now(timezone.utc).isoformat(), "rounds": [round_data] if round_data else [], "completed": False } todays_games.append(game_record) filepath_in_repo = get_todays_records_path() with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json", encoding="utf-8") as tmp_file: json.dump(todays_games, tmp_file, indent=2) tmp_file_path = tmp_file.name upload_file( path_or_fileobj=tmp_file_path, path_in_repo=filepath_in_repo, repo_id=DATASET_REPO, repo_type="dataset", token=write_token, commit_message=f"Update game for {username}" ) print(f"Successfully updated game record for {username}") except Exception as e: print(f"Error updating game record for {username}: {e}") finally: if 'tmp_file_path' in locals() and os.path.exists(tmp_file_path): os.remove(tmp_file_path) def record_game(username: str, score: float, rounds_data: list = None, ai_score: float = None, game_id: str = None): """ Legacy function - now just calls update_game_record with final score. Kept for backwards compatibility. """ if not game_id: import uuid game_id = str(uuid.uuid4()) update_game_record(username, game_id, final_score=score, final_ai_score=ai_score)