forked from waifu/kemoverse
Compare commits
No commits in common. "fix_username_case_sensitivity" and "master" have entirely different histories.
fix_userna
...
master
15 changed files with 348 additions and 624 deletions
5
.gitignore
vendored
5
.gitignore
vendored
|
@ -181,6 +181,5 @@ cython_debug/
|
||||||
.cursorindexingignore
|
.cursorindexingignore
|
||||||
|
|
||||||
# Custom stuff
|
# Custom stuff
|
||||||
gacha_game*.db
|
gacha_game.db
|
||||||
gacha_game*.db.*
|
config.ini
|
||||||
config*.ini
|
|
||||||
|
|
|
@ -1,12 +1,11 @@
|
||||||
import requests
|
import requests
|
||||||
from misskey.exceptions import MisskeyAPIException
|
from misskey.exceptions import MisskeyAPIException
|
||||||
from client import client_connection
|
from client import client_connection
|
||||||
from db_utils import insert_character
|
from db_utils import get_db_connection
|
||||||
|
|
||||||
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
|
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
|
||||||
"""
|
"""
|
||||||
Adds a character to the database, uploading the image from a public URL to
|
Adds a character to the database, uploading the image from a public URL to the bot's Misskey Drive.
|
||||||
the bot's Misskey Drive.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
name (str): Character name.
|
name (str): Character name.
|
||||||
|
@ -21,6 +20,7 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
|
||||||
ValueError: If inputs are invalid.
|
ValueError: If inputs are invalid.
|
||||||
RuntimeError: If image download/upload or database operation fails.
|
RuntimeError: If image download/upload or database operation fails.
|
||||||
"""
|
"""
|
||||||
|
try:
|
||||||
# Validate inputs
|
# Validate inputs
|
||||||
if not name or not name.strip():
|
if not name or not name.strip():
|
||||||
raise ValueError("Character name cannot be empty.")
|
raise ValueError("Character name cannot be empty.")
|
||||||
|
@ -32,7 +32,7 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
|
||||||
raise ValueError("Image URL must be provided.")
|
raise ValueError("Image URL must be provided.")
|
||||||
|
|
||||||
# Download image
|
# Download image
|
||||||
response = requests.get(image_url, stream=True, timeout=30)
|
response = requests.get(image_url, stream=True)
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
raise RuntimeError(f"Failed to download image from {image_url}")
|
raise RuntimeError(f"Failed to download image from {image_url}")
|
||||||
|
|
||||||
|
@ -45,5 +45,19 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
|
||||||
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
|
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
|
||||||
|
|
||||||
# Insert into database
|
# Insert into database
|
||||||
character_id = insert_character(name.strip(), rarity, float(weight), file_id)
|
conn = get_db_connection()
|
||||||
|
cur = conn.cursor()
|
||||||
|
cur.execute(
|
||||||
|
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
|
||||||
|
(name.strip(), rarity, float(weight), file_id)
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
character_id = cur.lastrowid
|
||||||
|
|
||||||
return character_id, file_id
|
return character_id, file_id
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
if 'conn' in locals():
|
||||||
|
conn.close()
|
|
@ -1,18 +1,79 @@
|
||||||
import time
|
import time
|
||||||
import misskey as misskey
|
import traceback
|
||||||
|
import misskey
|
||||||
|
from parsing import parse_notification
|
||||||
|
from db_utils import get_or_create_user, add_pull, get_config, set_config
|
||||||
from client import client_connection
|
from client import client_connection
|
||||||
import db_utils as db
|
|
||||||
|
|
||||||
from config import NOTIFICATION_POLL_INTERVAL
|
# Initialize the Misskey client
|
||||||
from notification import process_notifications
|
client = client_connection()
|
||||||
|
|
||||||
if __name__ == '__main__':
|
# Define your whitelist
|
||||||
# Initialize the Misskey client
|
# TODO: move to config
|
||||||
client = client_connection()
|
whitelisted_instances: list[str] = []
|
||||||
# Connect to DB
|
|
||||||
db.connect()
|
def stream_notifications():
|
||||||
|
print("Starting filtered notification stream...")
|
||||||
|
|
||||||
|
last_seen_id = get_config("last_seen_notif_id")
|
||||||
|
|
||||||
print('Listening for notifications...')
|
|
||||||
while True:
|
while True:
|
||||||
if not process_notifications(client):
|
try:
|
||||||
time.sleep(NOTIFICATION_POLL_INTERVAL)
|
# May be able to mark notifications as read using misskey.py and
|
||||||
|
# filter them out here. This function also takes a since_id we
|
||||||
|
# could use as well
|
||||||
|
notifications = client.i_notifications()
|
||||||
|
|
||||||
|
if notifications:
|
||||||
|
# Oldest to newest
|
||||||
|
notifications.reverse()
|
||||||
|
|
||||||
|
new_last_seen_id = last_seen_id
|
||||||
|
|
||||||
|
for notification in notifications:
|
||||||
|
notif_id = notification.get("id")
|
||||||
|
|
||||||
|
# Skip old or same ID notifications
|
||||||
|
if last_seen_id is not None and notif_id <= last_seen_id:
|
||||||
|
continue
|
||||||
|
|
||||||
|
user = notification.get("user", {})
|
||||||
|
username = user.get("username", "unknown")
|
||||||
|
host = user.get("host") # None if local user
|
||||||
|
|
||||||
|
instance = host if host else "local"
|
||||||
|
|
||||||
|
if instance in whitelisted_instances or instance == "local":
|
||||||
|
note = notification.get("note", {}).get("text", "")
|
||||||
|
notif_type = notification.get("type", "unknown")
|
||||||
|
|
||||||
|
print(f"📨 [{notif_type}] from @{username}@{instance}")
|
||||||
|
print(f"💬 {note}")
|
||||||
|
print("-" * 30)
|
||||||
|
|
||||||
|
# 🧠 Send to the parser
|
||||||
|
parse_notification(notification,client)
|
||||||
|
|
||||||
|
else:
|
||||||
|
print(f"⚠️ Blocked notification from untrusted instance: {host}")
|
||||||
|
|
||||||
|
# Update only if this notif_id is greater
|
||||||
|
if new_last_seen_id is None or notif_id > new_last_seen_id:
|
||||||
|
new_last_seen_id = notif_id
|
||||||
|
|
||||||
|
# Save the latest seen ID
|
||||||
|
if new_last_seen_id and new_last_seen_id != last_seen_id:
|
||||||
|
set_config("last_seen_notif_id", new_last_seen_id)
|
||||||
|
last_seen_id = new_last_seen_id
|
||||||
|
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
stream_notifications()
|
||||||
|
|
|
@ -1,42 +1,22 @@
|
||||||
'''Essentials for the bot to function'''
|
'''Essentials for the bot to function'''
|
||||||
import configparser
|
import configparser
|
||||||
from os import environ, path
|
|
||||||
|
|
||||||
class ConfigError(Exception):
|
|
||||||
'''Could not find config file'''
|
|
||||||
|
|
||||||
def get_config_file() -> str:
|
|
||||||
'''Gets the path to the config file in the current environment'''
|
|
||||||
env: str | None = environ.get('KEMOVERSE_ENV')
|
|
||||||
if not env:
|
|
||||||
raise ConfigError('Error: KEMOVERSE_ENV is unset')
|
|
||||||
if not (env in ['prod', 'dev']):
|
|
||||||
raise ConfigError(f'Error: Invalid environment: {env}')
|
|
||||||
|
|
||||||
config_path: str = f'config_{env}.ini'
|
|
||||||
|
|
||||||
if not path.isfile(config_path):
|
|
||||||
raise ConfigError(f'Could not find {config_path}')
|
|
||||||
return config_path
|
|
||||||
|
|
||||||
config = configparser.ConfigParser()
|
config = configparser.ConfigParser()
|
||||||
config.read(get_config_file())
|
config.read('config.ini')
|
||||||
|
|
||||||
# Username for the bot
|
# Username for the bot
|
||||||
USER = config['credentials']['User'].lower()
|
USER = config['application']['BotUser']
|
||||||
# API key for the bot
|
|
||||||
KEY = config['credentials']['Token']
|
# API key for the bot
|
||||||
# Bot's Misskey instance URL
|
KEY = config['application']['ApiKey']
|
||||||
INSTANCE = config['credentials']['Instance'].lower()
|
# Bot's Misskey instance URL
|
||||||
|
INSTANCE = config['application']['InstanceUrl']
|
||||||
|
|
||||||
# TODO: move this to db
|
|
||||||
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
|
|
||||||
# to use extra admin exclusive commands with the bot
|
|
||||||
ADMINS = config['application']['DefaultAdmins']
|
|
||||||
# SQLite Database location
|
# SQLite Database location
|
||||||
DB_PATH = config['application']['DatabaseLocation']
|
DB_PATH = config['application']['DatabaseLocation']
|
||||||
|
|
||||||
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
|
# Extra stuff for control of the bot
|
||||||
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
|
|
||||||
|
|
||||||
GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval'])
|
# TODO: move this to db
|
||||||
|
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
|
||||||
|
# to use extra admin exclusive commands with the bot'''
|
||||||
|
ADMINS = config['application']['DefaultAdmins']
|
||||||
|
|
|
@ -1,79 +1,58 @@
|
||||||
from random import choices
|
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import random
|
||||||
import config
|
import config
|
||||||
|
|
||||||
DB_PATH = config.DB_PATH
|
DB_PATH = config.DB_PATH
|
||||||
CONNECTION: sqlite3.Connection
|
|
||||||
CURSOR: sqlite3.Cursor
|
|
||||||
|
|
||||||
def connect() -> None:
|
def get_db_connection():
|
||||||
'''Creates a connection to the database'''
|
'''Creates a connection to the database'''
|
||||||
print('Connecting to the database...')
|
conn = sqlite3.connect(DB_PATH)
|
||||||
global CONNECTION
|
conn.row_factory = sqlite3.Row
|
||||||
global CURSOR
|
return conn
|
||||||
CONNECTION = sqlite3.connect(DB_PATH, autocommit=True)
|
|
||||||
CONNECTION.row_factory = sqlite3.Row
|
|
||||||
CURSOR = CONNECTION.cursor()
|
|
||||||
|
|
||||||
def get_random_character():
|
|
||||||
''' Gets a random character from the database'''
|
|
||||||
CURSOR.execute('SELECT * FROM characters')
|
|
||||||
characters = CURSOR.fetchall()
|
|
||||||
|
|
||||||
if not characters:
|
|
||||||
return None, None, None, None
|
|
||||||
|
|
||||||
weights = [c['weight'] for c in characters]
|
|
||||||
chosen = choices(characters, weights=weights, k=1)[0]
|
|
||||||
|
|
||||||
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
|
|
||||||
|
|
||||||
def get_or_create_user(username):
|
def get_or_create_user(username):
|
||||||
'''Retrieves an ID for a given user, if the user does not exist, it will be
|
'''Retrieves an ID for a given user, if the user does not exist, it will be
|
||||||
created.'''
|
created.'''
|
||||||
CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,))
|
conn = get_db_connection()
|
||||||
user = CURSOR.fetchone()
|
conn.row_factory = sqlite3.Row
|
||||||
|
cur = conn.cursor()
|
||||||
|
cur.execute('SELECT id FROM users WHERE username = ?', (username,))
|
||||||
|
user = cur.fetchone()
|
||||||
if user:
|
if user:
|
||||||
|
conn.close()
|
||||||
return user[0]
|
return user[0]
|
||||||
|
|
||||||
# New user starts with has_rolled = False
|
# New user starts with has_rolled = False
|
||||||
CURSOR.execute(
|
cur.execute(
|
||||||
'INSERT INTO users (username, has_rolled) VALUES (?, ?)',
|
'INSERT INTO users (username, has_rolled) VALUES (?, ?)',
|
||||||
(username, False)
|
(username, False)
|
||||||
)
|
)
|
||||||
user_id = CURSOR.lastrowid
|
conn.commit()
|
||||||
|
user_id = cur.lastrowid
|
||||||
|
conn.close()
|
||||||
return user_id
|
return user_id
|
||||||
|
|
||||||
def insert_character(name: str, rarity: int, weight: float, file_id: str) -> int:
|
def add_pull(user_id, character_id):
|
||||||
'''Inserts a character'''
|
|
||||||
CURSOR.execute(
|
|
||||||
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
|
|
||||||
(name, rarity, weight, file_id)
|
|
||||||
)
|
|
||||||
character_id = CURSOR.lastrowid
|
|
||||||
return character_id if character_id else 0
|
|
||||||
|
|
||||||
def insert_pull(user_id, character_id):
|
|
||||||
'''Creates a pull in the database'''
|
'''Creates a pull in the database'''
|
||||||
CURSOR.execute(
|
conn = get_db_connection()
|
||||||
'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)',
|
cur = conn.cursor()
|
||||||
(user_id, character_id)
|
cur.execute('INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', (user_id, character_id))
|
||||||
)
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
def get_last_rolled_at(user_id):
|
|
||||||
'''Gets the timestamp when the user last rolled'''
|
|
||||||
CURSOR.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \
|
|
||||||
(user_id,))
|
|
||||||
row = CURSOR.fetchone()
|
|
||||||
return row[0] if row else None
|
|
||||||
|
|
||||||
|
|
||||||
def get_config(key):
|
def get_config(key):
|
||||||
'''Reads the value for a specified config key from the db'''
|
'''Reads the value for a specified config key from the db'''
|
||||||
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,))
|
conn = get_db_connection()
|
||||||
row = CURSOR.fetchone()
|
cur = conn.cursor()
|
||||||
|
cur.execute("SELECT value FROM config WHERE key = ?", (key,))
|
||||||
|
row = cur.fetchone()
|
||||||
|
conn.close()
|
||||||
return row[0] if row else None
|
return row[0] if row else None
|
||||||
|
|
||||||
def set_config(key, value):
|
def set_config(key, value):
|
||||||
'''Writes the value for a specified config key to the db'''
|
'''Writes the value for a specified config key to the db'''
|
||||||
CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
|
conn = get_db_connection()
|
||||||
|
cur = conn.cursor()
|
||||||
|
cur.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
69
bot/gacha_response.py
Normal file
69
bot/gacha_response.py
Normal file
|
@ -0,0 +1,69 @@
|
||||||
|
import random
|
||||||
|
from db_utils import get_or_create_user, add_pull, get_db_connection
|
||||||
|
from add_character import add_character
|
||||||
|
|
||||||
|
def get_character():
|
||||||
|
''' Gets a random character from the database'''
|
||||||
|
conn = get_db_connection()
|
||||||
|
cur = conn.cursor()
|
||||||
|
cur.execute('SELECT * FROM characters')
|
||||||
|
characters = cur.fetchall()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
if not characters:
|
||||||
|
return None, None, None, None
|
||||||
|
|
||||||
|
weights = [c['weight'] for c in characters]
|
||||||
|
chosen = random.choices(characters, weights=weights, k=1)[0]
|
||||||
|
|
||||||
|
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
|
||||||
|
|
||||||
|
def is_float(val):
|
||||||
|
try:
|
||||||
|
float(val)
|
||||||
|
return True
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: See issue #3, separate command parsing from game logic.
|
||||||
|
def gacha_response(command,full_user, arguments,note_obj):
|
||||||
|
'''Parses a given command with arguments, processes the game state and
|
||||||
|
returns a response'''
|
||||||
|
|
||||||
|
if command == "roll":
|
||||||
|
user_id = get_or_create_user(full_user)
|
||||||
|
character_id, character_name, file_id, rarity = get_character()
|
||||||
|
|
||||||
|
if not character_id:
|
||||||
|
#TODO: Can't have tuples of a single element
|
||||||
|
# Return these as a dict or object instead.
|
||||||
|
return(f"@{full_user} Uwaaa... something went wrong! No characters found. 😿")
|
||||||
|
|
||||||
|
add_pull(user_id,character_id)
|
||||||
|
stars = '⭐️' * rarity
|
||||||
|
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\nShe's all yours now~ 💖✨",[file_id]])
|
||||||
|
|
||||||
|
if command == "create":
|
||||||
|
# Example call from bot logic
|
||||||
|
image_url = note_obj.get("files", [{}])[0].get("url") if note_obj.get("files") else None
|
||||||
|
if not image_url:
|
||||||
|
return "You need an image to create a character, dumbass."
|
||||||
|
|
||||||
|
if len(arguments) != 3:
|
||||||
|
return "Please specify the following attributes in order: name, rarity, drop weighting"
|
||||||
|
|
||||||
|
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
|
||||||
|
return f"Invalid rarity: '{arguments[1]}' must be a number between 1 and 5"
|
||||||
|
|
||||||
|
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
|
||||||
|
return f"Invalid drop weight: '{arguments[2]}' must be a decimal value between 0.0 and 1.0"
|
||||||
|
|
||||||
|
character_id, file_id = add_character(
|
||||||
|
name=arguments[0],
|
||||||
|
rarity=int(arguments[1]),
|
||||||
|
weight=float(arguments[2]),
|
||||||
|
image_url=image_url
|
||||||
|
)
|
||||||
|
return([f"Added {arguments[0]}, ID {character_id}.",[file_id]])
|
||||||
|
return None
|
|
@ -1,115 +0,0 @@
|
||||||
import traceback
|
|
||||||
from misskey.exceptions import MisskeyAPIException
|
|
||||||
|
|
||||||
from config import NOTIFICATION_BATCH_SIZE
|
|
||||||
from parsing import parse_notification
|
|
||||||
from db_utils import get_config, set_config
|
|
||||||
from response import generate_response
|
|
||||||
|
|
||||||
# Define your whitelist
|
|
||||||
# TODO: move to config
|
|
||||||
WHITELISTED_INSTANCES: list[str] = []
|
|
||||||
|
|
||||||
def process_notification(client, notification):
|
|
||||||
'''Processes an individual notification'''
|
|
||||||
user = notification.get('user', {})
|
|
||||||
username = user.get('username', 'unknown')
|
|
||||||
host = user.get('host') # None if local user
|
|
||||||
instance = host if host else 'local'
|
|
||||||
|
|
||||||
if not (instance in WHITELISTED_INSTANCES or instance == 'local'):
|
|
||||||
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
|
|
||||||
return
|
|
||||||
|
|
||||||
# Copy visibility of the post that was received when replying (so if people
|
|
||||||
# don't want to dump a bunch of notes on home they don't have to)
|
|
||||||
visibility = notification['note']['visibility']
|
|
||||||
if visibility != 'specified':
|
|
||||||
visibility = 'home'
|
|
||||||
|
|
||||||
notif_type = notification.get('type', 'unknown')
|
|
||||||
notif_id = notification.get('id')
|
|
||||||
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
|
|
||||||
|
|
||||||
# 🧠 Send to the parser
|
|
||||||
parsed_command = parse_notification(notification, client)
|
|
||||||
|
|
||||||
# Get the note Id to reply to
|
|
||||||
note_id = notification.get('note', {}).get('id')
|
|
||||||
|
|
||||||
# Get the response
|
|
||||||
# TODO: Formalize exactly *what* is returned by this. Ideally just want to
|
|
||||||
# handle two cases here: either we have a response, or we don't.
|
|
||||||
# TODO: Return dictionaries instead of tuples. They handle multiple
|
|
||||||
# elements a lot better as they're not position dependent
|
|
||||||
response = generate_response(parsed_command)
|
|
||||||
if isinstance(response, str):
|
|
||||||
client.notes_create(
|
|
||||||
text=response,
|
|
||||||
reply_id=note_id,
|
|
||||||
visibility=visibility
|
|
||||||
)
|
|
||||||
elif response:
|
|
||||||
client.notes_create(
|
|
||||||
text=response[0],
|
|
||||||
reply_id=note_id,
|
|
||||||
visibility=visibility,
|
|
||||||
file_ids=response[1]
|
|
||||||
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
|
|
||||||
)
|
|
||||||
|
|
||||||
def process_notifications(client):
|
|
||||||
'''Processes a batch of unread notifications. Returns False if there are
|
|
||||||
no more notifications to process.'''
|
|
||||||
|
|
||||||
last_seen_id = get_config('last_seen_notif_id')
|
|
||||||
# process_notification writes to last_seen_id, so make a copy
|
|
||||||
new_last_seen_id = last_seen_id
|
|
||||||
|
|
||||||
try:
|
|
||||||
notifications = client.i_notifications(
|
|
||||||
# Fetch notifications we haven't seen yet. This option is a bit
|
|
||||||
# tempermental, sometimes it'll include since_id, sometimes it
|
|
||||||
# won't. We need to keep track of what notifications we've
|
|
||||||
# already processed.
|
|
||||||
since_id=last_seen_id,
|
|
||||||
# Let misskey handle the filtering
|
|
||||||
include_types=['mention', 'reply'],
|
|
||||||
# And handle the batch size while we're at it
|
|
||||||
limit=NOTIFICATION_BATCH_SIZE
|
|
||||||
)
|
|
||||||
|
|
||||||
# No notifications. Wait the poll period.
|
|
||||||
if not notifications:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Iterate oldest to newest
|
|
||||||
for notification in notifications:
|
|
||||||
try:
|
|
||||||
# Skip if we've processed already
|
|
||||||
notif_id = notification.get('id')
|
|
||||||
if notif_id <= last_seen_id:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Update new_last_seen_id and process
|
|
||||||
new_last_seen_id = notif_id
|
|
||||||
process_notification(client, notification)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f'An exception has occured while processing a notification: {e}')
|
|
||||||
print(traceback.format_exc())
|
|
||||||
|
|
||||||
# If we got as many notifications as we requested, there are probably
|
|
||||||
# more in the queue
|
|
||||||
return len(notifications) == NOTIFICATION_BATCH_SIZE
|
|
||||||
|
|
||||||
except MisskeyAPIException as e:
|
|
||||||
print(f'An exception has occured while reading notifications: {e}\n')
|
|
||||||
print(traceback.format_exc())
|
|
||||||
finally:
|
|
||||||
# Quality jank right here, but finally lets us update the last_seen_id
|
|
||||||
# even if we hit an exception or return early
|
|
||||||
if new_last_seen_id > last_seen_id:
|
|
||||||
set_config('last_seen_notif_id', new_last_seen_id)
|
|
||||||
|
|
||||||
return False
|
|
|
@ -1,11 +1,23 @@
|
||||||
import random, re
|
import random, re
|
||||||
import config
|
import config
|
||||||
|
from gacha_response import gacha_response
|
||||||
|
|
||||||
def parse_notification(notification,client):
|
def parse_notification(notification,client):
|
||||||
'''Parses any notifications received by the bot and sends any commands to
|
'''Oarses any notifications received by the bot and sends any commands to
|
||||||
gacha_response()'''
|
gacha_response()'''
|
||||||
|
|
||||||
|
# We get the type of notification to filter the ones that we actually want
|
||||||
|
# to parse
|
||||||
|
|
||||||
|
notif_type = notification.get("type")
|
||||||
|
if not notif_type in ('mention', 'reply'):
|
||||||
|
return # Ignore anything that isn't a mention
|
||||||
|
|
||||||
|
# We want the visibility to be related to the type that was received (so if
|
||||||
|
# people don't want to dump a bunch of notes on home they don't have to)
|
||||||
|
visibility = notification["note"]["visibility"]
|
||||||
|
if visibility != "specified":
|
||||||
|
visibility = "home"
|
||||||
|
|
||||||
# Get the full Activitypub ID of the user
|
# Get the full Activitypub ID of the user
|
||||||
user = notification.get("user", {})
|
user = notification.get("user", {})
|
||||||
|
@ -38,4 +50,22 @@ def parse_notification(notification,client):
|
||||||
command = parts[0].lower() if parts else None
|
command = parts[0].lower() if parts else None
|
||||||
arguments = parts[1:] if len(parts) > 1 else []
|
arguments = parts[1:] if len(parts) > 1 else []
|
||||||
|
|
||||||
return [command,full_user, arguments, note_obj]
|
# TODO: move response generation to a different function
|
||||||
|
response = gacha_response(command.lower(),full_user, arguments, note_obj)
|
||||||
|
if not response:
|
||||||
|
return
|
||||||
|
|
||||||
|
if isinstance(response, str):
|
||||||
|
client.notes_create(
|
||||||
|
text=response,
|
||||||
|
reply_id=note_id,
|
||||||
|
visibility=visibility
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
client.notes_create(
|
||||||
|
text=response[0],
|
||||||
|
reply_id=note_id,
|
||||||
|
visibility=visibility,
|
||||||
|
file_ids=response[1]
|
||||||
|
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
|
||||||
|
)
|
||||||
|
|
100
bot/response.py
100
bot/response.py
|
@ -1,100 +0,0 @@
|
||||||
from datetime import datetime, timedelta, timezone
|
|
||||||
from db_utils import get_or_create_user, insert_pull, get_last_rolled_at, get_random_character
|
|
||||||
from add_character import add_character
|
|
||||||
from config import GACHA_ROLL_INTERVAL
|
|
||||||
|
|
||||||
def do_roll(full_user):
|
|
||||||
'''Determines whether the user can roll, then pulls a random character'''
|
|
||||||
user_id = get_or_create_user(full_user)
|
|
||||||
|
|
||||||
# Get date of user's last roll
|
|
||||||
date = get_last_rolled_at(user_id)
|
|
||||||
|
|
||||||
# No date means it's users first roll
|
|
||||||
if date:
|
|
||||||
# SQLite timestamps returned by the DB are always in UTC
|
|
||||||
# Below timestamps are to be converted to UTC
|
|
||||||
prev = datetime.strptime(date + '+0000', '%Y-%m-%d %H:%M:%S%z')
|
|
||||||
now = datetime.now(timezone.utc)
|
|
||||||
|
|
||||||
time_since_last_roll = now - prev
|
|
||||||
roll_interval = timedelta(seconds=GACHA_ROLL_INTERVAL)
|
|
||||||
duration = roll_interval - time_since_last_roll
|
|
||||||
|
|
||||||
# User needs to wait before they can roll again
|
|
||||||
if time_since_last_roll < roll_interval:
|
|
||||||
remaining_duration = None
|
|
||||||
if duration.seconds > 3600:
|
|
||||||
remaining_duration = f'{-(duration.seconds // -3600)} hours'
|
|
||||||
elif duration.seconds > 60:
|
|
||||||
remaining_duration = f'{-(duration.seconds // -60)} minutes'
|
|
||||||
else:
|
|
||||||
remaining_duration = f'{duration.seconds} seconds'
|
|
||||||
|
|
||||||
return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.'
|
|
||||||
|
|
||||||
character_id, character_name, file_id, rarity = get_random_character()
|
|
||||||
|
|
||||||
if not character_id:
|
|
||||||
return f'{full_user} Uwaaa... something went wrong! No characters found. 😿'
|
|
||||||
|
|
||||||
insert_pull(user_id,character_id)
|
|
||||||
stars = '⭐️' * rarity
|
|
||||||
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\
|
|
||||||
She's all yours now~ 💖✨",[file_id]])
|
|
||||||
|
|
||||||
def is_float(val):
|
|
||||||
'''Returns true if `val` can be converted to a float'''
|
|
||||||
try:
|
|
||||||
float(val)
|
|
||||||
return True
|
|
||||||
except ValueError:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def do_create(full_user, arguments, note_obj):
|
|
||||||
'''Creates a character'''
|
|
||||||
# Example call from bot logic
|
|
||||||
image_url = note_obj.get('files', [{}])[0].get('url') if note_obj.get('files') else None
|
|
||||||
if not image_url:
|
|
||||||
return f'{full_user}{full_user} You need an image to create a character, dumbass.'
|
|
||||||
|
|
||||||
if len(arguments) != 3:
|
|
||||||
return '{full_user}Please specify the following attributes in order: \
|
|
||||||
name, rarity, drop weighting'
|
|
||||||
|
|
||||||
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
|
|
||||||
return f'{full_user}Invalid rarity: \'{arguments[1]}\' must be a number between 1 and 5'
|
|
||||||
|
|
||||||
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
|
|
||||||
return f'{full_user}Invalid drop weight: \'{arguments[2]}\' \
|
|
||||||
must be a decimal value between 0.0 and 1.0'
|
|
||||||
|
|
||||||
character_id, file_id = add_character(
|
|
||||||
name=arguments[0],
|
|
||||||
rarity=int(arguments[1]),
|
|
||||||
weight=float(arguments[2]),
|
|
||||||
image_url=image_url
|
|
||||||
)
|
|
||||||
return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]])
|
|
||||||
|
|
||||||
def do_help(full_user):
|
|
||||||
'''Provides a list of commands that the bot can do.'''
|
|
||||||
return f'{full_user} Here\'s what I can do:\n \
|
|
||||||
- `roll` Pulls a random character.\
|
|
||||||
- `create <name> <rarity> <weight>` Creates a character using a given image.\
|
|
||||||
- `help` Shows this message'
|
|
||||||
|
|
||||||
def generate_response(parsed_command):
|
|
||||||
'''Given a command with arguments, processes the game state and
|
|
||||||
returns a response'''
|
|
||||||
|
|
||||||
command, full_user, arguments, note_obj = parsed_command
|
|
||||||
match command:
|
|
||||||
case 'roll':
|
|
||||||
return do_roll(full_user)
|
|
||||||
case 'create':
|
|
||||||
return do_create(full_user, arguments, note_obj)
|
|
||||||
case 'help':
|
|
||||||
return do_help(command)
|
|
||||||
case _:
|
|
||||||
return None
|
|
61
db.py
Normal file
61
db.py
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
import sqlite3
|
||||||
|
|
||||||
|
# Connect to SQLite database (or create it if it doesn't exist)
|
||||||
|
conn = sqlite3.connect('gacha_game.db')
|
||||||
|
cursor = conn.cursor()
|
||||||
|
|
||||||
|
# Create tables
|
||||||
|
cursor.execute('''
|
||||||
|
CREATE TABLE IF NOT EXISTS users (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
username TEXT UNIQUE NOT NULL,
|
||||||
|
has_rolled BOOLEAN NOT NULL DEFAULT 0
|
||||||
|
)
|
||||||
|
''')
|
||||||
|
|
||||||
|
cursor.execute('''
|
||||||
|
CREATE TABLE IF NOT EXISTS characters (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
name TEXT NOT NULL,
|
||||||
|
rarity INTEGER NOT NULL,
|
||||||
|
weight REAL NOT NULL,
|
||||||
|
file_id TEXT NOT NULL
|
||||||
|
)
|
||||||
|
''')
|
||||||
|
|
||||||
|
cursor.execute('''
|
||||||
|
CREATE TABLE IF NOT EXISTS pulls (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
user_id INTEGER,
|
||||||
|
character_id INTEGER,
|
||||||
|
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
FOREIGN KEY (user_id) REFERENCES users(id),
|
||||||
|
FOREIGN KEY (character_id) REFERENCES characters(id)
|
||||||
|
)
|
||||||
|
''')
|
||||||
|
|
||||||
|
cursor.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS config (
|
||||||
|
key TEXT PRIMARY KEY,
|
||||||
|
value TEXT
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
|
||||||
|
""" # Insert example characters into the database if they don't already exist
|
||||||
|
characters = [
|
||||||
|
('Murakami-san', 1, 0.35),
|
||||||
|
('Mastodon-kun', 2, 0.25),
|
||||||
|
('Pleroma-tan', 3, 0.2),
|
||||||
|
('Misskey-tan', 4, 0.15),
|
||||||
|
('Syuilo-mama', 5, 0.05)
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
cursor.executemany('''
|
||||||
|
INSERT OR IGNORE INTO characters (name, rarity, weight) VALUES (?, ?, ?)
|
||||||
|
''', characters)
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Commit changes and close
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
|
@ -1,27 +1,17 @@
|
||||||
; Rename me to config.ini and put your values in here
|
; Rename me to config.ini and put your values in here
|
||||||
[application]
|
[application]
|
||||||
; Comma separated list of fedi handles for any administrator users
|
|
||||||
; More can be added through the application
|
|
||||||
DefaultAdmins = ['admin@example.tld']
|
|
||||||
; SQLite Database location
|
|
||||||
DatabaseLocation = ./gacha_game.db
|
|
||||||
|
|
||||||
[gacha]
|
|
||||||
; Number of seconds players have to wait between rolls
|
|
||||||
RollInterval = 72000
|
|
||||||
|
|
||||||
[notification]
|
|
||||||
; Number of seconds to sleep while awaiting new notifications
|
|
||||||
PollInterval = 5
|
|
||||||
; Number of notifications to process at once (max 100)
|
|
||||||
BatchSize = 10
|
|
||||||
|
|
||||||
[credentials]
|
|
||||||
; Fully qualified URL of the instance hosting the bot
|
|
||||||
Instance = http://example.tld
|
|
||||||
; Full fedi handle of the bot user
|
; Full fedi handle of the bot user
|
||||||
User = @bot@example.tld
|
BotUser = @bot@example.tld
|
||||||
|
|
||||||
; API key for the bot
|
; API key for the bot
|
||||||
; Generate one by going to Settings > API > Generate access token
|
; Generate one by going to Settings > API > Generate access token
|
||||||
Token = abcdefghijklmnopqrstuvwxyz012345
|
ApiKey = abcdefghijklmnopqrstuvwxyz012345
|
||||||
|
|
||||||
|
; Fully qualified URL of the instance hosting the bot
|
||||||
|
InstanceUrl = http://example.tld
|
||||||
|
|
||||||
|
; Comma separated list of fedi handles for any administrator users
|
||||||
|
DefaultAdmins = ['admin@example.tld']
|
||||||
|
|
||||||
|
; SQLite Database location
|
||||||
|
DatabaseLocation = ./gacha_game.db
|
|
@ -1,28 +0,0 @@
|
||||||
CREATE TABLE IF NOT EXISTS users (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
username TEXT UNIQUE NOT NULL,
|
|
||||||
has_rolled BOOLEAN NOT NULL DEFAULT 0
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS characters (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
name TEXT NOT NULL,
|
|
||||||
rarity INTEGER NOT NULL,
|
|
||||||
weight REAL NOT NULL,
|
|
||||||
file_id TEXT NOT NULL
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS pulls (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
user_id INTEGER,
|
|
||||||
character_id INTEGER,
|
|
||||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
FOREIGN KEY (user_id) REFERENCES users(id),
|
|
||||||
FOREIGN KEY (character_id) REFERENCES characters(id)
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS config (
|
|
||||||
key TEXT PRIMARY KEY,
|
|
||||||
value TEXT
|
|
||||||
);
|
|
||||||
INSERT OR IGNORE INTO config VALUES ("schema_version", 0);
|
|
|
@ -1 +0,0 @@
|
||||||
INSERT OR IGNORE INTO config VALUES ("last_seen_notif_id", 0);
|
|
116
readme.md
116
readme.md
|
@ -2,41 +2,33 @@
|
||||||
|
|
||||||
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
|
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
|
||||||
|
|
||||||
## Installation
|
|
||||||
|
|
||||||
## Roadmap
|
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
## 🔧 Features
|
## 🔧 Features
|
||||||
|
|
||||||
### ✅ Implemented
|
### ✅ Implemented
|
||||||
- 🎲 Character roll system
|
- 🎲 Character roll system
|
||||||
- 🧠 Core database structure for cards
|
- 🎴 Cards stats system
|
||||||
- 📦 Basic support for storing pulls per player
|
- 🧠 Core database structure for characters and stats
|
||||||
- ⏱️ Time-based limitations on rolls
|
- 📦 Basic support for storing pulls per user
|
||||||
|
|
||||||
### 🧩 In Progress
|
### 🧩 In Progress
|
||||||
- 📝 Whitelist system to limit access
|
- 📝 Whitelist system to limit access
|
||||||
- ⚠️ Explicit account creation/deletion
|
- ⏱️ Time-based limitations on rolls
|
||||||
|
- ⚔️ Dueling system
|
||||||
|
|
||||||
## 🧠 Roadmap
|
## 🧠 Planned Features (Long Term)
|
||||||
|
|
||||||
[See our v2.0 board for more details](https://git.waifuism.life/waifu/kemoverse/projects/3)
|
|
||||||
|
|
||||||
### 🛒 Gameplay & Collection
|
### 🛒 Gameplay & Collection
|
||||||
- 🔁 **Trading system** between players
|
- 🔁 **Trading system** between users
|
||||||
- ⭐ **Favorite characters** (pin them or set profiles)
|
- ⭐ **Favorite characters** (pin them or set profiles)
|
||||||
- 📢 **Public post announcements** for rare card pulls
|
- 📢 **Public post announcements** for rare card pulls
|
||||||
- 📊 **Stats** for cards
|
|
||||||
- 🎮 **Games** to play
|
|
||||||
- ⚔️ Dueling
|
|
||||||
- 🧮 **Leaderboards**
|
- 🧮 **Leaderboards**
|
||||||
- Most traded cards
|
- Most traded Characters
|
||||||
- Most owned cards
|
- Most owned Characters
|
||||||
- Most voted cards
|
- Most voted Characters
|
||||||
- Most popular cards (via usage-based popularity metrics)
|
- Most popular Characters (via usage-based popularity metrics)
|
||||||
- Users with the rarest cards
|
- Users with the rarest Characters
|
||||||
|
|
||||||
### 🎨 Card Aesthetics
|
### 🎨 Card Aesthetics
|
||||||
- 🖼️ Simple card template for character rendering
|
- 🖼️ Simple card template for character rendering
|
||||||
|
@ -47,7 +39,7 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
|
||||||
|
|
||||||
## 🗃️ Tech Stack
|
## 🗃️ Tech Stack
|
||||||
|
|
||||||
- Python (3.12+)
|
- Python (3.11+)
|
||||||
- SQLite
|
- SQLite
|
||||||
- Fediverse API integration (via Misskey endpoints)
|
- Fediverse API integration (via Misskey endpoints)
|
||||||
- Flask
|
- Flask
|
||||||
|
@ -57,88 +49,10 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
|
||||||
|
|
||||||
The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and duel tactics.
|
The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and duel tactics.
|
||||||
|
|
||||||
## 🧪 Installation
|
## 🧪 Getting Started (coming soon)
|
||||||
|
|
||||||
### Download and install dependencies
|
Instructions on installing dependencies, initializing the database, and running the bot locally will go here.
|
||||||
|
|
||||||
Clone the repo
|
|
||||||
|
|
||||||
```sh
|
|
||||||
git clone https://git.waifuism.life/waifu/kemoverse.git
|
|
||||||
cd kemoverse
|
|
||||||
```
|
|
||||||
|
|
||||||
Setup a virtual environment (Optional, recommended)
|
|
||||||
|
|
||||||
```sh
|
|
||||||
python3 -m venv venv
|
|
||||||
source venv/bin/activate
|
|
||||||
```
|
|
||||||
|
|
||||||
Install project dependencies via pip
|
|
||||||
|
|
||||||
```sh
|
|
||||||
python3 -m pip install -r requirements.txt
|
|
||||||
```
|
|
||||||
|
|
||||||
### Setup config file
|
|
||||||
|
|
||||||
A sample config file is included with the project as a template: `example_config.ini`
|
|
||||||
|
|
||||||
Create a copy of this file and replace its' values with your own. Consult the
|
|
||||||
template for more information about individual config values and their meaning.
|
|
||||||
|
|
||||||
Config files are environment-specific. Use `config_dev.ini` for development and
|
|
||||||
`config_prod.ini` for production. Switch between environments using the
|
|
||||||
`KEMOVERSE_ENV` environment variable.
|
|
||||||
|
|
||||||
```sh
|
|
||||||
cp example_config.ini config_dev.ini
|
|
||||||
# Edit config_dev.ini
|
|
||||||
```
|
|
||||||
|
|
||||||
### Setup database
|
|
||||||
|
|
||||||
To set up the database, run:
|
|
||||||
|
|
||||||
```sh
|
|
||||||
KEMOVERSE_ENV=dev python3 setup_db.py
|
|
||||||
```
|
|
||||||
|
|
||||||
### Run the bot
|
|
||||||
|
|
||||||
```sh
|
|
||||||
KEMOVERSE_ENV=dev ./startup.sh
|
|
||||||
```
|
|
||||||
|
|
||||||
If all goes well, you should now be able to interact with the bot.
|
|
||||||
|
|
||||||
### Running in production
|
|
||||||
|
|
||||||
To run the the in a production environment, use `KEMOVERSE_ENV=prod`. You will
|
|
||||||
also need to create a `config_prod.ini` file and run the database setup step
|
|
||||||
again if pointing prod to a different database. (you are pointing dev and prod
|
|
||||||
to different databases, right? 🤨)
|
|
||||||
|
|
||||||
### Updating
|
|
||||||
|
|
||||||
To update the bot, first pull new changes from upstream:
|
|
||||||
|
|
||||||
```sh
|
|
||||||
git pull
|
|
||||||
```
|
|
||||||
|
|
||||||
Then run any database migrations. We recommend testing in dev beforehand to
|
|
||||||
make sure nothing breaks in the update process.
|
|
||||||
|
|
||||||
**Always backup your prod database before running any migrations!**
|
|
||||||
|
|
||||||
```sh
|
|
||||||
# Backup database file
|
|
||||||
cp gacha_game_dev.db gacha_game_dev.db.bak
|
|
||||||
# Run migrations
|
|
||||||
KEMOVERSE_ENV=dev python3 setup_db.py
|
|
||||||
```
|
|
||||||
|
|
||||||
```mermaid
|
```mermaid
|
||||||
flowchart TD
|
flowchart TD
|
||||||
|
|
129
setup_db.py
129
setup_db.py
|
@ -1,129 +0,0 @@
|
||||||
import sqlite3
|
|
||||||
import traceback
|
|
||||||
import os
|
|
||||||
import argparse
|
|
||||||
from configparser import ConfigParser
|
|
||||||
from typing import List, Tuple
|
|
||||||
|
|
||||||
class DBNotFoundError(Exception):
|
|
||||||
'''Could not find the database location'''
|
|
||||||
|
|
||||||
class InvalidMigrationError(Exception):
|
|
||||||
'''Migration file has an invalid name'''
|
|
||||||
|
|
||||||
class KemoverseEnvUnset(Exception):
|
|
||||||
'''KEMOVERSE_ENV is not set or has an invalid value'''
|
|
||||||
|
|
||||||
class ConfigError(Exception):
|
|
||||||
'''Could not find the config file for the current environment'''
|
|
||||||
|
|
||||||
def get_migrations() -> List[Tuple[int, str]] | InvalidMigrationError:
|
|
||||||
'''Returns a list of migration files in numeric order.'''
|
|
||||||
# Store transaction id and filename separately
|
|
||||||
sql_files: List[Tuple[int, str]] = []
|
|
||||||
migrations_dir = 'migrations'
|
|
||||||
|
|
||||||
for filename in os.listdir(migrations_dir):
|
|
||||||
joined_path = os.path.join(migrations_dir, filename)
|
|
||||||
|
|
||||||
# Ignore anything that isn't a .sql file
|
|
||||||
if not (os.path.isfile(joined_path) and filename.endswith('.sql')):
|
|
||||||
print(f'{filename} is not a .sql file, ignoring...')
|
|
||||||
continue
|
|
||||||
|
|
||||||
parts = filename.split('_', 1)
|
|
||||||
|
|
||||||
# Invalid filename format
|
|
||||||
if len(parts) < 2 or not parts[0].isdigit():
|
|
||||||
raise InvalidMigrationError(f'Invalid migration file: {filename}')
|
|
||||||
|
|
||||||
sql_files.append((int(parts[0]), joined_path))
|
|
||||||
|
|
||||||
# Get sorted list of files by migration number
|
|
||||||
sql_files.sort(key=lambda x: x[0])
|
|
||||||
return sql_files
|
|
||||||
|
|
||||||
def perform_migration(cursor: sqlite3.Cursor, migration: tuple[int, str]) -> None:
|
|
||||||
'''Performs a migration on the DB'''
|
|
||||||
print(f'Performing migration {migration[1]}...')
|
|
||||||
|
|
||||||
# Open and execute the sql script
|
|
||||||
with open(migration[1], encoding='utf-8') as file:
|
|
||||||
script = file.read()
|
|
||||||
cursor.executescript(script)
|
|
||||||
# Update the schema version
|
|
||||||
cursor.execute('UPDATE config SET value = ? WHERE key = "schema_version"', (migration[0],))
|
|
||||||
|
|
||||||
def get_db_path() -> str | DBNotFoundError:
|
|
||||||
'''Gets the DB path from config.ini'''
|
|
||||||
env = os.environ.get('KEMOVERSE_ENV')
|
|
||||||
if not (env and env in ['prod', 'dev']):
|
|
||||||
raise KemoverseEnvUnset
|
|
||||||
|
|
||||||
print(f'Running in "{env}" mode')
|
|
||||||
|
|
||||||
config_path = f'config_{env}.ini'
|
|
||||||
|
|
||||||
if not os.path.isfile(config_path):
|
|
||||||
raise ConfigError(f'Could not find {config_path}')
|
|
||||||
|
|
||||||
config = ConfigParser()
|
|
||||||
config.read(config_path)
|
|
||||||
db_path = config['application']['DatabaseLocation']
|
|
||||||
if not db_path:
|
|
||||||
raise DBNotFoundError()
|
|
||||||
return db_path
|
|
||||||
|
|
||||||
def get_current_migration(cursor: sqlite3.Cursor) -> int:
|
|
||||||
'''Gets the current schema version of the database'''
|
|
||||||
try:
|
|
||||||
cursor.execute('SELECT value FROM config WHERE key = ?', ('schema_version',))
|
|
||||||
version = cursor.fetchone()
|
|
||||||
return -1 if not version else int(version[0])
|
|
||||||
except sqlite3.Error:
|
|
||||||
print('Error getting schema version')
|
|
||||||
# Database has not been initialized yet
|
|
||||||
return -1
|
|
||||||
|
|
||||||
def main():
|
|
||||||
'''Does the thing'''
|
|
||||||
# Connect to the DB
|
|
||||||
db_path = ''
|
|
||||||
try:
|
|
||||||
db_path = get_db_path()
|
|
||||||
except ConfigError as ex:
|
|
||||||
print(ex)
|
|
||||||
return
|
|
||||||
except KemoverseEnvUnset:
|
|
||||||
print('Error: KEMOVERSE_ENV is either not set or has an invalid value.')
|
|
||||||
print('Please set KEMOVERSE_ENV to either "dev" or "prod" before running.')
|
|
||||||
print(traceback.format_exc())
|
|
||||||
return
|
|
||||||
|
|
||||||
conn = sqlite3.connect(db_path, autocommit=False)
|
|
||||||
conn.row_factory = sqlite3.Row
|
|
||||||
cursor = conn.cursor()
|
|
||||||
|
|
||||||
# Obtain list of migrations to run
|
|
||||||
migrations = get_migrations()
|
|
||||||
# Determine schema version
|
|
||||||
current_migration = get_current_migration(cursor)
|
|
||||||
print(f'Current schema version: {current_migration}')
|
|
||||||
|
|
||||||
# Run any migrations newer than current schema
|
|
||||||
for migration in migrations:
|
|
||||||
if migration[0] <= current_migration:
|
|
||||||
print(f'Migration already up: {migration[1]}')
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
perform_migration(cursor, migration)
|
|
||||||
conn.commit()
|
|
||||||
except Exception as ex:
|
|
||||||
print(f'An error occurred while applying {migration[1]}: {ex}, aborting...')
|
|
||||||
print(traceback.format_exc())
|
|
||||||
conn.rollback()
|
|
||||||
break
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
Loading…
Add table
Reference in a new issue