import os
import re
import json
import base64
import gradio as gr
import random
from gradio_leaderboard import Leaderboard, ColumnFilter, SelectColumns
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download, login
from collections import defaultdict
from datasets import Dataset, DatasetDict
from datasets import load_dataset
from huggingface_hub import HfApi

from src.about import (
    CITATION_BUTTON_LABEL,
    CITATION_BUTTON_TEXT,
    EVALUATION_QUEUE_TEXT,
    INTRODUCTION_TEXT,
    LLM_BENCHMARKS_TEXT,
    TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
    BENCHMARK_COLS,
    COLS,
    EVAL_COLS,
    EVAL_TYPES,
    AutoEvalColumn,
    ModelType,
    fields,
    WeightType,
    Precision,
)
from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, QUEUE_REPO, REPO_ID, RESULTS_REPO, TOKEN
from src.populate import get_evaluation_queue_df, get_leaderboard_df
from src.submission.submit import add_new_eval

HF_DATASET_REPO = "JunJiaGuo/Vid_result"
HF_TOKEN = os.getenv("HF_TOKEN")
login(HF_TOKEN)


def restart_space():
    API.restart_space(repo_id=REPO_ID)


### Space initialisation
try:
    print(EVAL_REQUESTS_PATH)
    snapshot_download(
        repo_id=QUEUE_REPO,
        local_dir=EVAL_REQUESTS_PATH,
        repo_type="dataset",
        tqdm_class=None,
        etag_timeout=30,
        token=TOKEN,
    )
except Exception:
    restart_space()

try:
    print(EVAL_RESULTS_PATH)
    snapshot_download(
        repo_id=RESULTS_REPO,
        local_dir=EVAL_RESULTS_PATH,
        repo_type="dataset",
        tqdm_class=None,
        etag_timeout=30,
        token=TOKEN,
    )
except Exception:
    restart_space()


LEADERBOARD_DF = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS)

(
    finished_eval_queue_df,
    running_eval_queue_df,
    pending_eval_queue_df,
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)


def init_leaderboard(dataframe):
    if dataframe is None or dataframe.empty:
        raise ValueError("Leaderboard DataFrame is empty or None.")
    return Leaderboard(
        value=dataframe,
        datatype=[c.type for c in fields(AutoEvalColumn)],
        select_columns=SelectColumns(
            default_selection=[c.name for c in fields(AutoEvalColumn) if c.displayed_by_default],
            cant_deselect=[c.name for c in fields(AutoEvalColumn) if c.never_hidden],
            label="Select Columns to Display:",
        ),
        search_columns=[AutoEvalColumn.model.name, AutoEvalColumn.license.name],
        hide_columns=[c.name for c in fields(AutoEvalColumn) if c.hidden],
        filter_columns=[
            ColumnFilter(AutoEvalColumn.model_type.name, type="checkboxgroup", label="Model types"),
            ColumnFilter(AutoEvalColumn.precision.name, type="checkboxgroup", label="Precision"),
            ColumnFilter(
                AutoEvalColumn.params.name,
                type="slider",
                min=0.01,
                max=150,
                label="Select the number of parameters (B)",
            ),
            ColumnFilter(
                AutoEvalColumn.still_on_hub.name, type="boolean", label="Deleted/incomplete", default=True
            ),
        ],
        bool_checkboxgroup_label="Hide models",
        interactive=False,
    )


current_dir = os.path.dirname(os.path.abspath(__file__))
print(current_dir)
CSV_FILE = os.path.join(current_dir, "acc.csv")

CLASS_LIST = [
    "script_matching", "plot_ordering", "background_perception", "scene_counting", "lighting_perception",
    "character_counting", "action_perception", "CMP_perception", "emotion_perception", "art_style",
    "special_effect", "cut_counting", "camera_movement", "camera_angle", "shot_size",
    "Narrative", "Scene", "Character", "Making", "Cinematography",
]

CATEGORY_MAPPING = {
    "Narrative": ["script_matching", "plot_ordering"],
    "Scene": ["background_perception", "scene_counting", "lighting_perception"],
    "Character": ["character_counting", "action_perception", "CMP_perception", "emotion_perception"],
    "Making": ["art_style", "special_effect", "cut_counting"],
    "Cinematography": ["camera_movement", "camera_angle", "shot_size"],
}

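
# Illustrative sketch of the two JSON shapes consumed by the evaluation code below.
# Only the key names ("id", "class", "model_answer") come from the code; the id and
# answer values here are hypothetical examples.
#
#   multi_choice.json (question metadata, one entry per question):
#       [{"id": "0001", "class": "shot_size", ...}, ...]
#
#   uploaded predictions file (passed to evaluate_uploaded_json as user_file):
#       [{"id": "0001", "model_answer": "(B) ..."},
#        {"id": "0002", "model_answer": ["C"]}, ...]
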
"action_perception", "CMP_perception", "emotion_perception"], "Making": ["art_style", "special_effect", "cut_counting"], "Cinematography": ["camera_movement", "camera_angle", "shot_size"] } def load_id_answer_mapping(): id_answer_mapping = os.getenv("ID_ANSWER_MAPPING") if not id_answer_mapping: raise ValueError("ID_ANSWER_MAPPING secret not found!") # print(id_answer_mapping) # print(type(id_answer_mapping)) return json.loads(id_answer_mapping) def answer_matching(text): if isinstance(text, list): text = text[0] if text else random.choice(['A', 'B', 'C', 'D']) if not isinstance(text, str): return random.choice(['A', 'B', 'C', 'D']) patterns = [ r'\((A|B|C|D)\)', r'^(A|B|C|D)[\s\W]*', r'\b[A-D]\b', r'\((a|b|c|d)\)', r'\b(A|B|C|D)\.', ] for pattern in patterns: match = re.search(pattern, text) if match: return match.group(1).upper() letters = re.findall(r'[a-zA-Z]', text) return letters[0].upper() if len(letters) == 1 else random.choice(['A', 'B', 'C', 'D']) def evaluate_uploaded_json( user_file: str, model_name: str, multi_choice_file: str = "multi_choice.json", ): print(f"Model Name: {model_name}") print(f"Uploaded File: {user_file}") id_answer_mapping = load_id_answer_mapping() with open(multi_choice_file, "r", encoding="utf-8") as f: mc_data = json.load(f) id_class_mapping = {q["id"]: q["class"] for q in mc_data} with open(user_file, "r", encoding="utf-8") as f: user_data = json.load(f) correct = 0 total = 0 class_correct = defaultdict(int) class_total = defaultdict(int) for item in user_data: question_id = item["id"] raw_user_answer = item.get("model_answer", "") user_answer = answer_matching(raw_user_answer) question_class = id_class_mapping.get(question_id, "Unknown") class_total[question_class] += 1 total += 1 if id_answer_mapping.get(question_id) == user_answer: class_correct[question_class] += 1 correct += 1 subclass_data = [] subclass_result = {} for cls in CLASS_LIST[:-5]: acc = class_correct[cls] / class_total[cls] if class_total[cls] > 0 else 0 subclass_data.append({ "Subclass": cls, "Accuracy": f"{acc:.2%}", "Correct/Total": f"{class_correct[cls]}/{class_total[cls]}" }) subclass_result[cls] = acc category_data = [] for category, sub_classes in CATEGORY_MAPPING.items(): cat_correct = sum(class_correct.get(sub_cls, 0) for sub_cls in sub_classes) cat_total = sum(class_total.get(sub_cls, 0) for sub_cls in sub_classes) acc = cat_correct / cat_total if cat_total > 0 else 0 category_data.append({ "Category": category, "Accuracy": f"{acc:.2%}", "Correct/Total": f"{cat_correct}/{cat_total}" }) subclass_result[category] = acc overall_accuracy = f"{correct / total:.2%} ({correct}/{total} correct)" subclass_df = pd.DataFrame(subclass_data) category_df = pd.DataFrame(category_data) save_class_accuracy_to_hf_dataset(model_name, subclass_result) return overall_accuracy, category_df, subclass_df def save_class_accuracy_to_hf_dataset(model_name, class_accuracy): new_data = {"Model Name": model_name} for cls in CLASS_LIST: new_data[cls] = class_accuracy.get(cls, 0) new_df = pd.DataFrame([new_data]) try: dataset = load_dataset(HF_DATASET_REPO, split="train") existing_df = dataset.to_pandas() print(existing_df) updated_df = pd.concat([existing_df, new_df], ignore_index=True) except: updated_df = new_df updated_dataset = Dataset.from_pandas(updated_df) updated_dataset.push_to_hub(HF_DATASET_REPO, split="train", token=HF_TOKEN) demo = gr.Blocks(css=custom_css) with demo: gr.HTML('