Compare commits

...

9 commits

12 changed files with 352 additions and 120 deletions

View file

@ -1,27 +1,27 @@
import requests
from misskey.exceptions import MisskeyAPIException
from client import client_connection
from db_utils import insert_character
from custom_types import Character
from db_utils import insert_card
from custom_types import Card
from config import RARITY_TO_WEIGHT
def add_character(
def add_card(
name: str,
rarity: int,
image_url: str) -> tuple[int, str]:
'''
Adds a character to the database, uploading the image from a public URL to
Adds a card to the database, uploading the image from a public URL to
the bot's Misskey Drive.
Args:
name (str): Character name.
rarity (int): Character rarity (e.g., 1-5).
name (str): Card name.
rarity (int): Card rarity (e.g., 1-5).
image_url (str): Public URL of the image from the post (e.g., from
note['files'][i]['url']).
Returns:
tuple[int, str]: Character ID and bot's Drive file_id.
tuple[int, str]: Card ID and bot's Drive file_id.
Raises:
ValueError: If inputs are invalid.
@ -32,7 +32,7 @@ def add_character(
# Validate inputs
if not stripped_name:
raise ValueError('Character name cannot be empty.')
raise ValueError('Card name cannot be empty.')
if rarity < 1:
raise ValueError('Rarity must be a positive integer.')
if rarity not in RARITY_TO_WEIGHT.keys():
@ -55,10 +55,10 @@ def add_character(
from e
# Insert into database
character_id = insert_character(
card_id = insert_card(
stripped_name,
rarity,
RARITY_TO_WEIGHT[rarity],
file_id
)
return character_id, file_id
return card_id, file_id

View file

@ -12,6 +12,9 @@ if __name__ == '__main__':
# Connect to DB
db.connect()
# Setup default administrators
db.setup_administrators()
print('Listening for notifications...')
while True:
if not process_notifications(client):

View file

@ -1,5 +1,6 @@
'''Essentials for the bot to function'''
import configparser
import json
from os import environ, path
@ -21,7 +22,9 @@ def get_config_file() -> str:
raise ConfigError(f'Could not find {config_path}')
return config_path
def get_rarity_to_weight(config_section):
def get_rarity_to_weight(
config_section: configparser.SectionProxy) -> dict[int, float]:
"""Parses Rarity_X keys from config and returns a {rarity: weight} dict."""
rarity_weights = {}
for key, value in config_section.items():
@ -41,16 +44,17 @@ KEY = config['credentials']['Token']
# Bot's Misskey instance URL
INSTANCE = config['credentials']['Instance'].lower()
# TODO: move this to db
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
# to use extra admin exclusive commands with the bot
ADMINS = config['application']['DefaultAdmins']
ADMINS = json.loads(config['application']['DefaultAdmins'])
# SQLite Database location
DB_PATH = config['application']['DatabaseLocation']
# Whether to enable the instance whitelist
USE_WHITELIST = config['application']['UseWhitelist']
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval'])
RARITY_TO_WEIGHT = get_rarity_to_weight(config['gacha'])
RARITY_TO_WEIGHT = get_rarity_to_weight(config['gacha'])

View file

@ -5,7 +5,7 @@ BotResponse = TypedDict('BotResponse', {
'attachment_urls': List[str] | None
})
Character = TypedDict('Character', {
Card = TypedDict('Card', {
'id': int,
'name': str,
'rarity': int,

View file

@ -1,7 +1,7 @@
from random import choices
import sqlite3
import config
from custom_types import Character
from custom_types import Card
DB_PATH = config.DB_PATH
CONNECTION: sqlite3.Connection
@ -18,16 +18,38 @@ def connect() -> None:
CURSOR = CONNECTION.cursor()
def get_random_character() -> Character | None:
''' Gets a random character from the database'''
CURSOR.execute('SELECT * FROM characters')
characters = CURSOR.fetchall()
def setup_administrators() -> None:
'''Creates administrator players for each handle in the config file'''
# Get default admins from config
for username in config.ADMINS:
player_id = get_player(username)
if player_id == 0:
# Create player if not exists
print(f'Creating administrator player: {username}')
CURSOR.execute(
'INSERT INTO players (username, has_rolled, is_administrator) \
VALUES (?, ?, ?)',
(username, False, True)
)
else:
# Update is_administrator if exists
print(f'Granting administrator to player: {username}')
CURSOR.execute(
'UPDATE players SET is_administrator = 1 WHERE id = ?',
(player_id,)
)
if not characters:
def get_random_card() -> Card | None:
''' Gets a random card from the database'''
CURSOR.execute('SELECT * FROM cards')
cards = CURSOR.fetchall()
if not cards:
return None
weights = [config.RARITY_TO_WEIGHT[c['rarity']] for c in characters]
chosen = choices(characters, weights=weights, k=1)[0]
weights = [config.RARITY_TO_WEIGHT[c['rarity']] for c in cards]
chosen = choices(cards, weights=weights, k=1)[0]
return {
'id': chosen['id'],
@ -37,77 +59,156 @@ def get_random_character() -> Character | None:
'image_url': chosen['file_id']
}
def get_player(username: str) -> int:
'''Retrieve a player ID by username, or return None if not found.'''
CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,))
user = CURSOR.fetchone()
if user:
return int(user[0])
def insert_player(username: str) -> int:
'''Insert a new player with default has_rolled = False and return their user ID.'''
CURSOR.execute(
'INSERT INTO users (username, has_rolled) VALUES (?, ?)',
(username, False)
)
return CURSOR.lastrowid
def delete_player(username: str) -> bool:
'''Permanently deletes a user and all their pulls.'''
CURSOR.execute(
'SELECT id FROM users WHERE username = ?',
'SELECT id FROM players WHERE username = ?',
(username,)
)
user = CURSOR.fetchone()
player = CURSOR.fetchone()
if player:
return int(player[0])
return 0
user_id = user[0]
def insert_player(username: str) -> int:
'''Insert a new player with default has_rolled = False and return their
player ID.'''
CURSOR.execute(
'INSERT INTO players (username, has_rolled) VALUES (?, ?)',
(username, False)
)
return CURSOR.lastrowid if CURSOR.lastrowid else 0
def delete_player(username: str) -> bool:
'''Permanently deletes a player and all their pulls.'''
CURSOR.execute(
'SELECT id FROM players WHERE username = ?',
(username,)
)
player = CURSOR.fetchone()
if not player:
return False
player_id = player[0]
# Delete pulls
CURSOR.execute(
'DELETE FROM pulls WHERE user_id = ?',
(user_id,)
'DELETE FROM pulls WHERE player_id = ?',
(player_id,)
)
# Delete user
# Delete player
CURSOR.execute(
'DELETE FROM users WHERE id = ?',
(user_id,)
'DELETE FROM players WHERE id = ?',
(player_id,)
)
return True
def ban_player(username: str) -> bool:
'''Adds a player to the ban list.'''
try:
CURSOR.execute(
'INSERT INTO banned_players (handle) VALUES (?)',
(username,)
)
return True
except sqlite3.IntegrityError:
return False
def insert_character(
name: str, rarity: int, weight: float, file_id: str) -> int:
'''Inserts a character'''
def unban_player(username: str) -> bool:
'''Removes a player from the ban list.'''
CURSOR.execute(
'INSERT INTO characters (name, rarity, weight, file_id) VALUES \
'DELETE FROM banned_players WHERE handle = ?',
(username,)
)
return CURSOR.rowcount > 0
def is_player_banned(username: str) -> bool:
CURSOR.execute(
'SELECT * FROM banned_players WHERE handle = ?',
(username,)
)
row = CURSOR.fetchone()
return row is not None
def is_player_administrator(username: str) -> bool:
CURSOR.execute(
'SELECT is_administrator FROM players WHERE username = ? LIMIT 1',
(username,)
)
row = CURSOR.fetchone()
return row[0] if row else False
def insert_card(
name: str, rarity: int, weight: float, file_id: str) -> int:
'''Inserts a card'''
CURSOR.execute(
'INSERT INTO cards (name, rarity, weight, file_id) VALUES \
(?, ?, ?, ?)',
(name, rarity, weight, file_id)
)
character_id = CURSOR.lastrowid
return character_id if character_id else 0
card_id = CURSOR.lastrowid
return card_id if card_id else 0
def insert_pull(user_id: int, character_id: int) -> None:
def insert_pull(player_id: int, card_id: int) -> None:
'''Creates a pull in the database'''
CURSOR.execute(
'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)',
(user_id, character_id)
'INSERT INTO pulls (player_id, card_id) VALUES (?, ?)',
(player_id, card_id)
)
def get_last_rolled_at(user_id: int) -> int:
'''Gets the timestamp when the user last rolled'''
def get_last_rolled_at(player_id: int) -> int:
'''Gets the timestamp when the player last rolled'''
CURSOR.execute(
"SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp \
"SELECT timestamp FROM pulls WHERE player_id = ? ORDER BY timestamp \
DESC",
(user_id,))
(player_id,))
row = CURSOR.fetchone()
return row[0] if row else 0
def add_to_whitelist(instance: str) -> bool:
'''Adds an instance to the whitelist, returns false if instance was already
present'''
try:
CURSOR.execute(
'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,)
)
return True
except sqlite3.IntegrityError:
return False
def remove_from_whitelist(instance: str) -> bool:
'''Removes an instance to the whitelist, returns false if instance was not
present'''
CURSOR.execute(
'DELETE FROM instance_whitelist WHERE tld = ?', (instance,))
return CURSOR.rowcount > 0
def is_whitelisted(instance: str) -> bool:
'''Checks whether an instance is in the whitelist'''
if instance == 'local':
return True
CURSOR.execute(
'SELECT * FROM instance_whitelist WHERE tld = ?', (instance,))
row = CURSOR.fetchone()
return row is not None
def get_config(key: str) -> str:
'''Reads the value for a specified config key from the db'''
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,))

View file

@ -4,9 +4,9 @@ from typing import Dict, Any
import misskey
from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE
from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST
from parsing import parse_notification
from db_utils import get_config, set_config
from db_utils import get_config, set_config, is_whitelisted, is_player_banned
from response import generate_response
from custom_types import BotResponse
@ -24,7 +24,7 @@ def process_notification(
host = user.get('host') # None if local user
instance = host if host else 'local'
if not (instance in WHITELISTED_INSTANCES or instance == 'local'):
if USE_WHITELIST and not is_whitelisted(instance):
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
return
@ -44,6 +44,11 @@ def process_notification(
if not parsed_notification:
return
author = parsed_notification['author']
if is_player_banned(author):
print(f'⚠️ Blocked notification from banned player: {author}')
return
# Get the note Id to reply to
note_id = notification.get('note', {}).get('id')

View file

@ -24,6 +24,8 @@ def parse_notification(
note_id = note_obj.get("id")
note = note_text.strip().lower() if note_text else ""
# Split words into tokens
parts = note.split()
# Check for both short and fully-qualified name mentions
username_variants = [
@ -31,18 +33,16 @@ def parse_notification(
f'@{config.USER.split("@")[1]}'
]
# Make sure the notification text explicitly mentions the bot
if not any(variant in note for variant in username_variants):
# Notifs must consist of the initial mention and at least one other token
if len(parts) <= 1:
return None
# Find command and arguments after the mention
# Removes all mentions
# regex = mentions that start with @ and may contain @domain
cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip()
parts = cleaned_text.split()
# Make sure the first token is a mention to the bot
if not parts[0] in username_variants:
return None
command = parts[0].lower() if parts else None
arguments = parts[1:] if len(parts) > 1 else []
command = parts[1].lower()
arguments = parts[2:] if len(parts) > 2 else []
return {
'author': full_user,

View file

@ -1,22 +1,22 @@
from datetime import datetime, timedelta, timezone
from typing import TypedDict, Any, List, Dict
from db_utils import get_player, insert_player, delete_player, insert_pull, get_last_rolled_at, \
get_random_character
from add_character import add_character
import db_utils as db
from add_card import add_card
from config import GACHA_ROLL_INTERVAL
from custom_types import BotResponse, ParsedNotification
def do_roll(author: str) -> BotResponse:
'''Determines whether the user can roll, then pulls a random character'''
user_id = get_player(author)
'''Determines whether the user can roll, then pulls a random card'''
user_id = db.get_player(author)
if not user_id:
return {
'message':f'{author} 🛑 You havent signed up yet! Use the `signup` command to start playing.',
'attachment_urls': None
'message': f'{author} 🛑 You havent signed up yet! Use the \
`signup` command to start playing.',
'attachment_urls': None
}
# Get date of user's last roll
date = get_last_rolled_at(user_id)
date = db.get_last_rolled_at(user_id)
# No date means it's users first roll
if date:
@ -45,39 +45,43 @@ def do_roll(author: str) -> BotResponse:
'attachment_urls': None
}
character = get_random_character()
card = db.get_random_card()
if not character:
if not card:
return {
'message': f'{author} Uwaaa... something went wrong! No \
characters found. 😿',
cards found. 😿',
'attachment_urls': None
}
insert_pull(user_id, character['id'])
stars = '⭐️' * character['rarity']
db.insert_pull(user_id, card['id'])
stars = '⭐️' * card['rarity']
return {
'message': f'{author} 🎲 Congrats! You rolled {stars} \
**{character['name']}**\nShe\'s all yours now~ 💖✨',
'attachment_urls': [character['image_url']]
**{card['name']}**\nShe\'s all yours now~ 💖✨',
'attachment_urls': [card['image_url']]
}
def do_signup(author: str) -> BotResponse:
'''Registers a new user if they havent signed up yet.'''
user_id = get_player(author)
user_id = db.get_player(author)
if user_id:
return {
'message':f'{author} 👀 Youre already signed up! Let the rolling begin~ 🎲',
'message': f'{author} 👀 Youre already signed up! Let the rolling \
begin~ 🎲',
'attachment_urls': None
}
new_user_id = insert_player(author)
new_user_id = db.insert_player(author)
return {
'message': f'{author} ✅ Signed up successfully! Your gacha destiny begins now... ✨ Use the roll command to start!',
'message': f'{author} ✅ Signed up successfully! Your gacha \
destiny begins now... Use the roll command to start!',
'attachment_urls': None
}
def is_float(val: Any) -> bool:
'''Returns true if `val` can be converted to a float'''
try:
@ -91,14 +95,14 @@ def do_create(
author: str,
arguments: List[str],
note_obj: Dict[str, Any]) -> BotResponse:
'''Creates a character'''
'''Creates a card'''
# Example call from bot logic
image_url = note_obj.get('files', [{}])[0].get('url') \
if note_obj.get('files') else None
if not image_url:
return {
'message': f'{author} You need an image to create a character, \
'message': f'{author} You need an image to create a card, \
dumbass.',
'attachment_urls': None
}
@ -123,13 +127,13 @@ must be a decimal value between 0.0 and 1.0',
'attachment_urls': None
}
character_id, file_id = add_character(
card_id, file_id = add_card(
name=arguments[0],
rarity=int(arguments[1]),
image_url=image_url
)
return {
'message': f'{author} Added {arguments[0]}, ID {character_id}.',
'message': f'{author} Added {arguments[0]}, ID {card_id}.',
'attachment_urls': [file_id]
}
@ -137,59 +141,145 @@ must be a decimal value between 0.0 and 1.0',
def do_help(author: str) -> BotResponse:
'''Provides a list of commands that the bot can do.'''
return {
'message':f'{author} Here\'s what I can do:\n \
- `roll` Pulls a random character.\
- `create <name> <rarity>` Creates a character using a given image.\
- `signup` Registers your account.\
- `delete_account` Deletes your account.\
- `help` Shows this message',
'attachment_urls': None
'message': f'{author} Here\'s what I can do:\n\
- `roll` Pulls a random card.\n\
- `create <name> <rarity>` Creates a card using a given image.\n\
- `signup` Registers your account.\n\
- `delete_account` Deletes your account.\n\
- `help` Shows this message',
'attachment_urls': None
}
def delete_account(author: str) -> BotResponse:
return {
'message':f'{author} ⚠️ This will permanently delete your account and all your cards.\n'
'If youre sure, reply with `confirm_delete` to proceed.\n\n'
'message': f'{author} ⚠️ This will permanently delete your account \
and all your cards.\n'
'If you\'re sure, reply with `confirm_delete_account` to proceed.\n\n'
'**There is no undo.** Your gacha luck will be lost to the void... 💀✨',
'attachment_urls': None
}
def confirm_delete(author: str) -> BotResponse:
delete_player(author)
db.delete_player(author)
return {
'message':f'{author} 🧼 Your account and all your cards have been deleted. RIP your gacha history 🕊️✨',
'message': f'{author} 🧼 Your account and all your cards have been \
deleted. RIP your gacha history 🕊',
'attachment_urls': None
}
def do_whitelist(author: str, args: list[str]) -> BotResponse:
if len(args) == 0:
return {
'message': f'{author} Please specify an instance to whitelist',
'attachment_urls': None
}
if db.add_to_whitelist(args[0]):
return {
'message': f'{author} Whitelisted instance: {args[0]}',
'attachment_urls': None
}
else:
return {
'message': f'{author} Instance already whitelisted: {args[0]}',
'attachment_urls': None
}
def do_unwhitelist(author: str, args: list[str]) -> BotResponse:
if len(args) == 0:
return {
'message': f'{author} Please specify an instance to remove from \
the whitelist',
'attachment_urls': None
}
if db.remove_from_whitelist(args[0]):
return {
'message': f'{author} Unwhitelisted instance: {args[0]}',
'attachment_urls': None
}
else:
return {
'message': f'{author} Instance not whitelisted: {args[0]}',
'attachment_urls': None
}
def do_ban(author: str, args: list[str]) -> BotResponse:
if len(args) == 0:
return {
'message': f'{author} Please specify a user to ban',
'attachment_urls': None
}
if db.is_player_administrator(args[0]):
return {
'message': f'{author} Cannot ban other administrators.',
'attachment_urls': None
}
if db.ban_player(args[0]):
# Delete banned player's account
db.delete_player(args[0])
return {
'message': f'{author} 🔨 **BONK!** Get banned, {args[0]}!',
'attachment_urls': None
}
else:
return {
'message': f'{author} Player is already banned: {args[0]}',
'attachment_urls': None
}
def do_unban(author: str, args: list[str]) -> BotResponse:
if len(args) == 0:
return {
'message': f'{author} Please specify a user to unban',
'attachment_urls': None
}
if db.unban_player(args[0]):
return {
'message': f'{author} Player unbanned: {args[0]}!',
'attachment_urls': None
}
else:
return {
'message': f'{author} Player was not banned: {args[0]}',
'attachment_urls': None
}
def generate_response(notification: ParsedNotification) -> BotResponse | None:
'''Given a command with arguments, processes the game state and
returns a response'''
# Temporary response variable
res: BotResponse | None = None
# TODO: Check if the user has an account
author = notification['author']
user_id = get_player(author)
player_id = db.get_player(author)
command = notification['command']
# Check if the user is an administrator
# user_is_administrator = user_is_administrator()
# Unrestricted commands
match command:
case 'roll':
res = do_roll(author)
case 'signup':
res = do_signup(author)
case 'help':
res = do_help(author)
case 'roll':
res = do_roll(author)
case _:
pass
if not user_id:
# Commands beyond this point require the user to have an account
if not player_id:
return res
# User commands
@ -200,15 +290,29 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None:
notification['arguments'],
notification['note_obj']
)
case 'signup':
res = do_signup(author)
case 'delete_account':
res = delete_account(author)
case 'confirm_delete':
case 'confirm_delete_account':
res = confirm_delete(author)
case _:
pass
# if not user_is_administrator:
return res
# Commands beyond this point require the user to be an administrator
if not db.is_player_administrator(author):
return res
# Admin commands
match command:
case 'whitelist':
res = do_whitelist(author, notification['arguments'])
case 'unwhitelist':
res = do_unwhitelist(author, notification['arguments'])
case 'ban':
res = do_ban(author, notification['arguments'])
case 'unban':
res = do_unban(author, notification['arguments'])
case _:
pass
# Administrator commands go here
return res

View file

@ -2,9 +2,12 @@
[application]
; Comma separated list of fedi handles for any administrator users
; More can be added through the application
DefaultAdmins = ['admin@example.tld']
DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"]
; SQLite Database location
DatabaseLocation = ./gacha_game.db
; Whether to lmit access to the bot via an instance whitelist
; The whitelist can be adjusted via the application
UseWhitelist = False
[gacha]
; Number of seconds players have to wait between rolls

View file

@ -0,0 +1,4 @@
ALTER TABLE users RENAME TO players;
ALTER TABLE characters RENAME TO cards;
ALTER TABLE pulls RENAME user_id TO player_id;
ALTER TABLE pulls RENAME character_id TO card_id;

View file

@ -0,0 +1 @@
ALTER TABLE players ADD COLUMN is_administrator BOOLEAN NOT NULL DEFAULT 0;

View file

@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS instance_whitelist (
tld TEXT UNIQUE PRIMARY KEY
);
CREATE TABLE IF NOT EXISTS banned_players (
handle TEXT UNIQUE PRIMARY KEY
);