1
0

Lazy plugs (#596)

* Manifest caching and lazy loading of plug workers
* Fixes #546 Plug unloading after time out
This commit is contained in:
Zef Hemel 2023-12-06 18:44:48 +01:00 committed by GitHub
parent 8451680c01
commit 8527528af4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 226 additions and 67 deletions

View File

@ -114,7 +114,13 @@ export class EventedSpacePrimitives implements SpacePrimitives {
meta, meta,
); );
if (!selfUpdate) { if (!selfUpdate) {
await this.dispatchEvent("file:changed", name, true); await this.dispatchEvent(
"file:changed",
name,
true,
undefined,
newMeta.lastModified,
);
} }
this.spaceSnapshot[name] = newMeta.lastModified; this.spaceSnapshot[name] = newMeta.lastModified;

View File

@ -23,7 +23,7 @@ export function deletePage(name: string): Promise<void> {
return syscall("space.deletePage", name); return syscall("space.deletePage", name);
} }
export function listPlugs(): Promise<string[]> { export function listPlugs(): Promise<FileMeta[]> {
return syscall("space.listPlugs"); return syscall("space.listPlugs");
} }

View File

@ -18,6 +18,8 @@ Deno.test("Run a plugos endpoint server", async () => {
await system.load( await system.load(
new URL(`file://${workerPath}`), new URL(`file://${workerPath}`),
"test",
0,
createSandbox, createSandbox,
); );

View File

@ -9,7 +9,7 @@ import { KvPrimitives } from "./kv_primitives.ts";
*/ */
export class DataStore { export class DataStore {
constructor( constructor(
private kv: KvPrimitives, readonly kv: KvPrimitives,
private prefix: KvKey = [], private prefix: KvKey = [],
private functionMap: FunctionMap = builtinFunctions, private functionMap: FunctionMap = builtinFunctions,
) { ) {

49
plugos/manifest_cache.ts Normal file
View File

@ -0,0 +1,49 @@
import { KvPrimitives } from "./lib/kv_primitives.ts";
import { Plug } from "./plug.ts";
import { Manifest } from "./types.ts";
export interface ManifestCache<T> {
getManifest(plug: Plug<T>, hash: number): Promise<Manifest<T>>;
}
export class KVPrimitivesManifestCache<T> implements ManifestCache<T> {
constructor(private kv: KvPrimitives, private manifestPrefix: string) {
}
async getManifest(plug: Plug<T>, hash: number): Promise<Manifest<T>> {
const [cached] = await this.kv.batchGet([[
this.manifestPrefix,
plug.name,
]]);
if (cached && cached.hash === hash) {
// console.log("Using KV cached manifest for", plug.name);
return cached.manifest;
}
await plug.sandbox.init();
const manifest = plug.sandbox.manifest!;
await this.kv.batchSet([{
key: [this.manifestPrefix, plug.name],
value: { manifest, hash },
}]);
return manifest;
}
}
export class InMemoryManifestCache<T> implements ManifestCache<T> {
private cache = new Map<string, {
manifest: Manifest<T>;
hash: number;
}>();
async getManifest(plug: Plug<T>, hash: number): Promise<Manifest<T>> {
const cached = this.cache.get(plug.workerUrl.href);
if (cached && cached.hash === hash) {
// console.log("Using memory cached manifest for", plug.name);
return cached.manifest;
}
await plug.sandbox.init();
const manifest = plug.sandbox.manifest!;
this.cache.set(plug.name!, { manifest, hash });
return manifest;
}
}

View File

@ -9,34 +9,40 @@ export class Plug<HookT> {
public grantedPermissions: string[] = []; public grantedPermissions: string[] = [];
public sandbox: Sandbox<HookT>; public sandbox: Sandbox<HookT>;
// Resolves once the worker has been loaded // Resolves once the plug's manifest is available
ready: Promise<void>; ready: Promise<void>;
// Only available after ready resolves // Only available after ready resolves
public manifest?: Manifest<HookT>; public manifest?: Manifest<HookT>;
public assets?: AssetBundle; public assets?: AssetBundle;
// Time of last function invocation
unloadTimeout?: number;
constructor( constructor(
private system: System<HookT>, private system: System<HookT>,
public workerUrl: URL, public workerUrl: URL,
readonly name: string,
private hash: number,
private sandboxFactory: (plug: Plug<HookT>) => Sandbox<HookT>, private sandboxFactory: (plug: Plug<HookT>) => Sandbox<HookT>,
) { ) {
this.runtimeEnv = system.env; this.runtimeEnv = system.env;
// Kick off worker this.scheduleUnloadTimeout();
this.sandbox = this.sandboxFactory(this);
this.ready = this.sandbox.ready.then(() => {
this.manifest = this.sandbox.manifest!;
this.assets = new AssetBundle(
this.manifest.assets ? this.manifest.assets as AssetJson : {},
);
// TODO: These need to be explicitly granted, not just taken
this.grantedPermissions = this.manifest.requiredPermissions || [];
});
}
get name(): string | undefined { this.sandbox = this.sandboxFactory(this);
return this.manifest?.name; // Retrieve the manifest asynchonously, which may either come from a cache or be loaded from the worker
this.ready = system.options.manifestCache!.getManifest(this, this.hash)
.then(
(manifest) => {
this.manifest = manifest;
this.assets = new AssetBundle(
manifest.assets ? manifest.assets as AssetJson : {},
);
// TODO: These need to be explicitly granted, not just taken
this.grantedPermissions = manifest.requiredPermissions || [];
},
);
} }
// Invoke a syscall // Invoke a syscall
@ -54,11 +60,26 @@ export class Plug<HookT> {
return !funDef.env || !this.runtimeEnv || funDef.env === this.runtimeEnv; return !funDef.env || !this.runtimeEnv || funDef.env === this.runtimeEnv;
} }
scheduleUnloadTimeout() {
if (!this.system.options.plugFlushTimeout) {
return;
}
// Reset the unload timeout, if set
if (this.unloadTimeout) {
clearTimeout(this.unloadTimeout);
}
this.unloadTimeout = setTimeout(() => {
this.stop();
}, this.system.options.plugFlushTimeout);
}
// Invoke a function // Invoke a function
async invoke(name: string, args: any[]): Promise<any> { async invoke(name: string, args: any[]): Promise<any> {
// Ensure the worker is fully up and running // Ensure the worker is fully up and running
await this.ready; await this.ready;
this.scheduleUnloadTimeout();
// Before we access the manifest // Before we access the manifest
const funDef = this.manifest!.functions[name]; const funDef = this.manifest!.functions[name];
if (!funDef) { if (!funDef) {
@ -90,8 +111,7 @@ export class Plug<HookT> {
} }
stop() { stop() {
if (this.sandbox) { console.log("Stopping sandbox for", this.name);
this.sandbox.stop(); this.sandbox.stop();
}
} }
} }

View File

@ -34,6 +34,8 @@ Deno.test("Run a deno sandbox", async () => {
const plug = await system.load( const plug = await system.load(
new URL(`file://${workerPath}`), new URL(`file://${workerPath}`),
"test",
0,
createSandbox, createSandbox,
); );

View File

@ -9,26 +9,38 @@ export type SandboxFactory<HookT> = (plug: Plug<HookT>) => Sandbox<HookT>;
* Effectively this wraps a web worker, the reason to have this split from Plugs is to allow plugs to manage multiple sandboxes, e.g. for performance in the future * Effectively this wraps a web worker, the reason to have this split from Plugs is to allow plugs to manage multiple sandboxes, e.g. for performance in the future
*/ */
export class Sandbox<HookT> { export class Sandbox<HookT> {
private worker: Worker; private worker?: Worker;
private reqId = 0; private reqId = 0;
private outstandingInvocations = new Map< private outstandingInvocations = new Map<
number, number,
{ resolve: (result: any) => void; reject: (e: any) => void } { resolve: (result: any) => void; reject: (e: any) => void }
>(); >();
public ready: Promise<void>; // public ready: Promise<void>;
public manifest?: Manifest<HookT>; public manifest?: Manifest<HookT>;
constructor( constructor(
readonly plug: Plug<HookT>, readonly plug: Plug<HookT>,
workerOptions = {}, private workerOptions = {},
) { ) {
this.worker = new Worker(plug.workerUrl, { }
...workerOptions,
/**
* Should only invoked lazily (either by invoke, or by a ManifestCache to load the manifest)
*/
init(): Promise<void> {
console.log("Booting up worker for", this.plug.name);
if (this.worker) {
// Should not happen
console.warn("Double init of sandbox");
}
this.worker = new Worker(this.plug.workerUrl, {
...this.workerOptions,
type: "module", type: "module",
}); });
this.ready = new Promise((resolve) => {
this.worker.onmessage = (ev) => { return new Promise((resolve) => {
this.worker!.onmessage = (ev) => {
if (ev.data.type === "manifest") { if (ev.data.type === "manifest") {
this.manifest = ev.data.manifest; this.manifest = ev.data.manifest;
resolve(); resolve();
@ -46,14 +58,14 @@ export class Sandbox<HookT> {
try { try {
const result = await this.plug.syscall(data.name!, data.args!); const result = await this.plug.syscall(data.name!, data.args!);
this.worker.postMessage({ this.worker!.postMessage({
type: "sysr", type: "sysr",
id: data.id, id: data.id,
result: result, result: result,
} as WorkerMessage); } as WorkerMessage);
} catch (e: any) { } catch (e: any) {
// console.error("Syscall fail", e); // console.error("Syscall fail", e);
this.worker.postMessage({ this.worker!.postMessage({
type: "sysr", type: "sysr",
id: data.id, id: data.id,
error: e.message, error: e.message,
@ -76,9 +88,13 @@ export class Sandbox<HookT> {
} }
} }
invoke(name: string, args: any[]): Promise<any> { async invoke(name: string, args: any[]): Promise<any> {
if (!this.worker) {
// Lazy initialization
await this.init();
}
this.reqId++; this.reqId++;
this.worker.postMessage({ this.worker!.postMessage({
type: "inv", type: "inv",
id: this.reqId, id: this.reqId,
name, name,
@ -90,6 +106,9 @@ export class Sandbox<HookT> {
} }
stop() { stop() {
this.worker.terminate(); if (this.worker) {
this.worker.terminate();
this.worker = undefined;
}
} }
} }

View File

@ -3,6 +3,7 @@ import { EventEmitter } from "./event.ts";
import type { SandboxFactory } from "./sandbox.ts"; import type { SandboxFactory } from "./sandbox.ts";
import { Plug } from "./plug.ts"; import { Plug } from "./plug.ts";
import { deepObjectMerge } from "$sb/lib/json.ts"; import { deepObjectMerge } from "$sb/lib/json.ts";
import { InMemoryManifestCache, ManifestCache } from "./manifest_cache.ts";
export interface SysCallMapping { export interface SysCallMapping {
[key: string]: (ctx: SyscallContext, ...args: any) => Promise<any> | any; [key: string]: (ctx: SyscallContext, ...args: any) => Promise<any> | any;
@ -28,13 +29,27 @@ type Syscall = {
callback: SyscallSignature; callback: SyscallSignature;
}; };
export type SystemOptions = {
manifestCache?: ManifestCache<any>;
plugFlushTimeout?: number;
};
export class System<HookT> extends EventEmitter<SystemEvents<HookT>> { export class System<HookT> extends EventEmitter<SystemEvents<HookT>> {
protected plugs = new Map<string, Plug<HookT>>(); protected plugs = new Map<string, Plug<HookT>>();
protected registeredSyscalls = new Map<string, Syscall>(); protected registeredSyscalls = new Map<string, Syscall>();
protected enabledHooks = new Set<Hook<HookT>>(); protected enabledHooks = new Set<Hook<HookT>>();
constructor(readonly env?: string) { /**
* @param env either an environment or undefined for hybrid mode
*/
constructor(
readonly env: string | undefined,
readonly options: SystemOptions = {},
) {
super(); super();
if (!options.manifestCache) {
options.manifestCache = new InMemoryManifestCache();
}
} }
get loadedPlugs(): Map<string, Plug<HookT>> { get loadedPlugs(): Map<string, Plug<HookT>> {
@ -94,11 +109,13 @@ export class System<HookT> extends EventEmitter<SystemEvents<HookT>> {
async load( async load(
workerUrl: URL, workerUrl: URL,
name: string,
hash: number,
sandboxFactory: SandboxFactory<HookT>, sandboxFactory: SandboxFactory<HookT>,
// Mapping plug name -> manifest overrides // Mapping plug name -> manifest overrides
manifestOverrides?: Record<string, Partial<Manifest<HookT>>>, manifestOverrides?: Record<string, Partial<Manifest<HookT>>>,
): Promise<Plug<HookT>> { ): Promise<Plug<HookT>> {
const plug = new Plug(this, workerUrl, sandboxFactory); const plug = new Plug(this, workerUrl, name, hash, sandboxFactory);
// Wait for worker to boot, and pass back its manifest // Wait for worker to boot, and pass back its manifest
await plug.ready; await plug.ready;

View File

@ -5,16 +5,19 @@ import { System } from "../../plugos/system.ts";
import { createSandbox } from "../../plugos/environments/deno_sandbox.ts"; import { createSandbox } from "../../plugos/environments/deno_sandbox.ts";
import { loadMarkdownExtensions } from "../../common/markdown_parser/markdown_ext.ts"; import { loadMarkdownExtensions } from "../../common/markdown_parser/markdown_ext.ts";
import { renderMarkdownToHtml } from "./markdown_render.ts"; import { renderMarkdownToHtml } from "./markdown_render.ts";
import { assertEquals } from "../../test_deps.ts";
Deno.test("Markdown render", async () => { Deno.test("Markdown render", async () => {
const system = new System<any>("server"); const system = new System<any>("server");
await system.load( await system.load(
new URL("../../dist_plug_bundle/_plug/editor.plug.js", import.meta.url), new URL("../../dist_plug_bundle/_plug/editor.plug.js", import.meta.url),
"editor",
0,
createSandbox, createSandbox,
); );
await system.load( await system.load(
new URL("../../dist_plug_bundle/_plug/tasks.plug.js", import.meta.url), new URL("../../dist_plug_bundle/_plug/tasks.plug.js", import.meta.url),
"tasks",
0,
createSandbox, createSandbox,
); );
const lang = buildMarkdown(loadMarkdownExtensions(system)); const lang = buildMarkdown(loadMarkdownExtensions(system));

View File

@ -68,7 +68,7 @@ export async function updatePlugsCommand() {
const allPlugNames = [...builtinPlugNames, ...allCustomPlugNames]; const allPlugNames = [...builtinPlugNames, ...allCustomPlugNames];
// And delete extra ones // And delete extra ones
for (const existingPlug of await space.listPlugs()) { for (const { name: existingPlug } of await space.listPlugs()) {
const plugName = existingPlug.substring( const plugName = existingPlug.substring(
"_plug/".length, "_plug/".length,
existingPlug.length - ".plug.js".length, existingPlug.length - ".plug.js".length,

View File

@ -33,11 +33,14 @@ import { languageSyscalls } from "../common/syscalls/language.ts";
import { handlebarsSyscalls } from "../common/syscalls/handlebars.ts"; import { handlebarsSyscalls } from "../common/syscalls/handlebars.ts";
import { codeWidgetSyscalls } from "../web/syscalls/code_widget.ts"; import { codeWidgetSyscalls } from "../web/syscalls/code_widget.ts";
import { CodeWidgetHook } from "../web/hooks/code_widget.ts"; import { CodeWidgetHook } from "../web/hooks/code_widget.ts";
import { KVPrimitivesManifestCache } from "../plugos/manifest_cache.ts";
const fileListInterval = 30 * 1000; // 30s const fileListInterval = 30 * 1000; // 30s
const plugNameExtractRegex = /\/(.+)\.plug\.js$/;
export class ServerSystem { export class ServerSystem {
system: System<SilverBulletHooks> = new System("server"); system!: System<SilverBulletHooks>;
spacePrimitives!: SpacePrimitives; spacePrimitives!: SpacePrimitives;
denoKv!: Deno.Kv; denoKv!: Deno.Kv;
listInterval?: number; listInterval?: number;
@ -52,6 +55,18 @@ export class ServerSystem {
// Always needs to be invoked right after construction // Always needs to be invoked right after construction
async init(awaitIndex = false) { async init(awaitIndex = false) {
this.denoKv = await Deno.openKv(this.dbPath);
const kvPrimitives = new DenoKvPrimitives(this.denoKv);
this.ds = new DataStore(kvPrimitives);
this.system = new System(
"server",
{
manifestCache: new KVPrimitivesManifestCache(kvPrimitives, "manifest"),
plugFlushTimeout: 5 * 60 * 1000, // 5 minutes
},
);
// Event hook // Event hook
const eventHook = new EventHook(); const eventHook = new EventHook();
this.system.addHook(eventHook); this.system.addHook(eventHook);
@ -60,9 +75,6 @@ export class ServerSystem {
const cronHook = new CronHook(this.system); const cronHook = new CronHook(this.system);
this.system.addHook(cronHook); this.system.addHook(cronHook);
this.denoKv = await Deno.openKv(this.dbPath);
this.ds = new DataStore(new DenoKvPrimitives(this.denoKv));
// Endpoint hook // Endpoint hook
this.system.addHook(new EndpointHook(this.app, "/_/")); this.system.addHook(new EndpointHook(this.app, "/_/"));
@ -179,10 +191,13 @@ export class ServerSystem {
} }
async loadPlugFromSpace(path: string): Promise<Plug<SilverBulletHooks>> { async loadPlugFromSpace(path: string): Promise<Plug<SilverBulletHooks>> {
const plugJS = (await this.spacePrimitives.readFile(path)).data; const { meta, data } = await this.spacePrimitives.readFile(path);
const plugName = path.match(plugNameExtractRegex)![1];
return this.system.load( return this.system.load(
// Base64 encoding this to support `deno compile` mode // Base64 encoding this to support `deno compile` mode
new URL(base64EncodedDataUrl("application/javascript", plugJS)), new URL(base64EncodedDataUrl("application/javascript", data)),
plugName,
meta.lastModified,
createSandbox, createSandbox,
); );
} }

View File

@ -29,7 +29,7 @@ export function spaceSyscalls(space: Space): SysCallMapping {
"space.deletePage": async (_ctx, name: string) => { "space.deletePage": async (_ctx, name: string) => {
await space.deletePage(name); await space.deletePage(name);
}, },
"space.listPlugs": (): Promise<string[]> => { "space.listPlugs": (): Promise<FileMeta[]> => {
return space.listPlugs(); return space.listPlugs();
}, },
"space.listAttachments": async (): Promise<AttachmentMeta[]> => { "space.listAttachments": async (): Promise<AttachmentMeta[]> => {

View File

@ -38,6 +38,12 @@ import { languageSyscalls } from "../common/syscalls/language.ts";
import { handlebarsSyscalls } from "../common/syscalls/handlebars.ts"; import { handlebarsSyscalls } from "../common/syscalls/handlebars.ts";
import { codeWidgetSyscalls } from "./syscalls/code_widget.ts"; import { codeWidgetSyscalls } from "./syscalls/code_widget.ts";
import { clientCodeWidgetSyscalls } from "./syscalls/client_code_widget.ts"; import { clientCodeWidgetSyscalls } from "./syscalls/client_code_widget.ts";
import {
InMemoryManifestCache,
KVPrimitivesManifestCache,
} from "../plugos/manifest_cache.ts";
const plugNameExtractRegex = /\/(.+)\.plug\.js$/;
export class ClientSystem { export class ClientSystem {
commandHook: CommandHook; commandHook: CommandHook;
@ -51,11 +57,18 @@ export class ClientSystem {
private client: Client, private client: Client,
private mq: MessageQueue, private mq: MessageQueue,
private ds: DataStore, private ds: DataStore,
// private dbPrefix: string,
private eventHook: EventHook, private eventHook: EventHook,
) { ) {
// Only set environment to "client" when running in thin client mode, otherwise we run everything locally (hybrid) // Only set environment to "client" when running in thin client mode, otherwise we run everything locally (hybrid)
this.system = new System(client.syncMode ? undefined : "client"); this.system = new System(
client.syncMode ? undefined : "client",
{
manifestCache: new KVPrimitivesManifestCache<SilverBulletHooks>(
ds.kv,
"manifest",
),
},
);
this.system.addHook(this.eventHook); this.system.addHook(this.eventHook);
@ -93,22 +106,28 @@ export class ClientSystem {
this.slashCommandHook = new SlashCommandHook(this.client); this.slashCommandHook = new SlashCommandHook(this.client);
this.system.addHook(this.slashCommandHook); this.system.addHook(this.slashCommandHook);
this.eventHook.addLocalListener("file:changed", async (path: string) => { this.eventHook.addLocalListener(
if (path.startsWith("_plug/") && path.endsWith(".plug.js")) { "file:changed",
console.log("Plug updated, reloading:", path); async (path: string, _selfUpdate, _oldHash, newHash) => {
this.system.unload(path); if (path.startsWith("_plug/") && path.endsWith(".plug.js")) {
const plug = await this.system.load( const plugName = plugNameExtractRegex.exec(path)![1];
new URL(`/${path}`, location.href), console.log("Plug updated, reloading", plugName, "from", path);
createSandbox, this.system.unload(path);
this.client.settings.plugOverrides, const plug = await this.system.load(
); new URL(`/${path}`, location.href),
if ((plug.manifest! as Manifest).syntax) { plugName,
// If there are syntax extensions, rebuild the markdown parser immediately newHash,
this.updateMarkdownParser(); createSandbox,
this.client.settings.plugOverrides,
);
if ((plug.manifest! as Manifest).syntax) {
// If there are syntax extensions, rebuild the markdown parser immediately
this.updateMarkdownParser();
}
this.client.debouncedPlugsUpdatedEvent();
} }
this.client.debouncedPlugsUpdatedEvent(); },
} );
});
// Debugging // Debugging
// this.eventHook.addLocalListener("file:listed", (files) => { // this.eventHook.addLocalListener("file:listed", (files) => {
@ -177,15 +196,23 @@ export class ClientSystem {
await space.updatePageList(); await space.updatePageList();
await this.system.unloadAll(); await this.system.unloadAll();
console.log("(Re)loading plugs"); console.log("(Re)loading plugs");
await Promise.all((await space.listPlugs()).map(async (plugName) => { await Promise.all((await space.listPlugs()).map(async (plugMeta) => {
try { try {
const plugName = plugNameExtractRegex.exec(plugMeta.name)![1];
await this.system.load( await this.system.load(
new URL(plugName, location.origin), new URL(plugMeta.name, location.origin),
plugName,
plugMeta.lastModified,
createSandbox, createSandbox,
this.client.settings.plugOverrides, this.client.settings.plugOverrides,
); );
} catch (e: any) { } catch (e: any) {
console.error("Could not load plug", plugName, "error:", e.message); console.error(
"Could not load plug",
plugMeta.name,
"error:",
e.message,
);
} }
})); }));
} }

View File

@ -103,14 +103,13 @@ export class Space {
return this.cachedPageList; return this.cachedPageList;
} }
async listPlugs(): Promise<string[]> { async listPlugs(): Promise<FileMeta[]> {
const files = await this.spacePrimitives.fetchFileList(); const files = await this.spacePrimitives.fetchFileList();
return files return files
.filter((fileMeta) => .filter((fileMeta) =>
fileMeta.name.startsWith(plugPrefix) && fileMeta.name.startsWith(plugPrefix) &&
fileMeta.name.endsWith(".plug.js") fileMeta.name.endsWith(".plug.js")
) );
.map((fileMeta) => fileMeta.name);
} }
async readPage(name: string): Promise<{ text: string; meta: PageMeta }> { async readPage(name: string): Promise<{ text: string; meta: PageMeta }> {

View File

@ -33,7 +33,7 @@ export function spaceSyscalls(editor: Client): SysCallMapping {
console.log("Deleting page"); console.log("Deleting page");
await editor.space.deletePage(name); await editor.space.deletePage(name);
}, },
"space.listPlugs": (): Promise<string[]> => { "space.listPlugs": (): Promise<FileMeta[]> => {
return editor.space.listPlugs(); return editor.space.listPlugs();
}, },
"space.listAttachments": async (): Promise<AttachmentMeta[]> => { "space.listAttachments": async (): Promise<AttachmentMeta[]> => {