diff --git a/plugos/lib/mq.dexie.ts b/plugos/lib/mq.dexie.ts index 1334f6b..ead37c7 100644 --- a/plugos/lib/mq.dexie.ts +++ b/plugos/lib/mq.dexie.ts @@ -160,7 +160,11 @@ export class DexieMQ { await this.processing.bulkDelete(ids.map((id) => [queue, id])); } - async requeueTimeouts(timeout: number, maxRetries?: number) { + async requeueTimeouts( + timeout: number, + maxRetries?: number, + disableDLQ?: boolean, + ) { const now = Date.now(); const messages = await this.processing.where("ts").below(now - timeout) .toArray(); @@ -175,18 +179,26 @@ export class DexieMQ { for (const m of messages) { const retries = (m.retries || 0) + 1; if (maxRetries && retries > maxRetries) { - console.warn( - "[mq]", - "Message exceeded max retries, moving to DLQ", - m, - ); - dlqMessages.push({ - queue: m.queue, - id: m.id, - body: m.body, - ts: Date.now(), - retries, - }); + if (disableDLQ) { + console.warn( + "[mq]", + "Message exceeded max retries, flushing message", + m, + ); + } else { + console.warn( + "[mq]", + "Message exceeded max retries, moving to DLQ", + m, + ); + dlqMessages.push({ + queue: m.queue, + id: m.id, + body: m.body, + ts: Date.now(), + retries, + }); + } } else { console.info("[mq]", "Message ack timed out, requeueing", m); requeuedMessages.push({ diff --git a/plugs/core/page.ts b/plugs/core/page.ts index b91ece1..66f557d 100644 --- a/plugs/core/page.ts +++ b/plugs/core/page.ts @@ -91,9 +91,6 @@ export async function reindexCommand() { const pages = await space.listPages(); await mq.batchSend("indexQueue", pages.map((page) => page.name)); - - // console.log("Indexing queued!"); - // await editor.flashNotification("Reindexing done"); } // Completion diff --git a/plugs/directive/command.ts b/plugs/directive/command.ts index 099b9a1..03f76ce 100644 --- a/plugs/directive/command.ts +++ b/plugs/directive/command.ts @@ -9,6 +9,8 @@ import { renderDirectives } from "./directives.ts"; import { extractFrontmatter } from "$sb/lib/frontmatter.ts"; import { PageMeta } from "../../web/types.ts"; import { isFederationPath } from "$sb/lib/resolve.ts"; +import { mq } from "$sb/plugos-syscall/mod.ts"; +import { Message } from "$sb/mq.ts"; export async function updateDirectivesOnPageCommand() { // If `arg` is a string, it's triggered automatically via an event, not explicitly via a command @@ -81,8 +83,19 @@ export async function updateDirectivesInSpaceCommand() { await editor.flashNotification( "Updating directives in entire space, this can take a while...", ); - await updateDirectivesInSpace(); - await editor.flashNotification("Done!"); + // await updateDirectivesInSpace(); + const pages = await space.listPages(); + + await mq.batchSend("directiveUpdateQueue", pages.map((page) => page.name)); +} + +export async function processUpdateQueue(messages: Message[]) { + for (const message of messages) { + const pageName: string = message.body; + console.log("Updating directives in page", pageName); + await updateDirectivesForPage(pageName); + await mq.ack("directiveUpdateQueue", message.id); + } } async function findReplacements( @@ -152,7 +165,20 @@ async function updateDirectivesForPage( ) { const pageMeta = await space.getPageMeta(pageName); const currentText = await space.readPage(pageName); - const newText = await updateDirectives(pageMeta, currentText); + const tree = await markdown.parseMarkdown(currentText); + const metaData = await extractFrontmatter(tree, ["$disableDirectives"]); + + if (isFederationPath(pageName)) { + console.info("Current page is a federation page, not updating directives."); + return; + } + + if (metaData.$disableDirectives) { + console.info("Directives disabled in page meta, not updating them."); + return; + } + + const newText = await updateDirectives(pageMeta, tree, currentText); if (newText !== currentText) { console.info("Content of page changed, saving."); await space.writePage(pageName, newText); @@ -161,9 +187,9 @@ async function updateDirectivesForPage( export async function updateDirectives( pageMeta: PageMeta, + tree: ParseTree, text: string, ) { - const tree = await markdown.parseMarkdown(text); const replacements = await findReplacements(tree, text, pageMeta); // Iterate again and replace the bodies. diff --git a/plugs/directive/directive.plug.yaml b/plugs/directive/directive.plug.yaml index be26022..a2ff9ae 100644 --- a/plugs/directive/directive.plug.yaml +++ b/plugs/directive/directive.plug.yaml @@ -15,6 +15,11 @@ functions: path: ./command.ts:updateDirectivesInSpaceCommand command: name: "Directives: Update Entire Space" + processUpdateQueue: + path: ./command.ts:processUpdateQueue + mqSubscriptions: + - queue: directiveUpdateQueue + batchSize: 3 indexData: path: ./data.ts:indexData events: diff --git a/plugs/directive/template_directive.ts b/plugs/directive/template_directive.ts index ebdb18e..37dc826 100644 --- a/plugs/directive/template_directive.ts +++ b/plugs/directive/template_directive.ts @@ -77,7 +77,8 @@ export async function templateDirectiveRenderer( newBody = templateFn(parsedArgs, buildHandebarOptions(pageMeta)); // Recursively render directives - newBody = await updateDirectives(pageMeta, newBody); + const tree = await markdown.parseMarkdown(newBody); + newBody = await updateDirectives(pageMeta, tree, newBody); } return newBody.trim(); } diff --git a/web/client.ts b/web/client.ts index 03853ba..d3d37e7 100644 --- a/web/client.ts +++ b/web/client.ts @@ -99,8 +99,8 @@ export class Client { this.mq = new DexieMQ(`${this.dbPrefix}_mq`, indexedDB, IDBKeyRange); setInterval(() => { - // Timeout after 5s - this.mq.requeueTimeouts(5000, 3).catch(console.error); + // Timeout after 5s, retries 3 times, otherwise drops the message (no DLQ) + this.mq.requeueTimeouts(5000, 3, true).catch(console.error); }, 20000); // Look to requeue every 20s // Event hook