"""Build enriched metadata records for variables and sections from analyzer outputs."""
import numpy as np
from typing import List, Dict, Optional, Set
class MetadataBuilder:
    """Builds enriched metadata dictionaries for variables and document sections.

    Combines raw rows from the database with results from a quantitative
    analyzer (statistics, graph centrality) and a semantic analyzer
    (variable clustering).
    """

    def __init__(self, db, quant_analyzer, semantic_analyzer):
        """Keep handles to the database facade and the two analyzers.

        Args:
            db: exposes ``get_variables()``, ``get_sentences()``,
                ``get_sections_with_embeddings()``, ``get_embedding()``,
                and a raw ``conn`` for SQL queries.
            quant_analyzer: exposes ``generate_statistics()``,
                ``compute_centrality()``, ``compute_section_statistics()``.
            semantic_analyzer: exposes ``cluster_variables()``.
        """
        self.db = db
        self.quant_analyzer = quant_analyzer
        self.semantic_analyzer = semantic_analyzer

    def build_variable_metadata(self) -> List[Dict]:
        """Return one metadata dict per variable in the database.

        Each record merges the variable's stored attributes with derived
        data: sentence mentions, related variables, a definition snippet,
        coefficient score/weight, hierarchy label, centrality measures,
        and cluster membership.
        """
        variables = self.db.get_variables()
        statistics = self.quant_analyzer.generate_statistics()
        coefficient_scores = statistics.get("coefficient_scores", {})
        centrality = self.quant_analyzer.compute_centrality()
        clusters = self.semantic_analyzer.cluster_variables()

        variable_metadata = []

        for var in variables:
            var_id = var["id"]
            var_name = var["name"]
            # "coefficient_score" and "weight" are the same lookup;
            # compute it once instead of twice per variable.
            weight = self._calculate_weight(var, coefficient_scores)

            metadata = {
                "id": var_id,
                "name": var_name,
                "category": var["category"],
                "priority_level": var["priority_level"],
                "is_hard_constraint": var["is_hard_constraint"],
                "principal_assignment": var["principal_assignment"],
                "frequency": var["frequency"],
                "description": var.get("description", ""),
                "mentions": self._get_variable_mentions(var_id),
                "related_variables": self._get_related_variables(
                    var_id, variables, centrality
                ),
                "definition": self._get_definition(var_name),
                "coefficient_score": weight,
                "hierarchy_position": self._get_hierarchy_position(
                    var["priority_level"]
                ),
                "weight": weight,
                "centrality_measures": {
                    "degree": centrality["degree_centrality"].get(var_id, 0.0),
                    "betweenness": centrality["betweenness_centrality"].get(
                        var_id, 0.0
                    ),
                    "eigenvector": centrality["eigenvector_centrality"].get(
                        var_id, 0.0
                    ),
                    "pagerank": centrality["pagerank"].get(var_id, 0.0),
                },
                "cluster_id": self._find_cluster(var_id, clusters),
            }

            variable_metadata.append(metadata)

        return variable_metadata

    def _get_variable_mentions(self, variable_id: int) -> List[Dict]:
        """Return every sentence that mentions the variable's name.

        Matching is a case-insensitive substring test, so short names may
        over-match.  Each mention carries the section id/title, sentence
        id, and the full sentence text as context.
        """
        variables = self.db.get_variables()
        var_name = next((v["name"] for v in variables if v["id"] == variable_id), "")
        # Guard: an unknown id yields an empty name, and "" is a substring
        # of every sentence — without this, every sentence became a mention.
        if not var_name:
            return []

        sections_map = {
            section["id"]: {
                "title": section["title"],
                "path": section.get("path", ""),
            }
            for section in self.db.get_sections_with_embeddings()
        }

        needle = var_name.lower()  # hoisted: was lowered once per sentence
        mentions = []

        for sent in self.db.get_sentences():
            if needle in sent["text"].lower():
                section_info = sections_map.get(sent["section_id"], {})
                mentions.append(
                    {
                        "section_id": sent["section_id"],
                        "section_title": section_info.get("title", ""),
                        "sentence_id": sent["id"],
                        "context": sent["text"],
                    }
                )

        return mentions

    def _get_related_variables(
        self, variable_id: int, variables: List[Dict], centrality: Dict
    ) -> List[Dict]:
        """Return up to 10 variables linked to this one in the centrality
        graph, ordered by descending edge weight."""
        # One dict build instead of an O(V) scan per edge.
        vars_by_id = {v["id"]: v for v in variables}
        related = []

        for edge in centrality.get("edges", []):
            if variable_id not in (edge["source"], edge["target"]):
                continue
            other_id = (
                edge["target"] if edge["source"] == variable_id else edge["source"]
            )
            other_var = vars_by_id.get(other_id)
            if other_var is None:
                continue

            related.append(
                {
                    "id": other_id,
                    "name": other_var["name"],
                    "relationship": self._determine_relationship(
                        variable_id, other_id, variables
                    ),
                    "weight": edge["weight"],
                }
            )

        related.sort(key=lambda x: x["weight"], reverse=True)
        return related[:10]

    def _determine_relationship(
        self, var_id_1: int, var_id_2: int, variables: List[Dict]
    ) -> str:
        """Classify the pair by relative priority level, falling back to
        category peerage, then a generic 'related'."""
        var_1 = next((v for v in variables if v["id"] == var_id_1), None)
        var_2 = next((v for v in variables if v["id"] == var_id_2), None)

        if not var_1 or not var_2:
            return "unknown"

        # Compare priorities only when both are set (truthy: non-None, non-zero).
        if var_1["priority_level"] and var_2["priority_level"]:
            if var_1["priority_level"] < var_2["priority_level"]:
                return "lower_priority"
            if var_1["priority_level"] > var_2["priority_level"]:
                return "higher_priority"

        if var_1["category"] == var_2["category"]:
            return f"{var_1['category']}_peer"

        return "related"

    def _get_definition(self, var_name: str) -> str:
        """Return the first sentence that looks like a definition of the
        variable (contains its name plus a ':' or the word 'means'),
        truncated to 200 characters; '' if none is found."""
        for sent in self.db.get_sentences():
            text = sent["text"]
            # NOTE(review): the name match here is case-sensitive, unlike
            # _get_variable_mentions — confirm that asymmetry is intended.
            if var_name in text and (":" in text or "means" in text.lower()):
                return text[:200]

        return ""

    def _get_hierarchy_position(self, priority_level: Optional[int]) -> str:
        """Map a numeric priority level (1-4) to a hierarchy label."""
        labels = {1: "top", 2: "high", 3: "medium", 4: "low"}
        return labels.get(priority_level, "unspecified")

    def _calculate_weight(self, var: Dict, coefficient_scores: Dict) -> float:
        """Return the variable's coefficient score, or 0.0 when absent."""
        return coefficient_scores.get(var["name"], {}).get("coefficient", 0.0)

    def _find_cluster(
        self, variable_id: int, clusters: Dict[int, List[int]]
    ) -> Optional[int]:
        """Return the id of the cluster containing the variable, or None."""
        for cluster_id, var_ids in clusters.items():
            if variable_id in var_ids:
                return cluster_id
        return None

    def build_section_metadata(self) -> List[Dict]:
        """Return one metadata dict per section, including a content
        preview, token statistics, embedding availability, and the most
        similar sections."""
        sections = self.db.get_sections_with_embeddings()
        section_stats = self.quant_analyzer.compute_section_statistics()

        section_metadata = []

        for section in sections:
            section_id = section["id"]
            stats = section_stats.get(section_id, {})

            metadata = {
                "id": section_id,
                "title": section.get("title", ""),
                "section_type": section.get("section_type", ""),
                # Preview only — full text stays in the database.
                "content": section.get("content", "")[:500],
                "path": section.get("path", ""),
                "line_range": (
                    section.get("line_start", 0),
                    section.get("line_end", 0),
                ),
                "hierarchy_level": section.get("hierarchy_level", 0),
                "token_count": stats.get("token_count", 0),
                "embedding_available": self.db.get_embedding(section_id) is not None,
                "similar_sections": self._get_similar_sections(section_id),
            }

            section_metadata.append(metadata)

        return section_metadata

    def _get_similar_sections(self, section_id: int, top_k: int = 5) -> List[Dict]:
        """Return the ``top_k`` most similar sections from the precomputed
        similarity table, in descending similarity order.

        Rows are accessed by column name, so the connection is assumed to
        use a mapping row factory (e.g. sqlite3.Row).
        NOTE(review): only ``content_id_1 = ?`` is queried — verify that
        similarity pairs are stored in both directions (or are directed).
        """
        cursor = self.db.conn.cursor()
        # Parameterized query — never interpolate ids into the SQL string.
        cursor.execute(
            """
            SELECT s2.id, s2.title, s2.path, sim.similarity_score
            FROM similarity sim
            JOIN sections s2 ON sim.content_id_2 = s2.id
            WHERE sim.content_id_1 = ?
            ORDER BY sim.similarity_score DESC
            LIMIT ?
            """,
            (section_id, top_k),
        )

        return [
            {
                "id": row["id"],
                "title": row["title"],
                "path": row["path"],
                "similarity_score": row["similarity_score"],
            }
            for row in cursor.fetchall()
        ]