You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
kemoverse/bot/db_utils.py

174 lines
4.8 KiB

import sqlite3
import config
DB_PATH = config.DB_PATH
# Database functions
def get_db_connection():
'''Creates a connection to the database'''
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
# User functions
def get_or_create_user(username):
'''Retrieves an ID for a given user, if the user does not exist, it will be
created.'''
conn = get_db_connection()
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute('SELECT id FROM users WHERE username = ?', (username,))
user = cur.fetchone()
if user:
conn.close()
return user[0]
# New user starts with has_rolled = False
cur.execute(
'INSERT INTO users (username, has_rolled) VALUES (?, ?)',
(username, False)
)
conn.commit()
user_id = cur.lastrowid
conn.close()
return user_id
# Gameplay functions
def add_pull(user_id, character_id):
'''Creates a pull in the database'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute('INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', (user_id, character_id))
conn.commit()
conn.close()
def get_last_rolled_at(user_id):
'''Gets the timestamp when the user last rolled'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \
(user_id,))
row = cur.fetchone()
conn.close()
return row[0] if row else None
# Configuration
# Configuration
def get_config(key):
'''Reads the value for a specified config key from the db'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute("SELECT value FROM config WHERE key = ?", (key,))
row = cur.fetchone()
conn.close()
return row[0] if row else None
def set_config(key, value):
'''Writes the value for a specified config key to the db'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
conn.commit()
conn.close()
# Character stat functions
def add_character_stats(character_id, stats):
'''
Adds or updates character stats in the character_stats table.
`stats` should be a dictionary like {'power': 5, 'charm': 3}
'''
if not stats:
return
conn = get_db_connection()
cur = conn.cursor()
columns = ', '.join(stats.keys())
placeholders = ', '.join(['?'] * len(stats))
updates = ', '.join([f"{col}=excluded.{col}" for col in stats.keys()])
values = list(stats.values())
sql = f'''
INSERT INTO character_stats (character_id, {columns})
VALUES (?, {placeholders})
ON CONFLICT(character_id) DO UPDATE SET {updates}
'''
cur.execute(sql, [character_id] + values)
conn.commit()
conn.close()
def update_character_stat(character_id, stat_name, value):
'''Updates a single stat field for a character'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute(f'''
UPDATE character_stats SET {stat_name} = ? WHERE character_id = ?
''', (value, character_id))
conn.commit()
conn.close()
def get_character_stats(character_id):
'''Retrieves all stats for a single character dynamically'''
conn = get_db_connection()
conn.row_factory = sqlite3.Row # Enables dict-style access to rows
cur = conn.cursor()
cur.execute('SELECT * FROM character_stats WHERE character_id = ?', (character_id,))
row = cur.fetchone()
conn.close()
if row:
return {key: row[key] for key in row.keys() if key != 'character_id'}
else:
return {}
def get_character_stat(character_id, stat_name):
'''Retrieves a single stat value for a character'''
if stat_name not in ('power', 'charm'):
raise ValueError("Invalid stat name")
conn = get_db_connection()
cur = conn.cursor()
cur.execute(f'SELECT {stat_name} FROM character_stats WHERE character_id = ?', (character_id,))
row = cur.fetchone()
conn.close()
return row[0] if row else 0
def get_stats_for_multiple_characters(character_ids):
'''
Retrieves stats for a list of character IDs.
Returns a dictionary of character_id -> {stat_name: value, ...}
'''
if not character_ids:
return {}
placeholders = ','.join('?' for _ in character_ids)
query = f'''
SELECT *
FROM character_stats
WHERE character_id IN ({placeholders})
'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute(query, character_ids)
rows = cur.fetchall()
col_names = [desc[0] for desc in cur.description]
conn.close()
result = {}
for row in rows:
character_id = row[0]
stats = dict(zip(col_names[1:], row[1:])) # Skip character_id
result[character_id] = stats
return result