You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
kemoverse/bot/bot_app.py

114 lines
4.4 KiB

import time
import traceback
import misskey
from parsing import parse_notification
from db_utils import get_or_create_user, add_pull, get_config, set_config
from client import client_connection
from response import generate_response
# Initialize the Misskey client once, at import time; the notification loop
# in this module uses this single shared instance for every API call.
client = client_connection()
# Define your whitelist: remote instance hosts whose notifications the bot
# will act on. Local users (no host) are always accepted, so an empty list
# means "local users only".
# TODO: move to config
whitelisted_instances: list[str] = []
def _reply_visibility(note):
    """Return the visibility to use when replying to ``note``.

    A note sent with "specified" visibility is answered the same way; every
    other visibility is answered on "home", so the reply visibility follows
    what the user sent instead of always posting to the home timeline.
    """
    if note.get("visibility") == "specified":
        return "specified"
    return "home"


def _handle_notification(notification):
    """Log one notification and answer it if it is a mention or a reply.

    Notifications coming from a remote instance that is not listed in
    ``whitelisted_instances`` are reported and ignored. Exceptions raised by
    the parser, the response generator or the Misskey client propagate to the
    caller.
    """
    # ``or {}`` also covers a key that is present but set to None.
    user = notification.get("user") or {}
    username = user.get("username", "unknown")
    host = user.get("host")  # None if local user
    instance = host if host else "local"

    if instance != "local" and instance not in whitelisted_instances:
        print(f" Blocked notification from untrusted instance: {host}")
        return

    # Not every notification carries a note, so never index into it blindly:
    # a KeyError here used to abort the whole batch.
    note = notification.get("note") or {}
    note_text = note.get("text", "")
    notif_type = notification.get("type", "unknown")

    print(f"📨 [{notif_type}] from @{username}@{instance}")
    print(f"💬 {note_text}")
    print("-" * 30)

    # Only mentions and replies are commands for the bot, and without a note
    # there is nothing to parse or reply to.
    if notif_type not in ("mention", "reply") or not note:
        return

    # 🧠 Send to the parser, then build the response
    parsed_command = parse_notification(notification, client)
    response = generate_response(parsed_command)

    reply_args = {
        "reply_id": note.get("id"),
        "visibility": _reply_visibility(note),
    }
    if isinstance(response, str):
        # Plain text response.
        client.notes_create(text=response, **reply_args)
    elif response:
        # Otherwise the response is indexed as (text, file_ids).
        # todo: write actual visible users ids (visible_user_ids) so
        # pleromers can use the bot privately
        client.notes_create(
            text=response[0],
            file_ids=response[1],
            **reply_args
        )


def stream_notifications():
    """Poll Misskey for notifications forever and handle each new one.

    The ID of the newest handled notification is stored under the
    ``last_seen_notif_id`` config key, so notifications are not handled twice
    across polls or restarts. Progress is saved even when handling fails part
    way through a batch: the notifications already answered are not answered
    again, while the failing one is retried on the next poll.

    This function never returns. Any exception is printed with its traceback
    and polling resumes after a short pause.
    """
    print("Starting filtered notification stream...")
    last_seen_id = get_config("last_seen_notif_id")

    while True:
        try:
            # May be able to mark notifications as read using misskey.py and
            # filter them out here. This function also takes a since_id we
            # could use as well
            notifications = client.i_notifications()

            if notifications:
                # Oldest to newest (assumes the API returns newest first)
                notifications.reverse()
                new_last_seen_id = last_seen_id

                try:
                    for notification in notifications:
                        notif_id = notification.get("id")

                        # Skip old or same ID notifications
                        if last_seen_id is not None and notif_id <= last_seen_id:
                            continue

                        _handle_notification(notification)

                        # Advance only after the notification was handled,
                        # and only if this notif_id is greater
                        if new_last_seen_id is None or notif_id > new_last_seen_id:
                            new_last_seen_id = notif_id
                finally:
                    # Save the latest seen ID, even if a later notification
                    # in this batch raised, to avoid duplicate replies.
                    if new_last_seen_id and new_last_seen_id != last_seen_id:
                        set_config("last_seen_notif_id", new_last_seen_id)
                        last_seen_id = new_last_seen_id

            time.sleep(5)

        except Exception as e:
            # Top-level boundary: report the error and keep the bot alive.
            print(f"An exception has occured: {e}\n{traceback.format_exc()}")
            time.sleep(2)
# Start the polling loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    stream_notifications()