From b4ab56c712dc6afcdd18d5bb5907a059b0a677ac Mon Sep 17 00:00:00 2001 From: Zef Hemel Date: Wed, 15 Nov 2023 09:31:44 +0100 Subject: [PATCH] Requeue messages on the server as well --- plugs/index/command.ts | 2 +- server/server_system.ts | 6 ++++++ web/components/panel_html.ts | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/plugs/index/command.ts b/plugs/index/command.ts index 40c5d93..e900c25 100644 --- a/plugs/index/command.ts +++ b/plugs/index/command.ts @@ -22,7 +22,7 @@ export async function reindexSpace() { // Now let's wait for the processing to finish let queueStats = await mq.getQueueStats("indexQueue"); while (queueStats.queued > 0 || queueStats.processing > 0) { - sleep(1000); + await sleep(500); queueStats = await mq.getQueueStats("indexQueue"); } // And notify the user diff --git a/server/server_system.ts b/server/server_system.ts index 0f67b0f..8c6980a 100644 --- a/server/server_system.ts +++ b/server/server_system.ts @@ -68,6 +68,11 @@ export class ServerSystem { const mq = new DataStoreMQ(this.ds); + setInterval(() => { + // Timeout after 5s, retries 3 times, otherwise drops the message (no DLQ) + mq.requeueTimeouts(5000, 3, true).catch(console.error); + }, 20000); // Look to requeue every 20s + const plugNamespaceHook = new PlugNamespaceHook(); this.system.addHook(plugNamespaceHook); @@ -154,6 +159,7 @@ export class ServerSystem { "reindexSpace", [], ).then(() => { + console.log("Initial index completed!"); this.ds.set(["$initialIndexDone"], true); }).catch(console.error); if (awaitIndex) { diff --git a/web/components/panel_html.ts b/web/components/panel_html.ts index df98bb2..12eaab9 100644 --- a/web/components/panel_html.ts +++ b/web/components/panel_html.ts @@ -100,7 +100,7 @@ function loadJsByUrl(url) { } - +