* feat: add conversations, desktop (Tauri), and offline sync Major new features: - conversations module: Slack-like channels, threads, reactions, pins - Tauri desktop app with local SQLite for offline-first operation - Hybrid logical clock sync engine with conflict resolution - DB provider abstraction (D1/Tauri/memory) with React context Conversations: - Text/voice/announcement channels with categories - Message threads, reactions, attachments, pinning - Real-time presence and typing indicators - Full-text search across messages Desktop (Tauri): - Local SQLite database with sync to cloud D1 - Offline mutation queue with automatic replay - Window management and keyboard shortcuts - Desktop shell with offline banner Sync infrastructure: - Vector clock implementation for causality tracking - Last-write-wins with semantic conflict resolution - Delta sync via checkpoints for bandwidth efficiency - Comprehensive test coverage Also adds e2e test setup with Playwright and CI workflows for desktop releases. * fix(tests): sync engine test schema and checkpoint logic - Add missing process_after column and sync_tombstone table to test schemas - Fix checkpoint update to save cursor even when records array is empty - Revert claude-code-review.yml workflow changes to match main --------- Co-authored-by: Nicholai <nicholaivogelfilms@gmail.com>
403 lines
13 KiB
TypeScript
403 lines
13 KiB
TypeScript
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"
|
|
import { createMemoryProvider } from "@/db/provider/memory-provider"
|
|
import type { DatabaseProvider } from "@/db/provider/interface"
|
|
import { SyncEngine, createSyncEngine } from "@/lib/sync/engine"
|
|
import { ConflictStrategy } from "@/lib/sync/conflict"
|
|
import type { RemoteRecord } from "@/lib/sync/engine"
|
|
|
|
// Sync schema table definitions for in-memory database
|
|
const SYNC_SCHEMA = `
|
|
CREATE TABLE IF NOT EXISTS local_sync_metadata (
|
|
id TEXT PRIMARY KEY,
|
|
table_name TEXT NOT NULL,
|
|
record_id TEXT NOT NULL,
|
|
vector_clock TEXT NOT NULL,
|
|
last_modified_at TEXT NOT NULL,
|
|
sync_status TEXT NOT NULL DEFAULT 'pending_sync',
|
|
conflict_data TEXT,
|
|
created_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS mutation_queue (
|
|
id TEXT PRIMARY KEY,
|
|
operation TEXT NOT NULL,
|
|
table_name TEXT NOT NULL,
|
|
record_id TEXT NOT NULL,
|
|
payload TEXT,
|
|
vector_clock TEXT NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'pending',
|
|
retry_count INTEGER NOT NULL DEFAULT 0,
|
|
error_message TEXT,
|
|
created_at TEXT NOT NULL,
|
|
process_after TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS sync_checkpoint (
|
|
id TEXT PRIMARY KEY,
|
|
table_name TEXT NOT NULL UNIQUE,
|
|
last_sync_cursor TEXT,
|
|
local_vector_clock TEXT,
|
|
synced_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS sync_tombstone (
|
|
id TEXT PRIMARY KEY,
|
|
table_name TEXT NOT NULL,
|
|
record_id TEXT NOT NULL,
|
|
vector_clock TEXT NOT NULL,
|
|
deleted_at TEXT NOT NULL,
|
|
synced INTEGER NOT NULL DEFAULT 0
|
|
);
|
|
`
|
|
|
|
// Helper to set up sync tables
|
|
async function setupSyncTables(provider: DatabaseProvider) {
|
|
const statements = SYNC_SCHEMA.split(";").filter((s) => s.trim())
|
|
for (const stmt of statements) {
|
|
await provider.execute(stmt)
|
|
}
|
|
}
|
|
|
|
// Integration tests for local-to-cloud sync using two MemoryProviders
|
|
// Simulates offline behavior and sync recovery
|
|
|
|
describe("Sync Integration", () => {
|
|
let localProvider: DatabaseProvider
|
|
let cloudProvider: DatabaseProvider
|
|
let localEngine: SyncEngine
|
|
|
|
// Simulated cloud data store
|
|
let cloudStore: Map<string, RemoteRecord>
|
|
let cloudClocks: Map<string, string>
|
|
|
|
beforeEach(async () => {
|
|
localProvider = createMemoryProvider()
|
|
cloudProvider = createMemoryProvider()
|
|
await setupSyncTables(localProvider)
|
|
await setupSyncTables(cloudProvider)
|
|
cloudStore = new Map()
|
|
cloudClocks = new Map()
|
|
|
|
localEngine = createSyncEngine(localProvider, {
|
|
clientId: "local-client",
|
|
conflictStrategy: ConflictStrategy.NEWEST_WINS,
|
|
tables: ["records"],
|
|
})
|
|
await localEngine.initialize()
|
|
})
|
|
|
|
afterEach(async () => {
|
|
await localProvider.close?.()
|
|
await cloudProvider.close?.()
|
|
})
|
|
|
|
describe("local-to-cloud sync", () => {
|
|
it("syncs new local record to cloud", async () => {
|
|
// Create local record
|
|
await localEngine.recordMutation("records", "insert", "rec-1", {
|
|
id: "rec-1",
|
|
name: "Local Record",
|
|
value: 100,
|
|
})
|
|
|
|
// Mock cloud push that stores in cloudStore
|
|
const pushMutation = vi.fn().mockImplementation(async (_table, _op, id, payload, clock) => {
|
|
cloudStore.set(id, {
|
|
id,
|
|
...payload,
|
|
updatedAt: new Date().toISOString(),
|
|
vectorClock: JSON.stringify(clock),
|
|
} as RemoteRecord)
|
|
return true
|
|
})
|
|
|
|
const result = await localEngine.push(
|
|
"records",
|
|
async () => ({}),
|
|
pushMutation
|
|
)
|
|
|
|
expect(result.pushed).toBe(1)
|
|
expect(cloudStore.has("rec-1")).toBe(true)
|
|
expect(cloudStore.get("rec-1")?.name).toBe("Local Record")
|
|
})
|
|
|
|
it("syncs new cloud record to local", async () => {
|
|
// Seed cloud with a record
|
|
cloudStore.set("rec-1", {
|
|
id: "rec-1",
|
|
name: "Cloud Record",
|
|
value: 200,
|
|
updatedAt: "2024-01-01T10:00:00Z",
|
|
vectorClock: JSON.stringify({ server: 1 }),
|
|
})
|
|
|
|
const fetchRemote = vi.fn().mockImplementation(async () => ({
|
|
records: Array.from(cloudStore.values()),
|
|
nextCursor: "cursor-1",
|
|
}))
|
|
|
|
const localRecords: Map<string, unknown> = new Map()
|
|
const upsertLocal = vi.fn().mockImplementation(async (_id, data) => {
|
|
const rec = data as { id: string }
|
|
localRecords.set(rec.id, data)
|
|
return rec.id
|
|
})
|
|
|
|
const result = await localEngine.pull("records", fetchRemote, upsertLocal)
|
|
|
|
expect(result.created).toBe(1)
|
|
expect(localRecords.has("rec-1")).toBe(true)
|
|
})
|
|
|
|
it("performs bidirectional sync", async () => {
|
|
// Seed cloud
|
|
cloudStore.set("cloud-1", {
|
|
id: "cloud-1",
|
|
name: "From Cloud",
|
|
value: 1,
|
|
updatedAt: "2024-01-01T10:00:00Z",
|
|
vectorClock: JSON.stringify({ server: 1 }),
|
|
})
|
|
|
|
// Create local
|
|
await localEngine.recordMutation("records", "insert", "local-1", {
|
|
id: "local-1",
|
|
name: "From Local",
|
|
value: 2,
|
|
})
|
|
|
|
const localRecords: Map<string, unknown> = new Map()
|
|
|
|
const fetchRemote = vi.fn().mockImplementation(async () => ({
|
|
records: Array.from(cloudStore.values()),
|
|
nextCursor: "cursor-1",
|
|
}))
|
|
|
|
const upsertLocal = vi.fn().mockImplementation(async (_id, data) => {
|
|
const rec = data as { id: string }
|
|
localRecords.set(rec.id, data)
|
|
return rec.id
|
|
})
|
|
|
|
const pushMutation = vi.fn().mockImplementation(async (_table, _op, id, payload, clock) => {
|
|
cloudStore.set(id, {
|
|
id,
|
|
...payload,
|
|
updatedAt: new Date().toISOString(),
|
|
vectorClock: JSON.stringify(clock),
|
|
} as RemoteRecord)
|
|
return true
|
|
})
|
|
|
|
const result = await localEngine.sync(
|
|
"records",
|
|
fetchRemote,
|
|
upsertLocal,
|
|
async () => ({}),
|
|
pushMutation
|
|
)
|
|
|
|
// Pull: 1 record from cloud
|
|
// Push: 1 record to cloud
|
|
expect(result.pulled).toBe(1)
|
|
expect(result.pushed).toBe(1)
|
|
|
|
// Verify both sides have both records
|
|
expect(localRecords.has("cloud-1")).toBe(true)
|
|
expect(cloudStore.has("local-1")).toBe(true)
|
|
})
|
|
})
|
|
|
|
describe("offline queue behavior", () => {
|
|
it("queues mutations when offline (push fails)", async () => {
|
|
// Create multiple local records
|
|
for (let i = 0; i < 3; i++) {
|
|
await localEngine.recordMutation("records", "insert", `rec-${i}`, {
|
|
id: `rec-${i}`,
|
|
index: i,
|
|
})
|
|
}
|
|
|
|
// Simulate offline - push always fails
|
|
const pushMutation = vi.fn().mockRejectedValue(new Error("Network unavailable"))
|
|
|
|
const result = await localEngine.push(
|
|
"records",
|
|
async () => ({}),
|
|
pushMutation
|
|
)
|
|
|
|
expect(result.failed).toBe(3)
|
|
expect(result.pushed).toBe(0)
|
|
expect(result.errors).toHaveLength(3)
|
|
})
|
|
|
|
it("maintains queue order (FIFO)", async () => {
|
|
const order: string[] = []
|
|
|
|
// Create records in order
|
|
await localEngine.recordMutation("records", "insert", "first", { id: "first" })
|
|
await localEngine.recordMutation("records", "insert", "second", { id: "second" })
|
|
await localEngine.recordMutation("records", "insert", "third", { id: "third" })
|
|
|
|
const pushMutation = vi.fn().mockImplementation(async (_table, _op, id) => {
|
|
order.push(id)
|
|
return true
|
|
})
|
|
|
|
await localEngine.push("records", async () => ({}), pushMutation)
|
|
|
|
expect(order).toEqual(["first", "second", "third"])
|
|
})
|
|
|
|
it("retries failed mutations", async () => {
|
|
await localEngine.recordMutation("records", "insert", "rec-1", { id: "rec-1" })
|
|
|
|
let attempts = 0
|
|
const pushMutation = vi.fn().mockImplementation(async () => {
|
|
attempts++
|
|
if (attempts < 3) {
|
|
throw new Error("Temporary failure")
|
|
}
|
|
return true
|
|
})
|
|
|
|
// First push fails
|
|
const result1 = await localEngine.push("records", async () => ({}), pushMutation)
|
|
expect(result1.failed).toBe(1)
|
|
|
|
// Second push also fails (retry 1)
|
|
const result2 = await localEngine.push("records", async () => ({}), pushMutation)
|
|
expect(result2.failed).toBe(1)
|
|
|
|
// Third push succeeds (retry 2)
|
|
const result3 = await localEngine.push("records", async () => ({}), pushMutation)
|
|
expect(result3.pushed).toBe(1)
|
|
|
|
expect(attempts).toBe(3)
|
|
})
|
|
})
|
|
|
|
describe("sync recovery after reconnect", () => {
|
|
it("recovers from offline state", async () => {
|
|
// Create local records while "offline"
|
|
await localEngine.recordMutation("records", "insert", "offline-1", { id: "offline-1" })
|
|
await localEngine.recordMutation("records", "insert", "offline-2", { id: "offline-2" })
|
|
|
|
// First sync attempt - simulate offline
|
|
let isOnline = false
|
|
const pushMutation = vi.fn().mockImplementation(async () => {
|
|
if (!isOnline) {
|
|
throw new Error("Offline")
|
|
}
|
|
return true
|
|
})
|
|
|
|
const result1 = await localEngine.push("records", async () => ({}), pushMutation)
|
|
expect(result1.failed).toBe(2)
|
|
|
|
// "Go online"
|
|
isOnline = true
|
|
|
|
// Retry - should succeed now
|
|
const result2 = await localEngine.push("records", async () => ({}), pushMutation)
|
|
expect(result2.pushed).toBe(2)
|
|
})
|
|
|
|
it("handles partial sync (some succeed, some fail)", async () => {
|
|
// Create multiple records
|
|
await localEngine.recordMutation("records", "insert", "good-1", { id: "good-1" })
|
|
await localEngine.recordMutation("records", "insert", "bad", { id: "bad" })
|
|
await localEngine.recordMutation("records", "insert", "good-2", { id: "good-2" })
|
|
|
|
const pushMutation = vi.fn().mockImplementation(async (_table, _op, id) => {
|
|
if (id === "bad") {
|
|
throw new Error("Bad record")
|
|
}
|
|
return true
|
|
})
|
|
|
|
const result = await localEngine.push("records", async () => ({}), pushMutation)
|
|
|
|
expect(result.pushed).toBe(2)
|
|
expect(result.failed).toBe(1)
|
|
expect(result.errors[0]?.recordId).toBe("bad")
|
|
})
|
|
})
|
|
|
|
describe("conflict scenarios", () => {
|
|
it("detects concurrent modifications", async () => {
|
|
// Create local record
|
|
await localEngine.recordMutation("records", "insert", "shared", {
|
|
id: "shared",
|
|
name: "Local Version",
|
|
})
|
|
|
|
// Get local status
|
|
const localStatus = await localEngine.getRecordStatus("records", "shared")
|
|
expect(localStatus?.syncStatus).toBe("pending_sync")
|
|
|
|
// Cloud has different version with concurrent clock
|
|
cloudStore.set("shared", {
|
|
id: "shared",
|
|
name: "Cloud Version",
|
|
updatedAt: "2024-01-01T10:00:00Z",
|
|
vectorClock: JSON.stringify({ otherClient: 1 }), // Concurrent with local
|
|
})
|
|
|
|
const fetchRemote = vi.fn().mockImplementation(async () => ({
|
|
records: Array.from(cloudStore.values()),
|
|
nextCursor: "cursor-1",
|
|
}))
|
|
|
|
const upsertLocal = vi.fn().mockResolvedValue("shared")
|
|
|
|
// Pull with conflict detection
|
|
// Since local clock is { localClient: 1 } and remote is { otherClient: 1 }
|
|
// These are concurrent - conflict should be detected if local is pending
|
|
const result = await localEngine.pull("records", fetchRemote, upsertLocal)
|
|
|
|
// Conflict detection depends on local sync status
|
|
// If pending_sync, concurrent clocks trigger conflict
|
|
expect(result.created + result.updated + result.conflicts).toBeGreaterThanOrEqual(1)
|
|
})
|
|
|
|
it("resolves conflict using configured strategy", async () => {
|
|
// Create engine with LOCAL_WINS strategy
|
|
const localWinsEngine = createSyncEngine(localProvider, {
|
|
clientId: "test-client",
|
|
conflictStrategy: ConflictStrategy.LOCAL_WINS,
|
|
tables: ["records"],
|
|
})
|
|
await localWinsEngine.initialize()
|
|
|
|
// Create local record
|
|
await localWinsEngine.recordMutation("records", "insert", "conflict-rec", {
|
|
id: "conflict-rec",
|
|
name: "Local Value",
|
|
value: 100,
|
|
})
|
|
|
|
// Cloud has concurrent version
|
|
cloudStore.set("conflict-rec", {
|
|
id: "conflict-rec",
|
|
name: "Cloud Value",
|
|
value: 200,
|
|
updatedAt: "2024-01-01T10:00:00Z",
|
|
vectorClock: JSON.stringify({ server: 5 }), // After local would be { "test-client": 1 }
|
|
})
|
|
|
|
// With LOCAL_WINS, conflicts should resolve using local data
|
|
// This is a high-level test - detailed conflict tests are in conflict.test.ts
|
|
await localWinsEngine.pull(
|
|
"records",
|
|
async () => ({ records: Array.from(cloudStore.values()), nextCursor: "c1" }),
|
|
async () => "id"
|
|
)
|
|
|
|
// Engine should handle the conflict according to LOCAL_WINS strategy
|
|
})
|
|
})
|
|
})
|