Version 1.1 #55
					 9 changed files with 287 additions and 122 deletions
				
			
		
							
								
								
									
										1
									
								
								.gitignore
									
										
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
										
									
									
										vendored
									
									
								
							|  | @ -147,6 +147,7 @@ venv.bak/ | ||||||
| /site | /site | ||||||
| 
 | 
 | ||||||
| # mypy | # mypy | ||||||
|  | .mypy.ini | ||||||
| .mypy_cache/ | .mypy_cache/ | ||||||
| .dmypy.json | .dmypy.json | ||||||
| dmypy.json | dmypy.json | ||||||
|  |  | ||||||
|  | @ -2,9 +2,15 @@ import requests | ||||||
| from misskey.exceptions import MisskeyAPIException | from misskey.exceptions import MisskeyAPIException | ||||||
| from client import client_connection | from client import client_connection | ||||||
| from db_utils import insert_character | from db_utils import insert_character | ||||||
|  | from custom_types import Character | ||||||
| 
 | 
 | ||||||
| def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]: | 
 | ||||||
|     """ | def add_character( | ||||||
|  |         name: str, | ||||||
|  |         rarity: int, | ||||||
|  |         weight: float, | ||||||
|  |         image_url: str) -> tuple[int, str]: | ||||||
|  |     ''' | ||||||
|     Adds a character to the database, uploading the image from a public URL to |     Adds a character to the database, uploading the image from a public URL to | ||||||
|     the bot's Misskey Drive. |     the bot's Misskey Drive. | ||||||
| 
 | 
 | ||||||
|  | @ -12,7 +18,8 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl | ||||||
|         name (str): Character name. |         name (str): Character name. | ||||||
|         rarity (int): Character rarity (e.g., 1-5). |         rarity (int): Character rarity (e.g., 1-5). | ||||||
|         weight (float): Pull weight (e.g., 0.02). |         weight (float): Pull weight (e.g., 0.02). | ||||||
|         image_url (str): Public URL of the image from the post (e.g., from note['files'][i]['url']). |         image_url (str): Public URL of the image from the post (e.g., from | ||||||
|  |             note['files'][i]['url']). | ||||||
| 
 | 
 | ||||||
|     Returns: |     Returns: | ||||||
|         tuple[int, str]: Character ID and bot's Drive file_id. |         tuple[int, str]: Character ID and bot's Drive file_id. | ||||||
|  | @ -20,30 +27,39 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl | ||||||
|     Raises: |     Raises: | ||||||
|         ValueError: If inputs are invalid. |         ValueError: If inputs are invalid. | ||||||
|         RuntimeError: If image download/upload or database operation fails. |         RuntimeError: If image download/upload or database operation fails. | ||||||
|     """ |     ''' | ||||||
|  | 
 | ||||||
|  |     stripped_name = name.strip() | ||||||
|  | 
 | ||||||
|     # Validate inputs |     # Validate inputs | ||||||
|     if not name or not name.strip(): |     if not stripped_name: | ||||||
|         raise ValueError("Character name cannot be empty.") |         raise ValueError('Character name cannot be empty.') | ||||||
|     if not isinstance(rarity, int) or rarity < 1: |     if rarity < 1: | ||||||
|         raise ValueError("Rarity must be a positive integer.") |         raise ValueError('Rarity must be a positive integer.') | ||||||
|     if not isinstance(weight, (int, float)) or weight <= 0: |     if weight <= 0: | ||||||
|         raise ValueError("Weight must be a positive number.") |         raise ValueError('Weight must be a positive number.') | ||||||
|     if not image_url: |     if not image_url: | ||||||
|         raise ValueError("Image URL must be provided.") |         raise ValueError('Image URL must be provided.') | ||||||
| 
 | 
 | ||||||
|     # Download image |     # Download image | ||||||
|     response = requests.get(image_url, stream=True, timeout=30) |     response = requests.get(image_url, stream=True, timeout=30) | ||||||
|     if response.status_code != 200: |     if response.status_code != 200: | ||||||
|         raise RuntimeError(f"Failed to download image from {image_url}") |         raise RuntimeError(f'Failed to download image from {image_url}') | ||||||
| 
 | 
 | ||||||
|     # Upload to bot's Drive |     # Upload to bot's Drive | ||||||
|     mk = client_connection() |     mk = client_connection() | ||||||
|     try: |     try: | ||||||
|         media = mk.drive_files_create(response.raw) |         media = mk.drive_files_create(response.raw) | ||||||
|         file_id = media["id"] |         file_id = media['id'] | ||||||
|     except MisskeyAPIException as e: |     except MisskeyAPIException as e: | ||||||
|         raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e |         raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\ | ||||||
|  |                 from e | ||||||
| 
 | 
 | ||||||
|     # Insert into database |     # Insert into database | ||||||
|     character_id = insert_character(name.strip(), rarity, float(weight), file_id) |     character_id = insert_character( | ||||||
|  |             stripped_name, | ||||||
|  |             rarity, | ||||||
|  |             float(weight), | ||||||
|  |             file_id | ||||||
|  |     ) | ||||||
|     return character_id, file_id |     return character_id, file_id | ||||||
|  |  | ||||||
|  | @ -1,5 +1,6 @@ | ||||||
| import misskey | import misskey | ||||||
| import config | import config | ||||||
| 
 | 
 | ||||||
| def client_connection(): | 
 | ||||||
|  | def client_connection() -> misskey.Misskey: | ||||||
|     return misskey.Misskey(address=config.INSTANCE, i=config.KEY) |     return misskey.Misskey(address=config.INSTANCE, i=config.KEY) | ||||||
|  |  | ||||||
|  | @ -2,9 +2,11 @@ | ||||||
| import configparser | import configparser | ||||||
| from os import environ, path | from os import environ, path | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| class ConfigError(Exception): | class ConfigError(Exception): | ||||||
|     '''Could not find config file''' |     '''Could not find config file''' | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| def get_config_file() -> str: | def get_config_file() -> str: | ||||||
|     '''Gets the path to the config file in the current environment''' |     '''Gets the path to the config file in the current environment''' | ||||||
|     env: str | None = environ.get('KEMOVERSE_ENV') |     env: str | None = environ.get('KEMOVERSE_ENV') | ||||||
|  | @ -19,24 +21,25 @@ def get_config_file() -> str: | ||||||
|         raise ConfigError(f'Could not find {config_path}') |         raise ConfigError(f'Could not find {config_path}') | ||||||
|     return config_path |     return config_path | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| config = configparser.ConfigParser() | config = configparser.ConfigParser() | ||||||
| config.read(get_config_file()) | config.read(get_config_file()) | ||||||
| 
 | 
 | ||||||
| # Username for the bot | # Username for the bot | ||||||
| USER     = config['credentials']['User'].lower() | USER = config['credentials']['User'].lower() | ||||||
| # API key for the bot | # API key for the bot | ||||||
| KEY      = config['credentials']['Token'] | KEY = config['credentials']['Token'] | ||||||
| # Bot's Misskey instance URL | # Bot's Misskey instance URL | ||||||
| INSTANCE = config['credentials']['Instance'].lower() | INSTANCE = config['credentials']['Instance'].lower() | ||||||
| 
 | 
 | ||||||
| # TODO: move this to db | # TODO: move this to db | ||||||
| # Fedi handles in the traditional 'user@domain.tld' style, allows these users | # Fedi handles in the traditional 'user@domain.tld' style, allows these users | ||||||
| # to use extra admin exclusive commands with the bot | # to use extra admin exclusive commands with the bot | ||||||
| ADMINS        = config['application']['DefaultAdmins'] | ADMINS = config['application']['DefaultAdmins'] | ||||||
| # SQLite Database location | # SQLite Database location | ||||||
| DB_PATH       = config['application']['DatabaseLocation'] | DB_PATH = config['application']['DatabaseLocation'] | ||||||
| 
 | 
 | ||||||
| NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) | NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) | ||||||
| NOTIFICATION_BATCH_SIZE    = int(config['notification']['BatchSize']) | NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize']) | ||||||
| 
 | 
 | ||||||
| GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval']) | GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval']) | ||||||
|  |  | ||||||
							
								
								
									
										21
									
								
								bot/custom_types.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								bot/custom_types.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,21 @@ | ||||||
|  | from typing import TypedDict, List, Dict, Any | ||||||
|  | 
 | ||||||
|  | BotResponse = TypedDict('BotResponse', { | ||||||
|  |     'message': str, | ||||||
|  |     'attachment_urls': List[str] | None | ||||||
|  | }) | ||||||
|  | 
 | ||||||
|  | Character = TypedDict('Character', { | ||||||
|  |     'id': int, | ||||||
|  |     'name': str, | ||||||
|  |     'rarity': int, | ||||||
|  |     'weight': float, | ||||||
|  |     'image_url': str | ||||||
|  | }) | ||||||
|  | 
 | ||||||
|  | ParsedNotification = TypedDict('ParsedNotification', { | ||||||
|  |     'author': str, | ||||||
|  |     'command': str | None, | ||||||
|  |     'arguments': List[str], | ||||||
|  |     'note_obj': Dict[str, Any] | ||||||
|  | }) | ||||||
|  | @ -1,11 +1,13 @@ | ||||||
| from random import choices | from random import choices | ||||||
| import sqlite3 | import sqlite3 | ||||||
| import config | import config | ||||||
|  | from custom_types import Character | ||||||
| 
 | 
 | ||||||
| DB_PATH = config.DB_PATH | DB_PATH = config.DB_PATH | ||||||
| CONNECTION: sqlite3.Connection | CONNECTION: sqlite3.Connection | ||||||
| CURSOR: sqlite3.Cursor | CURSOR: sqlite3.Cursor | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| def connect() -> None: | def connect() -> None: | ||||||
|     '''Creates a connection to the database''' |     '''Creates a connection to the database''' | ||||||
|     print('Connecting to the database...') |     print('Connecting to the database...') | ||||||
|  | @ -15,24 +17,32 @@ def connect() -> None: | ||||||
|     CONNECTION.row_factory = sqlite3.Row |     CONNECTION.row_factory = sqlite3.Row | ||||||
|     CURSOR = CONNECTION.cursor() |     CURSOR = CONNECTION.cursor() | ||||||
| 
 | 
 | ||||||
| def get_random_character(): | 
 | ||||||
|  | def get_random_character() -> Character | None: | ||||||
|     ''' Gets a random character from the database''' |     ''' Gets a random character from the database''' | ||||||
|     CURSOR.execute('SELECT * FROM characters') |     CURSOR.execute('SELECT * FROM characters') | ||||||
|     characters = CURSOR.fetchall() |     characters = CURSOR.fetchall() | ||||||
| 
 | 
 | ||||||
|     if not characters: |     if not characters: | ||||||
|         return None, None, None, None |         return None | ||||||
| 
 | 
 | ||||||
|     weights = [c['weight'] for c in characters] |     weights = [c['weight'] for c in characters] | ||||||
|     chosen = choices(characters, weights=weights, k=1)[0] |     chosen = choices(characters, weights=weights, k=1)[0] | ||||||
| 
 | 
 | ||||||
|     return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity'] |     return { | ||||||
|  |         'id': chosen['id'], | ||||||
|  |         'name': chosen['name'], | ||||||
|  |         'rarity': chosen['rarity'], | ||||||
|  |         'weight': chosen['weight'], | ||||||
|  |         'image_url': chosen['file_id'] | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
| def get_player(username): | def get_player(username: str) -> int: | ||||||
|     '''Retrieve a player ID by username, or return None if not found.''' |     '''Retrieve a player ID by username, or return None if not found.''' | ||||||
|     CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,)) |     CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,)) | ||||||
|     user = CURSOR.fetchone() |     user = CURSOR.fetchone() | ||||||
|     return user[0] if user else None |     if user: | ||||||
|  |         return int(user[0]) | ||||||
| 
 | 
 | ||||||
| def insert_player(username): | def insert_player(username): | ||||||
|     '''Insert a new player with default has_rolled = False and return their user ID.''' |     '''Insert a new player with default has_rolled = False and return their user ID.''' | ||||||
|  | @ -70,36 +80,45 @@ def delete_player(username): | ||||||
|     return True |     return True | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def insert_character(name: str, rarity: int, weight: float, file_id: str) -> int: | 
 | ||||||
|  | def insert_character( | ||||||
|  |         name: str, rarity: int, weight: float, file_id: str) -> int: | ||||||
|     '''Inserts a character''' |     '''Inserts a character''' | ||||||
|     CURSOR.execute( |     CURSOR.execute( | ||||||
|         'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)', |         'INSERT INTO characters (name, rarity, weight, file_id) VALUES \ | ||||||
|  | (?, ?, ?, ?)', | ||||||
|         (name, rarity, weight, file_id) |         (name, rarity, weight, file_id) | ||||||
|     ) |     ) | ||||||
|     character_id = CURSOR.lastrowid |     character_id = CURSOR.lastrowid | ||||||
|     return character_id if character_id else 0 |     return character_id if character_id else 0 | ||||||
| 
 | 
 | ||||||
| def insert_pull(user_id, character_id): | 
 | ||||||
|  | def insert_pull(user_id: int, character_id: int) -> None: | ||||||
|     '''Creates a pull in the database''' |     '''Creates a pull in the database''' | ||||||
|     CURSOR.execute( |     CURSOR.execute( | ||||||
|         'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', |         'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', | ||||||
|         (user_id, character_id) |         (user_id, character_id) | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
| def get_last_rolled_at(user_id): | 
 | ||||||
|  | def get_last_rolled_at(user_id: int) -> int: | ||||||
|     '''Gets the timestamp when the user last rolled''' |     '''Gets the timestamp when the user last rolled''' | ||||||
|     CURSOR.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \ |     CURSOR.execute( | ||||||
|             (user_id,)) |         "SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp \ | ||||||
|  | DESC", | ||||||
|  |         (user_id,)) | ||||||
|     row = CURSOR.fetchone() |     row = CURSOR.fetchone() | ||||||
|     return row[0] if row else None |     return row[0] if row else 0 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_config(key): | def get_config(key: str) -> str: | ||||||
|     '''Reads the value for a specified config key from the db''' |     '''Reads the value for a specified config key from the db''' | ||||||
|     CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,)) |     CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,)) | ||||||
|     row = CURSOR.fetchone() |     row = CURSOR.fetchone() | ||||||
|     return row[0] if row else None |     return row[0] if row else '' | ||||||
| 
 | 
 | ||||||
| def set_config(key, value): | 
 | ||||||
|  | def set_config(key: str, value: str) -> None: | ||||||
|     '''Writes the value for a specified config key to the db''' |     '''Writes the value for a specified config key to the db''' | ||||||
|     CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value)) |     CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", | ||||||
|  |                    (key, value)) | ||||||
|  |  | ||||||
|  | @ -1,16 +1,23 @@ | ||||||
| import traceback | import traceback | ||||||
|  | from typing import Dict, Any | ||||||
|  | 
 | ||||||
|  | import misskey | ||||||
| from misskey.exceptions import MisskeyAPIException | from misskey.exceptions import MisskeyAPIException | ||||||
| 
 | 
 | ||||||
| from config import NOTIFICATION_BATCH_SIZE | from config import NOTIFICATION_BATCH_SIZE | ||||||
| from parsing import parse_notification | from parsing import parse_notification | ||||||
| from db_utils import get_config, set_config | from db_utils import get_config, set_config | ||||||
| from response import generate_response | from response import generate_response | ||||||
|  | from custom_types import BotResponse | ||||||
| 
 | 
 | ||||||
| # Define your whitelist | # Define your whitelist | ||||||
| # TODO: move to config | # TODO: move to config | ||||||
| WHITELISTED_INSTANCES: list[str] = [] | WHITELISTED_INSTANCES: list[str] = [] | ||||||
| 
 | 
 | ||||||
| def process_notification(client, notification): | 
 | ||||||
|  | def process_notification( | ||||||
|  |         client: misskey.Misskey, | ||||||
|  |         notification: Dict[str, Any]) -> None: | ||||||
|     '''Processes an individual notification''' |     '''Processes an individual notification''' | ||||||
|     user = notification.get('user', {}) |     user = notification.get('user', {}) | ||||||
|     username = user.get('username', 'unknown') |     username = user.get('username', 'unknown') | ||||||
|  | @ -32,33 +39,32 @@ def process_notification(client, notification): | ||||||
|     print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}') |     print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}') | ||||||
| 
 | 
 | ||||||
|     # 🧠 Send to the parser |     # 🧠 Send to the parser | ||||||
|     parsed_command = parse_notification(notification, client) |     parsed_notification = parse_notification(notification, client) | ||||||
|  | 
 | ||||||
|  |     if not parsed_notification: | ||||||
|  |         return | ||||||
| 
 | 
 | ||||||
|     # Get the note Id to reply to |     # Get the note Id to reply to | ||||||
|     note_id = notification.get('note', {}).get('id') |     note_id = notification.get('note', {}).get('id') | ||||||
| 
 | 
 | ||||||
|     # Get the response |     # Get the response | ||||||
|     # TODO: Formalize exactly *what* is returned by this. Ideally just want to |     response: BotResponse | None = generate_response(parsed_notification) | ||||||
|     # handle two cases here: either we have a response, or we don't. |  | ||||||
|     # TODO: Return dictionaries instead of tuples. They handle multiple |  | ||||||
|     # elements a lot better as they're not position dependent |  | ||||||
|     response = generate_response(parsed_command) |  | ||||||
|     if isinstance(response, str): |  | ||||||
|         client.notes_create( |  | ||||||
|             text=response, |  | ||||||
|             reply_id=note_id, |  | ||||||
|             visibility=visibility |  | ||||||
|         ) |  | ||||||
|     elif response: |  | ||||||
|         client.notes_create( |  | ||||||
|             text=response[0], |  | ||||||
|             reply_id=note_id, |  | ||||||
|             visibility=visibility, |  | ||||||
|             file_ids=response[1] |  | ||||||
|             #visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately |  | ||||||
|         ) |  | ||||||
| 
 | 
 | ||||||
| def process_notifications(client): |     if not response: | ||||||
|  |         return | ||||||
|  | 
 | ||||||
|  |     client.notes_create( | ||||||
|  |         text=response['message'], | ||||||
|  |         reply_id=note_id, | ||||||
|  |         visibility=visibility, | ||||||
|  |         file_ids=response['attachment_urls'] | ||||||
|  |         # TODO: write actual visible users ids so pleromers can use the bot | ||||||
|  |         # privately | ||||||
|  |         # visible_user_ids=[] | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def process_notifications(client: misskey.Misskey) -> bool: | ||||||
|     '''Processes a batch of unread notifications. Returns False if there are |     '''Processes a batch of unread notifications. Returns False if there are | ||||||
|     no more notifications to process.''' |     no more notifications to process.''' | ||||||
| 
 | 
 | ||||||
|  | @ -87,7 +93,7 @@ def process_notifications(client): | ||||||
|         for notification in notifications: |         for notification in notifications: | ||||||
|             try: |             try: | ||||||
|                 # Skip if we've processed already |                 # Skip if we've processed already | ||||||
|                 notif_id = notification.get('id') |                 notif_id = notification.get('id', '') | ||||||
|                 if notif_id <= last_seen_id: |                 if notif_id <= last_seen_id: | ||||||
|                     continue |                     continue | ||||||
| 
 | 
 | ||||||
|  | @ -96,7 +102,8 @@ def process_notifications(client): | ||||||
|                 process_notification(client, notification) |                 process_notification(client, notification) | ||||||
| 
 | 
 | ||||||
|             except Exception as e: |             except Exception as e: | ||||||
|                 print(f'An exception has occured while processing a notification: {e}') |                 print(f'An exception has occured while processing a \ | ||||||
|  | notification: {e}') | ||||||
|                 print(traceback.format_exc()) |                 print(traceback.format_exc()) | ||||||
| 
 | 
 | ||||||
|         # If we got as many notifications as we requested, there are probably |         # If we got as many notifications as we requested, there are probably | ||||||
|  |  | ||||||
|  | @ -1,11 +1,16 @@ | ||||||
| import random, re | import re | ||||||
|  | from typing import Dict, Any | ||||||
|  | 
 | ||||||
|  | import misskey | ||||||
|  | 
 | ||||||
| import config | import config | ||||||
| 
 | from custom_types import ParsedNotification | ||||||
| def parse_notification(notification,client): |  | ||||||
|     '''Parses any notifications received by the bot and sends any commands to |  | ||||||
|     gacha_response()''' |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | def parse_notification( | ||||||
|  |         notification: Dict[str, Any], | ||||||
|  |         client: misskey.Misskey) -> ParsedNotification | None: | ||||||
|  |     '''Parses any notifications received by the bot''' | ||||||
| 
 | 
 | ||||||
|     # Get the full Activitypub ID of the user |     # Get the full Activitypub ID of the user | ||||||
|     user = notification.get("user", {}) |     user = notification.get("user", {}) | ||||||
|  | @ -28,14 +33,20 @@ def parse_notification(notification,client): | ||||||
| 
 | 
 | ||||||
|     # Make sure the notification text explicitly mentions the bot |     # Make sure the notification text explicitly mentions the bot | ||||||
|     if not any(variant in note for variant in username_variants): |     if not any(variant in note for variant in username_variants): | ||||||
|         return |         return None | ||||||
| 
 | 
 | ||||||
|     # Find command and arguments after the mention |     # Find command and arguments after the mention | ||||||
|     # Removes all mentions (regex = mentions that start with @ and may contain @domain) |     # Removes all mentions | ||||||
|  |     # regex = mentions that start with @ and may contain @domain | ||||||
|     cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip() |     cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip() | ||||||
|     parts = cleaned_text.split() |     parts = cleaned_text.split() | ||||||
| 
 | 
 | ||||||
|     command = parts[0].lower() if parts else None |     command = parts[0].lower() if parts else None | ||||||
|     arguments = parts[1:] if len(parts) > 1 else [] |     arguments = parts[1:] if len(parts) > 1 else [] | ||||||
| 
 | 
 | ||||||
|     return [command,full_user, arguments, note_obj] |     return { | ||||||
|  |         'author': full_user, | ||||||
|  |         'command': command, | ||||||
|  |         'arguments': arguments, | ||||||
|  |         'note_obj': note_obj | ||||||
|  |     } | ||||||
|  |  | ||||||
							
								
								
									
										192
									
								
								bot/response.py
									
										
									
									
									
								
							
							
						
						
									
										192
									
								
								bot/response.py
									
										
									
									
									
								
							|  | @ -1,9 +1,13 @@ | ||||||
| from datetime import datetime, timedelta, timezone | from datetime import datetime, timedelta, timezone | ||||||
| from db_utils import get_player, insert_player, delete_player, insert_pull, get_last_rolled_at, get_random_character | from typing import TypedDict, Any, List, Dict | ||||||
|  | from db_utils import get_player, insert_player, delete_player, insert_pull, get_last_rolled_at, \ | ||||||
|  |         get_random_character | ||||||
| from add_character import add_character | from add_character import add_character | ||||||
| from config import GACHA_ROLL_INTERVAL | from config import GACHA_ROLL_INTERVAL | ||||||
|  | from custom_types import BotResponse, ParsedNotification | ||||||
| 
 | 
 | ||||||
| def do_roll(full_user): | 
 | ||||||
|  | def do_roll(full_user: str) -> BotResponse: | ||||||
|     '''Determines whether the user can roll, then pulls a random character''' |     '''Determines whether the user can roll, then pulls a random character''' | ||||||
|     user_id = get_player(full_user) |     user_id = get_player(full_user) | ||||||
| 
 | 
 | ||||||
|  | @ -16,7 +20,7 @@ def do_roll(full_user): | ||||||
|     if date: |     if date: | ||||||
|         # SQLite timestamps returned by the DB are always in UTC |         # SQLite timestamps returned by the DB are always in UTC | ||||||
|         # Below timestamps are to be converted to UTC |         # Below timestamps are to be converted to UTC | ||||||
|         prev = datetime.strptime(date + '+0000', '%Y-%m-%d %H:%M:%S%z') |         prev = datetime.strptime(str(date) + '+0000', '%Y-%m-%d %H:%M:%S%z') | ||||||
|         now = datetime.now(timezone.utc) |         now = datetime.now(timezone.utc) | ||||||
| 
 | 
 | ||||||
|         time_since_last_roll = now - prev |         time_since_last_roll = now - prev | ||||||
|  | @ -33,29 +37,46 @@ def do_roll(full_user): | ||||||
|             else: |             else: | ||||||
|                 remaining_duration = f'{duration.seconds} seconds' |                 remaining_duration = f'{duration.seconds} seconds' | ||||||
| 
 | 
 | ||||||
|             return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.' |             return { | ||||||
|  |                 'message': f'{full_user} ⏱️ Please wait another \ | ||||||
|  | {remaining_duration} before rolling again.', | ||||||
|  |                 'attachment_urls': None | ||||||
|  |             } | ||||||
| 
 | 
 | ||||||
|     character_id, character_name, file_id, rarity = get_random_character() |     character = get_random_character() | ||||||
| 
 | 
 | ||||||
|     if not character_id: |     if not character: | ||||||
|         return f'{full_user} Uwaaa... something went wrong! No characters found. 😿' |         return { | ||||||
|  |             'message': f'{full_user} Uwaaa... something went wrong! No \ | ||||||
|  | characters found. 😿', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
| 
 | 
 | ||||||
|     insert_pull(user_id,character_id) |     insert_pull(user_id, character['id']) | ||||||
|     stars = '⭐️' * rarity |     stars = '⭐️' * character['rarity'] | ||||||
|     return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\ |     return { | ||||||
|             She's all yours now~ 💖✨",[file_id]]) |         'message': f'@{full_user} 🎲 Congrats! You rolled {stars} \ | ||||||
|  | **{character['name']}**\nShe\'s all yours now~ 💖✨', | ||||||
|  |         'attachment_urls': [character['image_url']] | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
| def do_signup(full_user): | def do_signup(author: str) -> BotResponse: | ||||||
|     '''Registers a new user if they haven’t signed up yet.''' |     '''Registers a new user if they haven’t signed up yet.''' | ||||||
|     user_id = get_player(full_user) |     user_id = get_player(author) | ||||||
| 
 | 
 | ||||||
|     if user_id: |     if user_id: | ||||||
|         return f'@{full_user} 👀 You’re already signed up! Let the rolling begin~ 🎲' |         return { | ||||||
|  |             f'@{author} 👀 You’re already signed up! Let the rolling begin~ 🎲', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
| 
 | 
 | ||||||
|     new_user_id = insert_player(full_user) |     new_user_id = insert_player(author) | ||||||
|     return f'@{full_user} ✅ Signed up successfully! Your gacha destiny begins now... ✨ Use the roll command to start!' |     return { | ||||||
|  |             'message': f'@{author} ✅ Signed up successfully! Your gacha destiny begins now... ✨ Use the roll command to start!', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
| 
 | 
 | ||||||
| def is_float(val): | def is_float(val: Any) -> bool: | ||||||
|     '''Returns true if `val` can be converted to a float''' |     '''Returns true if `val` can be converted to a float''' | ||||||
|     try: |     try: | ||||||
|         float(val) |         float(val) | ||||||
|  | @ -63,23 +84,42 @@ def is_float(val): | ||||||
|     except ValueError: |     except ValueError: | ||||||
|         return False |         return False | ||||||
| 
 | 
 | ||||||
| def do_create(full_user, arguments, note_obj): | 
 | ||||||
|  | def do_create( | ||||||
|  |         full_user: str, | ||||||
|  |         arguments: List[str], | ||||||
|  |         note_obj: Dict[str, Any]) -> BotResponse: | ||||||
|     '''Creates a character''' |     '''Creates a character''' | ||||||
|     # Example call from bot logic |     # Example call from bot logic | ||||||
|     image_url = note_obj.get('files', [{}])[0].get('url') if note_obj.get('files') else None |     image_url = note_obj.get('files', [{}])[0].get('url') \ | ||||||
|  |         if note_obj.get('files') else None | ||||||
|  | 
 | ||||||
|     if not image_url: |     if not image_url: | ||||||
|         return f'{full_user}{full_user} You need an image to create a character, dumbass.' |         return { | ||||||
|  |             'message': f'{full_user} You need an image to create a character, \ | ||||||
|  | dumbass.', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
| 
 | 
 | ||||||
|     if len(arguments) != 3: |     if len(arguments) != 3: | ||||||
|         return '{full_user}Please specify the following attributes in order: \ |         return { | ||||||
|                 name, rarity, drop weighting' |             'message': f'{full_user} Please specify the following attributes \ | ||||||
|  | in order: name, rarity, drop weighting', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
| 
 | 
 | ||||||
|     if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5): |     if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5): | ||||||
|         return f'{full_user}Invalid rarity: \'{arguments[1]}\' must be a number between 1 and 5' |         return { | ||||||
| 
 |             'message': f'{full_user} Invalid rarity: \'{arguments[1]}\' must \ | ||||||
|  | be a number between 1 and 5', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
|     if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0): |     if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0): | ||||||
|         return f'{full_user}Invalid drop weight: \'{arguments[2]}\' \ |         return { | ||||||
|                 must be a decimal value between 0.0 and 1.0' |             'message': f'{full_user} Invalid drop weight: \'{arguments[2]}\' \ | ||||||
|  | must be a decimal value between 0.0 and 1.0', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
| 
 | 
 | ||||||
|     character_id, file_id = add_character( |     character_id, file_id = add_character( | ||||||
|         name=arguments[0], |         name=arguments[0], | ||||||
|  | @ -87,50 +127,96 @@ def do_create(full_user, arguments, note_obj): | ||||||
|         weight=float(arguments[2]), |         weight=float(arguments[2]), | ||||||
|         image_url=image_url |         image_url=image_url | ||||||
|     ) |     ) | ||||||
|     return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]]) |     return { | ||||||
|  |         'message': f'{full_user} Added {arguments[0]}, ID {character_id}.', | ||||||
|  |         'attachment_urls': [file_id] | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
| def do_help(full_user): | 
 | ||||||
|  | def do_help(author: str) -> BotResponse: | ||||||
|     '''Provides a list of commands that the bot can do.''' |     '''Provides a list of commands that the bot can do.''' | ||||||
|     return f'{full_user} Here\'s what I can do:\n \ |     return { | ||||||
|  |         'message':f'{full_user} Here\'s what I can do:\n \ | ||||||
|             - `roll` Pulls a random character.\ |             - `roll` Pulls a random character.\ | ||||||
|             - `create <name> <rarity> <weight>` Creates a character using a given image.\ |             - `create <name> <rarity> <weight>` Creates a character using a given image.\ | ||||||
|             - `signup` Registers your account.\ |             - `signup` Registers your account.\ | ||||||
|             - `delete_account` Deletes your account.\ |             - `delete_account` Deletes your account.\ | ||||||
|             - `help` Shows this message' |             - `help` Shows this message', | ||||||
|  |             'attachment_urls': None | ||||||
|  |     } | ||||||
|              |              | ||||||
| def delete_account(full_user): | def delete_account(author: str) -> BotResponse: | ||||||
|     return ( |     return { | ||||||
|         f'@{full_user} ⚠️ This will permanently delete your account and all your cards.\n' |         'message':f'@{author} ⚠️ This will permanently delete your account and all your cards.\n' | ||||||
|         'If you’re sure, reply with `confirm_delete` to proceed.\n\n' |         'If you’re sure, reply with `confirm_delete` to proceed.\n\n' | ||||||
|         '**There is no undo.** Your gacha luck will be lost to the void... 💀✨' |         '**There is no undo.** Your gacha luck will be lost to the void... 💀✨', | ||||||
|     ) |         'attachment_urls': None | ||||||
| 
 | 
 | ||||||
| def confirm_delete(full_user): |     } | ||||||
|     success = delete_player(full_user) | 
 | ||||||
|  | def confirm_delete(author: str) -> BotResponse: | ||||||
|  |     success = delete_player(author) | ||||||
| 
 | 
 | ||||||
|     if not success: |     if not success: | ||||||
|         return f'@{full_user} ❌ No account found to delete. Maybe it’s already gone?' |         return { | ||||||
|  |             'message':f'@{author} ❌ No account found to delete. Maybe it’s already gone?', | ||||||
|  |             'attachment_urls': None | ||||||
|  |         } | ||||||
| 
 | 
 | ||||||
|     return f'@{full_user} 🧼 Your account and all your cards have been deleted. RIP your gacha history 🕊️✨' |     return { | ||||||
|  |         'message':f'@{author} 🧼 Your account and all your cards have been deleted. RIP your gacha history 🕊️✨', | ||||||
|  |         'attachment_urls': None | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def generate_response(parsed_command): | def generate_response(notification: ParsedNotification) -> BotResponse | None: | ||||||
|     '''Given a command with arguments, processes the game state and |     '''Given a command with arguments, processes the game state and | ||||||
|     returns a response''' |     returns a response''' | ||||||
| 
 | 
 | ||||||
|     command, full_user, arguments, note_obj = parsed_command |     # Temporary response variable | ||||||
|  |     res: BotResponse | None = None | ||||||
|  |     # TODO: Check if the user has an account | ||||||
|  |     author = notification['author'] | ||||||
|  |     user_id = get_or_create_user(author) | ||||||
|  |     command = notification['command'] | ||||||
|  |     # Check if the user is an administrator | ||||||
|  |     # user_is_administrator = user_is_administrator() | ||||||
|  | 
 | ||||||
|  |     # Unrestricted commands | ||||||
|     match command: |     match command: | ||||||
|         case 'roll': |  | ||||||
|             return do_roll(full_user) |  | ||||||
|         case 'create': |  | ||||||
|             return do_create(full_user, arguments, note_obj) |  | ||||||
|         case 'help': |  | ||||||
|             return do_help(command) |  | ||||||
|         case 'signup': |         case 'signup': | ||||||
|             return do_signup(full_user) |             res = do_signup() | ||||||
|         case 'delete_account': |         case 'help': | ||||||
|             return delete_account(full_user) |             res = do_help(author) | ||||||
|         case 'confirm_delete': | 
 | ||||||
|             return confirm_delete(full_user) |              | ||||||
|         case _: |         case _: | ||||||
|             return None |             pass | ||||||
|  | 
 | ||||||
|  |     if not user_id: | ||||||
|  |         return res | ||||||
|  | 
 | ||||||
|  |     # User commands | ||||||
|  |     match command: | ||||||
|  |         case 'delete_account': | ||||||
|  |             pass | ||||||
|  |         case 'roll': | ||||||
|  |             res = do_roll(author) | ||||||
|  |         case 'create': | ||||||
|  |             res = do_create( | ||||||
|  |                 author, | ||||||
|  |                 notification['arguments'], | ||||||
|  |                 notification['note_obj'] | ||||||
|  |             ) | ||||||
|  |         case 'signup': | ||||||
|  |             res = do_signup(author) | ||||||
|  |         case 'delete_account': | ||||||
|  |             res = delete_account(author) | ||||||
|  |         case 'confirm_delete': | ||||||
|  |             res = confirm_delete(author) | ||||||
|  |         case _: | ||||||
|  |             pass | ||||||
|  |     # if not user_is_administrator: | ||||||
|  |     return res | ||||||
|  | 
 | ||||||
|  |     # Administrator commands go here | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue