kemoverse/bot/db_utils.py

468 lines
14 KiB
Python

# Kemoverse - a gacha-style bot for the Fediverse.
# Copyright © 2025 Waifu, VD15, Moon, and contributors.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see https://www.gnu.org/licenses/.
from random import choices
import sqlite3
import config
from custom_types import Card
from datetime import datetime
# Database location handed to sqlite3.connect() by connect().
DB_PATH = config.DB_PATH
# Module-wide connection and cursor shared by every function in this module.
# These are bare annotations only: the names stay unbound until connect()
# has been called, so connect() must run before any other helper here.
CONNECTION: sqlite3.Connection
CURSOR: sqlite3.Cursor
def connect() -> None:
    '''Opens the database at DB_PATH and publishes the shared handles.

    Rebinds the module-level CONNECTION and CURSOR. The connection is opened
    in autocommit mode and yields sqlite3.Row rows, so columns can be read
    by name as well as by index.
    '''
    global CONNECTION, CURSOR
    print('Connecting to the database...')
    CONNECTION = sqlite3.connect(DB_PATH, autocommit=True)
    # Must be set before the cursor is created so the cursor inherits it
    CONNECTION.row_factory = sqlite3.Row
    CURSOR = CONNECTION.cursor()
def setup_administrators() -> None:
    '''Makes sure every handle in config.ADMINS is an administrator.

    Handles that have no player row yet get one created with the
    administrator flag set; existing players just have the flag raised.
    '''
    for admin_handle in config.ADMINS:
        existing_id = get_player(admin_handle)
        if existing_id != 0:
            # Player already exists: only the flag needs updating
            print(f'Granting administrator to player: {admin_handle}')
            CURSOR.execute(
                'UPDATE players SET is_administrator = 1 WHERE id = ?',
                (existing_id,)
            )
            continue
        # get_player() reports 0 for unknown handles, so create the row
        print(f'Creating administrator player: {admin_handle}')
        CURSOR.execute(
            'INSERT INTO players (username, has_rolled, is_administrator) '
            'VALUES (?, ?, ?)',
            (admin_handle, False, True)
        )
def get_random_card() -> Card | None:
    '''Draws a single card at random, weighted by rarity.

    Every card in the cards table takes part in the draw; a card's weight is
    looked up in config.RARITY_TO_WEIGHT using its rarity.

    Returns:
        Card | None: The drawn card (id, name, rarity, weight and image_url,
        where image_url carries the card's file_id), or None if there are
        no cards at all.
    '''
    weight_for = config.RARITY_TO_WEIGHT
    all_cards = CURSOR.execute('SELECT * FROM cards').fetchall()
    if not all_cards:
        return None
    rarity_weights = [weight_for[card['rarity']] for card in all_cards]
    # choices() always returns a list, even for k=1
    (drawn,) = choices(all_cards, weights=rarity_weights, k=1)
    return {
        'id': drawn['id'],
        'name': drawn['name'],
        'rarity': drawn['rarity'],
        'weight': weight_for[drawn['rarity']],
        'image_url': drawn['file_id']
    }
def get_cards(card_ids: list[int]) -> list[tuple]:
    '''
    Looks up the cards whose IDs appear in card_ids.

    Returns all columns (SELECT *) of every matching row in the cards table.
    The query has no ORDER BY, so the rows are not guaranteed to follow the
    order of card_ids. An empty card_ids short-circuits to an empty list
    without touching the database.
    '''
    if not card_ids:
        return []
    # One '?' per requested ID so the values stay bound parameters
    marks = ','.join(['?'] * len(card_ids))
    CURSOR.execute(f'SELECT * FROM cards WHERE id IN ({marks})', card_ids)
    return CURSOR.fetchall()
def get_player(username: str) -> int:
    '''Looks up a player's ID by username.

    Returns:
        int: The player's ID, or 0 when no player has that username.
    '''
    match = CURSOR.execute(
        'SELECT id FROM players WHERE username = ?',
        (username,)
    ).fetchone()
    return int(match[0]) if match else 0
def insert_duel_request(username: str, target: str, duel_type: str) -> int:
    '''Inserts a duel request into the database.

    Args:
        username (str): The username of the player who initiated the duel.
        target (str): The username of the player being challenged.
        duel_type (str): The type of duel; stored unchanged in the
            `competitive` column of the request.

    Returns:
        int: The ID of the new duel request, or 0 if either player does not
        exist or the database reported no row ID.
    '''
    player_1_id = get_player(username)
    player_2_id = get_player(target)
    # get_player() returns 0 for unknown usernames. Refuse to store a request
    # that points at a non-existent player instead of inserting ID 0.
    if not player_1_id or not player_2_id:
        return 0
    CURSOR.execute(
        'INSERT INTO duel_requests (player_1_id, player_2_id, competitive) VALUES (?, ?, ?)',
        (player_1_id, player_2_id, duel_type)
    )
    # lastrowid may be None; normalise to 0 like the other insert helpers
    return CURSOR.lastrowid or 0
def get_duel_request(duel_request_id: int) -> dict | None:
    '''Fetches a single duel request.

    Args:
        duel_request_id (int): The ID of the duel request.

    Returns:
        dict | None: All columns of the request keyed by column name, or
        None if no request has that ID.
    '''
    found = CURSOR.execute(
        'SELECT * FROM duel_requests WHERE duel_request_id = ?',
        (duel_request_id,)
    ).fetchone()
    if not found:
        return None
    return dict(found)
def accept_duel_request(duel_request_id: int) -> bool:
    '''Accepts a pending duel request and turns it into an actual duel.

    Args:
        duel_request_id (int): The ID of the duel request to begin.

    Returns:
        bool: True if the duel was created and the request marked as
        accepted; False if the request does not exist or had already been
        accepted (in which case nothing is changed).
    '''
    CURSOR.execute(
        'SELECT accepted FROM duel_requests WHERE duel_request_id = ?',
        (duel_request_id,)
    )
    request = CURSOR.fetchone()
    if request is None:
        # Unknown request: nothing to accept
        return False
    if request[0]:
        # Accepting twice would create (and announce) a second duel
        return False
    create_duel(duel_request_id)
    CURSOR.execute(
        'UPDATE duel_requests SET accepted = 1 WHERE duel_request_id = ?',
        (duel_request_id,)
    )
    return True
def take_cards(player_id: int, count: int) -> list[int]:
    '''Picks random cards from a player's pulls.

    This only reads the pulls table; no pull is removed or modified.

    Args:
        player_id (int): The ID of the player to pick cards from.
        count (int): The maximum number of cards to pick.

    Returns:
        list[int]: The card IDs of the picked pulls, in random order.
    '''
    picked = CURSOR.execute(
        'SELECT card_id FROM pulls WHERE player_id = ? ORDER BY RANDOM() LIMIT ?',
        (player_id, count)
    ).fetchall()
    return [pull['card_id'] for pull in picked]
def create_duel(duel_request_id: int) -> None:
    '''Creates a duel from a duel request and starts it.

    Picks a random attacker, draws up to 3 random cards for each player,
    stores the duel and then hands over to start_duel().

    Args:
        duel_request_id (int): The ID of the duel request to create a duel from.

    Raises:
        ValueError: If no duel request with the given ID exists.
    '''
    CURSOR.execute(
        'SELECT player_1_id, player_2_id, competitive FROM duel_requests WHERE duel_request_id = ?',
        (duel_request_id,)
    )
    row = CURSOR.fetchone()
    if row is None:
        # Fail with a clear message before anything is written, instead of
        # an opaque TypeError when indexing None below.
        raise ValueError(f'Duel request {duel_request_id} does not exist')
    player_1_id = row['player_1_id']
    player_2_id = row['player_2_id']
    # Select a random player to set as attacker
    attacker_id = choices([player_1_id, player_2_id])[0]
    # Set the last round date to now (naive local time, ISO 8601 text)
    last_round_dt = datetime.now().isoformat()
    # Take 3 random cards from each player
    player_1_cards = take_cards(player_1_id, 3)
    player_2_cards = take_cards(player_2_id, 3)
    # Card IDs are stored as comma-separated text, e.g. '4,17,9'
    CURSOR.execute(
        '''
        INSERT INTO duels (player_1_id, player_2_id, attacker_id, player_1_cards, player_2_cards, last_round_dt, is_finished, competitive)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''',
        (
            player_1_id,
            player_2_id,
            attacker_id,
            ','.join(map(str, player_1_cards)),
            ','.join(map(str, player_2_cards)),
            last_round_dt,
            False,
            row['competitive']
        )
    )
    duel_id = CURSOR.lastrowid
    if duel_id:
        # Only announce the duel if the insert produced a row
        start_duel(duel_id, row, player_1_cards, player_2_cards)
def start_duel(duel_id: int, row: dict, player_1_cards: list[int], player_2_cards: list[int]) -> None:
    '''Starts a duel by announcing it and sending each player their cards.

    Creates an announcement post for the duel (home visibility), then sends
    each player a post addressed to them (specified visibility) with the
    files of their cards attached.

    Args:
        duel_id (int): The ID of the duel. Currently unused, because storing
            the announcement's thread ID on the duel is disabled (see below).
        row (dict): The row containing duel request details; 'player_1_id',
            'player_2_id' and 'competitive' are read from it.
        player_1_cards (list[int]): List of card IDs for player 1.
        player_2_cards (list[int]): List of card IDs for player 2.
    '''
    # Imported locally rather than at module level
    # (presumably to avoid a circular import - TODO confirm).
    from fediverse_factory import get_fediverse_service
    from fediverse_types import Visibility
    fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
    duel_type = 'Competitive' if row['competitive'] == 1 else 'Casual'

    # Look up (username, server_id) for player 1, then player 2. Both values
    # are read from the fetched row itself; the previous code took player 2's
    # server_id from the username string, yielding a single character.
    contacts = []
    for player_id in (row['player_1_id'], row['player_2_id']):
        player = CURSOR.execute(
            'SELECT username, server_id FROM players WHERE id = ?',
            (player_id,)
        ).fetchone()
        contacts.append((player[0], player[1]))
    (player_1_name, player_1_server_id), (player_2_name, player_2_server_id) = contacts

    # Create a thread for the duel.
    # NOTE: the ID returned by create_post() is deliberately not stored yet;
    # the statement 'UPDATE duels SET thread_id = ? WHERE id = ?' with
    # (thread_id, duel_id) was disabled in the original code.
    fediverse_service.create_post(
        text=f'⚔️ A {duel_type} Duel started between {player_1_name} and {player_2_name}! ROUND 1 has begun!',
        visibility=Visibility.HOME
    )

    # Send each player their cards, player 1 first
    hands = (
        (player_1_name, player_1_server_id, player_1_cards),
        (player_2_name, player_2_server_id, player_2_cards),
    )
    for player_name, server_id, card_ids in hands:
        cards_details = get_cards(card_ids)
        fediverse_service.create_post(
            text=(
                f'{player_name} Your cards for the duel. '
                'Reply with select <1,2 or 3> to select a card for the duel.'
            ),
            visibility=Visibility.SPECIFIED,
            file_ids=[card['file_id'] for card in cards_details],
            visible_user_ids=[server_id]
        )
def insert_player(username: str) -> int:
    '''Creates a player that has not rolled yet.

    Returns:
        int: The new player's ID, or 0 if the database reported no row ID.
    '''
    new_player = (username, False)
    CURSOR.execute(
        'INSERT INTO players (username, has_rolled) VALUES (?, ?)',
        new_player
    )
    return CURSOR.lastrowid or 0
def delete_player(username: str) -> bool:
    '''Permanently removes a player together with all of their pulls.

    Returns:
        bool: True if the player existed and was deleted, False if no
        player has that username.
    '''
    found = CURSOR.execute(
        'SELECT id FROM players WHERE username = ?',
        (username,)
    ).fetchone()
    if not found:
        return False
    player_id = found[0]
    # Pulls go first, then the player row itself
    cleanup_statements = (
        'DELETE FROM pulls WHERE player_id = ?',
        'DELETE FROM players WHERE id = ?',
    )
    for statement in cleanup_statements:
        CURSOR.execute(statement, (player_id,))
    return True
def ban_player(username: str) -> bool:
    '''Puts a handle on the ban list.

    Returns:
        bool: True if the handle was added, False if the insert violated an
        integrity constraint (presumably because the handle is already
        banned - depends on the table's constraints).
    '''
    try:
        CURSOR.execute(
            'INSERT INTO banned_players (handle) VALUES (?)',
            (username,)
        )
    except sqlite3.IntegrityError:
        return False
    return True
def unban_player(username: str) -> bool:
    '''Takes a handle off the ban list.

    Returns:
        bool: True if at least one ban entry was deleted, False otherwise.
    '''
    CURSOR.execute(
        'DELETE FROM banned_players WHERE handle = ?',
        (username,)
    )
    removed = CURSOR.rowcount
    return removed > 0
def is_player_banned(username: str) -> bool:
    '''Checks whether a handle is on the ban list.'''
    CURSOR.execute(
        'SELECT * FROM banned_players WHERE handle = ?',
        (username,)
    )
    # Any matching row means the handle is banned
    return CURSOR.fetchone() is not None
def is_player_administrator(username: str) -> bool:
    '''Checks whether a player has the administrator flag set.

    Returns:
        bool: True if the player exists and is_administrator is truthy;
        False if the flag is 0 or NULL, or the player does not exist.
    '''
    CURSOR.execute(
        'SELECT is_administrator FROM players WHERE username = ? LIMIT 1',
        (username,)
    )
    row = CURSOR.fetchone()
    if row is None:
        return False
    # The column holds 1, 0 or NULL; convert so callers get a real bool
    return bool(row[0])
def insert_card(
name: str, rarity: int, file_id: str,
power: int =None, charm: int = None, wit: int = None) -> int:
'''Inserts a card'''
if power is not None and charm is not None and wit is not None:
CURSOR.execute(
'''
INSERT INTO cards (name, rarity, file_id, power, charm, wit)
VALUES (?, ?, ?, ?, ?, ?)
''',
(name, rarity, file_id, power, charm, wit)
)
else:
CURSOR.execute(
'''
INSERT INTO cards (name, rarity, file_id)
VALUES (?, ?, ?)
''',
(name, rarity, file_id)
)
card_id = CURSOR.lastrowid
return card_id if card_id else 0
def insert_pull(player_id: int, card_id: int) -> None:
    '''Records that a player pulled a card.'''
    pull = (player_id, card_id)
    CURSOR.execute(
        'INSERT INTO pulls (player_id, card_id) VALUES (?, ?)',
        pull
    )
def get_last_rolled_at(player_id: int) -> int:
    '''Gets the timestamp when the player last rolled.

    Returns:
        The `timestamp` value of the player's most recent pull exactly as
        stored in the database, or 0 if the player has no pulls.
    '''
    # Only the newest pull is needed, so let SQLite stop after one row
    # instead of producing the player's whole sorted pull history.
    CURSOR.execute(
        'SELECT timestamp FROM pulls WHERE player_id = ? '
        'ORDER BY timestamp DESC LIMIT 1',
        (player_id,)
    )
    row = CURSOR.fetchone()
    return row[0] if row else 0
# Configuration
def add_to_whitelist(instance: str) -> bool:
    '''Puts an instance on the whitelist.

    Returns:
        bool: True if the instance was added, False if the insert violated
        an integrity constraint (i.e. the instance was already present).
    '''
    try:
        CURSOR.execute(
            'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,)
        )
    except sqlite3.IntegrityError:
        return False
    return True
def remove_from_whitelist(instance: str) -> bool:
    '''Takes an instance off the whitelist.

    Returns:
        bool: True if an entry was deleted, False if the instance was not
        on the whitelist.
    '''
    CURSOR.execute(
        'DELETE FROM instance_whitelist WHERE tld = ?', (instance,))
    deleted = CURSOR.rowcount
    return deleted > 0
def is_whitelisted(instance: str) -> bool:
    '''Tells whether an instance may interact with the bot.

    The special instance name 'local' is always whitelisted and never
    looked up in the database.
    '''
    if instance != 'local':
        CURSOR.execute(
            'SELECT * FROM instance_whitelist WHERE tld = ?', (instance,))
        return CURSOR.fetchone() is not None
    return True
def get_config(key: str) -> str:
    '''Looks up a config value stored in the db.

    Returns:
        str: The stored value, or an empty string if the key is not set.
    '''
    found = CURSOR.execute(
        "SELECT value FROM config WHERE key = ?", (key,)
    ).fetchone()
    if found:
        return found[0]
    return ''
def set_config(key: str, value: str) -> None:
    '''Stores a config value in the db, replacing any existing value for
    the same key.'''
    entry = (key, value)
    CURSOR.execute(
        "INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
        entry
    )