diff --git a/bot/add_character.py b/bot/add_character.py index 68528c3..aae3fb3 100644 --- a/bot/add_character.py +++ b/bot/add_character.py @@ -32,7 +32,7 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl raise ValueError("Image URL must be provided.") # Download image - response = requests.get(image_url, stream=True) + response = requests.get(image_url, stream=True, timeout=30) if response.status_code != 200: raise RuntimeError(f"Failed to download image from {image_url}") @@ -55,9 +55,6 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl character_id = cur.lastrowid return character_id, file_id - - except Exception as e: - raise finally: if 'conn' in locals(): - conn.close() \ No newline at end of file + conn.close() diff --git a/bot/bot_app.py b/bot/bot_app.py index 5806478..b65ef3a 100644 --- a/bot/bot_app.py +++ b/bot/bot_app.py @@ -1,79 +1,14 @@ import time -import traceback -import misskey -from parsing import parse_notification -from db_utils import get_or_create_user, add_pull, get_config, set_config +import misskey as misskey from client import client_connection -# Initialize the Misskey client -client = client_connection() - -# Define your whitelist -# TODO: move to config -whitelisted_instances: list[str] = [] - -def stream_notifications(): - print("Starting filtered notification stream...") - - last_seen_id = get_config("last_seen_notif_id") +from config import NOTIFICATION_POLL_INTERVAL +from notification import process_notifications +if __name__ == '__main__': + # Initialize the Misskey client + client = client_connection() + print('Listening for notifications...') while True: - try: - # May be able to mark notifications as read using misskey.py and - # filter them out here. This function also takes a since_id we - # could use as well - notifications = client.i_notifications() - - if notifications: - # Oldest to newest - notifications.reverse() - - new_last_seen_id = last_seen_id - - for notification in notifications: - notif_id = notification.get("id") - - # Skip old or same ID notifications - if last_seen_id is not None and notif_id <= last_seen_id: - continue - - user = notification.get("user", {}) - username = user.get("username", "unknown") - host = user.get("host") # None if local user - - instance = host if host else "local" - - if instance in whitelisted_instances or instance == "local": - note = notification.get("note", {}).get("text", "") - notif_type = notification.get("type", "unknown") - - print(f"📨 [{notif_type}] from @{username}@{instance}") - print(f"💬 {note}") - print("-" * 30) - - # 🧠 Send to the parser - parse_notification(notification,client) - - else: - print(f"⚠️ Blocked notification from untrusted instance: {host}") - - # Update only if this notif_id is greater - if new_last_seen_id is None or notif_id > new_last_seen_id: - new_last_seen_id = notif_id - - # Save the latest seen ID - if new_last_seen_id and new_last_seen_id != last_seen_id: - set_config("last_seen_notif_id", new_last_seen_id) - last_seen_id = new_last_seen_id - - time.sleep(5) - - except Exception as e: - print(f"An exception has occured: {e}\n{traceback.format_exc()}") - time.sleep(5) - - - - -if __name__ == "__main__": - stream_notifications() + if not process_notifications(client): + time.sleep(NOTIFICATION_POLL_INTERVAL) diff --git a/bot/config.py b/bot/config.py index b24293c..643aeb1 100644 --- a/bot/config.py +++ b/bot/config.py @@ -4,19 +4,20 @@ config = configparser.ConfigParser() config.read('config.ini') # Username for the bot -USER = config['application']['BotUser'] - +USER = config['credentials']['User'] # API key for the bot -KEY = config['application']['ApiKey'] +KEY = config['credentials']['Token'] # Bot's Misskey instance URL -INSTANCE = config['application']['InstanceUrl'] - -# SQLite Database location -DB_PATH = config['application']['DatabaseLocation'] - -# Extra stuff for control of the bot +INSTANCE = config['credentials']['Instance'] # TODO: move this to db # Fedi handles in the traditional 'user@domain.tld' style, allows these users -# to use extra admin exclusive commands with the bot''' -ADMINS = config['application']['DefaultAdmins'] +# to use extra admin exclusive commands with the bot +ADMINS = config['application']['DefaultAdmins'] +# SQLite Database location +DB_PATH = config['application']['DatabaseLocation'] + +NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) +NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize']) + +GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval']) diff --git a/bot/db_utils.py b/bot/db_utils.py index bb80d98..a521ec5 100644 --- a/bot/db_utils.py +++ b/bot/db_utils.py @@ -1,5 +1,4 @@ import sqlite3 -import random import config DB_PATH = config.DB_PATH @@ -40,6 +39,17 @@ def add_pull(user_id, character_id): conn.commit() conn.close() +def get_last_rolled_at(user_id): + '''Gets the timestamp when the user last rolled''' + conn = get_db_connection() + cur = conn.cursor() + cur.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \ + (user_id,)) + row = cur.fetchone() + conn.close() + return row[0] if row else None + + def get_config(key): '''Reads the value for a specified config key from the db''' conn = get_db_connection() diff --git a/bot/gacha_response.py b/bot/gacha_response.py deleted file mode 100644 index e703aff..0000000 --- a/bot/gacha_response.py +++ /dev/null @@ -1,69 +0,0 @@ -import random -from db_utils import get_or_create_user, add_pull, get_db_connection -from add_character import add_character - -def get_character(): - ''' Gets a random character from the database''' - conn = get_db_connection() - cur = conn.cursor() - cur.execute('SELECT * FROM characters') - characters = cur.fetchall() - conn.close() - - if not characters: - return None, None, None, None - - weights = [c['weight'] for c in characters] - chosen = random.choices(characters, weights=weights, k=1)[0] - - return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity'] - -def is_float(val): - try: - float(val) - return True - except ValueError: - return False - - -# TODO: See issue #3, separate command parsing from game logic. -def gacha_response(command,full_user, arguments,note_obj): - '''Parses a given command with arguments, processes the game state and - returns a response''' - - if command == "roll": - user_id = get_or_create_user(full_user) - character_id, character_name, file_id, rarity = get_character() - - if not character_id: - #TODO: Can't have tuples of a single element - # Return these as a dict or object instead. - return(f"@{full_user} Uwaaa... something went wrong! No characters found. 😿") - - add_pull(user_id,character_id) - stars = '⭐️' * rarity - return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\nShe's all yours now~ 💖✨",[file_id]]) - - if command == "create": - # Example call from bot logic - image_url = note_obj.get("files", [{}])[0].get("url") if note_obj.get("files") else None - if not image_url: - return "You need an image to create a character, dumbass." - - if len(arguments) != 3: - return "Please specify the following attributes in order: name, rarity, drop weighting" - - if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5): - return f"Invalid rarity: '{arguments[1]}' must be a number between 1 and 5" - - if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0): - return f"Invalid drop weight: '{arguments[2]}' must be a decimal value between 0.0 and 1.0" - - character_id, file_id = add_character( - name=arguments[0], - rarity=int(arguments[1]), - weight=float(arguments[2]), - image_url=image_url - ) - return([f"Added {arguments[0]}, ID {character_id}.",[file_id]]) - return None diff --git a/bot/notification.py b/bot/notification.py new file mode 100644 index 0000000..e0ae020 --- /dev/null +++ b/bot/notification.py @@ -0,0 +1,115 @@ +import traceback +from misskey.exceptions import MisskeyAPIException + +from config import NOTIFICATION_BATCH_SIZE +from parsing import parse_notification +from db_utils import get_config, set_config +from response import generate_response + +# Define your whitelist +# TODO: move to config +WHITELISTED_INSTANCES: list[str] = [] + +def process_notification(client, notification): + '''Processes an individual notification''' + user = notification.get('user', {}) + username = user.get('username', 'unknown') + host = user.get('host') # None if local user + instance = host if host else 'local' + + if not (instance in WHITELISTED_INSTANCES or instance == 'local'): + print(f'⚠️ Blocked notification from untrusted instance: {instance}') + return + + # Copy visibility of the post that was received when replying (so if people + # don't want to dump a bunch of notes on home they don't have to) + visibility = notification['note']['visibility'] + if visibility != 'specified': + visibility = 'home' + + notif_type = notification.get('type', 'unknown') + notif_id = notification.get('id') + print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}') + + # 🧠 Send to the parser + parsed_command = parse_notification(notification, client) + + # Get the note Id to reply to + note_id = notification.get('note', {}).get('id') + + # Get the response + # TODO: Formalize exactly *what* is returned by this. Ideally just want to + # handle two cases here: either we have a response, or we don't. + # TODO: Return dictionaries instead of tuples. They handle multiple + # elements a lot better as they're not position dependent + response = generate_response(parsed_command) + if isinstance(response, str): + client.notes_create( + text=response, + reply_id=note_id, + visibility=visibility + ) + elif response: + client.notes_create( + text=response[0], + reply_id=note_id, + visibility=visibility, + file_ids=response[1] + #visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately + ) + +def process_notifications(client): + '''Processes a batch of unread notifications. Returns False if there are + no more notifications to process.''' + + last_seen_id = get_config('last_seen_notif_id') + # process_notification writes to last_seen_id, so make a copy + new_last_seen_id = last_seen_id + + try: + notifications = client.i_notifications( + # Fetch notifications we haven't seen yet. This option is a bit + # tempermental, sometimes it'll include since_id, sometimes it + # won't. We need to keep track of what notifications we've + # already processed. + since_id=last_seen_id, + # Let misskey handle the filtering + include_types=['mention', 'reply'], + # And handle the batch size while we're at it + limit=NOTIFICATION_BATCH_SIZE + ) + + # No notifications. Wait the poll period. + if not notifications: + return False + + # Iterate oldest to newest + for notification in notifications: + try: + # Skip if we've processed already + notif_id = notification.get('id') + if notif_id <= last_seen_id: + continue + + # Update new_last_seen_id and process + new_last_seen_id = notif_id + process_notification(client, notification) + + except Exception as e: + print(f'An exception has occured while processing a notification: {e}') + print(traceback.format_exc()) + + # If we got as many notifications as we requested, there are probably + # more in the queue + return len(notifications) == NOTIFICATION_BATCH_SIZE + + except MisskeyAPIException as e: + print(f'An exception has occured while reading notifications: {e}\n') + print(traceback.format_exc()) + finally: + # Quality jank right here, but finally lets us update the last_seen_id + # even if we hit an exception or return early + if new_last_seen_id > last_seen_id: + set_config('last_seen_notif_id', new_last_seen_id) + + return False diff --git a/bot/parsing.py b/bot/parsing.py index c17a276..bbefe70 100644 --- a/bot/parsing.py +++ b/bot/parsing.py @@ -1,23 +1,11 @@ import random, re import config -from gacha_response import gacha_response def parse_notification(notification,client): - '''Oarses any notifications received by the bot and sends any commands to + '''Parses any notifications received by the bot and sends any commands to gacha_response()''' - # We get the type of notification to filter the ones that we actually want - # to parse - - notif_type = notification.get("type") - if not notif_type in ('mention', 'reply'): - return # Ignore anything that isn't a mention - - # We want the visibility to be related to the type that was received (so if - # people don't want to dump a bunch of notes on home they don't have to) - visibility = notification["note"]["visibility"] - if visibility != "specified": - visibility = "home" + # Get the full Activitypub ID of the user user = notification.get("user", {}) @@ -50,22 +38,4 @@ def parse_notification(notification,client): command = parts[0].lower() if parts else None arguments = parts[1:] if len(parts) > 1 else [] - # TODO: move response generation to a different function - response = gacha_response(command.lower(),full_user, arguments, note_obj) - if not response: - return - - if isinstance(response, str): - client.notes_create( - text=response, - reply_id=note_id, - visibility=visibility - ) - else: - client.notes_create( - text=response[0], - reply_id=note_id, - visibility=visibility, - file_ids=response[1] - #visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately - ) + return [command,full_user, arguments, note_obj] diff --git a/bot/response.py b/bot/response.py new file mode 100644 index 0000000..55100ad --- /dev/null +++ b/bot/response.py @@ -0,0 +1,117 @@ +import random +from datetime import datetime, timedelta, timezone +from db_utils import get_or_create_user, add_pull, get_db_connection, get_last_rolled_at +from add_character import add_character +from config import GACHA_ROLL_INTERVAL + +def get_character(): + ''' Gets a random character from the database''' + conn = get_db_connection() + cur = conn.cursor() + cur.execute('SELECT * FROM characters') + characters = cur.fetchall() + conn.close() + + if not characters: + return None, None, None, None + + weights = [c['weight'] for c in characters] + chosen = random.choices(characters, weights=weights, k=1)[0] + + return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity'] + +def do_roll(full_user): + '''Determines whether the user can roll, then pulls a random character''' + user_id = get_or_create_user(full_user) + + # Get date of user's last roll + date = get_last_rolled_at(user_id) + + # No date means it's users first roll + if date: + # SQLite timestamps returned by the DB are always in UTC + # Below timestamps are to be converted to UTC + prev = datetime.strptime(date + '+0000', '%Y-%m-%d %H:%M:%S%z') + now = datetime.now(timezone.utc) + + time_since_last_roll = now - prev + roll_interval = timedelta(seconds=GACHA_ROLL_INTERVAL) + duration = roll_interval - time_since_last_roll + + # User needs to wait before they can roll again + if time_since_last_roll < roll_interval: + remaining_duration = None + if duration.seconds > 3600: + remaining_duration = f'{-(duration.seconds // -3600)} hours' + elif duration.seconds > 60: + remaining_duration = f'{-(duration.seconds // -60)} minutes' + else: + remaining_duration = f'{duration.seconds} seconds' + + return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.' + + character_id, character_name, file_id, rarity = get_character() + + if not character_id: + return f'{full_user} Uwaaa... something went wrong! No characters found. 😿' + + add_pull(user_id,character_id) + stars = '⭐️' * rarity + return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\ + She's all yours now~ 💖✨",[file_id]]) + +def is_float(val): + '''Returns true if `val` can be converted to a float''' + try: + float(val) + return True + except ValueError: + return False + +def do_create(full_user, arguments, note_obj): + '''Creates a character''' + # Example call from bot logic + image_url = note_obj.get('files', [{}])[0].get('url') if note_obj.get('files') else None + if not image_url: + return f'{full_user}{full_user} You need an image to create a character, dumbass.' + + if len(arguments) != 3: + return '{full_user}Please specify the following attributes in order: \ + name, rarity, drop weighting' + + if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5): + return f'{full_user}Invalid rarity: \'{arguments[1]}\' must be a number between 1 and 5' + + if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0): + return f'{full_user}Invalid drop weight: \'{arguments[2]}\' \ + must be a decimal value between 0.0 and 1.0' + + character_id, file_id = add_character( + name=arguments[0], + rarity=int(arguments[1]), + weight=float(arguments[2]), + image_url=image_url + ) + return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]]) + +def do_help(full_user): + '''Provides a list of commands that the bot can do.''' + return f'{full_user} Here\'s what I can do:\n \ + - `roll` Pulls a random character.\ + - `create ` Creates a character using a given image.\ + - `help` Shows this message' + +def generate_response(parsed_command): + '''Given a command with arguments, processes the game state and + returns a response''' + + command, full_user, arguments, note_obj = parsed_command + match command: + case 'roll': + return do_roll(full_user) + case 'create': + return do_create(full_user, arguments, note_obj) + case 'help': + return do_help(command) + case _: + return None diff --git a/db.py b/db.py index 63d0b43..ffcfe2e 100644 --- a/db.py +++ b/db.py @@ -41,6 +41,9 @@ cursor.execute(""" ) """) +# Initialize essential config key +cursor.execute('INSERT INTO config VALUES ("last_seen_notif_id", 0)') + """ # Insert example characters into the database if they don't already exist characters = [ ('Murakami-san', 1, 0.35), diff --git a/example_config.ini b/example_config.ini index 4412c92..d7f1c14 100644 --- a/example_config.ini +++ b/example_config.ini @@ -1,17 +1,27 @@ ; Rename me to config.ini and put your values in here [application] -; Full fedi handle of the bot user -BotUser = @bot@example.tld +; Comma separated list of fedi handles for any administrator users +; More can be added through the application +DefaultAdmins = ['admin@example.tld'] +; SQLite Database location +DatabaseLocation = ./gacha_game.db +[gacha] +; Number of seconds players have to wait between rolls +RollInterval = 72000 + +[notification] +; Number of seconds to sleep while awaiting new notifications +PollInterval = 5 +; Number of notifications to process at once (max 100) +BatchSize = 10 + +[credentials] +; Fully qualified URL of the instance hosting the bot +Instance = http://example.tld +; Full fedi handle of the bot user +User = @bot@example.tld ; API key for the bot ; Generate one by going to Settings > API > Generate access token -ApiKey = abcdefghijklmnopqrstuvwxyz012345 +Token = abcdefghijklmnopqrstuvwxyz012345 -; Fully qualified URL of the instance hosting the bot -InstanceUrl = http://example.tld - -; Comma separated list of fedi handles for any administrator users -DefaultAdmins = ['admin@example.tld'] - -; SQLite Database location -DatabaseLocation = ./gacha_game.db \ No newline at end of file