"""Poll Misskey notifications and forward trusted ones to the parser."""

import time
import traceback

import misskey

from parsing import parse_notification
from db_utils import get_or_create_user, add_pull, get_config, set_config
from client import client_connection

# Initialize the Misskey client
client = client_connection()

# Define your whitelist
# TODO: move to config
whitelisted_instances: list[str] = []

# Seconds to wait between two polls of the notification endpoint.
POLL_INTERVAL_SECONDS = 5

# Config key under which the ID of the newest handled notification is stored.
LAST_SEEN_CONFIG_KEY = "last_seen_notif_id"


def _handle_notification(notification) -> None:
    """Print one notification and pass it to the parser if its origin is trusted.

    A notification is trusted when its user has no ``host`` (treated as a
    local user) or when the host is listed in ``whitelisted_instances``.
    Untrusted notifications are only reported, never parsed.

    Args:
        notification: A single notification mapping as returned by the client.

    Raises:
        Exception: Whatever ``parse_notification`` raises is propagated.
    """
    # ``or {}`` also covers a key that is present with an explicit null value,
    # which ``.get(key, {})`` alone would not.
    user = notification.get("user") or {}
    username = user.get("username", "unknown")
    host = user.get("host")  # None if local user
    instance = host if host else "local"

    if instance != "local" and instance not in whitelisted_instances:
        print(f"⚠️ Blocked notification from untrusted instance: {host}")
        return

    note = (notification.get("note") or {}).get("text", "")
    notif_type = notification.get("type", "unknown")

    print(f"📨 [{notif_type}] from @{username}@{instance}")
    print(f"💬 {note}")
    print("-" * 30)

    # Send to the parser
    parse_notification(notification, client)


def stream_notifications() -> None:
    """Poll for notifications forever and process each new one exactly once.

    The ID of the newest handled notification is read from config at start-up
    and written back after every handled notification, so an exception in the
    middle of a batch does not cause the earlier notifications of that batch
    to be processed again on the next poll. The notification that raised is
    not marked as seen and is therefore retried on the next poll.

    Any exception raised while fetching or processing is printed with its
    traceback and the loop continues after the usual poll interval.
    """
    print("Starting filtered notification stream...")

    last_seen_id = get_config(LAST_SEEN_CONFIG_KEY)

    while True:
        try:
            # May be able to mark notifications as read using misskey.py and
            # filter them out here. This function also takes a since_id we
            # could use as well
            notifications = client.i_notifications() or []

            # Everything at or below this ID was handled in an earlier poll.
            batch_floor = last_seen_id

            # Oldest to newest (the list is reversed without mutating it).
            for notification in reversed(notifications):
                notif_id = notification.get("id")

                # Without an ID the notification cannot be ordered or tracked,
                # and comparing None against a stored ID would raise.
                if not notif_id:
                    print("⚠️ Skipping notification without an id")
                    continue

                # Skip old or same ID notifications.
                # NOTE(review): assumes IDs compare in chronological order
                # - confirm for the ID scheme of the target instance.
                if batch_floor is not None and notif_id <= batch_floor:
                    continue

                _handle_notification(notification)

                # Persist progress right away, and only ever move forward.
                if last_seen_id is None or notif_id > last_seen_id:
                    last_seen_id = notif_id
                    set_config(LAST_SEEN_CONFIG_KEY, last_seen_id)

        except Exception as e:
            print(f"An exception has occured: {e}\n{traceback.format_exc()}")

        # Single sleep for both the success and the failure path.
        time.sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    stream_notifications()