import traceback
from typing import Dict, Any

from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST
from parsing import parse_notification
from db_utils import get_config, set_config, is_whitelisted, is_player_banned
from response import generate_response
from custom_types import BotResponse
from fediverse_factory import get_fediverse_service
from fediverse_types import FediverseNotification, NotificationType, Visibility


def process_fediverse_notification(notification: FediverseNotification, fediverse_service=None) -> None:
    '''Processes an individual fediverse notification using the abstraction.

    The notification is dropped without a reply when any of these hold:
    the whitelist is enabled and the sender's instance is not whitelisted,
    the notification is not a mention or reply, it has no post attached,
    the parser returns nothing, the parsed author is banned, or no response
    is generated. Otherwise the generated response is posted as a reply to
    the notification's post.

    Args:
        notification: The notification to handle.
        fediverse_service: Service used for parsing and posting. When None,
            one is obtained from get_fediverse_service().
    '''
    if fediverse_service is None:
        fediverse_service = get_fediverse_service()

    # Get user and instance info. An empty/missing host is reported as 'local'.
    username = notification.user.username
    host = notification.user.host
    instance = host if host else 'local'

    # Check whitelist
    if USE_WHITELIST and not is_whitelisted(instance):
        print(f'⚠️ Blocked notification from untrusted instance: {instance}')
        return

    # Only process mentions and replies
    if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
        return

    # Return early if no post attached
    if not notification.post:
        return

    # Determine visibility for reply: SPECIFIED posts get a SPECIFIED reply,
    # every other visibility is answered with HOME.
    if notification.post.visibility != Visibility.SPECIFIED:
        visibility = Visibility.HOME
    else:
        visibility = Visibility.SPECIFIED

    notif_type = notification.type.value
    notif_id = notification.id
    print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')

    # 🧠 Send to the parser
    parsed_notification = parse_notification(notification, fediverse_service)
    if not parsed_notification:
        return

    author = parsed_notification['author']
    if is_player_banned(author):
        print(f'⚠️ Blocked notification from banned player: {author}')
        return

    # Get the response
    response: BotResponse | None = generate_response(parsed_notification)
    if not response:
        return

    # Pass attachments through as-is; an empty/missing value becomes None.
    # NOTE(review): the values come from 'attachment_urls' but are sent as
    # file_ids with no conversion here - confirm create_post accepts them.
    file_ids = response['attachment_urls'] or None

    # Send response using fediverse service
    fediverse_service.create_post(
        text=response['message'],
        reply_to_id=notification.post.id,
        visibility=visibility,
        file_ids=file_ids
        # TODO: pass visible_user_ids with the actual visible user ids so
        # pleromers can use the bot privately
    )


# The annotation is a string on purpose: `misskey` is not imported in this
# module, and an unquoted annotation is evaluated when the def statement runs,
# which would raise NameError at import time.
def process_notifications(client: 'misskey.Misskey') -> bool:
    '''Processes a batch of unread notifications.

    Fetches up to NOTIFICATION_BATCH_SIZE mention/reply notifications newer
    than the stored 'last_seen_notif_id', handles each one, and stores the
    newest ID that was handled.

    Args:
        client: Client object providing i_notifications().

    Returns:
        True if a full batch was fetched (so more are probably queued).
        False if there are no more notifications to process or reading
        them failed.
    '''
    last_seen_id = get_config('last_seen_notif_id')

    # Track the newest ID handled in this batch in a separate variable so the
    # comparisons in the loop keep using the value loaded from config.
    new_last_seen_id = last_seen_id

    try:
        notifications = client.i_notifications(
            # Fetch notifications we haven't seen yet. This option is a bit
            # temperamental, sometimes it'll include since_id, sometimes it
            # won't. We need to keep track of what notifications we've
            # already processed.
            since_id=last_seen_id,

            # Let misskey handle the filtering
            include_types=['mention', 'reply'],

            # And handle the batch size while we're at it
            limit=NOTIFICATION_BATCH_SIZE
        )

        # No notifications. Wait the poll period.
        if not notifications:
            return False

        # Iterate oldest to newest
        for notification in notifications:
            try:
                # Skip if we've processed already
                notif_id = notification.get('id', '')
                if notif_id <= last_seen_id:
                    continue

                # Update new_last_seen_id and process
                new_last_seen_id = notif_id
                # NOTE(review): process_notification is not defined in this
                # module as visible here (only process_fediverse_notification
                # is). If it is not provided elsewhere, this call raises
                # NameError, which the handler below swallows while the
                # last-seen ID still advances - confirm and migrate.
                process_notification(client, notification)

            except Exception as e:
                # Best-effort: one bad notification must not stop the batch.
                print(f'An exception has occured while processing a notification: {e}')
                print(traceback.format_exc())

        # If we got as many notifications as we requested, there are probably
        # more in the queue
        return len(notifications) == NOTIFICATION_BATCH_SIZE

    # NOTE(review): MisskeyAPIException is not imported in this module as
    # visible here; verify it is in scope, otherwise evaluating this clause
    # raises NameError when an exception reaches it.
    except MisskeyAPIException as e:
        print(f'An exception has occured while reading notifications: {e}\n')
        print(traceback.format_exc())

    finally:
        # Quality jank right here, but finally lets us update the last_seen_id
        # even if we hit an exception or return early
        if new_last_seen_id > last_seen_id:
            set_config('last_seen_notif_id', new_last_seen_id)

    # Only reached after a read failure handled above.
    return False