///
import {
MQMessage,
MQStats,
MQSubscribeOptions,
} from "../../plug-api/types.ts";
import { MessageQueue } from "./mq.ts";
type QueuedMessage = [string, MQMessage];
export class DenoKvMQ implements MessageQueue {
listeners: Map void | Promise>> =
new Map();
constructor(private kv: Deno.Kv) {
kv.listenQueue(async (message: unknown) => {
const [queue, body] = message as QueuedMessage;
const listeners = this.listeners.get(queue);
if (!listeners) {
return;
}
for (const listener of listeners) {
await Promise.resolve(listener([{ id: "_dummyid", queue, body }]));
}
});
}
// Dummy implementation
getQueueStats(_queue: string): Promise {
return Promise.resolve({
queued: 0,
processing: 0,
dlq: 0,
});
}
// Dummy implementation
getAllQueueStats(): Promise> {
return Promise.resolve({});
}
async batchSend(queue: string, bodies: any[]): Promise {
for (const body of bodies) {
const result = await this.kv.enqueue([queue, body]);
if (!result.ok) {
throw result;
}
}
// const results = await Promise.all(
// bodies.map((body) => this.kv.enqueue([queue, body])),
// );
// for (const result of results) {
// if (!result.ok) {
// throw result;
// }
// }
}
async send(queue: string, body: any): Promise {
const result = await this.kv.enqueue([queue, body]);
if (!result.ok) {
throw result;
}
}
subscribe(
queue: string,
_options: MQSubscribeOptions,
callback: (messages: MQMessage[]) => void | Promise,
): () => void {
const listeners = this.listeners.get(queue);
if (!listeners) {
this.listeners.set(queue, new Set([callback]));
} else {
listeners.add(callback);
}
return () => {
const listeners = this.listeners.get(queue);
if (!listeners) {
return;
}
listeners.delete(callback);
};
}
ack(_queue: string, _id: string): Promise {
// Doesn't apply to this implementation
return Promise.resolve();
}
batchAck(_queue: string, _ids: string[]): Promise {
// Doesn't apply to this implementation
return Promise.resolve();
}
}