279 lines
8.0 KiB
Python
Executable File
279 lines
8.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
Signet Memory Functional Test

Tests memory operations across all harnesses:
- Save with auto-embedding
- Hybrid search (vector + keyword)
- Cross-harness persistence

Run: python ~/.agents/tests/test_memory.py
"""
|
|
|
|
import subprocess
|
|
import sys
|
|
import sqlite3
|
|
import uuid
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# Shared memory database and the CLI script that the tests drive.
MEMORY_DB = Path.home().joinpath(".agents/memory/memories.db")
MEMORY_SCRIPT = Path.home().joinpath(".agents/memory/scripts/memory.py")

# Short random tag embedded in every record this run creates, so the records
# can be found again (and deleted) without touching anything else.
TEST_ID = uuid.uuid4().hex[:8]
TEST_CONTENT_OPENCLAW = (
    f"[test-{TEST_ID}-openclaw] "
    "Signet memory test from OpenClaw"
)
TEST_CONTENT_CLAUDE = (
    f"[test-{TEST_ID}-claude] "
    "Signet memory test from Claude Code"
)
TEST_CONTENT_OPENCODE = (
    f"[test-{TEST_ID}-opencode] "
    "Signet memory test from OpenCode"
)
|
|
|
|
|
|
def run_memory_script(args: list) -> tuple[bool, str]:
    """Invoke the memory CLI script with ``args`` and capture its outcome.

    The script at ``MEMORY_SCRIPT`` is run with the current interpreter and a
    30 second timeout.

    Args:
        args: Command-line arguments appended after the script path.

    Returns:
        ``(ok, text)`` where ``ok`` is True when the process exited with
        status 0 and ``text`` is stdout followed by stderr. If launching or
        waiting on the process raises, returns ``(False, str(error))``.
    """
    try:
        command = [sys.executable, str(MEMORY_SCRIPT)] + args
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except Exception as exc:
        # Best-effort helper: any failure is reported as a failed run.
        return False, str(exc)

    exited_cleanly = completed.returncode == 0
    return exited_cleanly, completed.stdout + completed.stderr
|
|
|
|
|
|
def test_database_schema():
    """Check that the memory database has the Level 2 tables and schema version.

    Prints a one-line verdict for each outcome.

    Returns:
        bool: True when the database file exists, all required tables are
        present and the highest ``schema_migrations.version`` is at least 3;
        False otherwise.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    print("1. Testing database schema (Level 2)...")

    required_tables = {"memories", "schema_migrations", "conversations", "embeddings"}

    # sqlite3.connect() would silently create an empty database file, so a
    # missing database is reported explicitly instead.
    if not MEMORY_DB.exists():
        print(f" ✗ Database not found: {MEMORY_DB}")
        return False

    conn = sqlite3.connect(MEMORY_DB)
    try:
        cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = {row[0] for row in cursor.fetchall()}

        # Check tables before touching schema_migrations: if that table is
        # itself missing, the version query below would raise.
        missing = required_tables - tables
        if missing:
            print(f" ✗ Missing tables: {missing}")
            return False

        # Check schema version (MAX() yields NULL on an empty table).
        cursor = conn.execute("SELECT MAX(version) FROM schema_migrations")
        version = cursor.fetchone()[0] or 0
    finally:
        # Always release the connection, including on early return or error.
        conn.close()

    if version < 3:
        print(f" ✗ Schema version {version} < 3")
        return False

    print(f" ✓ Schema version {version}, all tables present")
    return True
|
|
|
|
|
|
def test_vector_store():
    """Check that the vector store exists and that a query runs against it.

    The store is expected at ``~/.agents/memory/vectors.zvec``. A one-result
    query is then issued through ``run_memory_script``.

    Returns:
        bool: False when the store path is missing or the query fails; True
        when the query succeeds. The printed verdict says whether the query
        output mentioned "vector" or "hybrid".
    """
    print("\n2. Testing vector store...")

    vector_dir = Path.home() / ".agents/memory/vectors.zvec"

    if not vector_dir.exists():
        print(f" ✗ Vector store not found: {vector_dir}")
        return False

    # Test vector store status
    success, output = run_memory_script(["query", "test", "--limit", "1"])

    # A failed query must never pass, even if its error text happens to
    # contain the words "vector" or "hybrid".
    if not success:
        print(" ✗ Vector store not working")
        return False

    lowered = output.lower()
    if "vector" in lowered or "hybrid" in lowered:
        print(" ✓ Vector store active (hybrid search enabled)")
    else:
        print(" ~ Vector store exists, search working")
    return True
|
|
|
|
|
|
def test_save_openclaw():
    """Save the OpenClaw test memory by calling the memory script directly.

    Returns:
        bool: True when the save command succeeded, False otherwise.
    """
    print("\n3. Testing memory save (OpenClaw/direct)...")

    save_args = [
        "save", "--mode", "explicit",
        "--who", "openclaw-test",
        "--content", TEST_CONTENT_OPENCLAW,
    ]
    saved, script_output = run_memory_script(save_args)

    if not saved:
        # Only the first 100 characters of the script output are shown.
        print(f" ✗ Save failed: {script_output[:100]}")
        return False

    print(" ✓ Memory saved via script")
    return True
|
|
|
|
|
|
def test_save_claude_code():
    """Save the Claude Code test memory, attributed to ``claude-code-test``.

    The memory is written through the memory script; the Claude Code harness
    itself is not invoked.

    Returns:
        bool: True when the save command succeeded, False otherwise.
    """
    print("\n4. Testing memory save (Claude Code attribution)...")

    ok, script_output = run_memory_script([
        "save", "--mode", "explicit",
        "--who", "claude-code-test",
        "--content", TEST_CONTENT_CLAUDE,
    ])

    verdict = (
        " ✓ Memory saved (Claude Code attribution)"
        if ok
        else f" ✗ Save failed: {script_output[:100]}"
    )
    print(verdict)
    return ok
|
|
|
|
|
|
def test_save_opencode():
    """Save the OpenCode test memory, attributed to ``opencode-test``.

    Returns:
        bool: True when the save command succeeded, False otherwise.
    """
    print("\n5. Testing memory save (OpenCode)...")

    # Written directly through the script and attributed to opencode.
    command = ["save", "--mode", "explicit"]
    command += ["--who", "opencode-test"]
    command += ["--content", TEST_CONTENT_OPENCODE]

    succeeded, output_text = run_memory_script(command)

    if succeeded:
        print(" ✓ Memory saved (OpenCode attribution)")
    else:
        print(f" ✗ Save failed: {output_text[:100]}")
    return succeeded
|
|
|
|
|
|
def test_hybrid_search():
    """Query for this run's test memories and count how many come back.

    A memory counts as found only when its unique ``test-<TEST_ID>-<harness>``
    marker appears in the query output. Matching the bare harness name would
    also count unrelated memories that merely mention "claude" and the like.

    Returns:
        bool: False when the query fails or no test memory is found; True
        when at least one is found. Finding only one of the three is reported
        with a "~" marker but still passes.
    """
    print("\n6. Testing hybrid search...")

    # Wait for embeddings to be generated
    time.sleep(2)

    success, output = run_memory_script(["query", f"test-{TEST_ID}", "--limit", "5"])

    if not success:
        print(" ✗ Query failed")
        return False

    lowered = output.lower()

    # Check for hybrid indicators
    is_hybrid = "hybrid" in lowered or "vector" in lowered

    # Check if test memories found, using the per-run markers.
    # NOTE(review): assumes the query output includes the start of each
    # memory's content, where the marker lives — confirm against memory.py.
    markers = [
        f"test-{TEST_ID}-{harness}"
        for harness in ("openclaw", "claude", "opencode")
    ]
    found_count = sum(marker in lowered for marker in markers)

    if found_count >= 2:
        search_type = "hybrid" if is_hybrid else "keyword"
        print(f" ✓ Found {found_count}/3 test memories ({search_type} search)")
        return True

    print(f" ~ Found {found_count}/3 test memories")
    return found_count > 0
|
|
|
|
|
|
def test_cross_harness_read():
    """Check that test memories from several harnesses share one database.

    Reads every row of ``memories`` whose content contains ``TEST_ID`` and
    notes which harness names ("openclaw", "claude", "opencode") appear in
    the row's ``who`` or ``content``.

    Returns:
        bool: True when at least two distinct harnesses are represented.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
    """
    print("\n7. Testing cross-harness persistence...")

    # Query for all test memories
    conn = sqlite3.connect(MEMORY_DB)
    try:
        rows = conn.execute(
            "SELECT who, content FROM memories WHERE content LIKE ?",
            (f"%{TEST_ID}%",),
        ).fetchall()
    finally:
        # Release the connection even if the query raises.
        conn.close()

    harnesses_found = set()
    for who, content in rows:
        # `who` may be NULL; treat it as empty rather than crashing on
        # None.lower(). The space separator keeps a harness name from
        # matching across the who/content boundary.
        haystack = f"{who or ''} {content or ''}".lower()
        harnesses_found.update(
            harness
            for harness in ("openclaw", "claude", "opencode")
            if harness in haystack
        )

    if len(harnesses_found) >= 2:
        print(f" ✓ Memories from {len(harnesses_found)} harnesses in shared DB")
        return True

    print(f" ✗ Only {len(harnesses_found)} harness(es) found")
    return False
|
|
|
|
|
|
def cleanup_test_data():
    """Delete the rows this test run inserted into ``memories``.

    Removes every row whose content contains ``TEST_ID`` and prints how many
    rows were deleted.

    Returns:
        bool: True when the delete was committed, False when a database
        error occurred (the error is printed).
    """
    print("\n8. Cleaning up test data...")

    try:
        conn = sqlite3.connect(MEMORY_DB)
        try:
            cursor = conn.execute(
                "DELETE FROM memories WHERE content LIKE ?",
                (f"%{TEST_ID}%",),
            )
            deleted = cursor.rowcount
            conn.commit()
        finally:
            # Close the connection even when the DELETE or commit fails.
            conn.close()
    except sqlite3.Error as e:
        print(f" ✗ Cleanup failed: {e}")
        return False

    print(f" ✓ Cleaned up {deleted} test record(s)")
    return True
|
|
|
|
|
|
def main():
    """Run every functional test in order and print a summary.

    Each test is called in sequence. A test that raises is reported and
    counted as failed, so later tests, including cleanup, still run.

    Returns:
        int: 0 when every test passed, 1 otherwise (used as the exit status).
    """
    print("=" * 60)
    print("SIGNET MEMORY FUNCTIONAL TEST")
    print("=" * 60)
    print(f"Test ID: {TEST_ID}")
    print()

    tests = [
        ("Database schema", test_database_schema),
        ("Vector store", test_vector_store),
        ("Save (OpenClaw)", test_save_openclaw),
        ("Save (Claude Code)", test_save_claude_code),
        ("Save (OpenCode)", test_save_opencode),
        ("Hybrid search", test_hybrid_search),
        ("Cross-harness read", test_cross_harness_read),
        ("Cleanup", cleanup_test_data),
    ]

    results = []
    failed_names = []
    for name, test_fn in tests:
        try:
            # Coerce to bool so a test that returns None cannot break the
            # sum() used for the summary.
            ok = bool(test_fn())
        except Exception as e:
            # Top-level boundary: report and keep going so cleanup still runs.
            print(f" ✗ Error: {e}")
            ok = False
        results.append(ok)
        if not ok:
            failed_names.append(name)

    print()
    print("=" * 60)
    passed = sum(results)
    total = len(results)

    if all(results):
        print(f"RESULT: ALL TESTS PASSED ({passed}/{total})")
        print("=" * 60)
        return 0

    print(f"RESULT: {passed}/{total} TESTS PASSED")
    # Name the failures so the summary can be acted on without scrolling.
    print(f"FAILED: {', '.join(failed_names)}")
    print("=" * 60)
    return 1
|
|
|
|
|
|
# Script entry point: exit with main()'s return value as the process status.
if __name__ == "__main__":
    raise SystemExit(main())
|