170 lines
6.1 KiB
Python
170 lines
6.1 KiB
Python
import numpy as np
|
|
import pandas as pd
|
|
from typing import List, Dict, Tuple, Optional
|
|
from sklearn.feature_extraction.text import TfidfVectorizer
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
import networkx as nx
|
|
from collections import Counter
|
|
|
|
|
|
class QuantitativeAnalyzer:
    """Quantitative analysis over a project database of sentences, sections,
    and extracted variables.

    The ``db`` collaborator is duck-typed; this class only calls
    ``get_sentences()``, ``get_variables()`` and
    ``get_sections_with_embeddings()``, each returning lists of dicts
    (see the field accesses below for the expected keys).
    """

    def __init__(self, db) -> None:
        # Stored as-is; no connection is opened or validated here.
        self.db = db

    def compute_tfidf(self) -> Dict:
        """Build a TF-IDF matrix over all sentence texts.

        Returns:
            dict with keys:
                "matrix"        -- dense TF-IDF weights as nested lists
                                   (rows align with ``sentence_ids``)
                "feature_names" -- vocabulary terms for the matrix columns
                "sentence_ids"  -- sentence ids, in row order
        """
        sentences = self.db.get_sentences()
        texts = [s["text"] for s in sentences]
        sentence_ids = [s["id"] for s in sentences]

        # TfidfVectorizer raises ValueError on an empty corpus; return an
        # empty but shape-consistent result instead of crashing.
        if not texts:
            return {"matrix": [], "feature_names": [], "sentence_ids": []}

        vectorizer = TfidfVectorizer(max_features=1000, stop_words="english")
        tfidf_matrix = vectorizer.fit_transform(texts)
        feature_names = vectorizer.get_feature_names_out()

        return {
            "matrix": tfidf_matrix.toarray().tolist(),
            "feature_names": feature_names.tolist(),
            "sentence_ids": sentence_ids,
        }

    def compute_centrality(self) -> Dict:
        """Build a variable co-occurrence graph and compute centrality metrics.

        Two variables are connected when both of their (lowercased) names
        appear in the same sentence; the edge weight counts co-occurring
        sentences.

        Returns:
            dict with degree/betweenness/eigenvector centrality, PageRank
            (all keyed by variable id), and the weighted edge list.
        """
        variables = self.db.get_variables()

        G = nx.Graph()
        for var in variables:
            G.add_node(var["id"], name=var["name"], frequency=var["frequency"])

        sentences = self.db.get_sentences()
        var_names = [v["name"].lower() for v in variables]

        for sent in sentences:
            text = sent["text"].lower()

            # NOTE(review): plain substring match, so e.g. "cost" also
            # matches "costume"; word-boundary matching may be intended --
            # confirm against callers before tightening.
            found_vars = [
                variables[i]["id"]
                for i, var_name in enumerate(var_names)
                if var_name in text
            ]

            # Add (or reinforce) an edge for every co-occurring pair.
            for i in range(len(found_vars)):
                for j in range(i + 1, len(found_vars)):
                    if G.has_edge(found_vars[i], found_vars[j]):
                        G[found_vars[i]][found_vars[j]]["weight"] += 1
                    else:
                        G.add_edge(found_vars[i], found_vars[j], weight=1)

        # Eigenvector centrality is undefined on the null graph and may fail
        # to converge on edgeless/degenerate graphs; degrade to {} rather
        # than aborting every other metric with an exception.
        try:
            eigenvector = nx.eigenvector_centrality(G, max_iter=1000)
        except nx.NetworkXException:
            eigenvector = {}

        return {
            "degree_centrality": nx.degree_centrality(G),
            "betweenness_centrality": nx.betweenness_centrality(G),
            "eigenvector_centrality": eigenvector,
            "pagerank": nx.pagerank(G),
            "edges": [
                {"source": u, "target": v, "weight": d["weight"]}
                for u, v, d in G.edges(data=True)
            ],
        }

    def generate_statistics(self) -> Dict:
        """Aggregate corpus-level statistics.

        Covers lexical measures (token counts, type/token ratio, sentence
        length distribution), variable distributions (priority level,
        hard/soft constraint, category), and a heuristic coefficient score
        per variable.

        Returns:
            dict of named statistics; see the literal keys below.
        """
        sections = self.db.get_sections_with_embeddings()
        sentences = self.db.get_sentences()
        variables = self.db.get_variables()

        # --- lexical profile ------------------------------------------------
        total_tokens = 0
        sentence_lengths = []
        all_words = []

        for sent in sentences:
            words = sent["text"].split()
            total_tokens += len(words)
            sentence_lengths.append(len(words))
            all_words.extend(w.lower() for w in words)

        unique_tokens = len(set(all_words))
        avg_sentence_length = np.mean(sentence_lengths) if sentence_lengths else 0
        type_token_ratio = unique_tokens / total_tokens if total_tokens > 0 else 0
        token_frequency = Counter(all_words)

        # --- variable distributions ----------------------------------------
        priority_distribution = {f"priority_{i}": 0 for i in range(1, 5)}
        constraint_distribution = {"hard": 0, "soft": 0}

        for var in variables:
            if var["priority_level"]:
                key = f"priority_{var['priority_level']}"
                # Fix: levels outside 1..4 previously raised KeyError here;
                # out-of-range levels are now skipped rather than counted.
                if key in priority_distribution:
                    priority_distribution[key] += 1
            if var["is_hard_constraint"]:
                constraint_distribution["hard"] += 1
            else:
                constraint_distribution["soft"] += 1

        category_counts = Counter(v["category"] for v in variables)

        # --- heuristic coefficient per variable ----------------------------
        # Fix: the corpus-wide max frequency was recomputed inside the loop
        # for every variable (accidental O(n^2)); hoist it once.
        max_frequency = max((v["frequency"] for v in variables), default=0)

        coefficient_scores = {}
        for var in variables:
            priority_weight = self._get_priority_weight(var["priority_level"])
            freq_normalized = var["frequency"] / max(1, max_frequency)
            coefficient_scores[var["name"]] = {
                "priority_weight": priority_weight,
                "frequency_normalized": freq_normalized,
                # Weighted blend: 60% priority, 30% normalized frequency,
                # plus a fixed base term (0.1 * 0.5).
                "coefficient": (priority_weight * 0.6)
                + (freq_normalized * 0.3)
                + (0.1 * 0.5),
            }

        statistics = {
            "total_variables": len(variables),
            "core_values": category_counts.get("core_value", 0),
            "hard_constraints": category_counts.get("hard_constraint", 0),
            "soft_factors": category_counts.get("factor", 0)
            + category_counts.get("soft_constraint", 0),
            "sections": len(sections),
            "sentences": len(sentences),
            "total_tokens": total_tokens,
            "unique_tokens": unique_tokens,
            "avg_sentence_length": avg_sentence_length,
            "type_token_ratio": type_token_ratio,
            "priority_distribution": priority_distribution,
            "constraint_distribution": constraint_distribution,
            "variable_categories": dict(category_counts),
            # NOTE(review): despite the name this is the top-50 *word*
            # frequency histogram, not a variable histogram -- confirm
            # downstream consumers before renaming.
            "variable_frequency_histogram": dict(token_frequency.most_common(50)),
            "coefficient_scores": coefficient_scores,
            "sentence_length_stats": {
                "min": min(sentence_lengths) if sentence_lengths else 0,
                "max": max(sentence_lengths) if sentence_lengths else 0,
                "mean": avg_sentence_length,
                "median": np.median(sentence_lengths) if sentence_lengths else 0,
            },
        }

        return statistics

    def _get_priority_weight(self, priority_level: Optional[int]) -> float:
        """Map a priority level (1 = highest) to a weight in (0, 1].

        Unknown levels and ``None`` fall back to 0.1.
        """
        weights = {1: 1.0, 2: 0.75, 3: 0.5, 4: 0.25}
        return weights.get(priority_level, 0.1)

    def compute_section_statistics(self) -> Dict:
        """Per-section size/position statistics, keyed by section id."""
        sections = self.db.get_sections_with_embeddings()
        section_stats = {}

        for section in sections:
            content = section.get("content", "")
            words = content.split()

            section_stats[section["id"]] = {
                # str.split() never yields empty strings, so token_count and
                # word_count are currently identical; both keys are kept for
                # interface compatibility.
                "token_count": len(words),
                "word_count": len([w for w in words if w]),
                "hierarchy_level": section.get("hierarchy_level", 0),
                "line_range": (
                    section.get("line_start", 0),
                    section.get("line_end", 0),
                ),
            }

        return section_stats