# kemoverse/bot/notification.py
#
# 70 lines
# 2.4 KiB
# Python

import config
from parsing import parse_notification
from db_utils import is_whitelisted, is_player_banned
from response import generate_response
from custom_types import BotResponse
from fediverse_factory import get_fediverse_service
from fediverse_types import FediverseNotification, NotificationType, Visibility
def process_fediverse_notification(
    notification: FediverseNotification,
    fediverse_service=None,
) -> None:
    '''Process one fediverse notification and post a reply when warranted.

    The notification is dropped without a reply when any of these holds:

    * ``config.USE_WHITELIST`` is set and the sender's instance is not
      whitelisted (a warning is printed),
    * its type is neither ``MENTION`` nor ``REPLY``,
    * it has no post attached,
    * ``parse_notification`` returns a falsy value,
    * the parsed author is banned (a warning is printed),
    * ``generate_response`` returns a falsy value.

    Otherwise the generated message is posted as a reply to the
    notification's post via ``fediverse_service.create_post``.

    Args:
        notification: The incoming notification to handle.
        fediverse_service: Service object used for parsing and posting.
            When ``None``, one is obtained from
            ``get_fediverse_service(config.INSTANCE_TYPE)``.

    Returns:
        None. The only effects are console output and, at most, one
        ``create_post`` call.
    '''
    if fediverse_service is None:
        fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)

    # Get user and instance info; a falsy host is reported as 'local'.
    username = notification.user.username
    host = notification.user.host
    instance = host if host else 'local'

    # Check whitelist
    if config.USE_WHITELIST and not is_whitelisted(instance):
        print(f'⚠️ Blocked notification from untrusted instance: {instance}')
        return

    # Only process mentions and replies
    if notification.type not in (
        NotificationType.MENTION,
        NotificationType.REPLY,
    ):
        return

    # Return early if no post attached
    if not notification.post:
        return

    # Determine visibility for reply: a SPECIFIED post is answered with
    # SPECIFIED visibility, every other visibility is answered with HOME.
    if notification.post.visibility != Visibility.SPECIFIED:
        visibility = Visibility.HOME
    else:
        visibility = Visibility.SPECIFIED

    notif_type = notification.type.value
    notif_id = notification.id
    print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')

    # 🧠 Send to the parser
    parsed_notification = parse_notification(notification, fediverse_service)
    if not parsed_notification:
        return

    author = parsed_notification['author']
    if is_player_banned(author):
        print(f'⚠️ Blocked notification from banned player: {author}')
        return

    # Get the response
    response: BotResponse | None = generate_response(parsed_notification)
    if not response:
        return

    # An empty attachment list is normalised to None.
    # NOTE(review): the attachment URLs are forwarded as ``file_ids``
    # unchanged; no URL -> file ID conversion happens here. Presumably the
    # service handles that -- confirm against the create_post implementation.
    file_ids = response['attachment_urls'] or None

    # Send response using fediverse service
    # TODO: pass visible_user_ids (actual visible user ids) so pleromers can
    # use the bot privately
    fediverse_service.create_post(
        text=response['message'],
        reply_to_id=notification.post.id,
        visibility=visibility,
        file_ids=file_ids,
    )