import { Hook, Manifest } from "../types.ts";
import { System } from "../system.ts";
import { MQMessage } from "$sb/types.ts";
import { MessageQueue } from "../lib/mq.ts";
import { throttle } from "$sb/lib/async.ts";

/** One queue subscription declared on a plug function in its manifest. */
type MQSubscription = {
  /** Name of the queue to subscribe to. */
  queue: string;
  /** Forwarded as the `batchSize` option of `MessageQueue.subscribe`. */
  batchSize?: number;
  /**
   * When truthy, the delivered messages are acknowledged via `batchAck`
   * after the plug function has been invoked without throwing.
   */
  autoAck?: boolean;
};

/** Manifest extension read by {@link MQHook} on each function definition. */
export type MQHookT = {
  mqSubscriptions?: MQSubscription[];
};

/**
 * Hook that connects plug functions to message queues.
 *
 * For every loaded plug, each function that declares `mqSubscriptions` gets
 * subscribed to the listed queues; incoming message batches are passed to the
 * function through `plug.invoke`.
 */
export class MQHook implements Hook {
  /** Unsubscribe callbacks returned by `mq.subscribe` for the active subscriptions. */
  subscriptions: Array<() => void> = [];

  constructor(private system: System, readonly mq: MessageQueue) {
  }

  /**
   * Binds the hook to `system`, schedules a queue reload whenever a plug is
   * loaded or unloaded, and schedules one reload right away.
   */
  apply(system: System): void {
    this.system = system;

    const scheduleReload = () => {
      this.throttledReloadQueues();
    };
    system.on({
      plugLoaded: scheduleReload,
      plugUnloaded: scheduleReload,
    });

    scheduleReload();
  }

  /** Calls every stored unsubscribe callback and clears the list. */
  stop() {
    for (const unsubscribe of this.subscriptions) {
      unsubscribe();
    }
    this.subscriptions = [];
  }

  /**
   * `reloadQueues` wrapped by `throttle` with a delay argument of 1000
   * (presumably milliseconds — confirm against `throttle`'s implementation).
   */
  throttledReloadQueues = throttle(() => {
    this.reloadQueues();
  }, 1000);

  /**
   * Rebuilds all subscriptions from scratch: existing ones are stopped, then
   * every `mqSubscriptions` entry of every loaded plug's manifest is subscribed.
   */
  reloadQueues() {
    // Start from a clean slate so a reload never leaves duplicate subscriptions.
    this.stop();

    for (const plug of this.system.loadedPlugs.values()) {
      if (!plug.manifest) {
        continue;
      }

      const functionEntries = Object.entries(plug.manifest.functions);
      for (const [functionName, functionDef] of functionEntries) {
        const declared = functionDef.mqSubscriptions;
        if (!declared) {
          continue;
        }

        for (const def of declared) {
          const queue = def.queue;

          // Failures are logged rather than rethrown; on failure no ack is sent.
          const onMessages = async (messages: MQMessage[]) => {
            try {
              await plug.invoke(functionName, [messages]);
              if (def.autoAck) {
                const messageIds = messages.map((message) => message.id);
                await this.mq.batchAck(queue, messageIds);
              }
            } catch (err: unknown) {
              console.error(
                "Execution of mqSubscription for queue",
                queue,
                "invoking",
                functionName,
                "with messages",
                messages,
                "failed:",
                err,
              );
            }
          };

          const unsubscribe = this.mq.subscribe(
            queue,
            { batchSize: def.batchSize },
            onMessages,
          );
          this.subscriptions.push(unsubscribe);
        }
      }
    }
  }

  /**
   * Checks the `mqSubscriptions` declarations of a manifest.
   *
   * @param manifest Manifest whose function definitions are inspected.
   * @returns One error string per subscription whose `queue` is missing or
   *   otherwise falsy; an empty array when nothing is wrong.
   */
  validateManifest(manifest: Manifest): string[] {
    const errors: string[] = [];

    for (const functionDef of Object.values(manifest.functions)) {
      const declared = functionDef.mqSubscriptions;
      if (!declared) {
        continue;
      }

      for (const def of declared) {
        if (def.queue) {
          continue;
        }
        errors.push("Missing queue name for mqSubscription");
      }
    }

    return errors;
  }
}