#!/usr/bin/env node // Gmail Pub/Sub Pull Daemon // Polls the Pub/Sub subscription for new Gmail notifications // and triggers a Clawdbot cron wake when new email arrives. import { PubSub } from '@google-cloud/pubsub'; import { readFileSync, writeFileSync, existsSync } from 'fs'; import { fileURLToPath } from 'url'; import { dirname, join } from 'path'; const __dirname = dirname(fileURLToPath(import.meta.url)); const PROJECT_ID = 'key-utility-477818-t4'; const SUBSCRIPTION_NAME = 'upwork-email-sub'; const WAKE_URL = 'http://localhost:18700'; // Clawdbot gateway const CRON_JOB_ID = '2205ac65'; // Upwork email pipeline cron const SA_KEY_PATH = join(__dirname, 'sa-key.json'); // Debounce: don't wake more than once per 30 seconds let lastWakeTime = 0; const DEBOUNCE_MS = 30_000; const STATE_FILE = '/Users/jakeshore/.clawdbot/workspace/upwork-email-trigger/daemon-state.json'; function loadState() { if (existsSync(STATE_FILE)) { return JSON.parse(readFileSync(STATE_FILE, 'utf8')); } return { lastHistoryId: null, totalWakes: 0, startedAt: new Date().toISOString() }; } function saveState(state) { writeFileSync(STATE_FILE, JSON.stringify(state, null, 2)); } async function triggerCronWake() { const now = Date.now(); if (now - lastWakeTime < DEBOUNCE_MS) { console.log(`[${ts()}] Debounced — last wake was ${Math.round((now - lastWakeTime) / 1000)}s ago`); return; } lastWakeTime = now; try { // Use gateway hooks/wake endpoint — triggers the cron job immediately const http = await import('http'); const payload = JSON.stringify({ text: 'New Upwork email received — run email pipeline now', jobId: '2205ac65-cda1-45ca-a041-0849b352a177' }); const result = await new Promise((resolve, reject) => { const req = http.request({ hostname: '127.0.0.1', port: 18789, path: '/hooks/wake', method: 'POST', headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(payload), 'Authorization': 'Bearer qnLSLAojTmhgdA4vktyaDsoDyvL9yUT_fPVR32vSdxk' }, timeout: 8000, }, (res) => { let data = ''; res.on('data', (chunk) => data += chunk); res.on('end', () => resolve({ status: res.statusCode, data })); }); req.on('error', reject); req.on('timeout', () => { req.destroy(); reject(new Error('HTTP timeout')); }); req.write(payload); req.end(); }); if (result.status >= 200 && result.status < 300) { console.log(`[${ts()}] Cron wake triggered via hooks/wake! Status: ${result.status}, Response: ${result.data}`); } else { throw new Error(`HTTP ${result.status}: ${result.data}`); } } catch (err) { // HTTP didn't work either — log notification for the 5-min cron to pick up console.log(`[${ts()}] Push wake failed (${err.message}) — 5-min cron polling will catch it`); try { // Write marker so we can track missed pushes writeFileSync('/tmp/upwork-gmail-wake', JSON.stringify({ timestamp: new Date().toISOString(), historyId: 'unknown', reason: 'gmail-pubsub-notification', note: 'Push trigger failed, relying on 5-min cron polling' })); } catch (_) {} } } function ts() { return new Date().toLocaleString('en-US', { timeZone: 'America/New_York', hour12: false }); } async function main() { console.log(`[${ts()}] Starting Gmail Pub/Sub pull daemon...`); console.log(`[${ts()}] Project: ${PROJECT_ID}, Subscription: ${SUBSCRIPTION_NAME}`); const state = loadState(); console.log(`[${ts()}] Last history ID: ${state.lastHistoryId}, Total wakes: ${state.totalWakes}`); const pubsub = new PubSub({ projectId: PROJECT_ID, keyFilename: SA_KEY_PATH }); const subscription = pubsub.subscription(SUBSCRIPTION_NAME); // Use streaming pull (long-lived connection) subscription.on('message', async (message) => { const data = JSON.parse(Buffer.from(message.data, 'base64').toString()); console.log(`[${ts()}] Gmail notification: emailAddress=${data.emailAddress}, historyId=${data.historyId}`); // Only process if for localbosses if (data.emailAddress === 'jake@localbosses.org') { state.lastHistoryId = data.historyId; state.totalWakes++; state.lastNotification = new Date().toISOString(); saveState(state); await triggerCronWake(); } message.ack(); }); subscription.on('error', (err) => { console.error(`[${ts()}] Subscription error: ${err.message}`); // Auto-reconnect after 5s setTimeout(() => { console.log(`[${ts()}] Reconnecting...`); }, 5000); }); console.log(`[${ts()}] Listening for Gmail notifications...`); // Keep alive + periodic health check setInterval(() => { console.log(`[${ts()}] Health check — daemon alive, ${state.totalWakes} total wakes`); }, 300_000); // Every 5 minutes } main().catch((err) => { console.error('Fatal:', err); process.exit(1); });