170 lines
6.1 KiB
Python

import re
from collections import Counter
from typing import Dict, List, Optional, Tuple

import networkx as nx
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
class QuantitativeAnalyzer:
    """Quantitative text analysis over a corpus stored behind a db object.

    Provides TF-IDF features, variable co-occurrence centrality metrics,
    and corpus-level descriptive statistics. The ``db`` object must expose
    ``get_sentences()``, ``get_variables()`` and
    ``get_sections_with_embeddings()``, each returning a list of dicts
    (schemas assumed from usage below -- TODO confirm against the db layer).
    """

    def __init__(self, db):
        # Data-access object; queried lazily by each compute method.
        self.db = db

    def compute_tfidf(self) -> Dict:
        """Fit a TF-IDF model over all sentence texts.

        Returns:
            Dict with keys:
              - "matrix": dense TF-IDF matrix as nested lists
                (one row per sentence),
              - "feature_names": vocabulary terms, aligned with columns,
              - "sentence_ids": ids aligned with rows.
        """
        sentences = self.db.get_sentences()
        texts = [s["text"] for s in sentences]
        sentence_ids = [s["id"] for s in sentences]
        # Guard: fit_transform raises ValueError ("empty vocabulary")
        # on an empty corpus.
        if not texts:
            return {"matrix": [], "feature_names": [], "sentence_ids": []}
        vectorizer = TfidfVectorizer(max_features=1000, stop_words="english")
        tfidf_matrix = vectorizer.fit_transform(texts)
        feature_names = vectorizer.get_feature_names_out()
        return {
            "matrix": tfidf_matrix.toarray().tolist(),
            "feature_names": feature_names.tolist(),
            "sentence_ids": sentence_ids,
        }

    def compute_centrality(self) -> Dict:
        """Build a variable co-occurrence graph and score node centrality.

        Two variables are linked whenever their names both appear in the
        same sentence; edge weight counts co-occurring sentences.

        Returns:
            Dict with per-node degree/betweenness/eigenvector/pagerank
            scores (keyed by variable id) and an "edges" list of
            {"source", "target", "weight"} dicts.
        """
        variables = self.db.get_variables()
        G = nx.Graph()
        for var in variables:
            G.add_node(var["id"], name=var["name"], frequency=var["frequency"])

        # Pre-compile word-boundary patterns once (hoisted out of the
        # sentence loop). Plain substring matching over-counted: e.g.
        # the variable "cost" matched inside "costume".
        patterns = [
            (v["id"], re.compile(r"\b" + re.escape(v["name"].lower()) + r"\b"))
            for v in variables
        ]

        sentences = self.db.get_sentences()
        for sent in sentences:
            text = sent["text"].lower()
            found_vars = [vid for vid, pat in patterns if pat.search(text)]
            # Every unordered pair of variables in this sentence
            # contributes one unit of edge weight.
            for i in range(len(found_vars)):
                for j in range(i + 1, len(found_vars)):
                    u, w = found_vars[i], found_vars[j]
                    if G.has_edge(u, w):
                        G[u][w]["weight"] += 1
                    else:
                        G.add_edge(u, w, weight=1)

        # eigenvector_centrality raises on the null graph; the other
        # metrics would just return {}. Keep the output shape uniform.
        if G.number_of_nodes() == 0:
            return {
                "degree_centrality": {},
                "betweenness_centrality": {},
                "eigenvector_centrality": {},
                "pagerank": {},
                "edges": [],
            }
        return {
            "degree_centrality": nx.degree_centrality(G),
            "betweenness_centrality": nx.betweenness_centrality(G),
            "eigenvector_centrality": nx.eigenvector_centrality(G, max_iter=1000),
            "pagerank": nx.pagerank(G),
            "edges": [
                {"source": u, "target": v, "weight": d["weight"]}
                for u, v, d in G.edges(data=True)
            ],
        }

    def generate_statistics(self) -> Dict:
        """Compute corpus-wide descriptive statistics.

        Covers token counts, lexical diversity (type/token ratio),
        per-variable priority/constraint distributions, and a composite
        coefficient per variable blending priority weight and normalized
        mention frequency.

        Returns:
            A flat dict of aggregate metrics (see the literal below for
            the exact keys).
        """
        sections = self.db.get_sections_with_embeddings()
        sentences = self.db.get_sentences()
        variables = self.db.get_variables()

        total_tokens = 0
        sentence_lengths = []
        all_words = []
        for sent in sentences:
            words = sent["text"].split()
            total_tokens += len(words)
            sentence_lengths.append(len(words))
            all_words.extend(w.lower() for w in words)
        unique_tokens = len(set(all_words))
        avg_sentence_length = np.mean(sentence_lengths) if sentence_lengths else 0
        type_token_ratio = unique_tokens / total_tokens if total_tokens > 0 else 0
        token_frequency = Counter(all_words)

        priority_distribution = {f"priority_{i}": 0 for i in range(1, 5)}
        constraint_distribution = {"hard": 0, "soft": 0}
        for var in variables:
            level = var["priority_level"]
            # Count only known levels 1-4; the original indexed the dict
            # directly and raised KeyError on any out-of-range level.
            if level and f"priority_{level}" in priority_distribution:
                priority_distribution[f"priority_{level}"] += 1
            if var["is_hard_constraint"]:
                constraint_distribution["hard"] += 1
            else:
                constraint_distribution["soft"] += 1

        category_counts = Counter(v["category"] for v in variables)

        # Hoisted out of the loop below: recomputing the corpus-wide max
        # per variable made coefficient scoring O(n^2).
        max_frequency = max((v["frequency"] for v in variables), default=0)
        coefficient_scores = {}
        for var in variables:
            priority_weight = self._get_priority_weight(var["priority_level"])
            freq_normalized = var["frequency"] / max(1, max_frequency)
            coefficient_scores[var["name"]] = {
                "priority_weight": priority_weight,
                "frequency_normalized": freq_normalized,
                # 60% priority + 30% frequency + fixed 0.05 baseline.
                "coefficient": (priority_weight * 0.6)
                + (freq_normalized * 0.3)
                + (0.1 * 0.5),
            }

        return {
            "total_variables": len(variables),
            "core_values": category_counts.get("core_value", 0),
            "hard_constraints": category_counts.get("hard_constraint", 0),
            "soft_factors": category_counts.get("factor", 0)
            + category_counts.get("soft_constraint", 0),
            "sections": len(sections),
            "sentences": len(sentences),
            "total_tokens": total_tokens,
            "unique_tokens": unique_tokens,
            "avg_sentence_length": avg_sentence_length,
            "type_token_ratio": type_token_ratio,
            "priority_distribution": priority_distribution,
            "constraint_distribution": constraint_distribution,
            "variable_categories": dict(category_counts),
            # NOTE(review): despite the name, this is the top-50 *token*
            # frequency table, not per-variable counts. Key kept for
            # backward compatibility with consumers.
            "variable_frequency_histogram": dict(token_frequency.most_common(50)),
            "coefficient_scores": coefficient_scores,
            "sentence_length_stats": {
                "min": min(sentence_lengths) if sentence_lengths else 0,
                "max": max(sentence_lengths) if sentence_lengths else 0,
                "mean": avg_sentence_length,
                "median": np.median(sentence_lengths) if sentence_lengths else 0,
            },
        }

    def _get_priority_weight(self, priority_level: Optional[int]) -> float:
        """Map a 1-4 priority level to a weight in (0, 1].

        Level 1 is the strongest (1.0); unknown or missing levels get a
        small floor weight of 0.1.
        """
        weights = {1: 1.0, 2: 0.75, 3: 0.5, 4: 0.25}
        return weights.get(priority_level, 0.1)

    def compute_section_statistics(self) -> Dict:
        """Per-section size and location metrics, keyed by section id.

        Returns:
            Dict mapping section id to token/word counts, hierarchy
            level, and a (line_start, line_end) tuple. ``token_count``
            and ``word_count`` are currently identical because
            ``str.split()`` never yields empty strings; both keys are
            kept for interface compatibility.
        """
        sections = self.db.get_sections_with_embeddings()
        section_stats = {}
        for section in sections:
            content = section.get("content", "")
            words = content.split()
            section_stats[section["id"]] = {
                "token_count": len(words),
                "word_count": len([w for w in words if w]),
                "hierarchy_level": section.get("hierarchy_level", 0),
                "line_range": (
                    section.get("line_start", 0),
                    section.get("line_end", 0),
                ),
            }
        return section_stats