# AI Agent Factory SaaS — API & MCP Server Architecture
> Comprehensive architecture document for the Factory Dashboard backend, API layer, MCP server integration, and real-time human-in-the-loop pipeline management.
>
> **Date:** 2026-02-06
> **Status:** Research / Design Phase
> **Author:** Architecture Research Agent
---
## Table of Contents
1. [System Overview](#1-system-overview)
2. [API Design (RESTful + WebSocket + GraphQL)](#2-api-design)
3. [MCP Server Design for Factory Operations](#3-mcp-server-design-for-factory-operations)
4. [Database Schema Design](#4-database-schema-design)
5. [Real-time Architecture](#5-real-time-architecture)
6. [Integration Points](#6-integration-points)
7. [Research: Existing Patterns & Prior Art](#7-existing-patterns)
8. [Deployment & Scaling Considerations](#8-deployment--scaling)
9. [Security Architecture](#9-security-architecture)
10. [Implementation Roadmap](#10-implementation-roadmap)
---
## 1. System Overview
### Architecture Diagram (Text-Based)
```
┌─────────────────────────────────────────────────────────────────────┐
│                            CLIENT LAYER                             │
│                                                                     │
│   ┌──────────────┐      ┌──────────────┐      ┌──────────────┐      │
│   │   Factory    │      │  Goose Chat  │      │   Discord/   │      │
│   │  Dashboard   │      │  UI (MCP     │      │  Slack Bot   │      │
│   │ (React SPA)  │      │   Client)    │      │              │      │
│   └──────┬───────┘      └──────┬───────┘      └──────┬───────┘      │
│          │ REST/WS             │ MCP/stdio           │ Webhooks     │
└──────────┼─────────────────────┼─────────────────────┼──────────────┘
           │                     │                     │
┌──────────┼─────────────────────┼─────────────────────┼──────────────┐
│          ▼                     ▼                     ▼ API GATEWAY  │
│   ┌──────────────────────────────────────────────────────────┐      │
│   │               API Gateway / Load Balancer                │      │
│   │           (Rate Limiting, Auth, Routing, CORS)           │      │
│   └──────┬─────────────────────┬─────────────────────┬───────┘      │
│          │                     │                     │              │
│   ┌──────▼───────┐      ┌──────▼───────┐      ┌──────▼───────┐      │
│   │  REST API    │      │  WebSocket   │      │  MCP Server  │      │
│   │  Server      │      │  Server      │      │  (stdio/SSE) │      │
│   │  (Express/   │      │  (ws)        │      │              │      │
│   │   Hono)      │      │              │      │              │      │
│   └──────┬───────┘      └──────┬───────┘      └──────┬───────┘      │
└──────────┼─────────────────────┼─────────────────────┼──────────────┘
           │                     │                     │
┌──────────┼─────────────────────┼─────────────────────┼──────────────┐
│          ▼                     ▼                     ▼ CORE SERVICES│
│   ┌──────────────────────────────────────────────────────────┐      │
│   │                      Service Layer                       │      │
│   │    ┌────────────┐    ┌────────────┐    ┌────────────┐    │      │
│   │    │ Pipeline   │    │ Task Queue │    │ Approval   │    │      │
│   │    │ Service    │    │ Service    │    │ Service    │    │      │
│   │    └────────────┘    └────────────┘    └────────────┘    │      │
│   │    ┌────────────┐    ┌────────────┐    ┌────────────┐    │      │
│   │    │ Agent      │    │ Asset      │    │ Notif.     │    │      │
│   │    │ Health Svc │    │ Service    │    │ Service    │    │      │
│   │    └────────────┘    └────────────┘    └────────────┘    │      │
│   │    ┌────────────┐    ┌────────────┐                      │      │
│   │    │ Deploy     │    │ Audit      │                      │      │
│   │    │ Service    │    │ Service    │                      │      │
│   │    └────────────┘    └────────────┘                      │      │
│   └────────────────────────────┬─────────────────────────────┘      │
└────────────────────────────────┼────────────────────────────────────┘
                                 │
┌────────────────────────────────┼────────────────────────────────────┐
│                                ▼                       DATA LAYER   │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐   ┌────────────┐  │
│  │ PostgreSQL │   │ Redis      │   │ S3/R2      │   │ Event Bus  │  │
│  │ (Primary   │   │ (Cache +   │   │ (Assets/   │   │ (Redis     │  │
│  │  Store)    │   │  Pub/Sub)  │   │  Builds)   │   │  Streams)  │  │
│  └────────────┘   └────────────┘   └────────────┘   └────────────┘  │
└─────────────────────────────────────────────────────────────────────┘
```
### Core Principles
1. **Human-in-the-Loop First** — Every automated pipeline step can be gated by a human approval checkpoint.
2. **Durable Execution** — Inspired by Temporal.io; pipeline state survives crashes, restarts, and network failures.
3. **MCP-Native** — The factory exposes its operations as MCP tools, resources, and prompts so AI agents (Goose, Claude, etc.) can drive the factory conversationally.
4. **Event-Sourced Audit Trail** — Every state change is an immutable event, enabling full replay and compliance.
5. **Multi-Channel Notifications** — No decision falls through the cracks; escalation paths through Dashboard → Discord → SMS.
---
## 2. API Design
### 2.1 Base URL & Versioning Strategy
```
Base URL: https://api.agentfactory.dev/v1
Pattern: URL path versioning (v1, v2)
Why: Simple, explicit, cache-friendly. Header-based versioning adds complexity
without benefit at our scale.
Sunset: v(N-2) deprecated with 90-day sunset headers.
Headers: X-API-Version (response), Sunset (date the deprecated version stops responding; RFC 8594)
```
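A minimal sketch of how the headers in this policy might be produced, assuming a hypothetical per-version record that gains a sunset date when the version is deprecated. `VersionPolicy`, `deprecate`, and `versionHeaders` are illustrative names, not part of the API. The `Sunset` value is an HTTP-date, the format RFC 8594 requires.

```typescript
// Hypothetical per-version record; `sunset` is set once the version becomes v(N-2).
interface VersionPolicy {
  version: string; // e.g. "v1"
  sunset?: Date;   // undefined while the version is fully supported
}

const DEPRECATION_WINDOW_DAYS = 90;
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Start the 90-day sunset clock for a version.
function deprecate(policy: VersionPolicy, now: Date): VersionPolicy {
  return { ...policy, sunset: new Date(now.getTime() + DEPRECATION_WINDOW_DAYS * MS_PER_DAY) };
}

// Headers attached to every response served under `policy.version`.
function versionHeaders(policy: VersionPolicy): Record<string, string> {
  const headers: Record<string, string> = { "X-API-Version": policy.version };
  if (policy.sunset) {
    // toUTCString() yields the IMF-fixdate form, e.g. "Thu, 07 May 2026 00:00:00 GMT".
    headers["Sunset"] = policy.sunset.toUTCString();
  }
  return headers;
}
```

For example, deprecating `v1` on 2026-02-06 yields `Sunset: Thu, 07 May 2026 00:00:00 GMT`.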
### 2.2 Authentication & Authorization
```
Auth Method: Bearer JWT (short-lived, 15min) + Refresh tokens (7d)
API Keys: For server-to-server / CI/CD integrations (scoped, rotatable)
OAuth 2.0: For third-party integrations (GitHub, Discord)
RBAC Roles: owner, admin, operator, viewer, agent (non-human)
Scopes:
- pipelines:read, pipelines:write
- tasks:read, tasks:approve, tasks:reject
- deploy:staging, deploy:production
- agents:manage
- assets:read, assets:write
- audit:read
```
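The roles and scopes above combine into a two-part check. The sketch below assumes a role-to-scope ceiling table, which the design does not specify: the grants in `ROLE_SCOPES` are illustrative guesses, and `isAllowed` is a hypothetical helper.

```typescript
type Scope =
  | "pipelines:read" | "pipelines:write"
  | "tasks:read" | "tasks:approve" | "tasks:reject"
  | "deploy:staging" | "deploy:production"
  | "agents:manage"
  | "assets:read" | "assets:write"
  | "audit:read";

type Role = "owner" | "admin" | "operator" | "viewer" | "agent";

const ALL_SCOPES: readonly Scope[] = [
  "pipelines:read", "pipelines:write", "tasks:read", "tasks:approve", "tasks:reject",
  "deploy:staging", "deploy:production", "agents:manage", "assets:read", "assets:write", "audit:read",
];
const READ_SCOPES: readonly Scope[] = ["pipelines:read", "tasks:read", "assets:read"];

// ASSUMED role ceilings: the design lists roles and scopes but not which role grants what.
const ROLE_SCOPES: Record<Role, readonly Scope[]> = {
  owner: ALL_SCOPES,
  admin: ALL_SCOPES,
  operator: [...READ_SCOPES, "pipelines:write", "tasks:approve", "tasks:reject", "deploy:staging", "assets:write"],
  viewer: READ_SCOPES,
  agent: [...READ_SCOPES, "pipelines:write", "assets:write"], // agents build, humans approve
};

// Allow only if the credential carries the scope AND the caller's role permits it,
// so a broadly scoped API key can never exceed its owner's role.
function isAllowed(role: Role, tokenScopes: readonly Scope[], required: Scope): boolean {
  return tokenScopes.includes(required) && ROLE_SCOPES[role].includes(required);
}
```

Keeping the `agent` role without `tasks:approve` is one way to make sure an AI worker can never pass its own human gate.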
### 2.3 RESTful Endpoints
#### Pipeline Management
```http
# List all pipelines (with filtering + pagination)
GET /v1/pipelines?status=active&stage=testing&page=1&limit=20
# Create a new pipeline
POST /v1/pipelines
Body: {
"name": "ghl-mcp-server",
"template": "mcp-server-standard",
"config": { "platform": "go-high-level", "runtime": "node" },
"priority": "high",
"assignee_id": "user_abc123"
}
# Get pipeline details (includes current stage, tasks, timeline)
GET /v1/pipelines/:pipeline_id
# Update pipeline configuration
PATCH /v1/pipelines/:pipeline_id
Body: { "priority": "critical", "config": { ... } }
# Delete / archive pipeline
DELETE /v1/pipelines/:pipeline_id?archive=true
# Get pipeline stage history (state machine transitions)
GET /v1/pipelines/:pipeline_id/stages
# Advance pipeline to next stage (triggers validation)
POST /v1/pipelines/:pipeline_id/stages/advance
Body: { "target_stage": "testing", "notes": "Scaffolding complete" }
```
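A small client-side sketch for the list endpoint. The response envelope is not specified in this design, so the `Page` shape with a `has_more` flag is a hypothetical assumption, as are the helper names `listPipelinesUrl` and `allPipelines`.

```typescript
// Filters shown in the GET /v1/pipelines example above.
interface ListPipelinesQuery {
  status?: string;
  stage?: string;
  page?: number;
  limit?: number;
}

const BASE_URL = "https://api.agentfactory.dev/v1";

// Build the list URL, skipping unset filters so the server applies its defaults.
function listPipelinesUrl(query: ListPipelinesQuery): string {
  const url = new URL(`${BASE_URL}/pipelines`);
  for (const [key, value] of Object.entries(query)) {
    if (value !== undefined) url.searchParams.set(key, String(value));
  }
  return url.toString();
}

// HYPOTHETICAL page envelope; the response shape is not specified in this design.
interface Page<T> {
  data: T[];
  has_more: boolean;
}

// Walk every page by bumping `page` until the server reports no more results.
// The fetcher is injected so auth headers and retries stay outside this helper.
async function* allPipelines<T>(
  query: ListPipelinesQuery,
  fetchPage: (url: string) => Promise<Page<T>>,
): AsyncGenerator<T> {
  let page = query.page ?? 1;
  while (true) {
    const result = await fetchPage(listPipelinesUrl({ ...query, page }));
    yield* result.data;
    if (!result.has_more) return;
    page += 1;
  }
}
```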
#### Stage Transitions & Approvals
```http
# List pending approvals (across all pipelines or filtered)
GET /v1/approvals?status=pending&pipeline_id=xyz&assignee=me
# Get approval details
GET /v1/approvals/:approval_id
# Approve a gate
POST /v1/approvals/:approval_id/approve
Body: { "notes": "LGTM, tests passing", "conditions": [] }
# Reject with feedback
POST /v1/approvals/:approval_id/reject
Body: { "reason": "Test coverage below 80%", "requested_changes": ["Add edge case tests"] }
# Escalate an approval (reassign or bump priority)
POST /v1/approvals/:approval_id/escalate
Body: { "escalate_to": "user_xyz", "reason": "SLA breach in 2h" }
```
#### Task Queue Management
```http
# List all tasks requiring human attention
GET /v1/tasks?status=pending&priority=high&sort=-sla_deadline
# Get task details with full context
GET /v1/tasks/:task_id
# Claim a task (assigns to current user)
POST /v1/tasks/:task_id/claim
# Complete a task with decision
POST /v1/tasks/:task_id/complete
Body: { "decision": "approved", "output": { ... }, "notes": "..." }
# Reassign a task
POST /v1/tasks/:task_id/reassign
Body: { "assignee_id": "user_xyz" }
# Bulk operations
POST /v1/tasks/bulk
Body: { "action": "approve", "task_ids": ["t1", "t2", "t3"], "notes": "Batch approval" }
# Task queue statistics
GET /v1/tasks/stats
Response: {
"pending": 12,
"in_progress": 3,
"blocked": 2,
"avg_wait_time_minutes": 45,
"sla_breaches": 1,
"by_priority": { "critical": 1, "high": 4, "medium": 5, "low": 2 }
}
```
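`POST /v1/tasks/:task_id/claim` must stay correct when two people click "claim" at the same moment. In PostgreSQL a single conditional statement such as `UPDATE tasks SET status = 'claimed', claimed_by = $2, claimed_at = NOW() WHERE id = $1 AND status = 'pending'` does this atomically: zero affected rows means the claim was lost. The in-memory sketch below illustrates the same first-claimer-wins rule; `TaskStore` and `ClaimConflictError` are hypothetical names.

```typescript
type TaskStatus = "pending" | "claimed" | "in_progress" | "completed" | "expired" | "escalated";

interface Task {
  id: string;
  status: TaskStatus;
  claimed_by?: string;
  claimed_at?: Date;
}

class ClaimConflictError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ClaimConflictError";
  }
}

// In-memory stand-in for the tasks table, illustrating first-claimer-wins semantics.
class TaskStore {
  private readonly tasks = new Map<string, Task>();

  add(task: Task): void {
    this.tasks.set(task.id, task);
  }

  // Compare-and-set: only a pending task can be claimed. A second claimer
  // gets ClaimConflictError (which an API would map to e.g. HTTP 409 Conflict).
  claim(taskId: string, userId: string, now: Date = new Date()): Task {
    const task = this.tasks.get(taskId);
    if (!task) throw new Error(`task ${taskId} not found`);
    if (task.status !== "pending") {
      throw new ClaimConflictError(`task ${taskId} is already ${task.status}`);
    }
    const claimed: Task = { ...task, status: "claimed", claimed_by: userId, claimed_at: now };
    this.tasks.set(taskId, claimed);
    return claimed;
  }
}
```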
#### Agent Status & Health Monitoring
```http
# List all agents (AI workers, build agents, deploy agents)
GET /v1/agents
# Get specific agent status
GET /v1/agents/:agent_id
Response: {
"id": "agent_goose_01",
"type": "ai_builder",
"status": "active",
"current_task": "task_xyz",
"pipeline": "ghl-mcp-server",
"health": {
"uptime_seconds": 84200,
"tasks_completed_24h": 47,
"error_rate": 0.02,
"last_heartbeat": "2026-02-06T17:00:00Z"
}
}
# Agent heartbeat (agents POST their status)
POST /v1/agents/:agent_id/heartbeat
Body: { "status": "active", "metrics": { ... } }
# Agent activity log
GET /v1/agents/:agent_id/activity?since=2026-02-06T00:00:00Z
```
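Heartbeats only help if silence is detected server-side: a crashed agent cannot report that it is down. A sketch, assuming a 30-second heartbeat interval and an "offline after three missed beats" rule (both hypothetical, as is `effectiveStatus`):

```typescript
type AgentStatus = "idle" | "active" | "error" | "offline";

interface AgentHealth {
  id: string;
  status: AgentStatus;    // last status the agent self-reported
  last_heartbeat: string; // ISO 8601, as in GET /v1/agents/:agent_id
}

// ASSUMED cadence: agents POST a heartbeat every 30 s; three missed beats means offline.
const HEARTBEAT_INTERVAL_MS = 30_000;
const MISSED_BEATS_BEFORE_OFFLINE = 3;

// Derive "offline" from silence instead of trusting the last reported status.
function effectiveStatus(agent: AgentHealth, now: Date): AgentStatus {
  const silenceMs = now.getTime() - Date.parse(agent.last_heartbeat);
  return silenceMs > HEARTBEAT_INTERVAL_MS * MISSED_BEATS_BEFORE_OFFLINE ? "offline" : agent.status;
}
```

A periodic sweep could compare this derived status with the stored one and emit `agent.status_changed` whenever they differ.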
#### Asset Management
```http
# List assets for a pipeline
GET /v1/pipelines/:pipeline_id/assets?type=code&stage=scaffolding
# Upload an asset
POST /v1/assets
Body (multipart): {
"pipeline_id": "pipe_xyz",
"stage": "scaffolding",
"type": "code", # code | config | docs | build | test_report
"file": <binary>,
"metadata": { "path": "src/index.ts", "generated_by": "agent_goose_01" }
}
# Get asset (download or metadata)
GET /v1/assets/:asset_id
GET /v1/assets/:asset_id/download
# Diff between asset versions
GET /v1/assets/:asset_id/diff?from_version=3&to_version=4
```
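The `assets` table stores a `checksum_sha256` per version. A sketch of computing it on upload with Node's built-in `crypto` module, assuming the API computes checksums server-side and skips a version bump when the bytes are unchanged (that policy and both helper names are hypothetical):

```typescript
import { createHash } from "node:crypto";

// Value for assets.checksum_sha256: 64 lowercase hex characters.
function sha256Hex(content: Buffer | string): string {
  return createHash("sha256").update(content).digest("hex");
}

// Only create a new asset version when the content actually changed.
function needsNewVersion(previousChecksum: string | null, content: Buffer | string): boolean {
  return previousChecksum !== sha256Hex(content);
}
```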
#### Notifications & Webhooks
```http
# List notifications for current user
GET /v1/notifications?unread=true
# Mark notification as read
POST /v1/notifications/:id/read
# Register a webhook
POST /v1/webhooks
Body: {
"url": "https://example.com/webhook",
"events": ["pipeline.stage_changed", "approval.pending", "task.sla_breach"],
"secret": "whsec_..."
}
# List webhooks
GET /v1/webhooks
# Webhook event types
GET /v1/webhooks/events
```
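Registering a webhook with a `secret` implies deliveries are signed, but the scheme is not specified here. A common approach, assumed below, is an HMAC-SHA256 over `timestamp.rawBody`, hex-encoded, with the timestamp sent alongside so receivers can reject replays. Header names and the helpers `signWebhook`/`verifyWebhook` are hypothetical.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// ASSUMED scheme: hex HMAC-SHA256 over `${timestamp}.${rawBody}`.
function signWebhook(secret: string, timestamp: number, rawBody: string): string {
  return createHmac("sha256", secret).update(`${timestamp}.${rawBody}`).digest("hex");
}

const TOLERANCE_SECONDS = 300; // reject deliveries older than 5 minutes

function verifyWebhook(
  secret: string,
  timestamp: number,
  rawBody: string,
  signatureHex: string,
  nowSeconds: number = Math.floor(Date.now() / 1000),
): boolean {
  if (Math.abs(nowSeconds - timestamp) > TOLERANCE_SECONDS) return false;
  const expected = Buffer.from(signWebhook(secret, timestamp, rawBody), "hex");
  const received = Buffer.from(signatureHex, "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```

Receivers must verify against the raw request body, before any JSON parsing or re-serialization changes its bytes.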
#### Audit Trail
```http
# Query audit log (immutable, append-only)
GET /v1/audit?entity_type=pipeline&entity_id=xyz&since=2026-02-01
# Each entry:
{
"id": "audit_001",
"timestamp": "2026-02-06T15:30:00Z",
"actor": { "type": "user", "id": "user_abc" },
"action": "approval.approved",
"entity": { "type": "approval", "id": "appr_xyz" },
"changes": { "status": { "from": "pending", "to": "approved" } },
"metadata": { "ip": "...", "user_agent": "..." }
}
```
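Each audit entry's `changes` field follows a `{ field: { from, to } }` shape. A sketch of deriving it from before/after snapshots of an entity; `diffForAudit` is a hypothetical helper, and it compares only top-level fields.

```typescript
type Changes = Record<string, { from: unknown; to: unknown }>;

// One {from, to} pair per top-level field whose value differs. Values are
// compared by their JSON encoding so nested objects (e.g. `config`) compare by
// content; note this treats the same keys in a different order as a change.
function diffForAudit(before: Record<string, unknown>, after: Record<string, unknown>): Changes {
  const changes: Changes = {};
  const keys = new Set([...Object.keys(before), ...Object.keys(after)]);
  for (const key of keys) {
    if (JSON.stringify(before[key]) !== JSON.stringify(after[key])) {
      changes[key] = { from: before[key] ?? null, to: after[key] ?? null };
    }
  }
  return changes;
}
```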
### 2.4 WebSocket Events
```
Connection: wss://api.agentfactory.dev/v1/ws?token=<jwt>
Channel subscription model:
→ { "type": "subscribe", "channels": ["pipeline:*", "tasks:pending", "agents:health"] }
← { "type": "subscribed", "channels": [...] }
Events emitted:
pipeline.created — New pipeline initialized
pipeline.stage_changed — Pipeline moved to new stage
pipeline.completed — Pipeline finished (success/failure)
pipeline.blocked — Pipeline blocked on human decision
task.created — New task in queue
task.claimed — Task claimed by a user
task.completed — Task completed (approved/rejected)
task.sla_warning — SLA deadline approaching (30min, 10min)
task.sla_breached — SLA exceeded, escalation triggered
approval.pending — New approval gate awaiting decision
approval.resolved — Approval approved or rejected
agent.status_changed — Agent went active/idle/error/offline
agent.heartbeat — Periodic agent health update
asset.created — New asset uploaded
asset.updated — Asset version bumped
deploy.started — Deployment initiated
deploy.succeeded — Deployment successful
deploy.failed — Deployment failed
Event payload format:
{
"type": "pipeline.stage_changed",
"timestamp": "2026-02-06T15:30:00Z",
"data": {
"pipeline_id": "pipe_xyz",
"from_stage": "scaffolding",
"to_stage": "testing",
"triggered_by": "agent_goose_01"
}
}
```
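The subscribe/subscribed exchange needs per-connection state on the server. A sketch, assuming only a trailing `*` wildcard and assuming events are published on channels such as `pipeline:pipe_xyz` (the event-to-channel naming is not specified above); `ClientConnection` and `channelMatches` are hypothetical.

```typescript
interface SubscribeMessage { type: "subscribe"; channels: string[]; }
interface SubscribedMessage { type: "subscribed"; channels: string[]; }

// Only a trailing "*" wildcard is supported: "pipeline:*" matches "pipeline:pipe_xyz".
function channelMatches(pattern: string, channel: string): boolean {
  return pattern.endsWith("*") ? channel.startsWith(pattern.slice(0, -1)) : pattern === channel;
}

// Per-connection subscription state held by the WebSocket server.
class ClientConnection {
  private readonly patterns = new Set<string>();

  // Handle a client "subscribe" frame and build the "subscribed" acknowledgement.
  onSubscribe(msg: SubscribeMessage): SubscribedMessage {
    for (const channel of msg.channels) this.patterns.add(channel);
    return { type: "subscribed", channels: [...this.patterns] };
  }

  // Should an event published on `channel` be pushed to this client?
  wants(channel: string): boolean {
    for (const pattern of this.patterns) {
      if (channelMatches(pattern, channel)) return true;
    }
    return false;
  }
}
```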
### 2.5 GraphQL Considerations
A secondary GraphQL endpoint at `/v1/graphql` provides flexible querying for the dashboard, avoiding over-fetching across the deeply nested pipeline → stages → tasks → assets hierarchy.
```graphql
type Query {
# Dashboard summary — one query gets everything
dashboard: DashboardSummary!
# Flexible pipeline querying with nested resolution
pipelines(
filter: PipelineFilter
sort: PipelineSort
pagination: Pagination
): PipelineConnection!
pipeline(id: ID!): Pipeline
# Cross-entity queries (hard with REST)
pendingDecisions(assignee: ID): [Task!]!
blockers: [Blocker!]!
}
type Pipeline {
id: ID!
name: String!
currentStage: Stage!
stages: [Stage!]!
tasks(status: TaskStatus): [Task!]!
assets: [Asset!]!
agent: Agent
metrics: PipelineMetrics!
audit: [AuditEntry!]!
}
type Subscription {
pipelineUpdated(id: ID): Pipeline!
taskCreated: Task!
approvalPending: Approval!
}
```
**Decision: REST-primary, GraphQL-secondary.** REST handles mutations and simple queries. GraphQL handles complex dashboard reads and real-time subscriptions. This avoids the "GraphQL for everything" trap while gaining its benefits for deeply nested reads.

---
## 3. MCP Server Design for Factory Operations
The Factory MCP Server is the bridge between AI agents (Goose, Claude, etc.) and the Factory API. It exposes the factory's capabilities as MCP tools, resources, and prompts — enabling conversational factory management.
### 3.1 MCP Tools
```typescript
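// Schematic declarations: tool(name, spec) is shorthand for a tool's name,
// description, and parameter schema, not a specific MCP SDK call signature.
// ─── TASK & APPROVAL TOOLS ────────────────────────────────────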
tool("factory_get_pending_tasks", {
description: "Get all tasks requiring human attention, sorted by priority and SLA urgency",
parameters: {
pipeline_id: { type: "string", optional: true, description: "Filter by pipeline" },
priority: { type: "string", enum: ["critical","high","medium","low"], optional: true },
assignee: { type: "string", optional: true, description: "Filter by assignee (use 'me' for current)" },
limit: { type: "number", optional: true, default: 20 }
},
returns: "Array of pending tasks with context, priority, SLA status, and blocking info"
})
tool("factory_approve_task", {
description: "Approve a pending task or approval gate, allowing the pipeline to proceed",
parameters: {
task_id: { type: "string", required: true },
notes: { type: "string", optional: true, description: "Approval notes or conditions" },
conditions: { type: "array", items: { type: "string" }, optional: true }
},
returns: "Updated task status and next pipeline action triggered"
})
tool("factory_reject_task", {
description: "Reject a pending task with feedback, sends pipeline back for rework",
parameters: {
task_id: { type: "string", required: true },
reason: { type: "string", required: true, description: "Why this was rejected" },
requested_changes: { type: "array", items: { type: "string" }, optional: true },
severity: { type: "string", enum: ["minor","major","critical"], default: "major" }
},
returns: "Rejection recorded, pipeline stage reverted, agent notified"
})
// ─── PIPELINE MANAGEMENT TOOLS ────────────────────────────────
tool("factory_get_pipeline_status", {
description: "Get current state of all pipelines or a specific pipeline with full context",
parameters: {
pipeline_id: { type: "string", optional: true, description: "Specific pipeline (omit for all)" },
include_details: { type: "boolean", default: false, description: "Include tasks, assets, logs" }
},
returns: "Pipeline(s) with current stage, progress %, blockers, and timeline"
})
tool("factory_advance_stage", {
description: "Manually advance a pipeline to its next stage (triggers validation checks)",
parameters: {
pipeline_id: { type: "string", required: true },
target_stage: { type: "string", optional: true, description: "Target stage (default: next in sequence)" },
skip_validation: { type: "boolean", default: false, description: "Skip gate checks (admin only)" },
notes: { type: "string", optional: true }
},
returns: "Stage transition result, any validation failures, new pending tasks created"
})
tool("factory_assign_priority", {
description: "Set or change priority on a pipeline or task",
parameters: {
entity_type: { type: "string", enum: ["pipeline","task"], required: true },
entity_id: { type: "string", required: true },
priority: { type: "string", enum: ["critical","high","medium","low"], required: true },
reason: { type: "string", optional: true }
},
returns: "Updated priority, SLA recalculated, notifications sent if escalated"
})
tool("factory_get_blockers", {
description: "Get all items that are blocked and why — the 'what's stuck' view",
parameters: {
pipeline_id: { type: "string", optional: true },
include_suggestions: { type: "boolean", default: true }
},
returns: "Array of blockers with root cause, duration blocked, and suggested actions"
})
// ─── BUILD & DEPLOY TOOLS ────────────────────────────────────
tool("factory_run_tests", {
description: "Trigger test suite for a pipeline's current build",
parameters: {
pipeline_id: { type: "string", required: true },
test_type: { type: "string", enum: ["unit","integration","e2e","all"], default: "all" },
environment: { type: "string", enum: ["local","ci"], default: "ci" }
},
returns: "Test run ID, real-time status updates via resource subscription"
})
tool("factory_deploy", {
description: "Deploy a pipeline's build to staging or production",
parameters: {
pipeline_id: { type: "string", required: true },
target: { type: "string", enum: ["staging","production"], required: true },
version: { type: "string", optional: true, description: "Specific version (default: latest)" },
dry_run: { type: "boolean", default: false }
},
returns: "Deployment ID, status, URL. Production requires prior staging deploy + approval."
})
// ─── UTILITY TOOLS ────────────────────────────────────────────
tool("factory_search", {
description: "Search across pipelines, tasks, assets, and logs",
parameters: {
query: { type: "string", required: true },
entity_types: { type: "array", items: { type: "string" }, optional: true },
limit: { type: "number", default: 10 }
}
})
tool("factory_create_pipeline", {
description: "Initialize a new MCP server pipeline from a template",
parameters: {
name: { type: "string", required: true },
platform: { type: "string", required: true, description: "Target platform (e.g., 'go-high-level')" },
template: { type: "string", default: "mcp-server-standard" },
priority: { type: "string", enum: ["critical","high","medium","low"], default: "medium" }
}
})
```
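MCP's `tools/list` response describes each tool with a `name`, a `description`, and an `inputSchema` that is a JSON Schema object. A sketch of translating the shorthand above into that schema; `ParamSpec` and `toInputSchema` are hypothetical names, and treating unflagged parameters as optional is an assumption.

```typescript
// Parameter spec in the shape used by the tool(...) declarations above.
interface ParamSpec {
  type: "string" | "number" | "boolean" | "array";
  description?: string;
  enum?: string[];
  items?: { type: string };
  default?: unknown;
  required?: boolean; // DSL-only flag
  optional?: boolean; // DSL-only flag
}

interface InputSchema {
  type: "object";
  properties: Record<string, ParamSpec>;
  required: string[];
}

// A parameter is required only when marked `required: true`; JSON Schema
// expresses that through the parent object's "required" array.
function toInputSchema(params: Record<string, ParamSpec>): InputSchema {
  const properties: Record<string, ParamSpec> = {};
  const required: string[] = [];
  for (const [name, spec] of Object.entries(params)) {
    const schema: ParamSpec = { ...spec };
    delete schema.required; // strip DSL-only flags from the emitted property
    delete schema.optional;
    properties[name] = schema;
    if (spec.required) required.push(name);
  }
  return { type: "object", properties, required };
}
```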
### 3.2 MCP Resources
Resources provide read-only, subscribable data that AI agents can reference contextually.
```typescript
// Dynamic resource: specific pipeline state
resource("factory://pipelines/{pipeline_id}/state", {
description: "Current state of a specific pipeline including stage, progress, and blockers",
mimeType: "application/json"
})
// Dynamic resource: individual server status
resource("factory://servers/{server_name}/status", {
description: "Health, version, deployment status of a specific MCP server",
mimeType: "application/json"
})
// Dynamic resource: test results for a pipeline
resource("factory://pipelines/{pipeline_id}/test-results", {
description: "Latest test results with pass/fail counts, coverage, and failure details",
mimeType: "application/json"
})
// Dynamic resource: build logs
resource("factory://pipelines/{pipeline_id}/build-logs", {
description: "Build output logs for the latest or specified build",
mimeType: "text/plain"
})
// Static resource: factory dashboard summary
resource("factory://dashboard/summary", {
description: "High-level factory status: active pipelines, pending tasks, blockers, health",
mimeType: "application/json"
})
// Static resource: factory configuration / templates
resource("factory://config/templates", {
description: "Available pipeline templates and their configurations",
mimeType: "application/json"
})
// Resource subscription for real-time updates
// Clients subscribe to resources and receive notifications on change
```
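MCP lets a server advertise parameterized resources as URI templates (RFC 6570); when a client reads a concrete URI, the server has to map it back to a template and its variables. A sketch that handles only the simple `{name}` path-segment form used above; `matchTemplate` is a hypothetical helper, not a full RFC 6570 implementation.

```typescript
// Match a concrete URI against a template such as
// "factory://pipelines/{pipeline_id}/state" and extract the variables.
// Each {name} fills exactly one path segment.
function matchTemplate(template: string, uri: string): Record<string, string> | null {
  const names: string[] = [];
  // Escape regex metacharacters, leaving { and } to mark the variables.
  const escaped = template.replace(/[.*+?^$()|[\]\\]/g, "\\$&");
  const pattern = escaped.replace(/\{(\w+)\}/g, (_whole: string, name: string) => {
    names.push(name);
    return "([^/]+)";
  });
  const match = new RegExp(`^${pattern}$`).exec(uri);
  if (match === null) return null;
  const vars: Record<string, string> = {};
  names.forEach((name, i) => {
    vars[name] = decodeURIComponent(match[i + 1]);
  });
  return vars;
}
```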
### 3.3 MCP Prompts
Prompts are pre-built conversation starters that pull relevant context and structure the AI's response.
```typescript
prompt("review_server", {
description: "Pull all context needed to review an MCP server: code quality, test results, docs, deployment readiness",
arguments: {
server_name: { type: "string", required: true, description: "Name of the MCP server to review" }
},
// Returns a structured prompt with:
// - Pipeline status and history
// - Latest test results and coverage
// - Code review checklist
// - Asset inventory (README, TOOLS.md, package.json)
// - Open tasks/blockers
// - Comparison against quality standards
})
prompt("whats_needs_attention", {
description: "Summary of everything that needs human attention right now, prioritized by urgency",
arguments: {
scope: { type: "string", optional: true, description: "all | my-tasks | critical-only" }
},
// Returns:
// - SLA breaches (most urgent first)
// - Pending approvals
// - Blocked pipelines
// - Failed tests/deploys
// - Agent health issues
// - Suggested next actions
})
prompt("deploy_checklist", {
description: "Pre-deployment review checklist for an MCP server",
arguments: {
pipeline_id: { type: "string", required: true },
target: { type: "string", enum: ["staging","production"], required: true }
},
// Returns structured checklist:
// ☐ All tests passing (unit, integration, e2e)
// ☐ Code review approved
// ☐ README.md complete and accurate
// ☐ TOOLS.md documents all tools
// ☐ Environment variables documented
// ☐ No hardcoded secrets
// ☐ Performance benchmarks acceptable
// ☐ Staging deploy tested
// ☐ Rollback plan documented
// ☐ Monitoring/alerting configured
})
prompt("pipeline_retrospective", {
description: "Generate a retrospective analysis of a completed pipeline",
arguments: {
pipeline_id: { type: "string", required: true }
}
// Returns: timeline, bottlenecks, quality metrics, lessons learned
})
```
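An MCP `prompts/get` call returns an optional description plus a list of role-tagged messages. A sketch of what the `deploy_checklist` prompt could render, using the checklist items above; `renderDeployChecklist` is hypothetical, and pre-ticking items the factory has already verified is an assumed design choice.

```typescript
// A prompts/get result: an optional description plus role-tagged messages.
interface PromptMessage {
  role: "user" | "assistant";
  content: { type: "text"; text: string };
}

interface PromptResult {
  description?: string;
  messages: PromptMessage[];
}

const DEPLOY_CHECKLIST: readonly string[] = [
  "All tests passing (unit, integration, e2e)",
  "Code review approved",
  "README.md complete and accurate",
  "TOOLS.md documents all tools",
  "Environment variables documented",
  "No hardcoded secrets",
  "Performance benchmarks acceptable",
  "Staging deploy tested",
  "Rollback plan documented",
  "Monitoring/alerting configured",
];

// HYPOTHETICAL renderer: verified items are pre-ticked, the rest stay open for review.
function renderDeployChecklist(
  pipelineId: string,
  target: "staging" | "production",
  verified: ReadonlySet<string>,
): PromptResult {
  const items = DEPLOY_CHECKLIST.map((item) => `${verified.has(item) ? "☑" : "☐"} ${item}`);
  return {
    description: `Pre-deployment checklist for ${pipelineId} (${target})`,
    messages: [
      {
        role: "user",
        content: {
          type: "text",
          text: `Assess whether pipeline ${pipelineId} is ready to deploy to ${target}.\n\n${items.join("\n")}`,
        },
      },
    ],
  };
}
```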
---
## 4. Database Schema Design
### 4.1 Core Tables
```sql
-- ═══════════════════════════════════════════════════════════
-- PIPELINES — The top-level entity, one per MCP server build
-- ═══════════════════════════════════════════════════════════
CREATE TABLE pipelines (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
slug VARCHAR(255) UNIQUE NOT NULL,
template VARCHAR(100) NOT NULL DEFAULT 'mcp-server-standard',
platform VARCHAR(100), -- e.g., 'go-high-level'
-- State machine
current_stage VARCHAR(50) NOT NULL DEFAULT 'intake',
status VARCHAR(30) NOT NULL DEFAULT 'active', -- active, paused, completed, failed, archived
priority VARCHAR(20) NOT NULL DEFAULT 'medium', -- critical, high, medium, low
-- Ownership
created_by UUID REFERENCES users(id),
assignee_id UUID REFERENCES users(id),
-- Configuration (JSONB for flexibility)
config JSONB DEFAULT '{}',
metadata JSONB DEFAULT '{}',
-- SLA tracking
sla_deadline TIMESTAMPTZ,
started_at TIMESTAMPTZ DEFAULT NOW(),
completed_at TIMESTAMPTZ,
-- Timestamps
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_pipelines_status ON pipelines(status);
CREATE INDEX idx_pipelines_stage ON pipelines(current_stage);
CREATE INDEX idx_pipelines_priority ON pipelines(priority);
CREATE INDEX idx_pipelines_assignee ON pipelines(assignee_id);
-- ═══════════════════════════════════════════════════════════
-- STAGES — Stage definitions and transitions per pipeline
-- ═══════════════════════════════════════════════════════════
CREATE TABLE pipeline_stages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
pipeline_id UUID REFERENCES pipelines(id) ON DELETE CASCADE,
stage_name VARCHAR(50) NOT NULL,
stage_order INT NOT NULL,
-- State
status VARCHAR(30) NOT NULL DEFAULT 'pending', -- pending, active, completed, skipped, failed
-- Gate configuration
requires_approval BOOLEAN DEFAULT false,
approval_type VARCHAR(30), -- manual, auto, conditional
auto_advance BOOLEAN DEFAULT false,
-- Validation rules (JSONB: test coverage thresholds, required assets, etc.)
validation_rules JSONB DEFAULT '{}',
-- Timing
entered_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
duration_seconds INT,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(pipeline_id, stage_name)
);
-- ═══════════════════════════════════════════════════════════
-- TASKS — Work items requiring human decisions
-- ═══════════════════════════════════════════════════════════
CREATE TABLE tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
pipeline_id UUID REFERENCES pipelines(id),
stage_name VARCHAR(50),
-- Task details
type VARCHAR(50) NOT NULL, -- approval, review, decision, manual_action, fix_required
title VARCHAR(500) NOT NULL,
description TEXT,
context JSONB DEFAULT '{}', -- Rich context for the decision-maker
-- Queue management
status VARCHAR(30) NOT NULL DEFAULT 'pending', -- pending, claimed, in_progress, completed, expired, escalated
priority VARCHAR(20) NOT NULL DEFAULT 'medium',
-- Assignment
assignee_id UUID REFERENCES users(id),
claimed_at TIMESTAMPTZ,
claimed_by UUID REFERENCES users(id),
-- Decision tracking
decision VARCHAR(30), -- approved, rejected, deferred, escalated
decision_notes TEXT,
decision_data JSONB DEFAULT '{}',
decided_at TIMESTAMPTZ,
decided_by UUID REFERENCES users(id),
-- SLA
sla_deadline TIMESTAMPTZ,
sla_warning_sent BOOLEAN DEFAULT false,
sla_breached BOOLEAN DEFAULT false,
escalation_level INT DEFAULT 0,
-- What this task blocks
blocks_stage_advance BOOLEAN DEFAULT false,
blocks_pipeline_id UUID REFERENCES pipelines(id),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_tasks_status ON tasks(status);
CREATE INDEX idx_tasks_priority_sla ON tasks(priority, sla_deadline);
CREATE INDEX idx_tasks_assignee ON tasks(assignee_id);
CREATE INDEX idx_tasks_pipeline ON tasks(pipeline_id);
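-- Partial index for the queue view (GET /v1/tasks?status=pending&priority=high&sort=-sla_deadline)
CREATE INDEX idx_tasks_pending ON tasks(priority, sla_deadline) WHERE status = 'pending';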
-- ═══════════════════════════════════════════════════════════
-- APPROVALS — Formal gate approvals (linked to tasks)
-- ═══════════════════════════════════════════════════════════
CREATE TABLE approvals (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_id UUID REFERENCES tasks(id),
pipeline_id UUID REFERENCES pipelines(id),
stage_name VARCHAR(50) NOT NULL,
-- Approval details
type VARCHAR(30) NOT NULL, -- stage_gate, deploy, code_review, manual_check
status VARCHAR(30) NOT NULL DEFAULT 'pending',
-- Decision
approved_by UUID REFERENCES users(id),
approved_at TIMESTAMPTZ,
rejection_reason TEXT,
conditions JSONB DEFAULT '[]', -- Conditional approvals
-- Required approvers (for multi-party approvals)
required_approvers INT DEFAULT 1,
current_approvers INT DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ═══════════════════════════════════════════════════════════
-- AGENTS — AI workers and build/deploy agents
-- ═══════════════════════════════════════════════════════════
CREATE TABLE agents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
type VARCHAR(50) NOT NULL, -- ai_builder, test_runner, deployer, monitor
-- Status
status VARCHAR(30) NOT NULL DEFAULT 'idle', -- idle, active, error, offline
current_task_id UUID REFERENCES tasks(id),
current_pipeline_id UUID REFERENCES pipelines(id),
-- Health
last_heartbeat TIMESTAMPTZ,
uptime_seconds INT DEFAULT 0,
error_count_24h INT DEFAULT 0,
-- Configuration
capabilities JSONB DEFAULT '[]',
config JSONB DEFAULT '{}',
-- Metrics
tasks_completed_total INT DEFAULT 0,
avg_task_duration_seconds INT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ═══════════════════════════════════════════════════════════
-- ASSETS — Generated files, code, configs, build artifacts
-- ═══════════════════════════════════════════════════════════
CREATE TABLE assets (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
pipeline_id UUID REFERENCES pipelines(id),
stage_name VARCHAR(50),
-- Asset info
type VARCHAR(30) NOT NULL, -- code, config, docs, build, test_report, screenshot
name VARCHAR(500) NOT NULL,
path VARCHAR(1000), -- Logical path (e.g., src/index.ts)
-- Storage
storage_key VARCHAR(1000) NOT NULL, -- S3/R2 key
storage_bucket VARCHAR(255),
size_bytes BIGINT,
content_type VARCHAR(255),
checksum_sha256 VARCHAR(64),
-- Versioning
version INT DEFAULT 1,
previous_version_id UUID REFERENCES assets(id),
-- Provenance
generated_by UUID, -- Could be user or agent
generator_type VARCHAR(30), -- user, agent, ci
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ═══════════════════════════════════════════════════════════
-- NOTIFICATIONS — Multi-channel notification queue
-- ═══════════════════════════════════════════════════════════
CREATE TABLE notifications (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES users(id),
-- Content
type VARCHAR(50) NOT NULL, -- task_assigned, approval_pending, sla_warning, deploy_status
title VARCHAR(500) NOT NULL,
body TEXT,
data JSONB DEFAULT '{}',
-- Delivery
channels JSONB DEFAULT '["dashboard"]', -- dashboard, discord, email, sms
delivered_via JSONB DEFAULT '[]',
-- Status
read BOOLEAN DEFAULT false,
read_at TIMESTAMPTZ,
dismissed BOOLEAN DEFAULT false,
-- Linking
entity_type VARCHAR(50),
entity_id UUID,
action_url VARCHAR(1000),
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_notifications_user_unread ON notifications(user_id, created_at DESC) WHERE read = false;
-- ═══════════════════════════════════════════════════════════
-- AUDIT LOG — Immutable event log
-- ═══════════════════════════════════════════════════════════
CREATE TABLE audit_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- Who
actor_type VARCHAR(30) NOT NULL, -- user, agent, system, webhook
actor_id UUID,
actor_name VARCHAR(255),
-- What
action VARCHAR(100) NOT NULL, -- e.g., pipeline.created, task.approved, deploy.started
-- On what
entity_type VARCHAR(50) NOT NULL,
entity_id UUID NOT NULL,
-- Details
changes JSONB DEFAULT '{}', -- { field: { from: x, to: y } }
metadata JSONB DEFAULT '{}', -- IP, user agent, etc.
-- Immutable timestamp
created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL
);
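-- Candidate for monthly range partitioning on created_at (not declared above);
-- a partitioned table's primary key would have to include created_at, e.g. (id, created_at).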
CREATE INDEX idx_audit_entity ON audit_log(entity_type, entity_id);
CREATE INDEX idx_audit_action ON audit_log(action);
CREATE INDEX idx_audit_actor ON audit_log(actor_type, actor_id);
CREATE INDEX idx_audit_time ON audit_log(created_at);
```
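`approvals.required_approvers` and `current_approvers` support multi-party gates, but a bare counter cannot stop one person from approving twice. The sketch below therefore keeps a hypothetical `approver_ids` list (not a column in the schema above); `ApprovalRow` and `recordApproval` are illustrative names.

```typescript
interface ApprovalRow {
  id: string;
  status: "pending" | "approved" | "rejected";
  required_approvers: number;
  current_approvers: number;
  approver_ids: string[]; // HYPOTHETICAL column, not in the schema above
}

// Record one approval; the gate opens once distinct approvers reach the quorum.
function recordApproval(approval: ApprovalRow, userId: string): ApprovalRow {
  if (approval.status !== "pending") {
    throw new Error(`approval ${approval.id} is already ${approval.status}`);
  }
  // Idempotent: a repeated approval from the same user does not count twice.
  if (approval.approver_ids.includes(userId)) return approval;
  const approver_ids = [...approval.approver_ids, userId];
  return {
    ...approval,
    approver_ids,
    current_approvers: approver_ids.length,
    status: approver_ids.length >= approval.required_approvers ? "approved" : "pending",
  };
}
```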
### 4.2 Pipeline State Machine
```
┌─────────┐     ┌──────────────┐     ┌──────────┐     ┌───────────┐
│ INTAKE  │────▶│ SCAFFOLDING  │────▶│ BUILDING │────▶│  TESTING  │
│         │     │              │     │          │     │           │
└─────────┘     └──────────────┘     └──────────┘     └─────┬─────┘
                                                            │
                                                    ┌───────▼───────┐
                                                    │    REVIEW     │
                                                    │ ★ HUMAN GATE  │
                                                    └───────┬───────┘
                                                            │
                     ┌──────────────┐               ┌───────▼───────┐
                     │  PRODUCTION  │◀──────────────│    STAGING    │
                     │ ★ HUMAN GATE │               │ ★ HUMAN GATE  │
                     └──────┬───────┘               └───────────────┘
                            │
                     ┌──────▼───────┐
                     │  PUBLISHED   │
                     │  (Complete)  │
                     └──────────────┘

★ = Requires human approval (creates a task in the queue)

Transitions encoded as:
  intake      → scaffolding       (auto, on pipeline creation)
  scaffolding → building          (auto, when scaffold complete)
  building    → testing           (auto, when build succeeds)
  testing     → review            (auto, when tests pass; creates approval task)
  review      → staging           (manual, requires approval)
  staging     → production        (manual, requires approval + staging tests pass)
  production  → published         (auto, on successful deploy)
  ANY         → failed            (on unrecoverable error)
  failed      → {previous_stage}  (manual retry with approval)
```
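The transition table can be encoded directly so that `POST /v1/pipelines/:pipeline_id/stages/advance` rejects illegal moves. A minimal sketch: `TRANSITIONS`, `TransitionContext`, and `canTransition` are hypothetical names, and the conditions on "auto" edges (scaffold complete, build succeeds, tests pass) are assumed to be checked by the caller before it asks.

```typescript
type Stage =
  | "intake" | "scaffolding" | "building" | "testing"
  | "review" | "staging" | "production" | "published" | "failed";

type Mode = "auto" | "manual";

// Forward edges from the table above; each stage has exactly one successor.
const TRANSITIONS: Partial<Record<Stage, { to: Stage; mode: Mode }>> = {
  intake: { to: "scaffolding", mode: "auto" },
  scaffolding: { to: "building", mode: "auto" },
  building: { to: "testing", mode: "auto" },
  testing: { to: "review", mode: "auto" },
  review: { to: "staging", mode: "manual" },
  staging: { to: "production", mode: "manual" },
  production: { to: "published", mode: "auto" },
};

interface TransitionContext {
  approved: boolean;            // the human approval task for this gate was approved
  stagingTestsPassed?: boolean; // only consulted for staging → production
  previousStage?: Stage;        // where a failed pipeline should resume
}

function canTransition(from: Stage, to: Stage, ctx: TransitionContext): boolean {
  if (to === "failed") return from !== "failed";                         // ANY → failed
  if (from === "failed") return ctx.approved && to === ctx.previousStage; // manual retry
  const edge = TRANSITIONS[from];
  if (!edge || edge.to !== to) return false;                            // no skipping stages
  if (edge.mode === "auto") return true;
  if (from === "staging" && !ctx.stagingTestsPassed) return false;
  return ctx.approved;
}
```

An admin-only `skip_validation` flag, as offered by `factory_advance_stage`, would bypass this check rather than loosen it.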
### 4.3 Blocker Tracking Query
```sql
-- "What's stuck and why?" — the factory_get_blockers query
WITH blocked_items AS (
-- Tasks past SLA or pending too long
SELECT
'task' as blocker_type,
t.id as entity_id,
t.title,
t.pipeline_id,
p.name as pipeline_name,
t.priority,
t.sla_deadline,
t.created_at,
EXTRACT(EPOCH FROM (NOW() - t.created_at)) / 3600 as hours_blocked,
t.sla_breached,
t.assignee_id,
CASE
WHEN t.sla_breached THEN 'SLA breached'
WHEN t.status = 'pending' AND t.created_at < NOW() - INTERVAL '4 hours' THEN 'Unclaimed for 4h+'
WHEN t.assignee_id IS NULL THEN 'Unassigned'
ELSE 'Awaiting decision'
END as block_reason
FROM tasks t
JOIN pipelines p ON t.pipeline_id = p.id
WHERE t.status IN ('pending', 'claimed')
AND t.blocks_stage_advance = true
UNION ALL
-- Failed pipeline stages
SELECT
'stage' as blocker_type,
ps.id,
CONCAT(p.name, ' — ', ps.stage_name, ' failed'),
p.id,
p.name,
p.priority,
NULL,
ps.entered_at,
EXTRACT(EPOCH FROM (NOW() - ps.entered_at)) / 3600,
false,
p.assignee_id,
'Stage failed — requires intervention'
FROM pipeline_stages ps
JOIN pipelines p ON ps.pipeline_id = p.id
WHERE ps.status = 'failed'
AND p.status = 'active'
)
SELECT * FROM blocked_items
ORDER BY
CASE priority
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
END,
hours_blocked DESC;
```
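The same rules can be mirrored in application code, for example to unit-test them or to classify tasks already loaded into memory. A sketch that reproduces the task branch of the query's `CASE` expression and its `ORDER BY`; `blockReason` and `sortBlockers` are hypothetical names.

```typescript
type Priority = "critical" | "high" | "medium" | "low";

const PRIORITY_RANK: Record<Priority, number> = { critical: 0, high: 1, medium: 2, low: 3 };
const UNCLAIMED_THRESHOLD_MS = 4 * 60 * 60 * 1000; // the query's INTERVAL '4 hours'

interface BlockedTask {
  id: string;
  status: "pending" | "claimed";
  priority: Priority;
  created_at: Date;
  sla_breached: boolean;
  assignee_id: string | null;
}

// Same branch order as the CASE expression: the first matching rule wins.
function blockReason(task: BlockedTask, now: Date): string {
  if (task.sla_breached) return "SLA breached";
  if (task.status === "pending" && now.getTime() - task.created_at.getTime() > UNCLAIMED_THRESHOLD_MS) {
    return "Unclaimed for 4h+";
  }
  if (task.assignee_id === null) return "Unassigned";
  return "Awaiting decision";
}

// Same ORDER BY: priority rank, then longest blocked (oldest created_at) first.
function sortBlockers(tasks: readonly BlockedTask[]): BlockedTask[] {
  return [...tasks].sort(
    (a, b) =>
      PRIORITY_RANK[a.priority] - PRIORITY_RANK[b.priority] ||
      a.created_at.getTime() - b.created_at.getTime(),
  );
}
```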
---
## 5. Real-time Architecture
### 5.1 WebSocket vs SSE Decision
| Criteria | WebSocket | SSE |
|----------|-----------|-----|
| Direction | Bidirectional | Server → Client only |
| Reconnection | Manual | Automatic (built-in) |
| Protocol | ws:// | Standard HTTP |
| Proxy/CDN | Needs config | Works everywhere |
| Browser support | Universal | Universal (except IE) |
| Message types | Binary + Text | Text only |

**Decision: WebSocket for the dashboard (bidirectional, so clients can send `subscribe` messages over the same connection); SSE for the MCP transport (server-to-client streaming as in MCP's HTTP transports, with client requests sent as HTTP POSTs).**
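On the SSE side, each event travels as a plain-text frame of `field: value` lines terminated by a blank line. A sketch reusing the event payload format from the WebSocket section; using the event `type` as the SSE `event:` name and a stream entry ID as `id:` are assumptions, and `toSseFrame` is a hypothetical helper. The `id` lets a reconnecting client resume via the `Last-Event-ID` header.

```typescript
interface FactoryEvent {
  type: string; // e.g. "pipeline.stage_changed"
  timestamp: string;
  data: Record<string, unknown>;
}

// Serialize one event as an SSE frame: "field: value" lines ended by a blank line.
// Every payload line needs its own "data:" prefix; compact JSON contains no raw
// newlines, but splitting keeps the framing correct for any payload.
function toSseFrame(event: FactoryEvent, id: string): string {
  const dataLines = JSON.stringify(event)
    .split("\n")
    .map((line) => `data: ${line}`)
    .join("\n");
  return `id: ${id}\nevent: ${event.type}\n${dataLines}\n\n`;
}
```

Because these frames carry an `event:` name, a browser `EventSource` client receives them through `addEventListener("pipeline.stage_changed", handler)`, not `onmessage`.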
### 5.2 Event-Driven Architecture
```
┌──────────────────────────────────────────────────────────┐
│ EVENT BUS (Redis Streams) │
│ │
│ Streams: │
│ factory.pipelines — All pipeline state changes │
│ factory.tasks — Task lifecycle events │
│ factory.agents — Agent status changes │
│ factory.deploys — Deploy lifecycle events │
│ factory.notifications — Notification dispatch queue │
│ │
│ Consumer Groups: │
│ ws-broadcaster — Fans out to WebSocket clients │
│ notification-worker — Processes notification queue │
│ discord-bridge — Sends to Discord channels │
│ sla-monitor — Watches for SLA breaches │
│ audit-writer — Writes to audit_log table │
│ mcp-resource-updater — Triggers MCP resource updates │
└──────────────────────────────────────────────────────────┘
```
**Key events and their consumers:**
```
Event: task.created
→ ws-broadcaster: Push to all subscribed dashboard clients
→ notification-worker: Send notification to assignee
→ discord-bridge: Post to #factory-tasks channel
→ sla-monitor: Start SLA countdown timer
Event: task.sla_warning (30min before deadline)
→ notification-worker: Escalation notification (dashboard + Discord DM)
→ ws-broadcaster: Update task card with warning indicator
Event: task.sla_breached
→ notification-worker: URGENT notification to assignee + admin
→ discord-bridge: @mention in Discord with urgency
→ audit-writer: Record SLA breach
→ sla-monitor: Trigger escalation (reassign to admin)
Event: pipeline.stage_changed
→ ws-broadcaster: Update pipeline view in real-time
→ audit-writer: Record transition
→ mcp-resource-updater: Update factory://pipelines/{id}/state
```
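
With Redis Streams, every consumer group reads the full stream, so each group has to know which event types it acts on. The routing above can live in one table. This is a hypothetical sketch: the maps transcribe only the four events listed here plus the stream names from the diagram, and `streamFor`/`consumersFor` are illustrative names. Event types with no configured stream (for example `approval.*`) throw rather than being silently dropped.

```typescript
// Hypothetical routing table transcribed from the event list above.
type ConsumerGroup =
  | "ws-broadcaster"
  | "notification-worker"
  | "discord-bridge"
  | "sla-monitor"
  | "audit-writer"
  | "mcp-resource-updater";

const ROUTES = new Map<string, ConsumerGroup[]>([
  ["task.created", ["ws-broadcaster", "notification-worker", "discord-bridge", "sla-monitor"]],
  ["task.sla_warning", ["notification-worker", "ws-broadcaster"]],
  ["task.sla_breached", ["notification-worker", "discord-bridge", "audit-writer", "sla-monitor"]],
  ["pipeline.stage_changed", ["ws-broadcaster", "audit-writer", "mcp-resource-updater"]],
]);

// Stream names from the event bus diagram, keyed by the event's entity prefix.
const STREAMS = new Map<string, string>([
  ["pipeline", "factory.pipelines"],
  ["task", "factory.tasks"],
  ["agent", "factory.agents"],
  ["deploy", "factory.deploys"],
]);

function streamFor(eventType: string): string {
  const entity = eventType.split(".")[0];
  const stream = STREAMS.get(entity);
  if (stream === undefined) throw new Error(`No stream configured for ${eventType}`);
  return stream;
}

// Groups not listed for an event type should ACK and skip it.
function consumersFor(eventType: string): ConsumerGroup[] {
  return ROUTES.get(eventType) ?? [];
}
```
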
### 5.3 SLA Escalation Ladder (No Decision Falls Through)
```
T+0min: Task created → Notification to assignee (dashboard + Discord)
T+30min: Reminder #1 → Dashboard badge + Discord DM
T+2h: Reminder #2 → Discord channel mention + email
T+4h: SLA Warning → Discord @here + push notification
T+SLA: SLA Breach → Auto-escalate to admin, Discord @everyone in factory channel
T+SLA+2h: Critical → SMS/phone notification to owner
```
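
The ladder can be expressed as data so the `sla-monitor` worker only has to ask which rungs are due. In this sketch (`LADDER` and `dueRungs` are hypothetical, and channel names are shorthand), the fixed rungs use the offsets above, including the fixed T+4h warning, while the breach rungs are resolved from each task's SLA. Sorting by time keeps the order correct when an SLA is shorter than 4 hours. A real worker would also persist which rungs have already fired, so restarts don't resend them.

```typescript
// Hypothetical encoding of the escalation ladder; offsets are minutes after task creation.
interface Rung {
  name: string;
  atMinutes: (slaMinutes: number) => number;
  channels: string[];
}

const LADDER: Rung[] = [
  { name: "created", atMinutes: () => 0, channels: ["dashboard", "discord"] },
  { name: "reminder-1", atMinutes: () => 30, channels: ["dashboard-badge", "discord-dm"] },
  { name: "reminder-2", atMinutes: () => 120, channels: ["discord-mention", "email"] },
  { name: "sla-warning", atMinutes: () => 240, channels: ["discord-here", "push"] },
  { name: "sla-breach", atMinutes: (sla) => sla, channels: ["escalate-admin", "discord-everyone"] },
  { name: "critical", atMinutes: (sla) => sla + 120, channels: ["sms-owner"] },
];

// Every rung that should have fired by `elapsedMinutes`, in time order.
function dueRungs(elapsedMinutes: number, slaMinutes: number): string[] {
  return LADDER
    .map((r) => ({ name: r.name, at: r.atMinutes(slaMinutes) }))
    .sort((a, b) => a.at - b.at) // stable sort keeps ladder order on ties
    .filter((r) => r.at <= elapsedMinutes)
    .map((r) => r.name);
}
```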
---
## 6. Integration Points
### 6.1 Goose Chat UI ↔ MCP Server
With the stdio transport, Goose launches the MCP server as a child process and exchanges JSON-RPC messages with it over stdin/stdout. When a user types in Goose:
```
User: "What needs my attention?"
Goose → MCP Client → factory_get_pending_tasks(assignee: "me")
→ factory_get_blockers()
Response rendered in chat:
🔴 2 SLA breaches
🟡 3 pending approvals
🟢 5 pipelines running smoothly
Priority items:
1. [CRITICAL] GHL MCP Server — code review approval (SLA breached 2h ago)
2. [HIGH] Stripe MCP Server — deploy to production approval
3. [MEDIUM] Discord MCP Server — test failures need review
```
The "what needs attention" prompt pre-fetches all relevant resources and formats them conversationally. Users can then act directly:
```
User: "Approve the GHL code review"
Goose → MCP Client → factory_approve_task(task_id: "task_xyz", notes: "Approved via Goose")
```
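
Under the hood, each tool invocation from Goose is a JSON-RPC 2.0 `tools/call` request written to the server's stdin as one newline-terminated line. A minimal sketch of that message, where the `toolCall` and `frameForStdio` helpers are hypothetical and the `initialize` handshake that precedes any tool call is omitted:

```typescript
// Shape of an MCP tool invocation (JSON-RPC 2.0 request).
interface ToolCallRequest {
  jsonrpc: "2.0";
  id: number;
  method: "tools/call";
  params: { name: string; arguments: Record<string, unknown> };
}

let nextId = 1; // request ids only need to be unique per session

function toolCall(name: string, args: Record<string, unknown>): ToolCallRequest {
  return { jsonrpc: "2.0", id: nextId++, method: "tools/call", params: { name, arguments: args } };
}

// stdio framing: one JSON message per line. JSON.stringify without indentation
// escapes newlines inside strings, so the output never contains a raw "\n".
function frameForStdio(message: object): string {
  return JSON.stringify(message) + "\n";
}

const approve = toolCall("factory_approve_task", { task_id: "task_xyz", notes: "Approved via Goose" });
```
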
### 6.2 Discord Integration Bridge
```typescript
// Discord bot that bridges factory events to Discord channels
// Channel mapping:
// #factory-feed — All pipeline events (low noise)
// #factory-tasks — Pending tasks that need human attention
// #factory-deploys — Deploy notifications
// #factory-alerts — SLA breaches, failures, critical issues
// Discord commands:
// /factory status — Dashboard summary
// /factory approve <task_id> — Approve a task
// /factory reject <task_id> — Reject with reason
// /factory pipelines — List active pipelines
// /factory blockers — What's stuck

// Message payload for a pending approval (raw Discord API shape).
// Buttons are message components, not part of the embed: they must sit inside an
// action row (type 1). Buttons are type 2 with numeric styles
// (3 = Success, 4 = Danger, 5 = Link); link buttons take a url instead of a custom_id.
const approvalMessage = {
  embeds: [
    {
      title: "🔔 Approval Required: GHL MCP Server → Production",
      color: 0xffa500, // orange
      fields: [
        { name: "Pipeline", value: "ghl-mcp-server", inline: true },
        { name: "Stage", value: "staging → production", inline: true },
        { name: "Priority", value: "🔴 Critical", inline: true },
        { name: "Tests", value: "✅ 47/47 passing", inline: true },
        { name: "Coverage", value: "✅ 94%", inline: true },
        // <t:UNIX:R> renders as a relative time in the Discord client
        { name: "SLA Deadline", value: "<t:1707238800:R>", inline: true },
      ],
    },
  ],
  // Interactive buttons, wrapped in a single action row:
  components: [
    {
      type: 1,
      components: [
        { type: 2, style: 3, label: "✅ Approve", custom_id: "approve:task_xyz" },
        { type: 2, style: 4, label: "❌ Reject", custom_id: "reject:task_xyz" },
        { type: 2, style: 5, label: "📋 Details", url: "https://factory.agentfactory.dev/tasks/xyz" },
      ],
    },
  ],
};
```
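
When a button is clicked, Discord sends an interaction carrying the button's `custom_id`, so the bot has to turn `approve:task_xyz` back into an action. A hypothetical parser for the `action:taskId` convention (the convention is this document's, not Discord's; `parseCustomId` is an illustrative name):

```typescript
// Hypothetical parser for the "action:taskId" custom_id convention.
type ButtonAction = "approve" | "reject";

interface ParsedButton {
  action: ButtonAction;
  taskId: string;
}

function parseCustomId(customId: string): ParsedButton | null {
  const sep = customId.indexOf(":");
  if (sep <= 0) return null; // no separator, or an empty action
  const action = customId.slice(0, sep);
  const taskId = customId.slice(sep + 1); // task ids may themselves contain ":"
  if (taskId.length === 0) return null;
  if (action === "approve" || action === "reject") return { action, taskId };
  return null; // unknown action: ignore rather than guess
}
```

The handler should still check the clicking user's permissions through the API before acting; the `custom_id` identifies the task, it does not authorize the click.
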
### 6.3 GitHub Integration
```
Events FROM GitHub → Factory:
- PR opened → Create review task in factory
- PR review approved → Auto-advance code review stage
- PR merged → Trigger build stage
- CI check passed → Update test results resource
- CI check failed → Create "fix required" task
Events FROM Factory → GitHub:
- Pipeline created → Create tracking issue
- Stage completed → Update issue labels
- Deploy succeeded → Comment on PR with deploy URL
- Pipeline completed → Close issue
GitHub App Permissions:
- Pull requests (read/write)
- Issues (read/write)
- Checks (read/write)
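- Actions (read/write), required to dispatch workflow runs
- Event subscriptions (configured on the app, not a permission): pull_request, pull_request_review, check_run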
```
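
Routing incoming deliveries to the factory actions listed above mostly means reading the `X-GitHub-Event` header and a few payload fields. A hypothetical sketch: `mapGitHubEvent` and the `FactoryAction` shape are illustrative, while `pull_request.merged`, `review.state`, and `check_run.conclusion` are GitHub payload fields (review webhook payloads use lowercase states such as `approved`).

```typescript
// Hypothetical mapping from GitHub webhook deliveries to factory actions.
type FactoryAction =
  | { kind: "create_review_task"; prNumber: number }
  | { kind: "advance_code_review"; prNumber: number }
  | { kind: "trigger_build"; prNumber: number }
  | { kind: "record_ci_pass"; checkName: string }
  | { kind: "create_fix_task"; checkName: string }
  | { kind: "ignore" };

// eventName comes from the X-GitHub-Event header; payload is the parsed JSON body.
function mapGitHubEvent(eventName: string, payload: any): FactoryAction {
  switch (eventName) {
    case "pull_request":
      if (payload.action === "opened") return { kind: "create_review_task", prNumber: payload.pull_request.number };
      // "closed" fires for both merges and plain closes; only merges trigger a build.
      if (payload.action === "closed" && payload.pull_request.merged === true) {
        return { kind: "trigger_build", prNumber: payload.pull_request.number };
      }
      return { kind: "ignore" };
    case "pull_request_review":
      if (payload.action === "submitted" && payload.review.state === "approved") {
        return { kind: "advance_code_review", prNumber: payload.pull_request.number };
      }
      return { kind: "ignore" };
    case "check_run":
      if (payload.action !== "completed") return { kind: "ignore" };
      if (payload.check_run.conclusion === "success") return { kind: "record_ci_pass", checkName: payload.check_run.name };
      if (payload.check_run.conclusion === "failure") return { kind: "create_fix_task", checkName: payload.check_run.name };
      return { kind: "ignore" };
    default:
      return { kind: "ignore" };
  }
}
```
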
### 6.4 CI/CD Pipeline Triggers
```yaml
# Factory triggers CI/CD via GitHub Actions workflow_dispatch (or a direct API call).
# Each list entry is one trigger rule, so repeated keys don't collide.
triggers:
  # On stage transition: building → testing
  - factory_event: pipeline.stage_changed
    when: to_stage == "testing"
    action:
      - github_actions.dispatch:
          workflow: "test-mcp-server.yml"
          ref: "main" # workflow_dispatch requires a git ref; branch name assumed
          inputs:
            pipeline_id: "{{ pipeline.id }}"
            server_name: "{{ pipeline.name }}"
  # On deploy approval
  - factory_event: approval.approved
    when: stage == "production"
    action:
      - github_actions.dispatch:
          workflow: "deploy-mcp-server.yml"
          ref: "main"
          inputs:
            pipeline_id: "{{ pipeline.id }}"
            target: "production"
            version: "{{ pipeline.latest_build_version }}"
```
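
Each `github_actions.dispatch` action ultimately becomes a call to GitHub's REST endpoint `POST /repos/{owner}/{repo}/actions/workflows/{workflow_id}/dispatches`, whose JSON body carries a `ref` and string `inputs`. The sketch below renders the `{{ ... }}` placeholders and builds the request without sending it; sending is `await fetch(req.url, req.init)`, and the workflow's outcome arrives later through the GitHub webhooks described in 6.3. `renderTemplate`, `buildDispatch`, and the owner/repo values are hypothetical.

```typescript
// Renders "{{ path.to.value }}" placeholders against a context object.
function renderTemplate(template: string, ctx: Record<string, unknown>): string {
  return template.replace(/\{\{\s*([\w.]+)\s*\}\}/g, (_match: string, path: string) => {
    const value = path.split(".").reduce<unknown>(
      (obj, key) => (obj !== null && typeof obj === "object" ? (obj as Record<string, unknown>)[key] : undefined),
      ctx,
    );
    if (value === undefined) throw new Error(`Missing template value: ${path}`);
    return String(value);
  });
}

interface DispatchRequest {
  url: string;
  init: { method: "POST"; headers: Record<string, string>; body: string };
}

// Builds (but does not send) a workflow_dispatch request.
function buildDispatch(
  owner: string,
  repo: string,
  workflow: string, // workflow file name or numeric id
  ref: string,
  inputs: Record<string, string>,
  ctx: Record<string, unknown>,
  token: string,
): DispatchRequest {
  const rendered: Record<string, string> = {};
  for (const [key, tpl] of Object.entries(inputs)) rendered[key] = renderTemplate(tpl, ctx);
  return {
    url: `https://api.github.com/repos/${owner}/${repo}/actions/workflows/${workflow}/dispatches`,
    init: {
      method: "POST",
      headers: {
        Accept: "application/vnd.github+json",
        Authorization: `Bearer ${token}`,
        "X-GitHub-Api-Version": "2022-11-28",
      },
      body: JSON.stringify({ ref, inputs: rendered }),
    },
  };
}
```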
---
## 7. Existing Patterns & Prior Art
### 7.1 Temporal.io — Durable Human-in-the-Loop
Temporal's approach is the gold standard for human-in-the-loop workflows. Key takeaways:
- **Signals** allow external input (human decisions) to be sent to a running workflow. The workflow blocks on a condition (`workflow.wait_condition()` in the Python SDK, `condition()` in the TypeScript SDK) until the signal arrives. This is exactly what our approval gates need.
- **Durability** means that if a worker crashes while waiting for human input, the workflow resumes where it left off: Temporal replays the workflow's recorded event history, in which the human's decision (once signaled) is durably stored.
- **Queries** let external systems read workflow state without affecting it — perfect for our "get pipeline status" needs.
- **Child workflows** enable parallel processing of individual pipeline stages while a parent workflow orchestrates the overall pipeline.

**Applicability:** Our pipeline state machine should be modeled as a Temporal workflow (or Temporal-inspired durable execution). Each approval gate is a point where the workflow blocks until a signal arrives. Even if we don't use Temporal directly, we should adopt its patterns: signals for human input, durable state, and child workflows for parallelism.
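
Whether or not we adopt Temporal, the durability idea can be approximated by never keeping gate state only in memory. The sketch below is Temporal-inspired, not Temporal's API (Temporal itself replays workflow code deterministically against its history): gate states are rebuilt by replaying an append-only event log, so a restart loses nothing. `HistoryEvent` and `replayGates` are hypothetical names.

```typescript
// Append-only history entries for approval gates (hypothetical shape).
type HistoryEvent =
  | { type: "gate_opened"; gate: string }
  | { type: "signal"; gate: string; decision: "approved" | "rejected"; by: string }
  | { type: "gate_timed_out"; gate: string };

type GateState = "waiting" | "approved" | "rejected" | "timed_out";

// Recomputes every gate's state from scratch; safe to call after any restart.
function replayGates(history: HistoryEvent[]): Map<string, GateState> {
  const gates = new Map<string, GateState>();
  for (const ev of history) {
    switch (ev.type) {
      case "gate_opened":
        gates.set(ev.gate, "waiting");
        break;
      case "signal":
        // Only the first decision on an open gate counts; later ones are ignored.
        if (gates.get(ev.gate) === "waiting") gates.set(ev.gate, ev.decision);
        break;
      case "gate_timed_out":
        if (gates.get(ev.gate) === "waiting") gates.set(ev.gate, "timed_out");
        break;
    }
  }
  return gates;
}
```
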
### 7.2 Inngest — Event-Driven with `waitForEvent`
Inngest's `step.waitForEvent()` primitive is elegant for human-in-the-loop:
```typescript
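// Inngest-style pattern for our factory (Inngest v3 SDK shape):
import { Inngest } from "inngest";

const inngest = new Inngest({ id: "agent-factory" });

// Hypothetical factory helpers, assumed to be implemented elsewhere.
declare function scaffoldServer(event: unknown): Promise<{ path: string }>;
declare function buildServer(scaffold: { path: string }): Promise<{ artifact: string }>;
declare function runTests(build: { artifact: string }): Promise<{ passed: boolean }>;
declare function deployToStaging(build: { artifact: string }): Promise<void>;

const pipelineWorkflow = inngest.createFunction(
  { id: "mcp-pipeline", name: "MCP Pipeline" },
  { event: "pipeline.created" },
  async ({ event, step }) => {
    // Auto steps: each result is memoized, so re-invocations don't re-run them
    const scaffold = await step.run("scaffold", () => scaffoldServer(event));
    const build = await step.run("build", () => buildServer(scaffold));
    const tests = await step.run("test", () => runTests(build));
    if (!tests.passed) {
      return { status: "tests_failed" };
    }
    // HUMAN GATE: wait for a review event whose data.pipeline_id matches this run's trigger
    const approval = await step.waitForEvent("wait-for-review", {
      event: "pipeline.review.completed",
      match: "data.pipeline_id",
      timeout: "72h",
    });
    if (approval === null) {
      // waitForEvent resolves to null when the timeout elapses
      return { status: "timed_out" };
    }
    if (approval.data.decision === "rejected") {
      return { status: "rejected", reason: approval.data.reason };
    }
    // Continue to deploy...
    await step.run("deploy-staging", () => deployToStaging(build));
  }
);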
```
**Key insight:** The `waitForEvent` pattern with a `match` field and timeout is how we should implement our approval gates. The function stops executing at the gate; when a matching event arrives (or the timeout elapses), Inngest re-invokes it and skips the completed steps using their memoized results.
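
To make the `match` semantics concrete: the gate resumes only for an event whose value at `data.pipeline_id` equals the value at the same path in the event that started the run, and only before the timeout. The sketch below illustrates those semantics for an in-house implementation; it is not Inngest's code, and `Waiter`, `getPath`, and `matchingWaiters` are hypothetical.

```typescript
interface FactoryEvent {
  name: string;
  data: Record<string, unknown>;
}

// A paused run waiting for an event (hypothetical persisted record).
interface Waiter {
  stepId: string;
  event: string;          // event name to wait for
  match: string;          // dotted path compared between trigger and incoming event
  trigger: FactoryEvent;  // the event that started the run
  expiresAt: number;      // epoch ms; after this the wait resolves as a timeout
}

function getPath(obj: unknown, path: string): unknown {
  let cur: unknown = obj;
  for (const key of path.split(".")) {
    if (cur === null || typeof cur !== "object") return undefined;
    cur = (cur as Record<string, unknown>)[key];
  }
  return cur;
}

// Waiters that an incoming event should resume.
function matchingWaiters(waiters: Waiter[], incoming: FactoryEvent, now: number): Waiter[] {
  return waiters.filter((w) => {
    if (w.event !== incoming.name || now >= w.expiresAt) return false;
    const expected = getPath(w.trigger, w.match);
    return expected !== undefined && expected === getPath(incoming, w.match);
  });
}
```
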
### 7.3 Linear — Workflow States with Automation
Linear's approach to workflow automation is relevant:
- **Workflow states** are customizable per team (like our pipeline stages)
- **Automations** trigger on state transitions (our event-driven system)
- **GraphQL API** for complex querying of deeply nested data (issues → projects → cycles)
- **Webhooks** fire on every state change with full payload
- **Triage** concept: new issues land in triage, must be explicitly moved to active (like our task queue)

**What we borrow:** Linear's triage model for our task queue. Items don't auto-proceed — they land in a queue where someone must explicitly act on them.
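
Applied to tasks, the triage model is a transition table plus one rule: only a human can move a task out of triage. In this hypothetical sketch, `pending` and `claimed` match the statuses used in the blocker query, while `triage`, `completed`, and `cancelled` are assumed names.

```typescript
// Hypothetical task statuses; "triage" is where new tasks land.
type TaskStatus = "triage" | "pending" | "claimed" | "completed" | "cancelled";

const ALLOWED: Record<TaskStatus, TaskStatus[]> = {
  triage: ["pending", "cancelled"],
  pending: ["claimed", "cancelled"],
  claimed: ["completed", "pending"], // "pending" = unclaim
  completed: [],
  cancelled: [],
};

interface Actor {
  kind: "human" | "system";
  id: string;
}

function transition(from: TaskStatus, to: TaskStatus, actor: Actor): TaskStatus {
  if (!ALLOWED[from].includes(to)) throw new Error(`Illegal transition ${from} -> ${to}`);
  // Nothing auto-proceeds out of triage: automation may create tasks but not promote them.
  if (from === "triage" && actor.kind !== "human") {
    throw new Error("Only a human can move a task out of triage");
  }
  return to;
}
```
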
### 7.4 Vercel — Deployment Approvals
Vercel's deployment protection model:
- **Preview deploys** are automatic (like our staging)
- **Production deploys** can require approval from designated team members
- **Comments** on preview deployments let reviewers leave feedback in context (similar to PR review comments)
- **Rollback** is always one click away
- **Deployment checks** must pass before promotion is even available

**What we borrow:** The production deploy should be gated behind:
1. Successful staging deploy
2. Staging test pass
3. Explicit human approval
4. Pre-flight checks (the "deploy checklist" prompt)
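
The four gates reduce to a check that reports what is still missing, which the dashboard or the Discord approval embed could display. A hypothetical sketch; the `PromotionState` fields are assumed names for data the pipeline would already track.

```typescript
// Hypothetical pre-promotion state for one pipeline.
interface PromotionState {
  stagingDeploySucceeded: boolean;
  stagingTestsPassed: boolean;
  approvedBy: string | null; // user who gave explicit approval, if any
  preflightChecklistComplete: boolean;
}

// Lists the gates that still block a production deploy, in checklist order.
function missingProductionGates(s: PromotionState): string[] {
  const missing: string[] = [];
  if (!s.stagingDeploySucceeded) missing.push("staging deploy");
  if (!s.stagingTestsPassed) missing.push("staging tests");
  if (s.approvedBy === null) missing.push("human approval");
  if (!s.preflightChecklistComplete) missing.push("pre-flight checks");
  return missing;
}

function canPromote(s: PromotionState): boolean {
  return missingProductionGates(s).length === 0;
}
```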
---
## 8. Deployment & Scaling Considerations
### Technology Stack
```
Runtime:      Node.js (Hono framework for HTTP, `ws` package for the WebSocket server)
Database: PostgreSQL 16 (Neon or Supabase for managed)
Cache/PubSub: Redis (Upstash for serverless, or self-hosted)
Object Store: Cloudflare R2 or AWS S3
Queue: Redis Streams (or BullMQ on top of Redis)
MCP Transport: stdio (local) + SSE (remote)
Auth: JWT + API keys (consider Clerk or Auth.js)
CI/CD: GitHub Actions
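Hosting:      Railway / Fly.io (containers, which can hold long-lived WebSocket connections); Vercel suits the static dashboard, but its functions cannot host a WebSocket server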
Monitoring: OpenTelemetry → Grafana Cloud
```
### Scaling Points
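1. **WebSocket connections** — Use Redis pub/sub to fan out events across multiple server instances. Each WS server subscribes to Redis and pushes to its local clients. If broadcasting is fed from Redis Streams instead, note that a consumer group hands each entry to only one consumer, so every WS instance needs its own group (or a plain `XREAD`) to see all events.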
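2. **Task queue** — Redis Streams consumer groups provide at-least-once delivery, not exactly-once: an entry stays in the pending list until a worker `XACK`s it, and entries held by a crashed worker can be reclaimed with `XAUTOCLAIM`. Handlers must therefore be idempotent.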
3. **Database** — Read replicas for dashboard queries. Partition `audit_log` by month. JSONB indexes for metadata queries.
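4. **MCP Server** — Stateless for tool logic: each request hits the API layer, so instances scale horizontally. Remote SSE sessions are bound to one connection, however, so the load balancer needs sticky sessions (or a shared session store) to route a client's POSTs to the instance holding its SSE stream. Local stdio servers run one process per client and need no coordination.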
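
Because delivery is at-least-once, a consumer must tolerate seeing the same stream entry twice. A minimal sketch of deduplicating on the stream entry ID follows. `IdempotentConsumer` is hypothetical, and the in-memory `Set` stands in for a durable store, such as a processed-events table written in the same transaction as the side effect; without that, a crash between handling and recording can still repeat the side effect.

```typescript
// Hypothetical stream entry as read via XREADGROUP.
interface StreamEntry {
  id: string; // Redis stream entry ID, e.g. "1707238800000-0"
  type: string;
  payload: unknown;
}

class IdempotentConsumer {
  // Stand-in for a durable processed-events store.
  private readonly processed = new Set<string>();
  private readonly handle: (entry: StreamEntry) => void;

  constructor(handle: (entry: StreamEntry) => void) {
    this.handle = handle;
  }

  // Returns true if the entry was handled now, false if it was a redelivery.
  deliver(entry: StreamEntry): boolean {
    if (this.processed.has(entry.id)) return false; // already handled; just XACK again
    this.handle(entry);
    this.processed.add(entry.id); // record only after the side effect succeeded
    // A real worker would XACK here; if it crashes before acking, Redis redelivers.
    return true;
  }
}
```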
---
## 9. Security Architecture
### API Security Layers
```
1. TLS everywhere (HTTPS, WSS)
2. API Gateway rate limiting (per-user, per-IP)
3. JWT validation on every request (15min expiry)
4. RBAC permission check (scoped to resources)
5. Input validation (Zod schemas on every endpoint)
6. Webhook signature verification (HMAC-SHA256)
7. Audit logging (every mutation logged)
8. CORS whitelist (dashboard domain only)
9. Content Security Policy headers
10. SQL injection protection (parameterized queries, ORM)
```
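
For GitHub deliveries, item 6 means checking the `X-Hub-Signature-256` header, which GitHub sets to `sha256=` followed by the hex HMAC-SHA256 of the raw request body, keyed with the webhook secret. A sketch using Node's `crypto` module (`verifyGitHubSignature` is a hypothetical helper name):

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Verifies "sha256=" + hex(HMAC-SHA256(secret, rawBody)) in constant time.
function verifyGitHubSignature(secret: string, rawBody: string | Buffer, header: string | undefined): boolean {
  if (!header || !header.startsWith("sha256=")) return false;
  const expected = Buffer.from("sha256=" + createHmac("sha256", secret).update(rawBody).digest("hex"), "utf8");
  const received = Buffer.from(header, "utf8");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```

Verify against the raw bytes before JSON parsing; re-serialized JSON generally won't reproduce the signed body.
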
### MCP Server Security
```
- MCP tools check permissions before executing (user's JWT passed through)
- Production deploy tool requires explicit 'deploy:production' scope
- All tool executions logged to audit trail
- Rate limiting on destructive operations (deploy, reject)
- Sensitive data (API keys, secrets) never exposed through MCP resources
```
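
The scope rule can be enforced with a deny-by-default table consulted before any tool runs. A hypothetical sketch: `deploy:production` comes from the list above, while the other scope strings and the `factory_deploy_production` tool name are illustrative assumptions.

```typescript
// Hypothetical tool → required-scope table.
const TOOL_SCOPES: ReadonlyMap<string, string> = new Map([
  ["factory_get_pending_tasks", "tasks:read"],
  ["factory_get_blockers", "pipelines:read"],
  ["factory_approve_task", "tasks:decide"],
  ["factory_deploy_production", "deploy:production"],
]);

// Throws unless the caller's token grants the tool's scope.
function authorizeTool(tool: string, grantedScopes: readonly string[]): void {
  const required = TOOL_SCOPES.get(tool);
  // Deny by default: a tool missing from the table is never callable.
  if (required === undefined) throw new Error(`Unknown tool: ${tool}`);
  if (!grantedScopes.includes(required)) throw new Error(`Missing scope ${required} for ${tool}`);
}
```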
---
## 10. Implementation Roadmap
### Phase 1: Foundation (Week 1-2)
- [ ] Database schema + migrations (PostgreSQL)
- [ ] Core REST API (pipelines, tasks, approvals CRUD)
- [ ] JWT authentication
- [ ] Basic audit logging
### Phase 2: Real-time + MCP (Week 3-4)
- [ ] WebSocket server + Redis pub/sub
- [ ] Event bus with Redis Streams
- [ ] MCP server with core tools (get_pending, approve, reject, status)
- [ ] MCP resources (pipeline state, dashboard summary)
### Phase 3: Integrations (Week 5-6)
- [ ] Discord bot bridge
- [ ] GitHub webhook integration
- [ ] CI/CD pipeline triggers
- [ ] Notification system with escalation ladder
### Phase 4: Dashboard (Week 7-8)
- [ ] React dashboard consuming REST + WebSocket
- [ ] GraphQL endpoint for complex queries
- [ ] MCP prompts (review, attention, deploy checklist)
- [ ] SLA monitoring and alerting
### Phase 5: Polish (Week 9-10)
- [ ] Bulk operations
- [ ] Advanced analytics and retrospectives
- [ ] Performance optimization
- [ ] Security audit
- [ ] Documentation and onboarding
---
## Appendix: Full Event Catalog
| Event | Trigger | Consumers |
|-------|---------|-----------|
| `pipeline.created` | New pipeline initialized | ws, audit, discord, mcp |
| `pipeline.stage_changed` | Stage transition | ws, audit, discord, mcp, github |
| `pipeline.blocked` | Human gate reached | ws, audit, discord, notification |
| `pipeline.completed` | Pipeline finished | ws, audit, discord, github |
| `pipeline.failed` | Unrecoverable error | ws, audit, discord, notification |
| `task.created` | New human task | ws, audit, discord, notification, sla-monitor |
| `task.claimed` | User claimed task | ws, audit |
| `task.completed` | Decision made | ws, audit, discord, pipeline-service |
| `task.sla_warning` | 30 min before SLA deadline | ws, notification, discord |
| `task.sla_breached` | SLA exceeded | ws, audit, notification, discord, sla-monitor (escalation) |
| `approval.pending` | Gate awaiting approval | ws, audit, notification, discord |
| `approval.approved` | Gate approved | ws, audit, discord, pipeline-service |
| `approval.rejected` | Gate rejected | ws, audit, discord, notification |
| `agent.status_changed` | Agent health change | ws, audit, notification (if error) |
| `deploy.started` | Deploy initiated | ws, audit, discord, github |
| `deploy.succeeded` | Deploy complete | ws, audit, discord, github |
| `deploy.failed` | Deploy failed | ws, audit, discord, notification |
| `asset.created` | New artifact | ws, audit |
| `test.completed` | Test run finished | ws, audit, mcp, pipeline-service |
---
*This architecture document serves as the blueprint for implementing the AI Agent Factory SaaS backend. It should be treated as a living document, updated as implementation progresses and requirements evolve.*