1
0

Enormous refactor adding IndexedDB space and syncing.

This commit is contained in:
Zef Hemel 2022-04-07 14:04:50 +02:00
parent eb781b9e19
commit fff2690e99
14 changed files with 168 additions and 110 deletions

View File

@ -93,17 +93,21 @@ export async function updateMaterializedQueriesOnPage(pageName: string) {
return `${startQuery}\n${results.sort().join("\n")}\n${endQuery}`;
case "link":
let uniqueLinks = new Set<string>();
console.log("Here!!");
for (let { key, page, value: name } of await scanPrefixGlobal(
`pl:${pageName}:`
)) {
console.log("Here!!");
let [, pos] = key.split(":");
if (!filter || (filter && name.includes(filter))) {
uniqueLinks.add(name);
}
}
console.log("Here!!");
for (const uniqueResult of uniqueLinks) {
results.push(`* [[${uniqueResult}]]`);
}
console.log("Here!!");
return `${startQuery}\n${results.sort().join("\n")}\n${endQuery}`;
case "item":
for (let {

View File

@ -68,6 +68,7 @@ export class ExpressServer {
// Page list
fsRouter.route("/").get(async (req, res) => {
res.header("Now-Timestamp", "" + Date.now());
res.json(await this.storage.listPages());
});

View File

@ -141,7 +141,7 @@ export class DiskStorage implements Storage {
if (lastModified) {
let d = new Date(lastModified);
console.log("Going to set the modified time", d);
await utimes(localPath, lastModified, lastModified);
await utimes(localPath, d, d);
}
// Fetch new metadata
const s = await stat(localPath);

View File

@ -2,23 +2,33 @@ import { Editor } from "./editor";
import { safeRun } from "./util";
import { WatchableSpace } from "./spaces/cache_space";
import { HttpRestSpace } from "./spaces/httprest_space";
import { IndexedDBSpace } from "./spaces/indexeddb_space";
import { SpaceSync } from "./spaces/sync";
// let localSpace = new WatchableSpace(new IndexedDBSpace("pages"), true);
// localSpace.watch();
let localSpace = new WatchableSpace(new IndexedDBSpace("pages"), true);
localSpace.watch();
let serverSpace = new WatchableSpace(new HttpRestSpace(""), true);
serverSpace.watch();
// serverSpace.watch();
// @ts-ignore
// window.syncer = async () => {
// let lastSync = +(localStorage.getItem("lastSync") || "0");
// let syncer = new SpaceSync(serverSpace, localSpace, lastSync, "_trash/");
// await syncer.syncPages(
// SpaceSync.primaryConflictResolver(serverSpace, localSpace)
// );
// localStorage.setItem("lastSync", "" + syncer.lastSync);
// console.log("Done!");
// };
let editor = new Editor(serverSpace, document.getElementById("root")!);
window.syncer = async () => {
let lastLocalSync = +(localStorage.getItem("lastLocalSync") || "0"),
lastRemoteSync = +(localStorage.getItem("lastRemoteSync") || "0");
let syncer = new SpaceSync(
serverSpace,
localSpace,
lastRemoteSync,
lastLocalSync,
"_trash/"
);
await syncer.syncPages(
SpaceSync.primaryConflictResolver(serverSpace, localSpace)
);
localStorage.setItem("lastLocalSync", "" + syncer.secondaryLastSync);
localStorage.setItem("lastRemoteSync", "" + syncer.primaryLastSync);
console.log("Done!");
};
let editor = new Editor(localSpace, document.getElementById("root")!);
safeRun(async () => {
await editor.init();

View File

@ -33,6 +33,15 @@ export function TopBar({
<FontAwesomeIcon icon={faFileLines} />
</span>
<span className="current-page">{prettyName(pageName)}</span>
<button
onClick={(e) => {
// @ts-ignore
window.syncer();
e.stopPropagation();
}}
>
Sync
</button>
{notifications.length > 0 && (
<div className="status">
{notifications.map((notification) => (

View File

@ -32,7 +32,7 @@ import { smartQuoteKeymap } from "./smart_quotes";
import { WatchableSpace } from "./spaces/cache_space";
import customMarkdownStyle from "./style";
import { editorSyscalls } from "./syscalls/editor";
import { indexerSyscalls } from "./syscalls/indexer";
import { indexerSyscalls } from "./syscalls";
import { spaceSyscalls } from "./syscalls/space";
import { Action, AppViewState, initialViewState } from "./types";
import { SilverBulletHooks } from "../common/manifest";

View File

@ -8,7 +8,7 @@ const pageWatchInterval = 2000;
const trashPrefix = "_trash/";
const plugPrefix = "_plug/";
export class WatchableSpace extends EventEmitter<SpaceEvents> implements Space {
export class WatchableSpace extends EventEmitter<SpaceEvents> {
pageMetaCache = new Map<string, PageMeta>();
watchedPages = new Set<string>();
private initialPageListLoad = true;
@ -46,7 +46,7 @@ export class WatchableSpace extends EventEmitter<SpaceEvents> implements Space {
safeRun(async () => {
let newPageList = await this.space.fetchPageList();
let deletedPages = new Set<string>(this.pageMetaCache.keys());
newPageList.forEach((meta) => {
newPageList.pages.forEach((meta) => {
const pageName = meta.name;
const oldPageMeta = this.pageMetaCache.get(pageName);
const newPageMeta = {
@ -112,11 +112,11 @@ export class WatchableSpace extends EventEmitter<SpaceEvents> implements Space {
await this.writePage(
`${trashPrefix}${name}`,
pageData.text,
true,
false,
deleteDate
);
}
await this.space.deletePage(name, deleteDate);
await this.space.deletePage(name);
this.pageMetaCache.delete(name);
this.emit("pageDeleted", name);
@ -210,7 +210,7 @@ export class WatchableSpace extends EventEmitter<SpaceEvents> implements Space {
}
}
fetchPageList(): Promise<Set<PageMeta>> {
fetchPageList(): Promise<{ pages: Set<PageMeta>; nowTimestamp: number }> {
return this.space.fetchPageList();
}

View File

@ -11,7 +11,10 @@ export class HttpRestSpace implements Space {
this.plugUrl = url + "/plug";
}
public async fetchPageList(): Promise<Set<PageMeta>> {
public async fetchPageList(): Promise<{
pages: Set<PageMeta>;
nowTimestamp: number;
}> {
let req = await fetch(this.pageUrl, {
method: "GET",
});
@ -25,7 +28,10 @@ export class HttpRestSpace implements Space {
});
});
return result;
return {
pages: result,
nowTimestamp: +req.headers.get("Now-Timestamp")!,
};
}
async readPage(name: string): Promise<{ text: string; meta: PageMeta }> {

View File

@ -12,7 +12,7 @@ type Page = {
export class IndexedDBSpace implements Space {
private pageTable: Table<Page, string>;
constructor(dbName: string) {
constructor(dbName: string, readonly timeSkew: number = 0) {
const db = new Dexie(dbName);
db.version(1).stores({
page: "name",
@ -42,13 +42,19 @@ export class IndexedDBSpace implements Space {
return plug.invoke(name, args);
}
async fetchPageList(): Promise<Set<PageMeta>> {
async fetchPageList(): Promise<{
pages: Set<PageMeta>;
nowTimestamp: number;
}> {
let allPages = await this.pageTable.toArray();
let set = new Set(allPages.map((p) => p.meta));
return set;
return {
pages: new Set(allPages.map((p) => p.meta)),
nowTimestamp: Date.now() + this.timeSkew,
};
}
proxySyscall(plug: Plug<any>, name: string, args: any[]): Promise<any> {
console.log("Going this", name);
return plug.syscall(name, args);
}
@ -69,7 +75,7 @@ export class IndexedDBSpace implements Space {
): Promise<PageMeta> {
let meta = {
name,
lastModified: lastModified ? lastModified : new Date().getTime(),
lastModified: lastModified ? lastModified : Date.now() + this.timeSkew,
};
await this.pageTable.put({
name,

View File

@ -13,7 +13,7 @@ export type SpaceEvents = {
export interface Space {
// Pages
fetchPageList(): Promise<Set<PageMeta>>;
fetchPageList(): Promise<{ pages: Set<PageMeta>; nowTimestamp: number }>;
readPage(name: string): Promise<{ text: string; meta: PageMeta }>;
getPageMeta(name: string): Promise<PageMeta>;
writePage(
@ -22,7 +22,7 @@ export interface Space {
selfUpdate?: boolean,
lastModified?: number
): Promise<PageMeta>;
deletePage(name: string, deleteDate?: number): Promise<void>;
deletePage(name: string): Promise<void>;
// Plugs
proxySyscall(plug: Plug<any>, name: string, args: any[]): Promise<any>;

View File

@ -9,32 +9,33 @@ require("fake-indexeddb/auto");
test("Test store", async () => {
let primary = new WatchableSpace(new IndexedDBSpace("primary"), true);
let secondary = new WatchableSpace(new IndexedDBSpace("secondary"), true);
let sync = new SpaceSync(primary, secondary, 0, "_trash/");
let secondary = new WatchableSpace(
new IndexedDBSpace("secondary", -5000),
true
);
let sync = new SpaceSync(primary, secondary, 0, 0, "_trash/");
async function conflictResolver(pageMeta1: PageMeta, pageMeta2: PageMeta) {}
// Write one page to primary
await primary.writePage("start", "Hello");
expect((await secondary.listPages()).size).toBe(0);
await sync.syncPages(conflictResolver);
await syncPages(conflictResolver);
expect((await secondary.listPages()).size).toBe(1);
expect((await secondary.readPage("start")).text).toBe("Hello");
let lastSync = sync.lastSync;
// Should be a no-op
await sync.syncPages();
expect(sync.lastSync).toBe(lastSync);
expect(await syncPages()).toBe(0);
// Now let's make a change on the secondary
await secondary.writePage("start", "Hello!!");
await secondary.writePage("test", "Test page");
// And sync it
await sync.syncPages();
await syncPages();
expect((await primary.listPages()).size).toBe(2);
expect((await secondary.listPages()).size).toBe(2);
expect(primary.listPages().size).toBe(2);
expect(secondary.listPages().size).toBe(2);
expect((await primary.readPage("start")).text).toBe("Hello!!");
@ -43,12 +44,12 @@ test("Test store", async () => {
await primary.writePage("start2", "2");
await secondary.writePage("start3", "3");
await secondary.writePage("start4", "4");
await sync.syncPages();
await syncPages();
expect((await primary.listPages()).size).toBe(5);
expect((await secondary.listPages()).size).toBe(5);
expect(await sync.syncPages()).toBe(0);
expect(await syncPages()).toBe(0);
console.log("Deleting pages");
// Delete some pages
@ -58,29 +59,29 @@ test("Test store", async () => {
console.log("Pages", await primary.listPages());
console.log("Trash", await primary.listTrash());
await sync.syncPages();
await syncPages();
expect((await primary.listPages()).size).toBe(3);
expect((await secondary.listPages()).size).toBe(3);
// No-op
expect(await sync.syncPages()).toBe(0);
expect(await syncPages()).toBe(0);
await secondary.deletePage("start4");
await primary.deletePage("start2");
await sync.syncPages();
await syncPages();
// Just "test" left
expect((await primary.listPages()).size).toBe(1);
expect((await secondary.listPages()).size).toBe(1);
// No-op
expect(await sync.syncPages()).toBe(0);
expect(await syncPages()).toBe(0);
await secondary.writePage("start", "I'm back");
await sync.syncPages();
await syncPages();
expect((await primary.readPage("start")).text).toBe("I'm back");
@ -88,10 +89,10 @@ test("Test store", async () => {
await primary.writePage("start", "Hello 1");
await secondary.writePage("start", "Hello 2");
await sync.syncPages(SpaceSync.primaryConflictResolver(primary, secondary));
await syncPages(SpaceSync.primaryConflictResolver(primary, secondary));
// Sync conflicting copy back
await sync.syncPages();
await syncPages();
// Verify that primary won
expect((await primary.readPage("start")).text).toBe("Hello 1");
@ -100,4 +101,23 @@ test("Test store", async () => {
// test + start + start.conflicting copy
expect((await primary.listPages()).size).toBe(3);
expect((await secondary.listPages()).size).toBe(3);
async function syncPages(
conflictResolver?: (
pageMeta1: PageMeta,
pageMeta2: PageMeta
) => Promise<void>
): Promise<number> {
// Awesome practice: adding sleeps to fix issues!
await sleep(2);
let n = await sync.syncPages(conflictResolver);
await sleep(2);
return n;
}
});
function sleep(ms: number = 5): Promise<void> {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}

View File

@ -6,7 +6,8 @@ export class SpaceSync {
constructor(
private primary: WatchableSpace,
private secondary: WatchableSpace,
public lastSync: number,
public primaryLastSync: number,
public secondaryLastSync: number,
private trashPrefix: string
) {}
@ -33,14 +34,20 @@ export class SpaceSync {
};
}
async syncablePages(space: Space): Promise<PageMeta[]> {
return [...(await space.fetchPageList())].filter(
(pageMeta) => !pageMeta.name.startsWith(this.trashPrefix)
);
async syncablePages(
space: WatchableSpace
): Promise<{ pages: PageMeta[]; nowTimestamp: number }> {
let fetchResult = await space.fetchPageList();
return {
pages: [...fetchResult.pages].filter(
(pageMeta) => !pageMeta.name.startsWith(this.trashPrefix)
),
nowTimestamp: fetchResult.nowTimestamp,
};
}
async trashPages(space: Space): Promise<PageMeta[]> {
return [...(await space.fetchPageList())]
return [...(await space.fetchPageList()).pages]
.filter((pageMeta) => pageMeta.name.startsWith(this.trashPrefix))
.map((pageMeta) => ({
...pageMeta,
@ -56,27 +63,28 @@ export class SpaceSync {
): Promise<number> {
let syncOps = 0;
let allPagesPrimary = new Map(
(await this.syncablePages(this.primary)).map((p) => [p.name, p])
);
let { pages: primaryAllPagesSet, nowTimestamp: primarySyncTimestamp } =
await this.syncablePages(this.primary);
let allPagesPrimary = new Map(primaryAllPagesSet.map((p) => [p.name, p]));
let { pages: secondaryAllPagesSet, nowTimestamp: secondarySyncTimestamp } =
await this.syncablePages(this.secondary);
let allPagesSecondary = new Map(
(await this.syncablePages(this.secondary)).map((p) => [p.name, p])
secondaryAllPagesSet.map((p) => [p.name, p])
);
let allTrashPrimary = new Map(
(await this.trashPages(this.primary))
// Filter out old trash
.filter((p) => p.lastModified > this.lastSync)
.filter((p) => p.lastModified > this.primaryLastSync)
.map((p) => [p.name, p])
);
let allTrashSecondary = new Map(
(await this.trashPages(this.secondary))
// Filter out old trash
.filter((p) => p.lastModified > this.lastSync)
.filter((p) => p.lastModified > this.secondaryLastSync)
.map((p) => [p.name, p])
);
let createdPagesOnSecondary = new Set<string>();
// Iterate over all pages on the primary first
for (let [name, pageMetaPrimary] of allPagesPrimary.entries()) {
let pageMetaSecondary = allPagesSecondary.get(pageMetaPrimary.name);
@ -95,15 +103,14 @@ export class SpaceSync {
name,
pageData.text,
true,
pageData.meta.lastModified
secondarySyncTimestamp // The reason for this is to not include it in the next sync cycle, we cannot blindly use the lastModified date due to time skew
);
syncOps++;
createdPagesOnSecondary.add(name);
} else {
// Existing page
if (pageMetaPrimary.lastModified > this.lastSync) {
if (pageMetaPrimary.lastModified > this.primaryLastSync) {
// Primary updated since last sync
if (pageMetaSecondary.lastModified > this.lastSync) {
if (pageMetaSecondary.lastModified > this.secondaryLastSync) {
// Secondary also updated! CONFLICT
if (conflictResolver) {
await conflictResolver(pageMetaPrimary, pageMetaSecondary);
@ -124,11 +131,11 @@ export class SpaceSync {
name,
pageData.text,
false,
pageData.meta.lastModified
secondarySyncTimestamp
);
syncOps++;
}
} else if (pageMetaSecondary.lastModified > this.lastSync) {
} else if (pageMetaSecondary.lastModified > this.secondaryLastSync) {
// Secondary updated, but not primary (checked above)
// Push from secondary to primary
console.log("Changed page on secondary", name, "syncing to primary");
@ -137,7 +144,7 @@ export class SpaceSync {
name,
pageData.text,
false,
pageData.meta.lastModified
primarySyncTimestamp
);
syncOps++;
} else {
@ -161,8 +168,8 @@ export class SpaceSync {
await this.primary.writePage(
name,
pageData.text,
true,
pageData.meta.lastModified
false,
primarySyncTimestamp
);
syncOps++;
}
@ -170,50 +177,45 @@ export class SpaceSync {
// And finally, let's trash some pages
for (let pageToDelete of allTrashPrimary.values()) {
if (pageToDelete.lastModified > this.lastSync) {
// New deletion
console.log("Deleting", pageToDelete.name, "on secondary");
try {
await this.secondary.deletePage(
pageToDelete.name,
pageToDelete.lastModified
);
syncOps++;
} catch (e: any) {
console.log("Page already gone", e.message);
}
console.log("Deleting", pageToDelete.name, "on secondary");
try {
await this.secondary.deletePage(
pageToDelete.name,
secondarySyncTimestamp
);
syncOps++;
} catch (e: any) {
console.log("Page already gone", e.message);
}
}
for (let pageToDelete of allTrashSecondary.values()) {
if (pageToDelete.lastModified > this.lastSync) {
// New deletion
console.log("Deleting", pageToDelete.name, "on primary");
try {
await this.primary.deletePage(
pageToDelete.name,
pageToDelete.lastModified
);
syncOps++;
} catch (e: any) {
console.log("Page already gone", e.message);
}
console.log("Deleting", pageToDelete.name, "on primary");
try {
await this.primary.deletePage(pageToDelete.name, primarySyncTimestamp);
syncOps++;
} catch (e: any) {
console.log("Page already gone", e.message);
}
}
// Setting last sync time to the timestamps we got back when fetching the page lists on each end
this.primaryLastSync = primarySyncTimestamp;
this.secondaryLastSync = secondarySyncTimestamp;
// Find the latest timestamp and set it as lastSync
allPagesPrimary.forEach((pageMeta) => {
this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
});
allPagesSecondary.forEach((pageMeta) => {
this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
});
allTrashPrimary.forEach((pageMeta) => {
this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
});
allTrashSecondary.forEach((pageMeta) => {
this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
});
// allPagesPrimary.forEach((pageMeta) => {
// this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
// });
// allPagesSecondary.forEach((pageMeta) => {
// this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
// });
// allTrashPrimary.forEach((pageMeta) => {
// this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
// });
// allTrashSecondary.forEach((pageMeta) => {
// this.lastSync = Math.max(this.lastSync, pageMeta.lastModified);
// });
return syncOps;
}

View File

@ -1,8 +1,8 @@
import { Space } from "../spaces/space";
import { SysCallMapping } from "../../plugos/system";
import { proxySyscalls } from "../../plugos/syscalls/transport";
import { WatchableSpace } from "../spaces/cache_space";
export function indexerSyscalls(space: Space): SysCallMapping {
export function indexerSyscalls(space: WatchableSpace): SysCallMapping {
return proxySyscalls(
[
"index.scanPrefixForPage",

View File

@ -1,7 +1,7 @@
import { SysCallMapping } from "../../plugos/system";
import { Space } from "../spaces/space";
import { WatchableSpace } from "../spaces/cache_space";
export function systemSyscalls(space: Space): SysCallMapping {
export function systemSyscalls(space: WatchableSpace): SysCallMapping {
return {
"system.invokeFunction": async (
ctx,