1
0

Collaborative editing works somewhat

This commit is contained in:
Zef Hemel 2022-03-07 13:34:25 +01:00
parent 18c96b75fe
commit 5e34395407
10 changed files with 255 additions and 49 deletions

View File

@ -1,5 +1,5 @@
{
"include": ["src/**/*", "../webapp/src/plugbox_browser/browser_system.ts"],
"include": ["src/**/*"],
"compilerOptions": {
"target": "esnext",
"strict": true,

View File

@ -1,5 +0,0 @@
export const serverEvents = {
openPage: "openPage",
closePage: "closePage",
pageText: "pageText",
};

View File

@ -1,5 +1,5 @@
import cors from "cors";
import express from "express";
import express, { text } from "express";
import fs from "fs";
import { readdir, readFile, stat, unlink } from "fs/promises";
import path from "path";
@ -10,7 +10,6 @@ import { ChangeSet, Text } from "@codemirror/state";
import { Update } from "@codemirror/collab";
import http from "http";
import { Server } from "socket.io";
import { serverEvents } from "./events";
const app = express();
const server = http.createServer(app);
@ -29,6 +28,7 @@ const distDir = `${__dirname}/../../webapp/dist`;
type PageMeta = {
name: string;
lastModified: number;
version?: number;
};
class DiskFS {
@ -195,10 +195,14 @@ class Page {
text: Text;
updates: Update[];
sockets: Map<string, Socket>;
meta: PageMeta;
constructor(text: string) {
pending: ((value: any) => void)[] = [];
constructor(text: string, meta: PageMeta) {
this.updates = [];
this.text = Text.of([text]);
this.text = Text.of(text.split("\n"));
this.meta = meta;
this.sockets = new Map<string, Socket>();
}
}
@ -206,7 +210,7 @@ class Page {
let openPages = new Map<string, Page>();
io.on("connection", (socket) => {
function removeSocket(pageName: string) {
function disconnectSocket(pageName: string) {
let page = openPages.get(pageName);
if (page) {
page.sockets.delete(socket.id);
@ -219,30 +223,91 @@ io.on("connection", (socket) => {
console.log("Connected", socket.id);
let socketOpenPages = new Set<string>();
socket.on(serverEvents.openPage, async (pageName: string) => {
function onCall(eventName: string, cb: (...args: any[]) => Promise<any>) {
socket.on(eventName, (reqId: number, ...args) => {
cb(...args).then((result) => {
socket.emit(`${eventName}Resp${reqId}`, result);
});
});
}
onCall("openPage", async (pageName: string) => {
let page = openPages.get(pageName);
if (!page) {
let { text } = await diskFS.readPage(pageName);
page = new Page(text);
let { text, meta } = await diskFS.readPage(pageName);
page = new Page(text, meta);
openPages.set(pageName, page);
}
page.sockets.set(socket.id, socket);
socketOpenPages.add(pageName);
console.log("Sending document text");
socket.emit(
serverEvents.pageText,
pageName,
openPages.get(pageName).text.toJSON()
);
let enhancedMeta = { ...page.meta, version: page.updates.length };
return [enhancedMeta, page.text.toJSON()];
});
socket.on(serverEvents.closePage, (pageName: string) => {
socket.on("closePage", (pageName: string) => {
console.log("Closing page", pageName);
removeSocket(pageName);
disconnectSocket(pageName);
socketOpenPages.delete(pageName);
});
onCall(
"pushUpdates",
async (
pageName: string,
version: number,
updates: any[]
): Promise<boolean> => {
let page = openPages.get(pageName);
if (!page) {
console.error("Received updates for not open page");
return;
}
if (version !== page.updates.length) {
console.error("Invalid version", version, page.updates.length);
return false;
} else {
console.log("Applying", updates.length, "updates");
for (let update of updates) {
let changes = ChangeSet.fromJSON(update.changes);
page.updates.push({ changes, clientID: update.clientID });
page.text = changes.apply(page.text);
}
while (page.pending.length) {
page.pending.pop()!(updates);
}
return true;
}
}
);
onCall(
"pullUpdates",
async (pageName: string, version: number): Promise<Update[]> => {
let page = openPages.get(pageName);
console.log("Pulling updates for", pageName);
if (!page) {
console.error("Received updates for not open page");
return;
}
console.log(`Let's get real: ${version} < ${page.updates.length}`);
if (version < page.updates.length) {
console.log("Yes");
return page.updates.slice(version);
} else {
console.log("No");
return new Promise((resolve) => {
page.pending.push(resolve);
});
}
}
);
socket.on("disconnect", () => {
console.log("Disconnected", socket.id);
socketOpenPages.forEach(removeSocket);
socketOpenPages.forEach(disconnectSocket);
});
});
//sup

View File

@ -41,6 +41,7 @@
},
"dependencies": {
"@codemirror/basic-setup": "^0.19.1",
"@codemirror/collab": "^0.19.0",
"@codemirror/commands": "^0.19.8",
"@codemirror/lang-markdown": "^0.19.6",
"@codemirror/state": "^0.19.7",

View File

@ -5,8 +5,6 @@ import { io } from "socket.io-client";
let socket = io("http://localhost:3000");
import { serverEvents } from "../../server/src/events";
let editor = new Editor(
new HttpRemoteSpace(`http://${location.hostname}:3000/fs`, socket),
document.getElementById("root")!

77
webapp/src/collab.ts Normal file
View File

@ -0,0 +1,77 @@
import { EditorView, ViewPlugin, ViewUpdate } from "@codemirror/view";
import { HttpRemoteSpace, Space } from "./space";
import {
Update,
receiveUpdates,
sendableUpdates,
collab,
getSyncedVersion,
} from "@codemirror/collab";
import { PageMeta } from "./types";
import { Text } from "@codemirror/state";
export class Document {
text: Text;
meta: PageMeta;
constructor(text: Text, meta: PageMeta) {
this.text = text;
this.meta = meta;
}
}
export function collabExtension(
pageName: string,
startVersion: number,
space: HttpRemoteSpace,
reloadCallback: () => void
) {
let plugin = ViewPlugin.fromClass(
class {
private pushing = false;
private done = false;
constructor(private view: EditorView) {
if (pageName) {
this.pull();
}
}
update(update: ViewUpdate) {
if (update.docChanged) this.push();
}
async push() {
let updates = sendableUpdates(this.view.state);
if (this.pushing || !updates.length) return;
this.pushing = true;
let version = getSyncedVersion(this.view.state);
let success = await space.pushUpdates(pageName, version, updates);
this.pushing = false;
if (!success) {
reloadCallback();
}
// Regardless of whether the push failed or new updates came in
// while it was running, try again if there's updates remaining
if (sendableUpdates(this.view.state).length) {
setTimeout(() => this.push(), 100);
}
}
async pull() {
while (!this.done) {
let version = getSyncedVersion(this.view.state);
let updates = await space.pullUpdates(pageName, version);
this.view.dispatch(receiveUpdates(this.view.state, updates));
}
}
destroy() {
this.done = true;
}
}
);
return [collab({ startVersion }), plugin];
}

View File

@ -10,7 +10,7 @@ import { indentWithTab, standardKeymap } from "@codemirror/commands";
import { history, historyKeymap } from "@codemirror/history";
import { bracketMatching } from "@codemirror/matchbrackets";
import { searchKeymap } from "@codemirror/search";
import { EditorState, StateField, Transaction } from "@codemirror/state";
import { EditorState, StateField, Transaction, Text } from "@codemirror/state";
import {
drawSelection,
dropCursor,
@ -59,6 +59,10 @@ import {
} from "./types";
import { safeRun } from "./util";
import { collabExtension } from "./collab";
import { Document } from "./collab";
class PageState {
editorState: EditorState;
scrollTop: number;
@ -94,12 +98,18 @@ export class Editor implements AppEventDispatcher {
this.viewDispatch = () => {};
this.render(parent);
this.editorView = new EditorView({
state: this.createEditorState(""),
state: this.createEditorState(
new Document(Text.of([""]), {
name: "",
lastModified: new Date(),
version: 0,
})
),
parent: document.getElementById("editor")!,
});
this.pageNavigator = new PathPageNavigator();
this.indexer = new Indexer("page-index", space);
this.watch();
// this.watch();
}
async init() {
@ -176,7 +186,7 @@ export class Editor implements AppEventDispatcher {
return this.viewState.currentPage;
}
createEditorState(text: string): EditorState {
createEditorState(doc: Document): EditorState {
const editor = this;
let commandKeyBindings: KeyBinding[] = [];
for (let def of this.editorCommands.values()) {
@ -196,7 +206,7 @@ export class Editor implements AppEventDispatcher {
}
}
return EditorState.create({
doc: text,
doc: doc.text,
extensions: [
highlightSpecialChars(),
history(),
@ -206,6 +216,12 @@ export class Editor implements AppEventDispatcher {
customMarkdownStyle,
bracketMatching(),
closeBrackets(),
collabExtension(
doc.meta.name,
doc.meta.version!,
this.space,
this.reloadPage.bind(this)
),
autocompletion({
override: [
this.plugCompleter.bind(this),
@ -317,6 +333,8 @@ export class Editor implements AppEventDispatcher {
});
}
reloadPage() {}
async plugCompleter(
ctx: CompletionContext
): Promise<CompletionResult | null> {
@ -439,10 +457,10 @@ export class Editor implements AppEventDispatcher {
cachedMeta.lastModified.getTime() !== newPageMeta.lastModified.getTime()
) {
console.log("File changed on disk, reloading");
let pageData = await this.space.readPage(currentPageName);
let doc = await this.space.openPage(currentPageName);
this.openPages.set(
currentPageName,
new PageState(this.createEditorState(pageData.text), 0, newPageMeta)
new PageState(this.createEditorState(doc), 0, doc.meta)
);
await this.loadPage(currentPageName, false);
}
@ -459,12 +477,8 @@ export class Editor implements AppEventDispatcher {
async loadPage(pageName: string, checkNewVersion: boolean = true) {
let pageState = this.openPages.get(pageName);
if (!pageState) {
let pageData = await this.space.readPage(pageName);
pageState = new PageState(
this.createEditorState(pageData.text),
0,
pageData.meta
);
let doc = await this.space.openPage(pageName);
pageState = new PageState(this.createEditorState(doc), 0, doc.meta);
this.openPages.set(pageName, pageState!);
// Freshly loaded, no need to check for a new version either way
checkNewVersion = false;

View File

@ -1,7 +1,9 @@
import { PageMeta } from "./types";
import { Socket } from "socket.io-client";
import { serverEvents } from "../../server/src/events";
import { EventEmitter } from "events";
import { Update } from "@codemirror/collab";
import { Transaction, Text, ChangeSet } from "@codemirror/state";
import { Document } from "./collab";
export interface Space {
listPages(): Promise<PageMeta[]>;
@ -13,15 +15,48 @@ export interface Space {
export class HttpRemoteSpace implements Space {
url: string;
socket?: Socket;
socket: Socket;
reqId = 0;
constructor(url: string, socket: Socket | null) {
constructor(url: string, socket: Socket) {
this.url = url;
// this.socket = socket;
this.socket = socket;
// socket.on("connect", () => {
// console.log("connected via SocketIO", serverEvents.pageText);
// });
socket.on("connect", () => {
console.log("connected via SocketIO");
});
}
pushUpdates(
pageName: string,
version: number,
fullUpdates: readonly (Update & { origin: Transaction })[]
): Promise<boolean> {
return new Promise((resolve) => {
if (this.socket) {
let updates = fullUpdates.map((u) => ({
clientID: u.clientID,
changes: u.changes.toJSON(),
}));
this.reqId++;
this.socket.emit("pushUpdates", this.reqId, pageName, version, updates);
this.socket.once("pushUpdatesResp" + this.reqId, (result) => {
resolve(result);
});
}
});
}
async pullUpdates(
pageName: string,
version: number
): Promise<readonly Update[]> {
let updates: Update[] = await this.wsCall("pullUpdates", pageName, version);
console.log("Got updates", updates);
return updates.map((u) => ({
changes: ChangeSet.fromJSON(u.changes),
clientID: u.clientID,
}));
}
async listPages(): Promise<PageMeta[]> {
@ -35,11 +70,24 @@ export class HttpRemoteSpace implements Space {
}));
}
async openPage(name: string) {
this.socket!.on(serverEvents.pageText, (pageName, text) => {
console.log("Got this", pageName, text);
wsCall(eventName: string, ...args: any[]): Promise<any> {
return new Promise((resolve) => {
this.reqId++;
this.socket!.once(`${eventName}Resp${this.reqId}`, resolve);
this.socket!.emit(eventName, this.reqId, ...args);
});
this.socket!.emit(serverEvents.openPage, "start");
}
async openPage(name: string): Promise<Document> {
this.reqId++;
let [meta, text] = await this.wsCall("openPage", name);
console.log("Got this", meta, text);
meta.lastModified = new Date(meta.lastModified);
return new Document(Text.of(text), meta);
}
async closePage(name: string): Promise<void> {
this.socket!.emit("closePage", name);
}
async readPage(name: string): Promise<{ text: string; meta: PageMeta }> {

View File

@ -11,6 +11,7 @@ export type Manifest = plugbox.Manifest<NuggetHook>;
export type PageMeta = {
name: string;
lastModified: Date;
version?: number;
created?: boolean;
lastOpened?: Date;
};

View File

@ -67,6 +67,13 @@
"@codemirror/text" "^0.19.0"
"@codemirror/view" "^0.19.0"
"@codemirror/collab@^0.19.0":
version "0.19.0"
resolved "https://registry.yarnpkg.com/@codemirror/collab/-/collab-0.19.0.tgz#43938671d58ef8f12e43406ddd315410d85ac1c4"
integrity sha512-pyrEXLLkby82y9wzfanEQGl3V3DX/pIuA97Js7gVEbPAqhvse5iXKNyp1Yr37afhkl2TUeoZyUSFTtcimgdI6g==
dependencies:
"@codemirror/state" "^0.19.0"
"@codemirror/commands@^0.19.0", "@codemirror/commands@^0.19.8":
version "0.19.8"
resolved "https://registry.yarnpkg.com/@codemirror/commands/-/commands-0.19.8.tgz#1f99c1a8bf200d17c4d6997379099459f3678107"