import { sleep } from "../common/async_util.ts";
import type { SpacePrimitives } from "../common/spaces/space_primitives.ts";
import {
  SpaceSync,
  SyncStatus,
  SyncStatusItem,
} from "../common/spaces/sync.ts";
import { EventHook } from "../plugos/hooks/event.ts";
import { KVStore } from "../plugos/lib/kv_store.ts";
// Keeps the current sync snapshot
const syncSnapshotKey = "syncSnapshot";

// Keeps the start time of an ongoing sync, is reset once the sync is done
const syncStartTimeKey = "syncStartTime";

// Keeps the start time of the last full sync cycle
const syncLastFullCycleKey = "syncLastFullCycle";

// Keeps the last time an activity was registered, used to detect if a sync is
// still alive and whether a new one should be started already
const syncLastActivityKey = "syncLastActivity";

// Flag that is written by registerSyncStop() and checked (presence only) by
// hasInitialSyncCompleted()
const syncInitialFullSyncCompletedKey = "syncInitialFullSyncCompleted";

// Maximum time (ms) between two activities before we consider a sync crashed
const syncMaxIdleTimeout = 1000 * 27;

// How often (ms) to sync the whole space
const spaceSyncInterval = 17 * 1000; // Every 17s or so

// Used from Client
export const pageSyncInterval = 6000;
/**
 * The SyncService primarily wraps the SpaceSync engine but also coordinates sync between
 * different browser tabs. It uses the KVStore to keep track of sync state.
 */
export class SyncService {
  spaceSync: SpaceSync;

  /** Timestamp (ms) of the last emitted "sync:progress" event, used for throttling. */
  lastReportedSyncStatus = Date.now();

  /** Paths that currently have a pending scheduleFileSync() call. */
  filesScheduledForSync = new Set<string>();

  /**
   * Creates the underlying SpaceSync engine and registers listeners that
   * schedule a single-file sync when a page is loaded unsynced or saved.
   *
   * @param localSpacePrimitives local side handed to SpaceSync
   * @param remoteSpace remote side handed to SpaceSync
   * @param kvStore store holding the sync state keys and the snapshot
   * @param eventHook used to listen for editor events and to dispatch sync events
   * @param isSyncCandidate predicate deciding whether a path takes part in sync
   */
  constructor(
    readonly localSpacePrimitives: SpacePrimitives,
    readonly remoteSpace: SpacePrimitives,
    private kvStore: KVStore,
    private eventHook: EventHook,
    private isSyncCandidate: (path: string) => boolean,
  ) {
    this.spaceSync = new SpaceSync(
      this.localSpacePrimitives,
      this.remoteSpace,
      {
        conflictResolver: this.plugAwareConflictResolver.bind(this),
        isSyncCandidate: this.isSyncCandidate,
        onSyncProgress: (status) => {
          this.registerSyncProgress(status).catch(console.error);
        },
      },
    );

    eventHook.addLocalListener(
      "editor:pageLoaded",
      (name, _prevPage, isSynced) => {
        if (!isSynced) {
          this.scheduleFileSync(`${name}.md`).catch(console.error);
        }
      },
    );

    eventHook.addLocalListener("editor:pageSaved", (name) => {
      const path = `${name}.md`;
      this.scheduleFileSync(path).catch(console.error);
    });
  }

  /**
   * Reports whether a sync is currently registered as running and still alive.
   *
   * A sync whose last registered activity is missing or older than
   * `syncMaxIdleTimeout` is considered crashed: its start marker is deleted
   * and `false` is returned.
   */
  async isSyncing(): Promise<boolean> {
    const startTime = await this.kvStore.get(syncStartTimeKey);
    if (!startTime) {
      return false;
    }
    // Sync is running, but is it still alive?
    const lastActivity = await this.kvStore.get(syncLastActivityKey);
    // A missing value must count as "crashed": `Date.now() - undefined` is NaN,
    // and `NaN > timeout` is false, which would report the sync as alive forever.
    if (
      lastActivity == null ||
      Date.now() - lastActivity > syncMaxIdleTimeout
    ) {
      // It's been too long since the last activity, let's consider this one crashed and
      // reset the sync start state
      await this.kvStore.del(syncStartTimeKey);
      console.info("Sync crashed, resetting");
      return false;
    }
    return true;
  }

  /**
   * Resolves to whether `syncInitialFullSyncCompletedKey` is present in the
   * KV store. That key is written by registerSyncStop().
   */
  hasInitialSyncCompleted(): Promise<boolean> {
    return this.kvStore.has(syncInitialFullSyncCompletedKey);
  }

  /**
   * Records the start of a sync run.
   *
   * @param fullSync when true, the last-full-cycle timestamp is updated as well
   */
  async registerSyncStart(fullSync: boolean): Promise<void> {
    // Assumption: this is called after an isSyncing() check
    const now = Date.now(); // one timestamp for all keys written in this batch
    await this.kvStore.batchSet([
      {
        key: syncStartTimeKey,
        value: now,
      },
      {
        key: syncLastActivityKey,
        value: now,
      },
      ...fullSync // If this is a full sync cycle
        ? [{
          key: syncLastFullCycleKey,
          value: now,
        }]
        : [],
    ]);
  }

  /**
   * Records sync activity. When a status is given and more than 2s passed
   * since the last report, a "sync:progress" event is dispatched and the
   * status' snapshot is persisted. The last-activity timestamp is always updated.
   */
  async registerSyncProgress(status?: SyncStatus): Promise<void> {
    // Emit a sync event at most every 2s
    if (status && this.lastReportedSyncStatus < Date.now() - 2000) {
      this.eventHook.dispatchEvent("sync:progress", status);
      this.lastReportedSyncStatus = Date.now();
      await this.saveSnapshot(status.snapshot);
    }
    await this.kvStore.set(syncLastActivityKey, Date.now());
  }

  /**
   * Records the end of a sync run: registers a final activity, removes the
   * start marker and sets the `syncInitialFullSyncCompletedKey` flag.
   */
  async registerSyncStop(): Promise<void> {
    await this.registerSyncProgress();
    await this.kvStore.del(syncStartTimeKey);
    await this.kvStore.set(syncInitialFullSyncCompletedKey, true);
  }

  /**
   * Loads the persisted snapshot (stored as a plain object) and returns it as
   * a Map; an absent snapshot yields an empty Map.
   */
  async getSnapshot(): Promise<Map<string, SyncStatusItem>> {
    const snapshot = (await this.kvStore.get(syncSnapshotKey)) || {};
    return new Map<string, SyncStatusItem>(
      Object.entries(snapshot),
    );
  }

  /**
   * Awaits a moment when the sync is no longer running, polling isSyncing().
   *
   * @param timeout maximum time to wait in ms
   * @throws Error when a sync is still running after `timeout` ms
   */
  async noOngoingSync(timeout: number): Promise<void> {
    // Not completely safe, could have race condition on setting the syncStartTimeKey
    const startTime = Date.now();
    while (await this.isSyncing()) {
      await sleep(321);
      if (Date.now() - startTime > timeout) {
        throw new Error("Timeout waiting for sync to finish");
      }
    }
  }

  /**
   * Syncs a single file once no other sync is running (waits up to 7s).
   * Calls for a path that is already scheduled return immediately.
   *
   * Errors from waiting or syncing propagate to the caller.
   */
  async scheduleFileSync(path: string): Promise<void> {
    if (this.filesScheduledForSync.has(path)) {
      // Already scheduled, no need to duplicate
      console.info(`File ${path} already scheduled for sync`);
      return;
    }
    this.filesScheduledForSync.add(path);
    try {
      await this.noOngoingSync(7000);
      await this.syncFile(path);
    } finally {
      // Always unschedule, otherwise a timeout or sync failure would block
      // every later attempt to schedule this path.
      this.filesScheduledForSync.delete(path);
    }
  }

  /**
   * Kicks off an immediate space sync and then periodically checks whether a
   * new full space sync is due.
   */
  start() {
    this.syncSpace().catch(console.error);

    setInterval(async () => {
      try {
        if (!await this.isSyncing()) {
          const lastFullCycle =
            (await this.kvStore.get(syncLastFullCycleKey)) || 0;
          if (lastFullCycle && Date.now() - lastFullCycle > spaceSyncInterval) {
            // It's been a while since the last full cycle, let's sync the whole space
            await this.syncSpace();
          }
        }
      } catch (e: any) {
        console.error(e);
      }
    }, spaceSyncInterval / 2); // check every half the sync cycle because actually running the sync takes some time therefore we don't want to wait for the full cycle
  }

  /**
   * Syncs the whole space unless a sync is already running.
   *
   * Dispatches "sync:success" with the operation count, or "sync:error" with
   * the error message. The snapshot is saved and the sync is registered as
   * stopped in both cases.
   *
   * @returns the number of operations reported by SpaceSync.syncFiles, or 0
   * when the sync was skipped or failed
   */
  async syncSpace(): Promise<number> {
    if (await this.isSyncing()) {
      console.log("Aborting space sync: already syncing");
      return 0;
    }
    await this.registerSyncStart(true);
    let operations = 0;
    const snapshot = await this.getSnapshot();
    try {
      operations = await this.spaceSync.syncFiles(
        snapshot,
        (path) => this.isSyncCandidate(path),
      );
      await this.saveSnapshot(snapshot);
      await this.registerSyncStop();
      this.eventHook.dispatchEvent("sync:success", operations);
    } catch (e: any) {
      await this.saveSnapshot(snapshot);
      await this.registerSyncStop();
      this.eventHook.dispatchEvent("sync:error", e.message);
      console.error("Sync error", e.message);
    }
    return operations;
  }

  /**
   * Syncs a single file.
   *
   * Does nothing when the file is not a sync candidate or another sync is
   * running. Files whose local metadata has `noSync` set are skipped.
   * Dispatches "sync:success" or "sync:error"; afterwards the snapshot is
   * saved and the sync is registered as stopped.
   */
  async syncFile(name: string) {
    if (!this.isSyncCandidate(name)) {
      return;
    }
    if (await this.isSyncing()) {
      console.log("Already syncing, aborting individual file sync for", name);
      return;
    }
    await this.registerSyncStart(false);
    console.log("Syncing file", name);
    const snapshot = await this.getSnapshot();
    try {
      // The files' lastModified values are what gets passed to SpaceSync.syncFile
      let localHash: number | undefined;
      let remoteHash: number | undefined;
      try {
        const localMeta = await this.localSpacePrimitives.getFileMeta(name);
        if (localMeta.noSync) {
          console.info(
            "File marked as no sync, skipping sync in this cycle",
            name,
          );
          await this.registerSyncStop();
          return;
        }
        localHash = localMeta.lastModified;
      } catch {
        // Not present
      }
      try {
        remoteHash = (await this.remoteSpace.getFileMeta(name)).lastModified;
      } catch (e: any) {
        if (e.message === "Not found") {
          // File doesn't exist remotely, that's ok
        } else {
          throw e;
        }
      }

      await this.spaceSync.syncFile(snapshot, name, localHash, remoteHash);
      this.eventHook.dispatchEvent("sync:success");
    } catch (e: any) {
      this.eventHook.dispatchEvent("sync:error", e.message);
      console.error("Sync error", e);
    }
    await this.saveSnapshot(snapshot);
    await this.registerSyncStop();
  }

  /** Persists the snapshot Map in the KV store as a plain object. */
  async saveSnapshot(snapshot: Map<string, SyncStatusItem>) {
    await this.kvStore.set(syncSnapshotKey, Object.fromEntries(snapshot));
  }

  /**
   * Conflict resolver handed to SpaceSync.
   *
   * Files outside `_plug/` are delegated to SpaceSync.primaryConflictResolver;
   * a "sync:conflict" event is dispatched when that reports operations.
   * For files under `_plug/` the version from `secondary` is written to
   * `primary` and the snapshot entry is updated.
   *
   * @returns the number of operations performed (always 1 for plug files)
   */
  public async plugAwareConflictResolver(
    name: string,
    snapshot: Map<string, SyncStatusItem>,
    primary: SpacePrimitives,
    secondary: SpacePrimitives,
  ): Promise<number> {
    if (!name.startsWith("_plug/")) {
      const operations = await SpaceSync.primaryConflictResolver(
        name,
        snapshot,
        primary,
        secondary,
      );

      if (operations > 0) {
        // Something happened -> conflict copy generated, let's report it
        await this.eventHook.dispatchEvent("sync:conflict", name);
      }

      return operations;
    }
    console.log(
      "[sync]",
      "Conflict in plug",
      name,
      "will pick the version from secondary and be done with it.",
    );

    // Read file from secondary
    const { data, meta } = await secondary.readFile(
      name,
    );
    // Write file to primary
    const newMeta = await primary.writeFile(
      name,
      data,
      false,
      meta,
    );
    // Update snapshot
    snapshot.set(name, [
      newMeta.lastModified,
      meta.lastModified,
    ]);

    return 1;
  }
}