import os
import pandas as pd
import requests
import json
from io import StringIO
from datetime import datetime

from src.assets.text_content import REPO, BENCHMARK_FILE


def get_github_data():
    """
    Read and process leaderboard data from the CSV files hosted on GitHub -
    https://github.com/clembench/clembench-runs (REPO).
    Set the path in src/assets/text_content/REPO.

    Returns:
        github_data (dict): Dictionary with keys 'text' and 'multimodal'.
            Each maps to a dict containing:
            - 'version_data': list of dicts with the version 'name',
              'last_updated' and 'release_date' (latest version first).
            - 'dataframes': list of processed leaderboard DataFrames in the
              same order.
        Returns None if the benchmark JSON file could not be fetched.
    """
    json_url = REPO + BENCHMARK_FILE
    response = requests.get(json_url)

    # Check if the JSON file request was successful
    if response.status_code != 200:
        print(f"Failed to read JSON file - {BENCHMARK_FILE} in repo {REPO}: Status Code: {response.status_code}")
        return None
    json_data = response.json()
    versions = json_data['versions']

    # Sort the benchmark versions, latest first
    version_names = sorted(
        [ver['version'] for ver in versions],
        key=lambda v: list(map(int, v[1:].split('_')[0].split('.'))),
        reverse=True
    )
    # Collect DataFrames - text and multimodal only - ignoring _quantized, _backends, _ascii
    text_data = {
        'version_data': [],
        'dataframes': []
    }
    multimodal_data = {
        'version_data': [],
        'dataframes': []
    }
    for version in version_names:
        results_url = f"{REPO}{version}/results.csv"
        csv_response = requests.get(results_url)
        if csv_response.status_code == 200:
            df = pd.read_csv(StringIO(csv_response.text))
            df = process_df(df)
            df = df.sort_values(by=df.columns[1], ascending=False)  # Sort by Clemscore

            version_data = {
                'name': version,
                'last_updated': [datetime.strptime(v['last_updated'], '%Y-%m-%d').strftime("%d %b %Y") for v in versions if v['version'] == version],
                'release_date': [datetime.strptime(v['release_date'], '%Y-%m-%d').strftime("%d %b %Y") for v in versions if v['version'] == version]
            }

            if 'multimodal' in version:
                multimodal_data['dataframes'].append(df)
                multimodal_data['version_data'].append(version_data)
            else:
                text_data['dataframes'].append(df)
                text_data['version_data'].append(version_data)
    github_data = {
        'text': text_data,
        'multimodal': multimodal_data
    }

    return github_data
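

# A minimal sketch (an assumption, not part of the original module) of how the
# structure returned by get_github_data() can be consumed. The helper name
# _latest_text_leaderboard is hypothetical.
def _latest_text_leaderboard(github_data: dict) -> pd.DataFrame:
    # Assumes get_github_data() succeeded (did not return None). Versions are
    # sorted latest-first above, so index 0 is the newest text leaderboard for
    # which a results.csv could be fetched.
    return github_data['text']['dataframes'][0]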


def process_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Process the leaderboard dataframe:
    - Convert datatypes so columns sort as floats instead of strings
    - Remove repetition in model names
    - Update column names

    Args:
        df: Unprocessed DataFrame.
    Returns:
        df: Processed DataFrame.
    """
    # Convert column values to float, apart from the model names column
    for col in df.columns[1:]:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # Remove repetition in model names (temperature suffix and duplicated "--" parts)
    df[df.columns[0]] = df[df.columns[0]].str.replace(r'-t[0-1]\.\d+', '', regex=True)
    df[df.columns[0]] = df[df.columns[0]].apply(lambda x: '--'.join(set(x.split('--'))))

    # Rename the first column to 'Model' if it starts with 'Unnamed'
    if df.columns[0].startswith('Unnamed'):
        df.rename(columns={df.columns[0]: 'Model'}, inplace=True)

    # Define the desired column order
    desired_columns = ['Model', '-, clemscore', 'all, Average % Played', 'all, Average Quality Score']

    # Keep the desired columns that actually exist, followed by the remaining columns
    existing_columns = [col for col in desired_columns if col in df.columns]
    df = df[existing_columns + [col for col in df.columns if col not in existing_columns]]

    # Update column names
    custom_column_names = ['Model', 'Clemscore', '% Played', 'Quality Score']
    for col in df.columns[4:]:  # Capitalize the game-specific columns from the 5th column onwards
        parts = col.split(',')
        custom_name = f"{parts[0].strip().capitalize()} {parts[1].strip()}"
        custom_column_names.append(custom_name)

    # Rename columns
    df.columns = custom_column_names

    return df
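

# A minimal sketch (illustrative only, not called anywhere) of what process_df
# does to a raw results.csv frame. The sample rows and scores below are made up;
# the column headers mirror the layout process_df expects.
def _demo_process_df() -> pd.DataFrame:
    raw = pd.DataFrame({
        'Unnamed: 0': ['model-a-t0.0--model-a-t0.0', 'model-b-t0.7--model-b-t0.7'],
        '-, clemscore': ['55.1', '42.3'],
        'all, Average % Played': ['90.0', '75.5'],
        'all, Average Quality Score': ['61.2', '56.0'],
    })
    # Yields columns Model, Clemscore, % Played, Quality Score, with the
    # temperature suffix and duplicated "--" parts stripped from the model
    # names and the score columns cast to float.
    return process_df(raw)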


def query_search(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """
    Filter the dataframe based on the search query.

    Args:
        df (pd.DataFrame): Unfiltered dataframe.
        query (str): A string of queries separated by ";".
    Returns:
        pd.DataFrame: Filtered dataframe containing the searched queries in the 'Model' column.
    """
    if not query.strip():  # Reset dataframe if an empty query is passed
        return df

    queries = [q.strip().lower() for q in query.split(';') if q.strip()]  # Normalize and split queries

    # Filter dataframe based on queries in the 'Model' column
    filtered_df = df[df['Model'].str.lower().str.contains('|'.join(queries))]

    return filtered_df
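

# A minimal sketch (illustrative only) of query_search on a toy frame; the model
# names are placeholders, not real leaderboard entries.
def _demo_query_search() -> pd.DataFrame:
    toy = pd.DataFrame({
        'Model': ['gpt-x', 'llama-y', 'claude-z'],
        'Clemscore': [50.0, 40.0, 30.0]
    })
    # "gpt; claude" is split on ";" and matches rows whose Model contains
    # either term, case-insensitively.
    return query_search(toy, 'gpt; claude')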


if __name__ == '__main__':
    data = get_github_data()
    if data is not None:
        print(data['text']['version_data'])
        print(data['multimodal']['version_data'])