Merge pull request 'Reduce reliance on tuples' (#41) from 18_reduce_tuples into dev

Reviewed-on: #41
This commit is contained in:
waifu 2025-06-01 23:17:07 -03:00
commit 71b9cd9d16
9 changed files with 263 additions and 105 deletions

1
.gitignore vendored
View file

@ -147,6 +147,7 @@ venv.bak/
/site /site
# mypy # mypy
.mypy.ini
.mypy_cache/ .mypy_cache/
.dmypy.json .dmypy.json
dmypy.json dmypy.json

View file

@ -2,9 +2,15 @@ import requests
from misskey.exceptions import MisskeyAPIException from misskey.exceptions import MisskeyAPIException
from client import client_connection from client import client_connection
from db_utils import insert_character from db_utils import insert_character
from custom_types import Character
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
""" def add_character(
name: str,
rarity: int,
weight: float,
image_url: str) -> tuple[int, str]:
'''
Adds a character to the database, uploading the image from a public URL to Adds a character to the database, uploading the image from a public URL to
the bot's Misskey Drive. the bot's Misskey Drive.
@ -12,7 +18,8 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
name (str): Character name. name (str): Character name.
rarity (int): Character rarity (e.g., 1-5). rarity (int): Character rarity (e.g., 1-5).
weight (float): Pull weight (e.g., 0.02). weight (float): Pull weight (e.g., 0.02).
image_url (str): Public URL of the image from the post (e.g., from note['files'][i]['url']). image_url (str): Public URL of the image from the post (e.g., from
note['files'][i]['url']).
Returns: Returns:
tuple[int, str]: Character ID and bot's Drive file_id. tuple[int, str]: Character ID and bot's Drive file_id.
@ -20,30 +27,39 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
Raises: Raises:
ValueError: If inputs are invalid. ValueError: If inputs are invalid.
RuntimeError: If image download/upload or database operation fails. RuntimeError: If image download/upload or database operation fails.
""" '''
stripped_name = name.strip()
# Validate inputs # Validate inputs
if not name or not name.strip(): if not stripped_name:
raise ValueError("Character name cannot be empty.") raise ValueError('Character name cannot be empty.')
if not isinstance(rarity, int) or rarity < 1: if rarity < 1:
raise ValueError("Rarity must be a positive integer.") raise ValueError('Rarity must be a positive integer.')
if not isinstance(weight, (int, float)) or weight <= 0: if weight <= 0:
raise ValueError("Weight must be a positive number.") raise ValueError('Weight must be a positive number.')
if not image_url: if not image_url:
raise ValueError("Image URL must be provided.") raise ValueError('Image URL must be provided.')
# Download image # Download image
response = requests.get(image_url, stream=True, timeout=30) response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200: if response.status_code != 200:
raise RuntimeError(f"Failed to download image from {image_url}") raise RuntimeError(f'Failed to download image from {image_url}')
# Upload to bot's Drive # Upload to bot's Drive
mk = client_connection() mk = client_connection()
try: try:
media = mk.drive_files_create(response.raw) media = mk.drive_files_create(response.raw)
file_id = media["id"] file_id = media['id']
except MisskeyAPIException as e: except MisskeyAPIException as e:
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
from e
# Insert into database # Insert into database
character_id = insert_character(name.strip(), rarity, float(weight), file_id) character_id = insert_character(
stripped_name,
rarity,
float(weight),
file_id
)
return character_id, file_id return character_id, file_id

View file

@ -1,5 +1,6 @@
import misskey import misskey
import config import config
def client_connection():
def client_connection() -> misskey.Misskey:
return misskey.Misskey(address=config.INSTANCE, i=config.KEY) return misskey.Misskey(address=config.INSTANCE, i=config.KEY)

View file

@ -2,9 +2,11 @@
import configparser import configparser
from os import environ, path from os import environ, path
class ConfigError(Exception): class ConfigError(Exception):
'''Could not find config file''' '''Could not find config file'''
def get_config_file() -> str: def get_config_file() -> str:
'''Gets the path to the config file in the current environment''' '''Gets the path to the config file in the current environment'''
env: str | None = environ.get('KEMOVERSE_ENV') env: str | None = environ.get('KEMOVERSE_ENV')
@ -19,24 +21,25 @@ def get_config_file() -> str:
raise ConfigError(f'Could not find {config_path}') raise ConfigError(f'Could not find {config_path}')
return config_path return config_path
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(get_config_file()) config.read(get_config_file())
# Username for the bot # Username for the bot
USER = config['credentials']['User'].lower() USER = config['credentials']['User'].lower()
# API key for the bot # API key for the bot
KEY = config['credentials']['Token'] KEY = config['credentials']['Token']
# Bot's Misskey instance URL # Bot's Misskey instance URL
INSTANCE = config['credentials']['Instance'].lower() INSTANCE = config['credentials']['Instance'].lower()
# TODO: move this to db # TODO: move this to db
# Fedi handles in the traditional 'user@domain.tld' style, allows these users # Fedi handles in the traditional 'user@domain.tld' style, allows these users
# to use extra admin exclusive commands with the bot # to use extra admin exclusive commands with the bot
ADMINS = config['application']['DefaultAdmins'] ADMINS = config['application']['DefaultAdmins']
# SQLite Database location # SQLite Database location
DB_PATH = config['application']['DatabaseLocation'] DB_PATH = config['application']['DatabaseLocation']
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize']) NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval']) GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval'])

21
bot/custom_types.py Normal file
View file

@ -0,0 +1,21 @@
from typing import TypedDict, List, Dict, Any
BotResponse = TypedDict('BotResponse', {
'message': str,
'attachment_urls': List[str] | None
})
Character = TypedDict('Character', {
'id': int,
'name': str,
'rarity': int,
'weight': float,
'image_url': str
})
ParsedNotification = TypedDict('ParsedNotification', {
'author': str,
'command': str | None,
'arguments': List[str],
'note_obj': Dict[str, Any]
})

View file

@ -1,11 +1,13 @@
from random import choices from random import choices
import sqlite3 import sqlite3
import config import config
from custom_types import Character
DB_PATH = config.DB_PATH DB_PATH = config.DB_PATH
CONNECTION: sqlite3.Connection CONNECTION: sqlite3.Connection
CURSOR: sqlite3.Cursor CURSOR: sqlite3.Cursor
def connect() -> None: def connect() -> None:
'''Creates a connection to the database''' '''Creates a connection to the database'''
print('Connecting to the database...') print('Connecting to the database...')
@ -15,26 +17,34 @@ def connect() -> None:
CONNECTION.row_factory = sqlite3.Row CONNECTION.row_factory = sqlite3.Row
CURSOR = CONNECTION.cursor() CURSOR = CONNECTION.cursor()
def get_random_character():
def get_random_character() -> Character | None:
''' Gets a random character from the database''' ''' Gets a random character from the database'''
CURSOR.execute('SELECT * FROM characters') CURSOR.execute('SELECT * FROM characters')
characters = CURSOR.fetchall() characters = CURSOR.fetchall()
if not characters: if not characters:
return None, None, None, None return None
weights = [c['weight'] for c in characters] weights = [c['weight'] for c in characters]
chosen = choices(characters, weights=weights, k=1)[0] chosen = choices(characters, weights=weights, k=1)[0]
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity'] return {
'id': chosen['id'],
'name': chosen['name'],
'rarity': chosen['rarity'],
'weight': chosen['weight'],
'image_url': chosen['file_id']
}
def get_or_create_user(username):
def get_or_create_user(username: str) -> int:
'''Retrieves an ID for a given user, if the user does not exist, it will be '''Retrieves an ID for a given user, if the user does not exist, it will be
created.''' created.'''
CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,)) CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,))
user = CURSOR.fetchone() user = CURSOR.fetchone()
if user: if user:
return user[0] return int(user[0])
# New user starts with has_rolled = False # New user starts with has_rolled = False
CURSOR.execute( CURSOR.execute(
@ -42,38 +52,47 @@ def get_or_create_user(username):
(username, False) (username, False)
) )
user_id = CURSOR.lastrowid user_id = CURSOR.lastrowid
return user_id return user_id if user_id else 0
def insert_character(name: str, rarity: int, weight: float, file_id: str) -> int:
def insert_character(
name: str, rarity: int, weight: float, file_id: str) -> int:
'''Inserts a character''' '''Inserts a character'''
CURSOR.execute( CURSOR.execute(
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)', 'INSERT INTO characters (name, rarity, weight, file_id) VALUES \
(?, ?, ?, ?)',
(name, rarity, weight, file_id) (name, rarity, weight, file_id)
) )
character_id = CURSOR.lastrowid character_id = CURSOR.lastrowid
return character_id if character_id else 0 return character_id if character_id else 0
def insert_pull(user_id, character_id):
def insert_pull(user_id: int, character_id: int) -> None:
'''Creates a pull in the database''' '''Creates a pull in the database'''
CURSOR.execute( CURSOR.execute(
'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', 'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)',
(user_id, character_id) (user_id, character_id)
) )
def get_last_rolled_at(user_id):
def get_last_rolled_at(user_id: int) -> int:
'''Gets the timestamp when the user last rolled''' '''Gets the timestamp when the user last rolled'''
CURSOR.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \ CURSOR.execute(
(user_id,)) "SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp \
DESC",
(user_id,))
row = CURSOR.fetchone() row = CURSOR.fetchone()
return row[0] if row else None return row[0] if row else 0
def get_config(key): def get_config(key: str) -> str:
'''Reads the value for a specified config key from the db''' '''Reads the value for a specified config key from the db'''
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,)) CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,))
row = CURSOR.fetchone() row = CURSOR.fetchone()
return row[0] if row else None return row[0] if row else ''
def set_config(key, value):
def set_config(key: str, value: str) -> None:
'''Writes the value for a specified config key to the db''' '''Writes the value for a specified config key to the db'''
CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value)) CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
(key, value))

View file

@ -1,16 +1,23 @@
import traceback import traceback
from typing import Dict, Any
import misskey
from misskey.exceptions import MisskeyAPIException from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE from config import NOTIFICATION_BATCH_SIZE
from parsing import parse_notification from parsing import parse_notification
from db_utils import get_config, set_config from db_utils import get_config, set_config
from response import generate_response from response import generate_response
from custom_types import BotResponse
# Define your whitelist # Define your whitelist
# TODO: move to config # TODO: move to config
WHITELISTED_INSTANCES: list[str] = [] WHITELISTED_INSTANCES: list[str] = []
def process_notification(client, notification):
def process_notification(
client: misskey.Misskey,
notification: Dict[str, Any]) -> None:
'''Processes an individual notification''' '''Processes an individual notification'''
user = notification.get('user', {}) user = notification.get('user', {})
username = user.get('username', 'unknown') username = user.get('username', 'unknown')
@ -32,33 +39,32 @@ def process_notification(client, notification):
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}') print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
# 🧠 Send to the parser # 🧠 Send to the parser
parsed_command = parse_notification(notification, client) parsed_notification = parse_notification(notification, client)
if not parsed_notification:
return
# Get the note Id to reply to # Get the note Id to reply to
note_id = notification.get('note', {}).get('id') note_id = notification.get('note', {}).get('id')
# Get the response # Get the response
# TODO: Formalize exactly *what* is returned by this. Ideally just want to response: BotResponse | None = generate_response(parsed_notification)
# handle two cases here: either we have a response, or we don't.
# TODO: Return dictionaries instead of tuples. They handle multiple
# elements a lot better as they're not position dependent
response = generate_response(parsed_command)
if isinstance(response, str):
client.notes_create(
text=response,
reply_id=note_id,
visibility=visibility
)
elif response:
client.notes_create(
text=response[0],
reply_id=note_id,
visibility=visibility,
file_ids=response[1]
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)
def process_notifications(client): if not response:
return
client.notes_create(
text=response['message'],
reply_id=note_id,
visibility=visibility,
file_ids=response['attachment_urls']
# TODO: write actual visible users ids so pleromers can use the bot
# privately
# visible_user_ids=[]
)
def process_notifications(client: misskey.Misskey) -> bool:
'''Processes a batch of unread notifications. Returns False if there are '''Processes a batch of unread notifications. Returns False if there are
no more notifications to process.''' no more notifications to process.'''
@ -87,7 +93,7 @@ def process_notifications(client):
for notification in notifications: for notification in notifications:
try: try:
# Skip if we've processed already # Skip if we've processed already
notif_id = notification.get('id') notif_id = notification.get('id', '')
if notif_id <= last_seen_id: if notif_id <= last_seen_id:
continue continue
@ -96,7 +102,8 @@ def process_notifications(client):
process_notification(client, notification) process_notification(client, notification)
except Exception as e: except Exception as e:
print(f'An exception has occured while processing a notification: {e}') print(f'An exception has occured while processing a \
notification: {e}')
print(traceback.format_exc()) print(traceback.format_exc())
# If we got as many notifications as we requested, there are probably # If we got as many notifications as we requested, there are probably

View file

@ -1,11 +1,16 @@
import random, re import re
from typing import Dict, Any
import misskey
import config import config
from custom_types import ParsedNotification
def parse_notification(notification,client):
'''Parses any notifications received by the bot and sends any commands to
gacha_response()'''
def parse_notification(
notification: Dict[str, Any],
client: misskey.Misskey) -> ParsedNotification | None:
'''Parses any notifications received by the bot'''
# Get the full Activitypub ID of the user # Get the full Activitypub ID of the user
user = notification.get("user", {}) user = notification.get("user", {})
@ -28,14 +33,20 @@ def parse_notification(notification,client):
# Make sure the notification text explicitly mentions the bot # Make sure the notification text explicitly mentions the bot
if not any(variant in note for variant in username_variants): if not any(variant in note for variant in username_variants):
return return None
# Find command and arguments after the mention # Find command and arguments after the mention
# Removes all mentions (regex = mentions that start with @ and may contain @domain) # Removes all mentions
# regex = mentions that start with @ and may contain @domain
cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip() cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip()
parts = cleaned_text.split() parts = cleaned_text.split()
command = parts[0].lower() if parts else None command = parts[0].lower() if parts else None
arguments = parts[1:] if len(parts) > 1 else [] arguments = parts[1:] if len(parts) > 1 else []
return [command,full_user, arguments, note_obj] return {
'author': full_user,
'command': command,
'arguments': arguments,
'note_obj': note_obj
}

View file

@ -1,9 +1,13 @@
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from db_utils import get_or_create_user, insert_pull, get_last_rolled_at, get_random_character from typing import TypedDict, Any, List, Dict
from db_utils import get_or_create_user, insert_pull, get_last_rolled_at, \
get_random_character
from add_character import add_character from add_character import add_character
from config import GACHA_ROLL_INTERVAL from config import GACHA_ROLL_INTERVAL
from custom_types import BotResponse, ParsedNotification
def do_roll(full_user):
def do_roll(full_user: str) -> BotResponse:
'''Determines whether the user can roll, then pulls a random character''' '''Determines whether the user can roll, then pulls a random character'''
user_id = get_or_create_user(full_user) user_id = get_or_create_user(full_user)
@ -14,7 +18,7 @@ def do_roll(full_user):
if date: if date:
# SQLite timestamps returned by the DB are always in UTC # SQLite timestamps returned by the DB are always in UTC
# Below timestamps are to be converted to UTC # Below timestamps are to be converted to UTC
prev = datetime.strptime(date + '+0000', '%Y-%m-%d %H:%M:%S%z') prev = datetime.strptime(str(date) + '+0000', '%Y-%m-%d %H:%M:%S%z')
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
time_since_last_roll = now - prev time_since_last_roll = now - prev
@ -31,19 +35,31 @@ def do_roll(full_user):
else: else:
remaining_duration = f'{duration.seconds} seconds' remaining_duration = f'{duration.seconds} seconds'
return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.' return {
'message': f'{full_user} ⏱️ Please wait another \
{remaining_duration} before rolling again.',
'attachment_urls': None
}
character_id, character_name, file_id, rarity = get_random_character() character = get_random_character()
if not character_id: if not character:
return f'{full_user} Uwaaa... something went wrong! No characters found. 😿' return {
'message': f'{full_user} Uwaaa... something went wrong! No \
characters found. 😿',
'attachment_urls': None
}
insert_pull(user_id,character_id) insert_pull(user_id, character['id'])
stars = '⭐️' * rarity stars = '⭐️' * character['rarity']
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\ return {
She's all yours now~ 💖✨",[file_id]]) 'message': f'@{full_user} 🎲 Congrats! You rolled {stars} \
**{character['name']}**\nShe\'s all yours now~ 💖✨',
'attachment_urls': [character['image_url']]
}
def is_float(val):
def is_float(val: Any) -> bool:
'''Returns true if `val` can be converted to a float''' '''Returns true if `val` can be converted to a float'''
try: try:
float(val) float(val)
@ -51,23 +67,42 @@ def is_float(val):
except ValueError: except ValueError:
return False return False
def do_create(full_user, arguments, note_obj):
def do_create(
full_user: str,
arguments: List[str],
note_obj: Dict[str, Any]) -> BotResponse:
'''Creates a character''' '''Creates a character'''
# Example call from bot logic # Example call from bot logic
image_url = note_obj.get('files', [{}])[0].get('url') if note_obj.get('files') else None image_url = note_obj.get('files', [{}])[0].get('url') \
if note_obj.get('files') else None
if not image_url: if not image_url:
return f'{full_user}{full_user} You need an image to create a character, dumbass.' return {
'message': f'{full_user} You need an image to create a character, \
dumbass.',
'attachment_urls': None
}
if len(arguments) != 3: if len(arguments) != 3:
return '{full_user}Please specify the following attributes in order: \ return {
name, rarity, drop weighting' 'message': f'{full_user} Please specify the following attributes \
in order: name, rarity, drop weighting',
'attachment_urls': None
}
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5): if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
return f'{full_user}Invalid rarity: \'{arguments[1]}\' must be a number between 1 and 5' return {
'message': f'{full_user} Invalid rarity: \'{arguments[1]}\' must \
be a number between 1 and 5',
'attachment_urls': None
}
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0): if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
return f'{full_user}Invalid drop weight: \'{arguments[2]}\' \ return {
must be a decimal value between 0.0 and 1.0' 'message': f'{full_user} Invalid drop weight: \'{arguments[2]}\' \
must be a decimal value between 0.0 and 1.0',
'attachment_urls': None
}
character_id, file_id = add_character( character_id, file_id = add_character(
name=arguments[0], name=arguments[0],
@ -75,26 +110,70 @@ def do_create(full_user, arguments, note_obj):
weight=float(arguments[2]), weight=float(arguments[2]),
image_url=image_url image_url=image_url
) )
return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]]) return {
'message': f'{full_user} Added {arguments[0]}, ID {character_id}.',
'attachment_urls': [file_id]
}
def do_help(full_user):
def do_help(author: str) -> BotResponse:
'''Provides a list of commands that the bot can do.''' '''Provides a list of commands that the bot can do.'''
return f'{full_user} Here\'s what I can do:\n \ return {
- `roll` Pulls a random character.\ 'message': f'{author} Here\'s what I can do:\n\
- `create <name> <rarity> <weight>` Creates a character using a given image.\ - `roll` Pulls a random character.\n\
- `help` Shows this message' - `create <name> <rarity> <weight>` Creates a character using a given image.\n\
- `help` Shows this message.',
'attachment_urls': None
}
def generate_response(parsed_command):
def do_signup() -> BotResponse:
return {
'message': '',
'attachment_urls': None
}
def generate_response(notification: ParsedNotification) -> BotResponse | None:
'''Given a command with arguments, processes the game state and '''Given a command with arguments, processes the game state and
returns a response''' returns a response'''
command, full_user, arguments, note_obj = parsed_command # Temporary response variable
res: BotResponse | None = None
# TODO: Check if the user has an account
author = notification['author']
user_id = get_or_create_user(author)
command = notification['command']
# Check if the user is an administrator
# user_is_administrator = user_is_administrator()
# Unrestricted commands
match command: match command:
case 'roll': case 'signup':
return do_roll(full_user) res = do_signup()
case 'create':
return do_create(full_user, arguments, note_obj)
case 'help': case 'help':
return do_help(command) res = do_help(author)
case _: case _:
return None pass
if not user_id:
return res
# User commands
match command:
case 'delete_account':
pass
case 'roll':
res = do_roll(author)
case 'create':
res = do_create(
author,
notification['arguments'],
notification['note_obj']
)
case _:
pass
# if not user_is_administrator:
return res
# Administrator commands go here