diff --git a/plugbox/tsconfig.json b/plugbox/tsconfig.json index 613ffde..ca8018b 100644 --- a/plugbox/tsconfig.json +++ b/plugbox/tsconfig.json @@ -1,5 +1,5 @@ { - "include": ["src/**/*", "../webapp/src/plugbox_browser/browser_system.ts"], + "include": ["src/**/*"], "compilerOptions": { "target": "esnext", "strict": true, diff --git a/server/src/events.ts b/server/src/events.ts deleted file mode 100644 index 113e36a..0000000 --- a/server/src/events.ts +++ /dev/null @@ -1,5 +0,0 @@ -export const serverEvents = { - openPage: "openPage", - closePage: "closePage", - pageText: "pageText", -}; diff --git a/server/src/server.ts b/server/src/server.ts index 1f7f7fd..fa9a8b4 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -1,5 +1,5 @@ import cors from "cors"; -import express from "express"; +import express, { text } from "express"; import fs from "fs"; import { readdir, readFile, stat, unlink } from "fs/promises"; import path from "path"; @@ -10,7 +10,6 @@ import { ChangeSet, Text } from "@codemirror/state"; import { Update } from "@codemirror/collab"; import http from "http"; import { Server } from "socket.io"; -import { serverEvents } from "./events"; const app = express(); const server = http.createServer(app); @@ -29,6 +28,7 @@ const distDir = `${__dirname}/../../webapp/dist`; type PageMeta = { name: string; lastModified: number; + version?: number; }; class DiskFS { @@ -195,10 +195,14 @@ class Page { text: Text; updates: Update[]; sockets: Map; + meta: PageMeta; - constructor(text: string) { + pending: ((value: any) => void)[] = []; + + constructor(text: string, meta: PageMeta) { this.updates = []; - this.text = Text.of([text]); + this.text = Text.of(text.split("\n")); + this.meta = meta; this.sockets = new Map(); } } @@ -206,7 +210,7 @@ class Page { let openPages = new Map(); io.on("connection", (socket) => { - function removeSocket(pageName: string) { + function disconnectSocket(pageName: string) { let page = openPages.get(pageName); if (page) { page.sockets.delete(socket.id); @@ -219,30 +223,91 @@ io.on("connection", (socket) => { console.log("Connected", socket.id); let socketOpenPages = new Set(); - socket.on(serverEvents.openPage, async (pageName: string) => { + + function onCall(eventName: string, cb: (...args: any[]) => Promise) { + socket.on(eventName, (reqId: number, ...args) => { + cb(...args).then((result) => { + socket.emit(`${eventName}Resp${reqId}`, result); + }); + }); + } + + onCall("openPage", async (pageName: string) => { let page = openPages.get(pageName); if (!page) { - let { text } = await diskFS.readPage(pageName); - page = new Page(text); + let { text, meta } = await diskFS.readPage(pageName); + page = new Page(text, meta); openPages.set(pageName, page); } page.sockets.set(socket.id, socket); socketOpenPages.add(pageName); console.log("Sending document text"); - socket.emit( - serverEvents.pageText, - pageName, - openPages.get(pageName).text.toJSON() - ); + let enhancedMeta = { ...page.meta, version: page.updates.length }; + return [enhancedMeta, page.text.toJSON()]; }); - socket.on(serverEvents.closePage, (pageName: string) => { + + socket.on("closePage", (pageName: string) => { console.log("Closing page", pageName); - removeSocket(pageName); + disconnectSocket(pageName); socketOpenPages.delete(pageName); }); + + onCall( + "pushUpdates", + async ( + pageName: string, + version: number, + updates: any[] + ): Promise => { + let page = openPages.get(pageName); + + if (!page) { + console.error("Received updates for not open page"); + return; + } + if (version !== page.updates.length) { + console.error("Invalid version", version, page.updates.length); + return false; + } else { + console.log("Applying", updates.length, "updates"); + for (let update of updates) { + let changes = ChangeSet.fromJSON(update.changes); + page.updates.push({ changes, clientID: update.clientID }); + page.text = changes.apply(page.text); + } + while (page.pending.length) { + page.pending.pop()!(updates); + } + return true; + } + } + ); + + onCall( + "pullUpdates", + async (pageName: string, version: number): Promise => { + let page = openPages.get(pageName); + console.log("Pulling updates for", pageName); + if (!page) { + console.error("Received updates for not open page"); + return; + } + console.log(`Let's get real: ${version} < ${page.updates.length}`); + if (version < page.updates.length) { + console.log("Yes"); + return page.updates.slice(version); + } else { + console.log("No"); + return new Promise((resolve) => { + page.pending.push(resolve); + }); + } + } + ); + socket.on("disconnect", () => { console.log("Disconnected", socket.id); - socketOpenPages.forEach(removeSocket); + socketOpenPages.forEach(disconnectSocket); }); }); //sup diff --git a/webapp/package.json b/webapp/package.json index bc0303d..df58c7c 100644 --- a/webapp/package.json +++ b/webapp/package.json @@ -41,6 +41,7 @@ }, "dependencies": { "@codemirror/basic-setup": "^0.19.1", + "@codemirror/collab": "^0.19.0", "@codemirror/commands": "^0.19.8", "@codemirror/lang-markdown": "^0.19.6", "@codemirror/state": "^0.19.7", diff --git a/webapp/src/boot.ts b/webapp/src/boot.ts index 8cf6380..049c316 100644 --- a/webapp/src/boot.ts +++ b/webapp/src/boot.ts @@ -5,8 +5,6 @@ import { io } from "socket.io-client"; let socket = io("http://localhost:3000"); -import { serverEvents } from "../../server/src/events"; - let editor = new Editor( new HttpRemoteSpace(`http://${location.hostname}:3000/fs`, socket), document.getElementById("root")! diff --git a/webapp/src/collab.ts b/webapp/src/collab.ts new file mode 100644 index 0000000..eca25c9 --- /dev/null +++ b/webapp/src/collab.ts @@ -0,0 +1,77 @@ +import { EditorView, ViewPlugin, ViewUpdate } from "@codemirror/view"; +import { HttpRemoteSpace, Space } from "./space"; +import { + Update, + receiveUpdates, + sendableUpdates, + collab, + getSyncedVersion, +} from "@codemirror/collab"; +import { PageMeta } from "./types"; +import { Text } from "@codemirror/state"; + +export class Document { + text: Text; + meta: PageMeta; + + constructor(text: Text, meta: PageMeta) { + this.text = text; + this.meta = meta; + } +} + +export function collabExtension( + pageName: string, + startVersion: number, + space: HttpRemoteSpace, + reloadCallback: () => void +) { + let plugin = ViewPlugin.fromClass( + class { + private pushing = false; + private done = false; + + constructor(private view: EditorView) { + if (pageName) { + this.pull(); + } + } + + update(update: ViewUpdate) { + if (update.docChanged) this.push(); + } + + async push() { + let updates = sendableUpdates(this.view.state); + if (this.pushing || !updates.length) return; + this.pushing = true; + let version = getSyncedVersion(this.view.state); + let success = await space.pushUpdates(pageName, version, updates); + this.pushing = false; + + if (!success) { + reloadCallback(); + } + + // Regardless of whether the push failed or new updates came in + // while it was running, try again if there's updates remaining + if (sendableUpdates(this.view.state).length) { + setTimeout(() => this.push(), 100); + } + } + + async pull() { + while (!this.done) { + let version = getSyncedVersion(this.view.state); + let updates = await space.pullUpdates(pageName, version); + this.view.dispatch(receiveUpdates(this.view.state, updates)); + } + } + + destroy() { + this.done = true; + } + } + ); + return [collab({ startVersion }), plugin]; +} diff --git a/webapp/src/editor.tsx b/webapp/src/editor.tsx index 8f2006e..3e878ff 100644 --- a/webapp/src/editor.tsx +++ b/webapp/src/editor.tsx @@ -10,7 +10,7 @@ import { indentWithTab, standardKeymap } from "@codemirror/commands"; import { history, historyKeymap } from "@codemirror/history"; import { bracketMatching } from "@codemirror/matchbrackets"; import { searchKeymap } from "@codemirror/search"; -import { EditorState, StateField, Transaction } from "@codemirror/state"; +import { EditorState, StateField, Transaction, Text } from "@codemirror/state"; import { drawSelection, dropCursor, @@ -59,6 +59,10 @@ import { } from "./types"; import { safeRun } from "./util"; +import { collabExtension } from "./collab"; + +import { Document } from "./collab"; + class PageState { editorState: EditorState; scrollTop: number; @@ -94,12 +98,18 @@ export class Editor implements AppEventDispatcher { this.viewDispatch = () => {}; this.render(parent); this.editorView = new EditorView({ - state: this.createEditorState(""), + state: this.createEditorState( + new Document(Text.of([""]), { + name: "", + lastModified: new Date(), + version: 0, + }) + ), parent: document.getElementById("editor")!, }); this.pageNavigator = new PathPageNavigator(); this.indexer = new Indexer("page-index", space); - this.watch(); + // this.watch(); } async init() { @@ -176,7 +186,7 @@ export class Editor implements AppEventDispatcher { return this.viewState.currentPage; } - createEditorState(text: string): EditorState { + createEditorState(doc: Document): EditorState { const editor = this; let commandKeyBindings: KeyBinding[] = []; for (let def of this.editorCommands.values()) { @@ -196,7 +206,7 @@ export class Editor implements AppEventDispatcher { } } return EditorState.create({ - doc: text, + doc: doc.text, extensions: [ highlightSpecialChars(), history(), @@ -206,6 +216,12 @@ export class Editor implements AppEventDispatcher { customMarkdownStyle, bracketMatching(), closeBrackets(), + collabExtension( + doc.meta.name, + doc.meta.version!, + this.space, + this.reloadPage.bind(this) + ), autocompletion({ override: [ this.plugCompleter.bind(this), @@ -317,6 +333,8 @@ export class Editor implements AppEventDispatcher { }); } + reloadPage() {} + async plugCompleter( ctx: CompletionContext ): Promise { @@ -439,10 +457,10 @@ export class Editor implements AppEventDispatcher { cachedMeta.lastModified.getTime() !== newPageMeta.lastModified.getTime() ) { console.log("File changed on disk, reloading"); - let pageData = await this.space.readPage(currentPageName); + let doc = await this.space.openPage(currentPageName); this.openPages.set( currentPageName, - new PageState(this.createEditorState(pageData.text), 0, newPageMeta) + new PageState(this.createEditorState(doc), 0, doc.meta) ); await this.loadPage(currentPageName, false); } @@ -459,12 +477,8 @@ export class Editor implements AppEventDispatcher { async loadPage(pageName: string, checkNewVersion: boolean = true) { let pageState = this.openPages.get(pageName); if (!pageState) { - let pageData = await this.space.readPage(pageName); - pageState = new PageState( - this.createEditorState(pageData.text), - 0, - pageData.meta - ); + let doc = await this.space.openPage(pageName); + pageState = new PageState(this.createEditorState(doc), 0, doc.meta); this.openPages.set(pageName, pageState!); // Freshly loaded, no need to check for a new version either way checkNewVersion = false; diff --git a/webapp/src/space.ts b/webapp/src/space.ts index a4f7132..ff351c1 100644 --- a/webapp/src/space.ts +++ b/webapp/src/space.ts @@ -1,7 +1,9 @@ import { PageMeta } from "./types"; import { Socket } from "socket.io-client"; -import { serverEvents } from "../../server/src/events"; -import { EventEmitter } from "events"; +import { Update } from "@codemirror/collab"; +import { Transaction, Text, ChangeSet } from "@codemirror/state"; + +import { Document } from "./collab"; export interface Space { listPages(): Promise; @@ -13,15 +15,48 @@ export interface Space { export class HttpRemoteSpace implements Space { url: string; - socket?: Socket; + socket: Socket; + reqId = 0; - constructor(url: string, socket: Socket | null) { + constructor(url: string, socket: Socket) { this.url = url; - // this.socket = socket; + this.socket = socket; - // socket.on("connect", () => { - // console.log("connected via SocketIO", serverEvents.pageText); - // }); + socket.on("connect", () => { + console.log("connected via SocketIO"); + }); + } + + pushUpdates( + pageName: string, + version: number, + fullUpdates: readonly (Update & { origin: Transaction })[] + ): Promise { + return new Promise((resolve) => { + if (this.socket) { + let updates = fullUpdates.map((u) => ({ + clientID: u.clientID, + changes: u.changes.toJSON(), + })); + this.reqId++; + this.socket.emit("pushUpdates", this.reqId, pageName, version, updates); + this.socket.once("pushUpdatesResp" + this.reqId, (result) => { + resolve(result); + }); + } + }); + } + + async pullUpdates( + pageName: string, + version: number + ): Promise { + let updates: Update[] = await this.wsCall("pullUpdates", pageName, version); + console.log("Got updates", updates); + return updates.map((u) => ({ + changes: ChangeSet.fromJSON(u.changes), + clientID: u.clientID, + })); } async listPages(): Promise { @@ -35,11 +70,24 @@ export class HttpRemoteSpace implements Space { })); } - async openPage(name: string) { - this.socket!.on(serverEvents.pageText, (pageName, text) => { - console.log("Got this", pageName, text); + wsCall(eventName: string, ...args: any[]): Promise { + return new Promise((resolve) => { + this.reqId++; + this.socket!.once(`${eventName}Resp${this.reqId}`, resolve); + this.socket!.emit(eventName, this.reqId, ...args); }); - this.socket!.emit(serverEvents.openPage, "start"); + } + + async openPage(name: string): Promise { + this.reqId++; + let [meta, text] = await this.wsCall("openPage", name); + console.log("Got this", meta, text); + meta.lastModified = new Date(meta.lastModified); + return new Document(Text.of(text), meta); + } + + async closePage(name: string): Promise { + this.socket!.emit("closePage", name); } async readPage(name: string): Promise<{ text: string; meta: PageMeta }> { diff --git a/webapp/src/types.ts b/webapp/src/types.ts index c044924..e2d6840 100644 --- a/webapp/src/types.ts +++ b/webapp/src/types.ts @@ -11,6 +11,7 @@ export type Manifest = plugbox.Manifest; export type PageMeta = { name: string; lastModified: Date; + version?: number; created?: boolean; lastOpened?: Date; }; diff --git a/webapp/yarn.lock b/webapp/yarn.lock index 34138f6..19001d5 100644 --- a/webapp/yarn.lock +++ b/webapp/yarn.lock @@ -67,6 +67,13 @@ "@codemirror/text" "^0.19.0" "@codemirror/view" "^0.19.0" +"@codemirror/collab@^0.19.0": + version "0.19.0" + resolved "https://registry.yarnpkg.com/@codemirror/collab/-/collab-0.19.0.tgz#43938671d58ef8f12e43406ddd315410d85ac1c4" + integrity sha512-pyrEXLLkby82y9wzfanEQGl3V3DX/pIuA97Js7gVEbPAqhvse5iXKNyp1Yr37afhkl2TUeoZyUSFTtcimgdI6g== + dependencies: + "@codemirror/state" "^0.19.0" + "@codemirror/commands@^0.19.0", "@codemirror/commands@^0.19.8": version "0.19.8" resolved "https://registry.yarnpkg.com/@codemirror/commands/-/commands-0.19.8.tgz#1f99c1a8bf200d17c4d6997379099459f3678107"