pleroma-support - optionally use pleroma as a host for the bot. #54
2 changed files with 0 additions and 128 deletions
|
@ -1,6 +0,0 @@
|
|||
import misskey
|
||||
import config
|
||||
|
||||
|
||||
def client_connection() -> misskey.Misskey:
|
||||
return misskey.Misskey(address=config.INSTANCE, i=config.KEY)
|
|
@ -1,122 +0,0 @@
|
|||
import traceback
|
||||
from typing import Dict, Any
|
||||
|
||||
import misskey
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
|
||||
from config import NOTIFICATION_BATCH_SIZE
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_config, set_config
|
||||
from response import generate_response
|
||||
from custom_types import BotResponse
|
||||
|
||||
# Define your whitelist
|
||||
# TODO: move to config
|
||||
WHITELISTED_INSTANCES: list[str] = []
|
||||
|
||||
|
||||
def process_notification(
|
||||
client: misskey.Misskey,
|
||||
notification: Dict[str, Any]) -> None:
|
||||
'''Processes an individual notification'''
|
||||
user = notification.get('user', {})
|
||||
username = user.get('username', 'unknown')
|
||||
host = user.get('host') # None if local user
|
||||
instance = host if host else 'local'
|
||||
|
||||
if not (instance in WHITELISTED_INSTANCES or instance == 'local'):
|
||||
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
|
||||
return
|
||||
|
||||
# Copy visibility of the post that was received when replying (so if people
|
||||
# don't want to dump a bunch of notes on home they don't have to)
|
||||
visibility = notification['note']['visibility']
|
||||
if visibility != 'specified':
|
||||
visibility = 'home'
|
||||
|
||||
notif_type = notification.get('type', 'unknown')
|
||||
notif_id = notification.get('id')
|
||||
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parsed_notification = parse_notification(notification, client)
|
||||
|
||||
if not parsed_notification:
|
||||
return
|
||||
|
||||
# Get the note Id to reply to
|
||||
note_id = notification.get('note', {}).get('id')
|
||||
|
||||
# Get the response
|
||||
response: BotResponse | None = generate_response(parsed_notification)
|
||||
|
||||
if not response:
|
||||
return
|
||||
|
||||
client.notes_create(
|
||||
text=response['message'],
|
||||
reply_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=response['attachment_urls']
|
||||
# TODO: write actual visible users ids so pleromers can use the bot
|
||||
# privately
|
||||
# visible_user_ids=[]
|
||||
)
|
||||
|
||||
|
||||
def process_notifications(client: misskey.Misskey) -> bool:
|
||||
'''Processes a batch of unread notifications. Returns False if there are
|
||||
no more notifications to process.'''
|
||||
|
||||
last_seen_id = get_config('last_seen_notif_id')
|
||||
# process_notification writes to last_seen_id, so make a copy
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
try:
|
||||
notifications = client.i_notifications(
|
||||
# Fetch notifications we haven't seen yet. This option is a bit
|
||||
# tempermental, sometimes it'll include since_id, sometimes it
|
||||
# won't. We need to keep track of what notifications we've
|
||||
# already processed.
|
||||
since_id=last_seen_id,
|
||||
# Let misskey handle the filtering
|
||||
include_types=['mention', 'reply'],
|
||||
# And handle the batch size while we're at it
|
||||
limit=NOTIFICATION_BATCH_SIZE
|
||||
)
|
||||
|
||||
# No notifications. Wait the poll period.
|
||||
if not notifications:
|
||||
return False
|
||||
|
||||
# Iterate oldest to newest
|
||||
for notification in notifications:
|
||||
try:
|
||||
# Skip if we've processed already
|
||||
notif_id = notification.get('id', '')
|
||||
if notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
# Update new_last_seen_id and process
|
||||
new_last_seen_id = notif_id
|
||||
process_notification(client, notification)
|
||||
|
||||
except Exception as e:
|
||||
print(f'An exception has occured while processing a \
|
||||
notification: {e}')
|
||||
print(traceback.format_exc())
|
||||
|
||||
# If we got as many notifications as we requested, there are probably
|
||||
# more in the queue
|
||||
return len(notifications) == NOTIFICATION_BATCH_SIZE
|
||||
|
||||
except MisskeyAPIException as e:
|
||||
print(f'An exception has occured while reading notifications: {e}\n')
|
||||
print(traceback.format_exc())
|
||||
finally:
|
||||
# Quality jank right here, but finally lets us update the last_seen_id
|
||||
# even if we hit an exception or return early
|
||||
if new_last_seen_id > last_seen_id:
|
||||
set_config('last_seen_notif_id', new_last_seen_id)
|
||||
|
||||
return False
|
Loading…
Add table
Reference in a new issue