import {
  OllamaRequest,
  OllamaResponse,
  NewStatusBody,
  Notification,
} from "../types.js";
import striptags from "striptags";
import { PrismaClient } from "../generated/prisma/client.js";

const prisma = new PrismaClient();

/**
 * Normalizes any thrown value into a plain `Error` carrying the original
 * message, so every helper in this file rejects with the same error shape.
 */
const toError = (error: unknown): Error =>
  new Error(error instanceof Error ? error.message : String(error));

/**
 * Fetches the "mention" notifications from the instance configured in
 * `PLEROMA_INSTANCE_URL`, authenticating with `INSTANCE_BEARER_TOKEN`.
 *
 * @returns The parsed notification list.
 * @throws Error if the request fails or the server answers with a non-2xx status.
 */
const getNotifications = async (): Promise<Notification[]> => {
  try {
    const request = await fetch(
      `${process.env.PLEROMA_INSTANCE_URL}/api/v1/notifications?types[]=mention`,
      {
        method: "GET",
        headers: {
          Authorization: `Bearer ${process.env.INSTANCE_BEARER_TOKEN}`,
        },
      }
    );

    // Without this check an error payload (an object, not an array) would be
    // returned as if it were the notification list.
    if (!request.ok) {
      throw new Error(`Notifications request failed: ${request.statusText}`);
    }

    const notifications: Notification[] = await request.json();
    return notifications;
  } catch (error: unknown) {
    throw toError(error);
  }
};

/**
 * Upserts the user who authored the notification's status, stamping
 * `lastRespondedTo` with the current time.
 *
 * @param notification - Notification whose status author should be recorded.
 * @throws Error if the database write fails.
 */
const storeUserData = async (notification: Notification): Promise<void> => {
  try {
    const now = new Date();
    await prisma.user.upsert({
      where: { userFqn: notification.status.account.fqn },
      update: { lastRespondedTo: now },
      create: {
        userFqn: notification.status.account.fqn,
        lastRespondedTo: now,
      },
    });
  } catch (error: unknown) {
    throw toError(error);
  }
};

/**
 * Checks whether a response has already been stored for the notification's
 * status id.
 *
 * @param notification - Notification to look up.
 * @returns `true` when a stored response with the same status id exists.
 * @throws Error if the database query fails.
 */
const alreadyRespondedTo = async (
  notification: Notification
): Promise<boolean> => {
  try {
    const duplicate = await prisma.response.findFirst({
      where: { pleromaNotificationId: notification.status.id },
    });
    return Boolean(duplicate);
  } catch (error: unknown) {
    throw toError(error);
  }
};

/**
 * Persists the prompt/response pair for a handled notification.
 *
 * @param notification - Notification that triggered the prompt.
 * @param ollamaResponseBody - Response body returned by the Ollama API.
 * @throws Error if the database write fails.
 */
const storePromptData = async (
  notification: Notification,
  ollamaResponseBody: OllamaResponse
): Promise<void> => {
  try {
    await prisma.response.create({
      data: {
        response: ollamaResponseBody.response,
        request: `${notification.status.account.fqn} asks: ${striptags(
          notification.status.content
        )}`,
        to: notification.account.fqn,
        pleromaNotificationId: notification.status.id,
      },
    });
  } catch (error: unknown) {
    throw toError(error);
  }
};

/**
 * Posts the generated text as a reply to the status that triggered it,
 * addressing the accounts mentioned in that status.
 *
 * @param notification - Notification whose status is being replied to.
 * @param ollamaResponseBody - Response body whose `response` becomes the reply text.
 * @throws Error if the request fails or the server answers with a non-2xx status.
 */
const postReplyToStatus = async (
  notification: Notification,
  ollamaResponseBody: OllamaResponse
): Promise<void> => {
  try {
    const mentions = notification.status.mentions?.map(
      (mention) => mention.acct
    );

    const statusBody: NewStatusBody = {
      content_type: "text/markdown",
      status: ollamaResponseBody.response,
      in_reply_to_id: notification.status.id,
      to: mentions,
    };

    const response = await fetch(
      `${process.env.PLEROMA_INSTANCE_URL}/api/v1/statuses`,
      {
        method: "POST",
        headers: {
          Authorization: `Bearer ${process.env.INSTANCE_BEARER_TOKEN}`,
          "Content-Type": "application/json",
        },
        body: JSON.stringify(statusBody),
      }
    );

    if (!response.ok) {
      throw new Error(`New status request failed: ${response.statusText}`);
    }
  } catch (error: unknown) {
    throw toError(error);
  }
};

/**
 * Handles a single mention: if the status text contains `!prompt` and its
 * author is not a bot, sends the stripped text to Ollama, stores the exchange
 * and posts the answer as a reply.
 *
 * Does nothing when the status lacks `!prompt`, the author is a bot,
 * `ONLY_LOCAL_REPLIES` is `"true"` and the author's fqn does not contain
 * `@${PLEROMA_INSTANCE_DOMAIN}`, or a response for this status id was
 * already stored.
 *
 * @param notification - Mention notification to process.
 * @throws Error if any database or HTTP step fails.
 */
const generateOllamaRequest = async (
  notification: Notification
): Promise<void> => {
  try {
    const promptText = striptags(notification.status.content);
    const author = notification.status.account;

    if (!promptText.includes("!prompt") || author.bot) {
      return;
    }

    if (
      process.env.ONLY_LOCAL_REPLIES === "true" &&
      !author.fqn.includes(`@${process.env.PLEROMA_INSTANCE_DOMAIN}`)
    ) {
      return;
    }

    // Skip statuses that already have a stored response.
    if (await alreadyRespondedTo(notification)) {
      return;
    }

    await storeUserData(notification);

    const ollamaRequestBody: OllamaRequest = {
      model: process.env.OLLAMA_MODEL as string,
      system: process.env.OLLAMA_SYSTEM_PROMPT as string,
      prompt: promptText,
      stream: false,
    };

    const response = await fetch(`${process.env.OLLAMA_URL}/api/generate`, {
      method: "POST",
      body: JSON.stringify(ollamaRequestBody),
    });

    // Fail before persisting/posting so an error payload is never stored or
    // published as if it were a generated answer.
    if (!response.ok) {
      throw new Error(`Ollama request failed: ${response.statusText}`);
    }

    const ollamaResponse: OllamaResponse = await response.json();

    await storePromptData(notification, ollamaResponse);

    console.log(
      `${author.fqn} asked:\n${notification.status.content}\nResponse:\n${ollamaResponse.response}`
    );

    // Awaited so a failed reply rejects this function instead of becoming an
    // unhandled floating promise that bypasses the catch below.
    await postReplyToStatus(notification, ollamaResponse);
  } catch (error: unknown) {
    throw toError(error);
  }
};

const notifications = await getNotifications();

if (notifications) {
  // The callback must return the promise; otherwise Promise.all receives an
  // array of `undefined` and resolves before any notification is handled.
  await Promise.all(
    notifications.map((notification) => generateOllamaRequest(notification))
  );
}