import { WebSocket } from 'isows';
import * as v from 'valibot';

import { streamingEventSchema } from '@/entities';
import { buildFullPath } from '@/utils/url';

import type { PlApiBaseClient } from '@/client-base';
import type { StreamingEvent } from '@/entities';

const MAX_RECONNECT_DELAY = 30_000;
const INITIAL_RECONNECT_DELAY = 1_000;

const streaming = (client: PlApiBaseClient) => ({
  /**
   * Check if the server is alive
   * Verify that the streaming service is alive before connecting to it
   * @see {@link https://docs.joinmastodon.org/methods/streaming/#health}
   */
  health: async () => {
    const response = await client.request('/api/v1/streaming/health');

    return v.parse(v.literal('OK'), response.json);
  },

  /**
   * Establishing a WebSocket connection
   * Open a multiplexed WebSocket connection to receive events.
   * @see {@link https://docs.joinmastodon.org/methods/streaming/#websocket}
   */
  connect: () => {
    if (client.socket) return client.socket;

    let listeners: Array<{ listener: (event: StreamingEvent) => any; stream?: string }> = [];
    let disconnectCallbacks: Array<() => void> = [];
    let reconnectCallbacks: Array<() => void> = [];
    let subscriptions: Array<{ stream: string; params?: { list?: string; tag?: string } }> = [];

    let ws: WebSocket;
    let reconnectDelay = INITIAL_RECONNECT_DELAY;
    let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
    let intentionallyClosed = false;

    const buildPath = () =>
      buildFullPath('/api/v1/streaming', client.instanceInformation?.configuration.urls.streaming, {
        access_token: client.accessToken,
      });

    const createWebSocket = () => {
      const path = buildPath();

      ws = new WebSocket(path, client.accessToken);

      // Calls made while the socket is still connecting are queued and flushed in onopen
      const queue: Array<() => any> = [];
      const enqueue = (fn: () => any) =>
        ws.readyState === WebSocket.CONNECTING ?
          queue.push(fn) : fn();

      ws.onopen = () => {
        queue.forEach((fn) => fn());
        queue.length = 0;

        // Reset backoff on successful connection
        reconnectDelay = INITIAL_RECONNECT_DELAY;

        // Re-subscribe to all active streams
        for (const sub of subscriptions) {
          ws.send(JSON.stringify({ type: 'subscribe', stream: sub.stream, ...sub.params }));
        }

        // Notify reconnect listeners (not on first connect)
        if (hasConnectedBefore) {
          reconnectCallbacks.forEach((fn) => fn());
        }
        hasConnectedBefore = true;
      };

      ws.onmessage = (event) => {
        const message = v.parse(streamingEventSchema, JSON.parse(event.data as string));

        // Deliver to listeners with no stream filter or with a matching stream
        listeners.forEach(({ listener, stream }) => {
          if (!stream || message.stream.includes(stream)) listener(message);
        });
      };

      ws.onerror = () => {
        disconnectCallbacks.forEach((fn) => fn());
      };

      ws.onclose = () => {
        if (intentionallyClosed) {
          client.socket = undefined;
          disconnectCallbacks.forEach((fn) => fn());
          return;
        }

        disconnectCallbacks.forEach((fn) => fn());
        scheduleReconnect();
      };

      // Expose enqueue for subscribe/unsubscribe
      return enqueue;
    };

    // Set once the first connection has opened, so onReconnect callbacks only fire on later opens
    let hasConnectedBefore = false;

    const scheduleReconnect = () => {
      if (intentionallyClosed) return;
      if (reconnectTimer) return;

      reconnectTimer = setTimeout(() => {
        reconnectTimer = null;
        if (intentionallyClosed) return;
        enqueueRef = createWebSocket();
      }, reconnectDelay);

      // Exponential backoff: double the delay for the next attempt, capped at MAX_RECONNECT_DELAY
      reconnectDelay = Math.min(reconnectDelay * 2, MAX_RECONNECT_DELAY);
    };

    // Reconnect immediately when tab becomes visible
    const handleVisibilityChange = () => {
      if (document.visibilityState === 'visible' && !intentionallyClosed) {
        if (ws.readyState === WebSocket.CLOSED || ws.readyState === WebSocket.CLOSING) {
          // Clear any pending reconnect and connect immediately
          if (reconnectTimer) {
            clearTimeout(reconnectTimer);
            reconnectTimer = null;
          }
          reconnectDelay = INITIAL_RECONNECT_DELAY;
          enqueueRef = createWebSocket();
        }
      }
    };

    if (typeof document !== 'undefined') {
      document.addEventListener('visibilitychange', handleVisibilityChange);
    }

    let enqueueRef = createWebSocket();

    client.socket = {
      listen:
        (listener, stream) => listeners.push({ listener, stream }),
      unlisten: (listener) =>
        (listeners = listeners.filter((value) => value.listener !== listener)),
      subscribe: (stream, params = {}) => {
        // Track subscription for re-subscribe on reconnect
        const existing = subscriptions.find(
          (s) =>
            s.stream === stream && s.params?.list === params.list && s.params?.tag === params.tag,
        );
        if (!existing) {
          subscriptions.push({ stream, params: { list: params.list, tag: params.tag } });
        }

        enqueueRef(() => ws.send(JSON.stringify({ type: 'subscribe', stream, ...params })));
      },
      unsubscribe: (stream, params = {}) => {
        subscriptions = subscriptions.filter(
          (s) =>
            !(
              s.stream === stream &&
              s.params?.list === params.list &&
              s.params?.tag === params.tag
            ),
        );

        enqueueRef(() => ws.send(JSON.stringify({ type: 'unsubscribe', stream, ...params })));
      },
      onDisconnect: (callback) => {
        disconnectCallbacks.push(callback);
        return () => {
          disconnectCallbacks = disconnectCallbacks.filter((fn) => fn !== callback);
        };
      },
      onReconnect: (callback) => {
        reconnectCallbacks.push(callback);
        return () => {
          reconnectCallbacks = reconnectCallbacks.filter((fn) => fn !== callback);
        };
      },
      close: () => {
        intentionallyClosed = true;
        if (reconnectTimer) {
          clearTimeout(reconnectTimer);
          reconnectTimer = null;
        }
        if (typeof document !== 'undefined') {
          document.removeEventListener('visibilitychange', handleVisibilityChange);
        }
        ws.close();
        client.socket = undefined;
      },
    };

    return client.socket;
  },
});

export { streaming };
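
/*
Sketch only, not part of this module. scheduleReconnect doubles reconnectDelay up to
MAX_RECONNECT_DELAY but applies no randomization, so many clients that drop at the same
moment would retry in lockstep. One possible way to add jitter is shown below, assuming a
hypothetical helper named backoffWithJitter. The helper, its attempt counter, and the
injectable random parameter are illustrative assumptions; the constants are redeclared so
the snippet is self-contained. It uses the "equal jitter" variant: half of the capped
delay is fixed and the other half is random.

```typescript
// Hypothetical helper (an assumption for illustration, not an API of this module).
const MAX_RECONNECT_DELAY = 30_000;
const INITIAL_RECONNECT_DELAY = 1_000;

const backoffWithJitter = (
  attempt: number,
  random: () => number = Math.random,
): number => {
  // Exponential ceiling for this attempt (0-based), capped at the maximum delay.
  const ceiling = Math.min(INITIAL_RECONNECT_DELAY * 2 ** attempt, MAX_RECONNECT_DELAY);
  const half = ceiling / 2;

  // Equal jitter: wait at least half the ceiling, plus a random share of the other half.
  return Math.floor(half + random() * half);
};

// Example: print the delays for the first five attempts.
for (let attempt = 0; attempt < 5; attempt++) {
  console.log(`attempt ${attempt}: ${backoffWithJitter(attempt)} ms`);
}
```

Keeping a fixed half avoids near-zero delays, which "full jitter" would allow. Whether
jitter is worth adding here depends on how many clients share one streaming server.
*/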