kemoverse/bot/bot_app.py

76 lines
2.8 KiB
Python

import time
import traceback
from parsing import parse_notification
from db_utils import get_or_create_user, add_pull, get_config, set_config
from fediverse_factory import get_fediverse_service
import config
# Module-level service handle, created once at import time by the factory.
# It is used below to fetch notifications and is passed on to the parser.
fediverse_service = get_fediverse_service()
# Instance hosts taken from config.TRUSTED_INSTANCES; a notification from a
# remote host is only forwarded to the parser when its host is in this collection.
whitelisted_instances = config.TRUSTED_INSTANCES
def _dispatch_notification(notification):
    """Log one notification and forward it to the parser if its sender is trusted.

    A sender with no host is treated as "local" and always accepted; any other
    sender must belong to an instance listed in ``whitelisted_instances``,
    otherwise the notification is reported as blocked and dropped.
    Exceptions raised by ``parse_notification`` propagate to the caller.
    """
    username = notification.user.username
    host = notification.user.host
    instance = host if host else "local"

    # Guard clause: refuse anything that is neither local nor whitelisted.
    if instance != "local" and instance not in whitelisted_instances:
        print(f"⚠️ Blocked notification from untrusted instance: {host}")
        return

    note = notification.post.text if notification.post else ""
    notif_type = notification.type.value

    print(f"📨 [{notif_type}] from @{username}@{instance}")
    print(f"💬 {note}")
    print("-" * 30)

    # 🧠 Send to the parser
    parse_notification(notification, fediverse_service)


def stream_notifications(poll_interval=5):
    """Poll the fediverse service forever and dispatch each new notification.

    The ID of the newest handled notification is kept under the
    ``last_seen_notif_id`` config key so that polling resumes from there.

    Args:
        poll_interval: Seconds to wait between polls, after a successful
            poll as well as after a failed one. Defaults to 5.

    This function loops until the process is interrupted. Any ``Exception``
    raised while polling or handling is printed with its traceback and the
    loop continues with the next poll.
    """
    print("Starting filtered notification stream...")
    last_seen_id = get_config("last_seen_notif_id")

    while True:
        try:
            # Get notifications from the fediverse service
            notifications = fediverse_service.get_notifications(since_id=last_seen_id)

            # Double-check filtering: even though since_id is passed to the API,
            # filter again locally in case the server ignores it or returns
            # entries we have already handled.
            pending = [
                notification
                for notification in (notifications or [])
                if last_seen_id is None or notification.id > last_seen_id
            ]
            # Handle oldest first so the checkpoint below is a true high-water
            # mark: everything at or below it has been handled.
            pending.sort(key=lambda notification: notification.id)

            checkpoint = last_seen_id
            try:
                for notification in pending:
                    _dispatch_notification(notification)
                    checkpoint = notification.id
            finally:
                # Persist progress even when a later notification fails, so the
                # ones already handled are not replayed on the next poll. The
                # failing notification stays above the checkpoint and is retried.
                if checkpoint is not None and checkpoint != last_seen_id:
                    set_config("last_seen_notif_id", checkpoint)
                    last_seen_id = checkpoint
        except Exception as e:
            print(f"An exception has occured: {e}\n{traceback.format_exc()}")

        time.sleep(poll_interval)
if __name__ == "__main__":
    # Script entry point: start the polling loop only when this file is
    # executed directly, not when it is imported as a module.
    stream_notifications()