2026-02-18 23:01:51 -05:00

144 lines
5.0 KiB
JavaScript

#!/usr/bin/env node
// Gmail Pub/Sub Pull Daemon
// Listens on the Pub/Sub subscription (streaming pull) for new Gmail notifications
// and triggers a Clawdbot cron wake when new email arrives.
import { PubSub } from '@google-cloud/pubsub';
import { readFileSync, writeFileSync, existsSync } from 'fs';
import { fileURLToPath } from 'url';
import { dirname, join } from 'path';
// Directory containing this module (ES modules do not provide __dirname themselves).
const __dirname = dirname(fileURLToPath(import.meta.url));
// Google Cloud project and Pub/Sub subscription that main() listens on.
const PROJECT_ID = 'key-utility-477818-t4';
const SUBSCRIPTION_NAME = 'upwork-email-sub';
// NOTE(review): WAKE_URL is not referenced in the visible code. triggerCronWake() posts to
// 127.0.0.1:18789 directly, which does not match the port below — confirm which one is correct.
const WAKE_URL = 'http://localhost:18700'; // Clawdbot gateway
// NOTE(review): CRON_JOB_ID is not referenced in the visible code either. triggerCronWake()
// sends a longer job id literal that starts with this value — confirm whether this is still needed.
const CRON_JOB_ID = 'c9df3e78'; // Unified upwork-email-apply cron (scan + apply in one shot)
// Service-account key file located next to this script; passed to PubSub as keyFilename.
const SA_KEY_PATH = join(__dirname, 'sa-key.json');
// Debounce: don't wake more than once per 30 seconds
// lastWakeTime is an epoch-millisecond timestamp used by triggerCronWake() for debouncing
// (0 means no wake has been recorded yet in this process).
let lastWakeTime = 0;
const DEBOUNCE_MS = 30_000;
// JSON file that loadState()/saveState() read and write to persist daemon state.
const STATE_FILE = '/Users/jakeshore/.clawdbot/workspace/upwork-email-trigger/daemon-state.json';
/**
 * Load the persisted daemon state from STATE_FILE.
 *
 * A missing, unreadable, or corrupt state file must not prevent the daemon from
 * starting, so every such failure falls back to a fresh default state. Fields that
 * are absent from the stored object are filled in from the defaults.
 *
 * @returns {{lastHistoryId: *, totalWakes: number, startedAt: string}} The stored
 *   state merged over the defaults, or the defaults alone when nothing usable is stored.
 */
function loadState() {
  const defaults = { lastHistoryId: null, totalWakes: 0, startedAt: new Date().toISOString() };

  let raw;
  try {
    // Read directly instead of existsSync() + read: the file can vanish in between.
    raw = readFileSync(STATE_FILE, 'utf8');
  } catch (err) {
    // A missing file is the normal first-run case; anything else is worth a log line.
    if (err.code !== 'ENOENT') {
      console.error(`[${ts()}] Could not read state file (${err.message}) — starting with fresh state`);
    }
    return defaults;
  }

  try {
    const parsed = JSON.parse(raw);
    // Callers mutate properties on the result, so only a plain object is usable.
    if (parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed)) {
      return { ...defaults, ...parsed };
    }
    console.error(`[${ts()}] State file does not contain a JSON object — starting with fresh state`);
  } catch (err) {
    // Typically a truncated file left behind by an interrupted write.
    console.error(`[${ts()}] State file is corrupt (${err.message}) — starting with fresh state`);
  }
  return defaults;
}
/**
 * Persist the daemon state to STATE_FILE as pretty-printed JSON.
 *
 * @param {object} state - State object to serialize (2-space indentation).
 */
function saveState(state) {
  const serialized = JSON.stringify(state, null, 2);
  writeFileSync(STATE_FILE, serialized);
}
/**
 * POST a wake request to the local gateway's `/hooks/wake` endpoint.
 *
 * @param {string} payload - JSON-encoded request body.
 * @returns {Promise<{status: number, data: string}>} Resolves with the HTTP status code
 *   and the response body. Rejects on a request or response stream error, or when the
 *   8-second socket timeout fires.
 */
async function postWakeHook(payload) {
  const http = await import('http');
  // SECURITY(review): a bearer token is committed in source as the fallback below.
  // Prefer supplying it through the CLAWDBOT_HOOKS_TOKEN environment variable, then
  // rotate the token and delete the literal. `||` is intentional: an empty variable
  // is not a usable token.
  const token = process.env.CLAWDBOT_HOOKS_TOKEN || 'qnLSLAojTmhgdA4vktyaDsoDyvL9yUT_fPVR32vSdxk';

  return new Promise((resolve, reject) => {
    const req = http.request({
      hostname: '127.0.0.1',
      port: 18789,
      path: '/hooks/wake',
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Content-Length': Buffer.byteLength(payload),
        'Authorization': `Bearer ${token}`,
      },
      timeout: 8000,
    }, (res) => {
      let data = '';
      // Decode as a stream so multi-byte characters split across chunks stay intact.
      res.setEncoding('utf8');
      res.on('data', (chunk) => { data += chunk; });
      res.on('end', () => resolve({ status: res.statusCode, data }));
      res.on('error', reject);
    });
    req.on('error', reject);
    // Destroying with an error routes the timeout through the 'error' handler above.
    req.on('timeout', () => req.destroy(new Error('HTTP timeout')));
    req.write(payload);
    req.end();
  });
}

/**
 * Ask the gateway to run the email cron job now, at most once per DEBOUNCE_MS.
 *
 * The debounce window is claimed before the request is sent so that notifications
 * arriving while it is in flight are skipped. If the request fails, the window is
 * released again (a failed attempt is not a wake) and a marker file is written to
 * /tmp/upwork-gmail-wake to record the missed push.
 *
 * Never rejects: request failures are logged and handled here.
 *
 * @returns {Promise<void>}
 */
async function triggerCronWake() {
  const now = Date.now();
  if (now - lastWakeTime < DEBOUNCE_MS) {
    console.log(`[${ts()}] Debounced — last wake was ${Math.round((now - lastWakeTime) / 1000)}s ago`);
    return;
  }

  const previousWakeTime = lastWakeTime;
  lastWakeTime = now;

  try {
    // Use gateway hooks/wake endpoint — triggers the cron job immediately
    const payload = JSON.stringify({
      text: 'New Upwork email received — run email pipeline now',
      jobId: 'c9df3e78-9721-4bcc-b4c3-ba716ab6273d',
    });
    const result = await postWakeHook(payload);
    if (result.status < 200 || result.status >= 300) {
      throw new Error(`HTTP ${result.status}: ${result.data}`);
    }
    console.log(`[${ts()}] Cron wake triggered via hooks/wake! Status: ${result.status}, Response: ${result.data}`);
  } catch (err) {
    // No wake happened, so let the next notification retry instead of being debounced.
    if (lastWakeTime === now) {
      lastWakeTime = previousWakeTime;
    }
    console.log(`[${ts()}] Push wake failed (${err.message}) — 5-min cron polling will catch it`);
    try {
      // Write marker so we can track missed pushes
      writeFileSync('/tmp/upwork-gmail-wake', JSON.stringify({
        timestamp: new Date().toISOString(),
        historyId: 'unknown',
        reason: 'gmail-pubsub-notification',
        note: 'Push trigger failed, relying on 5-min cron polling',
      }));
    } catch (markerErr) {
      // Best-effort bookkeeping only; report the failure rather than hiding it.
      console.error(`[${ts()}] Could not write wake marker file: ${markerErr.message}`);
    }
  }
}
/**
 * Build the timestamp used as a log-line prefix.
 *
 * @returns {string} The current date and time formatted with the en-US locale in the
 *   America/New_York time zone, with the 12-hour clock disabled.
 */
function ts() {
  const formatOptions = { timeZone: 'America/New_York', hour12: false };
  const now = new Date();
  return now.toLocaleString('en-US', formatOptions);
}
/**
 * Decode the body of a Pub/Sub message into a Gmail notification object.
 *
 * @param {{data: *}} message - Pub/Sub message whose `data` holds the JSON body.
 * @returns {?object} The parsed notification, or null when the body is not a JSON object.
 */
function parseNotification(message) {
  try {
    // NOTE(review): the Pub/Sub client is documented to deliver `data` as a Buffer, in
    // which case the 'base64' argument has no effect — kept as-is pending confirmation.
    const parsed = JSON.parse(Buffer.from(message.data, 'base64').toString());
    return parsed !== null && typeof parsed === 'object' ? parsed : null;
  } catch (err) {
    console.error(`[${ts()}] Could not parse Pub/Sub message body: ${err.message}`);
    return null;
  }
}

/**
 * Daemon entry point: load persisted state, attach listeners to the Pub/Sub
 * subscription, and log a health line every five minutes.
 *
 * For each notification addressed to jake@localbosses.org the state is updated and
 * saved, and triggerCronWake() is called. Every message is acknowledged once it has
 * been handled — including malformed ones, which would otherwise be redelivered.
 *
 * @returns {Promise<void>} Resolves once the listeners and the health timer are set up;
 *   the timer and the subscription keep the process running afterwards.
 */
async function main() {
  console.log(`[${ts()}] Starting Gmail Pub/Sub pull daemon...`);
  console.log(`[${ts()}] Project: ${PROJECT_ID}, Subscription: ${SUBSCRIPTION_NAME}`);
  const state = loadState();
  console.log(`[${ts()}] Last history ID: ${state.lastHistoryId}, Total wakes: ${state.totalWakes}`);
  const pubsub = new PubSub({ projectId: PROJECT_ID, keyFilename: SA_KEY_PATH });
  const subscription = pubsub.subscription(SUBSCRIPTION_NAME);

  // Use streaming pull (long-lived connection)
  subscription.on('message', async (message) => {
    try {
      const data = parseNotification(message);
      if (data === null) {
        console.error(`[${ts()}] Ignoring malformed Pub/Sub message`);
        return;
      }
      console.log(`[${ts()}] Gmail notification: emailAddress=${data.emailAddress}, historyId=${data.historyId}`);
      // Only process if for localbosses
      if (data.emailAddress === 'jake@localbosses.org') {
        state.lastHistoryId = data.historyId;
        state.totalWakes++;
        state.lastNotification = new Date().toISOString();
        try {
          saveState(state);
        } catch (err) {
          // Persisting counters is bookkeeping; it must not block the wake itself.
          console.error(`[${ts()}] Could not save state: ${err.message}`);
        }
        await triggerCronWake();
      }
    } catch (err) {
      // An exception escaping an async listener would become an unhandled rejection.
      console.error(`[${ts()}] Failed to process notification: ${err.message}`);
    } finally {
      message.ack();
    }
  });

  subscription.on('error', (err) => {
    console.error(`[${ts()}] Subscription error: ${err.message}`);
    // Auto-reconnect after 5s: close whatever is left of the old stream, then reopen.
    setTimeout(async () => {
      console.log(`[${ts()}] Reconnecting...`);
      try {
        await subscription.close();
        subscription.open();
      } catch (reopenErr) {
        console.error(`[${ts()}] Reconnect failed: ${reopenErr.message}`);
      }
    }, 5000);
  });

  console.log(`[${ts()}] Listening for Gmail notifications...`);

  // Keep alive + periodic health check
  setInterval(() => {
    console.log(`[${ts()}] Health check — daemon alive, ${state.totalWakes} total wakes`);
  }, 300_000); // Every 5 minutes
}
// Start the daemon; if main() rejects, log the error and exit with status 1.
main().catch((fatalError) => {
  console.error('Fatal:', fatalError);
  process.exit(1);
});