.agents/tests/test_memory.py

279 lines
8.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Signet Memory Functional Test
Tests memory operations across all harnesses:
- Save with auto-embedding
- Hybrid search (vector + keyword)
- Cross-harness persistence
Run: python ~/.agents/tests/test_memory.py
"""
import subprocess
import sys
import sqlite3
import uuid
import re
import time
from pathlib import Path
MEMORY_DB = Path.home() / ".agents/memory/memories.db"
MEMORY_SCRIPT = Path.home() / ".agents/memory/scripts/memory.py"
# Unique test markers
TEST_ID = uuid.uuid4().hex[:8]
TEST_CONTENT_OPENCLAW = f"[test-{TEST_ID}-openclaw] Signet memory test from OpenClaw"
TEST_CONTENT_CLAUDE = f"[test-{TEST_ID}-claude] Signet memory test from Claude Code"
TEST_CONTENT_OPENCODE = f"[test-{TEST_ID}-opencode] Signet memory test from OpenCode"
def run_memory_script(args: list) -> tuple[bool, str]:
"""Run memory.py with given args."""
try:
result = subprocess.run(
[sys.executable, str(MEMORY_SCRIPT)] + args,
capture_output=True,
text=True,
timeout=30
)
return result.returncode == 0, result.stdout + result.stderr
except Exception as e:
return False, str(e)
def test_database_schema():
"""Test database has required Level 2 schema."""
print("1. Testing database schema (Level 2)...")
required_tables = {"memories", "schema_migrations", "conversations", "embeddings"}
conn = sqlite3.connect(MEMORY_DB)
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = {row[0] for row in cursor.fetchall()}
# Check schema version
cursor = conn.execute("SELECT MAX(version) FROM schema_migrations")
version = cursor.fetchone()[0] or 0
conn.close()
missing = required_tables - tables
if missing:
print(f" ✗ Missing tables: {missing}")
return False
if version < 3:
print(f" ✗ Schema version {version} < 3")
return False
print(f" ✓ Schema version {version}, all tables present")
return True
def test_vector_store():
"""Test vector store is initialized and working."""
print("\n2. Testing vector store...")
vector_dir = Path.home() / ".agents/memory/vectors.zvec"
if not vector_dir.exists():
print(f" ✗ Vector store not found: {vector_dir}")
return False
# Test vector store status
success, output = run_memory_script(["query", "test", "--limit", "1"])
if "vector" in output.lower() or "hybrid" in output.lower():
print(f" ✓ Vector store active (hybrid search enabled)")
return True
elif success:
print(f" ~ Vector store exists, search working")
return True
else:
print(f" ✗ Vector store not working")
return False
def test_save_openclaw():
"""Test saving memory via direct script (OpenClaw style)."""
print("\n3. Testing memory save (OpenClaw/direct)...")
success, output = run_memory_script([
"save", "--mode", "explicit",
"--who", "openclaw-test",
"--content", TEST_CONTENT_OPENCLAW
])
if success:
print(f" ✓ Memory saved via script")
return True
else:
print(f" ✗ Save failed: {output[:100]}")
return False
def test_save_claude_code():
"""Test saving memory with Claude Code attribution."""
print("\n4. Testing memory save (Claude Code attribution)...")
# Save memory attributed to Claude Code (via script, not invoking harness)
success, output = run_memory_script([
"save", "--mode", "explicit",
"--who", "claude-code-test",
"--content", TEST_CONTENT_CLAUDE
])
if success:
print(f" ✓ Memory saved (Claude Code attribution)")
return True
else:
print(f" ✗ Save failed: {output[:100]}")
return False
def test_save_opencode():
"""Test saving memory via OpenCode."""
print("\n5. Testing memory save (OpenCode)...")
# For OpenCode, save directly and attribute to opencode
success, output = run_memory_script([
"save", "--mode", "explicit",
"--who", "opencode-test",
"--content", TEST_CONTENT_OPENCODE
])
if success:
print(f" ✓ Memory saved (OpenCode attribution)")
return True
else:
print(f" ✗ Save failed: {output[:100]}")
return False
def test_hybrid_search():
"""Test hybrid search finds test memories."""
print("\n6. Testing hybrid search...")
# Wait for embeddings to be generated
time.sleep(2)
success, output = run_memory_script(["query", f"test-{TEST_ID}", "--limit", "5"])
if not success:
print(f" ✗ Query failed")
return False
# Check for hybrid indicators
is_hybrid = "hybrid" in output.lower() or "vector" in output.lower()
# Check if test memories found
found_openclaw = "openclaw" in output.lower()
found_claude = "claude" in output.lower()
found_opencode = "opencode" in output.lower()
found_count = sum([found_openclaw, found_claude, found_opencode])
if found_count >= 2:
search_type = "hybrid" if is_hybrid else "keyword"
print(f" ✓ Found {found_count}/3 test memories ({search_type} search)")
return True
else:
print(f" ~ Found {found_count}/3 test memories")
return found_count > 0
def test_cross_harness_read():
"""Test that memories from one harness can be read by another."""
print("\n7. Testing cross-harness persistence...")
# Query for all test memories
conn = sqlite3.connect(MEMORY_DB)
cursor = conn.execute(
"SELECT who, content FROM memories WHERE content LIKE ?",
(f"%{TEST_ID}%",)
)
rows = cursor.fetchall()
conn.close()
harnesses_found = set()
for who, content in rows:
if "openclaw" in who.lower() or "openclaw" in content.lower():
harnesses_found.add("openclaw")
if "claude" in who.lower() or "claude" in content.lower():
harnesses_found.add("claude")
if "opencode" in who.lower() or "opencode" in content.lower():
harnesses_found.add("opencode")
if len(harnesses_found) >= 2:
print(f" ✓ Memories from {len(harnesses_found)} harnesses in shared DB")
return True
else:
print(f" ✗ Only {len(harnesses_found)} harness(es) found")
return False
def cleanup_test_data():
"""Remove test data from database."""
print("\n8. Cleaning up test data...")
try:
conn = sqlite3.connect(MEMORY_DB)
cursor = conn.execute(
"DELETE FROM memories WHERE content LIKE ?",
(f"%{TEST_ID}%",)
)
deleted = cursor.rowcount
conn.commit()
conn.close()
print(f" ✓ Cleaned up {deleted} test record(s)")
return True
except Exception as e:
print(f" ✗ Cleanup failed: {e}")
return False
def main():
print("=" * 60)
print("SIGNET MEMORY FUNCTIONAL TEST")
print("=" * 60)
print(f"Test ID: {TEST_ID}")
print()
tests = [
("Database schema", test_database_schema),
("Vector store", test_vector_store),
("Save (OpenClaw)", test_save_openclaw),
("Save (Claude Code)", test_save_claude_code),
("Save (OpenCode)", test_save_opencode),
("Hybrid search", test_hybrid_search),
("Cross-harness read", test_cross_harness_read),
("Cleanup", cleanup_test_data),
]
results = []
for name, test_fn in tests:
try:
results.append(test_fn())
except Exception as e:
print(f" ✗ Error: {e}")
results.append(False)
print()
print("=" * 60)
passed = sum(results)
total = len(results)
if all(results):
print(f"RESULT: ALL TESTS PASSED ({passed}/{total})")
print("=" * 60)
return 0
else:
print(f"RESULT: {passed}/{total} TESTS PASSED")
print("=" * 60)
return 1
if __name__ == "__main__":
sys.exit(main())