# Extraction header (not code): original file was 229 lines, 8.2 KiB, Python.

import numpy as np
from typing import List, Dict, Optional, Set
class MetadataBuilder:
def __init__(self, db, quant_analyzer, semantic_analyzer):
self.db = db
self.quant_analyzer = quant_analyzer
self.semantic_analyzer = semantic_analyzer
def build_variable_metadata(self) -> List[Dict]:
variables = self.db.get_variables()
statistics = self.quant_analyzer.generate_statistics()
coefficient_scores = statistics.get("coefficient_scores", {})
centrality = self.quant_analyzer.compute_centrality()
clusters = self.semantic_analyzer.cluster_variables()
variable_metadata = []
for var in variables:
var_id = var["id"]
var_name = var["name"]
metadata = {
"id": var_id,
"name": var_name,
"category": var["category"],
"priority_level": var["priority_level"],
"is_hard_constraint": var["is_hard_constraint"],
"principal_assignment": var["principal_assignment"],
"frequency": var["frequency"],
"description": var.get("description", ""),
"mentions": self._get_variable_mentions(var_id),
"related_variables": self._get_related_variables(
var_id, variables, centrality
),
"definition": self._get_definition(var_name),
"coefficient_score": coefficient_scores.get(var_name, {}).get(
"coefficient", 0.0
),
"hierarchy_position": self._get_hierarchy_position(
var["priority_level"]
),
"weight": self._calculate_weight(var, coefficient_scores),
"centrality_measures": {
"degree": centrality["degree_centrality"].get(var_id, 0.0),
"betweenness": centrality["betweenness_centrality"].get(
var_id, 0.0
),
"eigenvector": centrality["eigenvector_centrality"].get(
var_id, 0.0
),
"pagerank": centrality["pagerank"].get(var_id, 0.0),
},
"cluster_id": self._find_cluster(var_id, clusters),
}
variable_metadata.append(metadata)
return variable_metadata
def _get_variable_mentions(self, variable_id: int) -> List[Dict]:
sentences = self.db.get_sentences()
variables = self.db.get_variables()
var_name = next((v["name"] for v in variables if v["id"] == variable_id), "")
mentions = []
sections_map = {}
for section in self.db.get_sections_with_embeddings():
sections_map[section["id"]] = {
"title": section["title"],
"path": section.get("path", ""),
}
for sent in sentences:
if var_name.lower() in sent["text"].lower():
section_info = sections_map.get(sent["section_id"], {})
mentions.append(
{
"section_id": sent["section_id"],
"section_title": section_info.get("title", ""),
"sentence_id": sent["id"],
"context": sent["text"],
}
)
return mentions
def _get_related_variables(
self, variable_id: int, variables: List[Dict], centrality: Dict
) -> List[Dict]:
related = []
for edge in centrality.get("edges", []):
if edge["source"] == variable_id or edge["target"] == variable_id:
other_id = (
edge["target"] if edge["source"] == variable_id else edge["source"]
)
other_var = next((v for v in variables if v["id"] == other_id), None)
if other_var:
relationship = self._determine_relationship(
variable_id, other_id, variables
)
related.append(
{
"id": other_id,
"name": other_var["name"],
"relationship": relationship,
"weight": edge["weight"],
}
)
related.sort(key=lambda x: x["weight"], reverse=True)
return related[:10]
def _determine_relationship(
self, var_id_1: int, var_id_2: int, variables: List[Dict]
) -> str:
var_1 = next((v for v in variables if v["id"] == var_id_1), None)
var_2 = next((v for v in variables if v["id"] == var_id_2), None)
if not var_1 or not var_2:
return "unknown"
if var_1["priority_level"] and var_2["priority_level"]:
if var_1["priority_level"] < var_2["priority_level"]:
return "lower_priority"
elif var_1["priority_level"] > var_2["priority_level"]:
return "higher_priority"
if var_1["category"] == var_2["category"]:
return f"{var_1['category']}_peer"
return "related"
def _get_definition(self, var_name: str) -> str:
sentences = self.db.get_sentences()
for sent in sentences:
if var_name in sent["text"]:
if ":" in sent["text"] or "means" in sent["text"].lower():
return sent["text"][:200]
return ""
def _get_hierarchy_position(self, priority_level: Optional[int]) -> str:
if priority_level == 1:
return "top"
elif priority_level == 2:
return "high"
elif priority_level == 3:
return "medium"
elif priority_level == 4:
return "low"
return "unspecified"
def _calculate_weight(self, var: Dict, coefficient_scores: Dict) -> float:
var_name = var["name"]
score_data = coefficient_scores.get(var_name, {})
return score_data.get("coefficient", 0.0)
def _find_cluster(
self, variable_id: int, clusters: Dict[int, List[int]]
) -> Optional[int]:
for cluster_id, var_ids in clusters.items():
if variable_id in var_ids:
return cluster_id
return None
def build_section_metadata(self) -> List[Dict]:
sections = self.db.get_sections_with_embeddings()
section_stats = self.quant_analyzer.compute_section_statistics()
section_metadata = []
for section in sections:
section_id = section["id"]
stats = section_stats.get(section_id, {})
metadata = {
"id": section_id,
"title": section.get("title", ""),
"section_type": section.get("section_type", ""),
"content": section.get("content", "")[:500],
"path": section.get("path", ""),
"line_range": (
section.get("line_start", 0),
section.get("line_end", 0),
),
"hierarchy_level": section.get("hierarchy_level", 0),
"token_count": stats.get("token_count", 0),
"embedding_available": self.db.get_embedding(section_id) is not None,
"similar_sections": self._get_similar_sections(section_id),
}
section_metadata.append(metadata)
return section_metadata
def _get_similar_sections(self, section_id: int, top_k: int = 5) -> List[Dict]:
cursor = self.db.conn.cursor()
cursor.execute(
"""
SELECT s2.id, s2.title, s2.path, sim.similarity_score
FROM similarity sim
JOIN sections s2 ON sim.content_id_2 = s2.id
WHERE sim.content_id_1 = ?
ORDER BY sim.similarity_score DESC
LIMIT ?
""",
(section_id, top_k),
)
similar_sections = []
for row in cursor.fetchall():
similar_sections.append(
{
"id": row["id"],
"title": row["title"],
"path": row["path"],
"similarity_score": row["similarity_score"],
}
)
return similar_sections