Files
ncd-fe/packages/pl-api/lib/client/streaming.ts
matty 0384f29f8b
Some checks failed
Nicolium CI / Test and upload artifacts (22.x) (push) Has been cancelled
Nicolium CI / release (push) Has been cancelled
Nicolium CI / deploy (push) Has been cancelled
pl-api CI / Test for pl-api formatting (22.x) (push) Has been cancelled
pl-hooks CI / Test for a successful build (22.x) (push) Has been cancelled
minor bug fixes and summat
2026-03-31 23:44:18 +00:00

195 lines
6.2 KiB
TypeScript

import { WebSocket } from 'isows';
import * as v from 'valibot';
import { streamingEventSchema } from '@/entities';
import { buildFullPath } from '@/utils/url';
import type { PlApiBaseClient } from '@/client-base';
import type { StreamingEvent } from '@/entities';
const MAX_RECONNECT_DELAY = 30_000;
const INITIAL_RECONNECT_DELAY = 1_000;
const streaming = (client: PlApiBaseClient) => ({
/**
* Check if the server is alive
* Verify that the streaming service is alive before connecting to it
* @see {@link https://docs.joinmastodon.org/methods/streaming/#health}
*/
health: async () => {
const response = await client.request('/api/v1/streaming/health');
return v.parse(v.literal('OK'), response.json);
},
/**
* Establishing a WebSocket connection
* Open a multiplexed WebSocket connection to receive events.
* @see {@link https://docs.joinmastodon.org/methods/streaming/#websocket}
*/
connect: () => {
if (client.socket) return client.socket;
let listeners: Array<{ listener: (event: StreamingEvent) => any; stream?: string }> = [];
let disconnectCallbacks: Array<() => void> = [];
let reconnectCallbacks: Array<() => void> = [];
let subscriptions: Array<{ stream: string; params?: { list?: string; tag?: string } }> = [];
let ws: WebSocket;
let reconnectDelay = INITIAL_RECONNECT_DELAY;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
let intentionallyClosed = false;
const buildPath = () =>
buildFullPath('/api/v1/streaming', client.instanceInformation?.configuration.urls.streaming, {
access_token: client.accessToken,
});
const createWebSocket = () => {
const path = buildPath();
ws = new WebSocket(path, client.accessToken);
const queue: Array<() => any> = [];
const enqueue = (fn: () => any) =>
ws.readyState === WebSocket.CONNECTING ? queue.push(fn) : fn();
ws.onopen = () => {
queue.forEach((fn) => fn());
queue.length = 0;
// Reset backoff on successful connection
reconnectDelay = INITIAL_RECONNECT_DELAY;
// Re-subscribe to all active streams
for (const sub of subscriptions) {
ws.send(JSON.stringify({ type: 'subscribe', stream: sub.stream, ...sub.params }));
}
// Notify reconnect listeners (not on first connect)
if (reconnectCallbacks.length > 0) {
reconnectCallbacks.forEach((fn) => fn());
}
};
ws.onmessage = (event) => {
const message = v.parse(streamingEventSchema, JSON.parse(event.data as string));
listeners.filter(
({ listener, stream }) =>
(!stream || message.stream.includes(stream)) && listener(message),
);
};
ws.onerror = () => {
disconnectCallbacks.forEach((fn) => fn());
};
ws.onclose = () => {
if (intentionallyClosed) {
client.socket = undefined;
disconnectCallbacks.forEach((fn) => fn());
return;
}
disconnectCallbacks.forEach((fn) => fn());
scheduleReconnect();
};
// Expose enqueue for subscribe/unsubscribe
return enqueue;
};
const scheduleReconnect = () => {
if (intentionallyClosed) return;
if (reconnectTimer) return;
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
if (intentionallyClosed) return;
enqueueRef = createWebSocket();
}, reconnectDelay);
// Exponential backoff with jitter
reconnectDelay = Math.min(reconnectDelay * 2, MAX_RECONNECT_DELAY);
};
// Reconnect immediately when tab becomes visible
const handleVisibilityChange = () => {
if (document.visibilityState === 'visible' && !intentionallyClosed) {
if (ws.readyState === WebSocket.CLOSED || ws.readyState === WebSocket.CLOSING) {
// Clear any pending reconnect and connect immediately
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
reconnectDelay = INITIAL_RECONNECT_DELAY;
enqueueRef = createWebSocket();
}
}
};
if (typeof document !== 'undefined') {
document.addEventListener('visibilitychange', handleVisibilityChange);
}
let enqueueRef = createWebSocket();
client.socket = {
listen: (listener, stream) => listeners.push({ listener, stream }),
unlisten: (listener) =>
(listeners = listeners.filter((value) => value.listener !== listener)),
subscribe: (stream, params = {}) => {
// Track subscription for re-subscribe on reconnect
const existing = subscriptions.find(
(s) =>
s.stream === stream && s.params?.list === params.list && s.params?.tag === params.tag,
);
if (!existing) {
subscriptions.push({ stream, params: { list: params.list, tag: params.tag } });
}
enqueueRef(() => ws.send(JSON.stringify({ type: 'subscribe', stream, ...params })));
},
unsubscribe: (stream, params = {}) => {
subscriptions = subscriptions.filter(
(s) =>
!(
s.stream === stream &&
s.params?.list === params.list &&
s.params?.tag === params.tag
),
);
enqueueRef(() => ws.send(JSON.stringify({ type: 'unsubscribe', stream, ...params })));
},
onDisconnect: (callback) => {
disconnectCallbacks.push(callback);
return () => {
disconnectCallbacks = disconnectCallbacks.filter((fn) => fn !== callback);
};
},
onReconnect: (callback) => {
reconnectCallbacks.push(callback);
return () => {
reconnectCallbacks = reconnectCallbacks.filter((fn) => fn !== callback);
};
},
close: () => {
intentionallyClosed = true;
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
if (typeof document !== 'undefined') {
document.removeEventListener('visibilitychange', handleVisibilityChange);
}
ws.close();
client.socket = undefined;
},
};
return client.socket;
},
});
export { streaming };