Compare commits

...

4 commits

9 changed files with 287 additions and 122 deletions

1
.gitignore vendored
View file

@ -147,6 +147,7 @@ venv.bak/
/site
# mypy
.mypy.ini
.mypy_cache/
.dmypy.json
dmypy.json

View file

@ -2,9 +2,15 @@ import requests
from misskey.exceptions import MisskeyAPIException
from client import client_connection
from db_utils import insert_character
from custom_types import Character
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
"""
def add_character(
name: str,
rarity: int,
weight: float,
image_url: str) -> tuple[int, str]:
'''
Adds a character to the database, uploading the image from a public URL to
the bot's Misskey Drive.
@ -12,7 +18,8 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
name (str): Character name.
rarity (int): Character rarity (e.g., 1-5).
weight (float): Pull weight (e.g., 0.02).
image_url (str): Public URL of the image from the post (e.g., from note['files'][i]['url']).
image_url (str): Public URL of the image from the post (e.g., from
note['files'][i]['url']).
Returns:
tuple[int, str]: Character ID and bot's Drive file_id.
@ -20,30 +27,39 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
Raises:
ValueError: If inputs are invalid.
RuntimeError: If image download/upload or database operation fails.
"""
'''
stripped_name = name.strip()
# Validate inputs
if not name or not name.strip():
raise ValueError("Character name cannot be empty.")
if not isinstance(rarity, int) or rarity < 1:
raise ValueError("Rarity must be a positive integer.")
if not isinstance(weight, (int, float)) or weight <= 0:
raise ValueError("Weight must be a positive number.")
if not stripped_name:
raise ValueError('Character name cannot be empty.')
if rarity < 1:
raise ValueError('Rarity must be a positive integer.')
if weight <= 0:
raise ValueError('Weight must be a positive number.')
if not image_url:
raise ValueError("Image URL must be provided.")
raise ValueError('Image URL must be provided.')
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f"Failed to download image from {image_url}")
raise RuntimeError(f'Failed to download image from {image_url}')
# Upload to bot's Drive
mk = client_connection()
try:
media = mk.drive_files_create(response.raw)
file_id = media["id"]
file_id = media['id']
except MisskeyAPIException as e:
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
from e
# Insert into database
character_id = insert_character(name.strip(), rarity, float(weight), file_id)
character_id = insert_character(
stripped_name,
rarity,
float(weight),
file_id
)
return character_id, file_id

View file

@ -1,5 +1,6 @@
import misskey
import config
def client_connection():
def client_connection() -> misskey.Misskey:
return misskey.Misskey(address=config.INSTANCE, i=config.KEY)

View file

@ -2,9 +2,11 @@
import configparser
from os import environ, path
class ConfigError(Exception):
'''Could not find config file'''
def get_config_file() -> str:
'''Gets the path to the config file in the current environment'''
env: str | None = environ.get('KEMOVERSE_ENV')
@ -19,6 +21,7 @@ def get_config_file() -> str:
raise ConfigError(f'Could not find {config_path}')
return config_path
config = configparser.ConfigParser()
config.read(get_config_file())

21
bot/custom_types.py Normal file
View file

@ -0,0 +1,21 @@
from typing import TypedDict, List, Dict, Any
BotResponse = TypedDict('BotResponse', {
'message': str,
'attachment_urls': List[str] | None
})
Character = TypedDict('Character', {
'id': int,
'name': str,
'rarity': int,
'weight': float,
'image_url': str
})
ParsedNotification = TypedDict('ParsedNotification', {
'author': str,
'command': str | None,
'arguments': List[str],
'note_obj': Dict[str, Any]
})

View file

@ -1,11 +1,13 @@
from random import choices
import sqlite3
import config
from custom_types import Character
DB_PATH = config.DB_PATH
CONNECTION: sqlite3.Connection
CURSOR: sqlite3.Cursor
def connect() -> None:
'''Creates a connection to the database'''
print('Connecting to the database...')
@ -15,24 +17,32 @@ def connect() -> None:
CONNECTION.row_factory = sqlite3.Row
CURSOR = CONNECTION.cursor()
def get_random_character():
def get_random_character() -> Character | None:
''' Gets a random character from the database'''
CURSOR.execute('SELECT * FROM characters')
characters = CURSOR.fetchall()
if not characters:
return None, None, None, None
return None
weights = [c['weight'] for c in characters]
chosen = choices(characters, weights=weights, k=1)[0]
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
return {
'id': chosen['id'],
'name': chosen['name'],
'rarity': chosen['rarity'],
'weight': chosen['weight'],
'image_url': chosen['file_id']
}
def get_player(username):
def get_player(username: str) -> int:
'''Retrieve a player ID by username, or return None if not found.'''
CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,))
user = CURSOR.fetchone()
return user[0] if user else None
if user:
return int(user[0])
def insert_player(username):
'''Insert a new player with default has_rolled = False and return their user ID.'''
@ -70,36 +80,45 @@ def delete_player(username):
return True
def insert_character(name: str, rarity: int, weight: float, file_id: str) -> int:
def insert_character(
name: str, rarity: int, weight: float, file_id: str) -> int:
'''Inserts a character'''
CURSOR.execute(
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
'INSERT INTO characters (name, rarity, weight, file_id) VALUES \
(?, ?, ?, ?)',
(name, rarity, weight, file_id)
)
character_id = CURSOR.lastrowid
return character_id if character_id else 0
def insert_pull(user_id, character_id):
def insert_pull(user_id: int, character_id: int) -> None:
'''Creates a pull in the database'''
CURSOR.execute(
'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)',
(user_id, character_id)
)
def get_last_rolled_at(user_id):
def get_last_rolled_at(user_id: int) -> int:
'''Gets the timestamp when the user last rolled'''
CURSOR.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \
CURSOR.execute(
"SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp \
DESC",
(user_id,))
row = CURSOR.fetchone()
return row[0] if row else None
return row[0] if row else 0
def get_config(key):
def get_config(key: str) -> str:
'''Reads the value for a specified config key from the db'''
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,))
row = CURSOR.fetchone()
return row[0] if row else None
return row[0] if row else ''
def set_config(key, value):
def set_config(key: str, value: str) -> None:
'''Writes the value for a specified config key to the db'''
CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
(key, value))

View file

@ -1,16 +1,23 @@
import traceback
from typing import Dict, Any
import misskey
from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE
from parsing import parse_notification
from db_utils import get_config, set_config
from response import generate_response
from custom_types import BotResponse
# Define your whitelist
# TODO: move to config
WHITELISTED_INSTANCES: list[str] = []
def process_notification(client, notification):
def process_notification(
client: misskey.Misskey,
notification: Dict[str, Any]) -> None:
'''Processes an individual notification'''
user = notification.get('user', {})
username = user.get('username', 'unknown')
@ -32,33 +39,32 @@ def process_notification(client, notification):
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
# 🧠 Send to the parser
parsed_command = parse_notification(notification, client)
parsed_notification = parse_notification(notification, client)
if not parsed_notification:
return
# Get the note Id to reply to
note_id = notification.get('note', {}).get('id')
# Get the response
# TODO: Formalize exactly *what* is returned by this. Ideally just want to
# handle two cases here: either we have a response, or we don't.
# TODO: Return dictionaries instead of tuples. They handle multiple
# elements a lot better as they're not position dependent
response = generate_response(parsed_command)
if isinstance(response, str):
response: BotResponse | None = generate_response(parsed_notification)
if not response:
return
client.notes_create(
text=response,
reply_id=note_id,
visibility=visibility
)
elif response:
client.notes_create(
text=response[0],
text=response['message'],
reply_id=note_id,
visibility=visibility,
file_ids=response[1]
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
file_ids=response['attachment_urls']
# TODO: write actual visible users ids so pleromers can use the bot
# privately
# visible_user_ids=[]
)
def process_notifications(client):
def process_notifications(client: misskey.Misskey) -> bool:
'''Processes a batch of unread notifications. Returns False if there are
no more notifications to process.'''
@ -87,7 +93,7 @@ def process_notifications(client):
for notification in notifications:
try:
# Skip if we've processed already
notif_id = notification.get('id')
notif_id = notification.get('id', '')
if notif_id <= last_seen_id:
continue
@ -96,7 +102,8 @@ def process_notifications(client):
process_notification(client, notification)
except Exception as e:
print(f'An exception has occured while processing a notification: {e}')
print(f'An exception has occured while processing a \
notification: {e}')
print(traceback.format_exc())
# If we got as many notifications as we requested, there are probably

View file

@ -1,11 +1,16 @@
import random, re
import re
from typing import Dict, Any
import misskey
import config
def parse_notification(notification,client):
'''Parses any notifications received by the bot and sends any commands to
gacha_response()'''
from custom_types import ParsedNotification
def parse_notification(
notification: Dict[str, Any],
client: misskey.Misskey) -> ParsedNotification | None:
'''Parses any notifications received by the bot'''
# Get the full Activitypub ID of the user
user = notification.get("user", {})
@ -28,14 +33,20 @@ def parse_notification(notification,client):
# Make sure the notification text explicitly mentions the bot
if not any(variant in note for variant in username_variants):
return
return None
# Find command and arguments after the mention
# Removes all mentions (regex = mentions that start with @ and may contain @domain)
# Removes all mentions
# regex = mentions that start with @ and may contain @domain
cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip()
parts = cleaned_text.split()
command = parts[0].lower() if parts else None
arguments = parts[1:] if len(parts) > 1 else []
return [command,full_user, arguments, note_obj]
return {
'author': full_user,
'command': command,
'arguments': arguments,
'note_obj': note_obj
}

View file

@ -1,9 +1,13 @@
from datetime import datetime, timedelta, timezone
from db_utils import get_player, insert_player, delete_player, insert_pull, get_last_rolled_at, get_random_character
from typing import TypedDict, Any, List, Dict
from db_utils import get_player, insert_player, delete_player, insert_pull, get_last_rolled_at, \
get_random_character
from add_character import add_character
from config import GACHA_ROLL_INTERVAL
from custom_types import BotResponse, ParsedNotification
def do_roll(full_user):
def do_roll(full_user: str) -> BotResponse:
'''Determines whether the user can roll, then pulls a random character'''
user_id = get_player(full_user)
@ -16,7 +20,7 @@ def do_roll(full_user):
if date:
# SQLite timestamps returned by the DB are always in UTC
# Below timestamps are to be converted to UTC
prev = datetime.strptime(date + '+0000', '%Y-%m-%d %H:%M:%S%z')
prev = datetime.strptime(str(date) + '+0000', '%Y-%m-%d %H:%M:%S%z')
now = datetime.now(timezone.utc)
time_since_last_roll = now - prev
@ -33,29 +37,46 @@ def do_roll(full_user):
else:
remaining_duration = f'{duration.seconds} seconds'
return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.'
return {
'message': f'{full_user} ⏱️ Please wait another \
{remaining_duration} before rolling again.',
'attachment_urls': None
}
character_id, character_name, file_id, rarity = get_random_character()
character = get_random_character()
if not character_id:
return f'{full_user} Uwaaa... something went wrong! No characters found. 😿'
if not character:
return {
'message': f'{full_user} Uwaaa... something went wrong! No \
characters found. 😿',
'attachment_urls': None
}
insert_pull(user_id,character_id)
stars = '⭐️' * rarity
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\
She's all yours now~ 💖✨",[file_id]])
insert_pull(user_id, character['id'])
stars = '⭐️' * character['rarity']
return {
'message': f'@{full_user} 🎲 Congrats! You rolled {stars} \
**{character['name']}**\nShe\'s all yours now~ 💖✨',
'attachment_urls': [character['image_url']]
}
def do_signup(full_user):
def do_signup(author: str) -> BotResponse:
'''Registers a new user if they havent signed up yet.'''
user_id = get_player(full_user)
user_id = get_player(author)
if user_id:
return f'@{full_user} 👀 Youre already signed up! Let the rolling begin~ 🎲'
return {
f'@{author} 👀 Youre already signed up! Let the rolling begin~ 🎲',
'attachment_urls': None
}
new_user_id = insert_player(full_user)
return f'@{full_user} ✅ Signed up successfully! Your gacha destiny begins now... ✨ Use the roll command to start!'
new_user_id = insert_player(author)
return {
'message': f'@{author} ✅ Signed up successfully! Your gacha destiny begins now... ✨ Use the roll command to start!',
'attachment_urls': None
}
def is_float(val):
def is_float(val: Any) -> bool:
'''Returns true if `val` can be converted to a float'''
try:
float(val)
@ -63,23 +84,42 @@ def is_float(val):
except ValueError:
return False
def do_create(full_user, arguments, note_obj):
def do_create(
full_user: str,
arguments: List[str],
note_obj: Dict[str, Any]) -> BotResponse:
'''Creates a character'''
# Example call from bot logic
image_url = note_obj.get('files', [{}])[0].get('url') if note_obj.get('files') else None
image_url = note_obj.get('files', [{}])[0].get('url') \
if note_obj.get('files') else None
if not image_url:
return f'{full_user}{full_user} You need an image to create a character, dumbass.'
return {
'message': f'{full_user} You need an image to create a character, \
dumbass.',
'attachment_urls': None
}
if len(arguments) != 3:
return '{full_user}Please specify the following attributes in order: \
name, rarity, drop weighting'
return {
'message': f'{full_user} Please specify the following attributes \
in order: name, rarity, drop weighting',
'attachment_urls': None
}
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
return f'{full_user}Invalid rarity: \'{arguments[1]}\' must be a number between 1 and 5'
return {
'message': f'{full_user} Invalid rarity: \'{arguments[1]}\' must \
be a number between 1 and 5',
'attachment_urls': None
}
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
return f'{full_user}Invalid drop weight: \'{arguments[2]}\' \
must be a decimal value between 0.0 and 1.0'
return {
'message': f'{full_user} Invalid drop weight: \'{arguments[2]}\' \
must be a decimal value between 0.0 and 1.0',
'attachment_urls': None
}
character_id, file_id = add_character(
name=arguments[0],
@ -87,50 +127,96 @@ def do_create(full_user, arguments, note_obj):
weight=float(arguments[2]),
image_url=image_url
)
return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]])
return {
'message': f'{full_user} Added {arguments[0]}, ID {character_id}.',
'attachment_urls': [file_id]
}
def do_help(full_user):
def do_help(author: str) -> BotResponse:
'''Provides a list of commands that the bot can do.'''
return f'{full_user} Here\'s what I can do:\n \
return {
'message':f'{full_user} Here\'s what I can do:\n \
- `roll` Pulls a random character.\
- `create <name> <rarity> <weight>` Creates a character using a given image.\
- `signup` Registers your account.\
- `delete_account` Deletes your account.\
- `help` Shows this message'
- `help` Shows this message',
'attachment_urls': None
}
def delete_account(full_user):
return (
f'@{full_user} ⚠️ This will permanently delete your account and all your cards.\n'
def delete_account(author: str) -> BotResponse:
return {
'message':f'@{author} ⚠️ This will permanently delete your account and all your cards.\n'
'If youre sure, reply with `confirm_delete` to proceed.\n\n'
'**There is no undo.** Your gacha luck will be lost to the void... 💀✨'
)
'**There is no undo.** Your gacha luck will be lost to the void... 💀✨',
'attachment_urls': None
def confirm_delete(full_user):
success = delete_player(full_user)
}
def confirm_delete(author: str) -> BotResponse:
success = delete_player(author)
if not success:
return f'@{full_user} ❌ No account found to delete. Maybe its already gone?'
return {
'message':f'@{author} ❌ No account found to delete. Maybe its already gone?',
'attachment_urls': None
}
return f'@{full_user} 🧼 Your account and all your cards have been deleted. RIP your gacha history 🕊️✨'
return {
'message':f'@{author} 🧼 Your account and all your cards have been deleted. RIP your gacha history 🕊️✨',
'attachment_urls': None
}
def generate_response(parsed_command):
def generate_response(notification: ParsedNotification) -> BotResponse | None:
'''Given a command with arguments, processes the game state and
returns a response'''
command, full_user, arguments, note_obj = parsed_command
# Temporary response variable
res: BotResponse | None = None
# TODO: Check if the user has an account
author = notification['author']
user_id = get_or_create_user(author)
command = notification['command']
# Check if the user is an administrator
# user_is_administrator = user_is_administrator()
# Unrestricted commands
match command:
case 'roll':
return do_roll(full_user)
case 'create':
return do_create(full_user, arguments, note_obj)
case 'help':
return do_help(command)
case 'signup':
return do_signup(full_user)
case 'delete_account':
return delete_account(full_user)
case 'confirm_delete':
return confirm_delete(full_user)
res = do_signup()
case 'help':
res = do_help(author)
case _:
return None
pass
if not user_id:
return res
# User commands
match command:
case 'delete_account':
pass
case 'roll':
res = do_roll(author)
case 'create':
res = do_create(
author,
notification['arguments'],
notification['note_obj']
)
case 'signup':
res = do_signup(author)
case 'delete_account':
res = delete_account(author)
case 'confirm_delete':
res = confirm_delete(author)
case _:
pass
# if not user_is_administrator:
return res
# Administrator commands go here