1
0
Fork 0
forked from waifu/kemoverse

Compare commits

..

27 commits

Author SHA1 Message Date
eaa6308906 Fix case sensitivity for bot username 2025-05-31 22:54:04 +01:00
7d0ddc492c Merge pull request 'Add multi-env support' (#39) from 38_install_instructions into dev
Reviewed-on: waifu/kemoverse#39
2025-05-30 00:03:43 -03:00
de7670204a Replace numbered list with h3 2025-05-29 13:36:30 +01:00
846130771e Update vocabulary 2025-05-29 13:34:39 +01:00
4f853df32c Fix spacing in readme again 2025-05-29 13:32:48 +01:00
8fb91f7754 Fix spacing in readme 2025-05-29 13:32:26 +01:00
37ac7dbb0c Add multi-env support 2025-05-29 13:27:56 +01:00
26f23a1354 Merge branch 'master' into dev 2025-05-29 12:22:02 +01:00
36c65b3ae6 Merge pull request 'Add Database Migration System' (#36) from 34_database_migrations into dev
Reviewed-on: waifu/kemoverse#36
2025-05-29 04:01:40 -03:00
f3a32f1176 Update exceptions again 2025-05-26 20:54:52 +01:00
6912758a44 Update exceptions 2025-05-26 20:53:04 +01:00
25a72b3002 Add database migration system 2025-05-26 20:49:21 +01:00
d210f44efc Merge pull request 'Implement Database Connection Pooling' (#31) from 27_pool_db_connections into dev
Reviewed-on: waifu/kemoverse#31
2025-05-26 09:24:18 -03:00
bdd2f20b84 Implement connection pooling 2025-05-26 13:11:48 +01:00
0690ac5212 Merge pull request 'fix: initialize last_seen_notif_id in db.py' (#29) from nyanide/kemoverse:cool_fix into dev
Looking sharp, thanks
Reviewed-on: waifu/kemoverse#29
2025-05-25 00:46:30 -03:00
e8774cb8bd initialize last_seen_notif_id in db.py 2025-05-24 21:15:16 -05:00
d9027356ab Merge pull request 'Remove 'Invalid command' Response' (#24) from fix/remove_invalid_command_response into dev
Reviewed-on: waifu/kemoverse#24
2025-05-24 19:09:08 -03:00
4a7c9239fc Remove 'Invalid command response 2025-05-24 22:43:53 +01:00
8a331e0c7b Merge pull request 'Feat: Add a roll timeout to the bot' (#23) from feat/roll_timeout into dev
Reviewed-on: waifu/kemoverse#23
2025-05-24 01:59:18 -03:00
47272aee4f Add a roll timeout to the bot 2025-05-23 23:44:03 +01:00
w
ff20e26821 Whitelist fix 2025-05-22 00:20:32 -03:00
e53bd87d81 Merge pull request 'Overhaul notification parsing' (#12) from 9_fix_bot_app_py into dev
Reviewed-on: waifu/kemoverse#12
2025-05-21 00:28:49 -03:00
9be92afce3 Fix notification loop 2025-05-20 09:21:06 +01:00
d2a7e523e8 Overhaul notification parsing 2025-05-19 19:53:59 +01:00
45dd6f10e3 Merge branch 'dev' into 9_fix_bot_app_py 2025-05-19 18:01:49 +01:00
w
24309ce900 Gacha_response changed to response 2025-05-19 00:14:47 -03:00
w
6b6777cb89 docs typo 2025-05-18 18:43:50 -03:00
15 changed files with 624 additions and 348 deletions

5
.gitignore vendored
View file

@ -181,5 +181,6 @@ cython_debug/
.cursorindexingignore
# Custom stuff
gacha_game.db
config.ini
gacha_game*.db
gacha_game*.db.*
config*.ini

View file

@ -1,11 +1,12 @@
import requests
from misskey.exceptions import MisskeyAPIException
from client import client_connection
from db_utils import get_db_connection
from db_utils import insert_character
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
"""
Adds a character to the database, uploading the image from a public URL to the bot's Misskey Drive.
Adds a character to the database, uploading the image from a public URL to
the bot's Misskey Drive.
Args:
name (str): Character name.
@ -20,44 +21,29 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
ValueError: If inputs are invalid.
RuntimeError: If image download/upload or database operation fails.
"""
# Validate inputs
if not name or not name.strip():
raise ValueError("Character name cannot be empty.")
if not isinstance(rarity, int) or rarity < 1:
raise ValueError("Rarity must be a positive integer.")
if not isinstance(weight, (int, float)) or weight <= 0:
raise ValueError("Weight must be a positive number.")
if not image_url:
raise ValueError("Image URL must be provided.")
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f"Failed to download image from {image_url}")
# Upload to bot's Drive
mk = client_connection()
try:
# Validate inputs
if not name or not name.strip():
raise ValueError("Character name cannot be empty.")
if not isinstance(rarity, int) or rarity < 1:
raise ValueError("Rarity must be a positive integer.")
if not isinstance(weight, (int, float)) or weight <= 0:
raise ValueError("Weight must be a positive number.")
if not image_url:
raise ValueError("Image URL must be provided.")
media = mk.drive_files_create(response.raw)
file_id = media["id"]
except MisskeyAPIException as e:
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
# Download image
response = requests.get(image_url, stream=True)
if response.status_code != 200:
raise RuntimeError(f"Failed to download image from {image_url}")
# Upload to bot's Drive
mk = client_connection()
try:
media = mk.drive_files_create(response.raw)
file_id = media["id"]
except MisskeyAPIException as e:
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
# Insert into database
conn = get_db_connection()
cur = conn.cursor()
cur.execute(
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
(name.strip(), rarity, float(weight), file_id)
)
conn.commit()
character_id = cur.lastrowid
return character_id, file_id
except Exception as e:
raise
finally:
if 'conn' in locals():
conn.close()
# Insert into database
character_id = insert_character(name.strip(), rarity, float(weight), file_id)
return character_id, file_id

View file

@ -1,79 +1,18 @@
import time
import traceback
import misskey
from parsing import parse_notification
from db_utils import get_or_create_user, add_pull, get_config, set_config
import misskey as misskey
from client import client_connection
import db_utils as db
# Initialize the Misskey client
client = client_connection()
from config import NOTIFICATION_POLL_INTERVAL
from notification import process_notifications
# Define your whitelist
# TODO: move to config
whitelisted_instances: list[str] = []
def stream_notifications():
print("Starting filtered notification stream...")
last_seen_id = get_config("last_seen_notif_id")
if __name__ == '__main__':
# Initialize the Misskey client
client = client_connection()
# Connect to DB
db.connect()
print('Listening for notifications...')
while True:
try:
# May be able to mark notifications as read using misskey.py and
# filter them out here. This function also takes a since_id we
# could use as well
notifications = client.i_notifications()
if notifications:
# Oldest to newest
notifications.reverse()
new_last_seen_id = last_seen_id
for notification in notifications:
notif_id = notification.get("id")
# Skip old or same ID notifications
if last_seen_id is not None and notif_id <= last_seen_id:
continue
user = notification.get("user", {})
username = user.get("username", "unknown")
host = user.get("host") # None if local user
instance = host if host else "local"
if instance in whitelisted_instances or instance == "local":
note = notification.get("note", {}).get("text", "")
notif_type = notification.get("type", "unknown")
print(f"📨 [{notif_type}] from @{username}@{instance}")
print(f"💬 {note}")
print("-" * 30)
# 🧠 Send to the parser
parse_notification(notification,client)
else:
print(f"⚠️ Blocked notification from untrusted instance: {host}")
# Update only if this notif_id is greater
if new_last_seen_id is None or notif_id > new_last_seen_id:
new_last_seen_id = notif_id
# Save the latest seen ID
if new_last_seen_id and new_last_seen_id != last_seen_id:
set_config("last_seen_notif_id", new_last_seen_id)
last_seen_id = new_last_seen_id
time.sleep(5)
except Exception as e:
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
time.sleep(5)
if __name__ == "__main__":
stream_notifications()
if not process_notifications(client):
time.sleep(NOTIFICATION_POLL_INTERVAL)

View file

@ -1,22 +1,42 @@
'''Essentials for the bot to function'''
import configparser
from os import environ, path
class ConfigError(Exception):
'''Could not find config file'''
def get_config_file() -> str:
'''Gets the path to the config file in the current environment'''
env: str | None = environ.get('KEMOVERSE_ENV')
if not env:
raise ConfigError('Error: KEMOVERSE_ENV is unset')
if not (env in ['prod', 'dev']):
raise ConfigError(f'Error: Invalid environment: {env}')
config_path: str = f'config_{env}.ini'
if not path.isfile(config_path):
raise ConfigError(f'Could not find {config_path}')
return config_path
config = configparser.ConfigParser()
config.read('config.ini')
config.read(get_config_file())
# Username for the bot
USER = config['application']['BotUser']
USER = config['credentials']['User'].lower()
# API key for the bot
KEY = config['application']['ApiKey']
KEY = config['credentials']['Token']
# Bot's Misskey instance URL
INSTANCE = config['application']['InstanceUrl']
# SQLite Database location
DB_PATH = config['application']['DatabaseLocation']
# Extra stuff for control of the bot
INSTANCE = config['credentials']['Instance'].lower()
# TODO: move this to db
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
# to use extra admin exclusive commands with the bot'''
ADMINS = config['application']['DefaultAdmins']
# to use extra admin exclusive commands with the bot
ADMINS = config['application']['DefaultAdmins']
# SQLite Database location
DB_PATH = config['application']['DatabaseLocation']
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval'])

View file

@ -1,58 +1,79 @@
from random import choices
import sqlite3
import random
import config
DB_PATH = config.DB_PATH
CONNECTION: sqlite3.Connection
CURSOR: sqlite3.Cursor
def get_db_connection():
def connect() -> None:
'''Creates a connection to the database'''
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
print('Connecting to the database...')
global CONNECTION
global CURSOR
CONNECTION = sqlite3.connect(DB_PATH, autocommit=True)
CONNECTION.row_factory = sqlite3.Row
CURSOR = CONNECTION.cursor()
def get_random_character():
''' Gets a random character from the database'''
CURSOR.execute('SELECT * FROM characters')
characters = CURSOR.fetchall()
if not characters:
return None, None, None, None
weights = [c['weight'] for c in characters]
chosen = choices(characters, weights=weights, k=1)[0]
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
def get_or_create_user(username):
'''Retrieves an ID for a given user, if the user does not exist, it will be
created.'''
conn = get_db_connection()
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute('SELECT id FROM users WHERE username = ?', (username,))
user = cur.fetchone()
CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,))
user = CURSOR.fetchone()
if user:
conn.close()
return user[0]
# New user starts with has_rolled = False
cur.execute(
CURSOR.execute(
'INSERT INTO users (username, has_rolled) VALUES (?, ?)',
(username, False)
)
conn.commit()
user_id = cur.lastrowid
conn.close()
user_id = CURSOR.lastrowid
return user_id
def add_pull(user_id, character_id):
def insert_character(name: str, rarity: int, weight: float, file_id: str) -> int:
'''Inserts a character'''
CURSOR.execute(
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
(name, rarity, weight, file_id)
)
character_id = CURSOR.lastrowid
return character_id if character_id else 0
def insert_pull(user_id, character_id):
'''Creates a pull in the database'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute('INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', (user_id, character_id))
conn.commit()
conn.close()
CURSOR.execute(
'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)',
(user_id, character_id)
)
def get_last_rolled_at(user_id):
'''Gets the timestamp when the user last rolled'''
CURSOR.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \
(user_id,))
row = CURSOR.fetchone()
return row[0] if row else None
def get_config(key):
'''Reads the value for a specified config key from the db'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute("SELECT value FROM config WHERE key = ?", (key,))
row = cur.fetchone()
conn.close()
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,))
row = CURSOR.fetchone()
return row[0] if row else None
def set_config(key, value):
'''Writes the value for a specified config key to the db'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
conn.commit()
conn.close()
CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))

View file

@ -1,69 +0,0 @@
import random
from db_utils import get_or_create_user, add_pull, get_db_connection
from add_character import add_character
def get_character():
''' Gets a random character from the database'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute('SELECT * FROM characters')
characters = cur.fetchall()
conn.close()
if not characters:
return None, None, None, None
weights = [c['weight'] for c in characters]
chosen = random.choices(characters, weights=weights, k=1)[0]
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
def is_float(val):
try:
float(val)
return True
except ValueError:
return False
# TODO: See issue #3, separate command parsing from game logic.
def gacha_response(command,full_user, arguments,note_obj):
'''Parses a given command with arguments, processes the game state and
returns a response'''
if command == "roll":
user_id = get_or_create_user(full_user)
character_id, character_name, file_id, rarity = get_character()
if not character_id:
#TODO: Can't have tuples of a single element
# Return these as a dict or object instead.
return(f"@{full_user} Uwaaa... something went wrong! No characters found. 😿")
add_pull(user_id,character_id)
stars = '⭐️' * rarity
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\nShe's all yours now~ 💖✨",[file_id]])
if command == "create":
# Example call from bot logic
image_url = note_obj.get("files", [{}])[0].get("url") if note_obj.get("files") else None
if not image_url:
return "You need an image to create a character, dumbass."
if len(arguments) != 3:
return "Please specify the following attributes in order: name, rarity, drop weighting"
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
return f"Invalid rarity: '{arguments[1]}' must be a number between 1 and 5"
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
return f"Invalid drop weight: '{arguments[2]}' must be a decimal value between 0.0 and 1.0"
character_id, file_id = add_character(
name=arguments[0],
rarity=int(arguments[1]),
weight=float(arguments[2]),
image_url=image_url
)
return([f"Added {arguments[0]}, ID {character_id}.",[file_id]])
return None

115
bot/notification.py Normal file
View file

@ -0,0 +1,115 @@
import traceback
from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE
from parsing import parse_notification
from db_utils import get_config, set_config
from response import generate_response
# Define your whitelist
# TODO: move to config
WHITELISTED_INSTANCES: list[str] = []
def process_notification(client, notification):
'''Processes an individual notification'''
user = notification.get('user', {})
username = user.get('username', 'unknown')
host = user.get('host') # None if local user
instance = host if host else 'local'
if not (instance in WHITELISTED_INSTANCES or instance == 'local'):
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
return
# Copy visibility of the post that was received when replying (so if people
# don't want to dump a bunch of notes on home they don't have to)
visibility = notification['note']['visibility']
if visibility != 'specified':
visibility = 'home'
notif_type = notification.get('type', 'unknown')
notif_id = notification.get('id')
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
# 🧠 Send to the parser
parsed_command = parse_notification(notification, client)
# Get the note Id to reply to
note_id = notification.get('note', {}).get('id')
# Get the response
# TODO: Formalize exactly *what* is returned by this. Ideally just want to
# handle two cases here: either we have a response, or we don't.
# TODO: Return dictionaries instead of tuples. They handle multiple
# elements a lot better as they're not position dependent
response = generate_response(parsed_command)
if isinstance(response, str):
client.notes_create(
text=response,
reply_id=note_id,
visibility=visibility
)
elif response:
client.notes_create(
text=response[0],
reply_id=note_id,
visibility=visibility,
file_ids=response[1]
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)
def process_notifications(client):
'''Processes a batch of unread notifications. Returns False if there are
no more notifications to process.'''
last_seen_id = get_config('last_seen_notif_id')
# process_notification writes to last_seen_id, so make a copy
new_last_seen_id = last_seen_id
try:
notifications = client.i_notifications(
# Fetch notifications we haven't seen yet. This option is a bit
# tempermental, sometimes it'll include since_id, sometimes it
# won't. We need to keep track of what notifications we've
# already processed.
since_id=last_seen_id,
# Let misskey handle the filtering
include_types=['mention', 'reply'],
# And handle the batch size while we're at it
limit=NOTIFICATION_BATCH_SIZE
)
# No notifications. Wait the poll period.
if not notifications:
return False
# Iterate oldest to newest
for notification in notifications:
try:
# Skip if we've processed already
notif_id = notification.get('id')
if notif_id <= last_seen_id:
continue
# Update new_last_seen_id and process
new_last_seen_id = notif_id
process_notification(client, notification)
except Exception as e:
print(f'An exception has occured while processing a notification: {e}')
print(traceback.format_exc())
# If we got as many notifications as we requested, there are probably
# more in the queue
return len(notifications) == NOTIFICATION_BATCH_SIZE
except MisskeyAPIException as e:
print(f'An exception has occured while reading notifications: {e}\n')
print(traceback.format_exc())
finally:
# Quality jank right here, but finally lets us update the last_seen_id
# even if we hit an exception or return early
if new_last_seen_id > last_seen_id:
set_config('last_seen_notif_id', new_last_seen_id)
return False

View file

@ -1,23 +1,11 @@
import random, re
import config
from gacha_response import gacha_response
def parse_notification(notification,client):
'''Oarses any notifications received by the bot and sends any commands to
'''Parses any notifications received by the bot and sends any commands to
gacha_response()'''
# We get the type of notification to filter the ones that we actually want
# to parse
notif_type = notification.get("type")
if not notif_type in ('mention', 'reply'):
return # Ignore anything that isn't a mention
# We want the visibility to be related to the type that was received (so if
# people don't want to dump a bunch of notes on home they don't have to)
visibility = notification["note"]["visibility"]
if visibility != "specified":
visibility = "home"
# Get the full Activitypub ID of the user
user = notification.get("user", {})
@ -50,22 +38,4 @@ def parse_notification(notification,client):
command = parts[0].lower() if parts else None
arguments = parts[1:] if len(parts) > 1 else []
# TODO: move response generation to a different function
response = gacha_response(command.lower(),full_user, arguments, note_obj)
if not response:
return
if isinstance(response, str):
client.notes_create(
text=response,
reply_id=note_id,
visibility=visibility
)
else:
client.notes_create(
text=response[0],
reply_id=note_id,
visibility=visibility,
file_ids=response[1]
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)
return [command,full_user, arguments, note_obj]

100
bot/response.py Normal file
View file

@ -0,0 +1,100 @@
from datetime import datetime, timedelta, timezone
from db_utils import get_or_create_user, insert_pull, get_last_rolled_at, get_random_character
from add_character import add_character
from config import GACHA_ROLL_INTERVAL
def do_roll(full_user):
'''Determines whether the user can roll, then pulls a random character'''
user_id = get_or_create_user(full_user)
# Get date of user's last roll
date = get_last_rolled_at(user_id)
# No date means it's users first roll
if date:
# SQLite timestamps returned by the DB are always in UTC
# Below timestamps are to be converted to UTC
prev = datetime.strptime(date + '+0000', '%Y-%m-%d %H:%M:%S%z')
now = datetime.now(timezone.utc)
time_since_last_roll = now - prev
roll_interval = timedelta(seconds=GACHA_ROLL_INTERVAL)
duration = roll_interval - time_since_last_roll
# User needs to wait before they can roll again
if time_since_last_roll < roll_interval:
remaining_duration = None
if duration.seconds > 3600:
remaining_duration = f'{-(duration.seconds // -3600)} hours'
elif duration.seconds > 60:
remaining_duration = f'{-(duration.seconds // -60)} minutes'
else:
remaining_duration = f'{duration.seconds} seconds'
return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.'
character_id, character_name, file_id, rarity = get_random_character()
if not character_id:
return f'{full_user} Uwaaa... something went wrong! No characters found. 😿'
insert_pull(user_id,character_id)
stars = '⭐️' * rarity
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\
She's all yours now~ 💖✨",[file_id]])
def is_float(val):
'''Returns true if `val` can be converted to a float'''
try:
float(val)
return True
except ValueError:
return False
def do_create(full_user, arguments, note_obj):
'''Creates a character'''
# Example call from bot logic
image_url = note_obj.get('files', [{}])[0].get('url') if note_obj.get('files') else None
if not image_url:
return f'{full_user}{full_user} You need an image to create a character, dumbass.'
if len(arguments) != 3:
return '{full_user}Please specify the following attributes in order: \
name, rarity, drop weighting'
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
return f'{full_user}Invalid rarity: \'{arguments[1]}\' must be a number between 1 and 5'
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
return f'{full_user}Invalid drop weight: \'{arguments[2]}\' \
must be a decimal value between 0.0 and 1.0'
character_id, file_id = add_character(
name=arguments[0],
rarity=int(arguments[1]),
weight=float(arguments[2]),
image_url=image_url
)
return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]])
def do_help(full_user):
'''Provides a list of commands that the bot can do.'''
return f'{full_user} Here\'s what I can do:\n \
- `roll` Pulls a random character.\
- `create <name> <rarity> <weight>` Creates a character using a given image.\
- `help` Shows this message'
def generate_response(parsed_command):
'''Given a command with arguments, processes the game state and
returns a response'''
command, full_user, arguments, note_obj = parsed_command
match command:
case 'roll':
return do_roll(full_user)
case 'create':
return do_create(full_user, arguments, note_obj)
case 'help':
return do_help(command)
case _:
return None

61
db.py
View file

@ -1,61 +0,0 @@
import sqlite3
# Connect to SQLite database (or create it if it doesn't exist)
conn = sqlite3.connect('gacha_game.db')
cursor = conn.cursor()
# Create tables
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
has_rolled BOOLEAN NOT NULL DEFAULT 0
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS characters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
rarity INTEGER NOT NULL,
weight REAL NOT NULL,
file_id TEXT NOT NULL
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS pulls (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
character_id INTEGER,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (character_id) REFERENCES characters(id)
)
''')
cursor.execute("""
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT
)
""")
""" # Insert example characters into the database if they don't already exist
characters = [
('Murakami-san', 1, 0.35),
('Mastodon-kun', 2, 0.25),
('Pleroma-tan', 3, 0.2),
('Misskey-tan', 4, 0.15),
('Syuilo-mama', 5, 0.05)
]
cursor.executemany('''
INSERT OR IGNORE INTO characters (name, rarity, weight) VALUES (?, ?, ?)
''', characters)
"""
# Commit changes and close
conn.commit()
conn.close()

View file

@ -1,17 +1,27 @@
; Rename me to config.ini and put your values in here
[application]
; Full fedi handle of the bot user
BotUser = @bot@example.tld
; Comma separated list of fedi handles for any administrator users
; More can be added through the application
DefaultAdmins = ['admin@example.tld']
; SQLite Database location
DatabaseLocation = ./gacha_game.db
[gacha]
; Number of seconds players have to wait between rolls
RollInterval = 72000
[notification]
; Number of seconds to sleep while awaiting new notifications
PollInterval = 5
; Number of notifications to process at once (max 100)
BatchSize = 10
[credentials]
; Fully qualified URL of the instance hosting the bot
Instance = http://example.tld
; Full fedi handle of the bot user
User = @bot@example.tld
; API key for the bot
; Generate one by going to Settings > API > Generate access token
ApiKey = abcdefghijklmnopqrstuvwxyz012345
Token = abcdefghijklmnopqrstuvwxyz012345
; Fully qualified URL of the instance hosting the bot
InstanceUrl = http://example.tld
; Comma separated list of fedi handles for any administrator users
DefaultAdmins = ['admin@example.tld']
; SQLite Database location
DatabaseLocation = ./gacha_game.db

28
migrations/0000_setup.sql Normal file
View file

@ -0,0 +1,28 @@
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
has_rolled BOOLEAN NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS characters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
rarity INTEGER NOT NULL,
weight REAL NOT NULL,
file_id TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS pulls (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
character_id INTEGER,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (character_id) REFERENCES characters(id)
);
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT
);
INSERT OR IGNORE INTO config VALUES ("schema_version", 0);

View file

@ -0,0 +1 @@
INSERT OR IGNORE INTO config VALUES ("last_seen_notif_id", 0);

116
readme.md
View file

@ -2,33 +2,41 @@
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
## Installation
## Roadmap
![Fediverse Gacha Bot Logo](./web/static/logo.png)
## 🔧 Features
### ✅ Implemented
- 🎲 Character roll system
- 🎴 Cards stats system
- 🧠 Core database structure for characters and stats
- 📦 Basic support for storing pulls per user
- 🧠 Core database structure for cards
- 📦 Basic support for storing pulls per player
- ⏱️ Time-based limitations on rolls
### 🧩 In Progress
- 📝 Whitelist system to limit access
- ⏱️ Time-based limitations on rolls
- ⚔️ Dueling system
- ⚠️ Explicit account creation/deletion
## 🧠 Planned Features (Long Term)
## 🧠 Roadmap
[See our v2.0 board for more details](https://git.waifuism.life/waifu/kemoverse/projects/3)
### 🛒 Gameplay & Collection
- 🔁 **Trading system** between users
- 🔁 **Trading system** between players
- ⭐ **Favorite characters** (pin them or set profiles)
- 📢 **Public post announcements** for rare card pulls
- 📊 **Stats** for cards
- 🎮 **Games** to play
- ⚔️ Dueling
- 🧮 **Leaderboards**
- Most traded Characters
- Most owned Characters
- Most voted Characters
- Most popular Characters (via usage-based popularity metrics)
- Users with the rarest Characters
- Most traded cards
- Most owned cards
- Most voted cards
- Most popular cards (via usage-based popularity metrics)
- Users with the rarest cards
### 🎨 Card Aesthetics
- 🖼️ Simple card template for character rendering
@ -39,7 +47,7 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
## 🗃️ Tech Stack
- Python (3.11+)
- Python (3.12+)
- SQLite
- Fediverse API integration (via Misskey endpoints)
- Flask
@ -49,10 +57,88 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and duel tactics.
## 🧪 Getting Started (coming soon)
## 🧪 Installation
Instructions on installing dependencies, initializing the database, and running the bot locally will go here.
### Download and install dependencies
Clone the repo
```sh
git clone https://git.waifuism.life/waifu/kemoverse.git
cd kemoverse
```
Setup a virtual environment (Optional, recommended)
```sh
python3 -m venv venv
source venv/bin/activate
```
Install project dependencies via pip
```sh
python3 -m pip install -r requirements.txt
```
### Setup config file
A sample config file is included with the project as a template: `example_config.ini`
Create a copy of this file and replace its' values with your own. Consult the
template for more information about individual config values and their meaning.
Config files are environment-specific. Use `config_dev.ini` for development and
`config_prod.ini` for production. Switch between environments using the
`KEMOVERSE_ENV` environment variable.
```sh
cp example_config.ini config_dev.ini
# Edit config_dev.ini
```
### Setup database
To set up the database, run:
```sh
KEMOVERSE_ENV=dev python3 setup_db.py
```
### Run the bot
```sh
KEMOVERSE_ENV=dev ./startup.sh
```
If all goes well, you should now be able to interact with the bot.
### Running in production
To run the the in a production environment, use `KEMOVERSE_ENV=prod`. You will
also need to create a `config_prod.ini` file and run the database setup step
again if pointing prod to a different database. (you are pointing dev and prod
to different databases, right? 🤨)
### Updating
To update the bot, first pull new changes from upstream:
```sh
git pull
```
Then run any database migrations. We recommend testing in dev beforehand to
make sure nothing breaks in the update process.
**Always backup your prod database before running any migrations!**
```sh
# Backup database file
cp gacha_game_dev.db gacha_game_dev.db.bak
# Run migrations
KEMOVERSE_ENV=dev python3 setup_db.py
```
```mermaid
flowchart TD

129
setup_db.py Normal file
View file

@ -0,0 +1,129 @@
import sqlite3
import traceback
import os
import argparse
from configparser import ConfigParser
from typing import List, Tuple
class DBNotFoundError(Exception):
'''Could not find the database location'''
class InvalidMigrationError(Exception):
'''Migration file has an invalid name'''
class KemoverseEnvUnset(Exception):
'''KEMOVERSE_ENV is not set or has an invalid value'''
class ConfigError(Exception):
'''Could not find the config file for the current environment'''
def get_migrations() -> List[Tuple[int, str]] | InvalidMigrationError:
'''Returns a list of migration files in numeric order.'''
# Store transaction id and filename separately
sql_files: List[Tuple[int, str]] = []
migrations_dir = 'migrations'
for filename in os.listdir(migrations_dir):
joined_path = os.path.join(migrations_dir, filename)
# Ignore anything that isn't a .sql file
if not (os.path.isfile(joined_path) and filename.endswith('.sql')):
print(f'{filename} is not a .sql file, ignoring...')
continue
parts = filename.split('_', 1)
# Invalid filename format
if len(parts) < 2 or not parts[0].isdigit():
raise InvalidMigrationError(f'Invalid migration file: {filename}')
sql_files.append((int(parts[0]), joined_path))
# Get sorted list of files by migration number
sql_files.sort(key=lambda x: x[0])
return sql_files
def perform_migration(cursor: sqlite3.Cursor, migration: tuple[int, str]) -> None:
'''Performs a migration on the DB'''
print(f'Performing migration {migration[1]}...')
# Open and execute the sql script
with open(migration[1], encoding='utf-8') as file:
script = file.read()
cursor.executescript(script)
# Update the schema version
cursor.execute('UPDATE config SET value = ? WHERE key = "schema_version"', (migration[0],))
def get_db_path() -> str | DBNotFoundError:
'''Gets the DB path from config.ini'''
env = os.environ.get('KEMOVERSE_ENV')
if not (env and env in ['prod', 'dev']):
raise KemoverseEnvUnset
print(f'Running in "{env}" mode')
config_path = f'config_{env}.ini'
if not os.path.isfile(config_path):
raise ConfigError(f'Could not find {config_path}')
config = ConfigParser()
config.read(config_path)
db_path = config['application']['DatabaseLocation']
if not db_path:
raise DBNotFoundError()
return db_path
def get_current_migration(cursor: sqlite3.Cursor) -> int:
'''Gets the current schema version of the database'''
try:
cursor.execute('SELECT value FROM config WHERE key = ?', ('schema_version',))
version = cursor.fetchone()
return -1 if not version else int(version[0])
except sqlite3.Error:
print('Error getting schema version')
# Database has not been initialized yet
return -1
def main():
'''Does the thing'''
# Connect to the DB
db_path = ''
try:
db_path = get_db_path()
except ConfigError as ex:
print(ex)
return
except KemoverseEnvUnset:
print('Error: KEMOVERSE_ENV is either not set or has an invalid value.')
print('Please set KEMOVERSE_ENV to either "dev" or "prod" before running.')
print(traceback.format_exc())
return
conn = sqlite3.connect(db_path, autocommit=False)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Obtain list of migrations to run
migrations = get_migrations()
# Determine schema version
current_migration = get_current_migration(cursor)
print(f'Current schema version: {current_migration}')
# Run any migrations newer than current schema
for migration in migrations:
if migration[0] <= current_migration:
print(f'Migration already up: {migration[1]}')
continue
try:
perform_migration(cursor, migration)
conn.commit()
except Exception as ex:
print(f'An error occurred while applying {migration[1]}: {ex}, aborting...')
print(traceback.format_exc())
conn.rollback()
break
conn.close()
if __name__ == '__main__':
main()