Compare commits
51 commits
6aa0bf704d
...
2d7107e745
Author | SHA1 | Date | |
---|---|---|---|
2d7107e745 | |||
e41c965538 | |||
70c8fd7a0c | |||
8569f28f3c | |||
956d5927cd | |||
67afa22e2e | |||
cbdbefb5fc | |||
0a2f7fe00d | |||
3de6a9ac3d | |||
9f9c034461 | |||
4c2b3f589f | |||
0c2e3754da | |||
c2fd435622 | |||
3a09b481e5 | |||
b7d82d9117 | |||
66a91b5deb | |||
dce2c9072b | |||
71b9cd9d16 | |||
a0ed6ded41 | |||
da2ca4cfec | |||
3fdf64c6ed | |||
7b9a5cf428 | |||
6ea5529ef5 | |||
eaa6308906 | |||
7d0ddc492c | |||
de7670204a | |||
846130771e | |||
4f853df32c | |||
8fb91f7754 | |||
37ac7dbb0c | |||
26f23a1354 | |||
36c65b3ae6 | |||
a693a2189d | |||
f3a32f1176 | |||
6912758a44 | |||
25a72b3002 | |||
d210f44efc | |||
bdd2f20b84 | |||
0690ac5212 | |||
e8774cb8bd | |||
d9027356ab | |||
4a7c9239fc | |||
8a331e0c7b | |||
47272aee4f | |||
ff20e26821 | |||
e53bd87d81 | |||
9be92afce3 | |||
d2a7e523e8 | |||
45dd6f10e3 | |||
24309ce900 | |||
6b6777cb89 |
26 changed files with 1041 additions and 556 deletions
8
.gitignore
vendored
8
.gitignore
vendored
|
@ -147,6 +147,7 @@ venv.bak/
|
|||
/site
|
||||
|
||||
# mypy
|
||||
.mypy.ini
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
@ -181,5 +182,8 @@ cython_debug/
|
|||
.cursorindexingignore
|
||||
|
||||
# Custom stuff
|
||||
gacha_game.db
|
||||
config.ini
|
||||
gacha_game*.db
|
||||
gacha_game*.db.*
|
||||
config*.ini
|
||||
|
||||
.idea
|
|
@ -1,17 +1,24 @@
|
|||
import requests
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
from client import client_connection
|
||||
from db_utils import get_db_connection
|
||||
from db_utils import insert_character
|
||||
from custom_types import Character
|
||||
from config import RARITY_TO_WEIGHT
|
||||
|
||||
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
|
||||
"""
|
||||
Adds a character to the database, uploading the image from a public URL to the bot's Misskey Drive.
|
||||
|
||||
def add_character(
|
||||
name: str,
|
||||
rarity: int,
|
||||
image_url: str) -> tuple[int, str]:
|
||||
'''
|
||||
Adds a character to the database, uploading the image from a public URL to
|
||||
the bot's Misskey Drive.
|
||||
|
||||
Args:
|
||||
name (str): Character name.
|
||||
rarity (int): Character rarity (e.g., 1-5).
|
||||
weight (float): Pull weight (e.g., 0.02).
|
||||
image_url (str): Public URL of the image from the post (e.g., from note['files'][i]['url']).
|
||||
image_url (str): Public URL of the image from the post (e.g., from
|
||||
note['files'][i]['url']).
|
||||
|
||||
Returns:
|
||||
tuple[int, str]: Character ID and bot's Drive file_id.
|
||||
|
@ -19,45 +26,39 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
|
|||
Raises:
|
||||
ValueError: If inputs are invalid.
|
||||
RuntimeError: If image download/upload or database operation fails.
|
||||
"""
|
||||
try:
|
||||
'''
|
||||
|
||||
stripped_name = name.strip()
|
||||
|
||||
# Validate inputs
|
||||
if not name or not name.strip():
|
||||
raise ValueError("Character name cannot be empty.")
|
||||
if not isinstance(rarity, int) or rarity < 1:
|
||||
raise ValueError("Rarity must be a positive integer.")
|
||||
if not isinstance(weight, (int, float)) or weight <= 0:
|
||||
raise ValueError("Weight must be a positive number.")
|
||||
if not stripped_name:
|
||||
raise ValueError('Character name cannot be empty.')
|
||||
if rarity < 1:
|
||||
raise ValueError('Rarity must be a positive integer.')
|
||||
if rarity not in RARITY_TO_WEIGHT.keys():
|
||||
raise ValueError(f'Invalid rarity: {rarity}')
|
||||
if not image_url:
|
||||
raise ValueError("Image URL must be provided.")
|
||||
raise ValueError('Image URL must be provided.')
|
||||
|
||||
# Download image
|
||||
response = requests.get(image_url, stream=True)
|
||||
response = requests.get(image_url, stream=True, timeout=30)
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError(f"Failed to download image from {image_url}")
|
||||
raise RuntimeError(f'Failed to download image from {image_url}')
|
||||
|
||||
# Upload to bot's Drive
|
||||
mk = client_connection()
|
||||
try:
|
||||
media = mk.drive_files_create(response.raw)
|
||||
file_id = media["id"]
|
||||
file_id = media['id']
|
||||
except MisskeyAPIException as e:
|
||||
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
|
||||
raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
|
||||
from e
|
||||
|
||||
# Insert into database
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
|
||||
(name.strip(), rarity, float(weight), file_id)
|
||||
character_id = insert_character(
|
||||
stripped_name,
|
||||
rarity,
|
||||
RARITY_TO_WEIGHT[rarity],
|
||||
file_id
|
||||
)
|
||||
conn.commit()
|
||||
character_id = cur.lastrowid
|
||||
|
||||
return character_id, file_id
|
||||
|
||||
except Exception as e:
|
||||
raise
|
||||
finally:
|
||||
if 'conn' in locals():
|
||||
conn.close()
|
|
@ -1,79 +1,18 @@
|
|||
import time
|
||||
import traceback
|
||||
import misskey
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_or_create_user, add_pull, get_config, set_config
|
||||
import misskey as misskey
|
||||
from client import client_connection
|
||||
import db_utils as db
|
||||
|
||||
# Initialize the Misskey client
|
||||
client = client_connection()
|
||||
from config import NOTIFICATION_POLL_INTERVAL
|
||||
from notification import process_notifications
|
||||
|
||||
# Define your whitelist
|
||||
# TODO: move to config
|
||||
whitelisted_instances: list[str] = []
|
||||
|
||||
def stream_notifications():
|
||||
print("Starting filtered notification stream...")
|
||||
|
||||
last_seen_id = get_config("last_seen_notif_id")
|
||||
if __name__ == '__main__':
|
||||
# Initialize the Misskey client
|
||||
client = client_connection()
|
||||
# Connect to DB
|
||||
db.connect()
|
||||
|
||||
print('Listening for notifications...')
|
||||
while True:
|
||||
try:
|
||||
# May be able to mark notifications as read using misskey.py and
|
||||
# filter them out here. This function also takes a since_id we
|
||||
# could use as well
|
||||
notifications = client.i_notifications()
|
||||
|
||||
if notifications:
|
||||
# Oldest to newest
|
||||
notifications.reverse()
|
||||
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
for notification in notifications:
|
||||
notif_id = notification.get("id")
|
||||
|
||||
# Skip old or same ID notifications
|
||||
if last_seen_id is not None and notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
user = notification.get("user", {})
|
||||
username = user.get("username", "unknown")
|
||||
host = user.get("host") # None if local user
|
||||
|
||||
instance = host if host else "local"
|
||||
|
||||
if instance in whitelisted_instances or instance == "local":
|
||||
note = notification.get("note", {}).get("text", "")
|
||||
notif_type = notification.get("type", "unknown")
|
||||
|
||||
print(f"📨 [{notif_type}] from @{username}@{instance}")
|
||||
print(f"💬 {note}")
|
||||
print("-" * 30)
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parse_notification(notification,client)
|
||||
|
||||
else:
|
||||
print(f"⚠️ Blocked notification from untrusted instance: {host}")
|
||||
|
||||
# Update only if this notif_id is greater
|
||||
if new_last_seen_id is None or notif_id > new_last_seen_id:
|
||||
new_last_seen_id = notif_id
|
||||
|
||||
# Save the latest seen ID
|
||||
if new_last_seen_id and new_last_seen_id != last_seen_id:
|
||||
set_config("last_seen_notif_id", new_last_seen_id)
|
||||
last_seen_id = new_last_seen_id
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
except Exception as e:
|
||||
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
stream_notifications()
|
||||
if not process_notifications(client):
|
||||
time.sleep(NOTIFICATION_POLL_INTERVAL)
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import misskey
|
||||
import config
|
||||
|
||||
def client_connection():
|
||||
|
||||
def client_connection() -> misskey.Misskey:
|
||||
return misskey.Misskey(address=config.INSTANCE, i=config.KEY)
|
||||
|
|
|
@ -1,22 +1,56 @@
|
|||
'''Essentials for the bot to function'''
|
||||
import configparser
|
||||
from os import environ, path
|
||||
|
||||
|
||||
class ConfigError(Exception):
|
||||
'''Could not find config file'''
|
||||
|
||||
|
||||
def get_config_file() -> str:
|
||||
'''Gets the path to the config file in the current environment'''
|
||||
env: str | None = environ.get('KEMOVERSE_ENV')
|
||||
if not env:
|
||||
raise ConfigError('Error: KEMOVERSE_ENV is unset')
|
||||
if not (env in ['prod', 'dev']):
|
||||
raise ConfigError(f'Error: Invalid environment: {env}')
|
||||
|
||||
config_path: str = f'config_{env}.ini'
|
||||
|
||||
if not path.isfile(config_path):
|
||||
raise ConfigError(f'Could not find {config_path}')
|
||||
return config_path
|
||||
|
||||
def get_rarity_to_weight(config_section):
|
||||
"""Parses Rarity_X keys from config and returns a {rarity: weight} dict."""
|
||||
rarity_weights = {}
|
||||
for key, value in config_section.items():
|
||||
if key.startswith("rarity_"):
|
||||
rarity = int(key.removeprefix("rarity_"))
|
||||
rarity_weights[rarity] = float(value)
|
||||
return rarity_weights
|
||||
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read('config.ini')
|
||||
config.read(get_config_file())
|
||||
|
||||
# Username for the bot
|
||||
USER = config['application']['BotUser']
|
||||
|
||||
USER = config['credentials']['User'].lower()
|
||||
# API key for the bot
|
||||
KEY = config['application']['ApiKey']
|
||||
KEY = config['credentials']['Token']
|
||||
# Bot's Misskey instance URL
|
||||
INSTANCE = config['application']['InstanceUrl']
|
||||
|
||||
# SQLite Database location
|
||||
DB_PATH = config['application']['DatabaseLocation']
|
||||
|
||||
# Extra stuff for control of the bot
|
||||
INSTANCE = config['credentials']['Instance'].lower()
|
||||
|
||||
# TODO: move this to db
|
||||
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
|
||||
# to use extra admin exclusive commands with the bot'''
|
||||
# to use extra admin exclusive commands with the bot
|
||||
ADMINS = config['application']['DefaultAdmins']
|
||||
# SQLite Database location
|
||||
DB_PATH = config['application']['DatabaseLocation']
|
||||
|
||||
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
|
||||
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
|
||||
|
||||
GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval'])
|
||||
|
||||
RARITY_TO_WEIGHT = get_rarity_to_weight(config['gacha'])
|
21
bot/custom_types.py
Normal file
21
bot/custom_types.py
Normal file
|
@ -0,0 +1,21 @@
|
|||
from typing import TypedDict, List, Dict, Any
|
||||
|
||||
BotResponse = TypedDict('BotResponse', {
|
||||
'message': str,
|
||||
'attachment_urls': List[str] | None
|
||||
})
|
||||
|
||||
Character = TypedDict('Character', {
|
||||
'id': int,
|
||||
'name': str,
|
||||
'rarity': int,
|
||||
'weight': float,
|
||||
'image_url': str
|
||||
})
|
||||
|
||||
ParsedNotification = TypedDict('ParsedNotification', {
|
||||
'author': str,
|
||||
'command': str | None,
|
||||
'arguments': List[str],
|
||||
'note_obj': Dict[str, Any]
|
||||
})
|
143
bot/db_utils.py
143
bot/db_utils.py
|
@ -1,58 +1,121 @@
|
|||
from random import choices
|
||||
import sqlite3
|
||||
import random
|
||||
import config
|
||||
from custom_types import Character
|
||||
|
||||
DB_PATH = config.DB_PATH
|
||||
CONNECTION: sqlite3.Connection
|
||||
CURSOR: sqlite3.Cursor
|
||||
|
||||
def get_db_connection():
|
||||
|
||||
def connect() -> None:
|
||||
'''Creates a connection to the database'''
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
print('Connecting to the database...')
|
||||
global CONNECTION
|
||||
global CURSOR
|
||||
CONNECTION = sqlite3.connect(DB_PATH, autocommit=True)
|
||||
CONNECTION.row_factory = sqlite3.Row
|
||||
CURSOR = CONNECTION.cursor()
|
||||
|
||||
def get_or_create_user(username):
|
||||
'''Retrieves an ID for a given user, if the user does not exist, it will be
|
||||
created.'''
|
||||
conn = get_db_connection()
|
||||
conn.row_factory = sqlite3.Row
|
||||
cur = conn.cursor()
|
||||
cur.execute('SELECT id FROM users WHERE username = ?', (username,))
|
||||
user = cur.fetchone()
|
||||
|
||||
def get_random_character() -> Character | None:
|
||||
''' Gets a random character from the database'''
|
||||
CURSOR.execute('SELECT * FROM characters')
|
||||
characters = CURSOR.fetchall()
|
||||
|
||||
if not characters:
|
||||
return None
|
||||
|
||||
weights = [config.RARITY_TO_WEIGHT[c['rarity']] for c in characters]
|
||||
chosen = choices(characters, weights=weights, k=1)[0]
|
||||
|
||||
return {
|
||||
'id': chosen['id'],
|
||||
'name': chosen['name'],
|
||||
'rarity': chosen['rarity'],
|
||||
'weight': config.RARITY_TO_WEIGHT[chosen['rarity']],
|
||||
'image_url': chosen['file_id']
|
||||
}
|
||||
|
||||
def get_player(username: str) -> int:
|
||||
'''Retrieve a player ID by username, or return None if not found.'''
|
||||
CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,))
|
||||
user = CURSOR.fetchone()
|
||||
if user:
|
||||
conn.close()
|
||||
return user[0]
|
||||
return int(user[0])
|
||||
|
||||
# New user starts with has_rolled = False
|
||||
cur.execute(
|
||||
def insert_player(username: str) -> int:
|
||||
'''Insert a new player with default has_rolled = False and return their user ID.'''
|
||||
CURSOR.execute(
|
||||
'INSERT INTO users (username, has_rolled) VALUES (?, ?)',
|
||||
(username, False)
|
||||
)
|
||||
conn.commit()
|
||||
user_id = cur.lastrowid
|
||||
conn.close()
|
||||
return user_id
|
||||
return CURSOR.lastrowid
|
||||
|
||||
def add_pull(user_id, character_id):
|
||||
def delete_player(username: str) -> bool:
|
||||
'''Permanently deletes a user and all their pulls.'''
|
||||
CURSOR.execute(
|
||||
'SELECT id FROM users WHERE username = ?',
|
||||
(username,)
|
||||
)
|
||||
user = CURSOR.fetchone()
|
||||
|
||||
user_id = user[0]
|
||||
|
||||
# Delete pulls
|
||||
CURSOR.execute(
|
||||
'DELETE FROM pulls WHERE user_id = ?',
|
||||
(user_id,)
|
||||
)
|
||||
|
||||
# Delete user
|
||||
CURSOR.execute(
|
||||
'DELETE FROM users WHERE id = ?',
|
||||
(user_id,)
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
||||
def insert_character(
|
||||
name: str, rarity: int, weight: float, file_id: str) -> int:
|
||||
'''Inserts a character'''
|
||||
CURSOR.execute(
|
||||
'INSERT INTO characters (name, rarity, weight, file_id) VALUES \
|
||||
(?, ?, ?, ?)',
|
||||
(name, rarity, weight, file_id)
|
||||
)
|
||||
character_id = CURSOR.lastrowid
|
||||
return character_id if character_id else 0
|
||||
|
||||
|
||||
def insert_pull(user_id: int, character_id: int) -> None:
|
||||
'''Creates a pull in the database'''
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute('INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', (user_id, character_id))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
CURSOR.execute(
|
||||
'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)',
|
||||
(user_id, character_id)
|
||||
)
|
||||
|
||||
def get_config(key):
|
||||
|
||||
def get_last_rolled_at(user_id: int) -> int:
|
||||
'''Gets the timestamp when the user last rolled'''
|
||||
CURSOR.execute(
|
||||
"SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp \
|
||||
DESC",
|
||||
(user_id,))
|
||||
row = CURSOR.fetchone()
|
||||
return row[0] if row else 0
|
||||
|
||||
|
||||
def get_config(key: str) -> str:
|
||||
'''Reads the value for a specified config key from the db'''
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT value FROM config WHERE key = ?", (key,))
|
||||
row = cur.fetchone()
|
||||
conn.close()
|
||||
return row[0] if row else None
|
||||
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,))
|
||||
row = CURSOR.fetchone()
|
||||
return row[0] if row else ''
|
||||
|
||||
def set_config(key, value):
|
||||
|
||||
def set_config(key: str, value: str) -> None:
|
||||
'''Writes the value for a specified config key to the db'''
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
|
||||
(key, value))
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
import random
|
||||
from db_utils import get_or_create_user, add_pull, get_db_connection
|
||||
from add_character import add_character
|
||||
|
||||
def get_character():
|
||||
''' Gets a random character from the database'''
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute('SELECT * FROM characters')
|
||||
characters = cur.fetchall()
|
||||
conn.close()
|
||||
|
||||
if not characters:
|
||||
return None, None, None, None
|
||||
|
||||
weights = [c['weight'] for c in characters]
|
||||
chosen = random.choices(characters, weights=weights, k=1)[0]
|
||||
|
||||
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
|
||||
|
||||
def is_float(val):
|
||||
try:
|
||||
float(val)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
# TODO: See issue #3, separate command parsing from game logic.
|
||||
def gacha_response(command,full_user, arguments,note_obj):
|
||||
'''Parses a given command with arguments, processes the game state and
|
||||
returns a response'''
|
||||
|
||||
if command == "roll":
|
||||
user_id = get_or_create_user(full_user)
|
||||
character_id, character_name, file_id, rarity = get_character()
|
||||
|
||||
if not character_id:
|
||||
#TODO: Can't have tuples of a single element
|
||||
# Return these as a dict or object instead.
|
||||
return(f"@{full_user} Uwaaa... something went wrong! No characters found. 😿")
|
||||
|
||||
add_pull(user_id,character_id)
|
||||
stars = '⭐️' * rarity
|
||||
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\nShe's all yours now~ 💖✨",[file_id]])
|
||||
|
||||
if command == "create":
|
||||
# Example call from bot logic
|
||||
image_url = note_obj.get("files", [{}])[0].get("url") if note_obj.get("files") else None
|
||||
if not image_url:
|
||||
return "You need an image to create a character, dumbass."
|
||||
|
||||
if len(arguments) != 3:
|
||||
return "Please specify the following attributes in order: name, rarity, drop weighting"
|
||||
|
||||
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
|
||||
return f"Invalid rarity: '{arguments[1]}' must be a number between 1 and 5"
|
||||
|
||||
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
|
||||
return f"Invalid drop weight: '{arguments[2]}' must be a decimal value between 0.0 and 1.0"
|
||||
|
||||
character_id, file_id = add_character(
|
||||
name=arguments[0],
|
||||
rarity=int(arguments[1]),
|
||||
weight=float(arguments[2]),
|
||||
image_url=image_url
|
||||
)
|
||||
return([f"Added {arguments[0]}, ID {character_id}.",[file_id]])
|
||||
return None
|
122
bot/notification.py
Normal file
122
bot/notification.py
Normal file
|
@ -0,0 +1,122 @@
|
|||
import traceback
|
||||
from typing import Dict, Any
|
||||
|
||||
import misskey
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
|
||||
from config import NOTIFICATION_BATCH_SIZE
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_config, set_config
|
||||
from response import generate_response
|
||||
from custom_types import BotResponse
|
||||
|
||||
# Define your whitelist
|
||||
# TODO: move to config
|
||||
WHITELISTED_INSTANCES: list[str] = []
|
||||
|
||||
|
||||
def process_notification(
|
||||
client: misskey.Misskey,
|
||||
notification: Dict[str, Any]) -> None:
|
||||
'''Processes an individual notification'''
|
||||
user = notification.get('user', {})
|
||||
username = user.get('username', 'unknown')
|
||||
host = user.get('host') # None if local user
|
||||
instance = host if host else 'local'
|
||||
|
||||
if not (instance in WHITELISTED_INSTANCES or instance == 'local'):
|
||||
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
|
||||
return
|
||||
|
||||
# Copy visibility of the post that was received when replying (so if people
|
||||
# don't want to dump a bunch of notes on home they don't have to)
|
||||
visibility = notification['note']['visibility']
|
||||
if visibility != 'specified':
|
||||
visibility = 'home'
|
||||
|
||||
notif_type = notification.get('type', 'unknown')
|
||||
notif_id = notification.get('id')
|
||||
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parsed_notification = parse_notification(notification, client)
|
||||
|
||||
if not parsed_notification:
|
||||
return
|
||||
|
||||
# Get the note Id to reply to
|
||||
note_id = notification.get('note', {}).get('id')
|
||||
|
||||
# Get the response
|
||||
response: BotResponse | None = generate_response(parsed_notification)
|
||||
|
||||
if not response:
|
||||
return
|
||||
|
||||
client.notes_create(
|
||||
text=response['message'],
|
||||
reply_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=response['attachment_urls']
|
||||
# TODO: write actual visible users ids so pleromers can use the bot
|
||||
# privately
|
||||
# visible_user_ids=[]
|
||||
)
|
||||
|
||||
|
||||
def process_notifications(client: misskey.Misskey) -> bool:
|
||||
'''Processes a batch of unread notifications. Returns False if there are
|
||||
no more notifications to process.'''
|
||||
|
||||
last_seen_id = get_config('last_seen_notif_id')
|
||||
# process_notification writes to last_seen_id, so make a copy
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
try:
|
||||
notifications = client.i_notifications(
|
||||
# Fetch notifications we haven't seen yet. This option is a bit
|
||||
# tempermental, sometimes it'll include since_id, sometimes it
|
||||
# won't. We need to keep track of what notifications we've
|
||||
# already processed.
|
||||
since_id=last_seen_id,
|
||||
# Let misskey handle the filtering
|
||||
include_types=['mention', 'reply'],
|
||||
# And handle the batch size while we're at it
|
||||
limit=NOTIFICATION_BATCH_SIZE
|
||||
)
|
||||
|
||||
# No notifications. Wait the poll period.
|
||||
if not notifications:
|
||||
return False
|
||||
|
||||
# Iterate oldest to newest
|
||||
for notification in notifications:
|
||||
try:
|
||||
# Skip if we've processed already
|
||||
notif_id = notification.get('id', '')
|
||||
if notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
# Update new_last_seen_id and process
|
||||
new_last_seen_id = notif_id
|
||||
process_notification(client, notification)
|
||||
|
||||
except Exception as e:
|
||||
print(f'An exception has occured while processing a \
|
||||
notification: {e}')
|
||||
print(traceback.format_exc())
|
||||
|
||||
# If we got as many notifications as we requested, there are probably
|
||||
# more in the queue
|
||||
return len(notifications) == NOTIFICATION_BATCH_SIZE
|
||||
|
||||
except MisskeyAPIException as e:
|
||||
print(f'An exception has occured while reading notifications: {e}\n')
|
||||
print(traceback.format_exc())
|
||||
finally:
|
||||
# Quality jank right here, but finally lets us update the last_seen_id
|
||||
# even if we hit an exception or return early
|
||||
if new_last_seen_id > last_seen_id:
|
||||
set_config('last_seen_notif_id', new_last_seen_id)
|
||||
|
||||
return False
|
|
@ -1,23 +1,16 @@
|
|||
import random, re
|
||||
import re
|
||||
from typing import Dict, Any
|
||||
|
||||
import misskey
|
||||
|
||||
import config
|
||||
from gacha_response import gacha_response
|
||||
from custom_types import ParsedNotification
|
||||
|
||||
def parse_notification(notification,client):
|
||||
'''Oarses any notifications received by the bot and sends any commands to
|
||||
gacha_response()'''
|
||||
|
||||
# We get the type of notification to filter the ones that we actually want
|
||||
# to parse
|
||||
|
||||
notif_type = notification.get("type")
|
||||
if not notif_type in ('mention', 'reply'):
|
||||
return # Ignore anything that isn't a mention
|
||||
|
||||
# We want the visibility to be related to the type that was received (so if
|
||||
# people don't want to dump a bunch of notes on home they don't have to)
|
||||
visibility = notification["note"]["visibility"]
|
||||
if visibility != "specified":
|
||||
visibility = "home"
|
||||
def parse_notification(
|
||||
notification: Dict[str, Any],
|
||||
client: misskey.Misskey) -> ParsedNotification | None:
|
||||
'''Parses any notifications received by the bot'''
|
||||
|
||||
# Get the full Activitypub ID of the user
|
||||
user = notification.get("user", {})
|
||||
|
@ -40,32 +33,20 @@ def parse_notification(notification,client):
|
|||
|
||||
# Make sure the notification text explicitly mentions the bot
|
||||
if not any(variant in note for variant in username_variants):
|
||||
return
|
||||
return None
|
||||
|
||||
# Find command and arguments after the mention
|
||||
# Removes all mentions (regex = mentions that start with @ and may contain @domain)
|
||||
# Removes all mentions
|
||||
# regex = mentions that start with @ and may contain @domain
|
||||
cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip()
|
||||
parts = cleaned_text.split()
|
||||
|
||||
command = parts[0].lower() if parts else None
|
||||
arguments = parts[1:] if len(parts) > 1 else []
|
||||
|
||||
# TODO: move response generation to a different function
|
||||
response = gacha_response(command.lower(),full_user, arguments, note_obj)
|
||||
if not response:
|
||||
return
|
||||
|
||||
if isinstance(response, str):
|
||||
client.notes_create(
|
||||
text=response,
|
||||
reply_id=note_id,
|
||||
visibility=visibility
|
||||
)
|
||||
else:
|
||||
client.notes_create(
|
||||
text=response[0],
|
||||
reply_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=response[1]
|
||||
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
|
||||
)
|
||||
return {
|
||||
'author': full_user,
|
||||
'command': command,
|
||||
'arguments': arguments,
|
||||
'note_obj': note_obj
|
||||
}
|
||||
|
|
214
bot/response.py
Normal file
214
bot/response.py
Normal file
|
@ -0,0 +1,214 @@
|
|||
from datetime import datetime, timedelta, timezone
|
||||
from typing import TypedDict, Any, List, Dict
|
||||
from db_utils import get_player, insert_player, delete_player, insert_pull, get_last_rolled_at, \
|
||||
get_random_character
|
||||
from add_character import add_character
|
||||
from config import GACHA_ROLL_INTERVAL
|
||||
from custom_types import BotResponse, ParsedNotification
|
||||
|
||||
|
||||
def do_roll(author: str) -> BotResponse:
|
||||
'''Determines whether the user can roll, then pulls a random character'''
|
||||
user_id = get_player(author)
|
||||
if not user_id:
|
||||
return {
|
||||
'message':f'{author} 🛑 You haven’t signed up yet! Use the `signup` command to start playing.',
|
||||
'attachment_urls': None
|
||||
}
|
||||
# Get date of user's last roll
|
||||
date = get_last_rolled_at(user_id)
|
||||
|
||||
# No date means it's users first roll
|
||||
if date:
|
||||
# SQLite timestamps returned by the DB are always in UTC
|
||||
# Below timestamps are to be converted to UTC
|
||||
prev = datetime.strptime(str(date) + '+0000', '%Y-%m-%d %H:%M:%S%z')
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
time_since_last_roll = now - prev
|
||||
roll_interval = timedelta(seconds=GACHA_ROLL_INTERVAL)
|
||||
duration = roll_interval - time_since_last_roll
|
||||
|
||||
# User needs to wait before they can roll again
|
||||
if time_since_last_roll < roll_interval:
|
||||
remaining_duration = None
|
||||
if duration.seconds > 3600:
|
||||
remaining_duration = f'{-(duration.seconds // -3600)} hours'
|
||||
elif duration.seconds > 60:
|
||||
remaining_duration = f'{-(duration.seconds // -60)} minutes'
|
||||
else:
|
||||
remaining_duration = f'{duration.seconds} seconds'
|
||||
|
||||
return {
|
||||
'message': f'{author} ⏱️ Please wait another \
|
||||
{remaining_duration} before rolling again.',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
character = get_random_character()
|
||||
|
||||
if not character:
|
||||
return {
|
||||
'message': f'{author} Uwaaa... something went wrong! No \
|
||||
characters found. 😿',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
insert_pull(user_id, character['id'])
|
||||
stars = '⭐️' * character['rarity']
|
||||
return {
|
||||
'message': f'{author} 🎲 Congrats! You rolled {stars} \
|
||||
**{character['name']}**\nShe\'s all yours now~ 💖✨',
|
||||
'attachment_urls': [character['image_url']]
|
||||
}
|
||||
|
||||
def do_signup(author: str) -> BotResponse:
|
||||
'''Registers a new user if they haven’t signed up yet.'''
|
||||
user_id = get_player(author)
|
||||
|
||||
if user_id:
|
||||
return {
|
||||
'message':f'{author} 👀 You’re already signed up! Let the rolling begin~ 🎲',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
new_user_id = insert_player(author)
|
||||
return {
|
||||
'message': f'{author} ✅ Signed up successfully! Your gacha destiny begins now... ✨ Use the roll command to start!',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
def is_float(val: Any) -> bool:
|
||||
'''Returns true if `val` can be converted to a float'''
|
||||
try:
|
||||
float(val)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def do_create(
|
||||
author: str,
|
||||
arguments: List[str],
|
||||
note_obj: Dict[str, Any]) -> BotResponse:
|
||||
'''Creates a character'''
|
||||
# Example call from bot logic
|
||||
image_url = note_obj.get('files', [{}])[0].get('url') \
|
||||
if note_obj.get('files') else None
|
||||
|
||||
if not image_url:
|
||||
return {
|
||||
'message': f'{author} You need an image to create a character, \
|
||||
dumbass.',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
if len(arguments) != 2:
|
||||
return {
|
||||
'message': f'{author} Please specify the following attributes \
|
||||
in order: name, rarity',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
|
||||
return {
|
||||
'message': f'{author} Invalid rarity: \'{arguments[1]}\' must \
|
||||
be a number between 1 and 5',
|
||||
'attachment_urls': None
|
||||
}
|
||||
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
|
||||
return {
|
||||
'message': f'{author} Invalid drop weight: \'{arguments[2]}\' \
|
||||
must be a decimal value between 0.0 and 1.0',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
character_id, file_id = add_character(
|
||||
name=arguments[0],
|
||||
rarity=int(arguments[1]),
|
||||
image_url=image_url
|
||||
)
|
||||
return {
|
||||
'message': f'{author} Added {arguments[0]}, ID {character_id}.',
|
||||
'attachment_urls': [file_id]
|
||||
}
|
||||
|
||||
|
||||
def do_help(author: str) -> BotResponse:
|
||||
'''Provides a list of commands that the bot can do.'''
|
||||
return {
|
||||
'message':f'{author} Here\'s what I can do:\n \
|
||||
- `roll` Pulls a random character.\
|
||||
- `create <name> <rarity>` Creates a character using a given image.\
|
||||
- `signup` Registers your account.\
|
||||
- `delete_account` Deletes your account.\
|
||||
- `help` Shows this message',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
def delete_account(author: str) -> BotResponse:
|
||||
return {
|
||||
'message':f'{author} ⚠️ This will permanently delete your account and all your cards.\n'
|
||||
'If you’re sure, reply with `confirm_delete` to proceed.\n\n'
|
||||
'**There is no undo.** Your gacha luck will be lost to the void... 💀✨',
|
||||
'attachment_urls': None
|
||||
|
||||
}
|
||||
|
||||
def confirm_delete(author: str) -> BotResponse:
|
||||
|
||||
delete_player(author)
|
||||
|
||||
return {
|
||||
'message':f'{author} 🧼 Your account and all your cards have been deleted. RIP your gacha history 🕊️✨',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
|
||||
def generate_response(notification: ParsedNotification) -> BotResponse | None:
|
||||
'''Given a command with arguments, processes the game state and
|
||||
returns a response'''
|
||||
|
||||
# Temporary response variable
|
||||
res: BotResponse | None = None
|
||||
# TODO: Check if the user has an account
|
||||
author = notification['author']
|
||||
user_id = get_player(author)
|
||||
command = notification['command']
|
||||
# Check if the user is an administrator
|
||||
# user_is_administrator = user_is_administrator()
|
||||
|
||||
# Unrestricted commands
|
||||
match command:
|
||||
case 'signup':
|
||||
res = do_signup(author)
|
||||
case 'help':
|
||||
res = do_help(author)
|
||||
case 'roll':
|
||||
res = do_roll(author)
|
||||
case _:
|
||||
pass
|
||||
|
||||
if not user_id:
|
||||
return res
|
||||
|
||||
# User commands
|
||||
match command:
|
||||
case 'create':
|
||||
res = do_create(
|
||||
author,
|
||||
notification['arguments'],
|
||||
notification['note_obj']
|
||||
)
|
||||
case 'signup':
|
||||
res = do_signup(author)
|
||||
case 'delete_account':
|
||||
res = delete_account(author)
|
||||
case 'confirm_delete':
|
||||
res = confirm_delete(author)
|
||||
case _:
|
||||
pass
|
||||
# if not user_is_administrator:
|
||||
return res
|
||||
|
||||
# Administrator commands go here
|
61
db.py
61
db.py
|
@ -1,61 +0,0 @@
|
|||
import sqlite3
|
||||
|
||||
# Connect to SQLite database (or create it if it doesn't exist)
|
||||
conn = sqlite3.connect('gacha_game.db')
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create tables
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS users (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
username TEXT UNIQUE NOT NULL,
|
||||
has_rolled BOOLEAN NOT NULL DEFAULT 0
|
||||
)
|
||||
''')
|
||||
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS characters (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
rarity INTEGER NOT NULL,
|
||||
weight REAL NOT NULL,
|
||||
file_id TEXT NOT NULL
|
||||
)
|
||||
''')
|
||||
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS pulls (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
user_id INTEGER,
|
||||
character_id INTEGER,
|
||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (user_id) REFERENCES users(id),
|
||||
FOREIGN KEY (character_id) REFERENCES characters(id)
|
||||
)
|
||||
''')
|
||||
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS config (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT
|
||||
)
|
||||
""")
|
||||
|
||||
""" # Insert example characters into the database if they don't already exist
|
||||
characters = [
|
||||
('Murakami-san', 1, 0.35),
|
||||
('Mastodon-kun', 2, 0.25),
|
||||
('Pleroma-tan', 3, 0.2),
|
||||
('Misskey-tan', 4, 0.15),
|
||||
('Syuilo-mama', 5, 0.05)
|
||||
]
|
||||
|
||||
|
||||
cursor.executemany('''
|
||||
INSERT OR IGNORE INTO characters (name, rarity, weight) VALUES (?, ?, ?)
|
||||
''', characters)
|
||||
"""
|
||||
|
||||
# Commit changes and close
|
||||
conn.commit()
|
||||
conn.close()
|
|
@ -1,17 +1,35 @@
|
|||
; Rename me to config.ini and put your values in here
|
||||
[application]
|
||||
; Full fedi handle of the bot user
|
||||
BotUser = @bot@example.tld
|
||||
|
||||
; API key for the bot
|
||||
; Generate one by going to Settings > API > Generate access token
|
||||
ApiKey = abcdefghijklmnopqrstuvwxyz012345
|
||||
|
||||
; Fully qualified URL of the instance hosting the bot
|
||||
InstanceUrl = http://example.tld
|
||||
|
||||
; Comma separated list of fedi handles for any administrator users
|
||||
; More can be added through the application
|
||||
DefaultAdmins = ['admin@example.tld']
|
||||
|
||||
; SQLite Database location
|
||||
DatabaseLocation = ./gacha_game.db
|
||||
|
||||
[gacha]
|
||||
; Number of seconds players have to wait between rolls
|
||||
RollInterval = 72000
|
||||
; Rarity drop weights (1 to 5 stars)
|
||||
; Format: rarity=weight per line
|
||||
; In order: common, uncommon, rare, epic and legendary (Example values below)
|
||||
Rarity_1 = 0.7
|
||||
Rarity_2 = 0.2
|
||||
Rarity_3 = 0.08
|
||||
Rarity_4 = 0.015
|
||||
Rarity_5 = 0.005
|
||||
|
||||
[notification]
|
||||
; Number of seconds to sleep while awaiting new notifications
|
||||
PollInterval = 5
|
||||
; Number of notifications to process at once (max 100)
|
||||
BatchSize = 10
|
||||
|
||||
[credentials]
|
||||
; Fully qualified URL of the instance hosting the bot
|
||||
Instance = http://example.tld
|
||||
; Full fedi handle of the bot user
|
||||
User = @bot@example.tld
|
||||
; API key for the bot
|
||||
; Generate one by going to Settings > API > Generate access token
|
||||
Token = abcdefghijklmnopqrstuvwxyz012345
|
||||
|
||||
|
|
28
migrations/0000_setup.sql
Normal file
28
migrations/0000_setup.sql
Normal file
|
@ -0,0 +1,28 @@
|
|||
CREATE TABLE IF NOT EXISTS users (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
username TEXT UNIQUE NOT NULL,
|
||||
has_rolled BOOLEAN NOT NULL DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS characters (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
rarity INTEGER NOT NULL,
|
||||
weight REAL NOT NULL,
|
||||
file_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS pulls (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
user_id INTEGER,
|
||||
character_id INTEGER,
|
||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (user_id) REFERENCES users(id),
|
||||
FOREIGN KEY (character_id) REFERENCES characters(id)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS config (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT
|
||||
);
|
||||
INSERT OR IGNORE INTO config VALUES ("schema_version", 0);
|
1
migrations/0001_fix_notif_id.sql
Normal file
1
migrations/0001_fix_notif_id.sql
Normal file
|
@ -0,0 +1 @@
|
|||
INSERT OR IGNORE INTO config VALUES ("last_seen_notif_id", 0);
|
1
migrations/0002_weigh_infer.sql
Normal file
1
migrations/0002_weigh_infer.sql
Normal file
|
@ -0,0 +1 @@
|
|||
ALTER TABLE characters DROP COLUMN weight;
|
116
readme.md
116
readme.md
|
@ -2,33 +2,41 @@
|
|||
|
||||
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
|
||||
|
||||
## Installation
|
||||
|
||||
## Roadmap
|
||||
|
||||

|
||||
|
||||
## 🔧 Features
|
||||
|
||||
### ✅ Implemented
|
||||
- 🎲 Character roll system
|
||||
- 🎴 Cards stats system
|
||||
- 🧠 Core database structure for characters and stats
|
||||
- 📦 Basic support for storing pulls per user
|
||||
- 🧠 Core database structure for cards
|
||||
- 📦 Basic support for storing pulls per player
|
||||
- ⏱️ Time-based limitations on rolls
|
||||
|
||||
### 🧩 In Progress
|
||||
- 📝 Whitelist system to limit access
|
||||
- ⏱️ Time-based limitations on rolls
|
||||
- ⚔️ Dueling system
|
||||
- ⚠️ Explicit account creation/deletion
|
||||
|
||||
## 🧠 Planned Features (Long Term)
|
||||
## 🧠 Roadmap
|
||||
|
||||
[See our v2.0 board for more details](https://git.waifuism.life/waifu/kemoverse/projects/3)
|
||||
|
||||
### 🛒 Gameplay & Collection
|
||||
- 🔁 **Trading system** between users
|
||||
- 🔁 **Trading system** between players
|
||||
- ⭐ **Favorite characters** (pin them or set profiles)
|
||||
- 📢 **Public post announcements** for rare card pulls
|
||||
- 📊 **Stats** for cards
|
||||
- 🎮 **Games** to play
|
||||
- ⚔️ Dueling
|
||||
- 🧮 **Leaderboards**
|
||||
- Most traded Characters
|
||||
- Most owned Characters
|
||||
- Most voted Characters
|
||||
- Most popular Characters (via usage-based popularity metrics)
|
||||
- Users with the rarest Characters
|
||||
- Most traded cards
|
||||
- Most owned cards
|
||||
- Most voted cards
|
||||
- Most popular cards (via usage-based popularity metrics)
|
||||
- Users with the rarest cards
|
||||
|
||||
### 🎨 Card Aesthetics
|
||||
- 🖼️ Simple card template for character rendering
|
||||
|
@ -39,7 +47,7 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
|
|||
|
||||
## 🗃️ Tech Stack
|
||||
|
||||
- Python (3.11+)
|
||||
- Python (3.12+)
|
||||
- SQLite
|
||||
- Fediverse API integration (via Misskey endpoints)
|
||||
- Flask
|
||||
|
@ -49,10 +57,88 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
|
|||
|
||||
The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and duel tactics.
|
||||
|
||||
## 🧪 Getting Started (coming soon)
|
||||
## 🧪 Installation
|
||||
|
||||
Instructions on installing dependencies, initializing the database, and running the bot locally will go here.
|
||||
### Download and install dependencies
|
||||
|
||||
Clone the repo
|
||||
|
||||
```sh
|
||||
git clone https://git.waifuism.life/waifu/kemoverse.git
|
||||
cd kemoverse
|
||||
```
|
||||
|
||||
Setup a virtual environment (Optional, recommended)
|
||||
|
||||
```sh
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
```
|
||||
|
||||
Install project dependencies via pip
|
||||
|
||||
```sh
|
||||
python3 -m pip install -r requirements.txt
|
||||
```
|
||||
|
||||
### Setup config file
|
||||
|
||||
A sample config file is included with the project as a template: `example_config.ini`
|
||||
|
||||
Create a copy of this file and replace its' values with your own. Consult the
|
||||
template for more information about individual config values and their meaning.
|
||||
|
||||
Config files are environment-specific. Use `config_dev.ini` for development and
|
||||
`config_prod.ini` for production. Switch between environments using the
|
||||
`KEMOVERSE_ENV` environment variable.
|
||||
|
||||
```sh
|
||||
cp example_config.ini config_dev.ini
|
||||
# Edit config_dev.ini
|
||||
```
|
||||
|
||||
### Setup database
|
||||
|
||||
To set up the database, run:
|
||||
|
||||
```sh
|
||||
KEMOVERSE_ENV=dev python3 setup_db.py
|
||||
```
|
||||
|
||||
### Run the bot
|
||||
|
||||
```sh
|
||||
KEMOVERSE_ENV=dev ./startup.sh
|
||||
```
|
||||
|
||||
If all goes well, you should now be able to interact with the bot.
|
||||
|
||||
### Running in production
|
||||
|
||||
To run the the in a production environment, use `KEMOVERSE_ENV=prod`. You will
|
||||
also need to create a `config_prod.ini` file and run the database setup step
|
||||
again if pointing prod to a different database. (you are pointing dev and prod
|
||||
to different databases, right? 🤨)
|
||||
|
||||
### Updating
|
||||
|
||||
To update the bot, first pull new changes from upstream:
|
||||
|
||||
```sh
|
||||
git pull
|
||||
```
|
||||
|
||||
Then run any database migrations. We recommend testing in dev beforehand to
|
||||
make sure nothing breaks in the update process.
|
||||
|
||||
**Always backup your prod database before running any migrations!**
|
||||
|
||||
```sh
|
||||
# Backup database file
|
||||
cp gacha_game_dev.db gacha_game_dev.db.bak
|
||||
# Run migrations
|
||||
KEMOVERSE_ENV=dev python3 setup_db.py
|
||||
```
|
||||
|
||||
```mermaid
|
||||
flowchart TD
|
||||
|
|
129
setup_db.py
Normal file
129
setup_db.py
Normal file
|
@ -0,0 +1,129 @@
|
|||
import sqlite3
|
||||
import traceback
|
||||
import os
|
||||
import argparse
|
||||
from configparser import ConfigParser
|
||||
from typing import List, Tuple
|
||||
|
||||
class DBNotFoundError(Exception):
|
||||
'''Could not find the database location'''
|
||||
|
||||
class InvalidMigrationError(Exception):
|
||||
'''Migration file has an invalid name'''
|
||||
|
||||
class KemoverseEnvUnset(Exception):
|
||||
'''KEMOVERSE_ENV is not set or has an invalid value'''
|
||||
|
||||
class ConfigError(Exception):
|
||||
'''Could not find the config file for the current environment'''
|
||||
|
||||
def get_migrations() -> List[Tuple[int, str]] | InvalidMigrationError:
|
||||
'''Returns a list of migration files in numeric order.'''
|
||||
# Store transaction id and filename separately
|
||||
sql_files: List[Tuple[int, str]] = []
|
||||
migrations_dir = 'migrations'
|
||||
|
||||
for filename in os.listdir(migrations_dir):
|
||||
joined_path = os.path.join(migrations_dir, filename)
|
||||
|
||||
# Ignore anything that isn't a .sql file
|
||||
if not (os.path.isfile(joined_path) and filename.endswith('.sql')):
|
||||
print(f'{filename} is not a .sql file, ignoring...')
|
||||
continue
|
||||
|
||||
parts = filename.split('_', 1)
|
||||
|
||||
# Invalid filename format
|
||||
if len(parts) < 2 or not parts[0].isdigit():
|
||||
raise InvalidMigrationError(f'Invalid migration file: {filename}')
|
||||
|
||||
sql_files.append((int(parts[0]), joined_path))
|
||||
|
||||
# Get sorted list of files by migration number
|
||||
sql_files.sort(key=lambda x: x[0])
|
||||
return sql_files
|
||||
|
||||
def perform_migration(cursor: sqlite3.Cursor, migration: tuple[int, str]) -> None:
|
||||
'''Performs a migration on the DB'''
|
||||
print(f'Performing migration {migration[1]}...')
|
||||
|
||||
# Open and execute the sql script
|
||||
with open(migration[1], encoding='utf-8') as file:
|
||||
script = file.read()
|
||||
cursor.executescript(script)
|
||||
# Update the schema version
|
||||
cursor.execute('UPDATE config SET value = ? WHERE key = "schema_version"', (migration[0],))
|
||||
|
||||
def get_db_path() -> str | DBNotFoundError:
|
||||
'''Gets the DB path from config.ini'''
|
||||
env = os.environ.get('KEMOVERSE_ENV')
|
||||
if not (env and env in ['prod', 'dev']):
|
||||
raise KemoverseEnvUnset
|
||||
|
||||
print(f'Running in "{env}" mode')
|
||||
|
||||
config_path = f'config_{env}.ini'
|
||||
|
||||
if not os.path.isfile(config_path):
|
||||
raise ConfigError(f'Could not find {config_path}')
|
||||
|
||||
config = ConfigParser()
|
||||
config.read(config_path)
|
||||
db_path = config['application']['DatabaseLocation']
|
||||
if not db_path:
|
||||
raise DBNotFoundError()
|
||||
return db_path
|
||||
|
||||
def get_current_migration(cursor: sqlite3.Cursor) -> int:
|
||||
'''Gets the current schema version of the database'''
|
||||
try:
|
||||
cursor.execute('SELECT value FROM config WHERE key = ?', ('schema_version',))
|
||||
version = cursor.fetchone()
|
||||
return -1 if not version else int(version[0])
|
||||
except sqlite3.Error:
|
||||
print('Error getting schema version')
|
||||
# Database has not been initialized yet
|
||||
return -1
|
||||
|
||||
def main():
|
||||
'''Does the thing'''
|
||||
# Connect to the DB
|
||||
db_path = ''
|
||||
try:
|
||||
db_path = get_db_path()
|
||||
except ConfigError as ex:
|
||||
print(ex)
|
||||
return
|
||||
except KemoverseEnvUnset:
|
||||
print('Error: KEMOVERSE_ENV is either not set or has an invalid value.')
|
||||
print('Please set KEMOVERSE_ENV to either "dev" or "prod" before running.')
|
||||
print(traceback.format_exc())
|
||||
return
|
||||
|
||||
conn = sqlite3.connect(db_path, autocommit=False)
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Obtain list of migrations to run
|
||||
migrations = get_migrations()
|
||||
# Determine schema version
|
||||
current_migration = get_current_migration(cursor)
|
||||
print(f'Current schema version: {current_migration}')
|
||||
|
||||
# Run any migrations newer than current schema
|
||||
for migration in migrations:
|
||||
if migration[0] <= current_migration:
|
||||
print(f'Migration already up: {migration[1]}')
|
||||
continue
|
||||
try:
|
||||
perform_migration(cursor, migration)
|
||||
conn.commit()
|
||||
except Exception as ex:
|
||||
print(f'An error occurred while applying {migration[1]}: {ex}, aborting...')
|
||||
print(traceback.format_exc())
|
||||
conn.rollback()
|
||||
break
|
||||
conn.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
14
web/app.py
14
web/app.py
|
@ -1,6 +1,8 @@
|
|||
from flask import Flask, render_template
|
||||
import sqlite3
|
||||
|
||||
from flask import Flask, render_template, abort
|
||||
from werkzeug.exceptions import HTTPException
|
||||
|
||||
app = Flask(__name__)
|
||||
DB_PATH = "./gacha_game.db" # Adjust path if needed
|
||||
|
||||
|
@ -9,6 +11,14 @@ def get_db_connection():
|
|||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
@app.errorhandler(HTTPException)
|
||||
def handle_exception(error):
|
||||
return render_template("_error.html", error=error), error.code
|
||||
|
||||
@app.route("/i404")
|
||||
def i404():
|
||||
return abort(404)
|
||||
|
||||
@app.route('/')
|
||||
def index():
|
||||
conn = get_db_connection()
|
||||
|
@ -33,6 +43,8 @@ def user_profile(user_id):
|
|||
|
||||
cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
|
||||
user = cursor.fetchone()
|
||||
if user is None:
|
||||
abort(404)
|
||||
|
||||
cursor.execute('''
|
||||
SELECT pulls.timestamp, characters.name as character_name, characters.rarity
|
||||
|
|
92
web/static/style.css
Normal file
92
web/static/style.css
Normal file
|
@ -0,0 +1,92 @@
|
|||
body {
|
||||
font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;
|
||||
background-color: #f4f6fa;
|
||||
color: #333;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
header {
|
||||
background-color: #7289da;
|
||||
color: white;
|
||||
padding: 20px;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
header h1 {
|
||||
margin: 0;
|
||||
font-size: 2.5em;
|
||||
}
|
||||
|
||||
header p {
|
||||
margin-top: 5px;
|
||||
font-size: 1.1em;
|
||||
}
|
||||
|
||||
.container {
|
||||
max-width: 800px;
|
||||
margin: 30px auto;
|
||||
padding: 20px;
|
||||
background-color: #ffffff;
|
||||
border-radius: 10px;
|
||||
box-shadow: 0 3px 10px rgba(0, 0, 0, 0.07);
|
||||
}
|
||||
|
||||
h2 {
|
||||
border-bottom: 1px solid #ccc;
|
||||
padding-bottom: 8px;
|
||||
margin-top: 30px;
|
||||
}
|
||||
|
||||
ul {
|
||||
list-style-type: none;
|
||||
padding-left: 0;
|
||||
}
|
||||
|
||||
li {
|
||||
margin: 10px 0;
|
||||
}
|
||||
|
||||
a {
|
||||
text-decoration: none;
|
||||
color: #2c3e50;
|
||||
font-weight: bold;
|
||||
background-color: #e3eaf3;
|
||||
padding: 8px 12px;
|
||||
border-radius: 6px;
|
||||
display: inline-block;
|
||||
transition: background-color 0.2s;
|
||||
}
|
||||
|
||||
a:hover {
|
||||
background-color: #cdd8e6;
|
||||
}
|
||||
|
||||
.leaderboard-entry {
|
||||
margin-bottom: 8px;
|
||||
padding: 6px 10px;
|
||||
background: #f9fafc;
|
||||
border-left: 4px solid #7289da;
|
||||
border-radius: 5px;
|
||||
}
|
||||
|
||||
footer {
|
||||
text-align: center;
|
||||
margin-top: 40px;
|
||||
font-size: 0.9em;
|
||||
color: #888;
|
||||
}
|
||||
|
||||
.note {
|
||||
background: #fcfcf0;
|
||||
border: 1px dashed #bbb;
|
||||
padding: 10px;
|
||||
border-radius: 8px;
|
||||
margin-top: 30px;
|
||||
font-size: 0.95em;
|
||||
color: #666;
|
||||
}
|
||||
|
||||
.footer-link {
|
||||
margin: 0 10px;
|
||||
}
|
30
web/templates/_base.html
Normal file
30
web/templates/_base.html
Normal file
|
@ -0,0 +1,30 @@
|
|||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='style.css') }}">
|
||||
<link rel="shortcut icon" href="{{ url_for('static', filename='logo.png') }}">
|
||||
<title>
|
||||
{% if title %}
|
||||
{{ title }}
|
||||
{% else %}
|
||||
{% block title %}{% endblock %}
|
||||
{% endif %}
|
||||
| Kemoverse
|
||||
</title>
|
||||
</head>
|
||||
<body>
|
||||
<header>
|
||||
{% block header %}{% endblock %}
|
||||
</header>
|
||||
|
||||
<div class="container">
|
||||
{% block content %}{% endblock %}
|
||||
</div>
|
||||
|
||||
<footer>
|
||||
<a class="footer-link" href="{{ url_for('about') }}">About</a>
|
||||
<a class="footer-link" href="{{ url_for('submit_character') }}">Submit a Character</a>
|
||||
{% block footer_extra %}{% endblock %}
|
||||
</footer>
|
||||
</body>
|
||||
</html>
|
8
web/templates/_error.html
Normal file
8
web/templates/_error.html
Normal file
|
@ -0,0 +1,8 @@
|
|||
{% extends "_base.html" %}
|
||||
{% block title %}
|
||||
{{ error.code }}
|
||||
{% endblock %}
|
||||
{% block content %}
|
||||
<h2>{{ error.code }} - {{ error.name }} </h2>
|
||||
<p>{{ error.description }}</p>
|
||||
{% endblock %}
|
|
@ -1,13 +1,9 @@
|
|||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>About - Misskey Gacha Center</title>
|
||||
</head>
|
||||
<body>
|
||||
{% extends "_base.html" %}
|
||||
|
||||
{% block content %}
|
||||
<h1>About This Gacha</h1>
|
||||
<p>This is a playful Misskey-themed gacha tracker made with Flask and SQLite.</p>
|
||||
<p>All rolls are stored, stats are tracked, and characters are added manually for now.</p>
|
||||
<p>Built with love, chaos, and way too much caffeine ☕.</p>
|
||||
<a href="{{ url_for('index') }}">← Back to Home</a>
|
||||
</body>
|
||||
</html>
|
||||
{% endblock %}
|
|
@ -1,110 +1,11 @@
|
|||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Misskey Gacha Center</title>
|
||||
<style>
|
||||
body {
|
||||
font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;
|
||||
background-color: #f4f6fa;
|
||||
color: #333;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
}
|
||||
{% extends "_base.html" %}
|
||||
|
||||
header {
|
||||
background-color: #7289da;
|
||||
color: white;
|
||||
padding: 20px;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
header h1 {
|
||||
margin: 0;
|
||||
font-size: 2.5em;
|
||||
}
|
||||
|
||||
header p {
|
||||
margin-top: 5px;
|
||||
font-size: 1.1em;
|
||||
}
|
||||
|
||||
.container {
|
||||
max-width: 800px;
|
||||
margin: 30px auto;
|
||||
padding: 20px;
|
||||
background-color: #ffffff;
|
||||
border-radius: 10px;
|
||||
box-shadow: 0 3px 10px rgba(0, 0, 0, 0.07);
|
||||
}
|
||||
|
||||
h2 {
|
||||
border-bottom: 1px solid #ccc;
|
||||
padding-bottom: 8px;
|
||||
margin-top: 30px;
|
||||
}
|
||||
|
||||
ul {
|
||||
list-style-type: none;
|
||||
padding-left: 0;
|
||||
}
|
||||
|
||||
li {
|
||||
margin: 10px 0;
|
||||
}
|
||||
|
||||
a {
|
||||
text-decoration: none;
|
||||
color: #2c3e50;
|
||||
font-weight: bold;
|
||||
background-color: #e3eaf3;
|
||||
padding: 8px 12px;
|
||||
border-radius: 6px;
|
||||
display: inline-block;
|
||||
transition: background-color 0.2s;
|
||||
}
|
||||
|
||||
a:hover {
|
||||
background-color: #cdd8e6;
|
||||
}
|
||||
|
||||
.leaderboard-entry {
|
||||
margin-bottom: 8px;
|
||||
padding: 6px 10px;
|
||||
background: #f9fafc;
|
||||
border-left: 4px solid #7289da;
|
||||
border-radius: 5px;
|
||||
}
|
||||
|
||||
footer {
|
||||
text-align: center;
|
||||
margin-top: 40px;
|
||||
font-size: 0.9em;
|
||||
color: #888;
|
||||
}
|
||||
|
||||
.note {
|
||||
background: #fcfcf0;
|
||||
border: 1px dashed #bbb;
|
||||
padding: 10px;
|
||||
border-radius: 8px;
|
||||
margin-top: 30px;
|
||||
font-size: 0.95em;
|
||||
color: #666;
|
||||
}
|
||||
|
||||
.footer-link {
|
||||
margin: 0 10px;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
<header>
|
||||
{% block header %}
|
||||
<h1>Misskey Gacha Center</h1>
|
||||
<p>Track your luck. Compare your pulls. Compete with friends.</p>
|
||||
</header>
|
||||
{% endblock %}
|
||||
|
||||
<div class="container">
|
||||
{% block content %}
|
||||
|
||||
<h2>🎖️ Leaderboard: Most Rolls</h2>
|
||||
{% for user in top_users %}
|
||||
|
@ -125,13 +26,4 @@
|
|||
<div class="note">
|
||||
🚀 This is a fun little gacha tracker! More features coming soon. Want to help shape it?
|
||||
</div>
|
||||
|
||||
</div>
|
||||
|
||||
<footer>
|
||||
<a class="footer-link" href="{{ url_for('about') }}">About</a>
|
||||
<a class="footer-link" href="{{ url_for('submit_character') }}">Submit a Character</a>
|
||||
</footer>
|
||||
|
||||
</body>
|
||||
</html>
|
||||
{% endblock %}
|
|
@ -1,12 +1,8 @@
|
|||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Submit a Character - Misskey Gacha Center</title>
|
||||
</head>
|
||||
<body>
|
||||
{% extends "_base.html" %}
|
||||
|
||||
{% block content %}
|
||||
<h1>Submit a Character</h1>
|
||||
<p>Want to add a new character to the gacha pool?</p>
|
||||
<p>This feature will be available soon. Stay tuned!</p>
|
||||
<a href="{{ url_for('index') }}">← Back to Home</a>
|
||||
</body>
|
||||
</html>
|
||||
{% endblock %}
|
|
@ -1,57 +1,5 @@
|
|||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>{{ user['username'] }}'s Rolls</title>
|
||||
<style>
|
||||
body {
|
||||
font-family: Arial, sans-serif;
|
||||
background-color: #f4f4f8;
|
||||
margin: 0;
|
||||
padding: 20px;
|
||||
}
|
||||
.profile, .pulls {
|
||||
background-color: white;
|
||||
padding: 15px;
|
||||
border-radius: 10px;
|
||||
box-shadow: 0 0 10px rgba(0,0,0,0.1);
|
||||
margin-bottom: 20px;
|
||||
}
|
||||
h1, h2 {
|
||||
margin-top: 0;
|
||||
}
|
||||
ul {
|
||||
list-style-type: none;
|
||||
padding: 0;
|
||||
}
|
||||
li {
|
||||
padding: 10px 0;
|
||||
border-bottom: 1px solid #eee;
|
||||
}
|
||||
.rarity {
|
||||
color: gold;
|
||||
font-weight: bold;
|
||||
margin-left: 8px;
|
||||
}
|
||||
.timestamp {
|
||||
color: #888;
|
||||
font-size: 0.9em;
|
||||
}
|
||||
a {
|
||||
display: inline-block;
|
||||
margin-top: 20px;
|
||||
color: #333;
|
||||
text-decoration: none;
|
||||
background-color: #ddd;
|
||||
padding: 8px 12px;
|
||||
border-radius: 5px;
|
||||
}
|
||||
a:hover {
|
||||
background-color: #bbb;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
{% extends "_base.html" %}
|
||||
{% block content %}
|
||||
<div class="profile">
|
||||
<h1>{{ user['username'] }}'s Gacha Rolls</h1>
|
||||
<p>User ID: {{ user['id'] }}</p>
|
||||
|
@ -72,6 +20,4 @@
|
|||
</div>
|
||||
|
||||
<a href="{{ url_for('index') }}">← Back to Users</a>
|
||||
|
||||
</body>
|
||||
</html>
|
||||
{% endblock %}
|
Loading…
Add table
Reference in a new issue