// workers/fanfare-sync.worker.ts
import { completeAdmission } from "../services/fanfare.service";
interface SyncJob {
orderId: string;
attempts: number;
lastAttempt?: Date;
}
const MAX_ATTEMPTS = 5;
const RETRY_DELAYS = [1000, 5000, 30000, 120000, 600000]; // 1s, 5s, 30s, 2m, 10m
export async function processFanfareSyncQueue() {
// Get pending sync jobs
const pendingJobs = await db
.select()
.from(fanfareSyncQueue)
.where(
and(
lt(fanfareSyncQueue.attempts, MAX_ATTEMPTS),
or(isNull(fanfareSyncQueue.nextAttempt), lt(fanfareSyncQueue.nextAttempt, new Date()))
)
)
.limit(10);
for (const job of pendingJobs) {
await processJob(job);
}
}
async function processJob(job: SyncJob) {
try {
// Get order details
const order = await db.select().from(orders).where(eq(orders.id, job.orderId)).limit(1);
if (!order[0]) {
// Order doesn't exist, remove job
await db.delete(fanfareSyncQueue).where(eq(fanfareSyncQueue.orderId, job.orderId));
return;
}
// Attempt completion
await completeAdmission({
distributionId: order[0].distributionId,
distributionType: order[0].distributionType,
consumerId: order[0].consumerId,
orderId: order[0].id,
orderAmount: order[0].totalAmount,
});
// Success - update order and remove job
await db.update(orders).set({ fanfareSynced: true, fanfareSyncedAt: new Date() }).where(eq(orders.id, job.orderId));
await db.delete(fanfareSyncQueue).where(eq(fanfareSyncQueue.orderId, job.orderId));
console.log(`Fanfare sync successful for order ${job.orderId}`);
} catch (error) {
// Update job for retry
const nextAttempt = new Date(Date.now() + RETRY_DELAYS[job.attempts] || 600000);
await db
.update(fanfareSyncQueue)
.set({
attempts: job.attempts + 1,
lastAttempt: new Date(),
nextAttempt,
lastError: error instanceof Error ? error.message : "Unknown error",
})
.where(eq(fanfareSyncQueue.orderId, job.orderId));
console.error(`Fanfare sync failed for order ${job.orderId}, will retry at ${nextAttempt}`);
}
}
// Run every minute
setInterval(processFanfareSyncQueue, 60000);