diff --git a/plugbox/src/runtime.ts b/plugbox/src/runtime.ts index 5950ae1..5c8476c 100644 --- a/plugbox/src/runtime.ts +++ b/plugbox/src/runtime.ts @@ -1,78 +1,16 @@ import { Manifest } from "./types"; +import { WebworkerSandbox } from "./worker_sandbox"; interface SysCallMapping { // TODO: Better typing [key: string]: any; } -export class FunctionWorker { - private worker: Worker; - private inited: Promise; - private initCallback: any; - private invokeResolve?: (result?: any) => void; - private invokeReject?: (reason?: any) => void; - private plug: Plug; - - constructor(plug: Plug, name: string, code: string) { - // let worker = window.Worker; - this.worker = new Worker(new URL("function_worker.ts", import.meta.url), { - type: "module", - }); - - this.worker.onmessage = this.onmessage.bind(this); - this.worker.postMessage({ - type: "boot", - name: name, - code: code, - }); - this.inited = new Promise((resolve) => { - this.initCallback = resolve; - }); - this.plug = plug; - } - - async onmessage(evt: MessageEvent) { - let data = evt.data; - if (!data) return; - switch (data.type) { - case "inited": - this.initCallback(); - break; - case "syscall": - let result = await this.plug.system.syscall(data.name, data.args); - - this.worker.postMessage({ - type: "syscall-response", - id: data.id, - data: result, - }); - break; - case "result": - this.invokeResolve!(data.result); - break; - case "error": - this.invokeReject!(data.reason); - break; - default: - console.error("Unknown message type", data); - } - } - - async invoke(args: Array): Promise { - await this.inited; - this.worker.postMessage({ - type: "invoke", - args: args, - }); - return new Promise((resolve, reject) => { - this.invokeResolve = resolve; - this.invokeReject = reject; - }); - } - - stop() { - this.worker.terminate(); - } +export interface Sandbox { + isLoaded(name: string): boolean; + load(name: string, code: string): Promise; + invoke(name: string, args: any[]): Promise; + stop(): void; } export interface PlugLoader { @@ -81,12 +19,13 @@ export interface PlugLoader { export class Plug { system: System; - private runningFunctions: Map; + // private runningFunctions: Map; + functionWorker: WebworkerSandbox; public manifest?: Manifest; constructor(system: System, name: string) { this.system = system; - this.runningFunctions = new Map(); + this.functionWorker = new WebworkerSandbox(this); } async load(manifest: Manifest) { @@ -95,16 +34,13 @@ export class Plug { } async invoke(name: string, args: Array): Promise { - let worker = this.runningFunctions.get(name); - if (!worker) { - worker = new FunctionWorker( - this, + if (!this.functionWorker.isLoaded(name)) { + await this.functionWorker.load( name, this.manifest!.functions[name].code! ); - this.runningFunctions.set(name, worker); } - return await worker.invoke(args); + return await this.functionWorker.invoke(name, args); } async dispatchEvent(name: string, data?: any): Promise { @@ -122,13 +58,7 @@ export class Plug { } async stop() { - for (const [functionname, worker] of Object.entries( - this.runningFunctions - )) { - console.log(`Stopping ${functionname}`); - worker.stop(); - } - this.runningFunctions = new Map(); + this.functionWorker.stop(); } } @@ -141,7 +71,7 @@ export class System { this.registeredSyscalls = {}; } - registerSyscalls(...registrationObjects: Array) { + registerSyscalls(...registrationObjects: SysCallMapping[]) { for (const registrationObject of registrationObjects) { for (let p in registrationObject) { this.registeredSyscalls[p] = registrationObject[p]; diff --git a/plugbox/src/sandbox.html b/plugbox/src/sandbox.html new file mode 100644 index 0000000..8f6045b --- /dev/null +++ b/plugbox/src/sandbox.html @@ -0,0 +1,7 @@ + + + + + diff --git a/plugbox/src/function_worker.ts b/plugbox/src/sandbox_worker.ts similarity index 59% rename from plugbox/src/function_worker.ts rename to plugbox/src/sandbox_worker.ts index ac1c1bf..174a092 100644 --- a/plugbox/src/function_worker.ts +++ b/plugbox/src/sandbox_worker.ts @@ -1,11 +1,13 @@ declare global { - function syscall(id: string, name: string, args: any[]): Promise; + function syscall(id: number, name: string, args: any[]): Promise; } +import { ControllerMessage, WorkerMessage, WorkerMessageType } from "./types"; import { safeRun } from "./util"; -let func: Function | null = null; -let pendingRequests = new Map void>(); -self.syscall = async (id: string, name: string, args: any[]) => { +let loadedFunctions = new Map(); +let pendingRequests = new Map void>(); + +self.syscall = async (id: number, name: string, args: any[]) => { return await new Promise((resolve, reject) => { pendingRequests.set(id, resolve); self.postMessage({ @@ -38,51 +40,56 @@ function wrapScript(code: string): string { return fn["default"].apply(null, arguments);`; } -self.addEventListener("message", (event) => { +self.addEventListener("message", (event: { data: WorkerMessage }) => { safeRun(async () => { let messageEvent = event; let data = messageEvent.data; switch (data.type) { - case "boot": + case "load": console.log("Booting", data.name); - func = new Function(wrapScript(data.code)); + loadedFunctions.set(data.name!, new Function(wrapScript(data.code!))); self.postMessage({ type: "inited", - }); + name: data.name, + } as ControllerMessage); break; case "invoke": - if (!func) { - throw new Error("No function loaded"); + let fn = loadedFunctions.get(data.name!); + if (!fn) { + throw new Error(`Function not loaded: ${data.name}`); } try { - let result = await Promise.resolve(func(...(data.args || []))); + let result = await Promise.resolve(fn(...(data.args || []))); self.postMessage({ type: "result", + id: data.id, result: result, - }); + } as ControllerMessage); } catch (e: any) { self.postMessage({ type: "error", + id: data.id, reason: e.message, - }); + } as ControllerMessage); throw e; } break; case "syscall-response": - let id = data.id; - const lookup = pendingRequests.get(id); + let syscallId = data.id!; + const lookup = pendingRequests.get(syscallId); if (!lookup) { console.log( "Current outstanding requests", pendingRequests, "looking up", - id + syscallId ); throw Error("Invalid request id"); } - pendingRequests.delete(id); + pendingRequests.delete(syscallId); lookup(data.data); + break; } }); }); diff --git a/plugbox/src/types.ts b/plugbox/src/types.ts index 7a5f7eb..56ed53d 100644 --- a/plugbox/src/types.ts +++ b/plugbox/src/types.ts @@ -2,6 +2,28 @@ export type EventHook = { events: { [key: string]: string[] }; }; +export type WorkerMessageType = "load" | "invoke" | "syscall-response"; + +export type WorkerMessage = { + type: WorkerMessageType; + id?: number; + name?: string; + code?: string; + args?: any[]; + data?: any; +}; + +export type ControllerMessageType = "inited" | "result" | "error" | "syscall"; + +export type ControllerMessage = { + type: ControllerMessageType; + id?: number; + name?: string; + reason?: string; + args?: any[]; + result: any; +}; + export interface Manifest { hooks: HookT & EventHook; functions: { diff --git a/plugbox/src/worker_sandbox.ts b/plugbox/src/worker_sandbox.ts new file mode 100644 index 0000000..936c1b5 --- /dev/null +++ b/plugbox/src/worker_sandbox.ts @@ -0,0 +1,86 @@ +import { ControllerMessage, WorkerMessage } from "./types"; +import { Plug, Sandbox } from "./runtime"; + +export class WebworkerSandbox implements Sandbox { + private worker: Worker; + private reqId = 0; + + private outstandingInits = new Map void>(); + private outstandingInvocations = new Map< + number, + { resolve: (result: any) => void; reject: (e: any) => void } + >(); + private loadedFunctions = new Set(); + + constructor(readonly plug: Plug) { + this.worker = new Worker(new URL("sandbox_worker.ts", import.meta.url), { + type: "module", + }); + + this.worker.onmessage = this.onmessage.bind(this); + } + + isLoaded(name: string) { + return this.loadedFunctions.has(name); + } + + async load(name: string, code: string): Promise { + this.worker.postMessage({ + type: "load", + name: name, + code: code, + } as WorkerMessage); + return new Promise((resolve) => { + this.loadedFunctions.add(name); + this.outstandingInits.set(name, resolve); + }); + } + + async onmessage(evt: { data: ControllerMessage }) { + let data = evt.data; + if (!data) return; + switch (data.type) { + case "inited": + let initCb = this.outstandingInits.get(data.name!); + initCb && initCb(); + this.outstandingInits.delete(data.name!); + break; + case "syscall": + let result = await this.plug.system.syscall(data.name!, data.args!); + + this.worker.postMessage({ + type: "syscall-response", + id: data.id, + data: result, + } as WorkerMessage); + break; + case "result": + let resultCb = this.outstandingInvocations.get(data.id!); + resultCb && resultCb.resolve(data.result); + break; + case "error": + let errCb = this.outstandingInvocations.get(data.result.id!); + errCb && errCb.reject(data.reason); + break; + default: + console.error("Unknown message type", data); + } + } + + async invoke(name: string, args: any[]): Promise { + this.reqId++; + this.worker.postMessage({ + type: "invoke", + id: this.reqId, + name, + args, + }); + return new Promise((resolve, reject) => { + this.outstandingInvocations.set(this.reqId, { resolve, reject }); + }); + } + + stop() { + this.worker.terminate(); + } +} diff --git a/plugs/core/lib/syscall.ts b/plugs/core/lib/syscall.ts index 9115d77..4a8f087 100644 --- a/plugs/core/lib/syscall.ts +++ b/plugs/core/lib/syscall.ts @@ -1,9 +1,9 @@ declare global { - function syscall(id: string, name: string, args: any[]): Promise; + function syscall(id: number, name: string, args: any[]): Promise; } export async function syscall(name: string, ...args: any[]): Promise { - let reqId = "" + Math.floor(Math.random() * 1000000); + let reqId = Math.floor(Math.random() * 1000000); // console.log("Syscall", name, reqId); return await self.syscall(reqId, name, args); // return new Promise((resolve, reject) => { diff --git a/plugs/core/page.ts b/plugs/core/page.ts index 9929ba9..0904f0b 100644 --- a/plugs/core/page.ts +++ b/plugs/core/page.ts @@ -25,8 +25,6 @@ export async function deletePage() { await syscall("editor.navigate", "start"); console.log("Deleting page from space"); await syscall("space.deletePage", pageName); - console.log("Reloading page list"); - await syscall("space.reloadPageList"); } export async function renamePage() { @@ -50,8 +48,6 @@ export async function renamePage() { await syscall("space.writePage", newName, text); console.log("Deleting page from space"); await syscall("space.deletePage", oldName); - console.log("Reloading page list"); - await syscall("space.reloadPageList"); console.log("Navigating to new page"); await syscall("editor.navigate", newName); @@ -63,6 +59,7 @@ export async function renamePage() { for (let pageToUpdate of pageToUpdateSet) { console.log("Now going to update links in", pageToUpdate); let { text } = await syscall("space.readPage", pageToUpdate); + console.log("Received text", text); if (!text) { // Page likely does not exist, but at least we can skip it continue; diff --git a/plugs/yarn.lock b/plugs/yarn.lock new file mode 100644 index 0000000..fb57ccd --- /dev/null +++ b/plugs/yarn.lock @@ -0,0 +1,4 @@ +# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY. +# yarn lockfile v1 + + diff --git a/server/src/realtime_storage.ts b/server/src/api.ts similarity index 54% rename from server/src/realtime_storage.ts rename to server/src/api.ts index bf76928..2d1f52d 100644 --- a/server/src/realtime_storage.ts +++ b/server/src/api.ts @@ -8,118 +8,28 @@ import { Cursor, cursorEffect } from "../../webapp/src/cursorEffect"; import { Socket } from "socket.io"; import { DiskStorage } from "./disk_storage"; import { PageMeta } from "./server"; -import { Client, Page } from "./types"; +import { ClientPageState, Page } from "./types"; import { safeRun } from "./util"; -export class RealtimeStorage extends DiskStorage { +export class SocketAPI { openPages = new Map(); - - private disconnectClient(client: Client, page: Page) { - page.clients.delete(client); - if (page.clients.size === 0) { - console.log("No more clients for", page.name, "flushing"); - this.flushPageToDisk(page.name, page); - this.openPages.delete(page.name); - } else { - page.cursors.delete(client.socket.id); - this.broadcastCursors(page); - } - } - - private broadcastCursors(page: Page) { - page.clients.forEach((client) => { - client.socket.emit("cursors", Object.fromEntries(page.cursors.entries())); - }); - } - - private flushPageToDisk(name: string, page: Page) { - super - .writePage(name, page.text.sliceString(0)) - .then((meta) => { - console.log(`Wrote page ${name} to disk`); - page.meta = meta; - }) - .catch((e) => { - console.log(`Could not write ${name} to disk:`, e); - }); - } - - // Override - async readPage(pageName: string): Promise<{ text: string; meta: PageMeta }> { - let page = this.openPages.get(pageName); - if (page) { - console.log("Serving page from memory", pageName); - return { - text: page.text.sliceString(0), - meta: page.meta, - }; - } else { - return super.readPage(pageName); - } - } - - async writePage(pageName: string, text: string): Promise { - let page = this.openPages.get(pageName); - if (page) { - for (let client of page.clients) { - client.socket.emit("reload", pageName); - } - this.openPages.delete(pageName); - } - return super.writePage(pageName, text); - } - - disconnectPageSocket(socket: Socket, pageName: string) { - let page = this.openPages.get(pageName); - if (page) { - for (let client of page.clients) { - if (client.socket === socket) { - this.disconnectClient(client, page); - } - } - } - } + connectedSockets: Set = new Set(); + pageStore: DiskStorage; constructor(rootPath: string, io: Server) { - super(rootPath); - - // setInterval(() => { - // console.log("Currently open pages:", this.openPages.keys()); - // }, 10000); - - // Disk watcher - fs.watch( - rootPath, - { - recursive: true, - persistent: false, - }, - (eventType, filename) => { - safeRun(async () => { - if (path.extname(filename) !== ".md") { - return; - } - let localPath = path.join(rootPath, filename); - let pageName = filename.substring(0, filename.length - 3); - let s = await stat(localPath); - // console.log("Edit in", pageName); - const openPage = this.openPages.get(pageName); - if (openPage) { - if (openPage.meta.lastModified < s.mtime.getTime()) { - console.log("Page changed on disk outside of editor, reloading"); - this.openPages.delete(pageName); - for (let client of openPage.clients) { - client.socket.emit("reload", pageName); - } - } - } - }); - } - ); + this.pageStore = new DiskStorage(rootPath); + this.fileWatcher(rootPath); io.on("connection", (socket) => { console.log("Connected", socket.id); - let clientOpenPages = new Set(); + this.connectedSockets.add(socket); + const socketOpenPages = new Set(); + + socket.on("disconnect", () => { + console.log("Disconnected", socket.id); + socketOpenPages.forEach(disconnectPageSocket); + this.connectedSockets.delete(socket); + }); function onCall(eventName: string, cb: (...args: any[]) => Promise) { socket.on(eventName, (reqId: number, ...args) => { @@ -129,11 +39,23 @@ export class RealtimeStorage extends DiskStorage { }); } + const _this = this; + function disconnectPageSocket(pageName: string) { + let page = _this.openPages.get(pageName); + if (page) { + for (let client of page.clientStates) { + if (client.socket === socket) { + _this.disconnectClient(client, page); + } + } + } + } + onCall("openPage", async (pageName: string) => { let page = this.openPages.get(pageName); if (!page) { try { - let { text, meta } = await super.readPage(pageName); + let { text, meta } = await this.pageStore.readPage(pageName); page = new Page(pageName, text, meta); } catch (e) { console.log("Creating new page", pageName); @@ -141,8 +63,8 @@ export class RealtimeStorage extends DiskStorage { } this.openPages.set(pageName, page); } - page.clients.add(new Client(socket, page.version)); - clientOpenPages.add(pageName); + page.clientStates.add(new ClientPageState(socket, page.version)); + socketOpenPages.add(pageName); console.log("Opened page", pageName); this.broadcastCursors(page); return page.toJSON(); @@ -150,8 +72,8 @@ export class RealtimeStorage extends DiskStorage { socket.on("closePage", (pageName: string) => { console.log("Closing page", pageName); - clientOpenPages.delete(pageName); - this.disconnectPageSocket(socket, pageName); + socketOpenPages.delete(pageName); + disconnectPageSocket(pageName); }); onCall( @@ -169,13 +91,13 @@ export class RealtimeStorage extends DiskStorage { pageName, this.openPages.keys() ); - return; + return false; } if (version !== page.version) { console.error("Invalid version", version, page.version); return false; } else { - console.log("Applying", updates.length, "updates"); + console.log("Applying", updates.length, "updates to", pageName); let transformedUpdates = []; let textChanged = false; for (let update of updates) { @@ -225,7 +147,7 @@ export class RealtimeStorage extends DiskStorage { } // TODO: Optimize this let oldestVersion = Infinity; - page.clients.forEach((client) => { + page.clientStates.forEach((client) => { oldestVersion = Math.min(client.version, oldestVersion); if (client.socket === socket) { client.version = version; @@ -242,12 +164,139 @@ export class RealtimeStorage extends DiskStorage { } ); - socket.on("disconnect", () => { - console.log("Disconnected", socket.id); - clientOpenPages.forEach((pageName) => { - this.disconnectPageSocket(socket, pageName); - }); + onCall( + "readPage", + async (pageName: string): Promise<{ text: string; meta: PageMeta }> => { + let page = this.openPages.get(pageName); + if (page) { + console.log("Serving page from memory", pageName); + return { + text: page.text.sliceString(0), + meta: page.meta, + }; + } else { + return this.pageStore.readPage(pageName); + } + } + ); + + onCall("writePage", async (pageName: string, text: string) => { + let page = this.openPages.get(pageName); + if (page) { + for (let client of page.clientStates) { + client.socket.emit("reloadPage", pageName); + } + this.openPages.delete(pageName); + } + return this.pageStore.writePage(pageName, text); + }); + + onCall("deletePage", async (pageName: string) => { + this.openPages.delete(pageName); + socketOpenPages.delete(pageName); + // Cascading of this to all connected clients will be handled by file watcher + return this.pageStore.deletePage(pageName); + }); + + onCall("listPages", async (): Promise => { + return this.pageStore.listPages(); + }); + + onCall("getPageMeta", async (pageName: string): Promise => { + let page = this.openPages.get(pageName); + if (page) { + return page.meta; + } + return this.pageStore.getPageMeta(pageName); }); }); } + + private disconnectClient(client: ClientPageState, page: Page) { + page.clientStates.delete(client); + if (page.clientStates.size === 0) { + console.log("No more clients for", page.name, "flushing"); + this.flushPageToDisk(page.name, page); + this.openPages.delete(page.name); + } else { + page.cursors.delete(client.socket.id); + this.broadcastCursors(page); + } + } + + private broadcastCursors(page: Page) { + page.clientStates.forEach((client) => { + client.socket.emit( + "cursorSnapshot", + page.name, + Object.fromEntries(page.cursors.entries()) + ); + }); + } + + private flushPageToDisk(name: string, page: Page) { + safeRun(async () => { + let meta = await this.pageStore.writePage(name, page.text.sliceString(0)); + console.log(`Wrote page ${name} to disk`); + page.meta = meta; + }); + } + + private fileWatcher(rootPath: string) { + fs.watch( + rootPath, + { + recursive: true, + persistent: false, + }, + (eventType, filename) => { + safeRun(async () => { + if (path.extname(filename) !== ".md") { + return; + } + let localPath = path.join(rootPath, filename); + let pageName = filename.substring(0, filename.length - 3); + // console.log("Edit in", pageName, eventType); + let modifiedTime = 0; + try { + let s = await stat(localPath); + modifiedTime = s.mtime.getTime(); + } catch (e) { + // File was deleted + console.log("Deleted", pageName); + for (let socket of this.connectedSockets) { + socket.emit("pageDeleted", pageName); + } + return; + } + const openPage = this.openPages.get(pageName); + if (openPage) { + if (openPage.meta.lastModified < modifiedTime) { + console.log("Page changed on disk outside of editor, reloading"); + this.openPages.delete(pageName); + const meta = { + name: pageName, + lastModified: modifiedTime, + } as PageMeta; + for (let client of openPage.clientStates) { + client.socket.emit("pageChanged", meta); + } + } + } + if (eventType === "rename") { + // This most likely means a new file was created, let's push new file listings to all connected sockets + console.log( + "New file created, broadcasting to all connected sockets" + ); + for (let socket of this.connectedSockets) { + socket.emit("pageCreated", { + name: pageName, + lastModified: modifiedTime, + } as PageMeta); + } + } + }); + } + ); + } } diff --git a/server/src/server.ts b/server/src/server.ts index 4cab814..f6481c8 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -1,12 +1,8 @@ -import bodyParser from "body-parser"; -import cors from "cors"; import express from "express"; import { readFile } from "fs/promises"; import http from "http"; import { Server } from "socket.io"; -import stream from "stream"; -import { promisify } from "util"; -import { RealtimeStorage } from "./realtime_storage"; +import { SocketAPI } from "./api"; const app = express(); const server = http.createServer(app); @@ -18,7 +14,6 @@ const io = new Server(server, { }); const port = 3000; -const pipeline = promisify(stream.pipeline); export const pagesPath = "../pages"; const distDir = `${__dirname}/../../webapp/dist`; @@ -29,80 +24,7 @@ export type PageMeta = { }; app.use("/", express.static(distDir)); - -let fsRouter = express.Router(); -// let diskFS = new DiskFS(pagesPath); -let filesystem = new RealtimeStorage(pagesPath, io); - -// Page list -fsRouter.route("/").get(async (req, res) => { - res.json(await filesystem.listPages()); -}); - -fsRouter - .route(/\/(.+)/) - .get(async (req, res) => { - let reqPath = req.params[0]; - console.log("Getting", reqPath); - try { - let { text, meta } = await filesystem.readPage(reqPath); - res.status(200); - res.header("Last-Modified", "" + meta.lastModified); - res.header("Content-Type", "text/markdown"); - res.send(text); - } catch (e) { - res.status(200); - res.send(""); - } - }) - .put(bodyParser.text({ type: "*/*" }), async (req, res) => { - let reqPath = req.params[0]; - - try { - let meta = await filesystem.writePage(reqPath, req.body); - res.status(200); - res.header("Last-Modified", "" + meta.lastModified); - res.send("OK"); - } catch (err) { - res.status(500); - res.send("Write failed"); - console.error("Pipeline failed", err); - } - }) - .options(async (req, res) => { - let reqPath = req.params[0]; - try { - const meta = await filesystem.getPageMeta(reqPath); - res.status(200); - res.header("Last-Modified", "" + meta.lastModified); - res.header("Content-Type", "text/markdown"); - res.send(""); - } catch (e) { - res.status(200); - res.send(""); - } - }) - .delete(async (req, res) => { - let reqPath = req.params[0]; - try { - await filesystem.deletePage(reqPath); - res.status(200); - res.send("OK"); - } catch (e) { - console.error("Error deleting file", reqPath, e); - res.status(500); - res.send("OK"); - } - }); - -app.use( - "/fs", - cors({ - methods: "GET,HEAD,PUT,OPTIONS,POST,DELETE", - preflightContinue: true, - }), - fsRouter -); +let filesystem = new SocketAPI(pagesPath, io); // Fallback, serve index.html let cachedIndex: string | undefined = undefined; @@ -113,7 +35,6 @@ app.get("/*", async (req, res) => { res.status(200).header("Content-Type", "text/html").send(cachedIndex); }); -//sup server.listen(port, () => { console.log(`Server istening on port ${port}`); }); diff --git a/server/src/types.ts b/server/src/types.ts index fed1944..a79ff16 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -4,7 +4,7 @@ import { Socket } from "socket.io"; import { Cursor } from "../../webapp/src/cursorEffect"; import { PageMeta } from "./server"; -export class Client { +export class ClientPageState { constructor(public socket: Socket, public version: number) {} } @@ -12,7 +12,7 @@ export class Page { versionOffset = 0; updates: Update[] = []; cursors = new Map(); - clients = new Set(); + clientStates = new Set(); pending: ((value: any) => void)[] = []; diff --git a/webapp/src/boot.ts b/webapp/src/boot.ts index 90da6b3..653a66a 100644 --- a/webapp/src/boot.ts +++ b/webapp/src/boot.ts @@ -1,12 +1,12 @@ import { Editor } from "./editor"; -import { HttpRemoteSpace } from "./space"; +import { RealtimeSpace } from "./space"; import { safeRun } from "./util"; import { io } from "socket.io-client"; let socket = io(`http://${location.hostname}:3000`); let editor = new Editor( - new HttpRemoteSpace(`http://${location.hostname}:3000/fs`, socket), + new RealtimeSpace(socket), document.getElementById("root")! ); diff --git a/webapp/src/collab.ts b/webapp/src/collab.ts index a540204..b79cf64 100644 --- a/webapp/src/collab.ts +++ b/webapp/src/collab.ts @@ -10,8 +10,8 @@ import { receiveUpdates, sendableUpdates, } from "@codemirror/collab"; -import { RangeSetBuilder, Range } from "@codemirror/rangeset"; -import { EditorState, StateEffect, StateField, Text } from "@codemirror/state"; +import { RangeSetBuilder } from "@codemirror/rangeset"; +import { Text } from "@codemirror/state"; import { Decoration, DecorationSet, @@ -21,7 +21,7 @@ import { WidgetType, } from "@codemirror/view"; import { Cursor, cursorEffect } from "./cursorEffect"; -import { HttpRemoteSpace } from "./space"; +import { RealtimeSpace, SpaceEventHandlers } from "./space"; const throttleInterval = 250; @@ -85,7 +85,7 @@ export function collabExtension( pageName: string, clientID: string, doc: Document, - space: HttpRemoteSpace, + space: RealtimeSpace, reloadCallback: () => void ) { let plugin = ViewPlugin.fromClass( @@ -95,7 +95,15 @@ export function collabExtension( private failedPushes = 0; decorations: DecorationSet; private cursorPositions: Map = doc.cursors; - throttledPush: () => void; + + throttledPush = throttle(() => this.push(), throttleInterval); + + eventHandlers: Partial = { + cursorSnapshot: (pageName, cursors) => { + console.log("Received new cursor snapshot", cursors); + this.cursorPositions = new Map(Object.entries(cursors)); + }, + }; buildDecorations(view: EditorView) { let builder = new RangeSetBuilder(); @@ -128,18 +136,7 @@ export function collabExtension( this.pull(); } this.decorations = this.buildDecorations(view); - this.throttledPush = throttle(() => this.push(), throttleInterval); - - console.log("Created collabo plug"); - space.addEventListener("cursors", this.updateCursors); - } - - updateCursors(cursorEvent: any) { - this.cursorPositions = new Map(); - console.log("Received new cursor snapshot", cursorEvent.detail, this); - for (let userId in cursorEvent.detail) { - this.cursorPositions.set(userId, cursorEvent.detail[userId]); - } + space.on(this.eventHandlers); } update(update: ViewUpdate) { @@ -190,7 +187,7 @@ export function collabExtension( let success = await space.pushUpdates(pageName, version, updates); this.pushing = false; - if (!success) { + if (!success && !this.done) { this.failedPushes++; if (this.failedPushes > 10) { // Not sure if 10 is a good number, but YOLO @@ -198,14 +195,16 @@ export function collabExtension( reloadCallback(); return this.destroy(); } - console.log("Push failed temporarily, but will try again"); + console.log( + `Push for page ${pageName} failed temporarily, but will try again` + ); } else { this.failedPushes = 0; } // Regardless of whether the push failed or new updates came in // while it was running, try again if there's updates remaining - if (sendableUpdates(this.view.state).length) { + if (!this.done && sendableUpdates(this.view.state).length) { // setTimeout(() => this.push(), 100); this.throttledPush(); } @@ -236,7 +235,7 @@ export function collabExtension( destroy() { this.done = true; - space.removeEventListener("cursors", this.updateCursors); + space.off(this.eventHandlers); } }, { @@ -252,7 +251,6 @@ export function collabExtension( return tr.effects.filter((e) => e.is(cursorEffect)); }, }), - // cursorField, plugin, ]; } diff --git a/webapp/src/components/filter.tsx b/webapp/src/components/filter.tsx index 10440f5..4cfc211 100644 --- a/webapp/src/components/filter.tsx +++ b/webapp/src/components/filter.tsx @@ -100,7 +100,7 @@ export function FilterList({ ref={searchBoxRef} onChange={filter} onKeyDown={(e: React.KeyboardEvent) => { - console.log("Key up", e.key); + // console.log("Key up", e.key); if (onKeyPress) { onKeyPress(e.key, text); } diff --git a/webapp/src/components/page_navigator.tsx b/webapp/src/components/page_navigator.tsx index 2caf497..8b3ae3e 100644 --- a/webapp/src/components/page_navigator.tsx +++ b/webapp/src/components/page_navigator.tsx @@ -7,7 +7,7 @@ export function PageNavigator({ onNavigate, currentPage, }: { - allPages: PageMeta[]; + allPages: Set; onNavigate: (page: string | undefined) => void; currentPage?: string; }) { @@ -17,10 +17,10 @@ export function PageNavigator({ continue; } // Order by last modified date in descending order - let orderId = -pageMeta.lastModified.getTime(); + let orderId = -pageMeta.lastModified; // Unless it was opened and is still in memory if (pageMeta.lastOpened) { - orderId = -pageMeta.lastOpened.getTime(); + orderId = -pageMeta.lastOpened; } options.push({ ...pageMeta, diff --git a/webapp/src/editor.tsx b/webapp/src/editor.tsx index 3885ed5..c2cf9ae 100644 --- a/webapp/src/editor.tsx +++ b/webapp/src/editor.tsx @@ -45,7 +45,7 @@ import { slashCommandRegexp } from "./types"; import reducer from "./reducer"; import { smartQuoteKeymap } from "./smart_quotes"; -import { HttpRemoteSpace } from "./space"; +import { RealtimeSpace } from "./space"; import customMarkdownStyle from "./style"; import dbSyscalls from "./syscalls/db.localstorage"; import editorSyscalls from "./syscalls/editor.browser"; @@ -84,7 +84,7 @@ export class Editor implements AppEventDispatcher { viewState: AppViewState; viewDispatch: React.Dispatch; openPages: Map; - space: HttpRemoteSpace; + space: RealtimeSpace; editorCommands: Map; plugs: Plug[]; indexer: Indexer; @@ -92,7 +92,7 @@ export class Editor implements AppEventDispatcher { pageNavigator: IPageNavigator; indexCurrentPageDebounced: () => any; - constructor(space: HttpRemoteSpace, parent: Element) { + constructor(space: RealtimeSpace, parent: Element) { this.editorCommands = new Map(); this.openPages = new Map(); this.plugs = []; @@ -114,7 +114,7 @@ export class Editor implements AppEventDispatcher { } async init() { - await this.loadPageList(); + // await this.loadPageList(); await this.loadPlugs(); this.focus(); @@ -127,8 +127,10 @@ export class Editor implements AppEventDispatcher { if (this.currentPage) { let pageState = this.openPages.get(this.currentPage)!; - pageState.selection = this.editorView!.state.selection; - pageState.scrollTop = this.editorView!.scrollDOM.scrollTop; + if (pageState) { + pageState.selection = this.editorView!.state.selection; + pageState.scrollTop = this.editorView!.scrollDOM.scrollTop; + } this.space.closePage(this.currentPage); } @@ -136,19 +138,25 @@ export class Editor implements AppEventDispatcher { await this.loadPage(pageName); }); - this.space.addEventListener("connect", () => { - if (this.currentPage) { - console.log("Connected to socket, fetch fresh?"); - this.reloadPage(); - } - }); - - this.space.addEventListener("reload", (e) => { - let pageName = (e as CustomEvent).detail; - if (this.currentPage === pageName) { - console.log("Was told to reload the page"); - this.reloadPage(); - } + this.space.on({ + connect: () => { + if (this.currentPage) { + console.log("Connected to socket, fetch fresh?"); + this.reloadPage(); + } + }, + pageChanged: (meta) => { + if (this.currentPage === meta.name) { + console.log("page changed on disk, reloading"); + this.reloadPage(); + } + }, + pageListUpdated: (pages) => { + this.viewDispatch({ + type: "pages-listed", + pages: pages, + }); + }, }); if (this.pageNavigator.getCurrentPage() === "") { @@ -411,14 +419,6 @@ export class Editor implements AppEventDispatcher { } } - async loadPageList() { - let pagesMeta = await this.space.listPages(); - this.viewDispatch({ - type: "pages-listed", - pages: pagesMeta, - }); - } - focus() { this.editorView!.focus(); } diff --git a/webapp/src/reducer.ts b/webapp/src/reducer.ts index e5f3784..23af6d9 100644 --- a/webapp/src/reducer.ts +++ b/webapp/src/reducer.ts @@ -9,10 +9,12 @@ export default function reducer( case "page-loaded": return { ...state, - allPages: state.allPages.map((pageMeta) => - pageMeta.name === action.name - ? { ...pageMeta, lastOpened: new Date() } - : pageMeta + allPages: new Set( + [...state.allPages].map((pageMeta) => + pageMeta.name === action.name + ? { ...pageMeta, lastOpened: Date.now() } + : pageMeta + ) ), currentPage: action.name, }; diff --git a/webapp/src/space.ts b/webapp/src/space.ts index f5b3ae1..f465c33 100644 --- a/webapp/src/space.ts +++ b/webapp/src/space.ts @@ -14,27 +14,81 @@ export interface Space { getPageMeta(name: string): Promise; } -export class HttpRemoteSpace extends EventTarget implements Space { - url: string; +export type SpaceEventHandlers = { + connect: () => void; + cursorSnapshot: ( + pageName: string, + cursors: { [key: string]: Cursor } + ) => void; + pageCreated: (meta: PageMeta) => void; + pageChanged: (meta: PageMeta) => void; + pageDeleted: (name: string) => void; + pageListUpdated: (pages: Set) => void; +}; + +abstract class EventEmitter { + private handlers: Partial[] = []; + + on(handlers: Partial) { + this.handlers.push(handlers); + } + + off(handlers: Partial) { + this.handlers = this.handlers.filter((h) => h !== handlers); + } + + emit(eventName: keyof HandlerT, ...args: any[]) { + for (let handler of this.handlers) { + let fn: any = handler[eventName]; + if (fn) { + fn(...args); + } + } + } +} + +export class RealtimeSpace + extends EventEmitter + implements Space +{ socket: Socket; reqId = 0; + allPages = new Set(); - constructor(url: string, socket: Socket) { + constructor(socket: Socket) { super(); - this.url = url; this.socket = socket; - socket.on("connect", () => { - console.log("connected to socket"); - this.dispatchEvent(new Event("connect")); + [ + "connect", + "cursorSnapshot", + "pageCreated", + "pageChanged", + "pageDeleted", + ].forEach((eventName) => { + socket.on(eventName, (...args) => { + this.emit(eventName as keyof SpaceEventHandlers, ...args); + }); }); - - socket.on("reload", (pageName: string) => { - this.dispatchEvent(new CustomEvent("reload", { detail: pageName })); + this.wsCall("listPages").then((pages) => { + this.allPages = new Set(pages); + this.emit("pageListUpdated", this.allPages); }); - - socket.on("cursors", (cursors) => { - this.dispatchEvent(new CustomEvent("cursors", { detail: cursors })); + this.on({ + pageCreated: (meta) => { + this.allPages.add(meta); + console.log("New page created", meta); + this.emit("pageListUpdated", this.allPages); + }, + pageDeleted: (name) => { + console.log("Page delete", name); + this.allPages.forEach((meta) => { + if (name === meta.name) { + this.allPages.delete(meta); + } + }); + this.emit("pageListUpdated", this.allPages); + }, }); } @@ -76,14 +130,7 @@ export class HttpRemoteSpace extends EventTarget implements Space { } async listPages(): Promise { - let req = await fetch(this.url, { - method: "GET", - }); - - return (await req.json()).map((meta: any) => ({ - name: meta.name, - lastModified: new Date(meta.lastModified), - })); + return Array.from(this.allPages); } async openPage(name: string): Promise { @@ -101,47 +148,18 @@ export class HttpRemoteSpace extends EventTarget implements Space { } async readPage(name: string): Promise<{ text: string; meta: PageMeta }> { - let req = await fetch(`${this.url}/${name}`, { - method: "GET", - }); - return { - text: await req.text(), - meta: { - lastModified: new Date(+req.headers.get("Last-Modified")!), - name: name, - }, - }; + return this.wsCall("readPage", name); } async writePage(name: string, text: string): Promise { - let req = await fetch(`${this.url}/${name}`, { - method: "PUT", - body: text, - }); - // 201 (Created) means a new page was created - return { - lastModified: new Date(+req.headers.get("Last-Modified")!), - name: name, - created: req.status === 201, - }; + return this.wsCall("writePage", name, text); } async deletePage(name: string): Promise { - let req = await fetch(`${this.url}/${name}`, { - method: "DELETE", - }); - if (req.status !== 200) { - throw Error(`Failed to delete page: ${req.statusText}`); - } + return this.wsCall("deletePage", name); } async getPageMeta(name: string): Promise { - let req = await fetch(`${this.url}/${name}`, { - method: "OPTIONS", - }); - return { - name: name, - lastModified: new Date(+req.headers.get("Last-Modified")!), - }; + return this.wsCall("deletePage", name); } } diff --git a/webapp/src/syscalls/space.native.ts b/webapp/src/syscalls/space.native.ts index 68ff1d7..96a1fbf 100644 --- a/webapp/src/syscalls/space.native.ts +++ b/webapp/src/syscalls/space.native.ts @@ -3,10 +3,7 @@ import { PageMeta } from "../types"; export default (editor: Editor) => ({ "space.listPages": (): PageMeta[] => { - return editor.viewState.allPages; - }, - "space.reloadPageList": async () => { - await editor.loadPageList(); + return [...editor.viewState.allPages]; }, "space.reindex": async () => { await editor.indexer.reindexSpace(editor.space, editor); diff --git a/webapp/src/types.ts b/webapp/src/types.ts index 16d3e7f..88aa63e 100644 --- a/webapp/src/types.ts +++ b/webapp/src/types.ts @@ -10,10 +10,9 @@ export type Manifest = plugbox.Manifest; export type PageMeta = { name: string; - lastModified: Date; + lastModified: number; version?: number; - created?: boolean; - lastOpened?: Date; + lastOpened?: number; }; export type AppCommand = { @@ -40,20 +39,20 @@ export type AppViewState = { currentPage?: string; showPageNavigator: boolean; showCommandPalette: boolean; - allPages: PageMeta[]; + allPages: Set; commands: Map; }; export const initialViewState: AppViewState = { showPageNavigator: false, showCommandPalette: false, - allPages: [], + allPages: new Set(), commands: new Map(), }; export type Action = | { type: "page-loaded"; name: string } - | { type: "pages-listed"; pages: PageMeta[] } + | { type: "pages-listed"; pages: Set } | { type: "start-navigate" } | { type: "stop-navigate" } | { type: "update-commands"; commands: Map }