From d85acd217902a53c554f88022b6537e156eab536 Mon Sep 17 00:00:00 2001 From: matty Date: Sat, 5 Jul 2025 01:03:07 +0000 Subject: [PATCH] more robust notification type safety, remove websocket, add fetch interval --- src/main.ts | 145 +++++++++++++++++++++++------------------------ src/websocket.ts | 22 ------- types.d.ts | 3 + 3 files changed, 73 insertions(+), 97 deletions(-) delete mode 100644 src/websocket.ts diff --git a/src/main.ts b/src/main.ts index db03e3b..5b965fd 100644 --- a/src/main.ts +++ b/src/main.ts @@ -3,15 +3,32 @@ import { OllamaResponse, NewStatusBody, Notification, - WSEvent, } from "../types.js"; import striptags from "striptags"; import { PrismaClient } from "../generated/prisma/client.js"; -import { createWebsocket } from "./websocket.js"; -import { WebSocket } from "ws"; const prisma = new PrismaClient(); +const getNotifications = async () => { + try { + const request = await fetch( + `${process.env.PLEROMA_INSTANCE_URL}/api/v1/notifications?types[]=mention`, + { + method: "GET", + headers: { + Authorization: `Bearer ${process.env.INSTANCE_BEARER_TOKEN}`, + }, + } + ); + + const notifications: Notification[] = await request.json(); + + return notifications; + } catch (error: any) { + throw new Error(error.message); + } +}; + const storeUserData = async (notification: Notification): Promise => { try { await prisma.user.upsert({ @@ -76,7 +93,8 @@ const generateOllamaRequest = async ( try { if ( striptags(notification.status.content).includes("!prompt") && - !notification.status.account.bot + !notification.status.account.bot && // sanity check, sort of + notification.type === "mention" ) { if ( process.env.ONLY_LOCAL_REPLIES === "true" && @@ -147,86 +165,63 @@ const postReplyToStatus = async ( if (!response.ok) { throw new Error(`New status request failed: ${response.statusText}`); } + + await deleteNotification(notification); } catch (error: any) { throw new Error(error.message); } }; -let ws = createWebsocket(); -let reconnectAttempts = 0; -const maxReconnectAttempts = 10; -const baseDelay = 5000; - -const reconnect = (ws: WebSocket) => { - if (ws) { - ws.close(); - } - return createWebsocket(); -}; - -ws.on("close", (event: CloseEvent) => { +const deleteNotification = async (notification: Notification) => { try { - if (reconnectAttempts < maxReconnectAttempts) { - const delay = baseDelay * Math.pow(1.5, reconnectAttempts); - console.log( - `WebSocket closed.\nReason: ${ - event.reason - }\nAttempting to reconnect in ${delay / 1000} seconds...` - ); - - setTimeout(() => { - console.log( - `Reconnection attempt ${ - reconnectAttempts + 1 - }/${maxReconnectAttempts}` - ); - ws = reconnect(ws); - if (ws.readyState === WebSocket.OPEN) { - console.log(`Reconnection to ${process.env.PLEROMA_INSTANCE_DOMAIN} successful.`) - return; - } - reconnectAttempts++; - }, delay); - } else { - console.error( - `Failed to reconnect after ${maxReconnectAttempts} attempts. Giving up.` - ); - } - } catch (error: any) { - console.error(`Reconnection error: ${error.message}`); - throw new Error(error.message); - } -}); - -ws.on("upgrade", () => { - console.log( - `Websocket connection to ${process.env.PLEROMA_INSTANCE_DOMAIN} successful.` - ); -}); - -ws.on("open", () => { - reconnectAttempts = 0; - setInterval(() => { - ws.send(JSON.stringify({ type: "ping" })); - // console.log("Sending ping to keep session alive..."); - }, 20000); -}); - - -ws.on("message", async (data) => { - try { - const message: WSEvent = JSON.parse(data.toString("utf-8")); - if (message.event !== "notification") { - // only watch for notification events + if (!notification.id) { return; } - console.log("Websocket message received."); - const payload = JSON.parse(message.payload) as Notification; - const ollamaResponse = await generateOllamaRequest(payload); - if (ollamaResponse) { - await postReplyToStatus(payload, ollamaResponse); + const response = await fetch( + `${process.env.PLEROMA_INSTANCE_URL}/api/v1/notifications/${notification.id}/dismiss`, + { + method: "POST", + headers: { + Authorization: `Bearer ${process.env.INSTANCE_BEARER_TOKEN}`, + }, + } + ); + if (!response.ok) { + console.error(`Could not delete notification ID: ${notification.id}`); } } catch (error: any) { throw new Error(error.message); } -}); +}; + +const fetchInterval = process.env.FETCH_INTERVAL + ? parseInt(process.env.FETCH_INTERVAL) + : 15000; + +const beginFetchCycle = async () => { + let notifications = []; + setInterval(async () => { + notifications = await getNotifications(); + if (notifications.length > 0) { + await Promise.all( + notifications.map(async (notification) => { + try { + const ollamaResponse = await generateOllamaRequest(notification); + if (ollamaResponse) { + postReplyToStatus(notification, ollamaResponse); + } + } catch (error: any) { + throw new Error(error.message); + } + }) + ); + } + }, fetchInterval); // lower intervals may cause the bot to respond multiple times to the same message, but we try to mitigate this with the deleteNotification function +}; + +console.log( + `Fetching notifications from ${process.env.PLEROMA_INSTANCE_DOMAIN}, every ${ + fetchInterval / 1000 + } seconds.` +); +await beginFetchCycle(); diff --git a/src/websocket.ts b/src/websocket.ts deleted file mode 100644 index e3740d6..0000000 --- a/src/websocket.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { WebSocket } from "ws"; - -const scheme = process.env.PLEROMA_INSTANCE_URL?.startsWith("https") - ? "wss" - : "ws"; // this is so nigger rigged -const host = process.env.PLEROMA_INSTANCE_DOMAIN; - -export const createWebsocket = (): WebSocket => { - try { - const ws = new WebSocket( // only connects to Soapbox frontends right now, but could pretty easily connect to Pleroma frontends with some tweaking - `${scheme}://${host}/api/v1/streaming?stream=user`, - [process.env.SOAPBOX_WS_PROTOCOL as string], - { - followRedirects: true, - } - ); - - return ws; - } catch (error: any) { - throw new Error(error); - } -}; diff --git a/types.d.ts b/types.d.ts index 2a755e1..76d265f 100644 --- a/types.d.ts +++ b/types.d.ts @@ -1,6 +1,9 @@ export interface Notification { account: Account; status: Status; + id: string; + type: string; + created_at: string; } export interface NewStatusBody {