diff --git a/bot/config.py b/bot/config.py index af806f9..9737608 100644 --- a/bot/config.py +++ b/bot/config.py @@ -49,6 +49,8 @@ INSTANCE = config['credentials']['Instance'].lower() ADMINS = json.loads(config['application']['DefaultAdmins']) # SQLite Database location DB_PATH = config['application']['DatabaseLocation'] +# Whether to enable the instance whitelist +USE_WHITELIST = config['application']['UseWhitelist'] NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize']) diff --git a/bot/db_utils.py b/bot/db_utils.py index f7edd83..802e3f0 100644 --- a/bot/db_utils.py +++ b/bot/db_utils.py @@ -146,6 +146,35 @@ DESC", return row[0] if row else 0 +def add_to_whitelist(instance: str) -> bool: + '''Adds an instance to the whitelist, returns false if instance was already + present''' + try: + CURSOR.execute( + 'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,)) + return True + except sqlite3.IntegrityError: + return False + + +def remove_from_whitelist(instance: str) -> bool: + '''Removes an instance to the whitelist, returns false if instance was not + present''' + CURSOR.execute( + 'DELETE FROM instance_whitelist WHERE tld = ?', (instance,)) + return CURSOR.rowcount > 0 + + +def is_whitelisted(instance: str) -> bool: + '''Checks whether an instance is in the whitelist''' + if instance == 'local': + return True + CURSOR.execute( + 'SELECT * FROM instance_whitelist WHERE tld = ?', (instance,)) + row = CURSOR.fetchone() + return row is not None + + def get_config(key: str) -> str: '''Reads the value for a specified config key from the db''' CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,)) diff --git a/bot/notification.py b/bot/notification.py index 9427dbf..faac1d3 100644 --- a/bot/notification.py +++ b/bot/notification.py @@ -4,9 +4,9 @@ from typing import Dict, Any import misskey from misskey.exceptions import MisskeyAPIException -from config import NOTIFICATION_BATCH_SIZE +from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST from parsing import parse_notification -from db_utils import get_config, set_config +from db_utils import get_config, set_config, is_whitelisted from response import generate_response from custom_types import BotResponse @@ -24,7 +24,7 @@ def process_notification( host = user.get('host') # None if local user instance = host if host else 'local' - if not (instance in WHITELISTED_INSTANCES or instance == 'local'): + if USE_WHITELIST and not is_whitelisted(instance): print(f'⚠️ Blocked notification from untrusted instance: {instance}') return diff --git a/bot/response.py b/bot/response.py index aa7b8b7..16bd05e 100644 --- a/bot/response.py +++ b/bot/response.py @@ -1,7 +1,6 @@ from datetime import datetime, timedelta, timezone from typing import TypedDict, Any, List, Dict -from db_utils import get_player, insert_player, delete_player, insert_pull, \ - get_last_rolled_at, get_random_card, is_player_administrator +import db_utils as db from add_card import add_card from config import GACHA_ROLL_INTERVAL from custom_types import BotResponse, ParsedNotification @@ -9,7 +8,7 @@ from custom_types import BotResponse, ParsedNotification def do_roll(author: str) -> BotResponse: '''Determines whether the user can roll, then pulls a random card''' - user_id = get_player(author) + user_id = db.get_player(author) if not user_id: return { 'message': f'{author} 🛑 You haven’t signed up yet! Use the \ @@ -17,7 +16,7 @@ def do_roll(author: str) -> BotResponse: 'attachment_urls': None } # Get date of user's last roll - date = get_last_rolled_at(user_id) + date = db.get_last_rolled_at(user_id) # No date means it's users first roll if date: @@ -46,7 +45,7 @@ def do_roll(author: str) -> BotResponse: 'attachment_urls': None } - card = get_random_card() + card = db.get_random_card() if not card: return { @@ -55,7 +54,7 @@ cards found. 😿', 'attachment_urls': None } - insert_pull(user_id, card['id']) + db.insert_pull(user_id, card['id']) stars = '⭐️' * card['rarity'] return { 'message': f'{author} 🎲 Congrats! You rolled {stars} \ @@ -66,7 +65,7 @@ cards found. 😿', def do_signup(author: str) -> BotResponse: '''Registers a new user if they haven’t signed up yet.''' - user_id = get_player(author) + user_id = db.get_player(author) if user_id: return { @@ -75,7 +74,7 @@ begin~ 🎲', 'attachment_urls': None } - new_user_id = insert_player(author) + new_user_id = db.insert_player(author) return { 'message': f'{author} ✅ Signed up successfully! Your gacha \ destiny begins now... ✨ Use the roll command to start!', @@ -164,7 +163,7 @@ and all your cards.\n' def confirm_delete(author: str) -> BotResponse: - delete_player(author) + db.delete_player(author) return { 'message': f'{author} 🧼 Your account and all your cards have been \ @@ -173,14 +172,43 @@ deleted. RIP your gacha history 🕊️✨', } -def do_admin_test(author: str) -> BotResponse: - player_id = get_player(author) - is_admin = is_player_administrator(player_id) - return { - 'message': f'{author} You are {"not " if not is_admin else ""}an \ -admin.', - 'attachment_urls': None - } +def do_whitelist(author: str, args: list[str]) -> BotResponse: + if len(args) == 0: + return { + 'message': f'{author} Please specify an instance to whitelist', + 'attachment_urls': None + } + + if db.add_to_whitelist(args[0]): + return { + 'message': f'{author} Whitelisted instance: {args[0]}', + 'attachment_urls': None + } + else: + return { + 'message': f'{author} Instance already whitelisted: {args[0]}', + 'attachment_urls': None + } + + +def do_unwhitelist(author: str, args: list[str]) -> BotResponse: + if len(args) == 0: + return { + 'message': f'{author} Please specify an instance to remove from \ +the whitelist', + 'attachment_urls': None + } + + if db.remove_from_whitelist(args[0]): + return { + 'message': f'{author} Unwhitelisted instance: {args[0]}', + 'attachment_urls': None + } + else: + return { + 'message': f'{author} Instance not whitelisted: {args[0]}', + 'attachment_urls': None + } def generate_response(notification: ParsedNotification) -> BotResponse | None: @@ -190,7 +218,7 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None: # Temporary response variable res: BotResponse | None = None author = notification['author'] - player_id = get_player(author) + player_id = db.get_player(author) command = notification['command'] # Unrestricted commands @@ -220,14 +248,25 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None: res = delete_account(author) case 'confirm_delete_account': res = confirm_delete(author) - case 'admin_test': - res = do_admin_test(author) case _: pass # Commands beyond this point require the user to be an administrator - if not is_player_administrator(player_id): + if not db.is_player_administrator(player_id): return res + # Admin commands + match command: + case 'whitelist': + res = do_whitelist(author, notification['arguments']) + case 'unwhitelist': + res = do_unwhitelist(author, notification['arguments']) + # case 'ban': + # res = do_ban(author, notification['arguments']) + # case 'unban': + # res = do_unban(author, notification['arguments']) + case _: + pass + # Administrator commands go here return res diff --git a/example_config.ini b/example_config.ini index 25402e4..0ea2422 100644 --- a/example_config.ini +++ b/example_config.ini @@ -2,9 +2,12 @@ [application] ; Comma separated list of fedi handles for any administrator users ; More can be added through the application -DefaultAdmins = ["@localadmin", "remoteadmin@example.tld"] +DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"] ; SQLite Database location DatabaseLocation = ./gacha_game.db +; Whether to lmit access to the bot via an instance whitelist +; The whitelist can be adjusted via the application +UseWhitelist = False [gacha] ; Number of seconds players have to wait between rolls