"""Poll Misskey notifications and answer mentions/replies from trusted instances."""

import time
import traceback

import misskey

from parsing import parse_notification
from db_utils import get_or_create_user, add_pull, get_config, set_config
from client import client_connection
from response import generate_response

# Initialize the Misskey client
client = client_connection()

# Instances (besides the local one) whose notifications are processed.
# TODO: move to config
whitelisted_instances: list[str] = []

# Notification types that are forwarded to the command parser.
_HANDLED_TYPES = ("mention", "reply")


def _reply_visibility(note):
    """Return the visibility to use when replying to *note*.

    A "specified" note is answered with "specified"; everything else
    (including a note without a visibility field) is answered on "home", so
    people who don't want to dump a bunch of notes on home don't have to.
    """
    return "specified" if note.get("visibility") == "specified" else "home"


def _send_reply(response, note_id, visibility):
    """Post *response* as a reply to the note identified by *note_id*.

    *response* is either a plain string, or an indexable pair whose first
    item is the text and whose second item is passed as ``file_ids``.
    A falsy response posts nothing.
    """
    if isinstance(response, str):
        client.notes_create(
            text=response,
            reply_id=note_id,
            visibility=visibility,
        )
    elif response:
        # TODO: pass visible_user_ids so pleromers can use the bot privately
        client.notes_create(
            text=response[0],
            reply_id=note_id,
            visibility=visibility,
            file_ids=response[1],
        )


def _handle_notification(notification):
    """Log one notification and, for mentions/replies, answer it.

    Notifications from hosts that are neither local nor whitelisted are
    reported and otherwise ignored.
    """
    # `or {}` also covers keys that are present but set to None.
    user = notification.get("user") or {}
    username = user.get("username", "unknown")
    host = user.get("host")  # None if local user
    instance = host if host else "local"

    if instance != "local" and instance not in whitelisted_instances:
        print(f"⚠️ Blocked notification from untrusted instance: {host}")
        return

    # Not every notification carries a note, so never index into it directly.
    note = notification.get("note") or {}
    note_text = note.get("text", "")
    notif_type = notification.get("type", "unknown")

    print(f"📨 [{notif_type}] from @{username}@{instance}")
    print(f"💬 {note_text}")
    print("-" * 30)

    # Only these notification types are parsed as commands.
    if notif_type not in _HANDLED_TYPES:
        return

    parsed_command = parse_notification(notification, client)
    response = generate_response(parsed_command)
    _send_reply(response, note.get("id"), _reply_visibility(note))


def stream_notifications():
    """Poll notifications forever and dispatch every unseen one.

    The ID of the newest handled notification is stored under the
    ``last_seen_notif_id`` config key. Progress is saved even when a
    notification in the batch raises, so notifications that were already
    answered are not answered again on the next poll; the failing
    notification itself is retried.
    """
    print("Starting filtered notification stream...")
    last_seen_id = get_config("last_seen_notif_id")

    while True:
        try:
            # May be able to mark notifications as read using misskey.py and
            # filter them out here. This function also takes a since_id we
            # could use as well
            notifications = client.i_notifications()

            if notifications:
                # Oldest to newest
                notifications.reverse()
                new_last_seen_id = last_seen_id

                try:
                    for notification in notifications:
                        notif_id = notification.get("id")

                        # Without an ID the notification cannot be ordered
                        # or tracked, so it is skipped.
                        if notif_id is None:
                            continue

                        # Skip old or same ID notifications
                        if last_seen_id is not None and notif_id <= last_seen_id:
                            continue

                        _handle_notification(notification)

                        # Update only if this notif_id is greater
                        if new_last_seen_id is None or notif_id > new_last_seen_id:
                            new_last_seen_id = notif_id
                finally:
                    # Save the latest seen ID, also when a notification
                    # failed part-way through the batch.
                    if new_last_seen_id and new_last_seen_id != last_seen_id:
                        set_config("last_seen_notif_id", new_last_seen_id)
                        last_seen_id = new_last_seen_id

            time.sleep(5)

        except Exception as e:
            print(f"An exception has occured: {e}\n{traceback.format_exc()}")
            time.sleep(2)


if __name__ == "__main__":
    stream_notifications()