.agents/memory/scripts/export_embeddings.py
2026-02-23 04:25:01 -07:00

284 lines
7.4 KiB
Python

#!/usr/bin/env python3
"""Export memory embeddings for dashboard visualization.
Supports both modern Signet databases (embeddings in SQLite) and older
template installs that only have memory rows.
"""
import argparse
import json
import sqlite3
import struct
import sys
from pathlib import Path
from typing import Any
# Per-user agents directory; the database path and the legacy scripts
# directory are both derived from it.
AGENTS_DIR = Path.home().joinpath(".agents")
# SQLite database file that the export queries read from.
DB_PATH = AGENTS_DIR.joinpath("memory", "memories.db")

# Page-size default and bounds applied to the --limit option.
MIN_LIMIT = 1
DEFAULT_LIMIT = 600
MAX_LIMIT = 5000
def clamp_limit(value: int) -> int:
    """Constrain a requested page size to the supported range.

    Args:
        value: Page size requested by the caller.

    Returns:
        ``value`` limited to the inclusive range ``[MIN_LIMIT, MAX_LIMIT]``.
    """
    # Apply the upper bound first, then the lower bound.
    capped = value if value < MAX_LIMIT else MAX_LIMIT
    return capped if capped > MIN_LIMIT else MIN_LIMIT
def build_result(
embeddings: list[dict[str, Any]],
total: int,
limit: int,
offset: int,
error: str | None = None,
) -> dict[str, Any]:
return {
"embeddings": embeddings,
"count": len(embeddings),
"total": total,
"limit": limit,
"offset": offset,
"hasMore": offset + limit < total,
"error": error,
}
def parse_tags(raw: Any) -> list[str]:
    """Normalize a stored tags value into a list of non-empty tag strings.

    Args:
        raw: A list, a JSON array encoded as a string, a comma-separated
            string, or any other value.

    Returns:
        Stripped, non-empty tags. Values that are neither a list nor a
        string (including ``None``) produce an empty list.
    """
    if isinstance(raw, list):
        # List entries of any type are stringified before stripping.
        stringified = (str(entry).strip() for entry in raw)
        return [tag for tag in stringified if tag]

    if not isinstance(raw, str):
        return []

    text = raw.strip()
    if text == "":
        return []

    if text[0] == "[" and text[-1] == "]":
        try:
            decoded = json.loads(text)
        except json.JSONDecodeError:
            # Not valid JSON after all; fall through to comma splitting.
            decoded = None
        if isinstance(decoded, list):
            # Only string members of a JSON array are kept.
            stripped = (entry.strip() for entry in decoded if isinstance(entry, str))
            return [tag for tag in stripped if tag]

    pieces = (piece.strip() for piece in text.split(","))
    return [tag for tag in pieces if tag]
def to_vector(blob: Any, dimensions: Any) -> list[float]:
    """Decode a binary blob of little-endian 32-bit floats into a list.

    Args:
        blob: ``bytes``, ``bytearray`` or ``memoryview``; any other value
            (including ``None``) yields an empty list.
        dimensions: When this is a positive int smaller than the number of
            decoded floats, the result is truncated to that many entries.

    Returns:
        The decoded floats, or an empty list when fewer than four bytes are
        available.
    """
    if not isinstance(blob, (memoryview, bytes, bytearray)):
        return []

    raw = blob.tobytes() if isinstance(blob, memoryview) else bytes(blob)
    float_count = len(raw) // 4
    if float_count == 0:
        return []

    # Trailing bytes that do not form a whole 4-byte float are ignored.
    floats = list(struct.unpack(f"<{float_count}f", raw[: float_count * 4]))
    if isinstance(dimensions, int) and 0 < dimensions < float_count:
        return floats[:dimensions]
    return floats
def table_exists(db: sqlite3.Connection, table_name: str) -> bool:
    """Report whether the database contains a table with the given name.

    Args:
        db: Open SQLite connection.
        table_name: Exact table name to look up in ``sqlite_master``.

    Returns:
        ``True`` when a matching table entry exists, otherwise ``False``.
    """
    query = "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ? LIMIT 1"
    cursor = db.execute(query, (table_name,))
    return cursor.fetchone() is not None
def base_embedding_row(row: sqlite3.Row) -> dict[str, Any]:
    """Convert one ``memories`` row into the export item shared by all exporters.

    Args:
        row: Row exposing the columns ``id``, ``content``, ``who``,
            ``importance``, ``type``, ``tags`` and ``created_at``.

    Returns:
        A dict describing the memory, without a ``vector`` entry. Callers add
        the vector (and may override the source fields) afterwards.
    """
    memory_id = str(row["id"])

    raw_content = row["content"]
    content = raw_content if isinstance(raw_content, str) else ""

    # Non-numeric importance values fall back to the neutral default of 0.5.
    raw_importance = row["importance"]
    importance = (
        float(raw_importance) if isinstance(raw_importance, (int, float)) else 0.5
    )

    raw_type = row["type"]
    memory_type = raw_type if isinstance(raw_type, str) else None

    item: dict[str, Any] = {}
    item["id"] = memory_id
    item["content"] = content
    item["text"] = content
    item["who"] = row["who"] or "unknown"
    item["importance"] = importance
    item["type"] = memory_type
    item["tags"] = parse_tags(row["tags"])
    item["sourceType"] = "memory"
    item["sourceId"] = memory_id
    item["createdAt"] = row["created_at"]
    return item
def export_embeddings(limit: int, offset: int) -> dict[str, Any]:
    """Export one page of memory rows without embedding vectors.

    Args:
        limit: Maximum number of rows to return.
        offset: Number of rows to skip, counted from the newest memory.

    Returns:
        A ``build_result`` payload. When the database file does not exist the
        payload is empty and its ``error`` is ``"No database found"``.

    Raises:
        sqlite3.Error: Propagated unchanged when a query fails.
    """
    if not DB_PATH.exists():
        return build_result([], 0, limit, offset, "No database found")

    db = sqlite3.connect(str(DB_PATH))
    db.row_factory = sqlite3.Row
    try:
        total_row = db.execute("SELECT COUNT(*) AS count FROM memories").fetchone()
        total = int(total_row["count"]) if total_row else 0

        # LIMIT/OFFSET paging needs a total order: rows that share a
        # created_at value have no guaranteed relative order, so they could
        # repeat or go missing between pages. The id column breaks such ties.
        rows = db.execute(
            """
            SELECT id, content, who, importance, type, tags, created_at
            FROM memories
            ORDER BY created_at DESC, id DESC
            LIMIT ? OFFSET ?
            """,
            (limit, offset),
        ).fetchall()

        embeddings = [base_embedding_row(row) for row in rows]
        return build_result(embeddings, total, limit, offset)
    finally:
        db.close()
def export_with_vectors_from_table(
    db: sqlite3.Connection,
    limit: int,
    offset: int,
) -> dict[str, Any]:
    """Export one page of memories with vectors read from the ``embeddings`` table.

    Only embedding rows whose ``source_type`` is ``'memory'`` and whose
    ``source_id`` matches a row in ``memories`` are included.

    Args:
        db: Open connection whose ``row_factory`` is ``sqlite3.Row``.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip, counted from the newest memory.

    Returns:
        A ``build_result`` payload whose items each carry a ``vector`` list.
        ``total`` counts the joined rows, not all memories.

    Raises:
        sqlite3.Error: Propagated unchanged when a query fails.
    """
    total_row = db.execute(
        """
        SELECT COUNT(*) AS count
        FROM embeddings e
        INNER JOIN memories m ON m.id = e.source_id
        WHERE e.source_type = 'memory'
        """
    ).fetchone()
    total = int(total_row["count"]) if total_row else 0

    # LIMIT/OFFSET paging needs a stable order: memories sharing a created_at
    # value have no guaranteed relative order, so they could repeat or go
    # missing between pages. The memory id breaks such ties.
    # NOTE(review): assumes at most one embedding row per memory - confirm.
    rows = db.execute(
        """
        SELECT
            m.id,
            m.content,
            m.who,
            m.importance,
            m.type,
            m.tags,
            m.created_at,
            e.vector,
            e.dimensions,
            e.source_type,
            e.source_id
        FROM embeddings e
        INNER JOIN memories m ON m.id = e.source_id
        WHERE e.source_type = 'memory'
        ORDER BY m.created_at DESC, m.id DESC
        LIMIT ? OFFSET ?
        """,
        (limit, offset),
    ).fetchall()

    embeddings: list[dict[str, Any]] = []
    for row in rows:
        item = base_embedding_row(row)
        # Prefer the source fields stored with the embedding; fall back to
        # the defaults derived from the memory row when they are empty.
        item["sourceType"] = row["source_type"] or "memory"
        item["sourceId"] = row["source_id"] or item["id"]
        item["vector"] = to_vector(row["vector"], row["dimensions"])
        embeddings.append(item)

    return build_result(embeddings, total, limit, offset)
def export_with_vectors_via_embed(
    db: sqlite3.Connection,
    limit: int,
    offset: int,
) -> dict[str, Any]:
    """Export one page of memories, computing each vector with ``embed``.

    Used when the database has no ``embeddings`` table. The ``embed`` callable
    is imported from ``embeddings.py`` in the agents memory scripts directory.

    Args:
        db: Open connection whose ``row_factory`` is ``sqlite3.Row``.
        limit: Maximum number of rows to read.
        offset: Number of rows to skip, counted from the newest memory.

    Returns:
        A ``build_result`` payload. Rows with empty content, and rows whose
        embedding fails, are left out, so ``count`` may be smaller than the
        number of rows read. When ``embeddings.py`` cannot be imported the
        payload is empty and ``error`` describes the failure.

    Raises:
        sqlite3.Error: Propagated unchanged when a query fails.
    """
    scripts_dir = str(AGENTS_DIR / "memory" / "scripts")
    # Keep the scripts directory first on sys.path without stacking another
    # duplicate entry each time this function is called.
    if not sys.path or sys.path[0] != scripts_dir:
        sys.path.insert(0, scripts_dir)
    try:
        from embeddings import embed
    except Exception as exc:
        return build_result(
            [], 0, limit, offset, f"Failed to load embeddings.py: {exc}"
        )

    total_row = db.execute("SELECT COUNT(*) AS count FROM memories").fetchone()
    total = int(total_row["count"]) if total_row else 0

    # LIMIT/OFFSET paging needs a total order: rows that share a created_at
    # value have no guaranteed relative order, so they could repeat or go
    # missing between pages. The id column breaks such ties.
    rows = db.execute(
        """
        SELECT id, content, who, importance, type, tags, created_at
        FROM memories
        ORDER BY created_at DESC, id DESC
        LIMIT ? OFFSET ?
        """,
        (limit, offset),
    ).fetchall()

    embeddings: list[dict[str, Any]] = []
    failure_count = 0
    last_failure: Exception | None = None
    for row in rows:
        content = row["content"] if isinstance(row["content"], str) else ""
        if not content:
            continue
        try:
            vector, _ = embed(content)
        except Exception as exc:
            # Best effort: one failing memory must not abort the export, but
            # the failure is remembered so it can be reported below.
            failure_count += 1
            last_failure = exc
            continue
        item = base_embedding_row(row)
        item["vector"] = vector
        embeddings.append(item)

    if failure_count:
        # Report on stderr so the JSON written to stdout stays unchanged.
        print(
            f"export_embeddings: skipped {failure_count} memories whose "
            f"embedding failed (last error: {last_failure})",
            file=sys.stderr,
        )

    return build_result(embeddings, total, limit, offset)
def export_with_vectors(limit: int, offset: int) -> dict[str, Any]:
    """Export one page of memories together with their embedding vectors.

    Vectors are read from the ``embeddings`` table when the database has one;
    otherwise they are computed through the ``embed`` fallback.

    Args:
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        A ``build_result`` payload. When the database file does not exist the
        payload is empty and its ``error`` is ``"No database found"``.
    """
    if not DB_PATH.exists():
        return build_result([], 0, limit, offset, "No database found")

    db = sqlite3.connect(str(DB_PATH))
    db.row_factory = sqlite3.Row
    try:
        has_embeddings_table = table_exists(db, "embeddings")
        exporter = (
            export_with_vectors_from_table
            if has_embeddings_table
            else export_with_vectors_via_embed
        )
        return exporter(db, limit, offset)
    finally:
        db.close()
def main() -> None:
    """Parse the command line, run the selected export, and print it as JSON.

    ``--limit`` is clamped with ``clamp_limit`` and a negative ``--offset`` is
    treated as zero before the exporter runs.
    """
    parser = argparse.ArgumentParser(description="Export embeddings for dashboard")
    parser.add_argument(
        "--with-vectors",
        action="store_true",
        help="Include vector arrays",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=DEFAULT_LIMIT,
        help="Page size",
    )
    parser.add_argument(
        "--offset",
        type=int,
        default=0,
        help="Page offset",
    )
    options = parser.parse_args()

    page_size = clamp_limit(options.limit)
    page_offset = options.offset if options.offset > 0 else 0

    # Both exporters take (limit, offset), so the flag only selects which one runs.
    exporter = export_with_vectors if options.with_vectors else export_embeddings
    print(json.dumps(exporter(page_size, page_offset)))


if __name__ == "__main__":
    main()