forked from waifu/kemoverse
Compare commits
5 commits
master
...
9_fix_bot_
Author | SHA1 | Date | |
---|---|---|---|
9be92afce3 | |||
d2a7e523e8 | |||
45dd6f10e3 | |||
24309ce900 | |||
6b6777cb89 |
6 changed files with 141 additions and 111 deletions
|
@ -1,79 +1,14 @@
|
|||
import time
|
||||
import traceback
|
||||
import misskey
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_or_create_user, add_pull, get_config, set_config
|
||||
import misskey as misskey
|
||||
from client import client_connection
|
||||
|
||||
# Initialize the Misskey client
|
||||
client = client_connection()
|
||||
|
||||
# Define your whitelist
|
||||
# TODO: move to config
|
||||
whitelisted_instances: list[str] = []
|
||||
|
||||
def stream_notifications():
|
||||
print("Starting filtered notification stream...")
|
||||
|
||||
last_seen_id = get_config("last_seen_notif_id")
|
||||
from config import NOTIFICATION_POLL_INTERVAL
|
||||
from notification import process_notifications
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Initialize the Misskey client
|
||||
client = client_connection()
|
||||
print('Listening for notifications...')
|
||||
while True:
|
||||
try:
|
||||
# May be able to mark notifications as read using misskey.py and
|
||||
# filter them out here. This function also takes a since_id we
|
||||
# could use as well
|
||||
notifications = client.i_notifications()
|
||||
|
||||
if notifications:
|
||||
# Oldest to newest
|
||||
notifications.reverse()
|
||||
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
for notification in notifications:
|
||||
notif_id = notification.get("id")
|
||||
|
||||
# Skip old or same ID notifications
|
||||
if last_seen_id is not None and notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
user = notification.get("user", {})
|
||||
username = user.get("username", "unknown")
|
||||
host = user.get("host") # None if local user
|
||||
|
||||
instance = host if host else "local"
|
||||
|
||||
if instance in whitelisted_instances or instance == "local":
|
||||
note = notification.get("note", {}).get("text", "")
|
||||
notif_type = notification.get("type", "unknown")
|
||||
|
||||
print(f"📨 [{notif_type}] from @{username}@{instance}")
|
||||
print(f"💬 {note}")
|
||||
print("-" * 30)
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parse_notification(notification,client)
|
||||
|
||||
else:
|
||||
print(f"⚠️ Blocked notification from untrusted instance: {host}")
|
||||
|
||||
# Update only if this notif_id is greater
|
||||
if new_last_seen_id is None or notif_id > new_last_seen_id:
|
||||
new_last_seen_id = notif_id
|
||||
|
||||
# Save the latest seen ID
|
||||
if new_last_seen_id and new_last_seen_id != last_seen_id:
|
||||
set_config("last_seen_notif_id", new_last_seen_id)
|
||||
last_seen_id = new_last_seen_id
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
except Exception as e:
|
||||
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
stream_notifications()
|
||||
if not process_notifications(client):
|
||||
time.sleep(NOTIFICATION_POLL_INTERVAL)
|
||||
|
|
|
@ -20,3 +20,6 @@ DB_PATH = config['application']['DatabaseLocation']
|
|||
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
|
||||
# to use extra admin exclusive commands with the bot'''
|
||||
ADMINS = config['application']['DefaultAdmins']
|
||||
|
||||
NOTIFICATION_POLL_INTERVAL = int(config['application']['NotificationPollInterval'])
|
||||
NOTIFICATION_BATCH_SIZE = int(config['application']['NotificationBatchSize'])
|
||||
|
|
114
bot/notification.py
Normal file
114
bot/notification.py
Normal file
|
@ -0,0 +1,114 @@
|
|||
import traceback
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
|
||||
from config import NOTIFICATION_BATCH_SIZE
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_config, set_config
|
||||
from response import generate_response
|
||||
|
||||
# Define your whitelist
|
||||
# TODO: move to config
|
||||
WHITELISTED_INSTANCES: list[str] = []
|
||||
|
||||
def process_notification(client, notification):
|
||||
'''Processes an individual notification'''
|
||||
user = notification.get('user', {})
|
||||
username = user.get('username', 'unknown')
|
||||
host = user.get('host') # None if local user
|
||||
instance = host if host else 'local'
|
||||
|
||||
if not (instance in WHITELISTED_INSTANCES or instance == 'local'):
|
||||
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
|
||||
|
||||
# Copy visibility of the post that was received when replying (so if people
|
||||
# don't want to dump a bunch of notes on home they don't have to)
|
||||
visibility = notification['note']['visibility']
|
||||
if visibility != 'specified':
|
||||
visibility = 'home'
|
||||
|
||||
notif_type = notification.get('type', 'unknown')
|
||||
notif_id = notification.get('id')
|
||||
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parsed_command = parse_notification(notification, client)
|
||||
|
||||
# Get the note Id to reply to
|
||||
note_id = notification.get('note', {}).get('id')
|
||||
|
||||
# Get the response
|
||||
# TODO: Formalize exactly *what* is returned by this. Ideally just want to
|
||||
# handle two cases here: either we have a response, or we don't.
|
||||
# TODO: Return dictionaries instead of tuples. They handle multiple
|
||||
# elements a lot better as they're not position dependent
|
||||
response = generate_response(parsed_command)
|
||||
if isinstance(response, str):
|
||||
client.notes_create(
|
||||
text=response,
|
||||
reply_id=note_id,
|
||||
visibility=visibility
|
||||
)
|
||||
elif response:
|
||||
client.notes_create(
|
||||
text=response[0],
|
||||
reply_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=response[1]
|
||||
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
|
||||
)
|
||||
|
||||
def process_notifications(client):
|
||||
'''Processes a batch of unread notifications. Returns False if there are
|
||||
no more notifications to process.'''
|
||||
|
||||
last_seen_id = get_config('last_seen_notif_id')
|
||||
# process_notification writes to last_seen_id, so make a copy
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
try:
|
||||
notifications = client.i_notifications(
|
||||
# Fetch notifications we haven't seen yet. This option is a bit
|
||||
# tempermental, sometimes it'll include since_id, sometimes it
|
||||
# won't. We need to keep track of what notifications we've
|
||||
# already processed.
|
||||
since_id=last_seen_id,
|
||||
# Let misskey handle the filtering
|
||||
include_types=['mention', 'reply'],
|
||||
# And handle the batch size while we're at it
|
||||
limit=NOTIFICATION_BATCH_SIZE
|
||||
)
|
||||
|
||||
# No notifications. Wait the poll period.
|
||||
if not notifications:
|
||||
return False
|
||||
|
||||
# Iterate oldest to newest
|
||||
for notification in notifications:
|
||||
try:
|
||||
# Skip if we've processed already
|
||||
notif_id = notification.get('id')
|
||||
if notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
# Update new_last_seen_id and process
|
||||
new_last_seen_id = notif_id
|
||||
process_notification(client, notification)
|
||||
|
||||
except Exception as e:
|
||||
print(f'An exception has occured while processing a notification: {e}')
|
||||
print(traceback.format_exc())
|
||||
|
||||
# If we got as many notifications as we requested, there are probably
|
||||
# more in the queue
|
||||
return len(notifications) == NOTIFICATION_BATCH_SIZE
|
||||
|
||||
except MisskeyAPIException as e:
|
||||
print(f'An exception has occured while reading notifications: {e}\n')
|
||||
print(traceback.format_exc())
|
||||
finally:
|
||||
# Quality jank right here, but finally lets us update the last_seen_id
|
||||
# even if we hit an exception or return early
|
||||
if new_last_seen_id > last_seen_id:
|
||||
set_config('last_seen_notif_id', new_last_seen_id)
|
||||
|
||||
return False
|
|
@ -1,23 +1,11 @@
|
|||
import random, re
|
||||
import config
|
||||
from gacha_response import gacha_response
|
||||
|
||||
def parse_notification(notification,client):
|
||||
'''Oarses any notifications received by the bot and sends any commands to
|
||||
'''Parses any notifications received by the bot and sends any commands to
|
||||
gacha_response()'''
|
||||
|
||||
# We get the type of notification to filter the ones that we actually want
|
||||
# to parse
|
||||
|
||||
notif_type = notification.get("type")
|
||||
if not notif_type in ('mention', 'reply'):
|
||||
return # Ignore anything that isn't a mention
|
||||
|
||||
# We want the visibility to be related to the type that was received (so if
|
||||
# people don't want to dump a bunch of notes on home they don't have to)
|
||||
visibility = notification["note"]["visibility"]
|
||||
if visibility != "specified":
|
||||
visibility = "home"
|
||||
|
||||
# Get the full Activitypub ID of the user
|
||||
user = notification.get("user", {})
|
||||
|
@ -50,22 +38,4 @@ def parse_notification(notification,client):
|
|||
command = parts[0].lower() if parts else None
|
||||
arguments = parts[1:] if len(parts) > 1 else []
|
||||
|
||||
# TODO: move response generation to a different function
|
||||
response = gacha_response(command.lower(),full_user, arguments, note_obj)
|
||||
if not response:
|
||||
return
|
||||
|
||||
if isinstance(response, str):
|
||||
client.notes_create(
|
||||
text=response,
|
||||
reply_id=note_id,
|
||||
visibility=visibility
|
||||
)
|
||||
else:
|
||||
client.notes_create(
|
||||
text=response[0],
|
||||
reply_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=response[1]
|
||||
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
|
||||
)
|
||||
return [command,full_user, arguments, note_obj]
|
||||
|
|
|
@ -26,11 +26,13 @@ def is_float(val):
|
|||
return False
|
||||
|
||||
|
||||
# TODO: See issue #3, separate command parsing from game logic.
|
||||
def gacha_response(command,full_user, arguments,note_obj):
|
||||
'''Parses a given command with arguments, processes the game state and
|
||||
def generate_response(parsed_command):
|
||||
|
||||
'''Given a command with arguments, processes the game state and
|
||||
returns a response'''
|
||||
|
||||
command, full_user, arguments, note_obj = parsed_command
|
||||
|
||||
if command == "roll":
|
||||
user_id = get_or_create_user(full_user)
|
||||
character_id, character_name, file_id, rarity = get_character()
|
|
@ -15,3 +15,9 @@ DefaultAdmins = ['admin@example.tld']
|
|||
|
||||
; SQLite Database location
|
||||
DatabaseLocation = ./gacha_game.db
|
||||
|
||||
; Number of seconds to sleep while awaiting new notifications
|
||||
NotificationPollInterval = 5
|
||||
|
||||
; Number of notifications to process at once (limit 100)
|
||||
NotificationBatchSize = 10
|
||||
|
|
Loading…
Add table
Reference in a new issue