more robust notification type safety, remove websocket, add fetch interval

This commit is contained in:
2025-07-05 01:03:07 +00:00
parent 856cc84208
commit d85acd2179
3 changed files with 73 additions and 97 deletions

View File

@ -3,15 +3,32 @@ import {
OllamaResponse, OllamaResponse,
NewStatusBody, NewStatusBody,
Notification, Notification,
WSEvent,
} from "../types.js"; } from "../types.js";
import striptags from "striptags"; import striptags from "striptags";
import { PrismaClient } from "../generated/prisma/client.js"; import { PrismaClient } from "../generated/prisma/client.js";
import { createWebsocket } from "./websocket.js";
import { WebSocket } from "ws";
const prisma = new PrismaClient(); const prisma = new PrismaClient();
const getNotifications = async () => {
try {
const request = await fetch(
`${process.env.PLEROMA_INSTANCE_URL}/api/v1/notifications?types[]=mention`,
{
method: "GET",
headers: {
Authorization: `Bearer ${process.env.INSTANCE_BEARER_TOKEN}`,
},
}
);
const notifications: Notification[] = await request.json();
return notifications;
} catch (error: any) {
throw new Error(error.message);
}
};
const storeUserData = async (notification: Notification): Promise<void> => { const storeUserData = async (notification: Notification): Promise<void> => {
try { try {
await prisma.user.upsert({ await prisma.user.upsert({
@ -76,7 +93,8 @@ const generateOllamaRequest = async (
try { try {
if ( if (
striptags(notification.status.content).includes("!prompt") && striptags(notification.status.content).includes("!prompt") &&
!notification.status.account.bot !notification.status.account.bot && // sanity check, sort of
notification.type === "mention"
) { ) {
if ( if (
process.env.ONLY_LOCAL_REPLIES === "true" && process.env.ONLY_LOCAL_REPLIES === "true" &&
@ -147,86 +165,63 @@ const postReplyToStatus = async (
if (!response.ok) { if (!response.ok) {
throw new Error(`New status request failed: ${response.statusText}`); throw new Error(`New status request failed: ${response.statusText}`);
} }
await deleteNotification(notification);
} catch (error: any) { } catch (error: any) {
throw new Error(error.message); throw new Error(error.message);
} }
}; };
let ws = createWebsocket(); const deleteNotification = async (notification: Notification) => {
let reconnectAttempts = 0;
const maxReconnectAttempts = 10;
const baseDelay = 5000;
const reconnect = (ws: WebSocket) => {
if (ws) {
ws.close();
}
return createWebsocket();
};
ws.on("close", (event: CloseEvent) => {
try { try {
if (reconnectAttempts < maxReconnectAttempts) { if (!notification.id) {
const delay = baseDelay * Math.pow(1.5, reconnectAttempts);
console.log(
`WebSocket closed.\nReason: ${
event.reason
}\nAttempting to reconnect in ${delay / 1000} seconds...`
);
setTimeout(() => {
console.log(
`Reconnection attempt ${
reconnectAttempts + 1
}/${maxReconnectAttempts}`
);
ws = reconnect(ws);
if (ws.readyState === WebSocket.OPEN) {
console.log(`Reconnection to ${process.env.PLEROMA_INSTANCE_DOMAIN} successful.`)
return;
}
reconnectAttempts++;
}, delay);
} else {
console.error(
`Failed to reconnect after ${maxReconnectAttempts} attempts. Giving up.`
);
}
} catch (error: any) {
console.error(`Reconnection error: ${error.message}`);
throw new Error(error.message);
}
});
ws.on("upgrade", () => {
console.log(
`Websocket connection to ${process.env.PLEROMA_INSTANCE_DOMAIN} successful.`
);
});
ws.on("open", () => {
reconnectAttempts = 0;
setInterval(() => {
ws.send(JSON.stringify({ type: "ping" }));
// console.log("Sending ping to keep session alive...");
}, 20000);
});
ws.on("message", async (data) => {
try {
const message: WSEvent = JSON.parse(data.toString("utf-8"));
if (message.event !== "notification") {
// only watch for notification events
return; return;
} }
console.log("Websocket message received."); const response = await fetch(
const payload = JSON.parse(message.payload) as Notification; `${process.env.PLEROMA_INSTANCE_URL}/api/v1/notifications/${notification.id}/dismiss`,
const ollamaResponse = await generateOllamaRequest(payload); {
if (ollamaResponse) { method: "POST",
await postReplyToStatus(payload, ollamaResponse); headers: {
Authorization: `Bearer ${process.env.INSTANCE_BEARER_TOKEN}`,
},
}
);
if (!response.ok) {
console.error(`Could not delete notification ID: ${notification.id}`);
} }
} catch (error: any) { } catch (error: any) {
throw new Error(error.message); throw new Error(error.message);
} }
}); };
const fetchInterval = process.env.FETCH_INTERVAL
? parseInt(process.env.FETCH_INTERVAL)
: 15000;
const beginFetchCycle = async () => {
let notifications = [];
setInterval(async () => {
notifications = await getNotifications();
if (notifications.length > 0) {
await Promise.all(
notifications.map(async (notification) => {
try {
const ollamaResponse = await generateOllamaRequest(notification);
if (ollamaResponse) {
postReplyToStatus(notification, ollamaResponse);
}
} catch (error: any) {
throw new Error(error.message);
}
})
);
}
}, fetchInterval); // lower intervals may cause the bot to respond multiple times to the same message, but we try to mitigate this with the deleteNotification function
};
console.log(
`Fetching notifications from ${process.env.PLEROMA_INSTANCE_DOMAIN}, every ${
fetchInterval / 1000
} seconds.`
);
await beginFetchCycle();

View File

@ -1,22 +0,0 @@
import { WebSocket } from "ws";
const scheme = process.env.PLEROMA_INSTANCE_URL?.startsWith("https")
? "wss"
: "ws"; // this is so nigger rigged
const host = process.env.PLEROMA_INSTANCE_DOMAIN;
export const createWebsocket = (): WebSocket => {
try {
const ws = new WebSocket( // only connects to Soapbox frontends right now, but could pretty easily connect to Pleroma frontends with some tweaking
`${scheme}://${host}/api/v1/streaming?stream=user`,
[process.env.SOAPBOX_WS_PROTOCOL as string],
{
followRedirects: true,
}
);
return ws;
} catch (error: any) {
throw new Error(error);
}
};

3
types.d.ts vendored
View File

@ -1,6 +1,9 @@
export interface Notification { export interface Notification {
account: Account; account: Account;
status: Status; status: Status;
id: string;
type: string;
created_at: string;
} }
export interface NewStatusBody { export interface NewStatusBody {