# Kemoverse - a gacha-style bot for the Fediverse.
# Copyright © 2025 Waifu, VD15, Moon, and contributors.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see https://www.gnu.org/licenses/.

from datetime import datetime
from random import choices
import sqlite3

import config
from custom_types import Card

DB_PATH = config.DB_PATH
# Both globals are assigned by connect(); every other function in this
# module assumes connect() has already been called.
CONNECTION: sqlite3.Connection
CURSOR: sqlite3.Cursor


def connect() -> None:
    '''Creates a connection to the database.

    Opens DB_PATH in autocommit mode, makes rows addressable by column
    name (sqlite3.Row) and stores the shared connection and cursor in the
    module-level CONNECTION and CURSOR globals.
    '''
    print('Connecting to the database...')
    global CONNECTION
    global CURSOR
    CONNECTION = sqlite3.connect(DB_PATH, autocommit=True)
    CONNECTION.row_factory = sqlite3.Row
    CURSOR = CONNECTION.cursor()


def setup_administrators() -> None:
    '''Creates administrator players for each handle in the config file.

    Handles listed in config.ADMINS that have no player row yet are
    inserted as administrators; existing players are promoted.
    '''
    for username in config.ADMINS:
        player_id = get_player(username)
        if player_id == 0:
            # Create player if not exists
            print(f'Creating administrator player: {username}')
            CURSOR.execute(
                'INSERT INTO players (username, has_rolled, is_administrator) '
                'VALUES (?, ?, ?)',
                (username, False, True)
            )
        else:
            # Update is_administrator if exists
            print(f'Granting administrator to player: {username}')
            CURSOR.execute(
                'UPDATE players SET is_administrator = 1 WHERE id = ?',
                (player_id,)
            )


def get_random_card() -> Card | None:
    '''Gets a random card from the database.

    Every card is weighted by config.RARITY_TO_WEIGHT[rarity].

    Returns:
        Card | None: The chosen card, or None if the cards table is empty.
    '''
    CURSOR.execute('SELECT * FROM cards')
    cards = CURSOR.fetchall()
    if not cards:
        return None

    weights = [config.RARITY_TO_WEIGHT[c['rarity']] for c in cards]
    chosen = choices(cards, weights=weights, k=1)[0]

    return {
        'id': chosen['id'],
        'name': chosen['name'],
        'rarity': chosen['rarity'],
        'weight': config.RARITY_TO_WEIGHT[chosen['rarity']],
        'image_url': chosen['file_id']
    }


def get_cards(card_ids: list[int]) -> list[sqlite3.Row]:
    '''Retrieves information about cards from the database by their IDs.

    Args:
        card_ids (list[int]): IDs of the cards to look up.

    Returns:
        list[sqlite3.Row]: One row per matching card (all columns of the
        cards table), or an empty list when card_ids is empty. Rows are
        returned in whatever order the database yields them, which is not
        necessarily the order of card_ids.
    '''
    if not card_ids:
        return []

    # Only '?' placeholders are interpolated into the SQL text; the IDs
    # themselves are always bound as parameters.
    placeholders = ','.join('?' for _ in card_ids)
    query = f'SELECT * FROM cards WHERE id IN ({placeholders})'

    CURSOR.execute(query, card_ids)
    return CURSOR.fetchall()


def get_player(username: str) -> int:
    '''Retrieve a player ID by username, or return 0 if not found.'''
    CURSOR.execute(
        'SELECT id FROM players WHERE username = ?',
        (username,)
    )
    player = CURSOR.fetchone()
    if player:
        return int(player[0])
    return 0


def insert_duel_request(username: str, target: str, duel_type: str) -> int:
    '''Inserts a duel request into the database.

    Args:
        username (str): The username of the player who initiated the duel.
        target (str): The username of the player being challenged.
        duel_type (str): The type of duel (e.g., casual, competitive).
            Stored unchanged in the `competitive` column.

    Returns:
        int: The ID of the new duel request, or 0 if none was reported.
    '''
    # get the player ids
    player_1_id = get_player(username)
    player_2_id = get_player(target)

    # insert the duel request and get the duel ID
    CURSOR.execute(
        'INSERT INTO duel_requests (player_1_id, player_2_id, competitive) '
        'VALUES (?, ?, ?)',
        (player_1_id, player_2_id, duel_type)
    )
    return CURSOR.lastrowid or 0


def get_duel_request(duel_request_id: int) -> dict | None:
    '''Retrieves a duel request by its ID.

    Args:
        duel_request_id (int): The ID of the duel request.

    Returns:
        dict: A dictionary containing the duel request details, or None
        if not found.
    '''
    CURSOR.execute(
        'SELECT * FROM duel_requests WHERE duel_request_id = ?',
        (duel_request_id,)
    )
    row = CURSOR.fetchone()
    return dict(row) if row else None


def accept_duel_request(duel_request_id: int) -> bool:
    '''Turns a duel request into an actual duel and marks it accepted.

    Args:
        duel_request_id (int): The ID of the duel request to begin.

    Returns:
        bool: True if the request existed, had not been accepted yet and
        was marked as accepted; False otherwise.
    '''
    request = get_duel_request(duel_request_id)
    if request is None:
        return False
    # Accepting the same request twice would create a second duel.
    if request.get('accepted'):
        return False

    create_duel(duel_request_id)
    CURSOR.execute(
        'UPDATE duel_requests SET accepted = 1 WHERE duel_request_id = ?',
        (duel_request_id,)
    )
    return CURSOR.rowcount > 0


def take_cards(player_id: int, count: int) -> list[int]:
    '''Picks a specified number of random cards from a player's pulls.

    The pulls are only read; nothing is removed from the player.

    Args:
        player_id (int): The ID of the player to pick cards from.
        count (int): The maximum number of cards to pick.

    Returns:
        list[int]: The card IDs picked (fewer than count if the player
        does not have enough pulls).
    '''
    CURSOR.execute(
        'SELECT card_id FROM pulls WHERE player_id = ? '
        'ORDER BY RANDOM() LIMIT ?',
        (player_id, count)
    )
    return [row['card_id'] for row in CURSOR.fetchall()]


def create_duel(duel_request_id: int) -> None:
    '''Creates a duel from a duel request and starts it.

    Args:
        duel_request_id (int): The ID of the duel request to create a
            duel from.

    Raises:
        ValueError: If no duel request with that ID exists.
    '''
    CURSOR.execute(
        'SELECT player_1_id, player_2_id, competitive FROM duel_requests '
        'WHERE duel_request_id = ?',
        (duel_request_id,)
    )
    row = CURSOR.fetchone()
    if row is None:
        raise ValueError(f'Duel request {duel_request_id} does not exist')

    # Select a random player to set as attacker
    attacker_id = choices([row['player_1_id'], row['player_2_id']])[0]

    # Set the last round date to now
    last_round_dt = datetime.now().isoformat()

    # take 3 random cards from each player
    player_1_cards = take_cards(row['player_1_id'], 3)
    player_2_cards = take_cards(row['player_2_id'], 3)

    # Insert the duel into the database: player ids, attacker id, the
    # comma-separated card ids, last round date and competitive flag.
    CURSOR.execute(
        '''
        INSERT INTO duels (player_1_id, player_2_id, attacker_id,
                           player_1_cards, player_2_cards, last_round_dt,
                           is_finished, competitive)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''',
        (
            row['player_1_id'],
            row['player_2_id'],
            attacker_id,
            ','.join(map(str, player_1_cards)),
            ','.join(map(str, player_2_cards)),
            last_round_dt,
            False,
            row['competitive']
        )
    )

    duel_id = CURSOR.lastrowid
    if duel_id:
        # Start duel
        start_duel(duel_id, row, player_1_cards, player_2_cards)


def start_duel(duel_id: int, row: sqlite3.Row,
               player_1_cards: list[int],
               player_2_cards: list[int]) -> None:
    '''Starts a duel by announcing it and sending each player their cards.

    Args:
        duel_id (int): The ID of the duel.
        row (sqlite3.Row): The duel request row; must provide
            'player_1_id', 'player_2_id' and 'competitive'.
        player_1_cards (list[int]): List of card IDs for player 1.
        player_2_cards (list[int]): List of card IDs for player 2.
    '''
    # Imported at call time rather than at module import time.
    from fediverse_factory import get_fediverse_service
    from fediverse_types import Visibility

    fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)

    duel_type = 'Competitive' if row['competitive'] == 1 else 'Casual'

    # Get both the player 1 name and the player 1 server_id
    player_1 = CURSOR.execute(
        'SELECT username, server_id FROM players WHERE id = ?',
        (row['player_1_id'],)
    ).fetchone()
    player_1_name = player_1[0]
    player_1_server_id = player_1[1]

    # Get both the player 2 name and the player 2 server_id. The row is
    # kept in its own variable so server_id is read from the row and not
    # from the username string.
    player_2 = CURSOR.execute(
        'SELECT username, server_id FROM players WHERE id = ?',
        (row['player_2_id'],)
    ).fetchone()
    player_2_name = player_2[0]
    player_2_server_id = player_2[1]

    # Create a thread for the duel
    thread_id = fediverse_service.create_post(
        text=f'⚔️ A {duel_type} Duel started between {player_1_name} and {player_2_name}! ROUND 1 has begun!',
        visibility=Visibility.HOME
    )

    # TODO: persisting the thread ID is currently disabled:
    # CURSOR.execute(
    #     'UPDATE duels SET thread_id = ? WHERE id = ?',
    #     (thread_id, duel_id)
    # )

    # NOTE(review): get_cards() selects with `WHERE id IN (...)`, which
    # does not guarantee rows come back in the order of the IDs passed and
    # yields one row per distinct ID - confirm the attachment order matches
    # the 1/2/3 numbering the players reply with.

    # Player 1: send a private post with their cards
    player_1_cards_details = get_cards(player_1_cards)
    fediverse_service.create_post(
        text=(
            f'{player_1_name} Your cards for the duel. '
            'Reply with select <1,2 or 3> to select a card for the duel.'
        ),
        visibility=Visibility.SPECIFIED,
        file_ids=[card['file_id'] for card in player_1_cards_details],
        visible_user_ids=[player_1_server_id]
    )

    # Player 2: send a private post with their cards
    player_2_cards_details = get_cards(player_2_cards)
    fediverse_service.create_post(
        text=(
            f'{player_2_name} Your cards for the duel. You are defending!. '
            'Select a card with <1,2 or 3> and a stat with <1,2 or 3> for '
            'Power, Charm and Wit. Example: select 2 3 would select the '
            'second card and use its Wit stat.'
        ),
        visibility=Visibility.SPECIFIED,
        file_ids=[card['file_id'] for card in player_2_cards_details],
        visible_user_ids=[player_2_server_id]
    )


def insert_player(username: str) -> int:
    '''Insert a new player with default has_rolled = False and return
    their player ID (0 if no row ID was reported).'''
    CURSOR.execute(
        'INSERT INTO players (username, has_rolled) VALUES (?, ?)',
        (username, False)
    )
    return CURSOR.lastrowid if CURSOR.lastrowid else 0


def delete_player(username: str) -> bool:
    '''Permanently deletes a player and all their pulls.

    Returns:
        bool: False if no player with that username exists, True once the
        player and their pulls have been deleted.
    '''
    player_id = get_player(username)
    if not player_id:
        return False

    # Delete pulls
    CURSOR.execute(
        'DELETE FROM pulls WHERE player_id = ?',
        (player_id,)
    )

    # Delete player
    CURSOR.execute(
        'DELETE FROM players WHERE id = ?',
        (player_id,)
    )

    return True


def ban_player(username: str) -> bool:
    '''Adds a player to the ban list.

    Returns:
        bool: True if the handle was added, False if the insert violated
        an integrity constraint (e.g. the handle is already banned).
    '''
    try:
        CURSOR.execute(
            'INSERT INTO banned_players (handle) VALUES (?)',
            (username,)
        )
        return True
    except sqlite3.IntegrityError:
        return False


def unban_player(username: str) -> bool:
    '''Removes a player from the ban list.

    Returns:
        bool: True if a ban entry was removed, False if there was none.
    '''
    CURSOR.execute(
        'DELETE FROM banned_players WHERE handle = ?',
        (username,)
    )
    return CURSOR.rowcount > 0


def is_player_banned(username: str) -> bool:
    '''Checks whether a handle is on the ban list.'''
    CURSOR.execute(
        'SELECT * FROM banned_players WHERE handle = ?',
        (username,)
    )
    row = CURSOR.fetchone()
    return row is not None


def is_player_administrator(username: str) -> bool:
    '''Checks whether a player is flagged as administrator.

    Returns False when no player with that username exists.
    '''
    CURSOR.execute(
        'SELECT is_administrator FROM players WHERE username = ? LIMIT 1',
        (username,)
    )
    row = CURSOR.fetchone()
    # The column holds an integer flag; normalise it to a real bool.
    return bool(row[0]) if row else False


def insert_card(
        name: str, rarity: int, file_id: str,
        power: int | None = None,
        charm: int | None = None,
        wit: int | None = None) -> int:
    '''Inserts a card.

    The stats are only written when power, charm and wit are all given;
    otherwise the insert omits those columns.

    Returns:
        int: The ID of the new card, or 0 if no row ID was reported.
    '''
    if power is not None and charm is not None and wit is not None:
        CURSOR.execute(
            '''
            INSERT INTO cards (name, rarity, file_id, power, charm, wit)
            VALUES (?, ?, ?, ?, ?, ?)
            ''',
            (name, rarity, file_id, power, charm, wit)
        )
    else:
        CURSOR.execute(
            '''
            INSERT INTO cards (name, rarity, file_id)
            VALUES (?, ?, ?)
            ''',
            (name, rarity, file_id)
        )
    card_id = CURSOR.lastrowid
    return card_id if card_id else 0


def insert_pull(player_id: int, card_id: int) -> None:
    '''Creates a pull in the database'''
    CURSOR.execute(
        'INSERT INTO pulls (player_id, card_id) VALUES (?, ?)',
        (player_id, card_id)
    )


def get_last_rolled_at(player_id: int) -> int:
    '''Gets the timestamp when the player last rolled.

    Returns:
        The `timestamp` value of the player's most recent pull, or 0 if
        the player has no pulls.
    '''
    CURSOR.execute(
        'SELECT timestamp FROM pulls WHERE player_id = ? '
        'ORDER BY timestamp DESC LIMIT 1',
        (player_id,)
    )
    row = CURSOR.fetchone()
    return row[0] if row else 0


# Configuration


def add_to_whitelist(instance: str) -> bool:
    '''Adds an instance to the whitelist, returns false if instance was
    already present'''
    try:
        CURSOR.execute(
            'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,)
        )
        return True
    except sqlite3.IntegrityError:
        return False


def remove_from_whitelist(instance: str) -> bool:
    '''Removes an instance from the whitelist, returns false if instance
    was not present'''
    CURSOR.execute(
        'DELETE FROM instance_whitelist WHERE tld = ?', (instance,))
    return CURSOR.rowcount > 0


def is_whitelisted(instance: str) -> bool:
    '''Checks whether an instance is in the whitelist.

    The special instance name 'local' is always whitelisted.
    '''
    if instance == 'local':
        return True
    CURSOR.execute(
        'SELECT * FROM instance_whitelist WHERE tld = ?', (instance,))
    row = CURSOR.fetchone()
    return row is not None


def get_config(key: str) -> str:
    '''Reads the value for a specified config key from the db.

    Returns an empty string when the key is not present.
    '''
    CURSOR.execute('SELECT value FROM config WHERE key = ?', (key,))
    row = CURSOR.fetchone()
    return row[0] if row else ''


def set_config(key: str, value: str) -> None:
    '''Writes the value for a specified config key to the db'''
    CURSOR.execute(
        'INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)',
        (key, value))