Version 1.1 #55
					 12 changed files with 352 additions and 120 deletions
				
			
		|  | @ -1,27 +1,27 @@ | ||||||
| import requests | import requests | ||||||
| from misskey.exceptions import MisskeyAPIException | from misskey.exceptions import MisskeyAPIException | ||||||
| from client import client_connection | from client import client_connection | ||||||
| from db_utils import insert_character | from db_utils import insert_card | ||||||
| from custom_types import Character | from custom_types import Card | ||||||
| from config import RARITY_TO_WEIGHT | from config import RARITY_TO_WEIGHT | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def add_character( | def add_card( | ||||||
|         name: str, |         name: str, | ||||||
|         rarity: int, |         rarity: int, | ||||||
|         image_url: str) -> tuple[int, str]: |         image_url: str) -> tuple[int, str]: | ||||||
|     ''' |     ''' | ||||||
|     Adds a character to the database, uploading the image from a public URL to |     Adds a card to the database, uploading the image from a public URL to | ||||||
|     the bot's Misskey Drive. |     the bot's Misskey Drive. | ||||||
| 
 | 
 | ||||||
|     Args: |     Args: | ||||||
|         name (str): Character name. |         name (str): Card name. | ||||||
|         rarity (int): Character rarity (e.g., 1-5). |         rarity (int): Card rarity (e.g., 1-5). | ||||||
|         image_url (str): Public URL of the image from the post (e.g., from |         image_url (str): Public URL of the image from the post (e.g., from | ||||||
|             note['files'][i]['url']). |             note['files'][i]['url']). | ||||||
| 
 | 
 | ||||||
|     Returns: |     Returns: | ||||||
|         tuple[int, str]: Character ID and bot's Drive file_id. |         tuple[int, str]: Card ID and bot's Drive file_id. | ||||||
| 
 | 
 | ||||||
|     Raises: |     Raises: | ||||||
|         ValueError: If inputs are invalid. |         ValueError: If inputs are invalid. | ||||||
|  | @ -32,7 +32,7 @@ def add_character( | ||||||
| 
 | 
 | ||||||
|     # Validate inputs |     # Validate inputs | ||||||
|     if not stripped_name: |     if not stripped_name: | ||||||
|         raise ValueError('Character name cannot be empty.') |         raise ValueError('Card name cannot be empty.') | ||||||
|     if rarity < 1: |     if rarity < 1: | ||||||
|         raise ValueError('Rarity must be a positive integer.') |         raise ValueError('Rarity must be a positive integer.') | ||||||
|     if rarity not in RARITY_TO_WEIGHT.keys(): |     if rarity not in RARITY_TO_WEIGHT.keys(): | ||||||
|  | @ -55,10 +55,10 @@ def add_character( | ||||||
|                 from e |                 from e | ||||||
| 
 | 
 | ||||||
|     # Insert into database |     # Insert into database | ||||||
|     character_id = insert_character( |     card_id = insert_card( | ||||||
|             stripped_name, |             stripped_name, | ||||||
|             rarity, |             rarity, | ||||||
|             RARITY_TO_WEIGHT[rarity], |             RARITY_TO_WEIGHT[rarity], | ||||||
|             file_id |             file_id | ||||||
|     ) |     ) | ||||||
|     return character_id, file_id |     return card_id, file_id | ||||||
|  | @ -12,6 +12,9 @@ if __name__ == '__main__': | ||||||
|     # Connect to DB |     # Connect to DB | ||||||
|     db.connect() |     db.connect() | ||||||
| 
 | 
 | ||||||
|  |     # Setup default administrators | ||||||
|  |     db.setup_administrators() | ||||||
|  | 
 | ||||||
|     print('Listening for notifications...') |     print('Listening for notifications...') | ||||||
|     while True: |     while True: | ||||||
|         if not process_notifications(client): |         if not process_notifications(client): | ||||||
|  |  | ||||||
|  | @ -1,5 +1,6 @@ | ||||||
| '''Essentials for the bot to function''' | '''Essentials for the bot to function''' | ||||||
| import configparser | import configparser | ||||||
|  | import json | ||||||
| from os import environ, path | from os import environ, path | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -21,7 +22,9 @@ def get_config_file() -> str: | ||||||
|         raise ConfigError(f'Could not find {config_path}') |         raise ConfigError(f'Could not find {config_path}') | ||||||
|     return config_path |     return config_path | ||||||
| 
 | 
 | ||||||
| def get_rarity_to_weight(config_section): | 
 | ||||||
|  | def get_rarity_to_weight( | ||||||
|  |         config_section: configparser.SectionProxy) -> dict[int, float]: | ||||||
|     """Parses Rarity_X keys from config and returns a {rarity: weight} dict.""" |     """Parses Rarity_X keys from config and returns a {rarity: weight} dict.""" | ||||||
|     rarity_weights = {} |     rarity_weights = {} | ||||||
|     for key, value in config_section.items(): |     for key, value in config_section.items(): | ||||||
|  | @ -41,12 +44,13 @@ KEY = config['credentials']['Token'] | ||||||
| # Bot's Misskey instance URL | # Bot's Misskey instance URL | ||||||
| INSTANCE = config['credentials']['Instance'].lower() | INSTANCE = config['credentials']['Instance'].lower() | ||||||
| 
 | 
 | ||||||
| # TODO: move this to db |  | ||||||
| # Fedi handles in the traditional 'user@domain.tld' style, allows these users | # Fedi handles in the traditional 'user@domain.tld' style, allows these users | ||||||
| # to use extra admin exclusive commands with the bot | # to use extra admin exclusive commands with the bot | ||||||
| ADMINS = config['application']['DefaultAdmins'] | ADMINS = json.loads(config['application']['DefaultAdmins']) | ||||||
| # SQLite Database location | # SQLite Database location | ||||||
| DB_PATH = config['application']['DatabaseLocation'] | DB_PATH = config['application']['DatabaseLocation'] | ||||||
|  | # Whether to enable the instance whitelist | ||||||
|  | USE_WHITELIST = config['application']['UseWhitelist'] | ||||||
| 
 | 
 | ||||||
| NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) | NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) | ||||||
| NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize']) | NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize']) | ||||||
|  |  | ||||||
|  | @ -5,7 +5,7 @@ BotResponse = TypedDict('BotResponse', { | ||||||
|     'attachment_urls': List[str] | None |     'attachment_urls': List[str] | None | ||||||
| }) | }) | ||||||
| 
 | 
 | ||||||
| Character = TypedDict('Character', { | Card = TypedDict('Card', { | ||||||
|     'id': int, |     'id': int, | ||||||
|     'name': str, |     'name': str, | ||||||
|     'rarity': int, |     'rarity': int, | ||||||
|  |  | ||||||
							
								
								
									
										189
									
								
								bot/db_utils.py
									
										
									
									
									
								
							
							
						
						
									
										189
									
								
								bot/db_utils.py
									
										
									
									
									
								
							|  | @ -1,7 +1,7 @@ | ||||||
| from random import choices | from random import choices | ||||||
| import sqlite3 | import sqlite3 | ||||||
| import config | import config | ||||||
| from custom_types import Character | from custom_types import Card | ||||||
| 
 | 
 | ||||||
| DB_PATH = config.DB_PATH | DB_PATH = config.DB_PATH | ||||||
| CONNECTION: sqlite3.Connection | CONNECTION: sqlite3.Connection | ||||||
|  | @ -18,16 +18,38 @@ def connect() -> None: | ||||||
|     CURSOR = CONNECTION.cursor() |     CURSOR = CONNECTION.cursor() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_random_character() -> Character | None: | def setup_administrators() -> None: | ||||||
|     ''' Gets a random character from the database''' |     '''Creates administrator players for each handle in the config file''' | ||||||
|     CURSOR.execute('SELECT * FROM characters') |     # Get default admins from config | ||||||
|     characters = CURSOR.fetchall() |     for username in config.ADMINS: | ||||||
|  |         player_id = get_player(username) | ||||||
|  |         if player_id == 0: | ||||||
|  |             # Create player if not exists | ||||||
|  |             print(f'Creating administrator player: {username}') | ||||||
|  |             CURSOR.execute( | ||||||
|  |                 'INSERT INTO players (username, has_rolled, is_administrator) \ | ||||||
|  |                     VALUES (?, ?, ?)', | ||||||
|  |                 (username, False, True) | ||||||
|  |             ) | ||||||
|  |         else: | ||||||
|  |             # Update is_administrator if exists | ||||||
|  |             print(f'Granting administrator to player: {username}') | ||||||
|  |             CURSOR.execute( | ||||||
|  |                 'UPDATE players SET is_administrator = 1 WHERE id = ?', | ||||||
|  |                 (player_id,) | ||||||
|  |             ) | ||||||
| 
 | 
 | ||||||
|     if not characters: | 
 | ||||||
|  | def get_random_card() -> Card | None: | ||||||
|  |     ''' Gets a random card from the database''' | ||||||
|  |     CURSOR.execute('SELECT * FROM cards') | ||||||
|  |     cards = CURSOR.fetchall() | ||||||
|  | 
 | ||||||
|  |     if not cards: | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
|     weights = [config.RARITY_TO_WEIGHT[c['rarity']] for c in characters] |     weights = [config.RARITY_TO_WEIGHT[c['rarity']] for c in cards] | ||||||
|     chosen = choices(characters, weights=weights, k=1)[0] |     chosen = choices(cards, weights=weights, k=1)[0] | ||||||
| 
 | 
 | ||||||
|     return { |     return { | ||||||
|         'id': chosen['id'], |         'id': chosen['id'], | ||||||
|  | @ -37,77 +59,156 @@ def get_random_character() -> Character | None: | ||||||
|         'image_url': chosen['file_id'] |         'image_url': chosen['file_id'] | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| def get_player(username: str) -> int: | def get_player(username: str) -> int: | ||||||
|     '''Retrieve a player ID by username, or return None if not found.''' |     '''Retrieve a player ID by username, or return None if not found.''' | ||||||
|     CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,)) |  | ||||||
|     user = CURSOR.fetchone() |  | ||||||
|     if user: |  | ||||||
|         return int(user[0]) |  | ||||||
| 
 |  | ||||||
| def insert_player(username: str) -> int: |  | ||||||
|     '''Insert a new player with default has_rolled = False and return their user ID.''' |  | ||||||
|     CURSOR.execute( |     CURSOR.execute( | ||||||
|         'INSERT INTO users (username, has_rolled) VALUES (?, ?)', |         'SELECT id FROM players WHERE username = ?', | ||||||
|         (username, False) |  | ||||||
|     ) |  | ||||||
|     return CURSOR.lastrowid |  | ||||||
| 
 |  | ||||||
| def delete_player(username: str) -> bool: |  | ||||||
|     '''Permanently deletes a user and all their pulls.''' |  | ||||||
|     CURSOR.execute( |  | ||||||
|         'SELECT id FROM users WHERE username = ?', |  | ||||||
|         (username,) |         (username,) | ||||||
|     ) |     ) | ||||||
|     user = CURSOR.fetchone() |     player = CURSOR.fetchone() | ||||||
|  |     if player: | ||||||
|  |         return int(player[0]) | ||||||
|  |     return 0 | ||||||
| 
 | 
 | ||||||
|     user_id = user[0] | 
 | ||||||
|  | def insert_player(username: str) -> int: | ||||||
|  |     '''Insert a new player with default has_rolled = False and return their | ||||||
|  |     player ID.''' | ||||||
|  |     CURSOR.execute( | ||||||
|  |         'INSERT INTO players (username, has_rolled) VALUES (?, ?)', | ||||||
|  |         (username, False) | ||||||
|  |     ) | ||||||
|  |     return CURSOR.lastrowid if CURSOR.lastrowid else 0 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def delete_player(username: str) -> bool: | ||||||
|  |     '''Permanently deletes a player and all their pulls.''' | ||||||
|  |     CURSOR.execute( | ||||||
|  |         'SELECT id FROM players WHERE username = ?', | ||||||
|  |         (username,) | ||||||
|  |     ) | ||||||
|  |     player = CURSOR.fetchone() | ||||||
|  | 
 | ||||||
|  |     if not player: | ||||||
|  |         return False | ||||||
|  | 
 | ||||||
|  |     player_id = player[0] | ||||||
| 
 | 
 | ||||||
|     # Delete pulls |     # Delete pulls | ||||||
|     CURSOR.execute( |     CURSOR.execute( | ||||||
|         'DELETE FROM pulls WHERE user_id = ?', |         'DELETE FROM pulls WHERE player_id = ?', | ||||||
|         (user_id,) |         (player_id,) | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     # Delete user |     # Delete player | ||||||
|     CURSOR.execute( |     CURSOR.execute( | ||||||
|         'DELETE FROM users WHERE id = ?', |         'DELETE FROM players WHERE id = ?', | ||||||
|         (user_id,) |         (player_id,) | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     return True |     return True | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | def ban_player(username: str) -> bool: | ||||||
|  |     '''Adds a player to the ban list.''' | ||||||
|  |     try: | ||||||
|  |         CURSOR.execute( | ||||||
|  |             'INSERT INTO banned_players (handle) VALUES (?)', | ||||||
|  |             (username,) | ||||||
|  |         ) | ||||||
|  |         return True | ||||||
|  |     except sqlite3.IntegrityError: | ||||||
|  |         return False | ||||||
| 
 | 
 | ||||||
| def insert_character( | 
 | ||||||
|         name: str, rarity: int, weight: float, file_id: str) -> int: | def unban_player(username: str) -> bool: | ||||||
|     '''Inserts a character''' |     '''Removes a player from the ban list.''' | ||||||
|     CURSOR.execute( |     CURSOR.execute( | ||||||
|         'INSERT INTO characters (name, rarity, weight, file_id) VALUES \ |         'DELETE FROM banned_players WHERE handle = ?', | ||||||
|  |         (username,) | ||||||
|  |     ) | ||||||
|  |     return CURSOR.rowcount > 0 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def is_player_banned(username: str) -> bool: | ||||||
|  |     CURSOR.execute( | ||||||
|  |         'SELECT * FROM banned_players WHERE handle = ?', | ||||||
|  |         (username,) | ||||||
|  |     ) | ||||||
|  |     row = CURSOR.fetchone() | ||||||
|  |     return row is not None | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def is_player_administrator(username: str) -> bool: | ||||||
|  |     CURSOR.execute( | ||||||
|  |         'SELECT is_administrator FROM players WHERE username = ? LIMIT 1', | ||||||
|  |         (username,) | ||||||
|  |     ) | ||||||
|  |     row = CURSOR.fetchone() | ||||||
|  |     return row[0] if row else False | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def insert_card( | ||||||
|  |         name: str, rarity: int, weight: float, file_id: str) -> int: | ||||||
|  |     '''Inserts a card''' | ||||||
|  |     CURSOR.execute( | ||||||
|  |         'INSERT INTO cards (name, rarity, weight, file_id) VALUES \ | ||||||
| (?, ?, ?, ?)', | (?, ?, ?, ?)', | ||||||
|         (name, rarity, weight, file_id) |         (name, rarity, weight, file_id) | ||||||
|     ) |     ) | ||||||
|     character_id = CURSOR.lastrowid |     card_id = CURSOR.lastrowid | ||||||
|     return character_id if character_id else 0 |     return card_id if card_id else 0 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def insert_pull(user_id: int, character_id: int) -> None: | def insert_pull(player_id: int, card_id: int) -> None: | ||||||
|     '''Creates a pull in the database''' |     '''Creates a pull in the database''' | ||||||
|     CURSOR.execute( |     CURSOR.execute( | ||||||
|         'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', |         'INSERT INTO pulls (player_id, card_id) VALUES (?, ?)', | ||||||
|         (user_id, character_id) |         (player_id, card_id) | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_last_rolled_at(user_id: int) -> int: | def get_last_rolled_at(player_id: int) -> int: | ||||||
|     '''Gets the timestamp when the user last rolled''' |     '''Gets the timestamp when the player last rolled''' | ||||||
|     CURSOR.execute( |     CURSOR.execute( | ||||||
|         "SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp \ |         "SELECT timestamp FROM pulls WHERE player_id = ? ORDER BY timestamp \ | ||||||
| DESC", | DESC", | ||||||
|         (user_id,)) |         (player_id,)) | ||||||
|     row = CURSOR.fetchone() |     row = CURSOR.fetchone() | ||||||
|     return row[0] if row else 0 |     return row[0] if row else 0 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | def add_to_whitelist(instance: str) -> bool: | ||||||
|  |     '''Adds an instance to the whitelist, returns false if instance was already | ||||||
|  |     present''' | ||||||
|  |     try: | ||||||
|  |         CURSOR.execute( | ||||||
|  |             'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,) | ||||||
|  |         ) | ||||||
|  |         return True | ||||||
|  |     except sqlite3.IntegrityError: | ||||||
|  |         return False | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def remove_from_whitelist(instance: str) -> bool: | ||||||
|  |     '''Removes an instance to the whitelist, returns false if instance was not | ||||||
|  |     present''' | ||||||
|  |     CURSOR.execute( | ||||||
|  |         'DELETE FROM instance_whitelist WHERE tld = ?', (instance,)) | ||||||
|  |     return CURSOR.rowcount > 0 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def is_whitelisted(instance: str) -> bool: | ||||||
|  |     '''Checks whether an instance is in the whitelist''' | ||||||
|  |     if instance == 'local': | ||||||
|  |         return True | ||||||
|  |     CURSOR.execute( | ||||||
|  |         'SELECT * FROM instance_whitelist WHERE tld = ?', (instance,)) | ||||||
|  |     row = CURSOR.fetchone() | ||||||
|  |     return row is not None | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| def get_config(key: str) -> str: | def get_config(key: str) -> str: | ||||||
|     '''Reads the value for a specified config key from the db''' |     '''Reads the value for a specified config key from the db''' | ||||||
|     CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,)) |     CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,)) | ||||||
|  |  | ||||||
|  | @ -4,9 +4,9 @@ from typing import Dict, Any | ||||||
| import misskey | import misskey | ||||||
| from misskey.exceptions import MisskeyAPIException | from misskey.exceptions import MisskeyAPIException | ||||||
| 
 | 
 | ||||||
| from config import NOTIFICATION_BATCH_SIZE | from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST | ||||||
| from parsing import parse_notification | from parsing import parse_notification | ||||||
| from db_utils import get_config, set_config | from db_utils import get_config, set_config, is_whitelisted, is_player_banned | ||||||
| from response import generate_response | from response import generate_response | ||||||
| from custom_types import BotResponse | from custom_types import BotResponse | ||||||
| 
 | 
 | ||||||
|  | @ -24,7 +24,7 @@ def process_notification( | ||||||
|     host = user.get('host')  # None if local user |     host = user.get('host')  # None if local user | ||||||
|     instance = host if host else 'local' |     instance = host if host else 'local' | ||||||
| 
 | 
 | ||||||
|     if not (instance in WHITELISTED_INSTANCES or instance == 'local'): |     if USE_WHITELIST and not is_whitelisted(instance): | ||||||
|         print(f'⚠️ Blocked notification from untrusted instance: {instance}') |         print(f'⚠️ Blocked notification from untrusted instance: {instance}') | ||||||
|         return |         return | ||||||
| 
 | 
 | ||||||
|  | @ -44,6 +44,11 @@ def process_notification( | ||||||
|     if not parsed_notification: |     if not parsed_notification: | ||||||
|         return |         return | ||||||
| 
 | 
 | ||||||
|  |     author = parsed_notification['author'] | ||||||
|  |     if is_player_banned(author): | ||||||
|  |         print(f'⚠️ Blocked notification from banned player: {author}') | ||||||
|  |         return | ||||||
|  | 
 | ||||||
|     # Get the note Id to reply to |     # Get the note Id to reply to | ||||||
|     note_id = notification.get('note', {}).get('id') |     note_id = notification.get('note', {}).get('id') | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -24,6 +24,8 @@ def parse_notification( | ||||||
|     note_id = note_obj.get("id") |     note_id = note_obj.get("id") | ||||||
| 
 | 
 | ||||||
|     note = note_text.strip().lower() if note_text else "" |     note = note_text.strip().lower() if note_text else "" | ||||||
|  |     # Split words into tokens | ||||||
|  |     parts = note.split() | ||||||
| 
 | 
 | ||||||
|     # Check for both short and fully-qualified name mentions |     # Check for both short and fully-qualified name mentions | ||||||
|     username_variants = [ |     username_variants = [ | ||||||
|  | @ -31,18 +33,16 @@ def parse_notification( | ||||||
|         f'@{config.USER.split("@")[1]}' |         f'@{config.USER.split("@")[1]}' | ||||||
|     ] |     ] | ||||||
| 
 | 
 | ||||||
|     # Make sure the notification text explicitly mentions the bot |     # Notifs must consist of the initial mention and at least one other token | ||||||
|     if not any(variant in note for variant in username_variants): |     if len(parts) <= 1: | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
|     # Find command and arguments after the mention |     # Make sure the first token is a mention to the bot | ||||||
|     # Removes all mentions |     if not parts[0] in username_variants: | ||||||
|     # regex = mentions that start with @ and may contain @domain |         return None | ||||||
|     cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip() |  | ||||||
|     parts = cleaned_text.split() |  | ||||||
| 
 | 
 | ||||||
|     command = parts[0].lower() if parts else None |     command = parts[1].lower() | ||||||
|     arguments = parts[1:] if len(parts) > 1 else [] |     arguments = parts[2:] if len(parts) > 2 else [] | ||||||
| 
 | 
 | ||||||
|     return { |     return { | ||||||
|         'author': full_user, |         'author': full_user, | ||||||
|  |  | ||||||
							
								
								
									
										198
									
								
								bot/response.py
									
										
									
									
									
								
							
							
						
						
									
										198
									
								
								bot/response.py
									
										
									
									
									
								
							|  | @ -1,22 +1,22 @@ | ||||||
| from datetime import datetime, timedelta, timezone | from datetime import datetime, timedelta, timezone | ||||||
| from typing import TypedDict, Any, List, Dict | from typing import TypedDict, Any, List, Dict | ||||||
| from db_utils import get_player, insert_player, delete_player, insert_pull, get_last_rolled_at, \ | import db_utils as db | ||||||
|         get_random_character | from add_card import add_card | ||||||
| from add_character import add_character |  | ||||||
| from config import GACHA_ROLL_INTERVAL | from config import GACHA_ROLL_INTERVAL | ||||||
| from custom_types import BotResponse, ParsedNotification | from custom_types import BotResponse, ParsedNotification | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def do_roll(author: str) -> BotResponse: | def do_roll(author: str) -> BotResponse: | ||||||
|     '''Determines whether the user can roll, then pulls a random character''' |     '''Determines whether the user can roll, then pulls a random card''' | ||||||
|     user_id = get_player(author) |     user_id = db.get_player(author) | ||||||
|     if not user_id: |     if not user_id: | ||||||
|         return { |         return { | ||||||
|         'message':f'{author} 🛑 You haven’t signed up yet! Use the `signup` command to start playing.', |             'message': f'{author} 🛑 You haven’t signed up yet! Use the \ | ||||||
|         'attachment_urls': None | `signup` command to start playing.', | ||||||
|  |             'attachment_urls': None | ||||||
|         } |         } | ||||||
|     # Get date of user's last roll |     # Get date of user's last roll | ||||||
|     date = get_last_rolled_at(user_id) |     date = db.get_last_rolled_at(user_id) | ||||||
| 
 | 
 | ||||||
|     # No date means it's users first roll |     # No date means it's users first roll | ||||||
|     if date: |     if date: | ||||||
|  | @ -45,39 +45,43 @@ def do_roll(author: str) -> BotResponse: | ||||||
|                 'attachment_urls': None |                 'attachment_urls': None | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|     character = get_random_character() |     card = db.get_random_card() | ||||||
| 
 | 
 | ||||||
|     if not character: |     if not card: | ||||||
|         return { |         return { | ||||||
|             'message': f'{author} Uwaaa... something went wrong! No \ |             'message': f'{author} Uwaaa... something went wrong! No \ | ||||||
| characters found. 😿', | cards found. 😿', | ||||||
|             'attachment_urls': None |             'attachment_urls': None | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|     insert_pull(user_id, character['id']) |     db.insert_pull(user_id, card['id']) | ||||||
|     stars = '⭐️' * character['rarity'] |     stars = '⭐️' * card['rarity'] | ||||||
|     return { |     return { | ||||||
|         'message': f'{author} 🎲 Congrats! You rolled {stars} \ |         'message': f'{author} 🎲 Congrats! You rolled {stars} \ | ||||||
| **{character['name']}**\nShe\'s all yours now~ 💖✨', | **{card['name']}**\nShe\'s all yours now~ 💖✨', | ||||||
|         'attachment_urls': [character['image_url']] |         'attachment_urls': [card['image_url']] | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| def do_signup(author: str) -> BotResponse: | def do_signup(author: str) -> BotResponse: | ||||||
|     '''Registers a new user if they haven’t signed up yet.''' |     '''Registers a new user if they haven’t signed up yet.''' | ||||||
|     user_id = get_player(author) |     user_id = db.get_player(author) | ||||||
| 
 | 
 | ||||||
|     if user_id: |     if user_id: | ||||||
|         return { |         return { | ||||||
|             'message':f'{author} 👀 You’re already signed up! Let the rolling begin~ 🎲', |             'message': f'{author} 👀 You’re already signed up! Let the rolling \ | ||||||
|  | begin~ 🎲', | ||||||
|             'attachment_urls': None |             'attachment_urls': None | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|     new_user_id = insert_player(author) |     new_user_id = db.insert_player(author) | ||||||
|     return { |     return { | ||||||
|             'message': f'{author} ✅ Signed up successfully! Your gacha destiny begins now... ✨ Use the roll command to start!', |             'message': f'{author} ✅ Signed up successfully! Your gacha \ | ||||||
|  | destiny begins now... ✨ Use the roll command to start!', | ||||||
|             'attachment_urls': None |             'attachment_urls': None | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| def is_float(val: Any) -> bool: | def is_float(val: Any) -> bool: | ||||||
|     '''Returns true if `val` can be converted to a float''' |     '''Returns true if `val` can be converted to a float''' | ||||||
|     try: |     try: | ||||||
|  | @ -91,14 +95,14 @@ def do_create( | ||||||
|         author: str, |         author: str, | ||||||
|         arguments: List[str], |         arguments: List[str], | ||||||
|         note_obj: Dict[str, Any]) -> BotResponse: |         note_obj: Dict[str, Any]) -> BotResponse: | ||||||
|     '''Creates a character''' |     '''Creates a card''' | ||||||
|     # Example call from bot logic |     # Example call from bot logic | ||||||
|     image_url = note_obj.get('files', [{}])[0].get('url') \ |     image_url = note_obj.get('files', [{}])[0].get('url') \ | ||||||
|         if note_obj.get('files') else None |         if note_obj.get('files') else None | ||||||
| 
 | 
 | ||||||
|     if not image_url: |     if not image_url: | ||||||
|         return { |         return { | ||||||
|             'message': f'{author} You need an image to create a character, \ |             'message': f'{author} You need an image to create a card, \ | ||||||
| dumbass.', | dumbass.', | ||||||
|             'attachment_urls': None |             'attachment_urls': None | ||||||
|         } |         } | ||||||
|  | @ -123,13 +127,13 @@ must be a decimal value between 0.0 and 1.0', | ||||||
|             'attachment_urls': None |             'attachment_urls': None | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|     character_id, file_id = add_character( |     card_id, file_id = add_card( | ||||||
|         name=arguments[0], |         name=arguments[0], | ||||||
|         rarity=int(arguments[1]), |         rarity=int(arguments[1]), | ||||||
|         image_url=image_url |         image_url=image_url | ||||||
|     ) |     ) | ||||||
|     return { |     return { | ||||||
|         'message': f'{author} Added {arguments[0]}, ID {character_id}.', |         'message': f'{author} Added {arguments[0]}, ID {card_id}.', | ||||||
|         'attachment_urls': [file_id] |         'attachment_urls': [file_id] | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -137,59 +141,145 @@ must be a decimal value between 0.0 and 1.0', | ||||||
| def do_help(author: str) -> BotResponse: | def do_help(author: str) -> BotResponse: | ||||||
|     '''Provides a list of commands that the bot can do.''' |     '''Provides a list of commands that the bot can do.''' | ||||||
|     return { |     return { | ||||||
|         'message':f'{author} Here\'s what I can do:\n \ |         'message': f'{author} Here\'s what I can do:\n\ | ||||||
|             - `roll` Pulls a random character.\ | - `roll` Pulls a random card.\n\ | ||||||
|             - `create <name> <rarity>` Creates a character using a given image.\ | - `create <name> <rarity>` Creates a card using a given image.\n\ | ||||||
|             - `signup` Registers your account.\ | - `signup` Registers your account.\n\ | ||||||
|             - `delete_account` Deletes your account.\ | - `delete_account` Deletes your account.\n\ | ||||||
|             - `help` Shows this message', | - `help` Shows this message', | ||||||
|             'attachment_urls': None |         'attachment_urls': None | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| def delete_account(author: str) -> BotResponse: | def delete_account(author: str) -> BotResponse: | ||||||
|     return { |     return { | ||||||
|         'message':f'{author} ⚠️ This will permanently delete your account and all your cards.\n' |         'message': f'{author} ⚠️ This will permanently delete your account \ | ||||||
|         'If you’re sure, reply with `confirm_delete` to proceed.\n\n' | and all your cards.\n' | ||||||
|  |         'If you\'re sure, reply with `confirm_delete_account` to proceed.\n\n' | ||||||
|         '**There is no undo.** Your gacha luck will be lost to the void... 💀✨', |         '**There is no undo.** Your gacha luck will be lost to the void... 💀✨', | ||||||
|         'attachment_urls': None |         'attachment_urls': None | ||||||
| 
 | 
 | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| def confirm_delete(author: str) -> BotResponse: |  | ||||||
| 
 | 
 | ||||||
|     delete_player(author) | def confirm_delete(author: str) -> BotResponse: | ||||||
|  |     db.delete_player(author) | ||||||
| 
 | 
 | ||||||
|     return { |     return { | ||||||
|         'message':f'{author} 🧼 Your account and all your cards have been deleted. RIP your gacha history 🕊️✨', |         'message': f'{author} 🧼 Your account and all your cards have been \ | ||||||
|  | deleted. RIP your gacha history 🕊️✨', | ||||||
|         'attachment_urls': None |         'attachment_urls': None | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | def do_whitelist(author: str, args: list[str]) -> BotResponse: | ||||||
|  |     if len(args) == 0: | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Please specify an instance to whitelist', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     if db.add_to_whitelist(args[0]): | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Whitelisted instance: {args[0]}', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  |     else: | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Instance already whitelisted: {args[0]}', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def do_unwhitelist(author: str, args: list[str]) -> BotResponse: | ||||||
|  |     if len(args) == 0: | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Please specify an instance to remove from \ | ||||||
|  | the whitelist', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     if db.remove_from_whitelist(args[0]): | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Unwhitelisted instance: {args[0]}', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  |     else: | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Instance not whitelisted: {args[0]}', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def do_ban(author: str, args: list[str]) -> BotResponse: | ||||||
|  |     if len(args) == 0: | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Please specify a user to ban', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     if db.is_player_administrator(args[0]): | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Cannot ban other administrators.', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     if db.ban_player(args[0]): | ||||||
|  |         # Delete banned player's account | ||||||
|  |         db.delete_player(args[0]) | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} 🔨 **BONK!** Get banned, {args[0]}!', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  |     else: | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Player is already banned: {args[0]}', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def do_unban(author: str, args: list[str]) -> BotResponse: | ||||||
|  |     if len(args) == 0: | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Please specify a user to unban', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     if db.unban_player(args[0]): | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Player unbanned: {args[0]}!', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  |     else: | ||||||
|  |         return { | ||||||
|  |             'message': f'{author} Player was not banned: {args[0]}', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| def generate_response(notification: ParsedNotification) -> BotResponse | None: | def generate_response(notification: ParsedNotification) -> BotResponse | None: | ||||||
|     '''Given a command with arguments, processes the game state and |     '''Given a command with arguments, processes the game state and | ||||||
|     returns a response''' |     returns a response''' | ||||||
| 
 | 
 | ||||||
|     # Temporary response variable |     # Temporary response variable | ||||||
|     res: BotResponse | None = None |     res: BotResponse | None = None | ||||||
|     # TODO: Check if the user has an account |  | ||||||
|     author = notification['author'] |     author = notification['author'] | ||||||
|     user_id = get_player(author) |     player_id = db.get_player(author) | ||||||
|     command = notification['command'] |     command = notification['command'] | ||||||
|     # Check if the user is an administrator |  | ||||||
|     # user_is_administrator = user_is_administrator() |  | ||||||
| 
 | 
 | ||||||
|     # Unrestricted commands |     # Unrestricted commands | ||||||
|     match command: |     match command: | ||||||
|  |         case 'roll': | ||||||
|  |             res = do_roll(author) | ||||||
|         case 'signup': |         case 'signup': | ||||||
|             res = do_signup(author) |             res = do_signup(author) | ||||||
|         case 'help': |         case 'help': | ||||||
|             res = do_help(author) |             res = do_help(author) | ||||||
|         case 'roll': |  | ||||||
|             res = do_roll(author) |  | ||||||
|         case _: |         case _: | ||||||
|             pass |             pass | ||||||
| 
 | 
 | ||||||
|     if not user_id: |     # Commands beyond this point require the user to have an account | ||||||
|  |     if not player_id: | ||||||
|         return res |         return res | ||||||
| 
 | 
 | ||||||
|     # User commands |     # User commands | ||||||
|  | @ -200,15 +290,29 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None: | ||||||
|                 notification['arguments'], |                 notification['arguments'], | ||||||
|                 notification['note_obj'] |                 notification['note_obj'] | ||||||
|             ) |             ) | ||||||
|         case 'signup': |  | ||||||
|             res = do_signup(author) |  | ||||||
|         case 'delete_account': |         case 'delete_account': | ||||||
|             res = delete_account(author) |             res = delete_account(author) | ||||||
|         case 'confirm_delete': |         case 'confirm_delete_account': | ||||||
|             res = confirm_delete(author) |             res = confirm_delete(author) | ||||||
|         case _: |         case _: | ||||||
|             pass |             pass | ||||||
|     # if not user_is_administrator: | 
 | ||||||
|     return res |     # Commands beyond this point require the user to be an administrator | ||||||
|  |     if not db.is_player_administrator(author): | ||||||
|  |         return res | ||||||
|  | 
 | ||||||
|  |     # Admin commands | ||||||
|  |     match command: | ||||||
|  |         case 'whitelist': | ||||||
|  |             res = do_whitelist(author, notification['arguments']) | ||||||
|  |         case 'unwhitelist': | ||||||
|  |             res = do_unwhitelist(author, notification['arguments']) | ||||||
|  |         case 'ban': | ||||||
|  |             res = do_ban(author, notification['arguments']) | ||||||
|  |         case 'unban': | ||||||
|  |             res = do_unban(author, notification['arguments']) | ||||||
|  |         case _: | ||||||
|  |             pass | ||||||
| 
 | 
 | ||||||
|     # Administrator commands go here |     # Administrator commands go here | ||||||
|  |     return res | ||||||
|  |  | ||||||
|  | @ -2,9 +2,12 @@ | ||||||
| [application] | [application] | ||||||
| ; Comma separated list of fedi handles for any administrator users | ; Comma separated list of fedi handles for any administrator users | ||||||
| ; More can be added through the application | ; More can be added through the application | ||||||
| DefaultAdmins    = ['admin@example.tld'] | DefaultAdmins    = ["@localadmin", "@remoteadmin@example.tld"] | ||||||
| ; SQLite Database location | ; SQLite Database location | ||||||
| DatabaseLocation = ./gacha_game.db | DatabaseLocation = ./gacha_game.db | ||||||
|  | ; Whether to lmit access to the bot via an instance whitelist | ||||||
|  | ; The whitelist can be adjusted via the application | ||||||
|  | UseWhitelist     = False | ||||||
| 
 | 
 | ||||||
| [gacha] | [gacha] | ||||||
| ; Number of seconds players have to wait between rolls | ; Number of seconds players have to wait between rolls | ||||||
|  |  | ||||||
							
								
								
									
										4
									
								
								migrations/0003_rename_tables.sql
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								migrations/0003_rename_tables.sql
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,4 @@ | ||||||
|  | ALTER TABLE users RENAME TO players; | ||||||
|  | ALTER TABLE characters RENAME TO cards; | ||||||
|  | ALTER TABLE pulls RENAME user_id TO player_id; | ||||||
|  | ALTER TABLE pulls RENAME character_id TO card_id; | ||||||
							
								
								
									
										1
									
								
								migrations/0004_add_administrators.sql
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								migrations/0004_add_administrators.sql
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1 @@ | ||||||
|  | ALTER TABLE players ADD COLUMN is_administrator BOOLEAN NOT NULL DEFAULT 0; | ||||||
							
								
								
									
										7
									
								
								migrations/0005_add_whitelist.sql
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								migrations/0005_add_whitelist.sql
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,7 @@ | ||||||
|  | CREATE TABLE IF NOT EXISTS instance_whitelist ( | ||||||
|  |     tld TEXT UNIQUE PRIMARY KEY | ||||||
|  | ); | ||||||
|  | 
 | ||||||
|  | CREATE TABLE IF NOT EXISTS banned_players ( | ||||||
|  |     handle TEXT UNIQUE PRIMARY KEY | ||||||
|  | ); | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue