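"""ESG Interpretability Dashboard (Gradio app).

Maps ESG report text onto an OWL ontology with sentence embeddings, analyzes
aspect-level sentiment, optimism bias, and promise-vs-performance tone, and
exports the results as an RDF knowledge graph.
"""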
import io
import re
from collections import Counter, defaultdict

import gradio as gr
import matplotlib
matplotlib.use('Agg')  # non-interactive backend; must be set before importing pyplot
import matplotlib.pyplot as plt
import networkx as nx
import pandas as pd
import plotly.express as px
from keybert import KeyBERT
from PIL import Image
from rdflib import Graph, Literal, Namespace, URIRef
from rdflib.namespace import RDF, RDFS

from src.ontology_adapter import ESGOntologyAdapter
# --- Model Configuration ---
# To use a fine-tuned model, change this path to the directory where your model
# is saved, e.g. "./fine_tuned_esg_model". A training script template
# (train_finetune.py) is provided to help you create this model.
FINE_TUNED_MODEL_PATH = "all-MiniLM-L6-v2"  # <-- REPLACE WITH YOUR FINE-TUNED MODEL PATH

try:
    # Attempt to load the (possibly fine-tuned) model
    adapter = ESGOntologyAdapter(
        "ontology/esg_ontology.owl",
        model_name_or_path=FINE_TUNED_MODEL_PATH
    )
except Exception as e:
    print(f"Warning: Could not load fine-tuned model from '{FINE_TUNED_MODEL_PATH}'. Error: {e}")
    print("Falling back to default pre-trained model 'all-MiniLM-L6-v2'.")
    # Fall back to the default model if the fine-tuned one isn't available
    adapter = ESGOntologyAdapter(
        "ontology/esg_ontology.owl",
        model_name_or_path="all-MiniLM-L6-v2"
    )
kw_model = KeyBERT()

# Namespaces for the knowledge graph
ESG = Namespace("http://example.org/esg-ontology/")
DOC = Namespace("http://example.org/documents/")
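# Assumed ESGOntologyAdapter interface, inferred from its usage below (see
# src/ontology_adapter.py for the authoritative contract):
#   adapter.map_term(text) -> {'matches': [{'mapped_to': str, 'similarity': float}, ...],
#                              'sentiment': 'positive' | 'negative' | 'neutral'}
#   adapter.get_direct_parents() -> {child_concept: parent_concept}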
def detect_sections(text):
    """Detects sections and categorizes them as 'promise' or 'performance'."""
    sections = []
    # Split on blank lines that precede a heading-like line (an uppercase
    # letter followed by title text). This is a simple heuristic and can be
    # improved.
    pattern = r'\n\s*\n(?=[A-Z][A-Za-z &]{5,})'
    parts = re.split(pattern, text)
    for part in parts:
        if not part.strip():
            continue
        lines = part.strip().split('\n')
        title = lines[0].strip()
        content = ' '.join(lines[1:])
        category = 'unknown'
        if any(kw in title.lower() for kw in ['strategy', 'goals', 'commitment', 'outlook', 'forward-looking']):
            category = 'promise'
        elif any(kw in title.lower() for kw in ['results', 'performance', 'data', 'review', 'achievements']):
            category = 'performance'
        sections.append({"title": title, "content": content, "category": category})
    return sections
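# Illustrative example (hypothetical input), assuming the heuristic above:
#   detect_sections("Sustainability Goals\nWe aim to cut emissions by half.\n\n"
#                   "Performance Review\nEmissions fell five percent.")
# should yield two sections: "Sustainability Goals" categorized as 'promise'
# and "Performance Review" categorized as 'performance'.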
def analyze_single_document(text, doc_name="Document"):
    """Analyzes a single document and returns aspect-level sentiment and other metrics."""
    detected_sections = detect_sections(text)
    aspect_sentiments = defaultdict(list)
    aspect_confidence = defaultdict(list)
    optimism_bias_scores = defaultdict(list)
    # Sentiment scores bucketed by section category, for promise/performance analysis
    promise_performance_sentiments = defaultdict(lambda: defaultdict(list))
    for section in detected_sections:
        title = section['title']
        content = section['content']
        category = section['category']
        if not content.strip():
            continue
        # Section-aware weighting (bias analysis): weight forward-looking/promise
        # sections up and results sections down.
        tone_weight = 1.0
        if category == 'promise':
            tone_weight = 1.2  # higher optimism bias likely
        elif category == 'performance':
            tone_weight = 0.8  # lower optimism bias likely
        mapping = adapter.map_term(content)
        if mapping['matches']:
            sentiment = mapping['sentiment']
            for match in mapping['matches']:
                aspect = match['mapped_to']
                score = match['similarity']
                aspect_sentiments[aspect].append(sentiment)
                aspect_confidence[aspect].append(score)
                # Simple optimism score: weighted positives, unweighted negatives
                if sentiment == 'positive':
                    optimism_bias_scores[aspect].append(tone_weight * score)
                elif sentiment == 'negative':
                    optimism_bias_scores[aspect].append(-1 * score)  # counts against optimism
                # Store sentiment for promise/performance analysis
                if category != 'unknown':
                    promise_performance_sentiments[category][sentiment].append(score)
    # Aggregate results
    aggregated_sentiments = {}
    avg_confidence = {}
    final_optimism_bias = {}
    for aspect, sentiments in aspect_sentiments.items():
        if sentiments:
            # Sentiment: most common label wins
            aggregated_sentiments[aspect] = Counter(sentiments).most_common(1)[0][0]
            # Confidence: average similarity score for the aspect
            avg_confidence[aspect] = sum(aspect_confidence[aspect]) / len(aspect_confidence[aspect])
            # Optimism bias: average of the weighted scores
            if aspect in optimism_bias_scores:
                final_optimism_bias[aspect] = sum(optimism_bias_scores[aspect]) / len(optimism_bias_scores[aspect])
            else:
                final_optimism_bias[aspect] = 0
    # Aggregate promise/performance sentiments
    promise_performance_data = []
    for category, sentiments_by_type in promise_performance_sentiments.items():
        for sentiment_type, scores in sentiments_by_type.items():
            if scores:
                promise_performance_data.append({
                    'Document': doc_name,
                    'Category': category.capitalize(),
                    'Sentiment': sentiment_type,
                    'Average Confidence': sum(scores) / len(scores)
                })
    # DataFrame for visualization
    df = pd.DataFrame({
        'Aspect': [a.replace('_', ' ').title() for a in aggregated_sentiments.keys()],
        'Sentiment': list(aggregated_sentiments.values()),
        'Confidence': [avg_confidence.get(a, 0) for a in aggregated_sentiments.keys()],
        'Optimism Bias': [final_optimism_bias.get(a, 0) for a in aggregated_sentiments.keys()],
        'Document': doc_name
    })
    return aggregated_sentiments, df, pd.DataFrame(promise_performance_data)
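# Illustrative aggregation example: if 'climate_change' is matched in three
# sections with sentiments ['positive', 'positive', 'negative'] and similarities
# [0.8, 0.7, 0.6], the aggregated sentiment is 'positive' (majority vote) and
# the confidence is their mean, 0.7.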
def discover_new_aspects(text, existing_aspects):
    """Discovers new potential ESG aspects from text using KeyBERT."""
    text = text.replace('\n', ' ')
    keywords = kw_model.extract_keywords(
        text, keyphrase_ngram_range=(1, 3), stop_words='english',
        use_mmr=True, diversity=0.7, top_n=10
    )
    suggested_aspects = []
    # Normalize to lowercase so the membership check below is case-insensitive
    existing_aspect_labels = {aspect.replace('_', ' ').lower() for aspect in existing_aspects}
    for keyword, score in keywords:
        if keyword.lower() not in existing_aspect_labels and len(keyword) > 5:
            suggested_aspects.append(f"- **{keyword.title()}** (Confidence: {score:.2f})")
    return "\n".join(suggested_aspects) if suggested_aspects else "No new aspects discovered."
def generate_knowledge_graph(sentiments1, sentiments2):
    """Generates an RDF knowledge graph from sentiment analysis results."""
    g = Graph()
    g.bind("esg", ESG)
    g.bind("doc", DOC)
    g.bind("rdf", RDF)
    g.bind("rdfs", RDFS)
    documents = [("report_1", "Document 1", sentiments1),
                 ("report_2", "Document 2", sentiments2)]
    for doc_id, doc_label, sentiments in documents:
        doc_uri = DOC[doc_id]
        g.add((doc_uri, RDF.type, ESG.Document))
        g.add((doc_uri, RDFS.label, Literal(doc_label)))
        for aspect, sentiment in sentiments.items():
            aspect_uri = ESG[aspect]
            g.add((doc_uri, ESG.hasAspect, aspect_uri))
            g.add((aspect_uri, RDF.type, ESG.Aspect))
            g.add((aspect_uri, RDFS.label, Literal(aspect.replace('_', ' ').title())))
            g.add((aspect_uri, ESG.hasSentiment, Literal(sentiment)))
    output_path = "esg_knowledge_graph.ttl"
    g.serialize(destination=output_path, format='turtle')
    # Generate visualization
    graph_image = visualize_knowledge_graph(g)
    return output_path, graph_image
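# The serialized Turtle contains triples of this shape (the aspect names
# depend on what the adapter matched):
#   doc:report_1 a esg:Document ;
#       rdfs:label "Document 1" ;
#       esg:hasAspect esg:climate_change .
#   esg:climate_change a esg:Aspect ;
#       esg:hasSentiment "positive" .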
def visualize_knowledge_graph(g):
    """Creates a visual representation of the knowledge graph."""
    nx_graph = nx.DiGraph()
    node_types = {}
    node_sentiments = {}

    def get_label_from_uri(uri):
        """Gets a shortened, readable name from a URI."""
        if '#' in str(uri):
            return str(uri).split('#')[-1].replace('_', ' ').title()
        return str(uri).split('/')[-1].replace('_', ' ').title()

    # First pass: identify node types and sentiments
    for s, p, o in g:
        if isinstance(s, URIRef):
            s_label = get_label_from_uri(s)
            if p == RDF.type and isinstance(o, URIRef):
                node_types[s_label] = get_label_from_uri(o)
            elif p == ESG.hasSentiment and isinstance(o, Literal):
                node_sentiments[s_label] = str(o)
    # Second pass: build the graph structure
    for s, p, o in g:
        if p in [RDF.type, RDFS.label, ESG.hasSentiment]:
            continue
        if isinstance(s, URIRef) and isinstance(o, URIRef):
            nx_graph.add_edge(get_label_from_uri(s), get_label_from_uri(o),
                              label=get_label_from_uri(p))
    # Handle the empty-graph case
    if not nx_graph.nodes():
        plt.figure(figsize=(12, 8))
        plt.text(0.5, 0.5, "No knowledge graph to display.\n(No aspects were detected in the documents)",
                 ha='center', va='center', fontsize=14, color='gray')
        buf = io.BytesIO()
        plt.savefig(buf, format='png', bbox_inches='tight')
        buf.seek(0)
        img = Image.open(buf)
        plt.close()
        return img
    # Create the visualization
    plt.figure(figsize=(16, 12))
    pos = nx.spring_layout(nx_graph, k=1.5, iterations=50)
    # Assign colors based on node type and sentiment
    node_colors = []
    for node in nx_graph.nodes():
        node_type = node_types.get(node)
        sentiment = node_sentiments.get(node)
        if node_type == 'Document':
            node_colors.append('skyblue')
        elif node_type == 'Aspect':
            if sentiment == 'positive':
                node_colors.append('lightgreen')
            elif sentiment == 'negative':
                node_colors.append('#ff9999')  # light red
            else:  # neutral or other
                node_colors.append('lightyellow')
        else:
            node_colors.append('lightgray')
    nx.draw(nx_graph, pos, with_labels=True, node_size=3500, node_color=node_colors,
            font_size=10, font_weight='bold', width=1.5, edge_color='darkgray',
            arrows=True, arrowstyle='->', arrowsize=20)
    edge_labels = nx.get_edge_attributes(nx_graph, 'label')
    nx.draw_networkx_edge_labels(nx_graph, pos, edge_labels=edge_labels, font_color='firebrick', font_size=9)
    plt.title("Knowledge Graph of ESG Aspects", size=18)
    buf = io.BytesIO()
    plt.savefig(buf, format='png', bbox_inches='tight')
    buf.seek(0)
    img = Image.open(buf)
    plt.close()
    return img
def create_ontology_tree_view():
    """Creates a markdown representation of the ontology hierarchy."""
    tree = "**ESG Ontology Structure**\n\n"
    parents = adapter.get_direct_parents()
    children = defaultdict(list)
    all_nodes = set(parents.keys()) | set(parents.values())
    for child, parent in parents.items():
        children[parent].append(child)

    def build_tree(node, prefix=""):
        nonlocal tree
        tree += f"{prefix}- **{node.replace('_', ' ').title()}**\n"
        for child in sorted(children.get(node, [])):
            build_tree(child, prefix + "  ")

    # Root nodes: parents that are not themselves children of anything
    root_nodes = sorted(n for n in all_nodes if n not in parents and n in children)
    for root in root_nodes:
        build_tree(root)
    return tree
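# Example of the rendered markdown (hypothetical ontology labels):
#   - **Environmental**
#     - **Climate Change**
#     - **Water Management**
#   - **Social**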
def create_promise_performance_plot(df_promise_performance):
    """Creates a bar chart visualizing promise vs. performance sentiment."""
    if df_promise_performance.empty:
        return None
    fig = px.bar(df_promise_performance, x='Category', y='Average Confidence', color='Sentiment',
                 facet_col='Document', barmode='group',
                 color_discrete_map={'positive': '#2ca02c', 'negative': '#d62728', 'neutral': '#7f7f7f'},
                 title="Promise vs. Performance Sentiment Analysis")
    return fig
def analyze_and_compare(text1, text2):
    """Main function driving the analysis and comparison for the dashboard."""
    # Analyze both documents
    sentiments1, df1, pp_df1 = analyze_single_document(text1, "Document 1")
    sentiments2, df2, pp_df2 = analyze_single_document(text2, "Document 2")
    # --- Generate comparison reports ---
    # 1. Cross-document consistency analysis
    consistency_report = "**Sentiment Drift Analysis**\n\n"
    all_aspects = sorted(set(sentiments1.keys()) | set(sentiments2.keys()))
    found_drift = False
    for aspect in all_aspects:
        s1 = sentiments1.get(aspect)
        s2 = sentiments2.get(aspect)
        name = aspect.replace('_', ' ').title()
        if s1 and s2 and s1 != s2:
            consistency_report += f"🟡 **Drift in '{name}'**: `{s1.title()}` ⟶ `{s2.title()}`\n"
            found_drift = True
        elif s1 and not s2:
            consistency_report += f"⚪️ **'{name}'** only in Document 1 (Sentiment: {s1.title()})\n"
        elif not s1 and s2:
            consistency_report += f"⚪️ **'{name}'** only in Document 2 (Sentiment: {s2.title()})\n"
    if not found_drift and all_aspects:
        consistency_report += "✅ No sentiment contradictions detected for common aspects.\n"
    elif not all_aspects:
        consistency_report = "No aspects detected in either document."
    # 2. Weakly supervised aspect discovery
    all_text = text1 + "\n\n" + text2
    existing_aspects = set(sentiments1.keys()) | set(sentiments2.keys())
    suggestions_report = "**Suggested New Aspects**\n\n" + discover_new_aspects(all_text, existing_aspects)
    # --- Create visualizations ---
    combined_df = pd.concat([df1, df2])
    # Sentiment distribution plot
    sentiment_fig = None
    if not combined_df.empty:
        sentiment_counts = combined_df.groupby(['Document', 'Sentiment']).size().reset_index(name='Count')
        sentiment_fig = px.bar(sentiment_counts, x='Document', y='Count', color='Sentiment',
                               title="Sentiment Distribution Across Documents",
                               color_discrete_map={'positive': '#2ca02c', 'negative': '#d62728', 'neutral': '#7f7f7f'})
    # Bias & confidence plot
    bias_fig = None
    if not combined_df.empty:
        bias_fig = px.scatter(combined_df, x='Confidence', y='Optimism Bias', color='Aspect',
                              size=combined_df['Optimism Bias'].abs(), hover_data=['Document'],
                              title="Optimism Bias vs. Mapping Confidence")
        bias_fig.add_hline(y=0, line_dash="dot", line_color="grey")
    # Promise vs. performance plot
    combined_pp_df = pd.concat([pp_df1, pp_df2])
    promise_performance_fig = create_promise_performance_plot(combined_pp_df)
    # Generate and save the knowledge graph
    kg_file_path, kg_image = generate_knowledge_graph(sentiments1, sentiments2)
    return consistency_report, suggestions_report, sentiment_fig, bias_fig, promise_performance_fig, kg_file_path, kg_image
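# Note: the order of the values returned above must match the `outputs` list
# wired to analyze_btn.click in the interface below.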
# --- Gradio Interface ---
with gr.Blocks(theme=gr.themes.Soft(), title="ESG Interpretability Dashboard") as iface:
    gr.Markdown("# 🧩 ESG Interpretability Dashboard & Bias Analysis")
    with gr.Row():
        with gr.Column(scale=1):
            text1 = gr.Textbox(label="Input ESG Report Text 1", lines=20)
        with gr.Column(scale=1):
            text2 = gr.Textbox(label="Input ESG Report Text 2", lines=20)
    analyze_btn = gr.Button("Analyze & Compare Documents", variant="primary")
    with gr.Tabs():
        with gr.TabItem("📊 Analysis & Visualizations"):
            with gr.Row():
                with gr.Column():
                    sentiment_plot = gr.Plot(label="Sentiment Distribution")
                with gr.Column():
                    bias_plot = gr.Plot(label="Bias & Confidence Analysis")
            with gr.Row():
                promise_performance_plot = gr.Plot(label="Promise vs. Performance Sentiment")
            with gr.Row():
                with gr.Column():
                    consistency_output = gr.Markdown(label="Cross-Document Analysis")
                with gr.Column():
                    suggestions_output = gr.Markdown(label="Weak Supervision Suggestions")
        with gr.TabItem("🌳 Ontology & Knowledge Graph"):
            with gr.Row():
                with gr.Column(scale=1):
                    ontology_tree = gr.Markdown(
                        value=create_ontology_tree_view(),
                        label="ESG Ontology Hierarchy"
                    )
                with gr.Column(scale=2):
                    with gr.Group():
                        kg_plot = gr.Image(label="Knowledge Graph Visualization")
                    with gr.Row():
                        kg_output = gr.File(label="Download Knowledge Graph (RDF/Turtle)")
    analyze_btn.click(
        fn=analyze_and_compare,
        inputs=[text1, text2],
        outputs=[
            consistency_output,
            suggestions_output,
            sentiment_plot,
            bias_plot,
            promise_performance_plot,
            kg_output,
            kg_plot
        ]
    )

iface.launch()