2026-01-28 23:00:58 -05:00

320 lines
8.8 KiB
TypeScript

/**
* TextMe Channel Plugin - Inbound Message Handler
*
* Connects to TextMe realtime WebSocket and forwards messages to Clawdbot.
*/
import {
TextMeRealtime,
type IncomingMessage,
type RealtimeOptions,
} from 'textme-unofficial-api';
import { AuthManager, type AuthenticatedAccount } from './auth.js';
import type { PluginConfig } from './config.js';
// ============================================================================
// Types
// ============================================================================
/**
* Normalized inbound message handed to Clawdbot, either through the
* `onMessage` callback or as the JSON body POSTed to the gateway.
* The field notes describe how `InboundHandler` in this file populates them.
*/
export interface ClawdbotMessage {
/** Channel name; the handler always sets this to `'textme'`. */
channel: string;
/** Composite id built as `textme:<accountId>:<conversationId>`. */
channelId: string;
/** Id of the TextMe message (`IncomingMessage.id`). */
messageId: string;
/** Sender id of the TextMe message (`IncomingMessage.senderId`). */
authorId: string;
/** Sender display name (`IncomingMessage.senderName`), when available. */
authorName?: string;
/** Message text (`IncomingMessage.text`). */
content: string;
/** Message timestamp serialized with `Date.prototype.toISOString()`. */
timestamp: string;
/** Set to a single-element array when the message carries a media URL; otherwise `undefined`. */
attachments?: ClawdbotAttachment[];
/** The handler stores `conversationId`, `accountId`, `accountName` and `phoneNumbers` here. */
metadata?: Record<string, unknown>;
}
/**
* Media attached to a {@link ClawdbotMessage}.
*/
export interface ClawdbotAttachment {
/** Location of the media; the handler copies `IncomingMessage.mediaUrl` here. */
url: string;
/** Media category; the handler uses `IncomingMessage.mediaType` and falls back to `'file'`. */
type: 'image' | 'video' | 'audio' | 'file';
/** Optional file name (not populated by the handler in this file). */
filename?: string;
/** Optional size (not populated by the handler in this file; unit is not specified here). */
size?: number;
}
/**
* Configuration accepted by the `InboundHandler` constructor.
*/
export interface InboundHandlerOptions {
/** Clawdbot gateway base URL; `/api/channel/inbound` is appended when forwarding. */
gatewayUrl: string;
/** Clawdbot gateway token; when set it is sent as an `Authorization: Bearer <token>` header. */
gatewayToken?: string;
/** Options passed through to each `TextMeRealtime` client created by `connectAccount`. */
realtimeOptions?: RealtimeOptions;
/** Enable debug logging (`console.log` lines prefixed with `[TextMeInbound]`). */
debug?: boolean;
/**
* Message handler callback (alternative to gateway forwarding).
* When provided, every message is passed here and the gateway is not called;
* errors thrown or rejected by the callback are caught and logged.
*/
onMessage?: (message: ClawdbotMessage, account: AuthenticatedAccount) => Promise<void>;
}
/**
* Bookkeeping record for one account's realtime connection, stored by
* `InboundHandler` in a map keyed by account id.
*/
export interface AccountConnection {
/** The authenticated account this connection belongs to. */
account: AuthenticatedAccount;
/** The realtime client instance created for the account. */
realtime: TextMeRealtime;
/** Last known link state; updated from the client's `connected` / `disconnected` events. */
connected: boolean;
}
// ============================================================================
// Inbound Handler
// ============================================================================
/**
 * Keeps one TextMe realtime connection per authenticated account and relays
 * every incoming message to Clawdbot, either through the `onMessage` callback
 * or by POSTing it to the gateway.
 */
export class InboundHandler {
  private authManager: AuthManager;
  private options: InboundHandlerOptions;
  /** Registered connections keyed by account id. */
  private connections: Map<string, AccountConnection> = new Map();
  private isRunning = false;

  /**
   * @param authManager Source of the accounts to connect.
   * @param options Gateway, callback, logging and realtime settings.
   */
  constructor(authManager: AuthManager, options: InboundHandlerOptions) {
    this.authManager = authManager;
    this.options = options;
  }

  /**
   * Start listening for inbound messages on all accounts.
   *
   * Accounts are connected one after another; a failure on one account is
   * logged and does not prevent the remaining accounts from connecting.
   * Calling this while already running is a no-op.
   */
  async start(): Promise<void> {
    if (this.isRunning) {
      this.log('Already running');
      return;
    }
    this.isRunning = true;
    const accounts = this.authManager.getAllAccounts();
    for (const account of accounts) {
      try {
        await this.connectAccount(account);
      } catch (err) {
        console.error(`[TextMeInbound] Failed to connect account ${account.account.id}:`, err);
      }
    }
    this.log(`Started with ${this.connections.size} connections`);
  }

  /**
   * Connect a single account to realtime and register the connection.
   *
   * Any connection already registered for the same account is disconnected
   * first, so an account never has two live clients delivering duplicates.
   *
   * @param account Account whose access token is used to connect.
   * @throws Whatever `TextMeRealtime.connect` rejects with; the half-open
   *         client is disconnected before the error is rethrown.
   */
  async connectAccount(account: AuthenticatedAccount): Promise<void> {
    const accountId = account.account.id;
    this.dropConnection(accountId);

    // Spread the caller's options FIRST and apply the `??` defaults afterwards:
    // a key that is present but `undefined` would otherwise overwrite a default.
    const overrides = this.options.realtimeOptions;
    const realtimeOptions: RealtimeOptions = {
      ...overrides,
      autoReconnect: overrides?.autoReconnect ?? true,
      maxReconnectAttempts: overrides?.maxReconnectAttempts ?? 10,
      reconnectDelay: overrides?.reconnectDelay ?? 5000,
    };
    const realtime = new TextMeRealtime(realtimeOptions);

    // Set up event handlers. handleIncomingMessage never rejects (it catches
    // internally), so discarding its promise cannot cause an unhandled rejection.
    realtime.on('message', (msg) => {
      void this.handleIncomingMessage(account, msg);
    });
    realtime.on('error', (err) => this.handleError(account, err));
    realtime.on('connected', () => this.handleConnected(account, realtime));
    realtime.on('disconnected', (info) => this.handleDisconnected(account, realtime, info));

    // Connect with access token
    try {
      await realtime.connect(account.tokens.accessToken);
    } catch (err) {
      // The client is not registered yet, so stop() could never reach it.
      // Tear it down here in case it still holds a socket or a retry timer.
      try {
        realtime.disconnect();
      } catch {
        // Best-effort cleanup; the connect error below is the one that matters.
      }
      throw err;
    }

    // A concurrent connectAccount() for the same id may have registered a
    // client while we were awaiting; replace it rather than leak it.
    this.dropConnection(accountId);
    this.connections.set(accountId, { account, realtime, connected: true });
    this.log(`Connected account ${accountId}`);
  }

  /**
   * Handle incoming message from TextMe: convert it to the Clawdbot format and
   * hand it to the `onMessage` callback, or to the gateway when no callback is set.
   *
   * Never rejects: it is invoked from an event listener where nothing awaits
   * the result, so every failure is logged instead of propagated.
   */
  private async handleIncomingMessage(
    account: AuthenticatedAccount,
    msg: IncomingMessage
  ): Promise<void> {
    try {
      this.log(`Received message from ${msg.senderId}: ${msg.text?.substring(0, 50)}...`);

      // Transform to Clawdbot format
      const clawdbotMessage: ClawdbotMessage = {
        channel: 'textme',
        channelId: `textme:${account.account.id}:${msg.conversationId}`,
        messageId: msg.id,
        authorId: msg.senderId,
        authorName: msg.senderName,
        // Messages without text would otherwise violate `content: string`.
        content: msg.text ?? '',
        timestamp: msg.timestamp.toISOString(),
        attachments: msg.mediaUrl
          ? [{ url: msg.mediaUrl, type: msg.mediaType || 'file' }]
          : undefined,
        metadata: {
          conversationId: msg.conversationId,
          accountId: account.account.id,
          accountName: account.account.name,
          phoneNumbers: account.account.phoneNumbers,
        },
      };

      // Forward to callback or gateway
      if (this.options.onMessage) {
        try {
          await this.options.onMessage(clawdbotMessage, account);
        } catch (err) {
          console.error('[TextMeInbound] Message callback error:', err);
        }
      } else {
        await this.forwardToGateway(clawdbotMessage);
      }
    } catch (err) {
      console.error('[TextMeInbound] Failed to handle incoming message:', err);
    }
  }

  /**
   * Forward message to Clawdbot gateway via `POST <gatewayUrl>/api/channel/inbound`.
   * Network failures and non-2xx responses are logged, never thrown.
   */
  private async forwardToGateway(message: ClawdbotMessage): Promise<void> {
    // Strip trailing slashes so "http://host/" does not yield "//api/...".
    const baseUrl = this.options.gatewayUrl.replace(/\/+$/, '');
    const url = `${baseUrl}/api/channel/inbound`;
    const headers: Record<string, string> = {
      'Content-Type': 'application/json',
    };
    if (this.options.gatewayToken) {
      headers['Authorization'] = `Bearer ${this.options.gatewayToken}`;
    }
    try {
      const response = await fetch(url, {
        method: 'POST',
        headers,
        body: JSON.stringify(message),
      });
      if (!response.ok) {
        const errorText = await response.text();
        console.error(`[TextMeInbound] Gateway error: ${response.status} ${errorText}`);
      } else {
        this.log(`Forwarded message ${message.messageId} to gateway`);
      }
    } catch (err) {
      console.error('[TextMeInbound] Failed to forward message to gateway:', err);
    }
  }

  /**
   * Handle connection established.
   *
   * The flag is only updated when `realtime` is the client currently
   * registered for the account, so events from a replaced client are ignored.
   */
  private handleConnected(account: AuthenticatedAccount, realtime: TextMeRealtime): void {
    const connection = this.connections.get(account.account.id);
    if (connection && connection.realtime === realtime) {
      connection.connected = true;
    }
    this.log(`Account ${account.account.id} connected to realtime`);
  }

  /**
   * Handle disconnection.
   *
   * As with `handleConnected`, a late event from a client that has already
   * been replaced must not mark the new connection as down.
   */
  private handleDisconnected(
    account: AuthenticatedAccount,
    realtime: TextMeRealtime,
    info: { reason: string }
  ): void {
    const connection = this.connections.get(account.account.id);
    if (connection && connection.realtime === realtime) {
      connection.connected = false;
    }
    console.warn(`[TextMeInbound] Account ${account.account.id} disconnected: ${info.reason}`);
  }

  /**
   * Handle realtime error by logging it with the owning account id.
   */
  private handleError(
    account: AuthenticatedAccount,
    error: { code: string; message: string }
  ): void {
    console.error(`[TextMeInbound] Account ${account.account.id} error: ${error.code} - ${error.message}`);
  }

  /**
   * Disconnect and unregister the connection for `accountId`, if there is one.
   * Errors raised by `disconnect()` are logged, not thrown.
   */
  private dropConnection(accountId: string): void {
    const existing = this.connections.get(accountId);
    if (!existing) {
      return;
    }
    // Unregister before disconnecting so an event emitted synchronously by
    // disconnect() cannot touch a map entry that is on its way out.
    this.connections.delete(accountId);
    try {
      existing.realtime.disconnect();
    } catch (err) {
      console.error(`[TextMeInbound] Error disconnecting ${accountId}:`, err);
    }
  }

  /**
   * Reconnect a specific account (e.g., after token refresh).
   *
   * The old connection is always removed. If the auth manager no longer knows
   * the account, nothing is reconnected and a warning is logged.
   *
   * @param accountId Id of the account to reconnect.
   */
  async reconnectAccount(accountId: string): Promise<void> {
    this.dropConnection(accountId);
    const account = this.authManager.getAccount(accountId);
    if (!account) {
      console.warn(`[TextMeInbound] Cannot reconnect unknown account ${accountId}`);
      return;
    }
    await this.connectAccount(account);
  }

  /**
   * Get connection status.
   *
   * @returns A new map from account id to that account's `connected` flag.
   */
  getConnectionStatus(): Map<string, boolean> {
    const status = new Map<string, boolean>();
    for (const [id, conn] of this.connections) {
      status.set(id, conn.connected);
    }
    return status;
  }

  /**
   * Check if all accounts are connected.
   *
   * @returns `false` when there are no connections at all.
   */
  isAllConnected(): boolean {
    for (const conn of this.connections.values()) {
      if (!conn.connected) return false;
    }
    return this.connections.size > 0;
  }

  /**
   * Stop all connections and clear the registry. A failure to disconnect one
   * account is logged and does not stop the others from being disconnected.
   */
  async stop(): Promise<void> {
    this.isRunning = false;
    for (const [id, conn] of this.connections) {
      try {
        conn.realtime.disconnect();
        this.log(`Disconnected account ${id}`);
      } catch (err) {
        console.error(`[TextMeInbound] Error disconnecting ${id}:`, err);
      }
    }
    this.connections.clear();
    this.log('Stopped all connections');
  }

  /**
   * Debug logging; prints only when `options.debug` is truthy.
   */
  private log(message: string): void {
    if (this.options.debug) {
      console.log(`[TextMeInbound] ${message}`);
    }
  }
}
// ============================================================================
// Factory
// ============================================================================
/**
 * Build an {@link InboundHandler} from the plugin configuration.
 *
 * Maps the `clawdbotGateway*` and `debug` settings onto the handler options
 * and groups the three reconnect settings under `realtimeOptions`.
 *
 * @param authManager Auth manager handed to the handler unchanged.
 * @param config Plugin configuration to read the settings from.
 * @returns A new, not yet started handler.
 */
export function createInboundHandler(
  authManager: AuthManager,
  config: PluginConfig
): InboundHandler {
  const {
    clawdbotGatewayUrl: gatewayUrl,
    clawdbotGatewayToken: gatewayToken,
    debug,
    autoReconnect,
    maxReconnectAttempts,
    reconnectDelay,
  } = config;

  const realtimeOptions = { autoReconnect, maxReconnectAttempts, reconnectDelay };
  const handlerOptions = { gatewayUrl, gatewayToken, debug, realtimeOptions };

  return new InboundHandler(authManager, handlerOptions);
}