import plotly.graph_objects as go import numpy as np import pandas as pd import logging from typing import Optional import base64 import html logger = logging.getLogger(__name__) INFORMAL_TO_FORMAL_NAME_MAP = { # Short Names "lit": "Literature Understanding", "data": "Data Analysis", "code": "Code Execution", "discovery": "Discovery", # Validation Names "arxivdigestables_validation": "Arxivdigestables Validation", "sqa_dev": "Sqa Dev", "litqa2_validation": "Litqa2 Validation", "paper_finder_validation": "Paper Finder Validation", "paper_finder_litqa2_validation": "Paper Finder Litqa2 Validation", "discoverybench_validation": "Discoverybench Validation", "core_bench_validation": "Core Bench Validation", "ds1000_validation": "DS1000 Validation", "e2e_discovery_validation": "E2E Discovery Validation", "e2e_discovery_hard_validation": "E2E Discovery Hard Validation", "super_validation": "Super Validation", # Test Names "paper_finder_test": "Paper Finder Test", "paper_finder_litqa2_test": "Paper Finder Litqa2 Test", "sqa_test": "Sqa Test", "arxivdigestables_test": "Arxivdigestables Test", "litqa2_test": "Litqa2 Test", "discoverybench_test": "Discoverybench Test", "core_bench_test": "Core Bench Test", "ds1000_test": "DS1000 Test", "e2e_discovery_test": "E2E Discovery Test", "e2e_discovery_hard_test": "E2E Discovery Hard Test", "super_test": "Super Test", } ### 2. The Updated Helper Functions ### def _safe_round(value, digits=2): """Rounds a number if it's a valid float/int, otherwise returns it as is.""" return round(value, digits) if isinstance(value, (float, int)) and pd.notna(value) else value def _pretty_column_name(raw_col: str) -> str: """ Takes a raw column name from the DataFrame and returns a "pretty" version. Handles three cases: 1. Fixed names (e.g., 'User/organization' -> 'Submitter'). 2. Dynamic names (e.g., 'ds1000_validation score' -> 'DS1000 Validation Score'). 3. Fallback for any other names. """ # Case 1: Handle fixed, special-case mappings first. fixed_mappings = { 'id': 'id', 'Agent': 'Agent', 'Agent description': 'Agent Description', 'User/organization': 'Submitter', 'Submission date': 'Date', 'Overall': 'Overall Score', 'Overall cost': 'Overall Cost', 'Logs': 'Logs', 'Openness': 'Openness', 'Agent tooling': 'Agent Tooling', 'LLM base': 'LLM Base', } if raw_col in fixed_mappings: return fixed_mappings[raw_col] # Case 2: Handle dynamic names by finding the longest matching base name. # We sort by length (desc) to match 'core_bench_validation' before 'core_bench'. sorted_base_names = sorted(INFORMAL_TO_FORMAL_NAME_MAP.keys(), key=len, reverse=True) for base_name in sorted_base_names: if raw_col.startswith(base_name): formal_name = INFORMAL_TO_FORMAL_NAME_MAP[base_name] # Get the metric part (e.g., ' score' or ' cost 95% CI') metric_part = raw_col[len(base_name):].strip() # Capitalize the metric part correctly (e.g., 'score' -> 'Score') pretty_metric = metric_part.capitalize() return f"{formal_name} {pretty_metric}" # Case 3: If no specific rule applies, just make it title case. return raw_col.title() def create_pretty_tag_map(raw_tag_map: dict, name_map: dict) -> dict: """ Converts a tag map with raw names into a tag map with pretty, formal names. Args: raw_tag_map: The map with raw keys and values (e.g., {'lit': ['litqa2_validation']}). name_map: The INFORMAL_TO_FORMAL_NAME_MAP used for translation. Returns: A new dictionary with pretty names (e.g., {'Literature Understanding': ['Litqa2 Validation']}). """ pretty_map = {} # A reverse map to find raw keys from formal names if needed, though not used here # This is just for understanding; the main logic uses the forward map. # Helper to get pretty name with a fallback def get_pretty(raw_name): return name_map.get(raw_name, raw_name.replace("_", " ").title()) for raw_key, raw_value_list in raw_tag_map.items(): pretty_key = get_pretty(raw_key) pretty_value_list = [get_pretty(raw_val) for raw_val in raw_value_list] pretty_map[pretty_key] = sorted(list(set(pretty_value_list))) return pretty_map def transform_raw_dataframe(raw_df: pd.DataFrame) -> pd.DataFrame: """ Transforms a raw leaderboard DataFrame into a presentation-ready format. This function performs two main actions: 1. Rounds all numeric metric values (columns containing 'score' or 'cost'). 2. Renames all columns to a "pretty", human-readable format. Args: raw_df (pd.DataFrame): The DataFrame with raw data and column names like 'agent_name', 'overall/score', 'tag/code/cost'. Returns: pd.DataFrame: A new DataFrame ready for display. """ if not isinstance(raw_df, pd.DataFrame): raise TypeError("Input 'raw_df' must be a pandas DataFrame.") df = raw_df.copy() # Create the mapping for pretty column names pretty_cols_map = {col: _pretty_column_name(col) for col in df.columns} # Rename the columns and return the new DataFrame transformed_df = df.rename(columns=pretty_cols_map) # Apply safe rounding to all metric columns for col in transformed_df.columns: if 'Score' in col or 'Cost' in col: transformed_df[col] = transformed_df[col].apply(_safe_round) logger.info("Raw DataFrame transformed: numbers rounded and columns renamed.") return transformed_df class DataTransformer: """ Visualizes a pre-processed leaderboard DataFrame. This class takes a "pretty" DataFrame and a tag map, and provides methods to view filtered versions of the data and generate plots. """ def __init__(self, dataframe: pd.DataFrame, tag_map: dict[str, list[str]]): """ Initializes the viewer. Args: dataframe (pd.DataFrame): The presentation-ready leaderboard data. tag_map (dict): A map of formal tag names to formal task names. """ if not isinstance(dataframe, pd.DataFrame): raise TypeError("Input 'dataframe' must be a pandas DataFrame.") if not isinstance(tag_map, dict): raise TypeError("Input 'tag_map' must be a dictionary.") self.data = dataframe self.tag_map = tag_map logger.info(f"DataTransformer initialized with a DataFrame of shape {self.data.shape}.") def view( self, tag: Optional[str] = "Overall", # Default to "Overall" for clarity use_plotly: bool = False, ) -> tuple[pd.DataFrame, dict[str, go.Figure]]: """ Generates a filtered view of the DataFrame and a corresponding scatter plot. """ if self.data.empty: logger.warning("No data available to view.") return self.data, {} # --- 1. Determine Primary and Group Metrics Based on the Tag --- if tag is None or tag == "Overall": primary_metric = "Overall" group_metrics = list(self.tag_map.keys()) else: primary_metric = tag # For a specific tag, the group is its list of sub-tasks. group_metrics = self.tag_map.get(tag, []) # --- 2. Sort the DataFrame by the Primary Score --- primary_score_col = f"{primary_metric} Score" df_sorted = self.data if primary_score_col in self.data.columns: df_sorted = self.data.sort_values(primary_score_col, ascending=False, na_position='last') df_view = df_sorted.copy() # 3. Combine "Agent" and "Submitter" into a single HTML-formatted column # We do this *before* defining the final column list. if 'Agent' in df_view.columns and 'Submitter' in df_view.columns: #preserve just agent name for scatterplot hover df_view['agent_for_hover'] = df_view['Agent'] def combine_agent_submitter(row): agent = row['Agent'] submitter = row['Submitter'] # Check if submitter exists and is not empty if pd.notna(submitter) and submitter.strip() != '': # Create a two-line HTML string with styled submitter text return ( f"
{agent}
" f"{submitter}" f"
" ) else: # If no submitter, just return the agent name return agent # Apply the function to create the new combined 'Agent' column df_view['Agent'] = df_view.apply(combine_agent_submitter, axis=1) # The 'Submitter' column is no longer needed df_view = df_view.drop(columns=['Submitter']) # 4. Build the List of Columns to Display (now simplified) base_cols = ["id","Agent","LLM Base", "agent_for_hover"] new_cols = ["Openness", "Agent Tooling"] ending_cols = ["Logs"] metrics_to_display = [primary_score_col, f"{primary_metric} Cost"] for item in group_metrics: metrics_to_display.append(f"{item} Score") metrics_to_display.append(f"{item} Cost") final_cols_ordered = new_cols + base_cols + list(dict.fromkeys(metrics_to_display)) + ending_cols for col in final_cols_ordered: if col not in df_view.columns: df_view[col] = pd.NA # The final selection will now use the new column structure df_view = df_view[final_cols_ordered].reset_index(drop=True) cols = len(final_cols_ordered) # Calculated and add "Categories Attempted" column if primary_metric == "Overall": def calculate_attempted(row): main_categories = ['Literature Understanding', 'Data Analysis', 'Code Execution', 'Discovery'] count = sum(1 for category in main_categories if pd.notna(row.get(f"{category} Cost"))) # Return the formatted string with the correct emoji if count == 4: return f"4/4" if count == 0: return f"0/4" return f"{count}/4" # Apply the function row-wise to create the new column attempted_column = df_view.apply(calculate_attempted, axis=1) # Insert the new column at a nice position (e.g., after "Date") df_view.insert((cols - 1), "Categories Attempted", attempted_column) else: total_benchmarks = len(group_metrics) def calculate_benchmarks_attempted(row): # Count how many benchmarks in this category have COST data reported count = sum(1 for benchmark in group_metrics if pd.notna(row.get(f"{benchmark} Cost"))) if count == total_benchmarks: return f"{count}/{total_benchmarks} " elif count == 0: return f"{count}/{total_benchmarks} " else: return f"{count}/{total_benchmarks}" # Insert the new column, for example, after "Date" df_view.insert((cols - 1), "Benchmarks Attempted", df_view.apply(calculate_benchmarks_attempted, axis=1)) # --- 4. Generate the Scatter Plot for the Primary Metric --- plots: dict[str, go.Figure] = {} if use_plotly: primary_cost_col = f"{primary_metric} Cost" # Check if the primary score and cost columns exist in the FINAL view if primary_score_col in df_view.columns and primary_cost_col in df_view.columns: fig = _plot_scatter_plotly( data=df_view, x=primary_cost_col, y=primary_score_col, agent_col="agent_for_hover" ) # Use a consistent key for easy retrieval later plots['scatter_plot'] = fig else: logger.warning( f"Skipping plot for '{primary_metric}': score column '{primary_score_col}' " f"or cost column '{primary_cost_col}' not found." ) # Add an empty figure to avoid downstream errors plots['scatter_plot'] = go.Figure() return df_view, plots DEFAULT_Y_COLUMN = "Overall Score" DUMMY_X_VALUE_FOR_MISSING_COSTS = 0 def _plot_scatter_plotly( data: pd.DataFrame, x: Optional[str], y: str, agent_col: str = 'agent_for_hover' ) -> go.Figure: # --- Section 1: Define Mappings --- color_map = { "Closed": "red", "API Available": "orange", "Open Source": "green", "Open Source + Open Weights": "blue" } category_order = list(color_map.keys()) shape_map = { "Standard": "star", "Custom with Standard Search": "diamond", "Fully Custom": "circle" } default_shape = 'square' x_col_to_use = x y_col_to_use = y required_cols = [y_col_to_use, agent_col, "Openness", "Agent Tooling"] if not all(col in data.columns for col in required_cols): logger.error(f"Missing one or more required columns for plotting: {required_cols}") return go.Figure() data_plot = data.copy() data_plot[y_col_to_use] = pd.to_numeric(data_plot[y_col_to_use], errors='coerce') x_axis_label = f"{x} per task (USD)" if x else "Cost (Data N/A)" x_data_is_valid = False if x and x in data_plot.columns: try: data_plot[x_col_to_use] = pd.to_numeric(data_plot[x_col_to_use], errors='coerce') if data_plot[x_col_to_use].notna().any(): x_data_is_valid = True except Exception as e: logger.warning(f"Error converting x-column '{x_col_to_use}' to numeric: {e}") if not x_data_is_valid: dummy_x_col_name = "__dummy_x_for_plotting__" data_plot[dummy_x_col_name] = DUMMY_X_VALUE_FOR_MISSING_COSTS x_col_to_use = dummy_x_col_name logger.info("Using dummy x-values for plotting.") # Clean data based on all necessary columns data_plot.dropna(subset=[y_col_to_use, x_col_to_use, "Openness", "Agent Tooling"], inplace=True) # --- Section 3: Initialize Figure --- fig = go.Figure() if data_plot.empty: logger.warning(f"No valid data to plot after cleaning.") return fig # --- Section 4: Calculate and Draw Pareto Frontier (Restored from your original code) --- if x_data_is_valid: sorted_data = data_plot.sort_values(by=[x_col_to_use, y_col_to_use], ascending=[True, False]) frontier_points = [] max_score_so_far = float('-inf') for _, row in sorted_data.iterrows(): score = row[y_col_to_use] if score >= max_score_so_far: frontier_points.append({'x': row[x_col_to_use], 'y': score}) max_score_so_far = score if frontier_points: frontier_df = pd.DataFrame(frontier_points) fig.add_trace(go.Scatter( x=frontier_df['x'], y=frontier_df['y'], mode='lines', name='Efficiency Frontier', line=dict(color='firebrick', width=2, dash='dash'), hoverinfo='skip' )) # --- Section 5: Prepare for Marker Plotting --- # Pre-generate hover text and shapes for each point data_plot['hover_text'] = data_plot.apply( lambda row: f"{row[agent_col]}
{x_axis_label}: ${row[x_col_to_use]:.2f}
{y_col_to_use}: {row[y_col_to_use]:.2f}", axis=1 ) data_plot['shape_symbol'] = data_plot['Agent Tooling'].map(shape_map).fillna(default_shape) # --- Section 6: Plot Markers by "Openness" Category --- for category in category_order: group = data_plot[data_plot['Openness'] == category] if group.empty: continue fig.add_trace(go.Scatter( x=group[x_col_to_use], y=group[y_col_to_use], mode='markers', name=category, showlegend=False, text=group['hover_text'], hoverinfo='text', marker=dict( color=color_map.get(category, 'black'), symbol=group['shape_symbol'], size=10, opacity=0.8, line=dict(width=1, color='DarkSlateGrey') ) )) # ---- Add logic for making the legend ----------- for i, category in enumerate(category_order): fig.add_trace(go.Scatter( x=[None], y=[None], mode='markers', name=category, legendgroup="openness_group", legendgrouptitle_text="Agent Openness" if i == 0 else None, marker=dict( color=color_map.get(category, 'grey'), symbol='circle', size=12 ) )) # Part B: Dummy traces for the SHAPES ("Agent Tooling") shape_items = list(shape_map.items()) for i, (shape_name, shape_symbol) in enumerate(shape_items): fig.add_trace(go.Scatter( x=[None], y=[None], mode='markers', name=shape_name, legendgroup="tooling_group", legendgrouptitle_text="Agent Tooling" if i == 0 else None, marker=dict(color='black', symbol=shape_symbol, size=12) )) # --- Section 8: Configure Layout (Restored from your original code) --- xaxis_config = dict(title=x_axis_label) if not x_data_is_valid: xaxis_config['range'] = [DUMMY_X_VALUE_FOR_MISSING_COSTS - 1, DUMMY_X_VALUE_FOR_MISSING_COSTS + 1] xaxis_config['tickvals'] = [DUMMY_X_VALUE_FOR_MISSING_COSTS] else: xaxis_config['rangemode'] = "tozero" logo_data_uri = svg_to_data_uri("assets/just-icon.svg") fig.update_layout( template="plotly_white", title=f"{y_col_to_use} vs. {x_axis_label}", xaxis=xaxis_config, yaxis=dict(title=y_col_to_use, rangemode="tozero"), legend=dict( bgcolor='#FAF2E9', ) ) fig.add_layout_image( dict( source=logo_data_uri, xref="x domain", yref="y domain", x=1.1, y=1.1, sizex=0.2, sizey=0.2, xanchor="left", yanchor="bottom", layer="above", ), ) return fig def format_cost_column(df: pd.DataFrame, cost_col_name: str) -> pd.DataFrame: """ Applies custom formatting to a cost column based on its corresponding score column. - If cost is not null, it remains unchanged. - If cost is null but score is not, it becomes "Missing Cost". - If both cost and score are null, it becomes "Not Attempted". Args: df: The DataFrame to modify. cost_col_name: The name of the cost column to format (e.g., "Overall Cost"). Returns: The DataFrame with the formatted cost column. """ # Find the corresponding score column by replacing "Cost" with "Score" score_col_name = cost_col_name.replace("Cost", "Score") # Ensure the score column actually exists to avoid errors if score_col_name not in df.columns: return df # Return the DataFrame unmodified if there's no matching score def apply_formatting_logic(row): cost_value = row[cost_col_name] score_value = row[score_col_name] status_color = "#ec4899" if pd.notna(cost_value) and isinstance(cost_value, (int, float)): return f"${cost_value:.2f}" elif pd.notna(score_value): return f'Missing' # Score exists, but cost is missing else: return f'Not Submitted' # Neither score nor cost exists # Apply the logic to the specified cost column and update the DataFrame df[cost_col_name] = df.apply(apply_formatting_logic, axis=1) return df def format_score_column(df: pd.DataFrame, score_col_name: str) -> pd.DataFrame: """ Applies custom formatting to a score column for display. - If a score is 0 or NaN, it's displayed as a colored "0". - Other scores are formatted to two decimal places. """ status_color = "#ec4899" # The same color as your other status text # First, fill any NaN values with 0 so we only have one case to handle. # We must use reassignment to avoid the SettingWithCopyWarning. df[score_col_name] = df[score_col_name].fillna(0) def apply_formatting(score_value): # Now, we just check if the value is 0. if score_value == 0: return f'0.0' # For all other numbers, format them for consistency. if isinstance(score_value, (int, float)): return f"{score_value:.2f}" # Fallback for any unexpected non-numeric data return score_value # Apply the formatting and return the updated DataFrame return df.assign(**{score_col_name: df[score_col_name].apply(apply_formatting)}) def get_pareto_df(data): # This is a placeholder; use your actual function that handles dynamic column names # A robust version might look for any column with "Cost" and "Score" cost_cols = [c for c in data.columns if 'Cost' in c] score_cols = [c for c in data.columns if 'Score' in c] if not cost_cols or not score_cols: return pd.DataFrame() x_col, y_col = cost_cols[0], score_cols[0] frontier_data = data.dropna(subset=[x_col, y_col]).copy() frontier_data[y_col] = pd.to_numeric(frontier_data[y_col], errors='coerce') frontier_data[x_col] = pd.to_numeric(frontier_data[x_col], errors='coerce') frontier_data.dropna(subset=[x_col, y_col], inplace=True) if frontier_data.empty: return pd.DataFrame() frontier_data = frontier_data.sort_values(by=[x_col, y_col], ascending=[True, False]) pareto_points = [] max_score_at_cost = -np.inf for _, row in frontier_data.iterrows(): if row[y_col] >= max_score_at_cost: pareto_points.append(row) max_score_at_cost = row[y_col] return pd.DataFrame(pareto_points) def svg_to_data_uri(path: str) -> str: """Reads an SVG file and encodes it as a Data URI for Plotly.""" try: with open(path, "rb") as f: encoded_string = base64.b64encode(f.read()).decode() return f"data:image/svg+xml;base64,{encoded_string}" except FileNotFoundError: logger.warning(f"SVG file not found at: {path}") return None