1
0

Enormous refactor adding IndexedDB space and syncing.

This commit is contained in:
Zef Hemel 2022-04-06 15:39:20 +02:00
parent 38faf50ab8
commit eb781b9e19
16 changed files with 611 additions and 321 deletions

View File

@ -1,5 +1,5 @@
import {SysCallMapping} from "../../plugos/system";
import {MarkdownTree, parse} from "../tree";
import { SysCallMapping } from "../../plugos/system";
import { MarkdownTree, parse } from "../tree";
export function markdownSyscalls(): SysCallMapping {
return {

View File

@ -1,4 +1,4 @@
import {SyntaxNode} from "@lezer/common";
import { SyntaxNode } from "@lezer/common";
import wikiMarkdownLang from "../webapp/parser";
export type MarkdownTree = {

View File

@ -1,4 +1,4 @@
import {syscall} from "./syscall";
import { syscall } from "./syscall";
export async function set(key: string, value: any): Promise<void> {
return syscall("clientStore.set", key, value);

View File

@ -5,5 +5,5 @@ export async function invokeFunction(
name: string,
...args: any[]
): Promise<any> {
return syscall("system.invokeFunctionOnServer", name, ...args);
return syscall("system.invokeFunction", env, name, ...args);
}

View File

@ -56,7 +56,6 @@ functions:
env: server
updateMaterializedQueriesOnPage:
path: ./materialized_queries.ts:updateMaterializedQueriesOnPage
env: server
updateMaterializedQueriesCommand:
path: ./materialized_queries.ts:updateMaterializedQueriesCommand
command:

View File

@ -54,10 +54,10 @@ export async function updateMaterializedQueriesOnPage(pageName: string) {
let pages = await listPages();
if (orderBy) {
pages = pages.sort((a: any, b: any) => {
console.log(a, orderBy, a[orderBy]);
if (a[orderBy] === b[orderBy]) {
return 0;
}
if (a[orderBy] < b[orderBy]) {
return !!orderDesc ? 1 : -1;
} else {

View File

@ -1,20 +1,20 @@
import express, {Express} from "express";
import {SilverBulletHooks} from "../common/manifest";
import {EndpointHook} from "../plugos/hooks/endpoint";
import {readFile} from "fs/promises";
import {System} from "../plugos/system";
import express, { Express } from "express";
import { SilverBulletHooks } from "../common/manifest";
import { EndpointHook } from "../plugos/hooks/endpoint";
import { readFile } from "fs/promises";
import { System } from "../plugos/system";
import cors from "cors";
import {DiskStorage, EventedStorage, Storage} from "./disk_storage";
import { DiskStorage, EventedStorage, Storage } from "./disk_storage";
import path from "path";
import bodyParser from "body-parser";
import {EventHook} from "../plugos/hooks/event";
import { EventHook } from "../plugos/hooks/event";
import spaceSyscalls from "./syscalls/space";
import {eventSyscalls} from "../plugos/syscalls/event";
import {pageIndexSyscalls} from "./syscalls";
import knex, {Knex} from "knex";
import { eventSyscalls } from "../plugos/syscalls/event";
import { pageIndexSyscalls } from "./syscalls";
import knex, { Knex } from "knex";
import shellSyscalls from "../plugos/syscalls/shell.node";
import {NodeCronHook} from "../plugos/hooks/node_cron";
import {markdownSyscalls} from "../common/syscalls/markdown";
import { NodeCronHook } from "../plugos/hooks/node_cron";
import { markdownSyscalls } from "../common/syscalls/markdown";
export class ExpressServer {
app: Express;
@ -58,7 +58,7 @@ export class ExpressServer {
system.registerSyscalls([], spaceSyscalls(this.storage));
system.registerSyscalls([], eventSyscalls(this.eventHook));
system.registerSyscalls([], markdownSyscalls());
system.addHook(new EndpointHook(app, "/_"));
system.addHook(new EndpointHook(app, "/_/"));
}
async init() {
@ -85,7 +85,9 @@ export class ExpressServer {
res.header("Content-Type", "text/markdown");
res.send(pageData.text);
} catch (e) {
// CORS
res.status(200);
res.header("X-Status", "404");
res.send("");
}
})
@ -94,7 +96,13 @@ export class ExpressServer {
console.log("Saving", pageName);
try {
let meta = await this.storage.writePage(pageName, req.body);
let meta = await this.storage.writePage(
pageName,
req.body,
req.header("Last-Modified")
? +req.header("Last-Modified")!
: undefined
);
res.status(200);
res.header("Last-Modified", "" + meta.lastModified);
res.send("OK");
@ -113,8 +121,10 @@ export class ExpressServer {
res.header("Content-Type", "text/markdown");
res.send("");
} catch (e) {
// CORS
res.status(200);
res.send("");
res.header("X-Status", "404");
res.send("Not found");
}
})
.delete(async (req, res) => {

View File

@ -1,17 +1,17 @@
import {mkdir, readdir, readFile, stat, unlink, writeFile} from "fs/promises";
import { mkdir, readdir, readFile, stat, unlink, utimes, writeFile } from "fs/promises";
import * as path from "path";
import {PageMeta} from "../common/types";
import {EventHook} from "../plugos/hooks/event";
import { PageMeta } from "../common/types";
import { EventHook } from "../plugos/hooks/event";
export interface Storage {
listPages(): Promise<PageMeta[]>;
readPage(pageName: string): Promise<{ text: string; meta: PageMeta }>;
writePage(pageName: string, text: string): Promise<PageMeta>;
writePage(
pageName: string,
text: string,
lastModified?: number
): Promise<PageMeta>;
getPageMeta(pageName: string): Promise<PageMeta>;
deletePage(pageName: string): Promise<void>;
}
@ -26,8 +26,12 @@ export class EventedStorage implements Storage {
return this.wrapped.readPage(pageName);
}
async writePage(pageName: string, text: string): Promise<PageMeta> {
const newPageMeta = this.wrapped.writePage(pageName, text);
async writePage(
pageName: string,
text: string,
lastModified?: number
): Promise<PageMeta> {
const newPageMeta = this.wrapped.writePage(pageName, text, lastModified);
// This can happen async
this.eventHook
.dispatchEvent("page:saved", pageName)
@ -55,9 +59,28 @@ export class EventedStorage implements Storage {
export class DiskStorage implements Storage {
rootPath: string;
plugPrefix: string;
constructor(rootPath: string) {
constructor(rootPath: string, plugPrefix: string = "_plug/") {
this.rootPath = rootPath;
this.plugPrefix = plugPrefix;
}
pageNameToPath(pageName: string) {
if (pageName.startsWith(this.plugPrefix)) {
return path.join(this.rootPath, pageName + ".plug.json");
}
return path.join(this.rootPath, pageName + ".md");
}
pathToPageName(fullPath: string): string {
let extLength = fullPath.endsWith(".plug.json")
? ".plug.json".length
: ".md".length;
return fullPath.substring(
this.rootPath.length + 1,
fullPath.length - extLength
);
}
async listPages(): Promise<PageMeta[]> {
@ -68,15 +91,13 @@ export class DiskStorage implements Storage {
for (let file of files) {
const fullPath = path.join(dir, file);
let s = await stat(fullPath);
// console.log("Encountering", fullPath, s);
if (s.isDirectory()) {
await walkPath(fullPath);
} else {
if (path.extname(file) === ".md") {
if (file.endsWith(".md") || file.endsWith(".json")) {
fileNames.push({
name: fullPath.substring(
this.rootPath.length + 1,
fullPath.length - 3
),
name: this.pathToPageName(fullPath),
lastModified: s.mtime.getTime(),
});
}
@ -88,7 +109,7 @@ export class DiskStorage implements Storage {
}
async readPage(pageName: string): Promise<{ text: string; meta: PageMeta }> {
const localPath = path.join(this.rootPath, pageName + ".md");
const localPath = this.pageNameToPath(pageName);
try {
const s = await stat(localPath);
return {
@ -104,8 +125,12 @@ export class DiskStorage implements Storage {
}
}
async writePage(pageName: string, text: string): Promise<PageMeta> {
let localPath = path.join(this.rootPath, pageName + ".md");
async writePage(
pageName: string,
text: string,
lastModified?: number
): Promise<PageMeta> {
let localPath = this.pageNameToPath(pageName);
try {
// Ensure parent folder exists
await mkdir(path.dirname(localPath), { recursive: true });
@ -113,6 +138,11 @@ export class DiskStorage implements Storage {
// Actually write the file
await writeFile(localPath, text);
if (lastModified) {
let d = new Date(lastModified);
console.log("Going to set the modified time", d);
await utimes(localPath, lastModified, lastModified);
}
// Fetch new metadata
const s = await stat(localPath);
return {
@ -126,7 +156,7 @@ export class DiskStorage implements Storage {
}
async getPageMeta(pageName: string): Promise<PageMeta> {
let localPath = path.join(this.rootPath, pageName + ".md");
let localPath = this.pageNameToPath(pageName);
try {
const s = await stat(localPath);
return {
@ -140,7 +170,7 @@ export class DiskStorage implements Storage {
}
async deletePage(pageName: string): Promise<void> {
let localPath = path.join(this.rootPath, pageName + ".md");
let localPath = this.pageNameToPath(pageName);
await unlink(localPath);
}
}

View File

@ -1,12 +1,24 @@
import { Editor } from "./editor";
import { safeRun } from "./util";
import { IndexedDBSpace } from "./spaces/indexeddb_space";
import { WatchableSpace } from "./spaces/cache_space";
import { HttpRestSpace } from "./spaces/httprest_space";
let editor = new Editor(
// new HttpRestSpace(""),
new IndexedDBSpace("pages"),
document.getElementById("root")!
);
// let localSpace = new WatchableSpace(new IndexedDBSpace("pages"), true);
// localSpace.watch();
let serverSpace = new WatchableSpace(new HttpRestSpace(""), true);
serverSpace.watch();
// @ts-ignore
// window.syncer = async () => {
// let lastSync = +(localStorage.getItem("lastSync") || "0");
// let syncer = new SpaceSync(serverSpace, localSpace, lastSync, "_trash/");
// await syncer.syncPages(
// SpaceSync.primaryConflictResolver(serverSpace, localSpace)
// );
// localStorage.setItem("lastSync", "" + syncer.lastSync);
// console.log("Done!");
// };
let editor = new Editor(serverSpace, document.getElementById("root")!);
safeRun(async () => {
await editor.init();

View File

@ -29,7 +29,7 @@ import { PathPageNavigator } from "./navigator";
import customMarkDown from "./parser";
import reducer from "./reducer";
import { smartQuoteKeymap } from "./smart_quotes";
import { Space } from "./spaces/space";
import { WatchableSpace } from "./spaces/cache_space";
import customMarkdownStyle from "./style";
import { editorSyscalls } from "./syscalls/editor";
import { indexerSyscalls } from "./syscalls/indexer";
@ -69,7 +69,7 @@ export class Editor implements AppEventDispatcher {
editorView?: EditorView;
viewState: AppViewState;
viewDispatch: React.Dispatch<Action>;
space: Space;
space: WatchableSpace;
pageNavigator: PathPageNavigator;
eventHook: EventHook;
saveTimeout: any;
@ -78,7 +78,7 @@ export class Editor implements AppEventDispatcher {
}, 1000);
private system = new System<SilverBulletHooks>("client");
constructor(space: Space, parent: Element) {
constructor(space: WatchableSpace, parent: Element) {
this.space = space;
this.viewState = initialViewState;
this.viewDispatch = () => {};
@ -439,7 +439,18 @@ export class Editor implements AppEventDispatcher {
}
// Fetch next page to open
let doc = await this.space.readPage(pageName);
let doc;
try {
doc = await this.space.readPage(pageName);
} catch (e: any) {
// Not found, new page
console.log("Creating new page", pageName);
doc = {
text: "",
meta: { name: pageName, lastModified: 0 },
};
}
let editorState = this.createEditorState(pageName, doc.text);
let pageState = this.openPages.get(pageName);
editorView.setState(editorState);

View File

@ -0,0 +1,251 @@
import { Space, SpaceEvents } from "./space";
import { safeRun } from "../util";
import { PageMeta } from "../../common/types";
import { EventEmitter } from "../../common/event";
import { Plug } from "../../plugos/plug";
const pageWatchInterval = 2000;
const trashPrefix = "_trash/";
const plugPrefix = "_plug/";
export class WatchableSpace extends EventEmitter<SpaceEvents> implements Space {
pageMetaCache = new Map<string, PageMeta>();
watchedPages = new Set<string>();
private initialPageListLoad = true;
private saving = false;
constructor(private space: Space, private trashEnabled = true) {
super();
this.on({
pageCreated: async (pageMeta) => {
if (pageMeta.name.startsWith(plugPrefix)) {
let pageData = await this.readPage(pageMeta.name);
this.emit(
"plugLoaded",
pageMeta.name.substring(plugPrefix.length),
JSON.parse(pageData.text)
);
this.watchPage(pageMeta.name);
}
},
pageChanged: async (pageMeta) => {
if (pageMeta.name.startsWith(plugPrefix)) {
let pageData = await this.readPage(pageMeta.name);
this.emit(
"plugLoaded",
pageMeta.name.substring(plugPrefix.length),
JSON.parse(pageData.text)
);
this.watchPage(pageMeta.name);
}
},
});
}
public updatePageListAsync() {
safeRun(async () => {
let newPageList = await this.space.fetchPageList();
let deletedPages = new Set<string>(this.pageMetaCache.keys());
newPageList.forEach((meta) => {
const pageName = meta.name;
const oldPageMeta = this.pageMetaCache.get(pageName);
const newPageMeta = {
name: pageName,
lastModified: meta.lastModified,
};
if (
!oldPageMeta &&
(pageName.startsWith(plugPrefix) || !this.initialPageListLoad)
) {
this.emit("pageCreated", newPageMeta);
} else if (
oldPageMeta &&
oldPageMeta.lastModified !== newPageMeta.lastModified
) {
this.emit("pageChanged", newPageMeta);
}
// Page found, not deleted
deletedPages.delete(pageName);
// Update in cache
this.pageMetaCache.set(pageName, newPageMeta);
});
for (const deletedPage of deletedPages) {
this.pageMetaCache.delete(deletedPage);
this.emit("pageDeleted", deletedPage);
}
this.emit("pageListUpdated", this.listPages());
this.initialPageListLoad = false;
});
}
watch() {
setInterval(() => {
safeRun(async () => {
if (this.saving) {
return;
}
for (const pageName of this.watchedPages) {
const oldMeta = this.pageMetaCache.get(pageName);
if (!oldMeta) {
// No longer in cache, meaning probably deleted let's unwatch
this.watchedPages.delete(pageName);
continue;
}
const newMeta = await this.space.getPageMeta(pageName);
if (oldMeta.lastModified !== newMeta.lastModified) {
this.emit("pageChanged", newMeta);
}
}
});
}, pageWatchInterval);
this.updatePageListAsync();
}
async deletePage(name: string, deleteDate?: number): Promise<void> {
await this.getPageMeta(name); // Check if page exists, if not throws Error
if (this.trashEnabled) {
let pageData = await this.readPage(name);
// Move to trash
await this.writePage(
`${trashPrefix}${name}`,
pageData.text,
true,
deleteDate
);
}
await this.space.deletePage(name, deleteDate);
this.pageMetaCache.delete(name);
this.emit("pageDeleted", name);
this.emit("pageListUpdated", new Set([...this.pageMetaCache.values()]));
}
async getPageMeta(name: string): Promise<PageMeta> {
return this.metaCacher(name, await this.space.getPageMeta(name));
}
invokeFunction(
plug: Plug<any>,
env: string,
name: string,
args: any[]
): Promise<any> {
return this.space.invokeFunction(plug, env, name, args);
}
listPages(): Set<PageMeta> {
return new Set(
[...this.pageMetaCache.values()].filter(
(pageMeta) =>
!pageMeta.name.startsWith(trashPrefix) &&
!pageMeta.name.startsWith(plugPrefix)
)
);
}
listTrash(): Set<PageMeta> {
return new Set(
[...this.pageMetaCache.values()]
.filter(
(pageMeta) =>
pageMeta.name.startsWith(trashPrefix) &&
!pageMeta.name.startsWith(plugPrefix)
)
.map((pageMeta) => ({
...pageMeta,
name: pageMeta.name.substring(trashPrefix.length),
}))
);
}
listPlugs(): Set<PageMeta> {
return new Set(
[...this.pageMetaCache.values()].filter((pageMeta) =>
pageMeta.name.startsWith(plugPrefix)
)
);
}
proxySyscall(plug: Plug<any>, name: string, args: any[]): Promise<any> {
return this.space.proxySyscall(plug, name, args);
}
async readPage(name: string): Promise<{ text: string; meta: PageMeta }> {
let pageData = await this.space.readPage(name);
this.pageMetaCache.set(name, pageData.meta);
return pageData;
}
watchPage(pageName: string) {
this.watchedPages.add(pageName);
}
unwatchPage(pageName: string) {
this.watchedPages.delete(pageName);
}
async writePage(
name: string,
text: string,
selfUpdate?: boolean,
lastModified?: number
): Promise<PageMeta> {
try {
this.saving = true;
let pageMeta = await this.space.writePage(
name,
text,
selfUpdate,
lastModified
);
if (!selfUpdate) {
this.emit("pageChanged", pageMeta);
}
return this.metaCacher(name, pageMeta);
} finally {
this.saving = false;
}
}
fetchPageList(): Promise<Set<PageMeta>> {
return this.space.fetchPageList();
}
// private pollPlugs() {
// safeRun(async () => {
// const newPlugs = await this.space.listPlugs();
// let deletedPlugs = new Set<string>(this.plugMetaCache.keys());
// for (const newPlugMeta of newPlugs) {
// const oldPlugMeta = this.plugMetaCache.get(newPlugMeta.name);
// if (
// !oldPlugMeta ||
// (oldPlugMeta && oldPlugMeta.version !== newPlugMeta.version)
// ) {
// this.emit(
// "plugLoaded",
// newPlugMeta.name,
// await this.space.loadPlug(newPlugMeta.name)
// );
// }
// // Page found, not deleted
// deletedPlugs.delete(newPlugMeta.name);
//
// // Update in cache
// this.plugMetaCache.set(newPlugMeta.name, newPlugMeta);
// }
//
// for (const deletedPlug of deletedPlugs) {
// this.plugMetaCache.delete(deletedPlug);
// this.emit("plugUnloaded", deletedPlug);
// }
// });
// }
private metaCacher(name: string, pageMeta: PageMeta): PageMeta {
this.pageMetaCache.set(name, pageMeta);
return pageMeta;
}
}

View File

@ -1,116 +1,43 @@
import { EventEmitter } from "../../common/event";
import { PageMeta } from "../../common/types";
import { safeRun } from "../util";
import { Plug } from "../../plugos/plug";
import { Manifest } from "../../common/manifest";
import { PlugMeta, Space, SpaceEvents } from "./space";
import { Space } from "./space";
const pageWatchInterval = 2000;
const plugWatchInterval = 5000;
export class HttpRestSpace extends EventEmitter<SpaceEvents> implements Space {
export class HttpRestSpace implements Space {
pageUrl: string;
pageMetaCache = new Map<string, PageMeta>();
plugMetaCache = new Map<string, PlugMeta>();
watchedPages = new Set<string>();
saving = false;
private plugUrl: string;
private initialPageListLoad = true;
constructor(url: string) {
super();
this.pageUrl = url + "/fs";
this.plugUrl = url + "/plug";
this.watch();
this.pollPlugs();
this.updatePageListAsync();
}
watchPage(pageName: string) {
this.watchedPages.add(pageName);
}
unwatchPage(pageName: string) {
this.watchedPages.delete(pageName);
}
watch() {
setInterval(() => {
safeRun(async () => {
if (this.saving) {
return;
}
for (const pageName of this.watchedPages) {
const oldMeta = this.pageMetaCache.get(pageName);
if (!oldMeta) {
// No longer in cache, meaning probably deleted let's unwatch
this.watchedPages.delete(pageName);
continue;
}
const newMeta = await this.getPageMeta(pageName);
if (oldMeta.lastModified !== newMeta.lastModified) {
console.log("Page", pageName, "changed on disk, emitting event");
this.emit("pageChanged", newMeta);
}
}
});
}, pageWatchInterval);
setInterval(() => {
safeRun(this.pollPlugs.bind(this));
}, plugWatchInterval);
}
public updatePageListAsync() {
safeRun(async () => {
let req = await fetch(this.pageUrl, {
method: "GET",
});
let deletedPages = new Set<string>(this.pageMetaCache.keys());
((await req.json()) as any[]).forEach((meta: any) => {
const pageName = meta.name;
const oldPageMeta = this.pageMetaCache.get(pageName);
const newPageMeta = {
name: pageName,
lastModified: meta.lastModified,
};
if (!oldPageMeta && !this.initialPageListLoad) {
this.emit("pageCreated", newPageMeta);
} else if (
oldPageMeta &&
oldPageMeta.lastModified !== newPageMeta.lastModified
) {
this.emit("pageChanged", newPageMeta);
}
// Page found, not deleted
deletedPages.delete(pageName);
// Update in cache
this.pageMetaCache.set(pageName, newPageMeta);
});
for (const deletedPage of deletedPages) {
this.pageMetaCache.delete(deletedPage);
this.emit("pageDeleted", deletedPage);
}
this.emit("pageListUpdated", new Set([...this.pageMetaCache.values()]));
this.initialPageListLoad = false;
public async fetchPageList(): Promise<Set<PageMeta>> {
let req = await fetch(this.pageUrl, {
method: "GET",
});
}
async listPages(): Promise<Set<PageMeta>> {
return new Set([...this.pageMetaCache.values()]);
let result = new Set<PageMeta>();
((await req.json()) as any[]).forEach((meta: any) => {
const pageName = meta.name;
result.add({
name: pageName,
lastModified: meta.lastModified,
});
});
return result;
}
async readPage(name: string): Promise<{ text: string; meta: PageMeta }> {
let res = await fetch(`${this.pageUrl}/${name}`, {
method: "GET",
});
if (res.headers.get("X-Status") === "404") {
throw new Error(`Page not found`);
}
return {
text: await res.text(),
meta: this.responseToMetaCacher(name, res),
meta: this.responseToMeta(name, res),
};
}
@ -118,23 +45,20 @@ export class HttpRestSpace extends EventEmitter<SpaceEvents> implements Space {
name: string,
text: string,
selfUpdate?: boolean,
withMeta?: PageMeta
lastModified?: number
): Promise<PageMeta> {
// TODO: withMeta ignored for now
try {
this.saving = true;
let res = await fetch(`${this.pageUrl}/${name}`, {
method: "PUT",
body: text,
});
const newMeta = this.responseToMetaCacher(name, res);
if (!selfUpdate) {
this.emit("pageChanged", newMeta);
}
return newMeta;
} finally {
this.saving = false;
}
// TODO: lastModified ignored for now
let res = await fetch(`${this.pageUrl}/${name}`, {
method: "PUT",
body: text,
headers: lastModified
? {
"Last-Modified": "" + lastModified,
}
: undefined,
});
const newMeta = this.responseToMeta(name, res);
return newMeta;
}
async deletePage(name: string): Promise<void> {
@ -144,9 +68,6 @@ export class HttpRestSpace extends EventEmitter<SpaceEvents> implements Space {
if (req.status !== 200) {
throw Error(`Failed to delete page: ${req.statusText}`);
}
this.pageMetaCache.delete(name);
this.emit("pageDeleted", name);
this.emit("pageListUpdated", new Set([...this.pageMetaCache.values()]));
}
async proxySyscall(plug: Plug<any>, name: string, args: any[]): Promise<any> {
@ -199,57 +120,17 @@ export class HttpRestSpace extends EventEmitter<SpaceEvents> implements Space {
let res = await fetch(`${this.pageUrl}/${name}`, {
method: "OPTIONS",
});
return this.responseToMetaCacher(name, res);
if (res.headers.get("X-Status") === "404") {
throw new Error(`Page not found`);
}
return this.responseToMeta(name, res);
}
public async listPlugs(): Promise<PlugMeta[]> {
let res = await fetch(`${this.plugUrl}`, {
method: "GET",
});
return (await res.json()) as PlugMeta[];
}
public async loadPlug(name: string): Promise<Manifest> {
let res = await fetch(`${this.plugUrl}/${name}`, {
method: "GET",
});
return (await res.json()) as Manifest;
}
private responseToMetaCacher(name: string, res: Response): PageMeta {
private responseToMeta(name: string, res: Response): PageMeta {
const meta = {
name,
lastModified: +(res.headers.get("Last-Modified") || "0"),
};
this.pageMetaCache.set(name, meta);
return meta;
}
private async pollPlugs(): Promise<void> {
const newPlugs = await this.listPlugs();
let deletedPlugs = new Set<string>(this.plugMetaCache.keys());
for (const newPlugMeta of newPlugs) {
const oldPlugMeta = this.plugMetaCache.get(newPlugMeta.name);
if (
!oldPlugMeta ||
(oldPlugMeta && oldPlugMeta.version !== newPlugMeta.version)
) {
this.emit(
"plugLoaded",
newPlugMeta.name,
await this.loadPlug(newPlugMeta.name)
);
}
// Page found, not deleted
deletedPlugs.delete(newPlugMeta.name);
// Update in cache
this.plugMetaCache.set(newPlugMeta.name, newPlugMeta);
}
for (const deletedPlug of deletedPlugs) {
this.plugMetaCache.delete(deletedPlug);
this.emit("plugUnloaded", deletedPlug);
}
}
}

View File

@ -1,9 +1,7 @@
import { PlugMeta, Space, SpaceEvents } from "./space";
import { EventEmitter } from "../../common/event";
import { Space } from "./space";
import { PageMeta } from "../../common/types";
import Dexie, { Table } from "dexie";
import { Plug } from "../../plugos/plug";
import { Manifest } from "../../common/manifest";
type Page = {
name: string;
@ -11,31 +9,18 @@ type Page = {
meta: PageMeta;
};
type PlugManifest = {
name: string;
manifest: Manifest;
};
export class IndexedDBSpace extends EventEmitter<SpaceEvents> implements Space {
export class IndexedDBSpace implements Space {
private pageTable: Table<Page, string>;
private plugMetaTable: Table<PlugMeta, string>;
private plugManifestTable: Table<PlugManifest, string>;
constructor(dbName: string) {
super();
const db = new Dexie(dbName);
db.version(1).stores({
page: "name",
plugMeta: "name",
plugManifest: "name",
});
this.pageTable = db.table("page");
this.plugMetaTable = db.table("plugMeta");
this.plugManifestTable = db.table("plugManifest");
}
async deletePage(name: string): Promise<void> {
this.emit("pageDeleted", name);
return this.pageTable.delete(name);
}
@ -44,7 +29,7 @@ export class IndexedDBSpace extends EventEmitter<SpaceEvents> implements Space {
if (entry) {
return entry.meta;
} else {
throw Error(`Page not found ${name}`);
throw Error(`Page not found`);
}
}
@ -57,10 +42,9 @@ export class IndexedDBSpace extends EventEmitter<SpaceEvents> implements Space {
return plug.invoke(name, args);
}
async listPages(): Promise<Set<PageMeta>> {
async fetchPageList(): Promise<Set<PageMeta>> {
let allPages = await this.pageTable.toArray();
let set = new Set(allPages.map((p) => p.meta));
this.emit("pageListUpdated", set);
return set;
}
@ -71,15 +55,9 @@ export class IndexedDBSpace extends EventEmitter<SpaceEvents> implements Space {
async readPage(name: string): Promise<{ text: string; meta: PageMeta }> {
let page = await this.pageTable.get(name);
if (page) {
return page!;
return page;
} else {
return {
text: "",
meta: {
name,
lastModified: 0,
},
};
throw new Error("Page not found");
}
}
@ -87,44 +65,17 @@ export class IndexedDBSpace extends EventEmitter<SpaceEvents> implements Space {
name: string,
text: string,
selfUpdate?: boolean,
withMeta?: PageMeta
lastModified?: number
): Promise<PageMeta> {
let meta = withMeta
? withMeta
: {
name,
lastModified: new Date().getTime(),
};
let meta = {
name,
lastModified: lastModified ? lastModified : new Date().getTime(),
};
await this.pageTable.put({
name,
text,
meta,
});
if (!selfUpdate) {
this.emit("pageChanged", meta);
}
// TODO: add pageCreated
return meta;
}
unwatchPage(pageName: string): void {}
updatePageListAsync(): void {
this.listPages();
}
watchPage(pageName: string): void {}
async listPlugs(): Promise<PlugMeta[]> {
return this.plugMetaTable.toArray();
}
async loadPlug(name: string): Promise<Manifest> {
let plugManifest = await this.plugManifestTable.get(name);
if (plugManifest) {
return plugManifest.manifest;
} else {
throw Error(`Plug not found ${name}`);
}
}
}

View File

@ -11,29 +11,20 @@ export type SpaceEvents = {
plugUnloaded: (plugName: string) => void;
};
export type PlugMeta = {
name: string;
version: number;
};
export interface Space {
// Pages
watchPage(pageName: string): void;
unwatchPage(pageName: string): void;
listPages(): Promise<Set<PageMeta>>;
fetchPageList(): Promise<Set<PageMeta>>;
readPage(name: string): Promise<{ text: string; meta: PageMeta }>;
getPageMeta(name: string): Promise<PageMeta>;
writePage(
name: string,
text: string,
selfUpdate?: boolean,
withMeta?: PageMeta
lastModified?: number
): Promise<PageMeta>;
deletePage(name: string): Promise<void>;
deletePage(name: string, deleteDate?: number): Promise<void>;
// Plugs
listPlugs(): Promise<PlugMeta[]>;
loadPlug(name: string): Promise<Manifest>;
proxySyscall(plug: Plug<any>, name: string, args: any[]): Promise<any>;
invokeFunction(
plug: Plug<any>,
@ -41,12 +32,4 @@ export interface Space {
name: string,
args: any[]
): Promise<any>;
// Events
on(handlers: Partial<SpaceEvents>): void;
off(handlers: Partial<SpaceEvents>): void;
emit(eventName: keyof SpaceEvents, ...args: any[]): void;
// TODO: Get rid of this
updatePageListAsync(): void;
}

View File

@ -1,19 +1,23 @@
import { expect, test } from "@jest/globals";
import { IndexedDBSpace } from "./indexeddb_space";
import { SpaceSync } from "./sync";
import { PageMeta } from "../../common/types";
import { WatchableSpace } from "./cache_space";
// For testing in node.js
require("fake-indexeddb/auto");
test("Test store", async () => {
let primary = new IndexedDBSpace("primary");
let secondary = new IndexedDBSpace("secondary");
let sync = new SpaceSync(primary, secondary, 0);
let primary = new WatchableSpace(new IndexedDBSpace("primary"), true);
let secondary = new WatchableSpace(new IndexedDBSpace("secondary"), true);
let sync = new SpaceSync(primary, secondary, 0, "_trash/");
async function conflictResolver(pageMeta1: PageMeta, pageMeta2: PageMeta) {}
// Write one page to primary
await primary.writePage("start", "Hello");
expect((await secondary.listPages()).size).toBe(0);
await sync.syncPages();
await sync.syncPages(conflictResolver);
expect((await secondary.listPages()).size).toBe(1);
expect((await secondary.readPage("start")).text).toBe("Hello");
let lastSync = sync.lastSync;
@ -39,24 +43,61 @@ test("Test store", async () => {
await primary.writePage("start2", "2");
await secondary.writePage("start3", "3");
await secondary.writePage("start4", "4");
await sync.syncPages();
expect((await primary.listPages()).size).toBe(5);
expect((await secondary.listPages()).size).toBe(5);
console.log("Should be no op");
expect(await sync.syncPages()).toBe(0);
console.log("Deleting pages");
// Delete some pages
await primary.deletePage("start");
await primary.deletePage("start3");
console.log("Pages", await primary.listPages());
console.log("Trash", await primary.listTrash());
await sync.syncPages();
console.log("Done");
expect((await primary.listPages()).size).toBe(3);
expect((await secondary.listPages()).size).toBe(3);
// No-op
expect(await sync.syncPages()).toBe(0);
await secondary.deletePage("start4");
await primary.deletePage("start2");
await sync.syncPages();
// Just "test" left
expect((await primary.listPages()).size).toBe(1);
expect((await secondary.listPages()).size).toBe(1);
// No-op
expect(await sync.syncPages()).toBe(0);
await secondary.writePage("start", "I'm back");
await sync.syncPages();
expect((await primary.readPage("start")).text).toBe("I'm back");
// Cause a conflict
await primary.writePage("start", "Hello 1");
await secondary.writePage("start", "Hello 2");
try {
await sync.syncPages();
// This should throw a sync conflict, so cannot be here
expect(false).toBe(true);
} catch {}
await sync.syncPages(SpaceSync.primaryConflictResolver(primary, secondary));
// Sync conflicting copy back
await sync.syncPages();
// Verify that primary won
expect((await primary.readPage("start")).text).toBe("Hello 1");
expect((await secondary.readPage("start")).text).toBe("Hello 1");
// test + start + start.conflicting copy
expect((await primary.listPages()).size).toBe(3);
expect((await secondary.listPages()).size).toBe(3);
});

View File

@ -1,22 +1,78 @@
import { WatchableSpace } from "./cache_space";
import { PageMeta } from "../../common/types";
import { Space } from "./space";
export class SpaceSync {
lastSync: number;
constructor(
private primary: Space,
private secondary: Space,
lastSync: number
) {
this.lastSync = lastSync;
private primary: WatchableSpace,
private secondary: WatchableSpace,
public lastSync: number,
private trashPrefix: string
) {}
// Strategy: Primary wins
public static primaryConflictResolver(
primary: WatchableSpace,
secondary: WatchableSpace
): (pageMeta1: PageMeta, pageMeta2: PageMeta) => Promise<void> {
return async (pageMeta1, pageMeta2) => {
const pageName = pageMeta1.name;
const revisionPageName = `${pageName}.conflicted.${pageMeta2.lastModified}`;
// Copy secondary to conflict copy
let oldPageData = await secondary.readPage(pageName);
await secondary.writePage(revisionPageName, oldPageData.text);
// Write replacement on top
let newPageData = await primary.readPage(pageName);
await secondary.writePage(
pageName,
newPageData.text,
true,
newPageData.meta.lastModified
);
};
}
async syncPages() {
async syncablePages(space: Space): Promise<PageMeta[]> {
return [...(await space.fetchPageList())].filter(
(pageMeta) => !pageMeta.name.startsWith(this.trashPrefix)
);
}
async trashPages(space: Space): Promise<PageMeta[]> {
return [...(await space.fetchPageList())]
.filter((pageMeta) => pageMeta.name.startsWith(this.trashPrefix))
.map((pageMeta) => ({
...pageMeta,
name: pageMeta.name.substring(this.trashPrefix.length),
}));
}
async syncPages(
conflictResolver?: (
pageMeta1: PageMeta,
pageMeta2: PageMeta
) => Promise<void>
): Promise<number> {
let syncOps = 0;
let allPagesPrimary = new Map(
[...(await this.primary.listPages())].map((p) => [p.name, p])
(await this.syncablePages(this.primary)).map((p) => [p.name, p])
);
let allPagesSecondary = new Map(
[...(await this.secondary.listPages())].map((p) => [p.name, p])
(await this.syncablePages(this.secondary)).map((p) => [p.name, p])
);
let allTrashPrimary = new Map(
(await this.trashPages(this.primary))
// Filter out old trash
.filter((p) => p.lastModified > this.lastSync)
.map((p) => [p.name, p])
);
let allTrashSecondary = new Map(
(await this.trashPages(this.secondary))
// Filter out old trash
.filter((p) => p.lastModified > this.lastSync)
.map((p) => [p.name, p])
);
let createdPagesOnSecondary = new Set<string>();
@ -26,6 +82,12 @@ export class SpaceSync {
let pageMetaSecondary = allPagesSecondary.get(pageMetaPrimary.name);
if (!pageMetaSecondary) {
// New page on primary
// Let's check it's not on the deleted list
if (allTrashSecondary.has(name)) {
// Explicitly deleted, let's skip
continue;
}
// Push from primary to secondary
console.log("New page on primary", name, "syncing to secondary");
let pageData = await this.primary.readPage(name);
@ -33,8 +95,9 @@ export class SpaceSync {
name,
pageData.text,
true,
pageData.meta
pageData.meta.lastModified
);
syncOps++;
createdPagesOnSecondary.add(name);
} else {
// Existing page
@ -42,7 +105,13 @@ export class SpaceSync {
// Primary updated since last sync
if (pageMetaSecondary.lastModified > this.lastSync) {
// Secondary also updated! CONFLICT
throw Error(`Sync conflict for ${name}`);
if (conflictResolver) {
await conflictResolver(pageMetaPrimary, pageMetaSecondary);
} else {
throw Error(
`Sync conflict for ${name} with no conflict resolver specified`
);
}
} else {
// Ok, not changed on secondary, push it secondary
console.log(
@ -54,9 +123,10 @@ export class SpaceSync {
await this.secondary.writePage(
name,
pageData.text,
true,
pageData.meta
false,
pageData.meta.lastModified
);
syncOps++;
}
} else if (pageMetaSecondary.lastModified > this.lastSync) {
// Secondary updated, but not primary (checked above)
@ -66,9 +136,10 @@ export class SpaceSync {
await this.primary.writePage(
name,
pageData.text,
true,
pageData.meta
false,
pageData.meta.lastModified
);
syncOps++;
} else {
// Neither updated, no-op
}
@ -76,24 +147,74 @@ export class SpaceSync {
}
// Now do a simplified version in reverse, only detecting new pages
// Finally, let's go over all pages on the secondary and see if the primary has them
for (let [name, pageMetaSecondary] of allPagesSecondary.entries()) {
if (!allPagesPrimary.has(pageMetaSecondary.name)) {
// New page on secondary
// Let's check it's not on the deleted list
if (allTrashPrimary.has(name)) {
// Explicitly deleted, let's skip
continue;
}
// Push from secondary to primary
console.log("New page on secondary", name, "pushing to primary");
let pageData = await this.secondary.readPage(name);
await this.primary.writePage(name, pageData.text, true, pageData.meta);
await this.primary.writePage(
name,
pageData.text,
true,
pageData.meta.lastModified
);
syncOps++;
}
}
// Find the latest timestamp on the primary and set it as lastSync
// And finally, let's trash some pages
for (let pageToDelete of allTrashPrimary.values()) {
if (pageToDelete.lastModified > this.lastSync) {
// New deletion
console.log("Deleting", pageToDelete.name, "on secondary");
try {
await this.secondary.deletePage(
pageToDelete.name,
pageToDelete.lastModified
);
syncOps++;
} catch (e: any) {
console.log("Page already gone", e.message);
}
}
}
for (let pageToDelete of allTrashSecondary.values()) {
if (pageToDelete.lastModified > this.lastSync) {
// New deletion
console.log("Deleting", pageToDelete.name, "on primary");
try {
await this.primary.deletePage(
pageToDelete.name,
pageToDelete.lastModified
);
syncOps++;
} catch (e: any) {
console.log("Page already gone", e.message);
}
}
}
// Find the latest timestamp and set it as lastSync
allPagesPrimary.forEach((pageMeta) => {
this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
});
allPagesSecondary.forEach((pageMeta) => {
this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
});
allTrashPrimary.forEach((pageMeta) => {
this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
});
allTrashSecondary.forEach((pageMeta) => {
this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
});
return syncOps;
}
}