diff --git a/mobile/html/boot.ts b/mobile/html/boot.ts index bfbab08..2bfff41 100644 --- a/mobile/html/boot.ts +++ b/mobile/html/boot.ts @@ -1,5 +1,5 @@ import { Editor } from "../../webapp/src/editor"; -import { HttpRemoteSpace } from "../../webapp/src/space"; +import { Space } from "../../webapp/src/space"; declare namespace window { var ReactNativeWebView: any; @@ -32,17 +32,17 @@ console.error = (...args) => { ); }; try { - let editor = new Editor( - new HttpRemoteSpace(`http://192.168.2.22:3000/fs`, null), - document.getElementById("root")! - ); - console.log("Initing editor"); - safeRun(async () => { - await editor.loadPageList(); - await editor.loadPlugs(); - editor.focus(); - console.log("Inited", editor.viewState); - }); + // let editor = new Editor( + // new Space(`http://192.168.2.22:3000/fs`, null), + // document.getElementById("root")! + // ); + // console.log("Initing editor"); + // safeRun(async () => { + // await editor.loadPageList(); + // await editor.loadPlugs(); + // editor.focus(); + // console.log("Inited", editor.viewState); + // }); } catch (e: any) { console.error("Got an error", e.message); } diff --git a/plugbox/bin/plugbox-bundle.mjs b/plugbox/bin/plugbox-bundle.mjs index 4baee1b..178df30 100755 --- a/plugbox/bin/plugbox-bundle.mjs +++ b/plugbox/bin/plugbox-bundle.mjs @@ -67,7 +67,7 @@ async function run() { .parse(); let generatedManifest = await bundle(args._[0], !!args.debug); - writeFile(args._[1], JSON.stringify(generatedManifest, null, 2)); + await writeFile(args._[1], JSON.stringify(generatedManifest, null, 2)); } run().catch((e) => { diff --git a/plugbox/src/node_sandbox.ts b/plugbox/src/node_sandbox.ts index acc3a83..0149737 100644 --- a/plugbox/src/node_sandbox.ts +++ b/plugbox/src/node_sandbox.ts @@ -20,7 +20,6 @@ export class NodeSandbox implements Sandbox { constructor(readonly system: System, workerScript: string) { this.worker = new Worker(workerScript); - this.worker.on("message", this.onmessage.bind(this)); } diff --git a/plugbox/src/runtime.test.ts b/plugbox/src/runtime.test.ts index b7908d8..5960bcf 100644 --- a/plugbox/src/runtime.test.ts +++ b/plugbox/src/runtime.test.ts @@ -42,6 +42,6 @@ test("Run a Node sandbox", async () => { for (let i = 0; i < 100; i++) { expect(await plug.invoke("addNumbersSyscall", [10, i])).toBe(10 + i); } - console.log(plug.sandbox); + // console.log(plug.sandbox); await system.stop(); }); diff --git a/plugbox/src/runtime.ts b/plugbox/src/runtime.ts index d801667..f050fca 100644 --- a/plugbox/src/runtime.ts +++ b/plugbox/src/runtime.ts @@ -31,7 +31,6 @@ export class Plug { if (!this.sandbox.isLoaded(name)) { await this.sandbox.load(name, this.manifest!.functions[name].code!); } - console.log("Loaded", name); return await this.sandbox.invoke(name, args); } diff --git a/plugbox/src/worker_sandbox.ts b/plugbox/src/worker_sandbox.ts index 85ab2b4..cce8ac4 100644 --- a/plugbox/src/worker_sandbox.ts +++ b/plugbox/src/worker_sandbox.ts @@ -1,5 +1,5 @@ import { ControllerMessage, WorkerMessage } from "./types"; -import { Plug, Sandbox, System } from "./runtime"; +import { Sandbox, System } from "./runtime"; export class WebworkerSandbox implements Sandbox { private worker: Worker; diff --git a/plugs/Makefile b/plugs/Makefile index 39155b2..fc54984 100644 --- a/plugs/Makefile +++ b/plugs/Makefile @@ -1,7 +1,9 @@ +.PHONY: core + BUILD=../plugbox/bin/plugbox-bundle.mjs -core: * +core: core/* ${BUILD} --debug core/core.plug.json ../webapp/src/generated/core.plug.json watch: * - ls -d core/* | entr make \ No newline at end of file + ls -d core/* | entr make diff --git a/plugs/core/navigate.ts b/plugs/core/navigate.ts index 04bad03..362dfdd 100644 --- a/plugs/core/navigate.ts +++ b/plugs/core/navigate.ts @@ -25,13 +25,13 @@ async function navigate(syntaxNode: any) { } export async function linkNavigate() { - navigate(await syscall("editor.getSyntaxNodeUnderCursor")); + await navigate(await syscall("editor.getSyntaxNodeUnderCursor")); } export async function clickNavigate(event: ClickEvent) { if (event.ctrlKey || event.metaKey) { let syntaxNode = await syscall("editor.getSyntaxNodeAtPos", event.pos); - navigate(syntaxNode); + await navigate(syntaxNode); } } @@ -48,7 +48,7 @@ export async function pageComplete() { return { from: prefix.from, options: allPages - .filter((page) => page.name.startsWith(prefix.text)) + .filter((page: any) => page.name.startsWith(prefix.text)) .map((pageMeta: any) => ({ label: pageMeta.name, type: "page", diff --git a/server/page_index b/server/page_index deleted file mode 100644 index e69de29..0000000 diff --git a/server/src/api.test.ts b/server/src/api.test.ts index 471ec26..f142c1f 100644 --- a/server/src/api.test.ts +++ b/server/src/api.test.ts @@ -8,17 +8,16 @@ import * as path from "path"; import * as fs from "fs"; describe("Server test", () => { - let io, + let io: Server, socketServer: SocketServer, - cleaner, - clientSocket, + clientSocket: any, reqId = 0; const tmpDir = path.join(__dirname, "test"); function wsCall(eventName: string, ...args: any[]): Promise { return new Promise((resolve, reject) => { reqId++; - clientSocket.once(`${eventName}Resp${reqId}`, (err, result) => { + clientSocket.once(`${eventName}Resp${reqId}`, (err: any, result: any) => { if (err) { reject(err); } else { @@ -41,6 +40,7 @@ describe("Server test", () => { clientSocket = new Client(`http://localhost:${port}`); socketServer = new SocketServer(tmpDir, io); clientSocket.on("connect", done); + await socketServer.init(); }); }); @@ -52,40 +52,45 @@ describe("Server test", () => { }); test("List pages", async () => { - let pages = await wsCall("listPages"); - console.log(pages); + let pages = await wsCall("page.listPages"); expect(pages.length).toBe(1); + await wsCall("page.writePage", "test2.md", "This is another test"); + let pages2 = await wsCall("page.listPages"); + expect(pages2.length).toBe(2); + await wsCall("page.deletePage", "test2.md"); + let pages3 = await wsCall("page.listPages"); + expect(pages3.length).toBe(1); }); test("Index operations", async () => { - await wsCall("index:clearPageIndexForPage", "test"); - await wsCall("index:set", "test", "testkey", "value"); - expect(await wsCall("index:get", "test", "testkey")).toBe("value"); - await wsCall("index:delete", "test", "testkey"); - expect(await wsCall("index:get", "test", "testkey")).toBe(null); - await wsCall("index:set", "test", "unrelated", 10); - await wsCall("index:set", "test", "unrelated", 12); - await wsCall("index:set", "test2", "complicated", { + await wsCall("index.clearPageIndexForPage", "test"); + await wsCall("index.set", "test", "testkey", "value"); + expect(await wsCall("index.get", "test", "testkey")).toBe("value"); + await wsCall("index.delete", "test", "testkey"); + expect(await wsCall("index.get", "test", "testkey")).toBe(null); + await wsCall("index.set", "test", "unrelated", 10); + await wsCall("index.set", "test", "unrelated", 12); + await wsCall("index.set", "test2", "complicated", { name: "Bla", age: 123123, }); - await wsCall("index:set", "test", "complicated", { name: "Bla", age: 100 }); - await wsCall("index:set", "test", "complicated2", { + await wsCall("index.set", "test", "complicated", { name: "Bla", age: 100 }); + await wsCall("index.set", "test", "complicated2", { name: "Bla", age: 101, }); - expect(await wsCall("index:get", "test", "complicated")).toStrictEqual({ + expect(await wsCall("index.get", "test", "complicated")).toStrictEqual({ name: "Bla", age: 100, }); - let result = await wsCall("index:scanPrefixForPage", "test", "compli"); + let result = await wsCall("index.scanPrefixForPage", "test", "compli"); expect(result.length).toBe(2); - let result2 = await wsCall("index:scanPrefixGlobal", "compli"); + let result2 = await wsCall("index.scanPrefixGlobal", "compli"); expect(result2.length).toBe(3); - await wsCall("index:deletePrefixForPage", "test", "compli"); - let result3 = await wsCall("index:scanPrefixForPage", "test", "compli"); + await wsCall("index.deletePrefixForPage", "test", "compli"); + let result3 = await wsCall("index.scanPrefixForPage", "test", "compli"); expect(result3.length).toBe(0); - let result4 = await wsCall("index:scanPrefixGlobal", "compli"); + let result4 = await wsCall("index.scanPrefixGlobal", "compli"); expect(result4.length).toBe(1); }); }); diff --git a/server/src/api.ts b/server/src/api.ts index 23df8f6..2aea68b 100644 --- a/server/src/api.ts +++ b/server/src/api.ts @@ -1,302 +1,45 @@ -import { stat } from "fs/promises"; -import { ChangeSet } from "@codemirror/state"; -import { Update } from "@codemirror/collab"; -import { Server } from "socket.io"; -import { Cursor, cursorEffect } from "../../webapp/src/cursorEffect"; -import { Socket } from "socket.io"; -import { DiskStorage } from "./disk_storage"; -import { ClientPageState, Page, PageMeta } from "./types"; -import { safeRun } from "./util"; -import * as fs from "fs"; +import { Server, Socket } from "socket.io"; +import { Page } from "./types"; import * as path from "path"; -import knex, { Knex } from "knex"; +import { IndexApi } from "./index_api"; +import { PageApi } from "./page_api"; -type IndexItem = { - page: string; - key: string; - value: any; -}; - -class ClientConnection { +export class ClientConnection { openPages = new Set(); constructor(readonly sock: Socket) {} } +export interface ApiProvider { + init(): Promise; + api(): Object; +} + export class SocketServer { rootPath: string; - serverSock: Server; openPages = new Map(); connectedSockets = new Set(); - pageStore: DiskStorage; - db: Knex; serverSocket: Server; + private apis = new Map(); - api = { - openPage: async (clientConn: ClientConnection, pageName: string) => { - let page = this.openPages.get(pageName); - if (!page) { - try { - let { text, meta } = await this.pageStore.readPage(pageName); - page = new Page(pageName, text, meta); - } catch (e) { - console.log("Creating new page", pageName); - page = new Page(pageName, "", { name: pageName, lastModified: 0 }); - } - this.openPages.set(pageName, page); - } - page.clientStates.add(new ClientPageState(clientConn.sock, page.version)); - clientConn.openPages.add(pageName); - console.log("Opened page", pageName); - this.broadcastCursors(page); - return page.toJSON(); - }, - pushUpdates: async ( - clientConn: ClientConnection, - pageName: string, - version: number, - updates: any[] - ): Promise => { - let page = this.openPages.get(pageName); - - if (!page) { - console.error( - "Received updates for not open page", - pageName, - this.openPages.keys() - ); - return false; - } - if (version !== page.version) { - console.error("Invalid version", version, page.version); - return false; - } else { - console.log("Applying", updates.length, "updates to", pageName); - let transformedUpdates = []; - let textChanged = false; - for (let update of updates) { - let changes = ChangeSet.fromJSON(update.changes); - let transformedUpdate = { - changes, - clientID: update.clientID, - effects: update.cursors?.map((c: Cursor) => { - page.cursors.set(c.userId, c); - return cursorEffect.of(c); - }), - }; - page.updates.push(transformedUpdate); - transformedUpdates.push(transformedUpdate); - let oldText = page.text; - page.text = changes.apply(page.text); - if (oldText !== page.text) { - textChanged = true; - } - } - - if (textChanged) { - if (page.saveTimer) { - clearTimeout(page.saveTimer); - } - - page.saveTimer = setTimeout(() => { - this.flushPageToDisk(pageName, page); - }, 1000); - } - while (page.pending.length) { - page.pending.pop()!(transformedUpdates); - } - return true; - } - }, - - pullUpdates: async ( - clientConn: ClientConnection, - pageName: string, - version: number - ): Promise => { - let page = this.openPages.get(pageName); - // console.log("Pulling updates for", pageName); - if (!page) { - console.error("Fetching updates for not open page"); - return []; - } - // TODO: Optimize this - let oldestVersion = Infinity; - page.clientStates.forEach((client) => { - oldestVersion = Math.min(client.version, oldestVersion); - if (client.socket === clientConn.sock) { - client.version = version; - } - }); - page.flushUpdates(oldestVersion); - if (version < page.version) { - return page.updatesSince(version); - } else { - return new Promise((resolve) => { - page.pending.push(resolve); - }); - } - }, - - readPage: async ( - clientConn: ClientConnection, - pageName: string - ): Promise<{ text: string; meta: PageMeta }> => { - let page = this.openPages.get(pageName); - if (page) { - console.log("Serving page from memory", pageName); - return { - text: page.text.sliceString(0), - meta: page.meta, - }; - } else { - return this.pageStore.readPage(pageName); - } - }, - - writePage: async ( - clientConn: ClientConnection, - pageName: string, - text: string - ) => { - let page = this.openPages.get(pageName); - if (page) { - for (let client of page.clientStates) { - client.socket.emit("reloadPage", pageName); - } - this.openPages.delete(pageName); - } - return this.pageStore.writePage(pageName, text); - }, - - deletePage: async (clientConn: ClientConnection, pageName: string) => { - this.openPages.delete(pageName); - clientConn.openPages.delete(pageName); - // Cascading of this to all connected clients will be handled by file watcher - return this.pageStore.deletePage(pageName); - }, - - listPages: async (clientConn: ClientConnection): Promise => { - return this.pageStore.listPages(); - }, - - getPageMeta: async ( - clientConn: ClientConnection, - pageName: string - ): Promise => { - let page = this.openPages.get(pageName); - if (page) { - return page.meta; - } - return this.pageStore.getPageMeta(pageName); - }, - - "index:clearPageIndexForPage": async ( - clientConn: ClientConnection, - page: string - ) => { - await this.db("page_index").where({ page }).del(); - }, - "index:set": async ( - clientConn: ClientConnection, - page: string, - key: string, - value: any - ) => { - let changed = await this.db("page_index") - .where({ page, key }) - .update("value", JSON.stringify(value)); - if (changed === 0) { - await this.db("page_index").insert({ - page, - key, - value: JSON.stringify(value), - }); - } - }, - "index:get": async ( - clientConn: ClientConnection, - page: string, - key: string - ) => { - let result = await this.db("page_index") - .where({ page, key }) - .select("value"); - if (result.length) { - return JSON.parse(result[0].value); - } else { - return null; - } - }, - "index:delete": async ( - clientConn: ClientConnection, - page: string, - key: string - ) => { - await this.db("page_index").where({ page, key }).del(); - }, - "index:scanPrefixForPage": async ( - clientConn: ClientConnection, - page: string, - prefix: string - ) => { - return ( - await this.db("page_index") - .where({ page }) - .andWhereLike("key", `${prefix}%`) - .select("page", "key", "value") - ).map(({ page, key, value }) => ({ - page, - key, - value: JSON.parse(value), - })); - }, - "index:scanPrefixGlobal": async ( - clientConn: ClientConnection, - prefix: string - ) => { - return ( - await this.db("page_index") - .andWhereLike("key", `${prefix}%`) - .select("page", "key", "value") - ).map(({ page, key, value }) => ({ - page, - key, - value: JSON.parse(value), - })); - }, - "index:deletePrefixForPage": async ( - clientConn: ClientConnection, - page: string, - prefix: string - ) => { - return await this.db("page_index") - .where({ page }) - .andWhereLike("key", `${prefix}%`) - .del(); - }, - - "index:clearPageIndex": async (clientConn: ClientConnection) => { - return await this.db("page_index").del(); - }, - }; + async registerApi(name: string, apiProvider: ApiProvider) { + await apiProvider.init(); + this.apis.set(name, apiProvider); + } constructor(rootPath: string, serverSocket: Server) { this.rootPath = path.resolve(rootPath); this.serverSocket = serverSocket; - this.pageStore = new DiskStorage(this.rootPath); + } - this.db = knex({ - client: "better-sqlite3", - connection: { - filename: path.join(rootPath, "data.db"), - }, - useNullAsDefault: true, - }); - this.initDb(); + public async init() { + await this.registerApi("index", new IndexApi(this.rootPath)); + await this.registerApi( + "page", + new PageApi(this.rootPath, this.connectedSockets) + ); - serverSocket.on("connection", (socket) => { + this.serverSocket.on("connection", (socket) => { const clientConn = new ClientConnection(socket); - // const socketOpenPages = new Set(); console.log("Connected", socket.id); this.connectedSockets.add(socket); @@ -333,121 +76,26 @@ export class SocketServer { if (page) { for (let client of page.clientStates) { if (client.socket === socket) { - this.disconnectClient(client, page); + (this.apis.get("page")! as PageApi).disconnectClient( + client, + page + ); } } } }; - Object.entries(this.api).forEach(([eventName, cb]) => { - onCall(eventName, (...args: any[]): any => { - return cb.call(this, clientConn, ...args); - }); - }); - }); - } - - async initDb() { - if (!(await this.db.schema.hasTable("page_index"))) { - await this.db.schema.createTable("page_index", (table) => { - table.string("page"); - table.string("key"); - table.text("value"); - table.primary(["page", "key"]); - }); - console.log("Created table page_index"); - } - } - - disconnectClient(client: ClientPageState, page: Page) { - page.clientStates.delete(client); - if (page.clientStates.size === 0) { - console.log("No more clients for", page.name, "flushing"); - this.flushPageToDisk(page.name, page); - this.openPages.delete(page.name); - } else { - page.cursors.delete(client.socket.id); - this.broadcastCursors(page); - } - } - - broadcastCursors(page: Page) { - page.clientStates.forEach((client) => { - client.socket.emit( - "cursorSnapshot", - page.name, - Object.fromEntries(page.cursors.entries()) - ); - }); - } - - flushPageToDisk(name: string, page: Page) { - safeRun(async () => { - let meta = await this.pageStore.writePage(name, page.text.sliceString(0)); - console.log(`Wrote page ${name} to disk`); - page.meta = meta; - }); - } - - fileWatcher() { - fs.watch( - this.rootPath, - { - recursive: true, - persistent: false, - }, - (eventType, filename) => { - safeRun(async () => { - if (!filename.endsWith(".md")) { - return; - } - let localPath = path.join(this.rootPath, filename); - let pageName = filename.substring(0, filename.length - 3); - // console.log("Edit in", pageName, eventType); - let modifiedTime = 0; - try { - let s = await stat(localPath); - modifiedTime = s.mtime.getTime(); - } catch (e) { - // File was deleted - console.log("Deleted", pageName); - for (let socket of this.connectedSockets) { - socket.emit("pageDeleted", pageName); - } - return; - } - const openPage = this.openPages.get(pageName); - if (openPage) { - if (openPage.meta.lastModified < modifiedTime) { - console.log("Page changed on disk outside of editor, reloading"); - this.openPages.delete(pageName); - const meta = { - name: pageName, - lastModified: modifiedTime, - } as PageMeta; - for (let client of openPage.clientStates) { - client.socket.emit("pageChanged", meta); - } - } - } - if (eventType === "rename") { - // This most likely means a new file was created, let's push new file listings to all connected sockets - console.log( - "New file created, broadcasting to all connected sockets", - pageName - ); - for (let socket of this.connectedSockets) { - socket.emit("pageCreated", { - name: pageName, - lastModified: modifiedTime, - } as PageMeta); - } - } + for (let [apiName, apiProvider] of this.apis) { + Object.entries(apiProvider.api()).forEach(([eventName, cb]) => { + onCall(`${apiName}.${eventName}`, (...args: any[]): any => { + // @ts-ignore + return cb(clientConn, ...args); + }); }); } - ); + }); } close() { - this.db.destroy(); + (this.apis.get("index")! as IndexApi).db.destroy(); } } diff --git a/server/src/index_api.ts b/server/src/index_api.ts new file mode 100644 index 0000000..aacb04d --- /dev/null +++ b/server/src/index_api.ts @@ -0,0 +1,123 @@ +import { ApiProvider, ClientConnection } from "./api"; +import knex, { Knex } from "knex"; +import path from "path"; + +type IndexItem = { + page: string; + key: string; + value: any; +}; + +export class IndexApi implements ApiProvider { + db: Knex; + constructor(rootPath: string) { + this.db = knex({ + client: "better-sqlite3", + connection: { + filename: path.join(rootPath, "data.db"), + }, + useNullAsDefault: true, + }); + } + + async init() { + if (!(await this.db.schema.hasTable("page_index"))) { + await this.db.schema.createTable("page_index", (table) => { + table.string("page"); + table.string("key"); + table.text("value"); + table.primary(["page", "key"]); + }); + console.log("Created table page_index"); + } + } + + api() { + return { + clearPageIndexForPage: async ( + clientConn: ClientConnection, + page: string + ) => { + await this.db("page_index").where({ page }).del(); + }, + set: async ( + clientConn: ClientConnection, + page: string, + key: string, + value: any + ) => { + let changed = await this.db("page_index") + .where({ page, key }) + .update("value", JSON.stringify(value)); + if (changed === 0) { + await this.db("page_index").insert({ + page, + key, + value: JSON.stringify(value), + }); + } + }, + get: async (clientConn: ClientConnection, page: string, key: string) => { + let result = await this.db("page_index") + .where({ page, key }) + .select("value"); + if (result.length) { + return JSON.parse(result[0].value); + } else { + return null; + } + }, + delete: async ( + clientConn: ClientConnection, + page: string, + key: string + ) => { + await this.db("page_index").where({ page, key }).del(); + }, + scanPrefixForPage: async ( + clientConn: ClientConnection, + page: string, + prefix: string + ) => { + return ( + await this.db("page_index") + .where({ page }) + .andWhereLike("key", `${prefix}%`) + .select("page", "key", "value") + ).map(({ page, key, value }) => ({ + page, + key, + value: JSON.parse(value), + })); + }, + scanPrefixGlobal: async ( + clientConn: ClientConnection, + prefix: string + ) => { + return ( + await this.db("page_index") + .andWhereLike("key", `${prefix}%`) + .select("page", "key", "value") + ).map(({ page, key, value }) => ({ + page, + key, + value: JSON.parse(value), + })); + }, + deletePrefixForPage: async ( + clientConn: ClientConnection, + page: string, + prefix: string + ) => { + return this.db("page_index") + .where({ page }) + .andWhereLike("key", `${prefix}%`) + .del(); + }, + + clearPageIndex: async (clientConn: ClientConnection) => { + return this.db("page_index").del(); + }, + }; + } +} diff --git a/server/src/page_api.ts b/server/src/page_api.ts new file mode 100644 index 0000000..7ffcff2 --- /dev/null +++ b/server/src/page_api.ts @@ -0,0 +1,281 @@ +import { ClientPageState, Page, PageMeta } from "./types"; +import { ChangeSet } from "@codemirror/state"; +import { Update } from "@codemirror/collab"; +import { ApiProvider, ClientConnection } from "./api"; +import { Socket } from "socket.io"; +import { DiskStorage } from "./disk_storage"; +import { safeRun } from "./util"; +import fs from "fs"; +import path from "path"; +import { stat } from "fs/promises"; +import { Cursor, cursorEffect } from "../../webapp/src/cursorEffect"; + +export class PageApi implements ApiProvider { + openPages = new Map(); + pageStore: DiskStorage; + rootPath: string; + connectedSockets: Set; + + constructor(rootPath: string, connectedSockets: Set) { + this.pageStore = new DiskStorage(rootPath); + this.rootPath = rootPath; + this.connectedSockets = connectedSockets; + } + + async init(): Promise { + return; + } + + broadcastCursors(page: Page) { + page.clientStates.forEach((client) => { + client.socket.emit( + "cursorSnapshot", + page.name, + Object.fromEntries(page.cursors.entries()) + ); + }); + } + + flushPageToDisk(name: string, page: Page) { + safeRun(async () => { + let meta = await this.pageStore.writePage(name, page.text.sliceString(0)); + console.log(`Wrote page ${name} to disk`); + page.meta = meta; + }); + } + + disconnectClient(client: ClientPageState, page: Page) { + page.clientStates.delete(client); + if (page.clientStates.size === 0) { + console.log("No more clients for", page.name, "flushing"); + this.flushPageToDisk(page.name, page); + this.openPages.delete(page.name); + } else { + page.cursors.delete(client.socket.id); + this.broadcastCursors(page); + } + } + + fileWatcher() { + fs.watch( + this.rootPath, + { + recursive: true, + persistent: false, + }, + (eventType, filename) => { + safeRun(async () => { + if (!filename.endsWith(".md")) { + return; + } + let localPath = path.join(this.rootPath, filename); + let pageName = filename.substring(0, filename.length - 3); + // console.log("Edit in", pageName, eventType); + let modifiedTime = 0; + try { + let s = await stat(localPath); + modifiedTime = s.mtime.getTime(); + } catch (e) { + // File was deleted + console.log("Deleted", pageName); + for (let socket of this.connectedSockets) { + socket.emit("pageDeleted", pageName); + } + return; + } + const openPage = this.openPages.get(pageName); + if (openPage) { + if (openPage.meta.lastModified < modifiedTime) { + console.log("Page changed on disk outside of editor, reloading"); + this.openPages.delete(pageName); + const meta = { + name: pageName, + lastModified: modifiedTime, + } as PageMeta; + for (let client of openPage.clientStates) { + client.socket.emit("pageChanged", meta); + } + } + } + if (eventType === "rename") { + // This most likely means a new file was created, let's push new file listings to all connected sockets + console.log( + "New file created, broadcasting to all connected sockets", + pageName + ); + for (let socket of this.connectedSockets) { + socket.emit("pageCreated", { + name: pageName, + lastModified: modifiedTime, + } as PageMeta); + } + } + }); + } + ); + } + + api() { + return { + openPage: async (clientConn: ClientConnection, pageName: string) => { + let page = this.openPages.get(pageName); + if (!page) { + try { + let { text, meta } = await this.pageStore.readPage(pageName); + page = new Page(pageName, text, meta); + } catch (e) { + console.log("Creating new page", pageName); + page = new Page(pageName, "", { name: pageName, lastModified: 0 }); + } + this.openPages.set(pageName, page); + } + page.clientStates.add( + new ClientPageState(clientConn.sock, page.version) + ); + clientConn.openPages.add(pageName); + console.log("Opened page", pageName); + this.broadcastCursors(page); + return page.toJSON(); + }, + pushUpdates: async ( + clientConn: ClientConnection, + pageName: string, + version: number, + updates: any[] + ): Promise => { + let page = this.openPages.get(pageName); + + if (!page) { + console.error( + "Received updates for not open page", + pageName, + this.openPages.keys() + ); + return false; + } + if (version !== page.version) { + console.error("Invalid version", version, page.version); + return false; + } else { + console.log("Applying", updates.length, "updates to", pageName); + let transformedUpdates = []; + let textChanged = false; + for (let update of updates) { + let changes = ChangeSet.fromJSON(update.changes); + let transformedUpdate = { + changes, + clientID: update.clientID, + effects: update.cursors?.map((c: Cursor) => { + page!.cursors.set(c.userId, c); + return cursorEffect.of(c); + }), + }; + page.updates.push(transformedUpdate); + transformedUpdates.push(transformedUpdate); + let oldText = page.text; + page.text = changes.apply(page.text); + if (oldText !== page.text) { + textChanged = true; + } + } + + if (textChanged) { + if (page.saveTimer) { + clearTimeout(page.saveTimer); + } + + page.saveTimer = setTimeout(() => { + this.flushPageToDisk(pageName, page!); + }, 1000); + } + while (page.pending.length) { + page.pending.pop()!(transformedUpdates); + } + return true; + } + }, + + pullUpdates: async ( + clientConn: ClientConnection, + pageName: string, + version: number + ): Promise => { + let page = this.openPages.get(pageName); + // console.log("Pulling updates for", pageName); + if (!page) { + console.error("Fetching updates for not open page"); + return []; + } + // TODO: Optimize this + let oldestVersion = Infinity; + page.clientStates.forEach((client) => { + oldestVersion = Math.min(client.version, oldestVersion); + if (client.socket === clientConn.sock) { + client.version = version; + } + }); + page.flushUpdates(oldestVersion); + if (version < page.version) { + return page.updatesSince(version); + } else { + return new Promise((resolve) => { + page!.pending.push(resolve); + }); + } + }, + + readPage: async ( + clientConn: ClientConnection, + pageName: string + ): Promise<{ text: string; meta: PageMeta }> => { + let page = this.openPages.get(pageName); + if (page) { + console.log("Serving page from memory", pageName); + return { + text: page.text.sliceString(0), + meta: page.meta, + }; + } else { + return this.pageStore.readPage(pageName); + } + }, + + writePage: async ( + clientConn: ClientConnection, + pageName: string, + text: string + ) => { + let page = this.openPages.get(pageName); + if (page) { + for (let client of page.clientStates) { + client.socket.emit("reloadPage", pageName); + } + this.openPages.delete(pageName); + } + return this.pageStore.writePage(pageName, text); + }, + + deletePage: async (clientConn: ClientConnection, pageName: string) => { + this.openPages.delete(pageName); + clientConn.openPages.delete(pageName); + // Cascading of this to all connected clients will be handled by file watcher + return this.pageStore.deletePage(pageName); + }, + + listPages: async (clientConn: ClientConnection): Promise => { + return this.pageStore.listPages(); + }, + + getPageMeta: async ( + clientConn: ClientConnection, + pageName: string + ): Promise => { + let page = this.openPages.get(pageName); + if (page) { + return page.meta; + } + return this.pageStore.getPageMeta(pageName); + }, + }; + } +} diff --git a/server/src/server.ts b/server/src/server.ts index 5dd11e4..edd4d68 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -30,6 +30,7 @@ const distDir = `${__dirname}/../../webapp/dist`; app.use("/", express.static(distDir)); let socketServer = new SocketServer(args._[0] as string, io); +socketServer.init(); // Fallback, serve index.html let cachedIndex: string | undefined = undefined; diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..d1d8f14 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,10 @@ +{ + "compilerOptions": { + "target": "esnext", + "strict": true, + "moduleResolution": "node", + "module": "esnext", + "esModuleInterop": true, + "allowSyntheticDefaultImports": true + } +} diff --git a/webapp/package.json b/webapp/package.json index fa83047..339be75 100644 --- a/webapp/package.json +++ b/webapp/package.json @@ -28,6 +28,7 @@ "os-browserify": "^0.3.0", "parcel": "^2.3.2", "path-browserify": "^1.0.1", + "prettier": "^2.5.1", "querystring-es3": "^0.2.1", "stream-browserify": "^3.0.0", "tty-browserify": "^0.0.1", diff --git a/webapp/src/editor.tsx b/webapp/src/editor.tsx index 8810bd8..41680ec 100644 --- a/webapp/src/editor.tsx +++ b/webapp/src/editor.tsx @@ -392,7 +392,7 @@ export class Editor implements AppEventDispatcher { }, }); safeRun(async () => { - def.run(null); + await def.run(null); }); }, }); @@ -445,7 +445,7 @@ export class Editor implements AppEventDispatcher { pageState.scrollTop = this.editorView!.scrollDOM.scrollTop; } - this.space.closePage(this.currentPage); + await this.space.closePage(this.currentPage); } // Fetch next page to open diff --git a/webapp/src/space.ts b/webapp/src/space.ts index f04ba96..b3ff1fd 100644 --- a/webapp/src/space.ts +++ b/webapp/src/space.ts @@ -1,10 +1,10 @@ import { PageMeta } from "./types"; import { Socket } from "socket.io-client"; import { Update } from "@codemirror/collab"; -import { Transaction, Text, ChangeSet } from "@codemirror/state"; +import { ChangeSet, Text, Transaction } from "@codemirror/state"; -import { CollabEvents, CollabDocument } from "./collab"; -import { Cursor, cursorEffect } from "./cursorEffect"; +import { CollabDocument, CollabEvents } from "./collab"; +import { cursorEffect } from "./cursorEffect"; import { EventEmitter } from "./event"; export type SpaceEvents = { @@ -40,7 +40,7 @@ export class Space extends EventEmitter { this.emit(eventName as keyof SpaceEvents, ...args); }); }); - this.wsCall("listPages").then((pages) => { + this.wsCall("page.listPages").then((pages) => { this.allPages = new Set(pages); this.emit("pageListUpdated", this.allPages); }); @@ -87,7 +87,7 @@ export class Space extends EventEmitter { changes: u.changes.toJSON(), cursors: u.effects?.map((e) => e.value), })); - return this.wsCall("pushUpdates", pageName, version, updates); + return this.wsCall("page.pushUpdates", pageName, version, updates); } return false; } @@ -96,13 +96,16 @@ export class Space extends EventEmitter { pageName: string, version: number ): Promise { - let updates: Update[] = await this.wsCall("pullUpdates", pageName, version); - let ups = updates.map((u) => ({ + let updates: Update[] = await this.wsCall( + "page.pullUpdates", + pageName, + version + ); + return updates.map((u) => ({ changes: ChangeSet.fromJSON(u.changes), effects: u.effects?.map((e) => cursorEffect.of(e.value)), clientID: u.clientID, })); - return ups; } async listPages(): Promise { @@ -111,7 +114,7 @@ export class Space extends EventEmitter { async openPage(name: string): Promise { this.reqId++; - let pageJSON = await this.wsCall("openPage", name); + let pageJSON = await this.wsCall("page.openPage", name); return new CollabDocument( Text.of(pageJSON.text), @@ -121,27 +124,27 @@ export class Space extends EventEmitter { } async closePage(name: string): Promise { - this.socket.emit("closePage", name); + this.socket.emit("page.closePage", name); } async readPage(name: string): Promise<{ text: string; meta: PageMeta }> { - return this.wsCall("readPage", name); + return this.wsCall("page.readPage", name); } async writePage(name: string, text: string): Promise { - return this.wsCall("writePage", name, text); + return this.wsCall("page.writePage", name, text); } async deletePage(name: string): Promise { - return this.wsCall("deletePage", name); + return this.wsCall("page.deletePage", name); } async getPageMeta(name: string): Promise { - return this.wsCall("deletePage", name); + return this.wsCall("page.getPageMeta", name); } async indexSet(pageName: string, key: string, value: any) { - await this.wsCall("index:set", pageName, key, value); + await this.wsCall("index.set", pageName, key, value); } async indexBatchSet(pageName: string, kvs: KV[]) { @@ -152,27 +155,27 @@ export class Space extends EventEmitter { } async indexGet(pageName: string, key: string): Promise { - return await this.wsCall("index:get", pageName, key); + return await this.wsCall("index.get", pageName, key); } async indexScanPrefixForPage( pageName: string, keyPrefix: string ): Promise<{ key: string; value: any }[]> { - return await this.wsCall("index:scanPrefixForPage", pageName, keyPrefix); + return await this.wsCall("index.scanPrefixForPage", pageName, keyPrefix); } async indexScanPrefixGlobal( keyPrefix: string ): Promise<{ key: string; value: any }[]> { - return await this.wsCall("index:scanPrefixGlobal", keyPrefix); + return await this.wsCall("index.scanPrefixGlobal", keyPrefix); } async indexDeletePrefixForPage(pageName: string, keyPrefix: string) { - await this.wsCall("index:deletePrefixForPage", keyPrefix); + await this.wsCall("index.deletePrefixForPage", keyPrefix); } async indexDelete(pageName: string, key: string) { - await this.wsCall("index:delete", pageName, key); + await this.wsCall("index.delete", pageName, key); } } diff --git a/webapp/yarn.lock b/webapp/yarn.lock index 84b7cad..acc4c38 100644 --- a/webapp/yarn.lock +++ b/webapp/yarn.lock @@ -2587,6 +2587,11 @@ posthtml@^0.16.4, posthtml@^0.16.5: posthtml-parser "^0.10.0" posthtml-render "^3.0.0" +prettier@^2.5.1: + version "2.5.1" + resolved "https://registry.yarnpkg.com/prettier/-/prettier-2.5.1.tgz#fff75fa9d519c54cf0fce328c1017d94546bc56a" + integrity sha512-vBZcPRUR5MZJwoyi3ZoyQlc1rXeEck8KgeC9AwwOn+exuxLxq5toTRDTSaVrXHxelDMHy9zlicw8u66yxoSUFg== + pretty-format@^27.0.0, pretty-format@^27.5.1: version "27.5.1" resolved "https://registry.yarnpkg.com/pretty-format/-/pretty-format-27.5.1.tgz#2181879fdea51a7a5851fb39d920faa63f01d88e"