WIP: store and get relation between rooms and channels:

* fix rebuildData
* write data to disk
This commit is contained in:
John Livingston 2023-09-11 18:13:57 +02:00
parent e4683cf282
commit 28faad6bbe
No known key found for this signature in database
GPG Key ID: B17B5640CE66CDBC
2 changed files with 62 additions and 6 deletions

View File

@ -25,6 +25,9 @@ class RoomChannel {
protected channel2Rooms: Map<number, Map<string, true>> = new Map<number, Map<string, true>>() protected channel2Rooms: Map<number, Map<string, true>> = new Map<number, Map<string, true>>()
protected needSync: boolean = false protected needSync: boolean = false
protected syncTimeout: ReturnType<typeof setTimeout> | undefined
protected isWriting: boolean = false
constructor (params: { constructor (params: {
options: RegisterServerOptions options: RegisterServerOptions
prosodyDomain: string prosodyDomain: string
@ -68,7 +71,9 @@ class RoomChannel {
*/ */
public static async destroySingleton (): Promise<void> { public static async destroySingleton (): Promise<void> {
if (!singleton) { return } if (!singleton) { return }
singleton.cancelScheduledSync()
await singleton.sync() await singleton.sync()
singleton.cancelScheduledSync() // in case sync rescheduled... we will lose data, but they could be rebuild later
singleton = undefined singleton = undefined
} }
@ -199,8 +204,9 @@ class RoomChannel {
channelId = channelId.toString() channelId = channelId.toString()
if (!(channelId in data)) { if (!(channelId in data)) {
this.logger.debug(`Room ${room.localpart} is associated to channel ${channelId}`) this.logger.debug(`Room ${room.localpart} is associated to channel ${channelId}`)
data[channelId] = [room.localpart] data[channelId] = []
} }
data[channelId].push(room.localpart)
} }
// This part must be done atomicly: // This part must be done atomicly:
@ -214,8 +220,28 @@ class RoomChannel {
*/ */
public async sync (): Promise<void> { public async sync (): Promise<void> {
if (!this.needSync) { return } if (!this.needSync) { return }
this.logger.error('sync Not implemented yet')
this.needSync = false // Note: must be done at the right moment if (this.isWriting) {
this.logger.info('Already writing, scheduling a new sync')
this.scheduleSync()
return
}
this.logger.info('Syncing...')
this.isWriting = true
try {
const data = this._serializeData() // must be atomic
this.needSync = false // Note: must be done atomicly with the read
await fs.promises.mkdir(path.dirname(this.dataFilePath), { recursive: true })
await fs.promises.writeFile(this.dataFilePath, JSON.stringify(data))
this.logger.info('Syncing done.')
} catch (err) {
this.logger.error(err as string)
this.logger.error('Syncing failed.')
this.needSync = true
} finally {
this.isWriting = false
}
} }
/** /**
@ -224,7 +250,26 @@ class RoomChannel {
*/ */
public scheduleSync (): void { public scheduleSync (): void {
if (!this.needSync) { return } if (!this.needSync) { return }
this.logger.error('scheduleSync Not implemented yet') if (this.syncTimeout) {
// Already scheduled... nothing to do
this.logger.debug('There is already a sync scheduled, skipping.')
return
}
this.logger.info('Scheduling a new sync...')
this.syncTimeout = setTimeout(() => {
this.syncTimeout = undefined
this.logger.info('Running scheduled sync')
this.sync().then(() => {}, (err) => {
this.logger.error(err)
// We will not re-schedule the sync, to avoid flooding error log if there is an issue with the server
})
}, 100)
}
public cancelScheduledSync (): void {
if (this.syncTimeout) {
clearTimeout(this.syncTimeout)
}
} }
/** /**
@ -351,6 +396,18 @@ class RoomChannel {
return splits[0] return splits[0]
} }
protected _serializeData (): any {
const data: any = {}
this.channel2Rooms.forEach((rooms, channelId) => {
const a: string[] = []
rooms.forEach((_val, roomJID) => {
a.push(roomJID)
})
data[channelId.toString()] = a
})
return data
}
} }
export { export {
@ -358,4 +415,3 @@ export {
} }
// TODO: schedule rebuild every X hours/days // TODO: schedule rebuild every X hours/days
// TODO: write to disk, debouncing writes

View File

@ -46,7 +46,7 @@ async function register (options: RegisterServerOptions): Promise<any> {
await ensureProsodyRunning(options) await ensureProsodyRunning(options)
if (roomChannelNeedsDataInit) { if (roomChannelNeedsDataInit) {
logger.info('The RoomChannel singleton has not found data, we must rebuild') logger.info('The RoomChannel singleton has not found any data, we must rebuild')
// no need to wait here, can be done without await. // no need to wait here, can be done without await.
roomChannelSingleton.rebuildData().then( roomChannelSingleton.rebuildData().then(
() => { logger.info('RoomChannel singleton rebuild done') }, () => { logger.info('RoomChannel singleton rebuild done') },