You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
kemoverse/bot/notification.py

114 lines
4.3 KiB

import traceback
from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE
from parsing import parse_notification
from db_utils import get_config, set_config
from response import generate_response
# Define your whitelist
# TODO: move to config
WHITELISTED_INSTANCES: list[str] = []
def process_notification(client, notification):
'''Processes an individual notification'''
user = notification.get('user', {})
username = user.get('username', 'unknown')
host = user.get('host') # None if local user
instance = host if host else 'local'
if not (instance in WHITELISTED_INSTANCES or instance == 'local'):
print(f' Blocked notification from untrusted instance: {instance}')
# Copy visibility of the post that was received when replying (so if people
# don't want to dump a bunch of notes on home they don't have to)
visibility = notification['note']['visibility']
if visibility != 'specified':
visibility = 'home'
notif_type = notification.get('type', 'unknown')
notif_id = notification.get('id')
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
# 🧠 Send to the parser
parsed_command = parse_notification(notification, client)
# Get the note Id to reply to
note_id = notification.get('note', {}).get('id')
# Get the response
# TODO: Formalize exactly *what* is returned by this. Ideally just want to
# handle two cases here: either we have a response, or we don't.
# TODO: Return dictionaries instead of tuples. They handle multiple
# elements a lot better as they're not position dependent
response = generate_response(parsed_command)
if isinstance(response, str):
client.notes_create(
text=response,
reply_id=note_id,
visibility=visibility
)
elif response:
client.notes_create(
text=response[0],
reply_id=note_id,
visibility=visibility,
file_ids=response[1]
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)
def process_notifications(client):
'''Processes a batch of unread notifications. Returns False if there are
no more notifications to process.'''
last_seen_id = get_config('last_seen_notif_id')
# process_notification writes to last_seen_id, so make a copy
new_last_seen_id = last_seen_id
try:
notifications = client.i_notifications(
# Fetch notifications we haven't seen yet. This option is a bit
# tempermental, sometimes it'll include since_id, sometimes it
# won't. We need to keep track of what notifications we've
# already processed.
since_id=last_seen_id,
# Let misskey handle the filtering
include_types=['mention', 'reply'],
# And handle the batch size while we're at it
limit=NOTIFICATION_BATCH_SIZE
)
# No notifications. Wait the poll period.
if not notifications:
return False
# Iterate oldest to newest
for notification in notifications:
try:
# Skip if we've processed already
notif_id = notification.get('id')
if notif_id <= last_seen_id:
continue
# Update new_last_seen_id and process
new_last_seen_id = notif_id
process_notification(client, notification)
except Exception as e:
print(f'An exception has occured while processing a notification: {e}')
print(traceback.format_exc())
# If we got as many notifications as we requested, there are probably
# more in the queue
return len(notifications) == NOTIFICATION_BATCH_SIZE
except MisskeyAPIException as e:
print(f'An exception has occured while reading notifications: {e}\n')
print(traceback.format_exc())
finally:
# Quality jank right here, but finally lets us update the last_seen_id
# even if we hit an exception or return early
if new_last_seen_id > last_seen_id:
set_config('last_seen_notif_id', new_last_seen_id)
return False