import numpy as np
import pandas as pd  # NOTE(review): unused here; kept in case other modules re-export it
from typing import List, Dict, Tuple, Optional
from collections import Counter

# NOTE: sklearn and networkx are imported lazily inside the methods that use
# them, so the module stays importable (and unit-testable) without those heavy
# optional dependencies installed.


class QuantitativeAnalyzer:
    """Quantitative text and graph analytics over sentences, sections and variables.

    ``db`` is expected to expose ``get_sentences()``, ``get_variables()`` and
    ``get_sections_with_embeddings()``, each returning a list of dicts; the
    keys each method reads are documented on that method.
    """

    def __init__(self, db):
        self.db = db

    def compute_tfidf(self) -> Dict:
        """Build a TF-IDF matrix over all sentence texts.

        Reads ``id`` and ``text`` from each sentence row.

        Returns:
            dict with ``matrix`` (dense, as nested lists), ``feature_names``
            and ``sentence_ids`` aligned row-for-row with the matrix.

        Raises:
            ValueError: propagated from sklearn when there are no sentences
                or the vocabulary is empty (all stop words).
        """
        from sklearn.feature_extraction.text import TfidfVectorizer

        sentences = self.db.get_sentences()
        texts = [s["text"] for s in sentences]
        sentence_ids = [s["id"] for s in sentences]

        vectorizer = TfidfVectorizer(max_features=1000, stop_words="english")
        tfidf_matrix = vectorizer.fit_transform(texts)

        return {
            "matrix": tfidf_matrix.toarray().tolist(),
            "feature_names": vectorizer.get_feature_names_out().tolist(),
            "sentence_ids": sentence_ids,
        }

    def compute_centrality(self) -> Dict:
        """Build a variable co-occurrence graph and compute centrality metrics.

        Two variables are linked (edge weight = co-occurrence count) whenever
        both names appear as substrings of the same sentence text.

        Returns:
            dict with degree/betweenness/eigenvector centrality, PageRank,
            and the weighted edge list. For an empty graph all results are
            empty (the null graph would otherwise make
            ``eigenvector_centrality`` raise).
        """
        import networkx as nx

        variables = self.db.get_variables()
        G = nx.Graph()
        for var in variables:
            G.add_node(var["id"], name=var["name"], frequency=var["frequency"])

        sentences = self.db.get_sentences()
        var_names = [v["name"].lower() for v in variables]
        for sent in sentences:
            text = sent["text"].lower()
            # NOTE(review): plain substring matching — "cost" also matches
            # "costly". Word-boundary matching would be stricter but would
            # change existing results; confirm before tightening.
            found_vars = [
                variables[i]["id"]
                for i, var_name in enumerate(var_names)
                if var_name in text
            ]
            # Every unordered pair of variables in the same sentence
            # co-occurs once; accumulate into the edge weight.
            for i in range(len(found_vars)):
                for j in range(i + 1, len(found_vars)):
                    if G.has_edge(found_vars[i], found_vars[j]):
                        G[found_vars[i]][found_vars[j]]["weight"] += 1
                    else:
                        G.add_edge(found_vars[i], found_vars[j], weight=1)

        # Guard: eigenvector centrality raises NetworkXPointlessConcept on
        # the null graph, so short-circuit with empty results.
        if G.number_of_nodes() == 0:
            return {
                "degree_centrality": {},
                "betweenness_centrality": {},
                "eigenvector_centrality": {},
                "pagerank": {},
                "edges": [],
            }

        return {
            "degree_centrality": nx.degree_centrality(G),
            "betweenness_centrality": nx.betweenness_centrality(G),
            "eigenvector_centrality": nx.eigenvector_centrality(G, max_iter=1000),
            "pagerank": nx.pagerank(G),
            "edges": [
                {"source": u, "target": v, "weight": d["weight"]}
                for u, v, d in G.edges(data=True)
            ],
        }

    def generate_statistics(self) -> Dict:
        """Aggregate corpus-level token, variable and section statistics.

        Reads sentence ``text``; variable ``name``, ``frequency``,
        ``priority_level``, ``is_hard_constraint`` and ``category``; and the
        section list (only its length is used here).
        """
        sections = self.db.get_sections_with_embeddings()
        sentences = self.db.get_sentences()
        variables = self.db.get_variables()

        # Token-level statistics over all sentence texts (whitespace tokens).
        total_tokens = 0
        sentence_lengths = []
        all_words = []
        for sent in sentences:
            words = sent["text"].split()
            total_tokens += len(words)
            sentence_lengths.append(len(words))
            all_words.extend(w.lower() for w in words)

        unique_tokens = len(set(all_words))
        avg_sentence_length = np.mean(sentence_lengths) if sentence_lengths else 0
        type_token_ratio = unique_tokens / total_tokens if total_tokens > 0 else 0
        token_frequency = Counter(all_words)

        # Distribution of priority levels (1-4) and hard/soft constraints.
        priority_distribution = {f"priority_{i}": 0 for i in range(1, 5)}
        constraint_distribution = {"hard": 0, "soft": 0}
        for var in variables:
            if var["priority_level"]:
                key = f"priority_{var['priority_level']}"
                # Skip out-of-range levels instead of raising KeyError.
                if key in priority_distribution:
                    priority_distribution[key] += 1
            if var["is_hard_constraint"]:
                constraint_distribution["hard"] += 1
            else:
                constraint_distribution["soft"] += 1

        category_counts = Counter(v["category"] for v in variables)

        # Composite coefficient per variable: weighted mix of priority weight
        # (0.6), frequency normalized to the max observed (0.3), and a fixed
        # 0.1*0.5 offset. The max frequency is hoisted out of the loop
        # (previously recomputed per variable — O(n^2)).
        max_frequency = max((v["frequency"] for v in variables), default=0)
        freq_denominator = max(1, max_frequency)
        coefficient_scores = {}
        for var in variables:
            priority_weight = self._get_priority_weight(var["priority_level"])
            freq_normalized = var["frequency"] / freq_denominator
            coefficient_scores[var["name"]] = {
                "priority_weight": priority_weight,
                "frequency_normalized": freq_normalized,
                "coefficient": (priority_weight * 0.6)
                + (freq_normalized * 0.3)
                + (0.1 * 0.5),
            }

        return {
            "total_variables": len(variables),
            "core_values": category_counts.get("core_value", 0),
            "hard_constraints": category_counts.get("hard_constraint", 0),
            "soft_factors": category_counts.get("factor", 0)
            + category_counts.get("soft_constraint", 0),
            "sections": len(sections),
            "sentences": len(sentences),
            "total_tokens": total_tokens,
            "unique_tokens": unique_tokens,
            "avg_sentence_length": avg_sentence_length,
            "type_token_ratio": type_token_ratio,
            "priority_distribution": priority_distribution,
            "constraint_distribution": constraint_distribution,
            "variable_categories": dict(category_counts),
            # NOTE(review): despite the key name this is the 50 most common
            # *tokens*, not variables; key kept for caller compatibility.
            "variable_frequency_histogram": dict(token_frequency.most_common(50)),
            "coefficient_scores": coefficient_scores,
            "sentence_length_stats": {
                "min": min(sentence_lengths) if sentence_lengths else 0,
                "max": max(sentence_lengths) if sentence_lengths else 0,
                "mean": avg_sentence_length,
                "median": np.median(sentence_lengths) if sentence_lengths else 0,
            },
        }

    def _get_priority_weight(self, priority_level: Optional[int]) -> float:
        """Map priority level 1-4 to a weight in [0.25, 1.0]; unknown/None -> 0.1."""
        weights = {1: 1.0, 2: 0.75, 3: 0.5, 4: 0.25}
        return weights.get(priority_level, 0.1)

    def compute_section_statistics(self) -> Dict:
        """Per-section token counts, hierarchy level and (start, end) line range.

        Missing section fields default to "" / 0 via ``dict.get``.
        """
        sections = self.db.get_sections_with_embeddings()
        section_stats = {}
        for section in sections:
            content = section.get("content", "")
            words = content.split()
            section_stats[section["id"]] = {
                "token_count": len(words),
                # str.split() with no args never yields empty strings, so this
                # equals token_count; kept for output-shape compatibility.
                "word_count": len([w for w in words if w]),
                "hierarchy_level": section.get("hierarchy_level", 0),
                "line_range": (
                    section.get("line_start", 0),
                    section.get("line_end", 0),
                ),
            }
        return section_stats