Compare commits

..

No commits in common. "471f39917690a9c03c25d913d601d7cd16bbf1b3" and "631b66e0034c5e86a321f6d425569ad4230a2ddd" have entirely different histories.

5 changed files with 103 additions and 82 deletions

View file

@ -1,12 +1,11 @@
import requests import requests
from misskey.exceptions import MisskeyAPIException from misskey.exceptions import MisskeyAPIException
from client import client_connection from client import client_connection
from db_utils import insert_character from db_utils import get_db_connection
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]: def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
""" """
Adds a character to the database, uploading the image from a public URL to Adds a character to the database, uploading the image from a public URL to the bot's Misskey Drive.
the bot's Misskey Drive.
Args: Args:
name (str): Character name. name (str): Character name.
@ -21,6 +20,7 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
ValueError: If inputs are invalid. ValueError: If inputs are invalid.
RuntimeError: If image download/upload or database operation fails. RuntimeError: If image download/upload or database operation fails.
""" """
try:
# Validate inputs # Validate inputs
if not name or not name.strip(): if not name or not name.strip():
raise ValueError("Character name cannot be empty.") raise ValueError("Character name cannot be empty.")
@ -45,5 +45,16 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
# Insert into database # Insert into database
character_id = insert_character(name.strip(), rarity, float(weight), file_id) conn = get_db_connection()
cur = conn.cursor()
cur.execute(
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
(name.strip(), rarity, float(weight), file_id)
)
conn.commit()
character_id = cur.lastrowid
return character_id, file_id return character_id, file_id
finally:
if 'conn' in locals():
conn.close()

View file

@ -1,7 +1,6 @@
import time import time
import misskey as misskey import misskey as misskey
from client import client_connection from client import client_connection
import db_utils as db
from config import NOTIFICATION_POLL_INTERVAL from config import NOTIFICATION_POLL_INTERVAL
from notification import process_notifications from notification import process_notifications
@ -9,9 +8,6 @@ from notification import process_notifications
if __name__ == '__main__': if __name__ == '__main__':
# Initialize the Misskey client # Initialize the Misskey client
client = client_connection() client = client_connection()
# Connect to DB
db.connect()
print('Listening for notifications...') print('Listening for notifications...')
while True: while True:
if not process_notifications(client): if not process_notifications(client):

View file

@ -1,32 +1,15 @@
from random import choices
import sqlite3 import sqlite3
import config import config
DB_PATH = config.DB_PATH DB_PATH = config.DB_PATH
CONNECTION: sqlite3.Connection
CURSOR: sqlite3.Cursor
def connect() -> None: # Database functions
def get_db_connection():
'''Creates a connection to the database''' '''Creates a connection to the database'''
print('Connecting to the database...') conn = sqlite3.connect(DB_PATH)
global CONNECTION conn.row_factory = sqlite3.Row
global CURSOR return conn
CONNECTION = sqlite3.connect(DB_PATH, autocommit=True)
CONNECTION.row_factory = sqlite3.Row
CURSOR = CONNECTION.cursor()
def get_random_character():
''' Gets a random character from the database'''
CURSOR.execute('SELECT * FROM characters')
characters = CURSOR.fetchall()
if not characters:
return None, None, None, None
weights = [c['weight'] for c in characters]
chosen = choices(characters, weights=weights, k=1)[0]
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
# User functions # User functions
@ -34,53 +17,64 @@ def get_random_character():
def get_or_create_user(username): def get_or_create_user(username):
'''Retrieves an ID for a given user, if the user does not exist, it will be '''Retrieves an ID for a given user, if the user does not exist, it will be
created.''' created.'''
CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,)) conn = get_db_connection()
user = CURSOR.fetchone() conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute('SELECT id FROM users WHERE username = ?', (username,))
user = cur.fetchone()
if user: if user:
conn.close()
return user[0] return user[0]
# New user starts with has_rolled = False # New user starts with has_rolled = False
CURSOR.execute( cur.execute(
'INSERT INTO users (username, has_rolled) VALUES (?, ?)', 'INSERT INTO users (username, has_rolled) VALUES (?, ?)',
(username, False) (username, False)
) )
user_id = CURSOR.lastrowid conn.commit()
user_id = cur.lastrowid
conn.close()
return user_id return user_id
def insert_character(name: str, rarity: int, weight: float, file_id: str) -> int:
'''Inserts a character'''
CURSOR.execute(
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
(name, rarity, weight, file_id)
)
character_id = CURSOR.lastrowid
return character_id if character_id else 0
def insert_pull(user_id, character_id): # Gameplay functions
def add_pull(user_id, character_id):
'''Creates a pull in the database''' '''Creates a pull in the database'''
CURSOR.execute( conn = get_db_connection()
'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', cur = conn.cursor()
(user_id, character_id) cur.execute('INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', (user_id, character_id))
) conn.commit()
conn.close()
def get_last_rolled_at(user_id): def get_last_rolled_at(user_id):
'''Gets the timestamp when the user last rolled''' '''Gets the timestamp when the user last rolled'''
CURSOR.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \ conn = get_db_connection()
cur = conn.cursor()
cur.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \
(user_id,)) (user_id,))
row = CURSOR.fetchone() row = cur.fetchone()
conn.close()
return row[0] if row else None return row[0] if row else None
# Configuration # Configuration
def get_config(key): def get_config(key):
'''Reads the value for a specified config key from the db''' '''Reads the value for a specified config key from the db'''
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,)) conn = get_db_connection()
row = CURSOR.fetchone() cur = conn.cursor()
cur.execute("SELECT value FROM config WHERE key = ?", (key,))
row = cur.fetchone()
conn.close()
return row[0] if row else None return row[0] if row else None
def set_config(key, value): def set_config(key, value):
'''Writes the value for a specified config key to the db''' '''Writes the value for a specified config key to the db'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value)) cur.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
conn.commit()
conn.close()
# Character stat functions # Character stat functions

View file

@ -1,8 +1,25 @@
import random
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from db_utils import get_or_create_user, insert_pull, get_last_rolled_at, get_random_character from db_utils import get_or_create_user, add_pull, get_db_connection, get_last_rolled_at
from add_character import add_character from add_character import add_character
from config import GACHA_ROLL_INTERVAL from config import GACHA_ROLL_INTERVAL
def get_character():
''' Gets a random character from the database'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute('SELECT * FROM characters')
characters = cur.fetchall()
conn.close()
if not characters:
return None, None, None, None
weights = [c['weight'] for c in characters]
chosen = random.choices(characters, weights=weights, k=1)[0]
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
def do_roll(full_user): def do_roll(full_user):
'''Determines whether the user can roll, then pulls a random character''' '''Determines whether the user can roll, then pulls a random character'''
user_id = get_or_create_user(full_user) user_id = get_or_create_user(full_user)
@ -33,12 +50,12 @@ def do_roll(full_user):
return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.' return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.'
character_id, character_name, file_id, rarity = get_random_character() character_id, character_name, file_id, rarity = get_character()
if not character_id: if not character_id:
return f'{full_user} Uwaaa... something went wrong! No characters found. 😿' return f'{full_user} Uwaaa... something went wrong! No characters found. 😿'
insert_pull(user_id,character_id) add_pull(user_id,character_id)
stars = '⭐️' * rarity stars = '⭐️' * rarity
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\ return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\
She's all yours now~ 💖✨",[file_id]]) She's all yours now~ 💖✨",[file_id]])
@ -77,6 +94,7 @@ def do_create(full_user, arguments, note_obj):
) )
return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]]) return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]])
def do_help(full_user): def do_help(full_user):
'''Provides a list of commands that the bot can do.''' '''Provides a list of commands that the bot can do.'''
return f'{full_user} Here\'s what I can do:\n \ return f'{full_user} Here\'s what I can do:\n \
@ -84,6 +102,11 @@ def do_help(full_user):
- `create <name> <rarity> <weight>` Creates a character using a given image.\ - `create <name> <rarity> <weight>` Creates a character using a given image.\
- `help` Shows this message' - `help` Shows this message'
def do_invalid_command(command, full_user):
'''Generic response when an unknown or invalid command is sent'''
return f'{full_user} Unrecognised command: {command}\n\
Message \'help\' to get a list of valid commands'
def generate_response(parsed_command): def generate_response(parsed_command):
'''Given a command with arguments, processes the game state and '''Given a command with arguments, processes the game state and
returns a response''' returns a response'''
@ -97,4 +120,4 @@ def generate_response(parsed_command):
case 'help': case 'help':
return do_help(command) return do_help(command)
case _: case _:
return None return do_invalid_command(command, full_user)

3
db.py
View file

@ -50,9 +50,6 @@ CREATE TABLE IF NOT EXISTS character_stats (
) )
''') ''')
# Initialize essential config key
cursor.execute('INSERT INTO config VALUES ("last_seen_notif_id", 0)')
""" # Insert example characters into the database if they don't already exist """ # Insert example characters into the database if they don't already exist
characters = [ characters = [
('Murakami-san', 1, 0.35), ('Murakami-san', 1, 0.35),