import numpy as np
from typing import List, Dict, Optional, Set


class MetadataBuilder:
    """Assembles per-variable and per-section metadata dictionaries.

    Combines raw records from ``db`` with quantitative results
    (coefficient scores, graph centrality) from ``quant_analyzer`` and
    cluster assignments from ``semantic_analyzer``.
    """

    # Maps a numeric priority level to its human-readable hierarchy label.
    _HIERARCHY_LABELS = {1: "top", 2: "high", 3: "medium", 4: "low"}

    def __init__(self, db, quant_analyzer, semantic_analyzer):
        self.db = db
        self.quant_analyzer = quant_analyzer
        self.semantic_analyzer = semantic_analyzer

    def build_variable_metadata(self) -> List[Dict]:
        """Return one metadata dict per variable known to the database.

        Each dict merges the variable's DB attributes with its textual
        mentions, related variables, coefficient score, centrality
        measures, and semantic cluster id.
        """
        variables = self.db.get_variables()
        statistics = self.quant_analyzer.generate_statistics()
        coefficient_scores = statistics.get("coefficient_scores", {})
        centrality = self.quant_analyzer.compute_centrality()
        clusters = self.semantic_analyzer.cluster_variables()

        variable_metadata = []
        for var in variables:
            var_id = var["id"]
            var_name = var["name"]
            metadata = {
                "id": var_id,
                "name": var_name,
                "category": var["category"],
                "priority_level": var["priority_level"],
                "is_hard_constraint": var["is_hard_constraint"],
                "principal_assignment": var["principal_assignment"],
                "frequency": var["frequency"],
                "description": var.get("description", ""),
                "mentions": self._get_variable_mentions(var_id),
                "related_variables": self._get_related_variables(
                    var_id, variables, centrality
                ),
                "definition": self._get_definition(var_name),
                # Coefficient keyed by variable *name* in the stats payload.
                "coefficient_score": coefficient_scores.get(var_name, {}).get(
                    "coefficient", 0.0
                ),
                "hierarchy_position": self._get_hierarchy_position(
                    var["priority_level"]
                ),
                "weight": self._calculate_weight(var, coefficient_scores),
                # Centrality dicts are keyed by variable *id*; missing
                # variables default to 0.0.
                "centrality_measures": {
                    "degree": centrality["degree_centrality"].get(var_id, 0.0),
                    "betweenness": centrality["betweenness_centrality"].get(
                        var_id, 0.0
                    ),
                    "eigenvector": centrality["eigenvector_centrality"].get(
                        var_id, 0.0
                    ),
                    "pagerank": centrality["pagerank"].get(var_id, 0.0),
                },
                "cluster_id": self._find_cluster(var_id, clusters),
            }
            variable_metadata.append(metadata)
        return variable_metadata

    def _get_variable_mentions(self, variable_id: int) -> List[Dict]:
        """Return sentences that mention the variable (case-insensitive
        substring match), each annotated with its section title.

        Returns an empty list when ``variable_id`` is unknown — previously
        an unknown id produced an empty name, and the empty-string
        substring test matched *every* sentence.
        """
        variables = self.db.get_variables()
        var_name = next((v["name"] for v in variables if v["id"] == variable_id), "")
        if not var_name:
            # Unknown variable id: no name to search for.
            return []
        var_name_lower = var_name.lower()  # hoisted out of the sentence loop

        sections_map = {
            section["id"]: {
                "title": section["title"],
                "path": section.get("path", ""),
            }
            for section in self.db.get_sections_with_embeddings()
        }

        mentions = []
        for sent in self.db.get_sentences():
            if var_name_lower in sent["text"].lower():
                section_info = sections_map.get(sent["section_id"], {})
                mentions.append(
                    {
                        "section_id": sent["section_id"],
                        "section_title": section_info.get("title", ""),
                        "sentence_id": sent["id"],
                        "context": sent["text"],
                    }
                )
        return mentions

    def _get_related_variables(
        self, variable_id: int, variables: List[Dict], centrality: Dict
    ) -> List[Dict]:
        """Return up to 10 variables connected to ``variable_id`` by graph
        edges, sorted by descending edge weight."""
        related = []
        for edge in centrality.get("edges", []):
            if edge["source"] == variable_id or edge["target"] == variable_id:
                # The edge is undirected for our purposes; pick the far end.
                other_id = (
                    edge["target"]
                    if edge["source"] == variable_id
                    else edge["source"]
                )
                other_var = next((v for v in variables if v["id"] == other_id), None)
                if other_var:
                    relationship = self._determine_relationship(
                        variable_id, other_id, variables
                    )
                    related.append(
                        {
                            "id": other_id,
                            "name": other_var["name"],
                            "relationship": relationship,
                            "weight": edge["weight"],
                        }
                    )
        related.sort(key=lambda x: x["weight"], reverse=True)
        return related[:10]

    def _determine_relationship(
        self, var_id_1: int, var_id_2: int, variables: List[Dict]
    ) -> str:
        """Classify the relationship of var 1 relative to var 2.

        Priority levels (when both are set and differ) take precedence; a
        shared category yields ``"<category>_peer"``; otherwise ``"related"``.
        Unknown ids yield ``"unknown"``.
        """
        var_1 = next((v for v in variables if v["id"] == var_id_1), None)
        var_2 = next((v for v in variables if v["id"] == var_id_2), None)
        if not var_1 or not var_2:
            return "unknown"
        # Truthiness check: a level of 0 or None counts as "not set".
        if var_1["priority_level"] and var_2["priority_level"]:
            if var_1["priority_level"] < var_2["priority_level"]:
                return "lower_priority"
            elif var_1["priority_level"] > var_2["priority_level"]:
                return "higher_priority"
        if var_1["category"] == var_2["category"]:
            return f"{var_1['category']}_peer"
        return "related"

    def _get_definition(self, var_name: str) -> str:
        """Return the first sentence that looks like a definition of
        ``var_name`` (contains a ':' or the word 'means'), truncated to
        200 characters; empty string if none is found.

        NOTE(review): this match is case-sensitive, unlike
        ``_get_variable_mentions`` — presumably intentional, to require
        the canonical spelling in a definition.
        """
        for sent in self.db.get_sentences():
            if var_name in sent["text"]:
                if ":" in sent["text"] or "means" in sent["text"].lower():
                    return sent["text"][:200]
        return ""

    def _get_hierarchy_position(self, priority_level: Optional[int]) -> str:
        """Map a numeric priority level to a hierarchy label;
        unknown/None levels map to 'unspecified'."""
        return self._HIERARCHY_LABELS.get(priority_level, "unspecified")

    def _calculate_weight(self, var: Dict, coefficient_scores: Dict) -> float:
        """Return the variable's coefficient score, defaulting to 0.0."""
        score_data = coefficient_scores.get(var["name"], {})
        return score_data.get("coefficient", 0.0)

    def _find_cluster(
        self, variable_id: int, clusters: Dict[int, List[int]]
    ) -> Optional[int]:
        """Return the id of the cluster containing ``variable_id``, or None."""
        for cluster_id, var_ids in clusters.items():
            if variable_id in var_ids:
                return cluster_id
        return None

    def build_section_metadata(self) -> List[Dict]:
        """Return one metadata dict per section, merging DB attributes with
        token statistics, embedding availability, and similar sections."""
        sections = self.db.get_sections_with_embeddings()
        section_stats = self.quant_analyzer.compute_section_statistics()

        section_metadata = []
        for section in sections:
            section_id = section["id"]
            stats = section_stats.get(section_id, {})
            metadata = {
                "id": section_id,
                "title": section.get("title", ""),
                "section_type": section.get("section_type", ""),
                # Content is truncated to keep the payload small.
                "content": section.get("content", "")[:500],
                "path": section.get("path", ""),
                "line_range": (
                    section.get("line_start", 0),
                    section.get("line_end", 0),
                ),
                "hierarchy_level": section.get("hierarchy_level", 0),
                "token_count": stats.get("token_count", 0),
                "embedding_available": self.db.get_embedding(section_id) is not None,
                "similar_sections": self._get_similar_sections(section_id),
            }
            section_metadata.append(metadata)
        return section_metadata

    def _get_similar_sections(self, section_id: int, top_k: int = 5) -> List[Dict]:
        """Return the ``top_k`` most similar sections from the precomputed
        ``similarity`` table, ordered by descending similarity score.

        Rows are accessed by column name, so the connection is assumed to
        use a mapping row factory (e.g. ``sqlite3.Row``).
        """
        cursor = self.db.conn.cursor()
        cursor.execute(
            """
            SELECT s2.id, s2.title, s2.path, sim.similarity_score
            FROM similarity sim
            JOIN sections s2 ON sim.content_id_2 = s2.id
            WHERE sim.content_id_1 = ?
            ORDER BY sim.similarity_score DESC
            LIMIT ?
            """,
            (section_id, top_k),
        )
        similar_sections = []
        for row in cursor.fetchall():
            similar_sections.append(
                {
                    "id": row["id"],
                    "title": row["title"],
                    "path": row["path"],
                    "similarity_score": row["similarity_score"],
                }
            )
        return similar_sections