210 lines
7.4 KiB
Python
210 lines
7.4 KiB
Python
import numpy as np
|
|
import ollama
|
|
from typing import List, Dict, Tuple, Optional
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
from sklearn.cluster import KMeans
|
|
|
|
|
|
class SemanticAnalyzer:
    """Semantic analysis over stored document sections, sentences and variables.

    Wraps an application database handle and uses an Ollama embedding model
    (``nomic-embed-text``) to generate, compare and cluster vector embeddings.
    """

    def __init__(self, db):
        """Keep the database handle and record the expected embedding size.

        Args:
            db: Storage backend providing get_*/add_* accessors for sections,
                sentences, variables, embeddings and similarities.
        """
        self.db = db
        # nomic-embed-text emits 768-dimensional vectors.
        self.embedding_dim = 768
|
|
|
|
def generate_embedding(self, text: str) -> Optional[np.ndarray]:
|
|
try:
|
|
response = ollama.embeddings(model="nomic-embed-text", prompt=text)
|
|
return np.array(response["embedding"], dtype=np.float32)
|
|
except Exception as e:
|
|
print(f"Error generating embedding: {e}")
|
|
return None
|
|
|
|
def chunk_text(
|
|
self, text: str, max_tokens: int = 500, overlap: int = 50
|
|
) -> List[Tuple[str, int, int]]:
|
|
words = text.split()
|
|
chunks = []
|
|
start_idx = 0
|
|
|
|
while start_idx < len(words):
|
|
end_idx = min(start_idx + max_tokens, len(words))
|
|
chunk_words = words[start_idx:end_idx]
|
|
chunk_text = " ".join(chunk_words)
|
|
|
|
word_count = len(chunk_words)
|
|
char_start = len(" ".join(words[:start_idx]))
|
|
char_end = char_start + len(chunk_text)
|
|
|
|
chunks.append((chunk_text, char_start, char_end))
|
|
|
|
start_idx += max_tokens - overlap
|
|
|
|
if end_idx >= len(words):
|
|
break
|
|
|
|
return chunks
|
|
|
|
def generate_all_embeddings(self):
|
|
sections = self.db.get_sections_with_embeddings()
|
|
sentences = self.db.get_sentences()
|
|
variables = self.db.get_variables()
|
|
|
|
print("Generating document-level embeddings...")
|
|
for section in sections:
|
|
if section["content"]:
|
|
text = section["content"]
|
|
if len(text) > 2000:
|
|
chunks = self.chunk_text(text, max_tokens=500, overlap=50)
|
|
if chunks:
|
|
first_chunk_text = chunks[0][0]
|
|
embedding = self.generate_embedding(first_chunk_text)
|
|
if embedding is not None and np.any(embedding):
|
|
self.db.add_embedding(section["id"], "section", embedding)
|
|
else:
|
|
embedding = self.generate_embedding(text)
|
|
if embedding is not None and np.any(embedding):
|
|
self.db.add_embedding(section["id"], "section", embedding)
|
|
|
|
print("Generating sentence-level embeddings...")
|
|
for sent in sentences[:100]:
|
|
embedding = self.generate_embedding(sent["text"])
|
|
if embedding is not None and np.any(embedding):
|
|
self.db.add_embedding(sent["id"], "sentence", embedding)
|
|
|
|
print("Generating variable-level embeddings...")
|
|
for var in variables:
|
|
text = f"{var['name']}: {var.get('description', '')}"
|
|
embedding = self.generate_embedding(text)
|
|
if embedding is not None and np.any(embedding):
|
|
self.db.add_embedding(var["id"], "variable", embedding)
|
|
|
|
print("Embeddings generated and stored in database.")
|
|
|
|
def compute_similarities(self, top_k: int = 10):
|
|
sections = self.db.get_sections_with_embeddings()
|
|
|
|
embeddings = []
|
|
section_ids = []
|
|
|
|
for section in sections:
|
|
embedding = self.db.get_embedding(section["id"])
|
|
if embedding is not None:
|
|
embeddings.append(embedding)
|
|
section_ids.append(section["id"])
|
|
|
|
if len(embeddings) < 2:
|
|
print("Not enough embeddings to compute similarities.")
|
|
return
|
|
|
|
embeddings_matrix = np.array(embeddings)
|
|
similarity_matrix = cosine_similarity(embeddings_matrix)
|
|
|
|
print(f"Computing top-{top_k} similarities...")
|
|
for i, section_id_1 in enumerate(section_ids):
|
|
similarities = similarity_matrix[i]
|
|
top_indices = np.argsort(similarities)[-top_k - 1 : -1][::-1]
|
|
|
|
for j in top_indices:
|
|
section_id_2 = section_ids[j]
|
|
similarity_score = float(similarities[j])
|
|
|
|
if section_id_1 != section_id_2:
|
|
self.db.add_similarity(section_id_1, section_id_2, similarity_score)
|
|
|
|
print("Similarities computed and stored.")
|
|
|
|
def find_similar_sections(self, query: str, top_k: int = 5) -> List[Dict]:
|
|
query_embedding = self.generate_embedding(query)
|
|
if query_embedding is None:
|
|
return []
|
|
|
|
sections = self.db.get_sections_with_embeddings()
|
|
|
|
similarities = []
|
|
for section in sections:
|
|
embedding = self.db.get_embedding(section["id"])
|
|
if embedding is not None:
|
|
similarity = float(
|
|
cosine_similarity([query_embedding], [embedding])[0][0]
|
|
)
|
|
|
|
similarities.append({"section": section, "similarity": similarity})
|
|
|
|
similarities.sort(key=lambda x: x["similarity"], reverse=True)
|
|
return similarities[:top_k]
|
|
|
|
def cluster_variables(self, n_clusters: int = 5) -> Dict[int, List[int]]:
|
|
variables = self.db.get_variables()
|
|
|
|
embeddings = []
|
|
var_ids = []
|
|
|
|
for var in variables:
|
|
embedding = self.db.get_embedding(var["id"])
|
|
if embedding is not None:
|
|
embeddings.append(embedding)
|
|
var_ids.append(var["id"])
|
|
|
|
if len(embeddings) < n_clusters:
|
|
print(
|
|
f"Not enough variables for {n_clusters} clusters. Using {len(embeddings)} clusters."
|
|
)
|
|
n_clusters = max(1, len(embeddings))
|
|
|
|
embeddings_matrix = np.array(embeddings)
|
|
|
|
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init="auto")
|
|
cluster_labels = kmeans.fit_predict(embeddings_matrix)
|
|
|
|
clusters = {}
|
|
for var_id, label in zip(var_ids, cluster_labels):
|
|
if label not in clusters:
|
|
clusters[label] = []
|
|
clusters[label].append(var_id)
|
|
|
|
return clusters
|
|
|
|
def compute_cluster_centers(
|
|
self, clusters: Dict[int, List[int]]
|
|
) -> Dict[int, np.ndarray]:
|
|
cluster_centers = {}
|
|
|
|
for cluster_id, var_ids in clusters.items():
|
|
embeddings = []
|
|
for var_id in var_ids:
|
|
embedding = self.db.get_embedding(var_id)
|
|
if embedding is not None:
|
|
embeddings.append(embedding)
|
|
|
|
if embeddings:
|
|
cluster_centers[cluster_id] = np.mean(embeddings, axis=0)
|
|
|
|
return cluster_centers
|
|
|
|
def get_embeddings_metadata(self) -> Dict:
|
|
sections = self.db.get_sections_with_embeddings()
|
|
sentences = self.db.get_sentences()
|
|
variables = self.db.get_variables()
|
|
|
|
section_embeddings = sum(
|
|
1 for s in sections if self.db.get_embedding(s["id"]) is not None
|
|
)
|
|
sentence_embeddings = sum(
|
|
1 for s in sentences if self.db.get_embedding(s["id"]) is not None
|
|
)
|
|
variable_embeddings = sum(
|
|
1 for v in variables if self.db.get_embedding(v["id"]) is not None
|
|
)
|
|
|
|
metadata = {
|
|
"total_embeddings": section_embeddings
|
|
+ sentence_embeddings
|
|
+ variable_embeddings,
|
|
"section_embeddings": section_embeddings,
|
|
"sentence_embeddings": sentence_embeddings,
|
|
"variable_embeddings": variable_embeddings,
|
|
"embedding_dimension": self.embedding_dim,
|
|
"embedding_model": "nomic-embed-text",
|
|
}
|
|
|
|
return metadata
|