From 95d182e3826ee31ca8c156b8d53b5cee11104286 Mon Sep 17 00:00:00 2001 From: Zef Hemel Date: Sun, 3 Sep 2023 21:15:17 +0200 Subject: [PATCH] Work on next-gen data store --- plugos/lib/datastore.test.ts | 67 ++++++++ plugos/lib/datastore.ts | 183 +++++++++++++++++++++ plugos/lib/deno_kv_primitives.test.ts | 11 ++ plugos/lib/deno_kv_primitives.ts | 72 ++++++++ plugos/lib/indexeddb_kv_primitives.test.ts | 13 ++ plugos/lib/indexeddb_kv_primitives.ts | 75 +++++++++ plugos/lib/kv_primitives.test.ts | 68 ++++++++ plugos/lib/kv_primitives.ts | 18 ++ plugos/lib/kv_store.deno_kv.test.ts | 5 +- web/boot.ts | 1 + 10 files changed, 511 insertions(+), 2 deletions(-) create mode 100644 plugos/lib/datastore.test.ts create mode 100644 plugos/lib/datastore.ts create mode 100644 plugos/lib/deno_kv_primitives.test.ts create mode 100644 plugos/lib/deno_kv_primitives.ts create mode 100644 plugos/lib/indexeddb_kv_primitives.test.ts create mode 100644 plugos/lib/indexeddb_kv_primitives.ts create mode 100644 plugos/lib/kv_primitives.test.ts create mode 100644 plugos/lib/kv_primitives.ts diff --git a/plugos/lib/datastore.test.ts b/plugos/lib/datastore.test.ts new file mode 100644 index 0000000..5fa8176 --- /dev/null +++ b/plugos/lib/datastore.test.ts @@ -0,0 +1,67 @@ +import "https://esm.sh/fake-indexeddb@4.0.2/auto"; +import { IndexedDBKvPrimitives } from "./indexeddb_kv_primitives.ts"; +import { DataStore } from "./datastore.ts"; +import { DenoKvPrimitives } from "./deno_kv_primitives.ts"; +import { KvPrimitives } from "./kv_primitives.ts"; +import { assertEquals } from "https://deno.land/std@0.165.0/testing/asserts.ts"; + +async function test(db: KvPrimitives) { + const dataStore = new DataStore(db); + await dataStore.set(["user", "peter"], { name: "Peter" }); + await dataStore.set(["user", "hank"], { name: "Hank" }); + let results = await dataStore.query({ + prefix: ["user"], + filter: ["=", "name", "Peter"], + }); + assertEquals(results, [{ key: ["user", "peter"], value: { name: "Peter" } }]); + await dataStore.batchSet([ + { key: ["kv", "name"], value: "Zef" }, + { key: ["kv", "data"], value: new Uint8Array([1, 2, 3]) }, + { + key: ["kv", "complicated"], + value: { + name: "Frank", + parents: ["John", "Jane"], + address: { + street: "123 Main St", + city: "San Francisco", + }, + }, + }, + ]); + assertEquals(await dataStore.get(["kv", "name"]), "Zef"); + assertEquals(await dataStore.get(["kv", "data"]), new Uint8Array([1, 2, 3])); + results = await dataStore.query({ + prefix: ["kv"], + filter: ["=", "", "Zef"], + }); + assertEquals(results, [{ key: ["kv", "name"], value: "Zef" }]); + results = await dataStore.query({ + prefix: ["kv"], + filter: ["and", ["=", "parents", "John"], [ + "=", + "address.city", + "San Francisco", + ]], + }); + assertEquals(results[0].key, ["kv", "complicated"]); +} + +Deno.test("Test Deno KV DataStore", async () => { + const tmpFile = await Deno.makeTempFile(); + const db = new DenoKvPrimitives(tmpFile); + await db.init(); + await test(db); + db.close(); + await Deno.remove(tmpFile); +}); + +Deno.test("Test IndexDB DataStore", { + sanitizeResources: false, + sanitizeOps: false, +}, async () => { + const db = new IndexedDBKvPrimitives("test"); + await db.init(); + await test(db); + db.close(); +}); diff --git a/plugos/lib/datastore.ts b/plugos/lib/datastore.ts new file mode 100644 index 0000000..1dc3a7a --- /dev/null +++ b/plugos/lib/datastore.ts @@ -0,0 +1,183 @@ +import { KvKey, KvPrimitives } from "./kv_primitives.ts"; + +export type { KvKey }; + +export type KvValue = any; + +export type KV = { + key: KvKey; + value: KvValue; +}; + +export type KvOrderBy = { + attribute: string; + desc: boolean; +}; + +export type KvQuery = { + prefix: KvKey; + filter?: KvQueryFilter; + orderBy?: KvOrderBy[]; + limit?: number; + select?: string[]; +}; + +export type KvQueryFilter = + | ["=", string, any] + | ["!=", string, any] + | ["=~", string, RegExp] + | ["!=~", string, RegExp] + | ["prefix", string, string] + | ["<", string, any] + | ["<=", string, any] + | [">", string, any] + | [">=", string, any] + | ["in", string, any[]] + | ["and", KvQueryFilter, KvQueryFilter] + | ["or", KvQueryFilter, KvQueryFilter]; + +function filterKvQuery(kvQuery: KvQueryFilter, obj: KvValue): boolean { + const [op, op1, op2] = kvQuery; + + if (op === "and") { + return filterKvQuery(op1, obj) && + filterKvQuery(op2, obj); + } else if (op === "or") { + return filterKvQuery(op1, obj) || filterKvQuery(op2, obj); + } + + // Look up the value of the attribute, supporting nested attributes via `attr.attr2.attr3`, and empty attribute value signifies the root object + let attributeVal = obj; + for (const part of op1.split(".")) { + if (!part) { + continue; + } + if (attributeVal === undefined) { + return false; + } + attributeVal = attributeVal[part]; + } + + // And apply the operator + switch (op) { + case "=": { + if (Array.isArray(attributeVal) && !Array.isArray(op2)) { + // Record property is an array, and value is a scalar: find the value in the array + if (attributeVal.includes(op2)) { + return true; + } + } else if (Array.isArray(attributeVal) && Array.isArray(obj)) { + // Record property is an array, and value is an array: find the value in the array + if (attributeVal.some((v) => obj.includes(v))) { + return true; + } + } + + return attributeVal === op2; + } + case "!=": + return attributeVal !== op2; + case "=~": + return op2.test(attributeVal); + case "!=~": + return !op2.test(attributeVal); + case "prefix": + return attributeVal.startsWith(op2); + case "<": + return attributeVal < op2; + case "<=": + return attributeVal <= op2; + case ">": + return attributeVal > op2; + case ">=": + return attributeVal >= op2; + case "in": + return op2.includes(attributeVal); + default: + throw new Error(`Unupported operator: ${op}`); + } +} + +/** + * This is the data store class you'll actually want to use, wrapping the primitives + * in a more user-friendly way + */ +export class DataStore { + constructor(private kv: KvPrimitives) { + } + + async get(key: KvKey): Promise { + return (await this.kv.batchGet([key]))[0]; + } + + batchGet(keys: KvKey[]): Promise { + return this.kv.batchGet(keys); + } + + set(key: KvKey, value: KvValue): Promise { + return this.kv.batchSet([{ key, value }]); + } + + batchSet(entries: KV[]): Promise { + return this.kv.batchSet(entries); + } + + delete(key: KvKey): Promise { + return this.kv.batchDelete([key]); + } + + batchDelete(keys: KvKey[]): Promise { + return this.kv.batchDelete(keys); + } + + async query(query: KvQuery): Promise { + const results: KV[] = []; + let itemCount = 0; + // Accumuliate results + for await (const entry of this.kv.query({ prefix: query.prefix })) { + // Filter + if (query.filter && !filterKvQuery(query.filter, entry.value)) { + continue; + } + results.push(entry); + itemCount++; + // Stop when the limit has been reached + if (itemCount === query.limit) { + break; + } + } + // Order by + if (query.orderBy) { + results.sort((a, b) => { + const aVal = a.value; + const bVal = b.value; + for (const { attribute, desc } of query.orderBy!) { + if ( + aVal[attribute] < bVal[attribute] || aVal[attribute] === undefined + ) { + return desc ? 1 : -1; + } + if ( + aVal[attribute] > bVal[attribute] || bVal[attribute] === undefined + ) { + return desc ? -1 : 1; + } + } + // Consider them equal. This helps with comparing arrays (like tags) + return 0; + }); + } + + if (query.select) { + for (let i = 0; i < results.length; i++) { + const rec = results[i].value; + const newRec: any = {}; + for (const k of query.select) { + newRec[k] = rec[k]; + } + results[i].value = newRec; + } + } + return results; + } +} diff --git a/plugos/lib/deno_kv_primitives.test.ts b/plugos/lib/deno_kv_primitives.test.ts new file mode 100644 index 0000000..3f44753 --- /dev/null +++ b/plugos/lib/deno_kv_primitives.test.ts @@ -0,0 +1,11 @@ +import { DenoKvPrimitives } from "./deno_kv_primitives.ts"; +import { allTests } from "./kv_primitives.test.ts"; + +Deno.test("Test Deno KV Primitives", async () => { + const tmpFile = await Deno.makeTempFile(); + const db = new DenoKvPrimitives(tmpFile); + await db.init(); + await allTests(db); + db.close(); + await Deno.remove(tmpFile); +}); diff --git a/plugos/lib/deno_kv_primitives.ts b/plugos/lib/deno_kv_primitives.ts new file mode 100644 index 0000000..818f848 --- /dev/null +++ b/plugos/lib/deno_kv_primitives.ts @@ -0,0 +1,72 @@ +/// + +import { KV, KvKey, KvPrimitives, KvQueryOptions } from "./kv_primitives.ts"; +const kvBatchSize = 10; + +export class DenoKvPrimitives implements KvPrimitives { + db!: Deno.Kv; + constructor(private path?: string) { + } + + async init() { + this.db = await Deno.openKv(this.path); + } + + async batchGet(keys: KvKey[]): Promise { + const results: any[] = []; + const batches: Deno.KvKey[][] = []; + for (let i = 0; i < keys.length; i += kvBatchSize) { + batches.push(keys.slice(i, i + kvBatchSize)); + } + for (const batch of batches) { + const res = await this.db.getMany(batch); + results.push(...res.map((r) => r.value === null ? undefined : r.value)); + } + return results; + } + async batchSet(entries: KV[]): Promise { + // Split into batches of kvBatchSize + const batches: KV[][] = []; + for (let i = 0; i < entries.length; i += kvBatchSize) { + batches.push(entries.slice(i, i + kvBatchSize)); + } + for (const batch of batches) { + let batchOp = this.db.atomic(); + for (const { key, value } of batch) { + batchOp = batchOp.set(key, value); + } + const res = await batchOp.commit(); + if (!res.ok) { + throw res; + } + } + } + async batchDelete(keys: KvKey[]): Promise { + const batches: KvKey[][] = []; + for (let i = 0; i < keys.length; i += kvBatchSize) { + batches.push(keys.slice(i, i + kvBatchSize)); + } + for (const batch of batches) { + let batchOp = this.db.atomic(); + for (const key of batch) { + batchOp = batchOp.delete(key); + } + const res = await batchOp.commit(); + if (!res.ok) { + throw res; + } + } + } + async *query({ prefix }: KvQueryOptions): AsyncIterableIterator { + prefix = prefix || []; + for await ( + const result of this.db.list({ prefix: prefix as Deno.KvKey }) + ) { + yield { key: result.key as KvKey, value: result.value as any }; + } + } + + close() { + this.db.close(); + } +} diff --git a/plugos/lib/indexeddb_kv_primitives.test.ts b/plugos/lib/indexeddb_kv_primitives.test.ts new file mode 100644 index 0000000..a4c8aa0 --- /dev/null +++ b/plugos/lib/indexeddb_kv_primitives.test.ts @@ -0,0 +1,13 @@ +import "https://esm.sh/fake-indexeddb@4.0.2/auto"; +import { IndexedDBKvPrimitives } from "./indexeddb_kv_primitives.ts"; +import { allTests } from "./kv_primitives.test.ts"; + +Deno.test("Test IDB key primitives", { + sanitizeResources: false, + sanitizeOps: false, +}, async () => { + const db = new IndexedDBKvPrimitives("test"); + await db.init(); + await allTests(db); + db.close(); +}); diff --git a/plugos/lib/indexeddb_kv_primitives.ts b/plugos/lib/indexeddb_kv_primitives.ts new file mode 100644 index 0000000..fb96376 --- /dev/null +++ b/plugos/lib/indexeddb_kv_primitives.ts @@ -0,0 +1,75 @@ +import { KV, KvKey, KvPrimitives, KvQueryOptions } from "./kv_primitives.ts"; +import { IDBPDatabase, openDB } from "https://esm.sh/idb@7.1.1/with-async-ittr"; + +const sep = "\uffff"; + +export class IndexedDBKvPrimitives implements KvPrimitives { + db!: IDBPDatabase; + + constructor( + private dbName: string, + private objectStoreName: string = "data", + ) { + } + + async init() { + this.db = await openDB(this.dbName, 1, { + upgrade: (db) => { + db.createObjectStore(this.objectStoreName); + }, + }); + } + + batchGet(keys: KvKey[]): Promise { + const tx = this.db.transaction(this.objectStoreName, "readonly"); + return Promise.all(keys.map((key) => tx.store.get(this.buildKey(key)))); + } + + async batchSet(entries: KV[]): Promise { + const tx = this.db.transaction(this.objectStoreName, "readwrite"); + await Promise.all([ + ...entries.map(({ key, value }) => + tx.store.put(value, this.buildKey(key)) + ), + tx.done, + ]); + } + + async batchDelete(keys: KvKey[]): Promise { + const tx = this.db.transaction(this.objectStoreName, "readwrite"); + await Promise.all([ + ...keys.map((key) => tx.store.delete(this.buildKey(key))), + tx.done, + ]); + } + + async *query({ prefix }: KvQueryOptions): AsyncIterableIterator { + const tx = this.db.transaction(this.objectStoreName, "readonly"); + prefix = prefix || []; + for await ( + const entry of tx.store.iterate(IDBKeyRange.bound( + this.buildKey([...prefix, ""]), + this.buildKey([...prefix, "\ufffe"]), + )) + ) { + yield { key: this.extractKey(entry.key), value: entry.value }; + } + } + + private buildKey(key: KvKey): string { + for (const k of key) { + if (k.includes(sep)) { + throw new Error(`Key cannot contain ${sep}`); + } + } + return key.join(sep); + } + + private extractKey(key: string): KvKey { + return key.split(sep); + } + + close() { + this.db.close(); + } +} diff --git a/plugos/lib/kv_primitives.test.ts b/plugos/lib/kv_primitives.test.ts new file mode 100644 index 0000000..3a565ec --- /dev/null +++ b/plugos/lib/kv_primitives.test.ts @@ -0,0 +1,68 @@ +import { KV, KvPrimitives } from "./kv_primitives.ts"; +import { assertEquals } from "../../test_deps.ts"; + +export async function allTests(db: KvPrimitives) { + await db.batchSet([ + { key: ["kv", "test2"], value: "Hello2" }, + { key: ["kv", "test1"], value: "Hello1" }, + { key: ["other", "random"], value: "Hello3" }, + ]); + const result = await db.batchGet([["kv", "test1"], ["kv", "test2"], [ + "kv", + "test3", + ]]); + assertEquals(result.length, 3); + assertEquals(result[0], "Hello1"); + assertEquals(result[1], "Hello2"); + assertEquals(result[2], undefined); + let counter = 0; + // Query all + for await (const _entry of db.query({})) { + counter++; + } + assertEquals(counter, 3); + + counter = 0; + // Query prefix + for await (const _entry of db.query({ prefix: ["kv"] })) { + counter++; + console.log(_entry); + } + assertEquals(counter, 2); + + // Delete a few keys + await db.batchDelete([["kv", "test1"], ["other", "random"]]); + const result2 = await db.batchGet([["kv", "test1"], ["kv", "test2"], [ + "other", + "random", + ]]); + assertEquals(result2.length, 3); + assertEquals(result2[0], undefined); + assertEquals(result2[1], "Hello2"); + assertEquals(result2[2], undefined); + + // Update a key + await db.batchSet([{ key: ["kv", "test2"], value: "Hello2.1" }]); + const [val] = await db.batchGet([["kv", "test2"]]); + assertEquals(val, "Hello2.1"); + + // Set a large batch + const largeBatch: KV[] = []; + for (let i = 0; i < 50; i++) { + largeBatch.push({ key: ["test", "test" + i], value: "Hello" }); + } + await db.batchSet(largeBatch); + const largeBatchResult: KV[] = []; + for await (const entry of db.query({ prefix: ["test"] })) { + largeBatchResult.push(entry); + } + assertEquals(largeBatchResult.length, 50); + + // Delete the large batch + await db.batchDelete(largeBatch.map((e) => e.key)); + + // Make sure they're gone + for await (const _entry of db.query({ prefix: ["test"] })) { + throw new Error("This should not happen"); + } +} diff --git a/plugos/lib/kv_primitives.ts b/plugos/lib/kv_primitives.ts new file mode 100644 index 0000000..ce9709b --- /dev/null +++ b/plugos/lib/kv_primitives.ts @@ -0,0 +1,18 @@ +export type KvKey = string[]; +export type KvValue = any; + +export type KV = { + key: KvKey; + value: KvValue; +}; + +export type KvQueryOptions = { + prefix?: KvKey; +}; + +export interface KvPrimitives { + batchGet(keys: KvKey[]): Promise<(KvValue | undefined)[]>; + batchSet(entries: KV[]): Promise; + batchDelete(keys: KvKey[]): Promise; + query(options: KvQueryOptions): AsyncIterableIterator; +} diff --git a/plugos/lib/kv_store.deno_kv.test.ts b/plugos/lib/kv_store.deno_kv.test.ts index d067647..a286996 100644 --- a/plugos/lib/kv_store.deno_kv.test.ts +++ b/plugos/lib/kv_store.deno_kv.test.ts @@ -2,7 +2,8 @@ import { assertEquals } from "../../test_deps.ts"; import { DenoKVStore } from "./kv_store.deno_kv.ts"; Deno.test("Test KV index", async () => { - const denoKv = await Deno.openKv("test.db"); + const tmpFile = await Deno.makeTempFile(); + const denoKv = await Deno.openKv(tmpFile); const kv = new DenoKVStore(denoKv); await kv.set("name", "Peter"); @@ -53,5 +54,5 @@ Deno.test("Test KV index", async () => { assertEquals(await kv.queryPrefix(""), []); denoKv.close(); - await Deno.remove("test.db"); + await Deno.remove(tmpFile); }); diff --git a/web/boot.ts b/web/boot.ts index e15f684..721040d 100644 --- a/web/boot.ts +++ b/web/boot.ts @@ -1,4 +1,5 @@ import { safeRun } from "../common/util.ts"; +import { IndexedDBKvPrimitives } from "../plugos/lib/indexeddb_kv_primitives.ts"; import { Client } from "./client.ts"; const syncMode = window.silverBulletConfig.syncOnly ||