forked from waifu/kemoverse
		
	Compare commits
	
		
			8 commits
		
	
	
		
			24309ce900
			...
			8a331e0c7b
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 8a331e0c7b | |||
| 47272aee4f | |||
| ff20e26821 | |||
| e53bd87d81 | |||
| 9be92afce3 | |||
| d2a7e523e8 | |||
| 45dd6f10e3 | |||
| d5de73cf22 | 
					 7 changed files with 261 additions and 176 deletions
				
			
		|  | @ -32,7 +32,7 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl | ||||||
|             raise ValueError("Image URL must be provided.") |             raise ValueError("Image URL must be provided.") | ||||||
| 
 | 
 | ||||||
|         # Download image |         # Download image | ||||||
|         response = requests.get(image_url, stream=True) |         response = requests.get(image_url, stream=True, timeout=30) | ||||||
|         if response.status_code != 200: |         if response.status_code != 200: | ||||||
|             raise RuntimeError(f"Failed to download image from {image_url}") |             raise RuntimeError(f"Failed to download image from {image_url}") | ||||||
| 
 | 
 | ||||||
|  | @ -55,9 +55,6 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl | ||||||
|         character_id = cur.lastrowid |         character_id = cur.lastrowid | ||||||
| 
 | 
 | ||||||
|         return character_id, file_id |         return character_id, file_id | ||||||
| 
 |  | ||||||
|     except Exception as e: |  | ||||||
|         raise |  | ||||||
|     finally: |     finally: | ||||||
|         if 'conn' in locals(): |         if 'conn' in locals(): | ||||||
|             conn.close() |             conn.close() | ||||||
							
								
								
									
										118
									
								
								bot/bot_app.py
									
										
									
									
									
								
							
							
						
						
									
										118
									
								
								bot/bot_app.py
									
										
									
									
									
								
							|  | @ -1,114 +1,14 @@ | ||||||
| import time | import time | ||||||
| import traceback | import misskey as misskey | ||||||
| import misskey |  | ||||||
| from parsing import parse_notification |  | ||||||
| from db_utils import get_or_create_user, add_pull, get_config, set_config |  | ||||||
| from client import client_connection | from client import client_connection | ||||||
| from response import generate_response |  | ||||||
| 
 | 
 | ||||||
| # Initialize the Misskey client | from config import NOTIFICATION_POLL_INTERVAL | ||||||
| client = client_connection() | from notification import process_notifications | ||||||
| 
 |  | ||||||
| # Define your whitelist |  | ||||||
| # TODO: move to config |  | ||||||
| whitelisted_instances: list[str] = [] |  | ||||||
| 
 |  | ||||||
| def stream_notifications(): |  | ||||||
|     print("Starting filtered notification stream...") |  | ||||||
| 
 |  | ||||||
|     last_seen_id = get_config("last_seen_notif_id") |  | ||||||
| 
 | 
 | ||||||
|  | if __name__ == '__main__': | ||||||
|  |     # Initialize the Misskey client | ||||||
|  |     client = client_connection() | ||||||
|  |     print('Listening for notifications...') | ||||||
|     while True: |     while True: | ||||||
|         try: |         if not process_notifications(client): | ||||||
|             # May be able to mark notifications as read using misskey.py and |             time.sleep(NOTIFICATION_POLL_INTERVAL) | ||||||
|             # filter them out here. This function also takes a since_id we |  | ||||||
|             # could use as well |  | ||||||
|             notifications = client.i_notifications() |  | ||||||
| 
 |  | ||||||
|             if notifications: |  | ||||||
|                 # Oldest to newest |  | ||||||
|                 notifications.reverse() |  | ||||||
| 
 |  | ||||||
|                 new_last_seen_id = last_seen_id |  | ||||||
| 
 |  | ||||||
|                 for notification in notifications: |  | ||||||
|                     notif_id = notification.get("id") |  | ||||||
| 
 |  | ||||||
|                     # Skip old or same ID notifications |  | ||||||
|                     if last_seen_id is not None and notif_id <= last_seen_id: |  | ||||||
|                         continue |  | ||||||
| 
 |  | ||||||
|                     user = notification.get("user", {}) |  | ||||||
|                     username = user.get("username", "unknown") |  | ||||||
|                     host = user.get("host")  # None if local user |  | ||||||
| 
 |  | ||||||
|                     instance = host if host else "local" |  | ||||||
| 
 |  | ||||||
|                     if instance in whitelisted_instances or instance == "local": |  | ||||||
|                         note = notification.get("note", {}) |  | ||||||
|                         note_text = note.get("text", "") |  | ||||||
|                         note_id = note.get("id") |  | ||||||
|                         notif_type = notification.get("type", "unknown") |  | ||||||
| 
 |  | ||||||
|                         # We want the visibility to be related to the type that was received (so if |  | ||||||
|                         # people don't want to dump a bunch of notes on home they don't have to) |  | ||||||
|                          |  | ||||||
|                         visibility = notification["note"]["visibility"] |  | ||||||
|                         if visibility != "specified": |  | ||||||
|                             visibility = "home" |  | ||||||
| 
 |  | ||||||
|                         print(f"📨 [{notif_type}] from @{username}@{instance}") |  | ||||||
|                         print(f"💬 {note_text}") |  | ||||||
|                         print("-" * 30) |  | ||||||
| 
 |  | ||||||
|                          |  | ||||||
|                         # We get the type of notification to filter the ones that we actually want |  | ||||||
|                         # to parse |  | ||||||
| 
 |  | ||||||
|                         notif_type = notification.get("type") |  | ||||||
|                         if notif_type in ('mention', 'reply'): |  | ||||||
|                             # 🧠 Send to the parser |  | ||||||
|                             parsed_command = parse_notification(notification,client) |  | ||||||
|                             # Get the response |  | ||||||
|                             response = generate_response(parsed_command) |  | ||||||
|                             if isinstance(response, str): |  | ||||||
|                                 client.notes_create( |  | ||||||
|                                     text=response, |  | ||||||
|                                     reply_id=note_id, |  | ||||||
|                                     visibility=visibility |  | ||||||
|                                 ) |  | ||||||
|                             elif response: |  | ||||||
|                                 client.notes_create( |  | ||||||
|                                     text=response[0], |  | ||||||
|                                     reply_id=note_id, |  | ||||||
|                                     visibility=visibility, |  | ||||||
|                                     file_ids=response[1] |  | ||||||
|                                     #visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately |  | ||||||
|                                 ) |  | ||||||
|                              |  | ||||||
| 
 |  | ||||||
|                          |  | ||||||
| 
 |  | ||||||
|                     else: |  | ||||||
|                         print(f"⚠️ Blocked notification from untrusted instance: {host}") |  | ||||||
| 
 |  | ||||||
|                     # Update only if this notif_id is greater |  | ||||||
|                     if new_last_seen_id is None or notif_id > new_last_seen_id: |  | ||||||
|                         new_last_seen_id = notif_id |  | ||||||
| 
 |  | ||||||
|                 # Save the latest seen ID |  | ||||||
|                 if new_last_seen_id and new_last_seen_id != last_seen_id: |  | ||||||
|                     set_config("last_seen_notif_id", new_last_seen_id) |  | ||||||
|                     last_seen_id = new_last_seen_id |  | ||||||
| 
 |  | ||||||
|             time.sleep(5) |  | ||||||
| 
 |  | ||||||
|         except Exception as e: |  | ||||||
|             print(f"An exception has occured: {e}\n{traceback.format_exc()}") |  | ||||||
|             time.sleep(2) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| if __name__ == "__main__": |  | ||||||
|     stream_notifications() |  | ||||||
|  |  | ||||||
|  | @ -4,19 +4,20 @@ config = configparser.ConfigParser() | ||||||
| config.read('config.ini') | config.read('config.ini') | ||||||
| 
 | 
 | ||||||
| # Username for the bot | # Username for the bot | ||||||
| USER     = config['application']['BotUser'] | USER     = config['credentials']['User'] | ||||||
| 
 |  | ||||||
| # API key for the bot | # API key for the bot | ||||||
| KEY      = config['application']['ApiKey'] | KEY      = config['credentials']['Token'] | ||||||
| # Bot's Misskey instance URL | # Bot's Misskey instance URL | ||||||
| INSTANCE = config['application']['InstanceUrl'] | INSTANCE = config['credentials']['Instance'] | ||||||
| 
 |  | ||||||
| # SQLite Database location |  | ||||||
| DB_PATH = config['application']['DatabaseLocation'] |  | ||||||
| 
 |  | ||||||
| # Extra stuff for control of the bot |  | ||||||
| 
 | 
 | ||||||
| # TODO: move this to db | # TODO: move this to db | ||||||
| # Fedi handles in the traditional 'user@domain.tld' style, allows these users | # Fedi handles in the traditional 'user@domain.tld' style, allows these users | ||||||
| # to use extra admin exclusive commands with the bot''' | # to use extra admin exclusive commands with the bot | ||||||
| ADMINS        = config['application']['DefaultAdmins'] | ADMINS        = config['application']['DefaultAdmins'] | ||||||
|  | # SQLite Database location | ||||||
|  | DB_PATH       = config['application']['DatabaseLocation'] | ||||||
|  | 
 | ||||||
|  | NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) | ||||||
|  | NOTIFICATION_BATCH_SIZE    = int(config['notification']['BatchSize']) | ||||||
|  | 
 | ||||||
|  | GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval']) | ||||||
|  |  | ||||||
|  | @ -1,5 +1,4 @@ | ||||||
| import sqlite3 | import sqlite3 | ||||||
| import random |  | ||||||
| import config | import config | ||||||
| 
 | 
 | ||||||
| DB_PATH = config.DB_PATH | DB_PATH = config.DB_PATH | ||||||
|  | @ -40,6 +39,17 @@ def add_pull(user_id, character_id): | ||||||
|     conn.commit() |     conn.commit() | ||||||
|     conn.close() |     conn.close() | ||||||
| 
 | 
 | ||||||
|  | def get_last_rolled_at(user_id): | ||||||
|  |     '''Gets the timestamp when the user last rolled''' | ||||||
|  |     conn = get_db_connection() | ||||||
|  |     cur = conn.cursor() | ||||||
|  |     cur.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \ | ||||||
|  |             (user_id,)) | ||||||
|  |     row = cur.fetchone() | ||||||
|  |     conn.close() | ||||||
|  |     return row[0] if row else None | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| def get_config(key): | def get_config(key): | ||||||
|     '''Reads the value for a specified config key from the db''' |     '''Reads the value for a specified config key from the db''' | ||||||
|     conn = get_db_connection() |     conn = get_db_connection() | ||||||
|  |  | ||||||
							
								
								
									
										115
									
								
								bot/notification.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										115
									
								
								bot/notification.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,115 @@ | ||||||
|  | import traceback | ||||||
|  | from misskey.exceptions import MisskeyAPIException | ||||||
|  | 
 | ||||||
|  | from config import NOTIFICATION_BATCH_SIZE | ||||||
|  | from parsing import parse_notification | ||||||
|  | from db_utils import get_config, set_config | ||||||
|  | from response import generate_response | ||||||
|  | 
 | ||||||
|  | # Define your whitelist | ||||||
|  | # TODO: move to config | ||||||
|  | WHITELISTED_INSTANCES: list[str] = [] | ||||||
|  | 
 | ||||||
|  | def process_notification(client, notification): | ||||||
|  |     '''Processes an individual notification''' | ||||||
|  |     user = notification.get('user', {}) | ||||||
|  |     username = user.get('username', 'unknown') | ||||||
|  |     host = user.get('host')  # None if local user | ||||||
|  |     instance = host if host else 'local' | ||||||
|  | 
 | ||||||
|  |     if not (instance in WHITELISTED_INSTANCES or instance == 'local'): | ||||||
|  |         print(f'⚠️ Blocked notification from untrusted instance: {instance}') | ||||||
|  |         return | ||||||
|  | 
 | ||||||
|  |     # Copy visibility of the post that was received when replying (so if people | ||||||
|  |     # don't want to dump a bunch of notes on home they don't have to) | ||||||
|  |     visibility = notification['note']['visibility'] | ||||||
|  |     if visibility != 'specified': | ||||||
|  |         visibility = 'home' | ||||||
|  | 
 | ||||||
|  |     notif_type = notification.get('type', 'unknown') | ||||||
|  |     notif_id = notification.get('id') | ||||||
|  |     print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}') | ||||||
|  | 
 | ||||||
|  |     # 🧠 Send to the parser | ||||||
|  |     parsed_command = parse_notification(notification, client) | ||||||
|  | 
 | ||||||
|  |     # Get the note Id to reply to | ||||||
|  |     note_id = notification.get('note', {}).get('id') | ||||||
|  | 
 | ||||||
|  |     # Get the response | ||||||
|  |     # TODO: Formalize exactly *what* is returned by this. Ideally just want to | ||||||
|  |     # handle two cases here: either we have a response, or we don't. | ||||||
|  |     # TODO: Return dictionaries instead of tuples. They handle multiple | ||||||
|  |     # elements a lot better as they're not position dependent | ||||||
|  |     response = generate_response(parsed_command) | ||||||
|  |     if isinstance(response, str): | ||||||
|  |         client.notes_create( | ||||||
|  |             text=response, | ||||||
|  |             reply_id=note_id, | ||||||
|  |             visibility=visibility | ||||||
|  |         ) | ||||||
|  |     elif response: | ||||||
|  |         client.notes_create( | ||||||
|  |             text=response[0], | ||||||
|  |             reply_id=note_id, | ||||||
|  |             visibility=visibility, | ||||||
|  |             file_ids=response[1] | ||||||
|  |             #visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  | def process_notifications(client): | ||||||
|  |     '''Processes a batch of unread notifications. Returns False if there are | ||||||
|  |     no more notifications to process.''' | ||||||
|  | 
 | ||||||
|  |     last_seen_id = get_config('last_seen_notif_id') | ||||||
|  |     # process_notification writes to last_seen_id, so make a copy | ||||||
|  |     new_last_seen_id = last_seen_id | ||||||
|  | 
 | ||||||
|  |     try: | ||||||
|  |         notifications = client.i_notifications( | ||||||
|  |                 # Fetch notifications we haven't seen yet. This option is a bit | ||||||
|  |                 # tempermental, sometimes it'll include since_id, sometimes it | ||||||
|  |                 # won't. We need to keep track of what notifications we've | ||||||
|  |                 # already processed. | ||||||
|  |                 since_id=last_seen_id, | ||||||
|  |                 # Let misskey handle the filtering | ||||||
|  |                 include_types=['mention', 'reply'], | ||||||
|  |                 # And handle the batch size while we're at it | ||||||
|  |                 limit=NOTIFICATION_BATCH_SIZE | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         # No notifications. Wait the poll period. | ||||||
|  |         if not notifications: | ||||||
|  |             return False | ||||||
|  | 
 | ||||||
|  |         # Iterate oldest to newest | ||||||
|  |         for notification in notifications: | ||||||
|  |             try: | ||||||
|  |                 # Skip if we've processed already | ||||||
|  |                 notif_id = notification.get('id') | ||||||
|  |                 if notif_id <= last_seen_id: | ||||||
|  |                     continue | ||||||
|  | 
 | ||||||
|  |                 # Update new_last_seen_id and process | ||||||
|  |                 new_last_seen_id = notif_id | ||||||
|  |                 process_notification(client, notification) | ||||||
|  | 
 | ||||||
|  |             except Exception as e: | ||||||
|  |                 print(f'An exception has occured while processing a notification: {e}') | ||||||
|  |                 print(traceback.format_exc()) | ||||||
|  | 
 | ||||||
|  |         # If we got as many notifications as we requested, there are probably | ||||||
|  |         # more in the queue | ||||||
|  |         return len(notifications) == NOTIFICATION_BATCH_SIZE | ||||||
|  | 
 | ||||||
|  |     except MisskeyAPIException as e: | ||||||
|  |         print(f'An exception has occured while reading notifications: {e}\n') | ||||||
|  |         print(traceback.format_exc()) | ||||||
|  |     finally: | ||||||
|  |         # Quality jank right here, but finally lets us update the last_seen_id | ||||||
|  |         # even if we hit an exception or return early | ||||||
|  |         if new_last_seen_id > last_seen_id: | ||||||
|  |             set_config('last_seen_notif_id', new_last_seen_id) | ||||||
|  | 
 | ||||||
|  |     return False | ||||||
							
								
								
									
										112
									
								
								bot/response.py
									
										
									
									
									
								
							
							
						
						
									
										112
									
								
								bot/response.py
									
										
									
									
									
								
							|  | @ -1,6 +1,8 @@ | ||||||
| import random | import random | ||||||
| from db_utils import get_or_create_user, add_pull, get_db_connection | from datetime import datetime, timedelta, timezone | ||||||
|  | from db_utils import get_or_create_user, add_pull, get_db_connection, get_last_rolled_at | ||||||
| from add_character import add_character | from add_character import add_character | ||||||
|  | from config import GACHA_ROLL_INTERVAL | ||||||
| 
 | 
 | ||||||
| def get_character(): | def get_character(): | ||||||
|     ''' Gets a random character from the database''' |     ''' Gets a random character from the database''' | ||||||
|  | @ -18,48 +20,71 @@ def get_character(): | ||||||
| 
 | 
 | ||||||
|     return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity'] |     return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity'] | ||||||
| 
 | 
 | ||||||
|  | def do_roll(full_user): | ||||||
|  |     '''Determines whether the user can roll, then pulls a random character''' | ||||||
|  |     user_id = get_or_create_user(full_user) | ||||||
|  | 
 | ||||||
|  |     # Get date of user's last roll | ||||||
|  |     date = get_last_rolled_at(user_id) | ||||||
|  | 
 | ||||||
|  |     # No date means it's users first roll | ||||||
|  |     if date: | ||||||
|  |         # SQLite timestamps returned by the DB are always in UTC | ||||||
|  |         # Below timestamps are to be converted to UTC | ||||||
|  |         prev = datetime.strptime(date + '+0000', '%Y-%m-%d %H:%M:%S%z') | ||||||
|  |         now = datetime.now(timezone.utc) | ||||||
|  | 
 | ||||||
|  |         time_since_last_roll = now - prev | ||||||
|  |         roll_interval = timedelta(seconds=GACHA_ROLL_INTERVAL) | ||||||
|  |         duration = roll_interval - time_since_last_roll | ||||||
|  | 
 | ||||||
|  |         # User needs to wait before they can roll again | ||||||
|  |         if time_since_last_roll < roll_interval: | ||||||
|  |             remaining_duration = None | ||||||
|  |             if duration.seconds > 3600: | ||||||
|  |                 remaining_duration = f'{-(duration.seconds // -3600)} hours' | ||||||
|  |             elif duration.seconds > 60: | ||||||
|  |                 remaining_duration = f'{-(duration.seconds // -60)} minutes' | ||||||
|  |             else: | ||||||
|  |                 remaining_duration = f'{duration.seconds} seconds' | ||||||
|  | 
 | ||||||
|  |             return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.' | ||||||
|  | 
 | ||||||
|  |     character_id, character_name, file_id, rarity = get_character() | ||||||
|  | 
 | ||||||
|  |     if not character_id: | ||||||
|  |         return f'{full_user} Uwaaa... something went wrong! No characters found. 😿' | ||||||
|  | 
 | ||||||
|  |     add_pull(user_id,character_id) | ||||||
|  |     stars = '⭐️' * rarity | ||||||
|  |     return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\ | ||||||
|  |             She's all yours now~ 💖✨",[file_id]]) | ||||||
|  | 
 | ||||||
| def is_float(val): | def is_float(val): | ||||||
|  |     '''Returns true if `val` can be converted to a float''' | ||||||
|     try: |     try: | ||||||
|         float(val) |         float(val) | ||||||
|         return True |         return True | ||||||
|     except ValueError: |     except ValueError: | ||||||
|         return False |         return False | ||||||
| 
 | 
 | ||||||
| 
 | def do_create(full_user, arguments, note_obj): | ||||||
| def generate_response(parsed_command): |     '''Creates a character''' | ||||||
|      |  | ||||||
|     '''Given a command with arguments, processes the game state and |  | ||||||
|     returns a response''' |  | ||||||
| 
 |  | ||||||
|     command, full_user, arguments, note_obj = parsed_command |  | ||||||
|      |  | ||||||
|     if command == "roll": |  | ||||||
|         user_id = get_or_create_user(full_user) |  | ||||||
|         character_id, character_name, file_id, rarity = get_character() |  | ||||||
| 
 |  | ||||||
|         if not character_id: |  | ||||||
|             #TODO: Can't have tuples of a single element |  | ||||||
|             # Return these as a dict or object instead. |  | ||||||
|             return(f"@{full_user} Uwaaa... something went wrong! No characters found. 😿") |  | ||||||
| 
 |  | ||||||
|         add_pull(user_id,character_id) |  | ||||||
|         stars = '⭐️' * rarity |  | ||||||
|         return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\nShe's all yours now~ 💖✨",[file_id]]) |  | ||||||
| 
 |  | ||||||
|     if command == "create": |  | ||||||
|     # Example call from bot logic |     # Example call from bot logic | ||||||
|         image_url = note_obj.get("files", [{}])[0].get("url") if note_obj.get("files") else None |     image_url = note_obj.get('files', [{}])[0].get('url') if note_obj.get('files') else None | ||||||
|     if not image_url: |     if not image_url: | ||||||
|             return "You need an image to create a character, dumbass." |         return f'{full_user}{full_user} You need an image to create a character, dumbass.' | ||||||
| 
 | 
 | ||||||
|     if len(arguments) != 3: |     if len(arguments) != 3: | ||||||
|             return "Please specify the following attributes in order: name, rarity, drop weighting" |         return '{full_user}Please specify the following attributes in order: \ | ||||||
|  |                 name, rarity, drop weighting' | ||||||
| 
 | 
 | ||||||
|     if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5): |     if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5): | ||||||
|             return f"Invalid rarity: '{arguments[1]}' must be a number between 1 and 5" |         return f'{full_user}Invalid rarity: \'{arguments[1]}\' must be a number between 1 and 5' | ||||||
| 
 | 
 | ||||||
|     if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0): |     if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0): | ||||||
|             return f"Invalid drop weight: '{arguments[2]}' must be a decimal value between 0.0 and 1.0" |         return f'{full_user}Invalid drop weight: \'{arguments[2]}\' \ | ||||||
|  |                 must be a decimal value between 0.0 and 1.0' | ||||||
| 
 | 
 | ||||||
|     character_id, file_id = add_character( |     character_id, file_id = add_character( | ||||||
|         name=arguments[0], |         name=arguments[0], | ||||||
|  | @ -67,5 +92,32 @@ def generate_response(parsed_command): | ||||||
|         weight=float(arguments[2]), |         weight=float(arguments[2]), | ||||||
|         image_url=image_url |         image_url=image_url | ||||||
|     ) |     ) | ||||||
|         return([f"Added {arguments[0]}, ID {character_id}.",[file_id]]) |     return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]]) | ||||||
|     return None | 
 | ||||||
|  | 
 | ||||||
|  | def do_help(full_user): | ||||||
|  |     '''Provides a list of commands that the bot can do.''' | ||||||
|  |     return f'{full_user} Here\'s what I can do:\n \ | ||||||
|  |             - `roll` Pulls a random character.\ | ||||||
|  |             - `create <name> <rarity> <weight>` Creates a character using a given image.\ | ||||||
|  |             - `help` Shows this message' | ||||||
|  | 
 | ||||||
|  | def do_invalid_command(command, full_user): | ||||||
|  |     '''Generic response when an unknown or invalid command is sent''' | ||||||
|  |     return f'{full_user} Unrecognised command: {command}\n\ | ||||||
|  |             Message \'help\' to get a list of valid commands' | ||||||
|  | 
 | ||||||
|  | def generate_response(parsed_command): | ||||||
|  |     '''Given a command with arguments, processes the game state and | ||||||
|  |     returns a response''' | ||||||
|  | 
 | ||||||
|  |     command, full_user, arguments, note_obj = parsed_command | ||||||
|  |     match command: | ||||||
|  |         case 'roll': | ||||||
|  |             return do_roll(full_user) | ||||||
|  |         case 'create': | ||||||
|  |             return do_create(full_user, arguments, note_obj) | ||||||
|  |         case 'help': | ||||||
|  |             return do_help(command) | ||||||
|  |         case _: | ||||||
|  |             return do_invalid_command(command, full_user) | ||||||
|  |  | ||||||
|  | @ -1,17 +1,27 @@ | ||||||
| ; Rename me to config.ini and put your values in here | ; Rename me to config.ini and put your values in here | ||||||
| [application] | [application] | ||||||
| ; Full fedi handle of the bot user |  | ||||||
| BotUser       = @bot@example.tld |  | ||||||
| 
 |  | ||||||
| ; API key for the bot |  | ||||||
| ; Generate one by going to Settings > API > Generate access token |  | ||||||
| ApiKey        = abcdefghijklmnopqrstuvwxyz012345 |  | ||||||
| 
 |  | ||||||
| ; Fully qualified URL of the instance hosting the bot |  | ||||||
| InstanceUrl   = http://example.tld |  | ||||||
| 
 |  | ||||||
| ; Comma separated list of fedi handles for any administrator users | ; Comma separated list of fedi handles for any administrator users | ||||||
|  | ; More can be added through the application | ||||||
| DefaultAdmins    = ['admin@example.tld'] | DefaultAdmins    = ['admin@example.tld'] | ||||||
| 
 |  | ||||||
| ; SQLite Database location | ; SQLite Database location | ||||||
| DatabaseLocation = ./gacha_game.db | DatabaseLocation = ./gacha_game.db | ||||||
|  | 
 | ||||||
|  | [gacha] | ||||||
|  | ; Number of seconds players have to wait between rolls | ||||||
|  | RollInterval = 72000 | ||||||
|  | 
 | ||||||
|  | [notification] | ||||||
|  | ; Number of seconds to sleep while awaiting new notifications | ||||||
|  | PollInterval = 5 | ||||||
|  | ; Number of notifications to process at once (max 100) | ||||||
|  | BatchSize    = 10 | ||||||
|  | 
 | ||||||
|  | [credentials] | ||||||
|  | ; Fully qualified URL of the instance hosting the bot | ||||||
|  | Instance = http://example.tld | ||||||
|  | ; Full fedi handle of the bot user | ||||||
|  | User     = @bot@example.tld | ||||||
|  | ; API key for the bot | ||||||
|  | ; Generate one by going to Settings > API > Generate access token | ||||||
|  | Token    = abcdefghijklmnopqrstuvwxyz012345 | ||||||
|  | 
 | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue