import traceback from typing import Dict, Any import misskey from misskey.exceptions import MisskeyAPIException from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST from parsing import parse_notification from db_utils import get_config, set_config, is_whitelisted from response import generate_response from custom_types import BotResponse # Define your whitelist # TODO: move to config WHITELISTED_INSTANCES: list[str] = [] def process_notification( client: misskey.Misskey, notification: Dict[str, Any]) -> None: '''Processes an individual notification''' user = notification.get('user', {}) username = user.get('username', 'unknown') host = user.get('host') # None if local user instance = host if host else 'local' if USE_WHITELIST and not is_whitelisted(instance): print(f'⚠️ Blocked notification from untrusted instance: {instance}') return # Copy visibility of the post that was received when replying (so if people # don't want to dump a bunch of notes on home they don't have to) visibility = notification['note']['visibility'] if visibility != 'specified': visibility = 'home' notif_type = notification.get('type', 'unknown') notif_id = notification.get('id') print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}') # 🧠 Send to the parser parsed_notification = parse_notification(notification, client) if not parsed_notification: return # Get the note Id to reply to note_id = notification.get('note', {}).get('id') # Get the response response: BotResponse | None = generate_response(parsed_notification) if not response: return client.notes_create( text=response['message'], reply_id=note_id, visibility=visibility, file_ids=response['attachment_urls'] # TODO: write actual visible users ids so pleromers can use the bot # privately # visible_user_ids=[] ) def process_notifications(client: misskey.Misskey) -> bool: '''Processes a batch of unread notifications. Returns False if there are no more notifications to process.''' last_seen_id = get_config('last_seen_notif_id') # process_notification writes to last_seen_id, so make a copy new_last_seen_id = last_seen_id try: notifications = client.i_notifications( # Fetch notifications we haven't seen yet. This option is a bit # tempermental, sometimes it'll include since_id, sometimes it # won't. We need to keep track of what notifications we've # already processed. since_id=last_seen_id, # Let misskey handle the filtering include_types=['mention', 'reply'], # And handle the batch size while we're at it limit=NOTIFICATION_BATCH_SIZE ) # No notifications. Wait the poll period. if not notifications: return False # Iterate oldest to newest for notification in notifications: try: # Skip if we've processed already notif_id = notification.get('id', '') if notif_id <= last_seen_id: continue # Update new_last_seen_id and process new_last_seen_id = notif_id process_notification(client, notification) except Exception as e: print(f'An exception has occured while processing a \ notification: {e}') print(traceback.format_exc()) # If we got as many notifications as we requested, there are probably # more in the queue return len(notifications) == NOTIFICATION_BATCH_SIZE except MisskeyAPIException as e: print(f'An exception has occured while reading notifications: {e}\n') print(traceback.format_exc()) finally: # Quality jank right here, but finally lets us update the last_seen_id # even if we hit an exception or return early if new_last_seen_id > last_seen_id: set_config('last_seen_notif_id', new_last_seen_id) return False