esgdata / app.py
darisdzakwanhoesien2
label new
68bd627
raw
history blame
26.9 kB
import re
import subprocess
from src.ontology_adapter import ESGOntologyAdapter
import gradio as gr
from collections import Counter, defaultdict
from rdflib import Graph, Literal, Namespace, URIRef
from rdflib.namespace import RDF, RDFS
from keybert import KeyBERT
import pandas as pd
import plotly.express as px
import networkx as nx
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import io
from PIL import Image
import os
from src.preprocess import preprocess_and_save # New import
import gradio as gr
import pandas as pd
import numpy as np
import json
import os
import re
import subprocess
import sys
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics.pairwise import cosine_similarity
from sanitize_csv import sanitize_csv, input_file_path, output_file_path
# --- Model Configuration ---
# To use a fine-tuned model, change this path to the directory where your model is saved.
# For example: "./fine_tuned_esg_model".
# A training script template (train_finetune.py) is provided to help you create this model.
# NOTE: as shipped, this value is the same identifier as the fallback model used below.
FINE_TUNED_MODEL_PATH = "all-MiniLM-L6-v2" # <-- REPLACE WITH YOUR FINE-TUNED MODEL PATH
try:
    # Attempt to load the fine-tuned model
    adapter = ESGOntologyAdapter(
        "ontology/esg_ontology.owl",
        model_name_or_path=FINE_TUNED_MODEL_PATH
    )
except Exception as e:
    # Any failure while building the adapter (not only a missing model) lands here.
    print(f"Warning: Could not load fine-tuned model from '{FINE_TUNED_MODEL_PATH}'. Error: {e}")
    print("Falling back to default pre-trained model 'all-MiniLM-L6-v2'.")
    # Fallback to the default model if the fine-tuned one isn't available.
    # This second attempt is not guarded: if it fails too, the exception propagates
    # and the module does not finish importing.
    adapter = ESGOntologyAdapter(
        "ontology/esg_ontology.owl",
        model_name_or_path="all-MiniLM-L6-v2"
    )
# Keyword extractor used by discover_new_aspects(); constructed with KeyBERT's defaults.
kw_model = KeyBERT()
# Define namespaces for our knowledge graph
ESG = Namespace("http://example.org/esg-ontology/")
DOC = Namespace("http://example.org/documents/")
def detect_sections(text):
    """Split a report into titled sections and tag each as promise or performance.

    A section boundary is placed in front of every "header-like" line: a whole
    line that starts with a capital letter and is followed by at least five
    more letters, spaces, ampersands or hyphens (e.g. ``Performance Review``).
    This is a simple heuristic and can be improved. The first line of each
    resulting part is taken as the section title and the remaining lines,
    joined with single spaces, as its content.

    Args:
        text: Raw document text.

    Returns:
        A list of dicts with the keys ``"title"``, ``"content"`` and
        ``"category"``; the category is ``'promise'``, ``'performance'`` or
        ``'unknown'`` depending on keywords found in the title.
    """
    # The previous pattern placed a lookahead that demanded an uppercase letter
    # directly in front of ``\s*\n``, which can never match, so the text was
    # never split. Here the entire header line sits inside the lookahead: the
    # split consumes only the preceding newline(s) and the header stays at the
    # start of its part. The hyphen is allowed so that titles such as
    # "Forward-Looking Statements" can be recognised (see the keyword list).
    header_boundary = r'\n\s*(?=[A-Z][a-zA-Z &-]{5,}[ \t]*\r?(?:\n|\Z))'

    promise_keywords = ('strategy', 'goals', 'commitment', 'outlook', 'forward-looking')
    performance_keywords = ('results', 'performance', 'data', 'review', 'achievements')

    sections = []
    for part in re.split(header_boundary, text):
        if not part.strip():
            continue
        lines = part.strip().split('\n')
        title = lines[0].strip()
        content = ' '.join(lines[1:])

        lowered_title = title.lower()
        category = 'unknown'
        # Promise keywords take precedence when a title matches both lists.
        if any(kw in lowered_title for kw in promise_keywords):
            category = 'promise'
        elif any(kw in lowered_title for kw in performance_keywords):
            category = 'performance'

        sections.append({"title": title, "content": content, "category": category})
    return sections
def analyze_single_document(text, doc_name="Document"):
    """Map every detected section of one document onto ontology aspects.

    Args:
        text: Raw document text; it is split with ``detect_sections``.
        doc_name: Label written into the ``Document`` column of both frames.

    Returns:
        A 3-tuple of
        * a dict mapping each matched aspect to its most frequent sentiment,
        * a DataFrame with the columns Aspect, Sentiment, Confidence,
          Optimism Bias and Document (one row per aspect),
        * a DataFrame with the columns Document, Category, Sentiment and
          Average Confidence for sections categorised as promise/performance.
    """
    # Section-aware weighting: positive statements in promise sections count
    # more towards optimism, those in performance sections count less.
    weight_by_category = {'promise': 1.2, 'performance': 0.8}

    sentiment_votes = defaultdict(list)
    similarity_scores = defaultdict(list)
    bias_contributions = defaultdict(list)
    category_scores = defaultdict(lambda: defaultdict(list))

    for section in detect_sections(text):
        body = section['content']
        category = section['category']
        if not body.strip():
            continue

        tone_weight = weight_by_category.get(category, 1.0)
        mapping = adapter.map_term(body)
        if not mapping['matches']:
            continue

        # One sentiment per section, shared by all of its aspect matches.
        sentiment = mapping['sentiment']
        for match in mapping['matches']:
            aspect = match['mapped_to']
            score = match['similarity']
            sentiment_votes[aspect].append(sentiment)
            similarity_scores[aspect].append(score)

            # Only positive scores are tone-weighted; negative sentiment
            # counts against optimism at face value; others add nothing.
            if sentiment == 'positive':
                bias_contributions[aspect].append(tone_weight * score)
            elif sentiment == 'negative':
                bias_contributions[aspect].append(-1 * score)

            if category != 'unknown':
                category_scores[category][sentiment].append(score)

    # Collapse the per-match lists into one value per aspect.
    aggregated_sentiments = {}
    avg_confidence = {}
    final_optimism_bias = {}
    for aspect, votes in sentiment_votes.items():
        if not votes:
            continue
        aggregated_sentiments[aspect] = Counter(votes).most_common(1)[0][0]
        scores = similarity_scores[aspect]
        avg_confidence[aspect] = sum(scores) / len(scores)
        if aspect in bias_contributions:
            contributions = bias_contributions[aspect]
            final_optimism_bias[aspect] = sum(contributions) / len(contributions)
        else:
            final_optimism_bias[aspect] = 0

    # One row per (category, sentiment) pair that collected any score.
    pp_rows = []
    for category, scores_by_sentiment in category_scores.items():
        for sentiment_type, scores in scores_by_sentiment.items():
            if not scores:
                continue
            pp_rows.append({
                'Document': doc_name,
                'Category': category.capitalize(),
                'Sentiment': sentiment_type,
                'Average Confidence': sum(scores) / len(scores)
            })

    aspect_names = list(aggregated_sentiments)
    df = pd.DataFrame({
        'Aspect': [name.replace('_', ' ').title() for name in aspect_names],
        'Sentiment': [aggregated_sentiments[name] for name in aspect_names],
        'Confidence': [avg_confidence.get(name, 0) for name in aspect_names],
        'Optimism Bias': [final_optimism_bias.get(name, 0) for name in aspect_names],
        'Document': doc_name
    })
    return aggregated_sentiments, df, pd.DataFrame(pp_rows)
def discover_new_aspects(text, existing_aspects):
    """Suggest candidate ESG aspects that are not yet covered, using KeyBERT.

    Args:
        text: Document text; newlines are flattened to spaces before extraction.
        existing_aspects: Iterable of aspect identifiers already detected
            (underscores are treated as spaces when comparing).

    Returns:
        A Markdown bullet list with one line per suggested key phrase and its
        KeyBERT score, or ``"No new aspects discovered."`` when nothing is left
        after filtering.
    """
    text = text.replace('\n', ' ')
    keywords = kw_model.extract_keywords(
        text, keyphrase_ngram_range=(1, 3), stop_words='english',
        use_mmr=True, diversity=0.7, top_n=10
    )
    # Lower-case the known labels as well: keywords are compared in lower case,
    # so a mixed-case aspect identifier would otherwise never be filtered out.
    known_labels = {aspect.replace('_', ' ').lower() for aspect in existing_aspects}
    suggested_aspects = [
        f"- **{keyword.title()}** (Confidence: {score:.2f})"
        for keyword, score in keywords
        # Very short phrases (five characters or fewer) are dropped as noise.
        if keyword.lower() not in known_labels and len(keyword) > 5
    ]
    return "\n".join(suggested_aspects) if suggested_aspects else "No new aspects discovered."
def generate_knowledge_graph(sentiments1, sentiments2):
    """Build an RDF graph from two aspect->sentiment mappings and save it.

    Each document becomes an ``ESG.Document`` node linked via ``ESG.hasAspect``
    to one ``ESG.Aspect`` node per aspect; aspect nodes carry a label and an
    ``ESG.hasSentiment`` literal. The graph is written as Turtle to
    ``esg_knowledge_graph.ttl`` in the current working directory.

    Returns:
        ``(output_path, graph_image)`` where ``graph_image`` is whatever
        ``visualize_knowledge_graph`` returns for the graph.
    """
    graph = Graph()
    for prefix, namespace in (("esg", ESG), ("doc", DOC), ("rdf", RDF), ("rdfs", RDFS)):
        graph.bind(prefix, namespace)

    documents = (
        ('report_1', "Document 1", sentiments1),
        ('report_2', "Document 2", sentiments2),
    )
    for doc_key, doc_label, sentiments in documents:
        doc_uri = DOC[doc_key]
        graph.add((doc_uri, RDF.type, ESG.Document))
        graph.add((doc_uri, RDFS.label, Literal(doc_label)))
        for aspect, sentiment in sentiments.items():
            # Aspect URIs are shared between documents, so an aspect present in
            # both ends up with the sentiment literals of both on one node.
            aspect_uri = ESG[aspect]
            graph.add((doc_uri, ESG.hasAspect, aspect_uri))
            graph.add((aspect_uri, RDF.type, ESG.Aspect))
            graph.add((aspect_uri, RDFS.label, Literal(aspect.replace('_', ' ').title())))
            graph.add((aspect_uri, ESG.hasSentiment, Literal(sentiment)))

    output_path = "esg_knowledge_graph.ttl"
    graph.serialize(destination=output_path, format='turtle')

    # Render the picture from the same graph that was just written to disk.
    return output_path, visualize_knowledge_graph(graph)
def _uri_to_label(uri):
    """Return a short, title-cased display name taken from the end of a URI."""
    text = str(uri)
    separator = '#' if '#' in text else '/'
    return text.split(separator)[-1].replace('_', ' ').title()


def _current_figure_to_image():
    """Render the active matplotlib figure to a PIL image and close the figure."""
    buf = io.BytesIO()
    plt.savefig(buf, format='png', bbox_inches='tight')
    buf.seek(0)
    img = Image.open(buf)
    plt.close()
    return img


def visualize_knowledge_graph(g):
    """Draw the RDF knowledge graph as a colour-coded node/edge picture.

    Args:
        g: Iterable of ``(subject, predicate, object)`` triples (an rdflib
            ``Graph`` as built by ``generate_knowledge_graph``).

    Returns:
        A PIL image. Document nodes are sky blue; aspect nodes are green,
        light red or light yellow for positive, negative or any other
        sentiment; everything else is light gray. When the graph yields no
        edges, a placeholder image with an explanatory message is returned.
    """
    nx_graph = nx.DiGraph()
    node_types = {}
    node_sentiments = {}

    # First pass: identify node types and sentiments (keyed by display label).
    for s, p, o in g:
        if not isinstance(s, URIRef):
            continue
        s_label = _uri_to_label(s)
        if p == RDF.type and isinstance(o, URIRef):
            node_types[s_label] = _uri_to_label(o)
        elif p == ESG.hasSentiment and isinstance(o, Literal):
            node_sentiments[s_label] = str(o)

    # Second pass: build the graph structure. Type, label and sentiment triples
    # only describe nodes, so they are not drawn as edges.
    for s, p, o in g:
        if p in [RDF.type, RDFS.label, ESG.hasSentiment]:
            continue
        if isinstance(s, URIRef) and isinstance(o, URIRef):
            nx_graph.add_edge(_uri_to_label(s), _uri_to_label(o), label=_uri_to_label(p))

    # Handle empty graph case
    if not nx_graph.nodes():
        plt.figure(figsize=(12, 8))
        # A real newline is required here; the former "\\n" was drawn as the
        # two literal characters backslash and "n".
        plt.text(0.5, 0.5, "No knowledge graph to display.\n(No aspects were detected in the documents)",
                 ha='center', va='center', fontsize=14, color='gray')
        return _current_figure_to_image()

    # Create the visualization
    plt.figure(figsize=(16, 12))
    pos = nx.spring_layout(nx_graph, k=1.5, iterations=50)

    # Assign colors based on type and sentiment
    node_colors = []
    for node in nx_graph.nodes():
        node_type = node_types.get(node)
        sentiment = node_sentiments.get(node)
        if node_type == 'Document':
            node_colors.append('skyblue')
        elif node_type == 'Aspect':
            if sentiment == 'positive':
                node_colors.append('lightgreen')
            elif sentiment == 'negative':
                node_colors.append('#ff9999')  # light red
            else:  # neutral or other
                node_colors.append('lightyellow')
        else:
            node_colors.append('lightgray')

    nx.draw(nx_graph, pos, with_labels=True, node_size=3500, node_color=node_colors,
            font_size=10, font_weight='bold', width=1.5, edge_color='darkgray',
            arrows=True, arrowstyle='->', arrowsize=20)
    edge_labels = nx.get_edge_attributes(nx_graph, 'label')
    nx.draw_networkx_edge_labels(nx_graph, pos, edge_labels=edge_labels, font_color='firebrick', font_size=9)
    plt.title("Knowledge Graph of ESG Aspects", size=18)
    return _current_figure_to_image()
def run_training():
    """Run the fine-tuning script and return a log message for the UI.

    If ``data/esg_triplets.csv`` (relative to this file) is missing,
    ``create_triplets.py`` is executed first. ``train_finetune.py`` is then run
    with the current interpreter and its output captured.

    Returns:
        A human-readable string: the last 1000 characters of the training
        script's stdout on success, otherwise a description of the failure.
        Errors are reported in the returned text instead of being raised.
    """
    try:
        # Resolve everything relative to this file so the working directory
        # the app was started from does not matter.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        triplets_path = os.path.join(script_dir, "data", "esg_triplets.csv")
        create_triplets_script = os.path.join(script_dir, "create_triplets.py")
        train_finetune_script = os.path.join(script_dir, "train_finetune.py")

        # Ensure the triplets file exists before running training
        if not os.path.exists(triplets_path):
            print("Triplet file not found. Running 'create_triplets.py' script...")
            # Output is not captured here, so a failure of this step is
            # reported below without stderr details.
            subprocess.run([sys.executable, create_triplets_script], check=True)

        result = subprocess.run(
            [sys.executable, train_finetune_script],
            capture_output=True,
            text=True,
            check=True
        )
        # Real newlines: the former "\\n" showed up as literal backslash-n
        # characters in the log textbox.
        return f"Training finished successfully!\n...\n{result.stdout[-1000:]}"
    except FileNotFoundError:
        return "Error: 'python' executable not found. Please ensure Python is installed and in your system's PATH."
    except subprocess.CalledProcessError as e:
        error_message = f"Training script failed with exit code {e.returncode}.\n"
        if e.stderr:
            error_message += f"...\n{e.stderr[-1000:]}"
        else:
            error_message += "No error output was captured."
        return error_message
    except Exception as e:
        return f"An unexpected error occurred: {str(e)}"
def run_evaluation():
    """Run the coverage evaluation and return its chart path and a log message.

    Returns:
        ``(chart_path, message)``. ``chart_path`` is ``None`` when the
        processed corpus is missing, the chart file was not produced, or any
        error occurred; ``message`` then explains why. Errors are reported in
        the message instead of being raised.
    """
    try:
        # Ensure the processed corpus exists
        if not os.path.exists("data/esg_corpus.csv"):
            return None, "Error: Processed corpus not found. Please run the preprocessing step first."

        # Imported here so the function does not depend on a module-level name:
        # the file binds ``evaluate_esg_coverage`` to the *function*
        # ``evaluate_models`` (and only after the blocking ``iface.launch()``),
        # so the former ``evaluate_esg_coverage.evaluate_models()`` always failed.
        from evaluate_esg_coverage import evaluate_models

        # The evaluation script returns both the chart path and the table
        chart_path, eval_table = evaluate_models()
        # Guard against a missing path before handing it to os.path.exists.
        if chart_path and os.path.exists(chart_path):
            return chart_path, f"Evaluation successful!\n\n{eval_table}"
        return None, f"Evaluation script ran, but the output image was not found.\n\n{eval_table}"
    except Exception as e:
        return None, f"An unexpected error occurred during evaluation: {str(e)}"
def create_ontology_tree_view():
    """Render the ontology hierarchy as a nested Markdown bullet list.

    The hierarchy is taken from ``adapter.get_direct_parents()``, which is used
    here as a mapping of child name to parent name. Roots are nodes that have
    children but no parent of their own; children are listed alphabetically.

    Returns:
        A Markdown string starting with a bold heading, followed by one bullet
        per node.
    """
    parents = adapter.get_direct_parents()
    children = defaultdict(list)
    for child, parent in parents.items():
        children[parent].append(child)

    lines = []

    def append_subtree(node, depth):
        # Four spaces per level: with a single space Markdown treats every
        # bullet as a sibling and the hierarchy is lost when rendered.
        lines.append(f"{'    ' * depth}- **{node.replace('_', ' ').title()}**\n")
        for child in sorted(children.get(node, ())):
            append_subtree(child, depth + 1)

    # Find root nodes (pillars or classes not listed as children)
    root_nodes = sorted(node for node in children if node not in parents)
    for root in root_nodes:
        append_subtree(root, 0)

    return "**ESG Ontology Structure**\n\n" + "".join(lines)
def create_promise_performance_plot(df_promise_performance):
    """Build the grouped bar chart comparing promise and performance sentiment.

    Args:
        df_promise_performance: DataFrame with the columns Category,
            Average Confidence, Sentiment and Document.

    Returns:
        A Plotly figure with one facet per document, or ``None`` when the
        frame is empty.
    """
    if df_promise_performance.empty:
        return None

    sentiment_palette = {'positive': '#2ca02c', 'negative': '#d62728', 'neutral': '#7f7f7f'}
    return px.bar(
        df_promise_performance,
        x='Category',
        y='Average Confidence',
        color='Sentiment',
        facet_col='Document',
        barmode='group',
        color_discrete_map=sentiment_palette,
        title="Promise vs. Performance Sentiment Analysis",
    )
def run_data_pipeline():
    """Run the full data pipeline: sanitize, create triplets, and evaluate.

    Returns:
        ``(status_message, coverage_plot, coverage_df_string)`` where the last
        two values are whatever the coverage evaluation returns.
    """
    # Imported locally: at module level these names are only bound after the
    # unguarded, blocking ``iface.launch()`` call, so they may not exist yet
    # when this function is invoked from the running app.
    from create_triplets import create_triplets
    from evaluate_esg_coverage import evaluate_models

    print("--- Starting Data Pipeline ---")

    # 1. Sanitize CSV
    print("Step 1: Sanitizing CSV...")
    sanitize_csv(input_file_path, output_file_path)

    # 2. Create Triplets
    print("\nStep 2: Creating Triplets...")
    create_triplets(corpus_path=output_file_path, output_path="data/esg_triplets.csv")

    # 3. Evaluate ESG Coverage
    print("\nStep 3: Evaluating ESG Coverage...")
    coverage_plot, coverage_df_string = evaluate_models(corpus_path=output_file_path)

    print("--- Data Pipeline Finished ---")
    return "Data pipeline completed successfully!", coverage_plot, coverage_df_string
def analyze_and_compare(text1, text2):
    """Analyze two documents and build every output shown on the dashboard.

    Args:
        text1: Text of the first ESG report.
        text2: Text of the second ESG report.

    Returns:
        A 7-tuple in the order expected by the "Analyze" button outputs:
        consistency report (Markdown), new-aspect suggestions (Markdown),
        sentiment distribution figure, bias/confidence figure,
        promise-vs-performance figure, knowledge-graph file path and
        knowledge-graph image. Figures are ``None`` when there is no data.
    """
    # Analyze both documents
    sentiments1, df1, pp_df1 = analyze_single_document(text1, "Document 1")
    sentiments2, df2, pp_df2 = analyze_single_document(text2, "Document 2")

    # --- Generate Comparison Reports ---
    # 1. Cross-Document Consistency Analysis
    # The status symbols below were mis-decoded UTF-8 (mojibake) in the
    # previous version of these user-facing strings; they are restored here.
    all_aspects = sorted(set(sentiments1) | set(sentiments2))
    if not all_aspects:
        consistency_report = "No aspects detected in either document."
    else:
        report_parts = ["**Sentiment Drift Analysis**\n\n"]
        found_drift = False
        for aspect in all_aspects:
            s1 = sentiments1.get(aspect)
            s2 = sentiments2.get(aspect)
            name = aspect.replace('_', ' ').title()
            if s1 and s2 and s1 != s2:
                report_parts.append(f"🟡 **Drift in '{name}'**: `{s1.title()}` ➢ `{s2.title()}`\n")
                found_drift = True
            elif s1 and not s2:
                report_parts.append(f"⚪️ **'{name}'** only in Document 1 (Sentiment: {s1.title()})\n")
            elif not s1 and s2:
                report_parts.append(f"⚪️ **'{name}'** only in Document 2 (Sentiment: {s2.title()})\n")
        if not found_drift:
            report_parts.append("✅ No sentiment contradictions detected for common aspects.\n")
        consistency_report = "".join(report_parts)

    # 2. Weakly Supervised Aspect Discovery
    all_text = text1 + "\n\n" + text2
    existing_aspects = set(sentiments1) | set(sentiments2)
    suggestions_report = "**Suggested New Aspects**\n\n" + discover_new_aspects(all_text, existing_aspects)

    # --- Create Visualizations ---
    combined_df = pd.concat([df1, df2])
    sentiment_fig = None
    bias_fig = None
    if not combined_df.empty:
        palette = {'positive': '#2ca02c', 'negative': '#d62728', 'neutral': '#7f7f7f'}

        # Sentiment Distribution Plot
        sentiment_counts = combined_df.groupby(['Document', 'Sentiment']).size().reset_index(name='Count')
        sentiment_fig = px.bar(sentiment_counts, x='Document', y='Count', color='Sentiment',
                               title="Sentiment Distribution Across Documents",
                               color_discrete_map=palette)

        # Bias & Confidence Plot; marker size follows the magnitude of the bias.
        bias_fig = px.scatter(combined_df, x='Confidence', y='Optimism Bias', color='Aspect',
                              size=abs(combined_df['Optimism Bias']), hover_data=['Document'],
                              title="Optimism Bias vs. Mapping Confidence")
        bias_fig.add_hline(y=0, line_dash="dot", line_color="grey")

    # Promise vs. Performance Plot
    combined_pp_df = pd.concat([pp_df1, pp_df2])
    promise_performance_fig = create_promise_performance_plot(combined_pp_df)

    # Generate and save the knowledge graph
    kg_file_path, kg_image = generate_knowledge_graph(sentiments1, sentiments2)

    return (consistency_report, suggestions_report, sentiment_fig, bias_fig,
            promise_performance_fig, kg_file_path, kg_image)
# --- Gradio Interface ---
# Main dashboard: two report inputs, an "Analyze" button and five result tabs.
# The emoji in the heading, tab labels and button labels were mis-decoded UTF-8
# (mojibake) in the previous version and are restored here.
with gr.Blocks(theme=gr.themes.Soft(), title="ESG Interpretability Dashboard") as iface:
    gr.Markdown("# 🧩 ESG Interpretability Dashboard & Bias Analysis")

    with gr.Row():
        with gr.Column(scale=1):
            text1 = gr.Textbox(label="Input ESG Report Text 1", lines=20)
        with gr.Column(scale=1):
            text2 = gr.Textbox(label="Input ESG Report Text 2", lines=20)

    analyze_btn = gr.Button("Analyze & Compare Documents", variant="primary")

    with gr.Tabs():
        with gr.TabItem("📊 Analysis & Visualizations"):
            with gr.Row():
                with gr.Column():
                    sentiment_plot = gr.Plot(label="Sentiment Distribution")
                with gr.Column():
                    bias_plot = gr.Plot(label="Bias & Confidence Analysis")
            with gr.Row():
                promise_performance_plot = gr.Plot(label="Promise vs. Performance Sentiment")
            with gr.Row():
                with gr.Column():
                    consistency_output = gr.Markdown(label="Cross-Document Analysis")
                with gr.Column():
                    suggestions_output = gr.Markdown(label="Weak Supervision Suggestions")

        with gr.TabItem("🌳 Ontology & Knowledge Graph"):
            with gr.Row():
                with gr.Column(scale=1):
                    # The tree is rendered once, when the interface is built.
                    ontology_tree = gr.Markdown(
                        value=create_ontology_tree_view(),
                        label="ESG Ontology Hierarchy"
                    )
                with gr.Column(scale=2):
                    with gr.Group():
                        kg_plot = gr.Image(label="Knowledge Graph Visualization")
                    with gr.Row():
                        kg_output = gr.File(label="Download Knowledge Graph (RDF/Turtle)")

        with gr.TabItem("📄 Preprocessing"):
            gr.Markdown("## Clean and Prepare ESG Text\nInput raw OCR or Markdown text to clean it for analysis. The module will remove artifacts, normalize text, and detect the language.")
            with gr.Row():
                with gr.Column():
                    raw_text_input = gr.Textbox(label="Raw Text Input", lines=20)
                    file_name_input = gr.Textbox(label="Source Filename", value="document.pdf")
                    preprocess_btn = gr.Button("Clean Text", variant="primary")
                with gr.Column():
                    processed_output = gr.JSON(label="Processed Data")
            preprocess_btn.click(
                fn=preprocess_and_save,
                inputs=[raw_text_input, file_name_input],
                outputs=processed_output
            )

        with gr.TabItem("🧠 Fine-Tuning"):
            gr.Markdown("## ESG TAPT Fine-Tuning\nRun Task-Adaptive Pre-Training (TAPT) on your own corpus to adapt the sentence transformer model to your specific domain.")
            train_button = gr.Button("🚀 Start Fine-Tuning")
            train_output = gr.Textbox(label="Training Log", lines=15, interactive=False)
            train_button.click(fn=run_training, inputs=None, outputs=train_output)

        with gr.TabItem("📈 Evaluation"):
            gr.Markdown("## Evaluate Model Coverage\nBenchmark the fine-tuned model against other models to evaluate its coverage of key ESG terms.")
            eval_button = gr.Button("📊 Run Evaluation")
            with gr.Row():
                eval_plot = gr.Image(label="Coverage Comparison Chart")
                eval_output = gr.Textbox(label="Evaluation Log", lines=10, interactive=False)
            eval_button.click(fn=run_evaluation, inputs=None, outputs=[eval_plot, eval_output])

    # Output order must match the tuple returned by analyze_and_compare().
    analyze_btn.click(
        fn=analyze_and_compare,
        inputs=[text1, text2],
        outputs=[
            consistency_output,
            suggestions_output,
            sentiment_plot,
            bias_plot,
            promise_performance_plot,
            kg_output,
            kg_plot
        ]
    )

# NOTE(review): this launch is not guarded by ``if __name__ == "__main__"`` and
# presumably blocks when run as a script, in which case the code below this
# point is never reached -- confirm whether that is intended.
iface.launch()
def run_full_pipeline():
    """Run preprocessing, fine-tuning and evaluation, streaming progress.

    This is a generator. Every item is ``(log_text, chart_path, table_text)``;
    ``log_text`` is the cumulative log so far, and the last two values stay
    ``None`` until the evaluation step has finished. Failures are appended to
    the log and yielded instead of being raised.
    """
    log = ""
    try:
        # 1. Preprocessing
        log += "Starting preprocessing...\n"
        yield log, None, None

        # NOTE(review): ``process_corpus_and_save`` is not defined or imported
        # in the visible part of this file (only ``preprocess_and_save`` is) --
        # confirm where it is expected to come from.
        corpus_file = process_corpus_and_save()
        log += f"Preprocessing complete. Data saved to {corpus_file}\n\n"
        yield log, None, None

        # 2. Fine-tuning
        log += "Starting fine-tuning...\n"
        yield log, None, None

        train_command = [sys.executable, "train_esg_tapt.py", corpus_file]
        train_run = subprocess.run(train_command, capture_output=True, text=True, check=True)
        log += train_run.stdout
        log += "\nFine-tuning complete.\n\n"
        yield log, None, None

        # 3. Evaluation
        log += "Starting evaluation...\n"
        yield log, None, None

        eval_command = [sys.executable, "evaluate_esg_coverage.py", corpus_file]
        eval_run = subprocess.run(eval_command, capture_output=True, text=True, check=True)
        log += eval_run.stdout

        chart_path = "coverage_comparison.png"
        if not os.path.exists(chart_path):
            yield log, None, "Evaluation finished, but chart not found."
        else:
            # The table is whatever follows the last heading in the script's stdout.
            table_text = eval_run.stdout.split("ESG Domain Coverage Comparison:")[-1].strip()
            yield log, chart_path, table_text
    except subprocess.CalledProcessError as failure:
        log += f"A script failed to execute:\nSTDOUT: {failure.stdout}\nSTDERR: {failure.stderr}"
        yield log, None, None
    except Exception as failure:
        log += f"An unexpected error occurred: {failure}"
        yield log, None, None
import gradio as gr
import pandas as pd
from sanitize_csv import sanitize_csv, input_file_path, output_file_path
from create_triplets import create_triplets
from evaluate_esg_coverage import evaluate_models as evaluate_esg_coverage
# --- ESG corpus data viewer ---
# A second, smaller Gradio app that displays the sanitized corpus.
# NOTE(review): the unguarded ``iface.launch()`` earlier in this file presumably
# blocks when the file is run as a script, so this section may never execute
# in that mode -- confirm.
# --- Step 1: Sanitize the CSV when the app starts ---
print("Running CSV sanitizer...")
sanitize_csv(input_file_path, output_file_path)
print("Sanitization complete.")
# --- Step 2: Load the sanitized data ---
# The app loads the sanitized file written in step 1.
try:
    df = pd.read_csv(output_file_path)
except FileNotFoundError:
    # Handle case where sanitized file wasn't created: show a one-row frame
    # carrying the error text. Other read errors are not handled here.
    df = pd.DataFrame({"Error": ["Sanitized file not found. Check logs."]})
# --- Step 3: Build your application interface ---
# (This is an example using Gradio to display the data)
def show_data():
    """Return the module-level DataFrame loaded in step 2."""
    return df
with gr.Blocks() as demo:
    gr.Markdown("# ESG Corpus Data Viewer")
    gr.Markdown("Displaying the sanitized ESG corpus data.")
    with gr.Tab("Sanitized Data"):
        # The function itself is passed, not its result.
        gr.DataFrame(show_data)
# --- Step 4: Launch the app ---
if __name__ == "__main__":
    demo.launch()