import { EventEmitter } from "../../common/event"; import { PageMeta } from "../../common/types"; import { safeRun } from "../util"; import { Plug } from "../../plugos/plug"; import { Manifest } from "../../common/manifest"; import { PlugMeta, Space, SpaceEvents } from "./space"; const pageWatchInterval = 2000; const plugWatchInterval = 5000; export class HttpRestSpace extends EventEmitter implements Space { pageUrl: string; pageMetaCache = new Map(); plugMetaCache = new Map(); watchedPages = new Set(); saving = false; private plugUrl: string; private initialPageListLoad = true; constructor(url: string) { super(); this.pageUrl = url + "/fs"; this.plugUrl = url + "/plug"; this.watch(); this.pollPlugs(); this.updatePageListAsync(); } watchPage(pageName: string) { this.watchedPages.add(pageName); } unwatchPage(pageName: string) { this.watchedPages.delete(pageName); } watch() { setInterval(() => { safeRun(async () => { if (this.saving) { return; } for (const pageName of this.watchedPages) { const oldMeta = this.pageMetaCache.get(pageName); if (!oldMeta) { // No longer in cache, meaning probably deleted let's unwatch this.watchedPages.delete(pageName); continue; } const newMeta = await this.getPageMeta(pageName); if (oldMeta.lastModified !== newMeta.lastModified) { console.log("Page", pageName, "changed on disk, emitting event"); this.emit("pageChanged", newMeta); } } }); }, pageWatchInterval); setInterval(() => { safeRun(this.pollPlugs.bind(this)); }, plugWatchInterval); } public updatePageListAsync() { safeRun(async () => { let req = await fetch(this.pageUrl, { method: "GET", }); let deletedPages = new Set(this.pageMetaCache.keys()); ((await req.json()) as any[]).forEach((meta: any) => { const pageName = meta.name; const oldPageMeta = this.pageMetaCache.get(pageName); const newPageMeta = { name: pageName, lastModified: meta.lastModified, }; if (!oldPageMeta && !this.initialPageListLoad) { this.emit("pageCreated", newPageMeta); } else if ( oldPageMeta && oldPageMeta.lastModified !== newPageMeta.lastModified ) { this.emit("pageChanged", newPageMeta); } // Page found, not deleted deletedPages.delete(pageName); // Update in cache this.pageMetaCache.set(pageName, newPageMeta); }); for (const deletedPage of deletedPages) { this.pageMetaCache.delete(deletedPage); this.emit("pageDeleted", deletedPage); } this.emit("pageListUpdated", new Set([...this.pageMetaCache.values()])); this.initialPageListLoad = false; }); } async listPages(): Promise> { return new Set([...this.pageMetaCache.values()]); } async readPage(name: string): Promise<{ text: string; meta: PageMeta }> { let res = await fetch(`${this.pageUrl}/${name}`, { method: "GET", }); return { text: await res.text(), meta: this.responseToMetaCacher(name, res), }; } async writePage( name: string, text: string, selfUpdate?: boolean, withMeta?: PageMeta ): Promise { // TODO: withMeta ignored for now try { this.saving = true; let res = await fetch(`${this.pageUrl}/${name}`, { method: "PUT", body: text, }); const newMeta = this.responseToMetaCacher(name, res); if (!selfUpdate) { this.emit("pageChanged", newMeta); } return newMeta; } finally { this.saving = false; } } async deletePage(name: string): Promise { let req = await fetch(`${this.pageUrl}/${name}`, { method: "DELETE", }); if (req.status !== 200) { throw Error(`Failed to delete page: ${req.statusText}`); } this.pageMetaCache.delete(name); this.emit("pageDeleted", name); this.emit("pageListUpdated", new Set([...this.pageMetaCache.values()])); } async proxySyscall(plug: Plug, name: string, args: any[]): Promise { let req = await fetch(`${this.plugUrl}/${plug.name}/syscall/${name}`, { method: "POST", headers: { "Content-type": "application/json", }, body: JSON.stringify(args), }); if (req.status !== 200) { let error = await req.text(); throw Error(error); } if (req.headers.get("Content-length") === "0") { return; } return await req.json(); } async invokeFunction( plug: Plug, env: string, name: string, args: any[] ): Promise { // Invoke locally if (!env || env === "client") { return plug.invoke(name, args); } // Or dispatch to server let req = await fetch(`${this.plugUrl}/${plug.name}/function/${name}`, { method: "POST", headers: { "Content-type": "application/json", }, body: JSON.stringify(args), }); if (req.status !== 200) { let error = await req.text(); throw Error(error); } if (req.headers.get("Content-length") === "0") { return; } return await req.json(); } async getPageMeta(name: string): Promise { let res = await fetch(`${this.pageUrl}/${name}`, { method: "OPTIONS", }); return this.responseToMetaCacher(name, res); } public async listPlugs(): Promise { let res = await fetch(`${this.plugUrl}`, { method: "GET", }); return (await res.json()) as PlugMeta[]; } public async loadPlug(name: string): Promise { let res = await fetch(`${this.plugUrl}/${name}`, { method: "GET", }); return (await res.json()) as Manifest; } private responseToMetaCacher(name: string, res: Response): PageMeta { const meta = { name, lastModified: +(res.headers.get("Last-Modified") || "0"), }; this.pageMetaCache.set(name, meta); return meta; } private async pollPlugs(): Promise { const newPlugs = await this.listPlugs(); let deletedPlugs = new Set(this.plugMetaCache.keys()); for (const newPlugMeta of newPlugs) { const oldPlugMeta = this.plugMetaCache.get(newPlugMeta.name); if ( !oldPlugMeta || (oldPlugMeta && oldPlugMeta.version !== newPlugMeta.version) ) { this.emit( "plugLoaded", newPlugMeta.name, await this.loadPlug(newPlugMeta.name) ); } // Page found, not deleted deletedPlugs.delete(newPlugMeta.name); // Update in cache this.plugMetaCache.set(newPlugMeta.name, newPlugMeta); } for (const deletedPlug of deletedPlugs) { this.plugMetaCache.delete(deletedPlug); this.emit("plugUnloaded", deletedPlug); } } }