Compare commits
7 commits
631b66e003
...
471f399176
Author | SHA1 | Date | |
---|---|---|---|
471f399176 | |||
d210f44efc | |||
bdd2f20b84 | |||
0690ac5212 | |||
e8774cb8bd | |||
d9027356ab | |||
4a7c9239fc |
5 changed files with 82 additions and 103 deletions
|
@ -1,11 +1,12 @@
|
||||||
import requests
|
import requests
|
||||||
from misskey.exceptions import MisskeyAPIException
|
from misskey.exceptions import MisskeyAPIException
|
||||||
from client import client_connection
|
from client import client_connection
|
||||||
from db_utils import get_db_connection
|
from db_utils import insert_character
|
||||||
|
|
||||||
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
|
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
|
||||||
"""
|
"""
|
||||||
Adds a character to the database, uploading the image from a public URL to the bot's Misskey Drive.
|
Adds a character to the database, uploading the image from a public URL to
|
||||||
|
the bot's Misskey Drive.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
name (str): Character name.
|
name (str): Character name.
|
||||||
|
@ -20,41 +21,29 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
|
||||||
ValueError: If inputs are invalid.
|
ValueError: If inputs are invalid.
|
||||||
RuntimeError: If image download/upload or database operation fails.
|
RuntimeError: If image download/upload or database operation fails.
|
||||||
"""
|
"""
|
||||||
|
# Validate inputs
|
||||||
|
if not name or not name.strip():
|
||||||
|
raise ValueError("Character name cannot be empty.")
|
||||||
|
if not isinstance(rarity, int) or rarity < 1:
|
||||||
|
raise ValueError("Rarity must be a positive integer.")
|
||||||
|
if not isinstance(weight, (int, float)) or weight <= 0:
|
||||||
|
raise ValueError("Weight must be a positive number.")
|
||||||
|
if not image_url:
|
||||||
|
raise ValueError("Image URL must be provided.")
|
||||||
|
|
||||||
|
# Download image
|
||||||
|
response = requests.get(image_url, stream=True, timeout=30)
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise RuntimeError(f"Failed to download image from {image_url}")
|
||||||
|
|
||||||
|
# Upload to bot's Drive
|
||||||
|
mk = client_connection()
|
||||||
try:
|
try:
|
||||||
# Validate inputs
|
media = mk.drive_files_create(response.raw)
|
||||||
if not name or not name.strip():
|
file_id = media["id"]
|
||||||
raise ValueError("Character name cannot be empty.")
|
except MisskeyAPIException as e:
|
||||||
if not isinstance(rarity, int) or rarity < 1:
|
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
|
||||||
raise ValueError("Rarity must be a positive integer.")
|
|
||||||
if not isinstance(weight, (int, float)) or weight <= 0:
|
|
||||||
raise ValueError("Weight must be a positive number.")
|
|
||||||
if not image_url:
|
|
||||||
raise ValueError("Image URL must be provided.")
|
|
||||||
|
|
||||||
# Download image
|
# Insert into database
|
||||||
response = requests.get(image_url, stream=True, timeout=30)
|
character_id = insert_character(name.strip(), rarity, float(weight), file_id)
|
||||||
if response.status_code != 200:
|
return character_id, file_id
|
||||||
raise RuntimeError(f"Failed to download image from {image_url}")
|
|
||||||
|
|
||||||
# Upload to bot's Drive
|
|
||||||
mk = client_connection()
|
|
||||||
try:
|
|
||||||
media = mk.drive_files_create(response.raw)
|
|
||||||
file_id = media["id"]
|
|
||||||
except MisskeyAPIException as e:
|
|
||||||
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
|
|
||||||
|
|
||||||
# Insert into database
|
|
||||||
conn = get_db_connection()
|
|
||||||
cur = conn.cursor()
|
|
||||||
cur.execute(
|
|
||||||
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
|
|
||||||
(name.strip(), rarity, float(weight), file_id)
|
|
||||||
)
|
|
||||||
conn.commit()
|
|
||||||
character_id = cur.lastrowid
|
|
||||||
|
|
||||||
return character_id, file_id
|
|
||||||
finally:
|
|
||||||
if 'conn' in locals():
|
|
||||||
conn.close()
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import time
|
import time
|
||||||
import misskey as misskey
|
import misskey as misskey
|
||||||
from client import client_connection
|
from client import client_connection
|
||||||
|
import db_utils as db
|
||||||
|
|
||||||
from config import NOTIFICATION_POLL_INTERVAL
|
from config import NOTIFICATION_POLL_INTERVAL
|
||||||
from notification import process_notifications
|
from notification import process_notifications
|
||||||
|
@ -8,6 +9,9 @@ from notification import process_notifications
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# Initialize the Misskey client
|
# Initialize the Misskey client
|
||||||
client = client_connection()
|
client = client_connection()
|
||||||
|
# Connect to DB
|
||||||
|
db.connect()
|
||||||
|
|
||||||
print('Listening for notifications...')
|
print('Listening for notifications...')
|
||||||
while True:
|
while True:
|
||||||
if not process_notifications(client):
|
if not process_notifications(client):
|
||||||
|
|
|
@ -1,15 +1,32 @@
|
||||||
|
from random import choices
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import config
|
import config
|
||||||
|
|
||||||
DB_PATH = config.DB_PATH
|
DB_PATH = config.DB_PATH
|
||||||
|
CONNECTION: sqlite3.Connection
|
||||||
|
CURSOR: sqlite3.Cursor
|
||||||
|
|
||||||
# Database functions
|
def connect() -> None:
|
||||||
|
|
||||||
def get_db_connection():
|
|
||||||
'''Creates a connection to the database'''
|
'''Creates a connection to the database'''
|
||||||
conn = sqlite3.connect(DB_PATH)
|
print('Connecting to the database...')
|
||||||
conn.row_factory = sqlite3.Row
|
global CONNECTION
|
||||||
return conn
|
global CURSOR
|
||||||
|
CONNECTION = sqlite3.connect(DB_PATH, autocommit=True)
|
||||||
|
CONNECTION.row_factory = sqlite3.Row
|
||||||
|
CURSOR = CONNECTION.cursor()
|
||||||
|
|
||||||
|
def get_random_character():
|
||||||
|
''' Gets a random character from the database'''
|
||||||
|
CURSOR.execute('SELECT * FROM characters')
|
||||||
|
characters = CURSOR.fetchall()
|
||||||
|
|
||||||
|
if not characters:
|
||||||
|
return None, None, None, None
|
||||||
|
|
||||||
|
weights = [c['weight'] for c in characters]
|
||||||
|
chosen = choices(characters, weights=weights, k=1)[0]
|
||||||
|
|
||||||
|
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
|
||||||
|
|
||||||
|
|
||||||
# User functions
|
# User functions
|
||||||
|
@ -17,64 +34,53 @@ def get_db_connection():
|
||||||
def get_or_create_user(username):
|
def get_or_create_user(username):
|
||||||
'''Retrieves an ID for a given user, if the user does not exist, it will be
|
'''Retrieves an ID for a given user, if the user does not exist, it will be
|
||||||
created.'''
|
created.'''
|
||||||
conn = get_db_connection()
|
CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,))
|
||||||
conn.row_factory = sqlite3.Row
|
user = CURSOR.fetchone()
|
||||||
cur = conn.cursor()
|
|
||||||
cur.execute('SELECT id FROM users WHERE username = ?', (username,))
|
|
||||||
user = cur.fetchone()
|
|
||||||
if user:
|
if user:
|
||||||
conn.close()
|
|
||||||
return user[0]
|
return user[0]
|
||||||
|
|
||||||
# New user starts with has_rolled = False
|
# New user starts with has_rolled = False
|
||||||
cur.execute(
|
CURSOR.execute(
|
||||||
'INSERT INTO users (username, has_rolled) VALUES (?, ?)',
|
'INSERT INTO users (username, has_rolled) VALUES (?, ?)',
|
||||||
(username, False)
|
(username, False)
|
||||||
)
|
)
|
||||||
conn.commit()
|
user_id = CURSOR.lastrowid
|
||||||
user_id = cur.lastrowid
|
|
||||||
conn.close()
|
|
||||||
return user_id
|
return user_id
|
||||||
|
|
||||||
|
def insert_character(name: str, rarity: int, weight: float, file_id: str) -> int:
|
||||||
|
'''Inserts a character'''
|
||||||
|
CURSOR.execute(
|
||||||
|
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
|
||||||
|
(name, rarity, weight, file_id)
|
||||||
|
)
|
||||||
|
character_id = CURSOR.lastrowid
|
||||||
|
return character_id if character_id else 0
|
||||||
|
|
||||||
# Gameplay functions
|
def insert_pull(user_id, character_id):
|
||||||
|
|
||||||
def add_pull(user_id, character_id):
|
|
||||||
'''Creates a pull in the database'''
|
'''Creates a pull in the database'''
|
||||||
conn = get_db_connection()
|
CURSOR.execute(
|
||||||
cur = conn.cursor()
|
'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)',
|
||||||
cur.execute('INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', (user_id, character_id))
|
(user_id, character_id)
|
||||||
conn.commit()
|
)
|
||||||
conn.close()
|
|
||||||
|
|
||||||
def get_last_rolled_at(user_id):
|
def get_last_rolled_at(user_id):
|
||||||
'''Gets the timestamp when the user last rolled'''
|
'''Gets the timestamp when the user last rolled'''
|
||||||
conn = get_db_connection()
|
CURSOR.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \
|
||||||
cur = conn.cursor()
|
|
||||||
cur.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \
|
|
||||||
(user_id,))
|
(user_id,))
|
||||||
row = cur.fetchone()
|
row = CURSOR.fetchone()
|
||||||
conn.close()
|
|
||||||
return row[0] if row else None
|
return row[0] if row else None
|
||||||
|
|
||||||
# Configuration
|
# Configuration
|
||||||
|
|
||||||
def get_config(key):
|
def get_config(key):
|
||||||
'''Reads the value for a specified config key from the db'''
|
'''Reads the value for a specified config key from the db'''
|
||||||
conn = get_db_connection()
|
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,))
|
||||||
cur = conn.cursor()
|
row = CURSOR.fetchone()
|
||||||
cur.execute("SELECT value FROM config WHERE key = ?", (key,))
|
|
||||||
row = cur.fetchone()
|
|
||||||
conn.close()
|
|
||||||
return row[0] if row else None
|
return row[0] if row else None
|
||||||
|
|
||||||
def set_config(key, value):
|
def set_config(key, value):
|
||||||
'''Writes the value for a specified config key to the db'''
|
'''Writes the value for a specified config key to the db'''
|
||||||
conn = get_db_connection()
|
|
||||||
cur = conn.cursor()
|
|
||||||
cur.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
|
cur.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
|
||||||
conn.commit()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
# Character stat functions
|
# Character stat functions
|
||||||
|
|
|
@ -1,25 +1,8 @@
|
||||||
import random
|
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
from db_utils import get_or_create_user, add_pull, get_db_connection, get_last_rolled_at
|
from db_utils import get_or_create_user, insert_pull, get_last_rolled_at, get_random_character
|
||||||
from add_character import add_character
|
from add_character import add_character
|
||||||
from config import GACHA_ROLL_INTERVAL
|
from config import GACHA_ROLL_INTERVAL
|
||||||
|
|
||||||
def get_character():
|
|
||||||
''' Gets a random character from the database'''
|
|
||||||
conn = get_db_connection()
|
|
||||||
cur = conn.cursor()
|
|
||||||
cur.execute('SELECT * FROM characters')
|
|
||||||
characters = cur.fetchall()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
if not characters:
|
|
||||||
return None, None, None, None
|
|
||||||
|
|
||||||
weights = [c['weight'] for c in characters]
|
|
||||||
chosen = random.choices(characters, weights=weights, k=1)[0]
|
|
||||||
|
|
||||||
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
|
|
||||||
|
|
||||||
def do_roll(full_user):
|
def do_roll(full_user):
|
||||||
'''Determines whether the user can roll, then pulls a random character'''
|
'''Determines whether the user can roll, then pulls a random character'''
|
||||||
user_id = get_or_create_user(full_user)
|
user_id = get_or_create_user(full_user)
|
||||||
|
@ -50,12 +33,12 @@ def do_roll(full_user):
|
||||||
|
|
||||||
return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.'
|
return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.'
|
||||||
|
|
||||||
character_id, character_name, file_id, rarity = get_character()
|
character_id, character_name, file_id, rarity = get_random_character()
|
||||||
|
|
||||||
if not character_id:
|
if not character_id:
|
||||||
return f'{full_user} Uwaaa... something went wrong! No characters found. 😿'
|
return f'{full_user} Uwaaa... something went wrong! No characters found. 😿'
|
||||||
|
|
||||||
add_pull(user_id,character_id)
|
insert_pull(user_id,character_id)
|
||||||
stars = '⭐️' * rarity
|
stars = '⭐️' * rarity
|
||||||
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\
|
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\
|
||||||
She's all yours now~ 💖✨",[file_id]])
|
She's all yours now~ 💖✨",[file_id]])
|
||||||
|
@ -94,7 +77,6 @@ def do_create(full_user, arguments, note_obj):
|
||||||
)
|
)
|
||||||
return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]])
|
return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]])
|
||||||
|
|
||||||
|
|
||||||
def do_help(full_user):
|
def do_help(full_user):
|
||||||
'''Provides a list of commands that the bot can do.'''
|
'''Provides a list of commands that the bot can do.'''
|
||||||
return f'{full_user} Here\'s what I can do:\n \
|
return f'{full_user} Here\'s what I can do:\n \
|
||||||
|
@ -102,11 +84,6 @@ def do_help(full_user):
|
||||||
- `create <name> <rarity> <weight>` Creates a character using a given image.\
|
- `create <name> <rarity> <weight>` Creates a character using a given image.\
|
||||||
- `help` Shows this message'
|
- `help` Shows this message'
|
||||||
|
|
||||||
def do_invalid_command(command, full_user):
|
|
||||||
'''Generic response when an unknown or invalid command is sent'''
|
|
||||||
return f'{full_user} Unrecognised command: {command}\n\
|
|
||||||
Message \'help\' to get a list of valid commands'
|
|
||||||
|
|
||||||
def generate_response(parsed_command):
|
def generate_response(parsed_command):
|
||||||
'''Given a command with arguments, processes the game state and
|
'''Given a command with arguments, processes the game state and
|
||||||
returns a response'''
|
returns a response'''
|
||||||
|
@ -120,4 +97,4 @@ def generate_response(parsed_command):
|
||||||
case 'help':
|
case 'help':
|
||||||
return do_help(command)
|
return do_help(command)
|
||||||
case _:
|
case _:
|
||||||
return do_invalid_command(command, full_user)
|
return None
|
||||||
|
|
3
db.py
3
db.py
|
@ -50,6 +50,9 @@ CREATE TABLE IF NOT EXISTS character_stats (
|
||||||
)
|
)
|
||||||
''')
|
''')
|
||||||
|
|
||||||
|
# Initialize essential config key
|
||||||
|
cursor.execute('INSERT INTO config VALUES ("last_seen_notif_id", 0)')
|
||||||
|
|
||||||
""" # Insert example characters into the database if they don't already exist
|
""" # Insert example characters into the database if they don't already exist
|
||||||
characters = [
|
characters = [
|
||||||
('Murakami-san', 1, 0.35),
|
('Murakami-san', 1, 0.35),
|
||||||
|
|
Loading…
Add table
Reference in a new issue