From fde6e1167a8143a7ebfda9bab09c8fa70c1cc751 Mon Sep 17 00:00:00 2001 From: VD15 Date: Sat, 7 Jun 2025 19:23:17 +0100 Subject: [PATCH 1/8] Add administrators --- bot/{add_character.py => add_card.py} | 20 ++-- bot/bot_app.py | 3 + bot/config.py | 10 +- bot/custom_types.py | 2 +- bot/db_utils.py | 128 ++++++++++++++++--------- bot/response.py | 105 +++++++++++--------- example_config.ini | 2 +- migrations/0003_rename_tables.sql | 4 + migrations/0004_add_administrators.sql | 1 + 9 files changed, 171 insertions(+), 104 deletions(-) rename bot/{add_character.py => add_card.py} (76%) create mode 100644 migrations/0003_rename_tables.sql create mode 100644 migrations/0004_add_administrators.sql diff --git a/bot/add_character.py b/bot/add_card.py similarity index 76% rename from bot/add_character.py rename to bot/add_card.py index 18b0f98..fcaab43 100644 --- a/bot/add_character.py +++ b/bot/add_card.py @@ -1,27 +1,27 @@ import requests from misskey.exceptions import MisskeyAPIException from client import client_connection -from db_utils import insert_character -from custom_types import Character +from db_utils import insert_card +from custom_types import Card from config import RARITY_TO_WEIGHT -def add_character( +def add_card( name: str, rarity: int, image_url: str) -> tuple[int, str]: ''' - Adds a character to the database, uploading the image from a public URL to + Adds a card to the database, uploading the image from a public URL to the bot's Misskey Drive. Args: - name (str): Character name. - rarity (int): Character rarity (e.g., 1-5). + name (str): Card name. + rarity (int): Card rarity (e.g., 1-5). image_url (str): Public URL of the image from the post (e.g., from note['files'][i]['url']). Returns: - tuple[int, str]: Character ID and bot's Drive file_id. + tuple[int, str]: Card ID and bot's Drive file_id. Raises: ValueError: If inputs are invalid. @@ -32,7 +32,7 @@ def add_character( # Validate inputs if not stripped_name: - raise ValueError('Character name cannot be empty.') + raise ValueError('Card name cannot be empty.') if rarity < 1: raise ValueError('Rarity must be a positive integer.') if rarity not in RARITY_TO_WEIGHT.keys(): @@ -55,10 +55,10 @@ def add_character( from e # Insert into database - character_id = insert_character( + card_id = insert_card( stripped_name, rarity, RARITY_TO_WEIGHT[rarity], file_id ) - return character_id, file_id + return card_id, file_id diff --git a/bot/bot_app.py b/bot/bot_app.py index 825695e..ed2772b 100644 --- a/bot/bot_app.py +++ b/bot/bot_app.py @@ -12,6 +12,9 @@ if __name__ == '__main__': # Connect to DB db.connect() + # Setup default administrators + db.setup_administrators() + print('Listening for notifications...') while True: if not process_notifications(client): diff --git a/bot/config.py b/bot/config.py index 227f949..af806f9 100644 --- a/bot/config.py +++ b/bot/config.py @@ -1,5 +1,6 @@ '''Essentials for the bot to function''' import configparser +import json from os import environ, path @@ -21,7 +22,9 @@ def get_config_file() -> str: raise ConfigError(f'Could not find {config_path}') return config_path -def get_rarity_to_weight(config_section): + +def get_rarity_to_weight( + config_section: configparser.SectionProxy) -> dict[int, float]: """Parses Rarity_X keys from config and returns a {rarity: weight} dict.""" rarity_weights = {} for key, value in config_section.items(): @@ -41,10 +44,9 @@ KEY = config['credentials']['Token'] # Bot's Misskey instance URL INSTANCE = config['credentials']['Instance'].lower() -# TODO: move this to db # Fedi handles in the traditional 'user@domain.tld' style, allows these users # to use extra admin exclusive commands with the bot -ADMINS = config['application']['DefaultAdmins'] +ADMINS = json.loads(config['application']['DefaultAdmins']) # SQLite Database location DB_PATH = config['application']['DatabaseLocation'] @@ -53,4 +55,4 @@ NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize']) GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval']) -RARITY_TO_WEIGHT = get_rarity_to_weight(config['gacha']) \ No newline at end of file +RARITY_TO_WEIGHT = get_rarity_to_weight(config['gacha']) diff --git a/bot/custom_types.py b/bot/custom_types.py index 0c23cb6..7fc7885 100644 --- a/bot/custom_types.py +++ b/bot/custom_types.py @@ -5,7 +5,7 @@ BotResponse = TypedDict('BotResponse', { 'attachment_urls': List[str] | None }) -Character = TypedDict('Character', { +Card = TypedDict('Card', { 'id': int, 'name': str, 'rarity': int, diff --git a/bot/db_utils.py b/bot/db_utils.py index 68409be..f7edd83 100644 --- a/bot/db_utils.py +++ b/bot/db_utils.py @@ -1,7 +1,7 @@ from random import choices import sqlite3 import config -from custom_types import Character +from custom_types import Card DB_PATH = config.DB_PATH CONNECTION: sqlite3.Connection @@ -18,16 +18,38 @@ def connect() -> None: CURSOR = CONNECTION.cursor() -def get_random_character() -> Character | None: - ''' Gets a random character from the database''' - CURSOR.execute('SELECT * FROM characters') - characters = CURSOR.fetchall() +def setup_administrators() -> None: + '''Creates administrator players for each handle in the config file''' + # Get default admins from config + for username in config.ADMINS: + player_id = get_player(username) + if player_id == 0: + # Create player if not exists + print(f'Creating administrator player: {username}') + CURSOR.execute( + 'INSERT INTO players (username, has_rolled, is_administrator) \ + VALUES (?, ?, ?)', + (username, False, True) + ) + else: + # Update is_administrator if exists + print(f'Granting administrator to player: {username}') + CURSOR.execute( + 'UPDATE players SET is_administrator = 1 WHERE id = ?', + (player_id,) + ) - if not characters: + +def get_random_card() -> Card | None: + ''' Gets a random card from the database''' + CURSOR.execute('SELECT * FROM cards') + cards = CURSOR.fetchall() + + if not cards: return None - weights = [config.RARITY_TO_WEIGHT[c['rarity']] for c in characters] - chosen = choices(characters, weights=weights, k=1)[0] + weights = [config.RARITY_TO_WEIGHT[c['rarity']] for c in cards] + chosen = choices(cards, weights=weights, k=1)[0] return { 'id': chosen['id'], @@ -37,73 +59,89 @@ def get_random_character() -> Character | None: 'image_url': chosen['file_id'] } + def get_player(username: str) -> int: '''Retrieve a player ID by username, or return None if not found.''' - CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,)) - user = CURSOR.fetchone() - if user: - return int(user[0]) - -def insert_player(username: str) -> int: - '''Insert a new player with default has_rolled = False and return their user ID.''' CURSOR.execute( - 'INSERT INTO users (username, has_rolled) VALUES (?, ?)', - (username, False) - ) - return CURSOR.lastrowid - -def delete_player(username: str) -> bool: - '''Permanently deletes a user and all their pulls.''' - CURSOR.execute( - 'SELECT id FROM users WHERE username = ?', + 'SELECT id FROM players WHERE username = ?', (username,) ) - user = CURSOR.fetchone() + player = CURSOR.fetchone() + if player: + return int(player[0]) + return 0 - user_id = user[0] + +def insert_player(username: str) -> int: + '''Insert a new player with default has_rolled = False and return their + player ID.''' + CURSOR.execute( + 'INSERT INTO players (username, has_rolled) VALUES (?, ?)', + (username, False) + ) + return CURSOR.lastrowid if CURSOR.lastrowid else 0 + + +def delete_player(username: str) -> bool: + '''Permanently deletes a player and all their pulls.''' + CURSOR.execute( + 'SELECT id FROM players WHERE username = ?', + (username,) + ) + player = CURSOR.fetchone() + + player_id = player[0] # Delete pulls CURSOR.execute( - 'DELETE FROM pulls WHERE user_id = ?', - (user_id,) + 'DELETE FROM pulls WHERE player_id = ?', + (player_id,) ) - # Delete user + # Delete player CURSOR.execute( - 'DELETE FROM users WHERE id = ?', - (user_id,) + 'DELETE FROM players WHERE id = ?', + (player_id,) ) return True - -def insert_character( - name: str, rarity: int, weight: float, file_id: str) -> int: - '''Inserts a character''' +def is_player_administrator(player_id: int) -> bool: CURSOR.execute( - 'INSERT INTO characters (name, rarity, weight, file_id) VALUES \ + 'SELECT is_administrator FROM PLAYERS WHERE id = ? LIMIT 1', + (player_id,) + ) + row = CURSOR.fetchone() + return row[0] if row else False + + +def insert_card( + name: str, rarity: int, weight: float, file_id: str) -> int: + '''Inserts a card''' + CURSOR.execute( + 'INSERT INTO cards (name, rarity, weight, file_id) VALUES \ (?, ?, ?, ?)', (name, rarity, weight, file_id) ) - character_id = CURSOR.lastrowid - return character_id if character_id else 0 + card_id = CURSOR.lastrowid + return card_id if card_id else 0 -def insert_pull(user_id: int, character_id: int) -> None: +def insert_pull(player_id: int, card_id: int) -> None: '''Creates a pull in the database''' CURSOR.execute( - 'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', - (user_id, character_id) + 'INSERT INTO pulls (player_id, card_id) VALUES (?, ?)', + (player_id, card_id) ) -def get_last_rolled_at(user_id: int) -> int: - '''Gets the timestamp when the user last rolled''' +def get_last_rolled_at(player_id: int) -> int: + '''Gets the timestamp when the player last rolled''' CURSOR.execute( - "SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp \ + "SELECT timestamp FROM pulls WHERE player_id = ? ORDER BY timestamp \ DESC", - (user_id,)) + (player_id,)) row = CURSOR.fetchone() return row[0] if row else 0 diff --git a/bot/response.py b/bot/response.py index 3fde3ed..aa7b8b7 100644 --- a/bot/response.py +++ b/bot/response.py @@ -1,19 +1,20 @@ from datetime import datetime, timedelta, timezone from typing import TypedDict, Any, List, Dict -from db_utils import get_player, insert_player, delete_player, insert_pull, get_last_rolled_at, \ - get_random_character -from add_character import add_character +from db_utils import get_player, insert_player, delete_player, insert_pull, \ + get_last_rolled_at, get_random_card, is_player_administrator +from add_card import add_card from config import GACHA_ROLL_INTERVAL from custom_types import BotResponse, ParsedNotification def do_roll(author: str) -> BotResponse: - '''Determines whether the user can roll, then pulls a random character''' + '''Determines whether the user can roll, then pulls a random card''' user_id = get_player(author) if not user_id: return { - 'message':f'{author} 🛑 You haven’t signed up yet! Use the `signup` command to start playing.', - 'attachment_urls': None + 'message': f'{author} 🛑 You haven’t signed up yet! Use the \ +`signup` command to start playing.', + 'attachment_urls': None } # Get date of user's last roll date = get_last_rolled_at(user_id) @@ -45,39 +46,43 @@ def do_roll(author: str) -> BotResponse: 'attachment_urls': None } - character = get_random_character() + card = get_random_card() - if not character: + if not card: return { 'message': f'{author} Uwaaa... something went wrong! No \ -characters found. 😿', +cards found. 😿', 'attachment_urls': None } - insert_pull(user_id, character['id']) - stars = '⭐️' * character['rarity'] + insert_pull(user_id, card['id']) + stars = '⭐️' * card['rarity'] return { 'message': f'{author} 🎲 Congrats! You rolled {stars} \ -**{character['name']}**\nShe\'s all yours now~ 💖✨', - 'attachment_urls': [character['image_url']] +**{card['name']}**\nShe\'s all yours now~ 💖✨', + 'attachment_urls': [card['image_url']] } + def do_signup(author: str) -> BotResponse: '''Registers a new user if they haven’t signed up yet.''' user_id = get_player(author) if user_id: return { - 'message':f'{author} 👀 You’re already signed up! Let the rolling begin~ 🎲', + 'message': f'{author} 👀 You’re already signed up! Let the rolling \ +begin~ 🎲', 'attachment_urls': None } new_user_id = insert_player(author) return { - 'message': f'{author} ✅ Signed up successfully! Your gacha destiny begins now... ✨ Use the roll command to start!', + 'message': f'{author} ✅ Signed up successfully! Your gacha \ +destiny begins now... ✨ Use the roll command to start!', 'attachment_urls': None } + def is_float(val: Any) -> bool: '''Returns true if `val` can be converted to a float''' try: @@ -91,14 +96,14 @@ def do_create( author: str, arguments: List[str], note_obj: Dict[str, Any]) -> BotResponse: - '''Creates a character''' + '''Creates a card''' # Example call from bot logic image_url = note_obj.get('files', [{}])[0].get('url') \ if note_obj.get('files') else None if not image_url: return { - 'message': f'{author} You need an image to create a character, \ + 'message': f'{author} You need an image to create a card, \ dumbass.', 'attachment_urls': None } @@ -123,13 +128,13 @@ must be a decimal value between 0.0 and 1.0', 'attachment_urls': None } - character_id, file_id = add_character( + card_id, file_id = add_card( name=arguments[0], rarity=int(arguments[1]), image_url=image_url ) return { - 'message': f'{author} Added {arguments[0]}, ID {character_id}.', + 'message': f'{author} Added {arguments[0]}, ID {card_id}.', 'attachment_urls': [file_id] } @@ -137,30 +142,43 @@ must be a decimal value between 0.0 and 1.0', def do_help(author: str) -> BotResponse: '''Provides a list of commands that the bot can do.''' return { - 'message':f'{author} Here\'s what I can do:\n \ - - `roll` Pulls a random character.\ - - `create ` Creates a character using a given image.\ - - `signup` Registers your account.\ - - `delete_account` Deletes your account.\ - - `help` Shows this message', - 'attachment_urls': None + 'message': f'{author} Here\'s what I can do:\n\ +- `roll` Pulls a random card.\n\ +- `create ` Creates a card using a given image.\n\ +- `signup` Registers your account.\n\ +- `delete_account` Deletes your account.\n\ +- `help` Shows this message', + 'attachment_urls': None } - + + def delete_account(author: str) -> BotResponse: return { - 'message':f'{author} ⚠️ This will permanently delete your account and all your cards.\n' - 'If you’re sure, reply with `confirm_delete` to proceed.\n\n' + 'message': f'{author} ⚠️ This will permanently delete your account \ +and all your cards.\n' + 'If you’re sure, reply with `confirm_delete_account` to proceed.\n\n' '**There is no undo.** Your gacha luck will be lost to the void... 💀✨', 'attachment_urls': None } + def confirm_delete(author: str) -> BotResponse: - delete_player(author) return { - 'message':f'{author} 🧼 Your account and all your cards have been deleted. RIP your gacha history 🕊️✨', + 'message': f'{author} 🧼 Your account and all your cards have been \ +deleted. RIP your gacha history 🕊️✨', + 'attachment_urls': None + } + + +def do_admin_test(author: str) -> BotResponse: + player_id = get_player(author) + is_admin = is_player_administrator(player_id) + return { + 'message': f'{author} You are {"not " if not is_admin else ""}an \ +admin.', 'attachment_urls': None } @@ -171,25 +189,23 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None: # Temporary response variable res: BotResponse | None = None - # TODO: Check if the user has an account author = notification['author'] - user_id = get_player(author) + player_id = get_player(author) command = notification['command'] - # Check if the user is an administrator - # user_is_administrator = user_is_administrator() # Unrestricted commands match command: + case 'roll': + res = do_roll(author) case 'signup': res = do_signup(author) case 'help': res = do_help(author) - case 'roll': - res = do_roll(author) case _: pass - if not user_id: + # Commands beyond this point require the user to have an account + if not player_id: return res # User commands @@ -200,15 +216,18 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None: notification['arguments'], notification['note_obj'] ) - case 'signup': - res = do_signup(author) case 'delete_account': res = delete_account(author) - case 'confirm_delete': + case 'confirm_delete_account': res = confirm_delete(author) + case 'admin_test': + res = do_admin_test(author) case _: pass - # if not user_is_administrator: - return res + + # Commands beyond this point require the user to be an administrator + if not is_player_administrator(player_id): + return res # Administrator commands go here + return res diff --git a/example_config.ini b/example_config.ini index af7e0f2..25402e4 100644 --- a/example_config.ini +++ b/example_config.ini @@ -2,7 +2,7 @@ [application] ; Comma separated list of fedi handles for any administrator users ; More can be added through the application -DefaultAdmins = ['admin@example.tld'] +DefaultAdmins = ["@localadmin", "remoteadmin@example.tld"] ; SQLite Database location DatabaseLocation = ./gacha_game.db diff --git a/migrations/0003_rename_tables.sql b/migrations/0003_rename_tables.sql new file mode 100644 index 0000000..a3ba3a7 --- /dev/null +++ b/migrations/0003_rename_tables.sql @@ -0,0 +1,4 @@ +ALTER TABLE users RENAME TO players; +ALTER TABLE characters RENAME TO cards; +ALTER TABLE pulls RENAME user_id TO player_id; +ALTER TABLE pulls RENAME character_id TO card_id; diff --git a/migrations/0004_add_administrators.sql b/migrations/0004_add_administrators.sql new file mode 100644 index 0000000..7503e21 --- /dev/null +++ b/migrations/0004_add_administrators.sql @@ -0,0 +1 @@ +ALTER TABLE players ADD COLUMN is_administrator BOOLEAN NOT NULL DEFAULT 0; From 8ae6e25b95bca5643055d72d336c1b66c58de426 Mon Sep 17 00:00:00 2001 From: VD15 Date: Sat, 7 Jun 2025 20:40:27 +0100 Subject: [PATCH 2/8] Add instance whitelist --- bot/config.py | 2 ++ bot/db_utils.py | 29 ++++++++++++++++ bot/notification.py | 6 ++-- bot/response.py | 81 +++++++++++++++++++++++++++++++++------------ example_config.ini | 5 ++- 5 files changed, 98 insertions(+), 25 deletions(-) diff --git a/bot/config.py b/bot/config.py index af806f9..9737608 100644 --- a/bot/config.py +++ b/bot/config.py @@ -49,6 +49,8 @@ INSTANCE = config['credentials']['Instance'].lower() ADMINS = json.loads(config['application']['DefaultAdmins']) # SQLite Database location DB_PATH = config['application']['DatabaseLocation'] +# Whether to enable the instance whitelist +USE_WHITELIST = config['application']['UseWhitelist'] NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize']) diff --git a/bot/db_utils.py b/bot/db_utils.py index f7edd83..802e3f0 100644 --- a/bot/db_utils.py +++ b/bot/db_utils.py @@ -146,6 +146,35 @@ DESC", return row[0] if row else 0 +def add_to_whitelist(instance: str) -> bool: + '''Adds an instance to the whitelist, returns false if instance was already + present''' + try: + CURSOR.execute( + 'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,)) + return True + except sqlite3.IntegrityError: + return False + + +def remove_from_whitelist(instance: str) -> bool: + '''Removes an instance to the whitelist, returns false if instance was not + present''' + CURSOR.execute( + 'DELETE FROM instance_whitelist WHERE tld = ?', (instance,)) + return CURSOR.rowcount > 0 + + +def is_whitelisted(instance: str) -> bool: + '''Checks whether an instance is in the whitelist''' + if instance == 'local': + return True + CURSOR.execute( + 'SELECT * FROM instance_whitelist WHERE tld = ?', (instance,)) + row = CURSOR.fetchone() + return row is not None + + def get_config(key: str) -> str: '''Reads the value for a specified config key from the db''' CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,)) diff --git a/bot/notification.py b/bot/notification.py index 9427dbf..faac1d3 100644 --- a/bot/notification.py +++ b/bot/notification.py @@ -4,9 +4,9 @@ from typing import Dict, Any import misskey from misskey.exceptions import MisskeyAPIException -from config import NOTIFICATION_BATCH_SIZE +from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST from parsing import parse_notification -from db_utils import get_config, set_config +from db_utils import get_config, set_config, is_whitelisted from response import generate_response from custom_types import BotResponse @@ -24,7 +24,7 @@ def process_notification( host = user.get('host') # None if local user instance = host if host else 'local' - if not (instance in WHITELISTED_INSTANCES or instance == 'local'): + if USE_WHITELIST and not is_whitelisted(instance): print(f'⚠️ Blocked notification from untrusted instance: {instance}') return diff --git a/bot/response.py b/bot/response.py index aa7b8b7..16bd05e 100644 --- a/bot/response.py +++ b/bot/response.py @@ -1,7 +1,6 @@ from datetime import datetime, timedelta, timezone from typing import TypedDict, Any, List, Dict -from db_utils import get_player, insert_player, delete_player, insert_pull, \ - get_last_rolled_at, get_random_card, is_player_administrator +import db_utils as db from add_card import add_card from config import GACHA_ROLL_INTERVAL from custom_types import BotResponse, ParsedNotification @@ -9,7 +8,7 @@ from custom_types import BotResponse, ParsedNotification def do_roll(author: str) -> BotResponse: '''Determines whether the user can roll, then pulls a random card''' - user_id = get_player(author) + user_id = db.get_player(author) if not user_id: return { 'message': f'{author} 🛑 You haven’t signed up yet! Use the \ @@ -17,7 +16,7 @@ def do_roll(author: str) -> BotResponse: 'attachment_urls': None } # Get date of user's last roll - date = get_last_rolled_at(user_id) + date = db.get_last_rolled_at(user_id) # No date means it's users first roll if date: @@ -46,7 +45,7 @@ def do_roll(author: str) -> BotResponse: 'attachment_urls': None } - card = get_random_card() + card = db.get_random_card() if not card: return { @@ -55,7 +54,7 @@ cards found. 😿', 'attachment_urls': None } - insert_pull(user_id, card['id']) + db.insert_pull(user_id, card['id']) stars = '⭐️' * card['rarity'] return { 'message': f'{author} 🎲 Congrats! You rolled {stars} \ @@ -66,7 +65,7 @@ cards found. 😿', def do_signup(author: str) -> BotResponse: '''Registers a new user if they haven’t signed up yet.''' - user_id = get_player(author) + user_id = db.get_player(author) if user_id: return { @@ -75,7 +74,7 @@ begin~ 🎲', 'attachment_urls': None } - new_user_id = insert_player(author) + new_user_id = db.insert_player(author) return { 'message': f'{author} ✅ Signed up successfully! Your gacha \ destiny begins now... ✨ Use the roll command to start!', @@ -164,7 +163,7 @@ and all your cards.\n' def confirm_delete(author: str) -> BotResponse: - delete_player(author) + db.delete_player(author) return { 'message': f'{author} 🧼 Your account and all your cards have been \ @@ -173,14 +172,43 @@ deleted. RIP your gacha history 🕊️✨', } -def do_admin_test(author: str) -> BotResponse: - player_id = get_player(author) - is_admin = is_player_administrator(player_id) - return { - 'message': f'{author} You are {"not " if not is_admin else ""}an \ -admin.', - 'attachment_urls': None - } +def do_whitelist(author: str, args: list[str]) -> BotResponse: + if len(args) == 0: + return { + 'message': f'{author} Please specify an instance to whitelist', + 'attachment_urls': None + } + + if db.add_to_whitelist(args[0]): + return { + 'message': f'{author} Whitelisted instance: {args[0]}', + 'attachment_urls': None + } + else: + return { + 'message': f'{author} Instance already whitelisted: {args[0]}', + 'attachment_urls': None + } + + +def do_unwhitelist(author: str, args: list[str]) -> BotResponse: + if len(args) == 0: + return { + 'message': f'{author} Please specify an instance to remove from \ +the whitelist', + 'attachment_urls': None + } + + if db.remove_from_whitelist(args[0]): + return { + 'message': f'{author} Unwhitelisted instance: {args[0]}', + 'attachment_urls': None + } + else: + return { + 'message': f'{author} Instance not whitelisted: {args[0]}', + 'attachment_urls': None + } def generate_response(notification: ParsedNotification) -> BotResponse | None: @@ -190,7 +218,7 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None: # Temporary response variable res: BotResponse | None = None author = notification['author'] - player_id = get_player(author) + player_id = db.get_player(author) command = notification['command'] # Unrestricted commands @@ -220,14 +248,25 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None: res = delete_account(author) case 'confirm_delete_account': res = confirm_delete(author) - case 'admin_test': - res = do_admin_test(author) case _: pass # Commands beyond this point require the user to be an administrator - if not is_player_administrator(player_id): + if not db.is_player_administrator(player_id): return res + # Admin commands + match command: + case 'whitelist': + res = do_whitelist(author, notification['arguments']) + case 'unwhitelist': + res = do_unwhitelist(author, notification['arguments']) + # case 'ban': + # res = do_ban(author, notification['arguments']) + # case 'unban': + # res = do_unban(author, notification['arguments']) + case _: + pass + # Administrator commands go here return res diff --git a/example_config.ini b/example_config.ini index 25402e4..0ea2422 100644 --- a/example_config.ini +++ b/example_config.ini @@ -2,9 +2,12 @@ [application] ; Comma separated list of fedi handles for any administrator users ; More can be added through the application -DefaultAdmins = ["@localadmin", "remoteadmin@example.tld"] +DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"] ; SQLite Database location DatabaseLocation = ./gacha_game.db +; Whether to lmit access to the bot via an instance whitelist +; The whitelist can be adjusted via the application +UseWhitelist = False [gacha] ; Number of seconds players have to wait between rolls From 97b30c79f44ef28dc72f945a1bfbf3a555c673d6 Mon Sep 17 00:00:00 2001 From: VD15 Date: Sat, 7 Jun 2025 23:18:39 +0100 Subject: [PATCH 3/8] Add admin commands --- bot/db_utils.py | 42 ++++++++++++++++++++--- bot/notification.py | 7 +++- bot/parsing.py | 18 +++++----- bot/response.py | 56 ++++++++++++++++++++++++++++--- migrations/0005_add_whitelist.sql | 7 ++++ 5 files changed, 111 insertions(+), 19 deletions(-) create mode 100644 migrations/0005_add_whitelist.sql diff --git a/bot/db_utils.py b/bot/db_utils.py index 802e3f0..94e915e 100644 --- a/bot/db_utils.py +++ b/bot/db_utils.py @@ -90,6 +90,9 @@ def delete_player(username: str) -> bool: ) player = CURSOR.fetchone() + if not player: + return False + player_id = player[0] # Delete pulls @@ -107,10 +110,40 @@ def delete_player(username: str) -> bool: return True -def is_player_administrator(player_id: int) -> bool: +def ban_player(username: str) -> bool: + '''Adds a player to the ban list.''' + try: + CURSOR.execute( + 'INSERT INTO banned_players (handle) VALUES (?)', + (username,) + ) + return True + except sqlite3.IntegrityError: + return False + + +def unban_player(username: str) -> bool: + '''Removes a player from the ban list.''' CURSOR.execute( - 'SELECT is_administrator FROM PLAYERS WHERE id = ? LIMIT 1', - (player_id,) + 'DELETE FROM banned_players WHERE handle = ?', + (username,) + ) + return CURSOR.rowcount > 0 + + +def is_player_banned(username: str) -> bool: + CURSOR.execute( + 'SELECT * FROM banned_players WHERE handle = ?', + (username,) + ) + row = CURSOR.fetchone() + return row is not None + + +def is_player_administrator(username: str) -> bool: + CURSOR.execute( + 'SELECT is_administrator FROM players WHERE username = ? LIMIT 1', + (username,) ) row = CURSOR.fetchone() return row[0] if row else False @@ -151,7 +184,8 @@ def add_to_whitelist(instance: str) -> bool: present''' try: CURSOR.execute( - 'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,)) + 'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,) + ) return True except sqlite3.IntegrityError: return False diff --git a/bot/notification.py b/bot/notification.py index faac1d3..deb8ec6 100644 --- a/bot/notification.py +++ b/bot/notification.py @@ -6,7 +6,7 @@ from misskey.exceptions import MisskeyAPIException from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST from parsing import parse_notification -from db_utils import get_config, set_config, is_whitelisted +from db_utils import get_config, set_config, is_whitelisted, is_player_banned from response import generate_response from custom_types import BotResponse @@ -44,6 +44,11 @@ def process_notification( if not parsed_notification: return + author = parsed_notification['author'] + if is_player_banned(author): + print(f'⚠️ Blocked notification from banned player: {author}') + return + # Get the note Id to reply to note_id = notification.get('note', {}).get('id') diff --git a/bot/parsing.py b/bot/parsing.py index eece077..e1e8583 100644 --- a/bot/parsing.py +++ b/bot/parsing.py @@ -24,6 +24,8 @@ def parse_notification( note_id = note_obj.get("id") note = note_text.strip().lower() if note_text else "" + # Split words into tokens + parts = note.split() # Check for both short and fully-qualified name mentions username_variants = [ @@ -31,18 +33,16 @@ def parse_notification( f'@{config.USER.split("@")[1]}' ] - # Make sure the notification text explicitly mentions the bot - if not any(variant in note for variant in username_variants): + # Notifs must consist of the initial mention and at least one other token + if len(parts) <= 1: return None - # Find command and arguments after the mention - # Removes all mentions - # regex = mentions that start with @ and may contain @domain - cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip() - parts = cleaned_text.split() + # Make sure the first token is a mention to the bot + if not parts[0] in username_variants: + return None - command = parts[0].lower() if parts else None - arguments = parts[1:] if len(parts) > 1 else [] + command = parts[1].lower() + arguments = parts[2:] if len(parts) > 2 else [] return { 'author': full_user, diff --git a/bot/response.py b/bot/response.py index 16bd05e..f21d7b0 100644 --- a/bot/response.py +++ b/bot/response.py @@ -211,6 +211,52 @@ the whitelist', } +def do_ban(author: str, args: list[str]) -> BotResponse: + if len(args) == 0: + return { + 'message': f'{author} Please specify a user to ban', + 'attachment_urls': None + } + + if db.is_player_administrator(args[0]): + return { + 'message': f'{author} Cannot ban other administrators.', + 'attachment_urls': None + } + + if db.ban_player(args[0]): + # Delete banned player's account + db.delete_player(args[0]) + return { + 'message': f'{author} 🔨 **BONK!** Get banned, {args[0]}!', + 'attachment_urls': None + } + else: + return { + 'message': f'{author} Player is already banned: {args[0]}', + 'attachment_urls': None + } + + +def do_unban(author: str, args: list[str]) -> BotResponse: + if len(args) == 0: + return { + 'message': f'{author} Please specify a user to unban', + 'attachment_urls': None + } + + if db.unban_player(args[0]): + return { + 'message': f'{author} Player unbanned: {args[0]}!', + 'attachment_urls': None + } + else: + return { + 'message': f'{author} Player was not banned: {args[0]}', + 'attachment_urls': None + } + + def generate_response(notification: ParsedNotification) -> BotResponse | None: '''Given a command with arguments, processes the game state and returns a response''' @@ -252,7 +298,7 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None: pass # Commands beyond this point require the user to be an administrator - if not db.is_player_administrator(player_id): + if not db.is_player_administrator(author): return res # Admin commands @@ -261,10 +307,10 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None: res = do_whitelist(author, notification['arguments']) case 'unwhitelist': res = do_unwhitelist(author, notification['arguments']) - # case 'ban': - # res = do_ban(author, notification['arguments']) - # case 'unban': - # res = do_unban(author, notification['arguments']) + case 'ban': + res = do_ban(author, notification['arguments']) + case 'unban': + res = do_unban(author, notification['arguments']) case _: pass diff --git a/migrations/0005_add_whitelist.sql b/migrations/0005_add_whitelist.sql new file mode 100644 index 0000000..d24f2e3 --- /dev/null +++ b/migrations/0005_add_whitelist.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS instance_whitelist ( + tld TEXT UNIQUE PRIMARY KEY +); + +CREATE TABLE IF NOT EXISTS banned_players ( + handle TEXT UNIQUE PRIMARY KEY +); From 4b1b8a53c7d691bfd2eda1d76d038298173b6ecf Mon Sep 17 00:00:00 2001 From: VD15 Date: Sat, 7 Jun 2025 23:25:48 +0100 Subject: [PATCH 4/8] Fix nonstandard apostrophe --- bot/response.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bot/response.py b/bot/response.py index f21d7b0..b49de1e 100644 --- a/bot/response.py +++ b/bot/response.py @@ -155,7 +155,7 @@ def delete_account(author: str) -> BotResponse: return { 'message': f'{author} ⚠️ This will permanently delete your account \ and all your cards.\n' - 'If you’re sure, reply with `confirm_delete_account` to proceed.\n\n' + 'If you\'re sure, reply with `confirm_delete_account` to proceed.\n\n' '**There is no undo.** Your gacha luck will be lost to the void... 💀✨', 'attachment_urls': None From 59915be66170ffcf5596a9d30d384c7d7e899646 Mon Sep 17 00:00:00 2001 From: VD15 Date: Sat, 7 Jun 2025 23:59:07 +0100 Subject: [PATCH 5/8] Enable WAL for DB connections --- .gitignore | 4 ++-- bot/db_utils.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index e5543ec..b0a1050 100644 --- a/.gitignore +++ b/.gitignore @@ -183,7 +183,7 @@ cython_debug/ # Custom stuff gacha_game*.db -gacha_game*.db.* +gacha_game*.db* config*.ini -.idea \ No newline at end of file +.idea diff --git a/bot/db_utils.py b/bot/db_utils.py index 94e915e..72b431f 100644 --- a/bot/db_utils.py +++ b/bot/db_utils.py @@ -16,6 +16,7 @@ def connect() -> None: CONNECTION = sqlite3.connect(DB_PATH, autocommit=True) CONNECTION.row_factory = sqlite3.Row CURSOR = CONNECTION.cursor() + CURSOR.execute('pragma journal_mode=wal') def setup_administrators() -> None: From 1368c907a222d02387c23d956b50005501315a65 Mon Sep 17 00:00:00 2001 From: VD15 Date: Sun, 8 Jun 2025 00:09:02 +0100 Subject: [PATCH 6/8] Revert "Enable WAL for DB connections" This reverts commit 59915be66170ffcf5596a9d30d384c7d7e899646. --- .gitignore | 4 ++-- bot/db_utils.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index b0a1050..e5543ec 100644 --- a/.gitignore +++ b/.gitignore @@ -183,7 +183,7 @@ cython_debug/ # Custom stuff gacha_game*.db -gacha_game*.db* +gacha_game*.db.* config*.ini -.idea +.idea \ No newline at end of file diff --git a/bot/db_utils.py b/bot/db_utils.py index 72b431f..94e915e 100644 --- a/bot/db_utils.py +++ b/bot/db_utils.py @@ -16,7 +16,6 @@ def connect() -> None: CONNECTION = sqlite3.connect(DB_PATH, autocommit=True) CONNECTION.row_factory = sqlite3.Row CURSOR = CONNECTION.cursor() - CURSOR.execute('pragma journal_mode=wal') def setup_administrators() -> None: From bd287b096ab6de4e3947e611b9517cd6767abdaa Mon Sep 17 00:00:00 2001 From: Moon Date: Fri, 13 Jun 2025 18:47:54 +0900 Subject: [PATCH 7/8] rm reference to weight column. --- bot/add_card.py | 1 - bot/db_utils.py | 7 +++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/bot/add_card.py b/bot/add_card.py index fcaab43..16834a9 100644 --- a/bot/add_card.py +++ b/bot/add_card.py @@ -58,7 +58,6 @@ def add_card( card_id = insert_card( stripped_name, rarity, - RARITY_TO_WEIGHT[rarity], file_id ) return card_id, file_id diff --git a/bot/db_utils.py b/bot/db_utils.py index 94e915e..bdbd0d8 100644 --- a/bot/db_utils.py +++ b/bot/db_utils.py @@ -150,12 +150,11 @@ def is_player_administrator(username: str) -> bool: def insert_card( - name: str, rarity: int, weight: float, file_id: str) -> int: + name: str, rarity: int, file_id: str) -> int: '''Inserts a card''' CURSOR.execute( - 'INSERT INTO cards (name, rarity, weight, file_id) VALUES \ -(?, ?, ?, ?)', - (name, rarity, weight, file_id) + 'INSERT INTO cards (name, rarity, file_id) VALUES (?, ?, ?)', + (name, rarity, file_id) ) card_id = CURSOR.lastrowid return card_id if card_id else 0 From 77d4fa13bb44ca9b5322eaf8b12bb71cfb9a6cd8 Mon Sep 17 00:00:00 2001 From: Moon Date: Fri, 13 Jun 2025 19:31:23 +0900 Subject: [PATCH 8/8] rm validation of removed weight --- bot/response.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/bot/response.py b/bot/response.py index b49de1e..e661bbf 100644 --- a/bot/response.py +++ b/bot/response.py @@ -120,12 +120,6 @@ in order: name, rarity', be a number between 1 and 5', 'attachment_urls': None } - if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0): - return { - 'message': f'{author} Invalid drop weight: \'{arguments[2]}\' \ -must be a decimal value between 0.0 and 1.0', - 'attachment_urls': None - } card_id, file_id = add_card( name=arguments[0],