1
0

Work on next-gen data store

This commit is contained in:
Zef Hemel 2023-09-03 21:15:17 +02:00
parent 541b347da4
commit 95d182e382
10 changed files with 511 additions and 2 deletions

View File

@ -0,0 +1,67 @@
import "https://esm.sh/fake-indexeddb@4.0.2/auto";
import { IndexedDBKvPrimitives } from "./indexeddb_kv_primitives.ts";
import { DataStore } from "./datastore.ts";
import { DenoKvPrimitives } from "./deno_kv_primitives.ts";
import { KvPrimitives } from "./kv_primitives.ts";
import { assertEquals } from "https://deno.land/std@0.165.0/testing/asserts.ts";
async function test(db: KvPrimitives) {
const dataStore = new DataStore(db);
await dataStore.set(["user", "peter"], { name: "Peter" });
await dataStore.set(["user", "hank"], { name: "Hank" });
let results = await dataStore.query({
prefix: ["user"],
filter: ["=", "name", "Peter"],
});
assertEquals(results, [{ key: ["user", "peter"], value: { name: "Peter" } }]);
await dataStore.batchSet([
{ key: ["kv", "name"], value: "Zef" },
{ key: ["kv", "data"], value: new Uint8Array([1, 2, 3]) },
{
key: ["kv", "complicated"],
value: {
name: "Frank",
parents: ["John", "Jane"],
address: {
street: "123 Main St",
city: "San Francisco",
},
},
},
]);
assertEquals(await dataStore.get(["kv", "name"]), "Zef");
assertEquals(await dataStore.get(["kv", "data"]), new Uint8Array([1, 2, 3]));
results = await dataStore.query({
prefix: ["kv"],
filter: ["=", "", "Zef"],
});
assertEquals(results, [{ key: ["kv", "name"], value: "Zef" }]);
results = await dataStore.query({
prefix: ["kv"],
filter: ["and", ["=", "parents", "John"], [
"=",
"address.city",
"San Francisco",
]],
});
assertEquals(results[0].key, ["kv", "complicated"]);
}
Deno.test("Test Deno KV DataStore", async () => {
const tmpFile = await Deno.makeTempFile();
const db = new DenoKvPrimitives(tmpFile);
await db.init();
await test(db);
db.close();
await Deno.remove(tmpFile);
});
Deno.test("Test IndexDB DataStore", {
sanitizeResources: false,
sanitizeOps: false,
}, async () => {
const db = new IndexedDBKvPrimitives("test");
await db.init();
await test(db);
db.close();
});

183
plugos/lib/datastore.ts Normal file
View File

@ -0,0 +1,183 @@
import { KvKey, KvPrimitives } from "./kv_primitives.ts";
export type { KvKey };
export type KvValue = any;
export type KV = {
key: KvKey;
value: KvValue;
};
export type KvOrderBy = {
attribute: string;
desc: boolean;
};
export type KvQuery = {
prefix: KvKey;
filter?: KvQueryFilter;
orderBy?: KvOrderBy[];
limit?: number;
select?: string[];
};
export type KvQueryFilter =
| ["=", string, any]
| ["!=", string, any]
| ["=~", string, RegExp]
| ["!=~", string, RegExp]
| ["prefix", string, string]
| ["<", string, any]
| ["<=", string, any]
| [">", string, any]
| [">=", string, any]
| ["in", string, any[]]
| ["and", KvQueryFilter, KvQueryFilter]
| ["or", KvQueryFilter, KvQueryFilter];
function filterKvQuery(kvQuery: KvQueryFilter, obj: KvValue): boolean {
const [op, op1, op2] = kvQuery;
if (op === "and") {
return filterKvQuery(op1, obj) &&
filterKvQuery(op2, obj);
} else if (op === "or") {
return filterKvQuery(op1, obj) || filterKvQuery(op2, obj);
}
// Look up the value of the attribute, supporting nested attributes via `attr.attr2.attr3`, and empty attribute value signifies the root object
let attributeVal = obj;
for (const part of op1.split(".")) {
if (!part) {
continue;
}
if (attributeVal === undefined) {
return false;
}
attributeVal = attributeVal[part];
}
// And apply the operator
switch (op) {
case "=": {
if (Array.isArray(attributeVal) && !Array.isArray(op2)) {
// Record property is an array, and value is a scalar: find the value in the array
if (attributeVal.includes(op2)) {
return true;
}
} else if (Array.isArray(attributeVal) && Array.isArray(obj)) {
// Record property is an array, and value is an array: find the value in the array
if (attributeVal.some((v) => obj.includes(v))) {
return true;
}
}
return attributeVal === op2;
}
case "!=":
return attributeVal !== op2;
case "=~":
return op2.test(attributeVal);
case "!=~":
return !op2.test(attributeVal);
case "prefix":
return attributeVal.startsWith(op2);
case "<":
return attributeVal < op2;
case "<=":
return attributeVal <= op2;
case ">":
return attributeVal > op2;
case ">=":
return attributeVal >= op2;
case "in":
return op2.includes(attributeVal);
default:
throw new Error(`Unupported operator: ${op}`);
}
}
/**
* This is the data store class you'll actually want to use, wrapping the primitives
* in a more user-friendly way
*/
export class DataStore {
constructor(private kv: KvPrimitives) {
}
async get(key: KvKey): Promise<KvValue> {
return (await this.kv.batchGet([key]))[0];
}
batchGet(keys: KvKey[]): Promise<KvValue[]> {
return this.kv.batchGet(keys);
}
set(key: KvKey, value: KvValue): Promise<void> {
return this.kv.batchSet([{ key, value }]);
}
batchSet(entries: KV[]): Promise<void> {
return this.kv.batchSet(entries);
}
delete(key: KvKey): Promise<void> {
return this.kv.batchDelete([key]);
}
batchDelete(keys: KvKey[]): Promise<void> {
return this.kv.batchDelete(keys);
}
async query(query: KvQuery): Promise<KV[]> {
const results: KV[] = [];
let itemCount = 0;
// Accumuliate results
for await (const entry of this.kv.query({ prefix: query.prefix })) {
// Filter
if (query.filter && !filterKvQuery(query.filter, entry.value)) {
continue;
}
results.push(entry);
itemCount++;
// Stop when the limit has been reached
if (itemCount === query.limit) {
break;
}
}
// Order by
if (query.orderBy) {
results.sort((a, b) => {
const aVal = a.value;
const bVal = b.value;
for (const { attribute, desc } of query.orderBy!) {
if (
aVal[attribute] < bVal[attribute] || aVal[attribute] === undefined
) {
return desc ? 1 : -1;
}
if (
aVal[attribute] > bVal[attribute] || bVal[attribute] === undefined
) {
return desc ? -1 : 1;
}
}
// Consider them equal. This helps with comparing arrays (like tags)
return 0;
});
}
if (query.select) {
for (let i = 0; i < results.length; i++) {
const rec = results[i].value;
const newRec: any = {};
for (const k of query.select) {
newRec[k] = rec[k];
}
results[i].value = newRec;
}
}
return results;
}
}

View File

@ -0,0 +1,11 @@
import { DenoKvPrimitives } from "./deno_kv_primitives.ts";
import { allTests } from "./kv_primitives.test.ts";
Deno.test("Test Deno KV Primitives", async () => {
const tmpFile = await Deno.makeTempFile();
const db = new DenoKvPrimitives(tmpFile);
await db.init();
await allTests(db);
db.close();
await Deno.remove(tmpFile);
});

View File

@ -0,0 +1,72 @@
/// <reference lib="deno.unstable" />
import { KV, KvKey, KvPrimitives, KvQueryOptions } from "./kv_primitives.ts";
const kvBatchSize = 10;
export class DenoKvPrimitives implements KvPrimitives {
db!: Deno.Kv;
constructor(private path?: string) {
}
async init() {
this.db = await Deno.openKv(this.path);
}
async batchGet(keys: KvKey[]): Promise<any[]> {
const results: any[] = [];
const batches: Deno.KvKey[][] = [];
for (let i = 0; i < keys.length; i += kvBatchSize) {
batches.push(keys.slice(i, i + kvBatchSize));
}
for (const batch of batches) {
const res = await this.db.getMany(batch);
results.push(...res.map((r) => r.value === null ? undefined : r.value));
}
return results;
}
async batchSet(entries: KV[]): Promise<void> {
// Split into batches of kvBatchSize
const batches: KV[][] = [];
for (let i = 0; i < entries.length; i += kvBatchSize) {
batches.push(entries.slice(i, i + kvBatchSize));
}
for (const batch of batches) {
let batchOp = this.db.atomic();
for (const { key, value } of batch) {
batchOp = batchOp.set(key, value);
}
const res = await batchOp.commit();
if (!res.ok) {
throw res;
}
}
}
async batchDelete(keys: KvKey[]): Promise<void> {
const batches: KvKey[][] = [];
for (let i = 0; i < keys.length; i += kvBatchSize) {
batches.push(keys.slice(i, i + kvBatchSize));
}
for (const batch of batches) {
let batchOp = this.db.atomic();
for (const key of batch) {
batchOp = batchOp.delete(key);
}
const res = await batchOp.commit();
if (!res.ok) {
throw res;
}
}
}
async *query({ prefix }: KvQueryOptions): AsyncIterableIterator<KV> {
prefix = prefix || [];
for await (
const result of this.db.list({ prefix: prefix as Deno.KvKey })
) {
yield { key: result.key as KvKey, value: result.value as any };
}
}
close() {
this.db.close();
}
}

View File

@ -0,0 +1,13 @@
import "https://esm.sh/fake-indexeddb@4.0.2/auto";
import { IndexedDBKvPrimitives } from "./indexeddb_kv_primitives.ts";
import { allTests } from "./kv_primitives.test.ts";
Deno.test("Test IDB key primitives", {
sanitizeResources: false,
sanitizeOps: false,
}, async () => {
const db = new IndexedDBKvPrimitives("test");
await db.init();
await allTests(db);
db.close();
});

View File

@ -0,0 +1,75 @@
import { KV, KvKey, KvPrimitives, KvQueryOptions } from "./kv_primitives.ts";
import { IDBPDatabase, openDB } from "https://esm.sh/idb@7.1.1/with-async-ittr";
const sep = "\uffff";
export class IndexedDBKvPrimitives implements KvPrimitives {
db!: IDBPDatabase<any>;
constructor(
private dbName: string,
private objectStoreName: string = "data",
) {
}
async init() {
this.db = await openDB(this.dbName, 1, {
upgrade: (db) => {
db.createObjectStore(this.objectStoreName);
},
});
}
batchGet(keys: KvKey[]): Promise<any[]> {
const tx = this.db.transaction(this.objectStoreName, "readonly");
return Promise.all(keys.map((key) => tx.store.get(this.buildKey(key))));
}
async batchSet(entries: KV[]): Promise<void> {
const tx = this.db.transaction(this.objectStoreName, "readwrite");
await Promise.all([
...entries.map(({ key, value }) =>
tx.store.put(value, this.buildKey(key))
),
tx.done,
]);
}
async batchDelete(keys: KvKey[]): Promise<void> {
const tx = this.db.transaction(this.objectStoreName, "readwrite");
await Promise.all([
...keys.map((key) => tx.store.delete(this.buildKey(key))),
tx.done,
]);
}
async *query({ prefix }: KvQueryOptions): AsyncIterableIterator<KV> {
const tx = this.db.transaction(this.objectStoreName, "readonly");
prefix = prefix || [];
for await (
const entry of tx.store.iterate(IDBKeyRange.bound(
this.buildKey([...prefix, ""]),
this.buildKey([...prefix, "\ufffe"]),
))
) {
yield { key: this.extractKey(entry.key), value: entry.value };
}
}
private buildKey(key: KvKey): string {
for (const k of key) {
if (k.includes(sep)) {
throw new Error(`Key cannot contain ${sep}`);
}
}
return key.join(sep);
}
private extractKey(key: string): KvKey {
return key.split(sep);
}
close() {
this.db.close();
}
}

View File

@ -0,0 +1,68 @@
import { KV, KvPrimitives } from "./kv_primitives.ts";
import { assertEquals } from "../../test_deps.ts";
export async function allTests(db: KvPrimitives) {
await db.batchSet([
{ key: ["kv", "test2"], value: "Hello2" },
{ key: ["kv", "test1"], value: "Hello1" },
{ key: ["other", "random"], value: "Hello3" },
]);
const result = await db.batchGet([["kv", "test1"], ["kv", "test2"], [
"kv",
"test3",
]]);
assertEquals(result.length, 3);
assertEquals(result[0], "Hello1");
assertEquals(result[1], "Hello2");
assertEquals(result[2], undefined);
let counter = 0;
// Query all
for await (const _entry of db.query({})) {
counter++;
}
assertEquals(counter, 3);
counter = 0;
// Query prefix
for await (const _entry of db.query({ prefix: ["kv"] })) {
counter++;
console.log(_entry);
}
assertEquals(counter, 2);
// Delete a few keys
await db.batchDelete([["kv", "test1"], ["other", "random"]]);
const result2 = await db.batchGet([["kv", "test1"], ["kv", "test2"], [
"other",
"random",
]]);
assertEquals(result2.length, 3);
assertEquals(result2[0], undefined);
assertEquals(result2[1], "Hello2");
assertEquals(result2[2], undefined);
// Update a key
await db.batchSet([{ key: ["kv", "test2"], value: "Hello2.1" }]);
const [val] = await db.batchGet([["kv", "test2"]]);
assertEquals(val, "Hello2.1");
// Set a large batch
const largeBatch: KV[] = [];
for (let i = 0; i < 50; i++) {
largeBatch.push({ key: ["test", "test" + i], value: "Hello" });
}
await db.batchSet(largeBatch);
const largeBatchResult: KV[] = [];
for await (const entry of db.query({ prefix: ["test"] })) {
largeBatchResult.push(entry);
}
assertEquals(largeBatchResult.length, 50);
// Delete the large batch
await db.batchDelete(largeBatch.map((e) => e.key));
// Make sure they're gone
for await (const _entry of db.query({ prefix: ["test"] })) {
throw new Error("This should not happen");
}
}

View File

@ -0,0 +1,18 @@
export type KvKey = string[];
export type KvValue = any;
export type KV = {
key: KvKey;
value: KvValue;
};
export type KvQueryOptions = {
prefix?: KvKey;
};
export interface KvPrimitives {
batchGet(keys: KvKey[]): Promise<(KvValue | undefined)[]>;
batchSet(entries: KV[]): Promise<void>;
batchDelete(keys: KvKey[]): Promise<void>;
query(options: KvQueryOptions): AsyncIterableIterator<KV>;
}

View File

@ -2,7 +2,8 @@ import { assertEquals } from "../../test_deps.ts";
import { DenoKVStore } from "./kv_store.deno_kv.ts"; import { DenoKVStore } from "./kv_store.deno_kv.ts";
Deno.test("Test KV index", async () => { Deno.test("Test KV index", async () => {
const denoKv = await Deno.openKv("test.db"); const tmpFile = await Deno.makeTempFile();
const denoKv = await Deno.openKv(tmpFile);
const kv = new DenoKVStore(denoKv); const kv = new DenoKVStore(denoKv);
await kv.set("name", "Peter"); await kv.set("name", "Peter");
@ -53,5 +54,5 @@ Deno.test("Test KV index", async () => {
assertEquals(await kv.queryPrefix(""), []); assertEquals(await kv.queryPrefix(""), []);
denoKv.close(); denoKv.close();
await Deno.remove("test.db"); await Deno.remove(tmpFile);
}); });

View File

@ -1,4 +1,5 @@
import { safeRun } from "../common/util.ts"; import { safeRun } from "../common/util.ts";
import { IndexedDBKvPrimitives } from "../plugos/lib/indexeddb_kv_primitives.ts";
import { Client } from "./client.ts"; import { Client } from "./client.ts";
const syncMode = window.silverBulletConfig.syncOnly || const syncMode = window.silverBulletConfig.syncOnly ||