You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
kemoverse/bot/bot_app.py

78 lines
2.6 KiB

import time
import misskey
from parsing import parse_notification
from db_utils import get_or_create_user, add_pull, get_config, set_config
from client import client_connection
# Initialize the Misskey client
client = client_connection()
# Define your whitelist
# TODO: move to config
whitelisted_instances: list[str] = []
def stream_notifications():
print("Starting filtered notification stream...")
last_seen_id = get_config("last_seen_notif_id")
while True:
try:
# May be able to mark notifications as read using misskey.py and
# filter them out here. This function also takes a since_id we
# could use as well
notifications = client.i_notifications()
if notifications:
# Oldest to newest
notifications.reverse()
new_last_seen_id = last_seen_id
for notification in notifications:
notif_id = notification.get("id")
# Skip old or same ID notifications
if last_seen_id is not None and notif_id <= last_seen_id:
continue
user = notification.get("user", {})
username = user.get("username", "unknown")
host = user.get("host") # None if local user
instance = host if host else "local"
if instance in whitelisted_instances or instance == "local":
note = notification.get("note", {}).get("text", "")
notif_type = notification.get("type", "unknown")
print(f"📨 [{notif_type}] from @{username}@{instance}")
print(f"💬 {note}")
print("-" * 30)
# 🧠 Send to the parser
parse_notification(notification,client)
else:
print(f" Blocked notification from untrusted instance: {host}")
# Update only if this notif_id is greater
if new_last_seen_id is None or notif_id > new_last_seen_id:
new_last_seen_id = notif_id
# Save the latest seen ID
if new_last_seen_id and new_last_seen_id != last_seen_id:
set_config("last_seen_notif_id", new_last_seen_id)
last_seen_id = new_last_seen_id
time.sleep(5)
except Exception as e:
print(f"Error: {e}")
time.sleep(5)
if __name__ == "__main__":
stream_notifications()