nicolium: fall back to polling when streaming is not available
Signed-off-by: nicole mikołajczyk <git@mkljczk.pl>
This commit is contained in:
@ -1,4 +1,4 @@
|
||||
import { useEffect, useRef } from 'react';
|
||||
import { useCallback, useEffect, useRef, useState } from 'react';
|
||||
|
||||
import { useAppSelector } from '@/hooks/use-app-selector';
|
||||
import { useClient } from '@/hooks/use-client';
|
||||
@ -23,25 +23,41 @@ const useTimelineStream = (
|
||||
unlisten: (listener: (event: StreamingEvent) => void) => void;
|
||||
subscribe: (stream: string, params?: StreamingParams) => void;
|
||||
unsubscribe: (stream: string, params?: StreamingParams) => void;
|
||||
onDisconnect: (callback: () => void) => () => void;
|
||||
close: () => void;
|
||||
} | null>(null);
|
||||
const disconnectCleanup = useRef<(() => void) | null>(null);
|
||||
|
||||
const accessToken = useAppSelector(getAccessToken);
|
||||
const streamingUrl = instance.configuration.urls.streaming;
|
||||
|
||||
const [connected, setConnected] = useState(false);
|
||||
|
||||
const handleDisconnect = useCallback(() => {
|
||||
socket.current = null;
|
||||
setConnected(false);
|
||||
}, []);
|
||||
|
||||
const connect = () => {
|
||||
if (!socket.current && streamingUrl) {
|
||||
socket.current = client.streaming.connect();
|
||||
|
||||
disconnectCleanup.current?.();
|
||||
disconnectCleanup.current = socket.current.onDisconnect(handleDisconnect);
|
||||
|
||||
socket.current.subscribe(stream, params);
|
||||
if (listener) socket.current.listen(listener);
|
||||
setConnected(true);
|
||||
}
|
||||
};
|
||||
|
||||
const disconnect = () => {
|
||||
if (socket.current) {
|
||||
disconnectCleanup.current?.();
|
||||
disconnectCleanup.current = null;
|
||||
socket.current.close();
|
||||
socket.current = null;
|
||||
setConnected(false);
|
||||
}
|
||||
};
|
||||
|
||||
@ -82,6 +98,7 @@ const useTimelineStream = (
|
||||
|
||||
return {
|
||||
disconnect,
|
||||
connected,
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
@ -1,8 +1,9 @@
|
||||
import { useCallback, useEffect, useMemo } from 'react';
|
||||
import { useCallback, useEffect, useMemo, useRef } from 'react';
|
||||
|
||||
import { importEntities } from '@/actions/importer';
|
||||
import { useTimelineStream } from '@/api/hooks/streaming/use-timeline-stream';
|
||||
import {
|
||||
useTimelinesStore,
|
||||
useTimeline as useStoreTimeline,
|
||||
useTimelinesActions,
|
||||
type TimelineEntry,
|
||||
@ -17,16 +18,66 @@ interface StreamConfig {
|
||||
params?: StreamingParams;
|
||||
}
|
||||
|
||||
interface TimelineOptions {
|
||||
polling?: boolean;
|
||||
restoringMaxId?: string;
|
||||
}
|
||||
|
||||
const POLLING_INTERVAL = 20_000;
|
||||
|
||||
const useTimeline = (
|
||||
timelineId: string,
|
||||
fetcher: TimelineFetcher,
|
||||
streamConfig?: StreamConfig,
|
||||
restoringMaxId?: string,
|
||||
options?: TimelineOptions,
|
||||
) => {
|
||||
const polling = options?.polling ?? true;
|
||||
const restoringMaxId = options?.restoringMaxId;
|
||||
|
||||
const timeline = useStoreTimeline(timelineId);
|
||||
const timelineActions = useTimelinesActions();
|
||||
|
||||
useTimelineStream(streamConfig?.stream ?? '', streamConfig?.params, !!streamConfig?.stream);
|
||||
const { connected: streamingConnected } = useTimelineStream(
|
||||
streamConfig?.stream ?? '',
|
||||
streamConfig?.params,
|
||||
!!streamConfig?.stream,
|
||||
);
|
||||
|
||||
const fetcherRef = useRef(fetcher);
|
||||
fetcherRef.current = fetcher;
|
||||
|
||||
const newestStatusId = useRef<string | undefined>(undefined);
|
||||
|
||||
useEffect(() => {
|
||||
if (timeline.entries[0]?.type === 'status') {
|
||||
newestStatusId.current = timeline.entries[0].originalId;
|
||||
}
|
||||
}, [timeline.entries]);
|
||||
|
||||
// polling fallback when streaming is not connected
|
||||
useEffect(() => {
|
||||
if (!polling || streamingConnected || timeline.isPending || !newestStatusId) return;
|
||||
|
||||
const poll = async () => {
|
||||
const sinceId =
|
||||
useTimelinesStore.getState().timelines[timelineId]?.queuedEntries[0]?.id ??
|
||||
newestStatusId.current;
|
||||
if (!sinceId) return;
|
||||
|
||||
try {
|
||||
const response = await fetcherRef.current({ since_id: sinceId });
|
||||
if (response.items.length === 0) return;
|
||||
|
||||
importEntities({ statuses: response.items });
|
||||
for (const status of response.items) {
|
||||
timelineActions.receiveStreamingStatus(timelineId, status);
|
||||
}
|
||||
} catch {}
|
||||
};
|
||||
|
||||
const interval = setInterval(poll, POLLING_INTERVAL);
|
||||
return () => clearInterval(interval);
|
||||
}, [timelineId, polling, streamingConnected, timeline.isPending]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!timeline.isPending || timeline.isFetching) return;
|
||||
|
||||
@ -44,7 +44,7 @@ const useHomeTimeline = (
|
||||
});
|
||||
},
|
||||
{ stream },
|
||||
maxId,
|
||||
{ restoringMaxId: maxId },
|
||||
);
|
||||
};
|
||||
|
||||
@ -176,6 +176,8 @@ const useAccountTimeline = (
|
||||
...params,
|
||||
...paginationParams,
|
||||
}),
|
||||
undefined,
|
||||
{ polling: false },
|
||||
);
|
||||
};
|
||||
|
||||
|
||||
@ -37,6 +37,7 @@ class PlApiBaseClient {
|
||||
unlisten: (listener: (event: StreamingEvent) => void) => void;
|
||||
subscribe: (stream: string, params?: StreamingParams) => void;
|
||||
unsubscribe: (stream: string, params?: StreamingParams) => void;
|
||||
onDisconnect: (callback: () => void) => () => void;
|
||||
close: () => void;
|
||||
};
|
||||
/** @internal */
|
||||
|
||||
@ -49,10 +49,21 @@ const streaming = (client: PlApiBaseClient) => ({
|
||||
);
|
||||
};
|
||||
|
||||
let disconnectCallbacks: Array<() => void> = [];
|
||||
|
||||
ws.onopen = () => {
|
||||
queue.forEach((fn) => fn());
|
||||
};
|
||||
|
||||
ws.onerror = () => {
|
||||
disconnectCallbacks.forEach((fn) => fn());
|
||||
};
|
||||
|
||||
ws.onclose = () => {
|
||||
client.socket = undefined;
|
||||
disconnectCallbacks.forEach((fn) => fn());
|
||||
};
|
||||
|
||||
client.socket = {
|
||||
listen: (listener, stream) => listeners.push({ listener, stream }),
|
||||
unlisten: (listener) =>
|
||||
@ -61,6 +72,12 @@ const streaming = (client: PlApiBaseClient) => ({
|
||||
enqueue(() => ws.send(JSON.stringify({ type: 'subscribe', stream, list, tag }))),
|
||||
unsubscribe: (stream, { list, tag } = {}) =>
|
||||
enqueue(() => ws.send(JSON.stringify({ type: 'unsubscribe', stream, list, tag }))),
|
||||
onDisconnect: (callback) => {
|
||||
disconnectCallbacks.push(callback);
|
||||
return () => {
|
||||
disconnectCallbacks = disconnectCallbacks.filter((fn) => fn !== callback);
|
||||
};
|
||||
},
|
||||
close: () => {
|
||||
ws.close();
|
||||
client.socket = undefined;
|
||||
|
||||
Reference in New Issue
Block a user