From 9c95437f4981c3d4a56090a474a5573281abe4a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?nicole=20miko=C5=82ajczyk?= Date: Thu, 12 Mar 2026 20:02:40 +0100 Subject: [PATCH] nicolium: fall back to polling when streaming is not available MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: nicole mikołajczyk --- .../hooks/streaming/use-timeline-stream.ts | 19 ++++++- .../src/queries/timelines/use-timeline.ts | 57 ++++++++++++++++++- .../src/queries/timelines/use-timelines.ts | 4 +- packages/pl-api/lib/client-base.ts | 1 + packages/pl-api/lib/client/streaming.ts | 17 ++++++ 5 files changed, 93 insertions(+), 5 deletions(-) diff --git a/packages/nicolium/src/api/hooks/streaming/use-timeline-stream.ts b/packages/nicolium/src/api/hooks/streaming/use-timeline-stream.ts index e925eb837..970668a91 100644 --- a/packages/nicolium/src/api/hooks/streaming/use-timeline-stream.ts +++ b/packages/nicolium/src/api/hooks/streaming/use-timeline-stream.ts @@ -1,4 +1,4 @@ -import { useEffect, useRef } from 'react'; +import { useCallback, useEffect, useRef, useState } from 'react'; import { useAppSelector } from '@/hooks/use-app-selector'; import { useClient } from '@/hooks/use-client'; @@ -23,25 +23,41 @@ const useTimelineStream = ( unlisten: (listener: (event: StreamingEvent) => void) => void; subscribe: (stream: string, params?: StreamingParams) => void; unsubscribe: (stream: string, params?: StreamingParams) => void; + onDisconnect: (callback: () => void) => () => void; close: () => void; } | null>(null); + const disconnectCleanup = useRef<(() => void) | null>(null); const accessToken = useAppSelector(getAccessToken); const streamingUrl = instance.configuration.urls.streaming; + const [connected, setConnected] = useState(false); + + const handleDisconnect = useCallback(() => { + socket.current = null; + setConnected(false); + }, []); + const connect = () => { if (!socket.current && streamingUrl) { socket.current = client.streaming.connect(); + disconnectCleanup.current?.(); + disconnectCleanup.current = socket.current.onDisconnect(handleDisconnect); + socket.current.subscribe(stream, params); if (listener) socket.current.listen(listener); + setConnected(true); } }; const disconnect = () => { if (socket.current) { + disconnectCleanup.current?.(); + disconnectCleanup.current = null; socket.current.close(); socket.current = null; + setConnected(false); } }; @@ -82,6 +98,7 @@ const useTimelineStream = ( return { disconnect, + connected, }; }; diff --git a/packages/nicolium/src/queries/timelines/use-timeline.ts b/packages/nicolium/src/queries/timelines/use-timeline.ts index e80dcf6a6..e4430d2fb 100644 --- a/packages/nicolium/src/queries/timelines/use-timeline.ts +++ b/packages/nicolium/src/queries/timelines/use-timeline.ts @@ -1,8 +1,9 @@ -import { useCallback, useEffect, useMemo } from 'react'; +import { useCallback, useEffect, useMemo, useRef } from 'react'; import { importEntities } from '@/actions/importer'; import { useTimelineStream } from '@/api/hooks/streaming/use-timeline-stream'; import { + useTimelinesStore, useTimeline as useStoreTimeline, useTimelinesActions, type TimelineEntry, @@ -17,16 +18,66 @@ interface StreamConfig { params?: StreamingParams; } +interface TimelineOptions { + polling?: boolean; + restoringMaxId?: string; +} + +const POLLING_INTERVAL = 20_000; + const useTimeline = ( timelineId: string, fetcher: TimelineFetcher, streamConfig?: StreamConfig, - restoringMaxId?: string, + options?: TimelineOptions, ) => { + const polling = options?.polling ?? true; + const restoringMaxId = options?.restoringMaxId; + const timeline = useStoreTimeline(timelineId); const timelineActions = useTimelinesActions(); - useTimelineStream(streamConfig?.stream ?? '', streamConfig?.params, !!streamConfig?.stream); + const { connected: streamingConnected } = useTimelineStream( + streamConfig?.stream ?? '', + streamConfig?.params, + !!streamConfig?.stream, + ); + + const fetcherRef = useRef(fetcher); + fetcherRef.current = fetcher; + + const newestStatusId = useRef(undefined); + + useEffect(() => { + if (timeline.entries[0]?.type === 'status') { + newestStatusId.current = timeline.entries[0].originalId; + } + }, [timeline.entries]); + + // polling fallback when streaming is not connected + useEffect(() => { + if (!polling || streamingConnected || timeline.isPending || !newestStatusId) return; + + const poll = async () => { + const sinceId = + useTimelinesStore.getState().timelines[timelineId]?.queuedEntries[0]?.id ?? + newestStatusId.current; + if (!sinceId) return; + + try { + const response = await fetcherRef.current({ since_id: sinceId }); + if (response.items.length === 0) return; + + importEntities({ statuses: response.items }); + for (const status of response.items) { + timelineActions.receiveStreamingStatus(timelineId, status); + } + } catch {} + }; + + const interval = setInterval(poll, POLLING_INTERVAL); + return () => clearInterval(interval); + }, [timelineId, polling, streamingConnected, timeline.isPending]); useEffect(() => { if (!timeline.isPending || timeline.isFetching) return; diff --git a/packages/nicolium/src/queries/timelines/use-timelines.ts b/packages/nicolium/src/queries/timelines/use-timelines.ts index 5ce76e61c..f2c68f0cf 100644 --- a/packages/nicolium/src/queries/timelines/use-timelines.ts +++ b/packages/nicolium/src/queries/timelines/use-timelines.ts @@ -44,7 +44,7 @@ const useHomeTimeline = ( }); }, { stream }, - maxId, + { restoringMaxId: maxId }, ); }; @@ -176,6 +176,8 @@ const useAccountTimeline = ( ...params, ...paginationParams, }), + undefined, + { polling: false }, ); }; diff --git a/packages/pl-api/lib/client-base.ts b/packages/pl-api/lib/client-base.ts index 1b65bf90d..9bcc579af 100644 --- a/packages/pl-api/lib/client-base.ts +++ b/packages/pl-api/lib/client-base.ts @@ -37,6 +37,7 @@ class PlApiBaseClient { unlisten: (listener: (event: StreamingEvent) => void) => void; subscribe: (stream: string, params?: StreamingParams) => void; unsubscribe: (stream: string, params?: StreamingParams) => void; + onDisconnect: (callback: () => void) => () => void; close: () => void; }; /** @internal */ diff --git a/packages/pl-api/lib/client/streaming.ts b/packages/pl-api/lib/client/streaming.ts index 1b9610c0d..153a7822a 100644 --- a/packages/pl-api/lib/client/streaming.ts +++ b/packages/pl-api/lib/client/streaming.ts @@ -49,10 +49,21 @@ const streaming = (client: PlApiBaseClient) => ({ ); }; + let disconnectCallbacks: Array<() => void> = []; + ws.onopen = () => { queue.forEach((fn) => fn()); }; + ws.onerror = () => { + disconnectCallbacks.forEach((fn) => fn()); + }; + + ws.onclose = () => { + client.socket = undefined; + disconnectCallbacks.forEach((fn) => fn()); + }; + client.socket = { listen: (listener, stream) => listeners.push({ listener, stream }), unlisten: (listener) => @@ -61,6 +72,12 @@ const streaming = (client: PlApiBaseClient) => ({ enqueue(() => ws.send(JSON.stringify({ type: 'subscribe', stream, list, tag }))), unsubscribe: (stream, { list, tag } = {}) => enqueue(() => ws.send(JSON.stringify({ type: 'unsubscribe', stream, list, tag }))), + onDisconnect: (callback) => { + disconnectCallbacks.push(callback); + return () => { + disconnectCallbacks = disconnectCallbacks.filter((fn) => fn !== callback); + }; + }, close: () => { ws.close(); client.socket = undefined;