Compare commits
14 commits
master
...
misc-bugfi
Author | SHA1 | Date | |
---|---|---|---|
|
9fdcf21663 | ||
|
918ed75e9b | ||
|
937ed6b2b1 | ||
|
9c5eab51af | ||
|
99442b4e01 | ||
|
34df5fc6ee | ||
|
287246c6fb | ||
|
717ee05b38 | ||
|
bd5e56278f | ||
|
f101119b47 | ||
|
fc3d638bf4 | ||
|
f4499f7afb | ||
|
2468ffa1f3 | ||
|
bf5bc2212c |
40 changed files with 1250 additions and 1378 deletions
9
.gitignore
vendored
9
.gitignore
vendored
|
@ -147,7 +147,6 @@ venv.bak/
|
|||
/site
|
||||
|
||||
# mypy
|
||||
.mypy.ini
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
@ -182,8 +181,6 @@ cython_debug/
|
|||
.cursorindexingignore
|
||||
|
||||
# Custom stuff
|
||||
gacha_game*.db
|
||||
gacha_game*.db.*
|
||||
config*.ini
|
||||
|
||||
.idea
|
||||
*.db
|
||||
*.ini
|
||||
reference/
|
||||
|
|
1
.tool-versions
Normal file
1
.tool-versions
Normal file
|
@ -0,0 +1 @@
|
|||
nodejs 23.4.0
|
|
@ -1,64 +0,0 @@
|
|||
import requests
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
from client import client_connection
|
||||
from db_utils import insert_card
|
||||
from custom_types import Card
|
||||
from config import RARITY_TO_WEIGHT
|
||||
|
||||
|
||||
def add_card(
|
||||
name: str,
|
||||
rarity: int,
|
||||
image_url: str) -> tuple[int, str]:
|
||||
'''
|
||||
Adds a card to the database, uploading the image from a public URL to
|
||||
the bot's Misskey Drive.
|
||||
|
||||
Args:
|
||||
name (str): Card name.
|
||||
rarity (int): Card rarity (e.g., 1-5).
|
||||
image_url (str): Public URL of the image from the post (e.g., from
|
||||
note['files'][i]['url']).
|
||||
|
||||
Returns:
|
||||
tuple[int, str]: Card ID and bot's Drive file_id.
|
||||
|
||||
Raises:
|
||||
ValueError: If inputs are invalid.
|
||||
RuntimeError: If image download/upload or database operation fails.
|
||||
'''
|
||||
|
||||
stripped_name = name.strip()
|
||||
|
||||
# Validate inputs
|
||||
if not stripped_name:
|
||||
raise ValueError('Card name cannot be empty.')
|
||||
if rarity < 1:
|
||||
raise ValueError('Rarity must be a positive integer.')
|
||||
if rarity not in RARITY_TO_WEIGHT.keys():
|
||||
raise ValueError(f'Invalid rarity: {rarity}')
|
||||
if not image_url:
|
||||
raise ValueError('Image URL must be provided.')
|
||||
|
||||
# Download image
|
||||
response = requests.get(image_url, stream=True, timeout=30)
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError(f'Failed to download image from {image_url}')
|
||||
|
||||
# Upload to bot's Drive
|
||||
mk = client_connection()
|
||||
try:
|
||||
media = mk.drive_files_create(response.raw)
|
||||
file_id = media['id']
|
||||
except MisskeyAPIException as e:
|
||||
raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
|
||||
from e
|
||||
|
||||
# Insert into database
|
||||
card_id = insert_card(
|
||||
stripped_name,
|
||||
rarity,
|
||||
RARITY_TO_WEIGHT[rarity],
|
||||
file_id
|
||||
)
|
||||
return card_id, file_id
|
62
bot/add_character.py
Normal file
62
bot/add_character.py
Normal file
|
@ -0,0 +1,62 @@
|
|||
import requests
|
||||
from fediverse_factory import get_fediverse_service
|
||||
from db_utils import get_db_connection
|
||||
|
||||
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
|
||||
"""
|
||||
Adds a character to the database, uploading the image from a public URL to the Fediverse instance.
|
||||
|
||||
Args:
|
||||
name (str): Character name.
|
||||
rarity (int): Character rarity (e.g., 1-5).
|
||||
weight (float): Pull weight (e.g., 0.02).
|
||||
image_url (str): Public URL of the image from the post.
|
||||
|
||||
Returns:
|
||||
tuple[int, str]: Character ID and file_id.
|
||||
|
||||
Raises:
|
||||
ValueError: If inputs are invalid.
|
||||
RuntimeError: If image download/upload or database operation fails.
|
||||
"""
|
||||
try:
|
||||
# Validate inputs
|
||||
if not name or not name.strip():
|
||||
raise ValueError("Character name cannot be empty.")
|
||||
if not isinstance(rarity, int) or rarity < 1:
|
||||
raise ValueError("Rarity must be a positive integer.")
|
||||
if not isinstance(weight, (int, float)) or weight <= 0:
|
||||
raise ValueError("Weight must be a positive number.")
|
||||
if not image_url:
|
||||
raise ValueError("Image URL must be provided.")
|
||||
|
||||
# Download image
|
||||
response = requests.get(image_url, stream=True)
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError(f"Failed to download image from {image_url}")
|
||||
|
||||
# Upload to Fediverse instance
|
||||
fediverse_service = get_fediverse_service()
|
||||
try:
|
||||
uploaded_file = fediverse_service.upload_file(response.raw)
|
||||
file_id = uploaded_file.id
|
||||
except RuntimeError as e:
|
||||
raise RuntimeError(f"Failed to upload image: {e}") from e
|
||||
|
||||
# Insert into database
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
|
||||
(name.strip(), rarity, float(weight), file_id)
|
||||
)
|
||||
conn.commit()
|
||||
character_id = cur.lastrowid
|
||||
|
||||
return character_id, file_id
|
||||
|
||||
except Exception as e:
|
||||
raise
|
||||
finally:
|
||||
if 'conn' in locals():
|
||||
conn.close()
|
|
@ -1,21 +1,76 @@
|
|||
import time
|
||||
import misskey as misskey
|
||||
from client import client_connection
|
||||
import db_utils as db
|
||||
import traceback
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_or_create_user, add_pull, get_config, set_config
|
||||
from fediverse_factory import get_fediverse_service
|
||||
import config
|
||||
|
||||
from config import NOTIFICATION_POLL_INTERVAL
|
||||
from notification import process_notifications
|
||||
# Initialize the Fediverse service
|
||||
fediverse_service = get_fediverse_service()
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Initialize the Misskey client
|
||||
client = client_connection()
|
||||
# Connect to DB
|
||||
db.connect()
|
||||
# Get trusted instances from config
|
||||
whitelisted_instances = config.TRUSTED_INSTANCES
|
||||
|
||||
# Setup default administrators
|
||||
db.setup_administrators()
|
||||
def stream_notifications():
|
||||
print("Starting filtered notification stream...")
|
||||
|
||||
last_seen_id = get_config("last_seen_notif_id")
|
||||
|
||||
print('Listening for notifications...')
|
||||
while True:
|
||||
if not process_notifications(client):
|
||||
time.sleep(NOTIFICATION_POLL_INTERVAL)
|
||||
try:
|
||||
# Get notifications from the fediverse service
|
||||
notifications = fediverse_service.get_notifications(since_id=last_seen_id)
|
||||
|
||||
if notifications:
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
for notification in notifications:
|
||||
notif_id = notification.id
|
||||
|
||||
# Double-check filtering: Even though we pass since_id to the API,
|
||||
# we manually filter again for reliability because:
|
||||
# 1. API might ignore since_id if it's too old (server downtime)
|
||||
# 2. Pagination limits might cause missed notifications
|
||||
# 3. Race conditions between fetch and save operations
|
||||
if last_seen_id is not None and notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
username = notification.user.username
|
||||
host = notification.user.host
|
||||
|
||||
instance = host if host else "local"
|
||||
|
||||
if instance in whitelisted_instances or instance == "local":
|
||||
note = notification.post.text if notification.post else ""
|
||||
notif_type = notification.type.value
|
||||
|
||||
print(f"📨 [{notif_type}] from @{username}@{instance}")
|
||||
print(f"💬 {note}")
|
||||
print("-" * 30)
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parse_notification(notification, fediverse_service)
|
||||
|
||||
else:
|
||||
print(f"⚠️ Blocked notification from untrusted instance: {host}")
|
||||
|
||||
# Update only if this notif_id is greater
|
||||
if new_last_seen_id is None or notif_id > new_last_seen_id:
|
||||
new_last_seen_id = notif_id
|
||||
|
||||
# Save the latest seen ID
|
||||
if new_last_seen_id and new_last_seen_id != last_seen_id:
|
||||
set_config("last_seen_notif_id", new_last_seen_id)
|
||||
last_seen_id = new_last_seen_id
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
except Exception as e:
|
||||
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
stream_notifications()
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import misskey
|
||||
import config
|
||||
|
||||
|
||||
def client_connection() -> misskey.Misskey:
|
||||
def client_connection():
|
||||
return misskey.Misskey(address=config.INSTANCE, i=config.KEY)
|
||||
|
|
124
bot/config.py
124
bot/config.py
|
@ -1,60 +1,90 @@
|
|||
'''Essentials for the bot to function'''
|
||||
import configparser
|
||||
import json
|
||||
from os import environ, path
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
def get_config_file_path():
|
||||
"""Get config file path from command line arguments or default"""
|
||||
parser = argparse.ArgumentParser(add_help=False) # Don't interfere with other argument parsing
|
||||
parser.add_argument('--config', default='config.ini', help='Path to config file (default: config.ini)')
|
||||
|
||||
# Parse only known args to avoid conflicts with other argument parsing
|
||||
args, _ = parser.parse_known_args()
|
||||
return args.config
|
||||
|
||||
class ConfigError(Exception):
|
||||
'''Could not find config file'''
|
||||
def load_config():
|
||||
"""Load and validate config file with proper error handling"""
|
||||
config_file = get_config_file_path()
|
||||
config = configparser.ConfigParser()
|
||||
|
||||
# Check if config file exists
|
||||
if not os.path.exists(config_file):
|
||||
print(f"Error: {config_file} not found!")
|
||||
if config_file == 'config.ini':
|
||||
print("Please copy example_config.ini to config.ini and configure it.")
|
||||
print(f"Or use --config /path/to/your/config.ini to specify a different location.")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
config.read(config_file)
|
||||
except configparser.Error as e:
|
||||
print(f"Error reading {config_file}: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Check if [application] section exists
|
||||
if 'application' not in config:
|
||||
print(f"Error: [application] section missing in {config_file}")
|
||||
sys.exit(1)
|
||||
|
||||
# Validate required fields
|
||||
required_fields = ['BotUser', 'ApiKey', 'InstanceUrl', 'InstanceType', 'DatabaseLocation']
|
||||
missing_fields = []
|
||||
|
||||
for field in required_fields:
|
||||
if field not in config['application'] or not config['application'][field].strip():
|
||||
missing_fields.append(field)
|
||||
|
||||
if missing_fields:
|
||||
print(f"Error: Required fields missing in {config_file}: {', '.join(missing_fields)}")
|
||||
sys.exit(1)
|
||||
|
||||
# Validate InstanceType
|
||||
instance_type = config['application']['InstanceType'].lower()
|
||||
if instance_type not in ('misskey', 'pleroma'):
|
||||
print("Error: InstanceType must be either 'misskey' or 'pleroma'")
|
||||
sys.exit(1)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def get_config_file() -> str:
|
||||
'''Gets the path to the config file in the current environment'''
|
||||
env: str | None = environ.get('KEMOVERSE_ENV')
|
||||
if not env:
|
||||
raise ConfigError('Error: KEMOVERSE_ENV is unset')
|
||||
if not (env in ['prod', 'dev']):
|
||||
raise ConfigError(f'Error: Invalid environment: {env}')
|
||||
|
||||
config_path: str = f'config_{env}.ini'
|
||||
|
||||
if not path.isfile(config_path):
|
||||
raise ConfigError(f'Could not find {config_path}')
|
||||
return config_path
|
||||
|
||||
|
||||
def get_rarity_to_weight(
|
||||
config_section: configparser.SectionProxy) -> dict[int, float]:
|
||||
"""Parses Rarity_X keys from config and returns a {rarity: weight} dict."""
|
||||
rarity_weights = {}
|
||||
for key, value in config_section.items():
|
||||
if key.startswith("rarity_"):
|
||||
rarity = int(key.removeprefix("rarity_"))
|
||||
rarity_weights[rarity] = float(value)
|
||||
return rarity_weights
|
||||
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(get_config_file())
|
||||
# Load and validate configuration
|
||||
config = load_config()
|
||||
|
||||
# Username for the bot
|
||||
USER = config['credentials']['User'].lower()
|
||||
# API key for the bot
|
||||
KEY = config['credentials']['Token']
|
||||
# Bot's Misskey instance URL
|
||||
INSTANCE = config['credentials']['Instance'].lower()
|
||||
USER = config['application']['BotUser']
|
||||
|
||||
# API key for the bot
|
||||
KEY = config['application']['ApiKey']
|
||||
|
||||
# Bot's instance URL
|
||||
INSTANCE = config['application']['InstanceUrl']
|
||||
|
||||
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
|
||||
# to use extra admin exclusive commands with the bot
|
||||
ADMINS = json.loads(config['application']['DefaultAdmins'])
|
||||
# SQLite Database location
|
||||
DB_PATH = config['application']['DatabaseLocation']
|
||||
# Whether to enable the instance whitelist
|
||||
USE_WHITELIST = config['application']['UseWhitelist']
|
||||
|
||||
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
|
||||
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
|
||||
# Instance type
|
||||
INSTANCE_TYPE = config['application']['InstanceType'].lower()
|
||||
|
||||
GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval'])
|
||||
# Web server port
|
||||
WEB_PORT = config['application'].getint('WebPort', 5000)
|
||||
|
||||
RARITY_TO_WEIGHT = get_rarity_to_weight(config['gacha'])
|
||||
# Trusted instances
|
||||
trusted_instances_str = config['application'].get('TrustedInstances', '')
|
||||
TRUSTED_INSTANCES = [instance.strip() for instance in trusted_instances_str.split(',') if instance.strip()]
|
||||
|
||||
# Extra stuff for control of the bot
|
||||
|
||||
# TODO: move this to db
|
||||
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
|
||||
# to use extra admin exclusive commands with the bot'''
|
||||
ADMINS = config['application'].get('DefaultAdmins', '[]')
|
||||
|
|
|
@ -1,21 +0,0 @@
|
|||
from typing import TypedDict, List, Dict, Any
|
||||
|
||||
BotResponse = TypedDict('BotResponse', {
|
||||
'message': str,
|
||||
'attachment_urls': List[str] | None
|
||||
})
|
||||
|
||||
Card = TypedDict('Card', {
|
||||
'id': int,
|
||||
'name': str,
|
||||
'rarity': int,
|
||||
'weight': float,
|
||||
'image_url': str
|
||||
})
|
||||
|
||||
ParsedNotification = TypedDict('ParsedNotification', {
|
||||
'author': str,
|
||||
'command': str | None,
|
||||
'arguments': List[str],
|
||||
'note_obj': Dict[str, Any]
|
||||
})
|
248
bot/db_utils.py
248
bot/db_utils.py
|
@ -1,222 +1,58 @@
|
|||
from random import choices
|
||||
import sqlite3
|
||||
import random
|
||||
import config
|
||||
from custom_types import Card
|
||||
|
||||
DB_PATH = config.DB_PATH
|
||||
CONNECTION: sqlite3.Connection
|
||||
CURSOR: sqlite3.Cursor
|
||||
|
||||
|
||||
def connect() -> None:
|
||||
def get_db_connection():
|
||||
'''Creates a connection to the database'''
|
||||
print('Connecting to the database...')
|
||||
global CONNECTION
|
||||
global CURSOR
|
||||
CONNECTION = sqlite3.connect(DB_PATH, autocommit=True)
|
||||
CONNECTION.row_factory = sqlite3.Row
|
||||
CURSOR = CONNECTION.cursor()
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
def get_or_create_user(username):
|
||||
'''Retrieves an ID for a given user, if the user does not exist, it will be
|
||||
created.'''
|
||||
conn = get_db_connection()
|
||||
conn.row_factory = sqlite3.Row
|
||||
cur = conn.cursor()
|
||||
cur.execute('SELECT id FROM users WHERE username = ?', (username,))
|
||||
user = cur.fetchone()
|
||||
if user:
|
||||
conn.close()
|
||||
return user[0]
|
||||
|
||||
def setup_administrators() -> None:
|
||||
'''Creates administrator players for each handle in the config file'''
|
||||
# Get default admins from config
|
||||
for username in config.ADMINS:
|
||||
player_id = get_player(username)
|
||||
if player_id == 0:
|
||||
# Create player if not exists
|
||||
print(f'Creating administrator player: {username}')
|
||||
CURSOR.execute(
|
||||
'INSERT INTO players (username, has_rolled, is_administrator) \
|
||||
VALUES (?, ?, ?)',
|
||||
(username, False, True)
|
||||
)
|
||||
else:
|
||||
# Update is_administrator if exists
|
||||
print(f'Granting administrator to player: {username}')
|
||||
CURSOR.execute(
|
||||
'UPDATE players SET is_administrator = 1 WHERE id = ?',
|
||||
(player_id,)
|
||||
)
|
||||
|
||||
|
||||
def get_random_card() -> Card | None:
|
||||
''' Gets a random card from the database'''
|
||||
CURSOR.execute('SELECT * FROM cards')
|
||||
cards = CURSOR.fetchall()
|
||||
|
||||
if not cards:
|
||||
return None
|
||||
|
||||
weights = [config.RARITY_TO_WEIGHT[c['rarity']] for c in cards]
|
||||
chosen = choices(cards, weights=weights, k=1)[0]
|
||||
|
||||
return {
|
||||
'id': chosen['id'],
|
||||
'name': chosen['name'],
|
||||
'rarity': chosen['rarity'],
|
||||
'weight': config.RARITY_TO_WEIGHT[chosen['rarity']],
|
||||
'image_url': chosen['file_id']
|
||||
}
|
||||
|
||||
|
||||
def get_player(username: str) -> int:
|
||||
'''Retrieve a player ID by username, or return None if not found.'''
|
||||
CURSOR.execute(
|
||||
'SELECT id FROM players WHERE username = ?',
|
||||
(username,)
|
||||
)
|
||||
player = CURSOR.fetchone()
|
||||
if player:
|
||||
return int(player[0])
|
||||
return 0
|
||||
|
||||
|
||||
def insert_player(username: str) -> int:
|
||||
'''Insert a new player with default has_rolled = False and return their
|
||||
player ID.'''
|
||||
CURSOR.execute(
|
||||
'INSERT INTO players (username, has_rolled) VALUES (?, ?)',
|
||||
# New user starts with has_rolled = False
|
||||
cur.execute(
|
||||
'INSERT INTO users (username, has_rolled) VALUES (?, ?)',
|
||||
(username, False)
|
||||
)
|
||||
return CURSOR.lastrowid if CURSOR.lastrowid else 0
|
||||
conn.commit()
|
||||
user_id = cur.lastrowid
|
||||
conn.close()
|
||||
return user_id
|
||||
|
||||
|
||||
def delete_player(username: str) -> bool:
|
||||
'''Permanently deletes a player and all their pulls.'''
|
||||
CURSOR.execute(
|
||||
'SELECT id FROM players WHERE username = ?',
|
||||
(username,)
|
||||
)
|
||||
player = CURSOR.fetchone()
|
||||
|
||||
if not player:
|
||||
return False
|
||||
|
||||
player_id = player[0]
|
||||
|
||||
# Delete pulls
|
||||
CURSOR.execute(
|
||||
'DELETE FROM pulls WHERE player_id = ?',
|
||||
(player_id,)
|
||||
)
|
||||
|
||||
# Delete player
|
||||
CURSOR.execute(
|
||||
'DELETE FROM players WHERE id = ?',
|
||||
(player_id,)
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def ban_player(username: str) -> bool:
|
||||
'''Adds a player to the ban list.'''
|
||||
try:
|
||||
CURSOR.execute(
|
||||
'INSERT INTO banned_players (handle) VALUES (?)',
|
||||
(username,)
|
||||
)
|
||||
return True
|
||||
except sqlite3.IntegrityError:
|
||||
return False
|
||||
|
||||
|
||||
def unban_player(username: str) -> bool:
|
||||
'''Removes a player from the ban list.'''
|
||||
CURSOR.execute(
|
||||
'DELETE FROM banned_players WHERE handle = ?',
|
||||
(username,)
|
||||
)
|
||||
return CURSOR.rowcount > 0
|
||||
|
||||
|
||||
def is_player_banned(username: str) -> bool:
|
||||
CURSOR.execute(
|
||||
'SELECT * FROM banned_players WHERE handle = ?',
|
||||
(username,)
|
||||
)
|
||||
row = CURSOR.fetchone()
|
||||
return row is not None
|
||||
|
||||
|
||||
def is_player_administrator(username: str) -> bool:
|
||||
CURSOR.execute(
|
||||
'SELECT is_administrator FROM players WHERE username = ? LIMIT 1',
|
||||
(username,)
|
||||
)
|
||||
row = CURSOR.fetchone()
|
||||
return row[0] if row else False
|
||||
|
||||
|
||||
def insert_card(
|
||||
name: str, rarity: int, weight: float, file_id: str) -> int:
|
||||
'''Inserts a card'''
|
||||
CURSOR.execute(
|
||||
'INSERT INTO cards (name, rarity, weight, file_id) VALUES \
|
||||
(?, ?, ?, ?)',
|
||||
(name, rarity, weight, file_id)
|
||||
)
|
||||
card_id = CURSOR.lastrowid
|
||||
return card_id if card_id else 0
|
||||
|
||||
|
||||
def insert_pull(player_id: int, card_id: int) -> None:
|
||||
def add_pull(user_id, character_id):
|
||||
'''Creates a pull in the database'''
|
||||
CURSOR.execute(
|
||||
'INSERT INTO pulls (player_id, card_id) VALUES (?, ?)',
|
||||
(player_id, card_id)
|
||||
)
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute('INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', (user_id, character_id))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def get_last_rolled_at(player_id: int) -> int:
|
||||
'''Gets the timestamp when the player last rolled'''
|
||||
CURSOR.execute(
|
||||
"SELECT timestamp FROM pulls WHERE player_id = ? ORDER BY timestamp \
|
||||
DESC",
|
||||
(player_id,))
|
||||
row = CURSOR.fetchone()
|
||||
return row[0] if row else 0
|
||||
|
||||
|
||||
def add_to_whitelist(instance: str) -> bool:
|
||||
'''Adds an instance to the whitelist, returns false if instance was already
|
||||
present'''
|
||||
try:
|
||||
CURSOR.execute(
|
||||
'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,)
|
||||
)
|
||||
return True
|
||||
except sqlite3.IntegrityError:
|
||||
return False
|
||||
|
||||
|
||||
def remove_from_whitelist(instance: str) -> bool:
|
||||
'''Removes an instance to the whitelist, returns false if instance was not
|
||||
present'''
|
||||
CURSOR.execute(
|
||||
'DELETE FROM instance_whitelist WHERE tld = ?', (instance,))
|
||||
return CURSOR.rowcount > 0
|
||||
|
||||
|
||||
def is_whitelisted(instance: str) -> bool:
|
||||
'''Checks whether an instance is in the whitelist'''
|
||||
if instance == 'local':
|
||||
return True
|
||||
CURSOR.execute(
|
||||
'SELECT * FROM instance_whitelist WHERE tld = ?', (instance,))
|
||||
row = CURSOR.fetchone()
|
||||
return row is not None
|
||||
|
||||
|
||||
def get_config(key: str) -> str:
|
||||
def get_config(key):
|
||||
'''Reads the value for a specified config key from the db'''
|
||||
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,))
|
||||
row = CURSOR.fetchone()
|
||||
return row[0] if row else ''
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT value FROM config WHERE key = ?", (key,))
|
||||
row = cur.fetchone()
|
||||
conn.close()
|
||||
return row[0] if row else None
|
||||
|
||||
|
||||
def set_config(key: str, value: str) -> None:
|
||||
def set_config(key, value):
|
||||
'''Writes the value for a specified config key to the db'''
|
||||
CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
|
||||
(key, value))
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
|
31
bot/fediverse_factory.py
Normal file
31
bot/fediverse_factory.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
from fediverse_service import FediverseService
|
||||
from misskey_service import MisskeyService
|
||||
from pleroma_service import PleromaService
|
||||
import config
|
||||
|
||||
|
||||
class FediverseServiceFactory:
|
||||
"""Factory for creating FediverseService implementations based on configuration"""
|
||||
|
||||
@staticmethod
|
||||
def create_service() -> FediverseService:
|
||||
"""
|
||||
Create a FediverseService implementation based on the configured instance type.
|
||||
|
||||
Returns:
|
||||
FediverseService implementation (MisskeyService or PleromaService)
|
||||
|
||||
Raises:
|
||||
ValueError: If the instance type is not supported
|
||||
"""
|
||||
if config.INSTANCE_TYPE == "misskey":
|
||||
return MisskeyService()
|
||||
elif config.INSTANCE_TYPE == "pleroma":
|
||||
return PleromaService()
|
||||
else:
|
||||
raise ValueError(f"Unsupported instance type: {config.INSTANCE_TYPE}")
|
||||
|
||||
|
||||
def get_fediverse_service() -> FediverseService:
|
||||
"""Convenience function to get a FediverseService instance"""
|
||||
return FediverseServiceFactory.create_service()
|
74
bot/fediverse_service.py
Normal file
74
bot/fediverse_service.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import List, Optional, Union, BinaryIO
|
||||
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
|
||||
|
||||
|
||||
class FediverseService(ABC):
|
||||
"""Abstract interface for Fediverse platform services (Misskey, Pleroma, etc.)"""
|
||||
|
||||
@abstractmethod
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""
|
||||
Retrieve notifications from the Fediverse instance.
|
||||
|
||||
Args:
|
||||
since_id: Optional ID to get notifications newer than this ID
|
||||
|
||||
Returns:
|
||||
List of FediverseNotification objects
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""
|
||||
Create a new post on the Fediverse instance.
|
||||
|
||||
Args:
|
||||
text: The text content of the post
|
||||
reply_to_id: Optional ID of post to reply to
|
||||
visibility: Visibility level for the post
|
||||
file_ids: Optional list of file IDs to attach
|
||||
visible_user_ids: Optional list of user IDs who can see the post (for specified visibility)
|
||||
|
||||
Returns:
|
||||
ID of the created post
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""
|
||||
Retrieve a specific post by its ID.
|
||||
|
||||
Args:
|
||||
post_id: The ID of the post to retrieve
|
||||
|
||||
Returns:
|
||||
FediversePost object if found, None otherwise
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""
|
||||
Upload a file to the Fediverse instance.
|
||||
|
||||
Args:
|
||||
file_data: File data as binary stream or bytes
|
||||
filename: Optional filename for the uploaded file
|
||||
|
||||
Returns:
|
||||
FediverseFile object with ID, URL, and other metadata
|
||||
|
||||
Raises:
|
||||
RuntimeError: If file upload fails
|
||||
"""
|
||||
pass
|
73
bot/fediverse_types.py
Normal file
73
bot/fediverse_types.py
Normal file
|
@ -0,0 +1,73 @@
|
|||
from dataclasses import dataclass
|
||||
from typing import Optional, List, Dict, Any
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class NotificationType(Enum):
|
||||
MENTION = "mention"
|
||||
REPLY = "reply"
|
||||
FOLLOW = "follow"
|
||||
FAVOURITE = "favourite"
|
||||
REBLOG = "reblog"
|
||||
POLL = "poll"
|
||||
OTHER = "other"
|
||||
|
||||
|
||||
class Visibility(Enum):
|
||||
PUBLIC = "public"
|
||||
UNLISTED = "unlisted"
|
||||
HOME = "home"
|
||||
FOLLOWERS = "followers"
|
||||
SPECIFIED = "specified"
|
||||
DIRECT = "direct"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseUser:
|
||||
"""Common user representation across Fediverse platforms"""
|
||||
id: str
|
||||
username: str
|
||||
host: Optional[str] = None # None for local users
|
||||
display_name: Optional[str] = None
|
||||
|
||||
@property
|
||||
def full_handle(self) -> str:
|
||||
"""Returns the full fediverse handle (@user@domain or @user for local)"""
|
||||
if self.host:
|
||||
return f"@{self.username}@{self.host}"
|
||||
return f"@{self.username}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseFile:
|
||||
"""Common file/attachment representation"""
|
||||
id: str
|
||||
url: str
|
||||
type: Optional[str] = None
|
||||
name: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediversePost:
|
||||
"""Common post representation across Fediverse platforms"""
|
||||
id: str
|
||||
text: Optional[str]
|
||||
user: FediverseUser
|
||||
visibility: Visibility
|
||||
created_at: Optional[str] = None
|
||||
files: List[FediverseFile] = None
|
||||
reply_to_id: Optional[str] = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.files is None:
|
||||
self.files = []
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseNotification:
|
||||
"""Common notification representation across Fediverse platforms"""
|
||||
id: str
|
||||
type: NotificationType
|
||||
user: FediverseUser
|
||||
post: Optional[FediversePost] = None
|
||||
created_at: Optional[str] = None
|
70
bot/gacha_response.py
Normal file
70
bot/gacha_response.py
Normal file
|
@ -0,0 +1,70 @@
|
|||
import random
|
||||
from db_utils import get_or_create_user, add_pull, get_db_connection
|
||||
from add_character import add_character
|
||||
from fediverse_types import FediversePost
|
||||
|
||||
def get_character():
|
||||
''' Gets a random character from the database'''
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute('SELECT * FROM characters')
|
||||
characters = cur.fetchall()
|
||||
conn.close()
|
||||
|
||||
if not characters:
|
||||
return None, None, None, None
|
||||
|
||||
weights = [c['weight'] for c in characters]
|
||||
chosen = random.choices(characters, weights=weights, k=1)[0]
|
||||
|
||||
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
|
||||
|
||||
def is_float(val):
|
||||
try:
|
||||
float(val)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
# TODO: See issue #3, separate command parsing from game logic.
|
||||
def gacha_response(command: str, full_user: str, arguments: list, post: FediversePost):
|
||||
'''Parses a given command with arguments, processes the game state and
|
||||
returns a response'''
|
||||
|
||||
if command == "roll":
|
||||
user_id = get_or_create_user(full_user)
|
||||
character_id, character_name, file_id, rarity = get_character()
|
||||
|
||||
if not character_id:
|
||||
#TODO: Can't have tuples of a single element
|
||||
# Return these as a dict or object instead.
|
||||
return(f"@{full_user} Uwaaa... something went wrong! No characters found. 😿")
|
||||
|
||||
add_pull(user_id,character_id)
|
||||
stars = '⭐️' * rarity
|
||||
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\nShe's all yours now~ 💖✨",[file_id]])
|
||||
|
||||
if command == "create":
|
||||
# Example call from bot logic
|
||||
image_url = post.files[0].url if post.files else None
|
||||
if not image_url:
|
||||
return "You need an image to create a character, dumbass."
|
||||
|
||||
if len(arguments) != 3:
|
||||
return "Please specify the following attributes in order: name, rarity, drop weighting"
|
||||
|
||||
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
|
||||
return f"Invalid rarity: '{arguments[1]}' must be a number between 1 and 5"
|
||||
|
||||
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
|
||||
return f"Invalid drop weight: '{arguments[2]}' must be a decimal value between 0.0 and 1.0"
|
||||
|
||||
character_id, file_id = add_character(
|
||||
name=arguments[0],
|
||||
rarity=int(arguments[1]),
|
||||
weight=float(arguments[2]),
|
||||
image_url=image_url
|
||||
)
|
||||
return([f"Added {arguments[0]}, ID {character_id}.",[file_id]])
|
||||
return None
|
156
bot/misskey_service.py
Normal file
156
bot/misskey_service.py
Normal file
|
@ -0,0 +1,156 @@
|
|||
import misskey
|
||||
from typing import List, Optional, Dict, Any, Union, BinaryIO
|
||||
from fediverse_service import FediverseService
|
||||
from fediverse_types import (
|
||||
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
|
||||
NotificationType, Visibility
|
||||
)
|
||||
import config
|
||||
|
||||
|
||||
class MisskeyService(FediverseService):
|
||||
"""Misskey implementation of FediverseService"""
|
||||
|
||||
def __init__(self):
|
||||
self.client = misskey.Misskey(address=config.INSTANCE, i=config.KEY)
|
||||
|
||||
def _convert_misskey_user(self, user_data: Dict[str, Any]) -> FediverseUser:
|
||||
"""Convert Misskey user data to FediverseUser"""
|
||||
return FediverseUser(
|
||||
id=user_data.get("id", ""),
|
||||
username=user_data.get("username", "unknown"),
|
||||
host=user_data.get("host"),
|
||||
display_name=user_data.get("name")
|
||||
)
|
||||
|
||||
def _convert_misskey_file(self, file_data: Dict[str, Any]) -> FediverseFile:
|
||||
"""Convert Misskey file data to FediverseFile"""
|
||||
return FediverseFile(
|
||||
id=file_data.get("id", ""),
|
||||
url=file_data.get("url", ""),
|
||||
type=file_data.get("type"),
|
||||
name=file_data.get("name")
|
||||
)
|
||||
|
||||
def _convert_misskey_visibility(self, visibility: str) -> Visibility:
|
||||
"""Convert Misskey visibility to our enum"""
|
||||
visibility_map = {
|
||||
"public": Visibility.PUBLIC,
|
||||
"unlisted": Visibility.UNLISTED,
|
||||
"home": Visibility.HOME,
|
||||
"followers": Visibility.FOLLOWERS,
|
||||
"specified": Visibility.SPECIFIED
|
||||
}
|
||||
return visibility_map.get(visibility, Visibility.HOME)
|
||||
|
||||
def _convert_to_misskey_visibility(self, visibility: Visibility) -> str:
|
||||
"""Convert our visibility enum to Misskey visibility"""
|
||||
visibility_map = {
|
||||
Visibility.PUBLIC: "public",
|
||||
Visibility.UNLISTED: "unlisted",
|
||||
Visibility.HOME: "home",
|
||||
Visibility.FOLLOWERS: "followers",
|
||||
Visibility.SPECIFIED: "specified",
|
||||
Visibility.DIRECT: "specified" # Map direct to specified for Misskey
|
||||
}
|
||||
return visibility_map.get(visibility, "home")
|
||||
|
||||
def _convert_misskey_notification_type(self, notif_type: str) -> NotificationType:
|
||||
"""Convert Misskey notification type to our enum"""
|
||||
type_map = {
|
||||
"mention": NotificationType.MENTION,
|
||||
"reply": NotificationType.REPLY,
|
||||
"follow": NotificationType.FOLLOW,
|
||||
"favourite": NotificationType.FAVOURITE,
|
||||
"reblog": NotificationType.REBLOG,
|
||||
"poll": NotificationType.POLL
|
||||
}
|
||||
return type_map.get(notif_type, NotificationType.OTHER)
|
||||
|
||||
def _convert_misskey_post(self, note_data: Dict[str, Any]) -> FediversePost:
|
||||
"""Convert Misskey note data to FediversePost"""
|
||||
files = []
|
||||
if note_data.get("files"):
|
||||
files = [self._convert_misskey_file(f) for f in note_data["files"]]
|
||||
|
||||
return FediversePost(
|
||||
id=str(note_data.get("id", "")), # Misskey IDs (aid/meid/ulid/objectid) are lexicographically sortable as strings
|
||||
text=note_data.get("text"),
|
||||
user=self._convert_misskey_user(note_data.get("user", {})),
|
||||
visibility=self._convert_misskey_visibility(note_data.get("visibility", "home")),
|
||||
created_at=note_data.get("createdAt"),
|
||||
files=files,
|
||||
reply_to_id=str(note_data.get("replyId", "")) if note_data.get("replyId") else None
|
||||
)
|
||||
|
||||
def _convert_misskey_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
|
||||
"""Convert Misskey notification data to FediverseNotification"""
|
||||
post = None
|
||||
if notification_data.get("note"):
|
||||
post = self._convert_misskey_post(notification_data["note"])
|
||||
|
||||
return FediverseNotification(
|
||||
id=str(notification_data.get("id", "")), # Misskey IDs are lexicographically sortable as strings
|
||||
type=self._convert_misskey_notification_type(notification_data.get("type", "")),
|
||||
user=self._convert_misskey_user(notification_data.get("user", {})),
|
||||
post=post,
|
||||
created_at=notification_data.get("createdAt")
|
||||
)
|
||||
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""Get notifications from Misskey instance"""
|
||||
params = {
|
||||
'include_types': ['mention', 'reply'],
|
||||
'limit': 50
|
||||
}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
|
||||
notifications = self.client.i_notifications(**params)
|
||||
return [self._convert_misskey_notification(notif) for notif in notifications]
|
||||
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""Create a post on Misskey instance"""
|
||||
params = {
|
||||
"text": text,
|
||||
"visibility": self._convert_to_misskey_visibility(visibility)
|
||||
}
|
||||
|
||||
if reply_to_id:
|
||||
params["reply_id"] = reply_to_id
|
||||
|
||||
if file_ids:
|
||||
params["file_ids"] = file_ids
|
||||
|
||||
if visible_user_ids and visibility == Visibility.SPECIFIED:
|
||||
params["visible_user_ids"] = visible_user_ids
|
||||
|
||||
response = self.client.notes_create(**params)
|
||||
return response.get("createdNote", {}).get("id", "")
|
||||
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""Get a specific post by ID from Misskey instance"""
|
||||
try:
|
||||
note = self.client.notes_show(noteId=post_id)
|
||||
return self._convert_misskey_post(note)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""Upload a file to Misskey Drive"""
|
||||
try:
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
|
||||
media = self.client.drive_files_create(file_data)
|
||||
return self._convert_misskey_file(media)
|
||||
except MisskeyAPIException as e:
|
||||
raise RuntimeError(f"Failed to upload file to Misskey Drive: {e}") from e
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Unexpected error during file upload: {e}") from e
|
|
@ -1,127 +0,0 @@
|
|||
import traceback
|
||||
from typing import Dict, Any
|
||||
|
||||
import misskey
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
|
||||
from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_config, set_config, is_whitelisted, is_player_banned
|
||||
from response import generate_response
|
||||
from custom_types import BotResponse
|
||||
|
||||
# Define your whitelist
|
||||
# TODO: move to config
|
||||
WHITELISTED_INSTANCES: list[str] = []
|
||||
|
||||
|
||||
def process_notification(
|
||||
client: misskey.Misskey,
|
||||
notification: Dict[str, Any]) -> None:
|
||||
'''Processes an individual notification'''
|
||||
user = notification.get('user', {})
|
||||
username = user.get('username', 'unknown')
|
||||
host = user.get('host') # None if local user
|
||||
instance = host if host else 'local'
|
||||
|
||||
if USE_WHITELIST and not is_whitelisted(instance):
|
||||
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
|
||||
return
|
||||
|
||||
# Copy visibility of the post that was received when replying (so if people
|
||||
# don't want to dump a bunch of notes on home they don't have to)
|
||||
visibility = notification['note']['visibility']
|
||||
if visibility != 'specified':
|
||||
visibility = 'home'
|
||||
|
||||
notif_type = notification.get('type', 'unknown')
|
||||
notif_id = notification.get('id')
|
||||
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parsed_notification = parse_notification(notification, client)
|
||||
|
||||
if not parsed_notification:
|
||||
return
|
||||
|
||||
author = parsed_notification['author']
|
||||
if is_player_banned(author):
|
||||
print(f'⚠️ Blocked notification from banned player: {author}')
|
||||
return
|
||||
|
||||
# Get the note Id to reply to
|
||||
note_id = notification.get('note', {}).get('id')
|
||||
|
||||
# Get the response
|
||||
response: BotResponse | None = generate_response(parsed_notification)
|
||||
|
||||
if not response:
|
||||
return
|
||||
|
||||
client.notes_create(
|
||||
text=response['message'],
|
||||
reply_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=response['attachment_urls']
|
||||
# TODO: write actual visible users ids so pleromers can use the bot
|
||||
# privately
|
||||
# visible_user_ids=[]
|
||||
)
|
||||
|
||||
|
||||
def process_notifications(client: misskey.Misskey) -> bool:
|
||||
'''Processes a batch of unread notifications. Returns False if there are
|
||||
no more notifications to process.'''
|
||||
|
||||
last_seen_id = get_config('last_seen_notif_id')
|
||||
# process_notification writes to last_seen_id, so make a copy
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
try:
|
||||
notifications = client.i_notifications(
|
||||
# Fetch notifications we haven't seen yet. This option is a bit
|
||||
# tempermental, sometimes it'll include since_id, sometimes it
|
||||
# won't. We need to keep track of what notifications we've
|
||||
# already processed.
|
||||
since_id=last_seen_id,
|
||||
# Let misskey handle the filtering
|
||||
include_types=['mention', 'reply'],
|
||||
# And handle the batch size while we're at it
|
||||
limit=NOTIFICATION_BATCH_SIZE
|
||||
)
|
||||
|
||||
# No notifications. Wait the poll period.
|
||||
if not notifications:
|
||||
return False
|
||||
|
||||
# Iterate oldest to newest
|
||||
for notification in notifications:
|
||||
try:
|
||||
# Skip if we've processed already
|
||||
notif_id = notification.get('id', '')
|
||||
if notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
# Update new_last_seen_id and process
|
||||
new_last_seen_id = notif_id
|
||||
process_notification(client, notification)
|
||||
|
||||
except Exception as e:
|
||||
print(f'An exception has occured while processing a \
|
||||
notification: {e}')
|
||||
print(traceback.format_exc())
|
||||
|
||||
# If we got as many notifications as we requested, there are probably
|
||||
# more in the queue
|
||||
return len(notifications) == NOTIFICATION_BATCH_SIZE
|
||||
|
||||
except MisskeyAPIException as e:
|
||||
print(f'An exception has occured while reading notifications: {e}\n')
|
||||
print(traceback.format_exc())
|
||||
finally:
|
||||
# Quality jank right here, but finally lets us update the last_seen_id
|
||||
# even if we hit an exception or return early
|
||||
if new_last_seen_id > last_seen_id:
|
||||
set_config('last_seen_notif_id', new_last_seen_id)
|
||||
|
||||
return False
|
|
@ -1,31 +1,39 @@
|
|||
import re
|
||||
from typing import Dict, Any
|
||||
|
||||
import misskey
|
||||
|
||||
import random, re
|
||||
import config
|
||||
from custom_types import ParsedNotification
|
||||
from gacha_response import gacha_response
|
||||
from fediverse_factory import get_fediverse_service
|
||||
from fediverse_types import FediverseNotification, NotificationType, Visibility
|
||||
|
||||
def parse_notification(notification: FediverseNotification, fediverse_service=None):
|
||||
'''Parses any notifications received by the bot and sends any commands to
|
||||
gacha_response()'''
|
||||
|
||||
if fediverse_service is None:
|
||||
fediverse_service = get_fediverse_service()
|
||||
|
||||
def parse_notification(
|
||||
notification: Dict[str, Any],
|
||||
client: misskey.Misskey) -> ParsedNotification | None:
|
||||
'''Parses any notifications received by the bot'''
|
||||
# We get the type of notification to filter the ones that we actually want
|
||||
# to parse
|
||||
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
|
||||
return # Ignore anything that isn't a mention
|
||||
|
||||
# Return early if no post attached
|
||||
if not notification.post:
|
||||
return
|
||||
|
||||
# We want the visibility to be related to the type that was received (so if
|
||||
# people don't want to dump a bunch of notes on home they don't have to)
|
||||
if notification.post.visibility != Visibility.SPECIFIED:
|
||||
visibility = Visibility.HOME
|
||||
else:
|
||||
visibility = Visibility.SPECIFIED
|
||||
|
||||
# Get the full Activitypub ID of the user
|
||||
user = notification.get("user", {})
|
||||
username = user.get("username", "unknown")
|
||||
host = user.get("host")
|
||||
# Local users may not have a hostname attached
|
||||
full_user = f"@{username}" if not host else f"@{username}@{host}"
|
||||
full_user = notification.user.full_handle
|
||||
|
||||
note_obj = notification.get("note", {})
|
||||
note_text = note_obj.get("text")
|
||||
note_id = note_obj.get("id")
|
||||
note_text = notification.post.text
|
||||
note_id = notification.post.id
|
||||
|
||||
note = note_text.strip().lower() if note_text else ""
|
||||
# Split words into tokens
|
||||
parts = note.split()
|
||||
|
||||
# Check for both short and fully-qualified name mentions
|
||||
username_variants = [
|
||||
|
@ -33,20 +41,34 @@ def parse_notification(
|
|||
f'@{config.USER.split("@")[1]}'
|
||||
]
|
||||
|
||||
# Notifs must consist of the initial mention and at least one other token
|
||||
if len(parts) <= 1:
|
||||
return None
|
||||
# Make sure the notification text explicitly mentions the bot
|
||||
if not any(variant in note for variant in username_variants):
|
||||
return
|
||||
|
||||
# Make sure the first token is a mention to the bot
|
||||
if not parts[0] in username_variants:
|
||||
return None
|
||||
# Find command and arguments after the mention
|
||||
# Removes all mentions (regex = mentions that start with @ and may contain @domain)
|
||||
cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip()
|
||||
parts = cleaned_text.split()
|
||||
|
||||
command = parts[1].lower()
|
||||
arguments = parts[2:] if len(parts) > 2 else []
|
||||
command = parts[0].lower() if parts else None
|
||||
arguments = parts[1:] if len(parts) > 1 else []
|
||||
|
||||
return {
|
||||
'author': full_user,
|
||||
'command': command,
|
||||
'arguments': arguments,
|
||||
'note_obj': note_obj
|
||||
}
|
||||
# TODO: move response generation to a different function
|
||||
response = gacha_response(command.lower(), full_user, arguments, notification.post)
|
||||
if not response:
|
||||
return
|
||||
|
||||
if isinstance(response, str):
|
||||
fediverse_service.create_post(
|
||||
text=response,
|
||||
reply_to_id=note_id,
|
||||
visibility=visibility
|
||||
)
|
||||
else:
|
||||
fediverse_service.create_post(
|
||||
text=response[0],
|
||||
reply_to_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=response[1]
|
||||
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
|
||||
)
|
||||
|
|
188
bot/pleroma_service.py
Normal file
188
bot/pleroma_service.py
Normal file
|
@ -0,0 +1,188 @@
|
|||
from mastodon import Mastodon
|
||||
from typing import List, Optional, Dict, Any, Union, BinaryIO
|
||||
import io
|
||||
import filetype
|
||||
from fediverse_service import FediverseService
|
||||
from fediverse_types import (
|
||||
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
|
||||
NotificationType, Visibility
|
||||
)
|
||||
import config
|
||||
|
||||
|
||||
class PleromaService(FediverseService):
|
||||
"""Pleroma implementation of FediverseService using Mastodon.py"""
|
||||
|
||||
def __init__(self):
|
||||
self.client = Mastodon(
|
||||
access_token=config.KEY,
|
||||
api_base_url=config.INSTANCE
|
||||
)
|
||||
|
||||
def _convert_mastodon_user(self, user_data: Dict[str, Any]) -> FediverseUser:
|
||||
"""Convert Mastodon/Pleroma user data to FediverseUser"""
|
||||
acct = user_data.get("acct", "")
|
||||
if "@" in acct:
|
||||
username, host = acct.split("@", 1)
|
||||
else:
|
||||
username = acct
|
||||
host = None
|
||||
|
||||
return FediverseUser(
|
||||
id=str(user_data.get("id", "")),
|
||||
username=username,
|
||||
host=host,
|
||||
display_name=user_data.get("display_name")
|
||||
)
|
||||
|
||||
def _convert_mastodon_file(self, file_data: Dict[str, Any]) -> FediverseFile:
|
||||
"""Convert Mastodon/Pleroma media attachment to FediverseFile"""
|
||||
return FediverseFile(
|
||||
id=str(file_data.get("id", "")),
|
||||
url=file_data.get("url", ""),
|
||||
type=file_data.get("type"),
|
||||
name=file_data.get("description")
|
||||
)
|
||||
|
||||
def _convert_mastodon_visibility(self, visibility: str) -> Visibility:
|
||||
"""Convert Mastodon/Pleroma visibility to our enum"""
|
||||
visibility_map = {
|
||||
"public": Visibility.PUBLIC,
|
||||
"unlisted": Visibility.UNLISTED,
|
||||
"private": Visibility.FOLLOWERS,
|
||||
"direct": Visibility.DIRECT
|
||||
}
|
||||
return visibility_map.get(visibility, Visibility.PUBLIC)
|
||||
|
||||
def _convert_to_mastodon_visibility(self, visibility: Visibility) -> str:
|
||||
"""Convert our visibility enum to Mastodon/Pleroma visibility"""
|
||||
visibility_map = {
|
||||
Visibility.PUBLIC: "public",
|
||||
Visibility.UNLISTED: "unlisted",
|
||||
Visibility.HOME: "unlisted", # Map home to unlisted for Pleroma
|
||||
Visibility.FOLLOWERS: "private",
|
||||
Visibility.SPECIFIED: "direct", # Map specified to direct for Pleroma
|
||||
Visibility.DIRECT: "direct"
|
||||
}
|
||||
return visibility_map.get(visibility, "public")
|
||||
|
||||
def _convert_mastodon_notification_type(self, notif_type: str) -> NotificationType:
|
||||
"""Convert Mastodon/Pleroma notification type to our enum"""
|
||||
type_map = {
|
||||
"mention": NotificationType.MENTION,
|
||||
"follow": NotificationType.FOLLOW,
|
||||
"favourite": NotificationType.FAVOURITE,
|
||||
"reblog": NotificationType.REBLOG,
|
||||
"poll": NotificationType.POLL
|
||||
}
|
||||
return type_map.get(notif_type, NotificationType.OTHER)
|
||||
|
||||
def _convert_mastodon_status(self, status_data: Dict[str, Any]) -> FediversePost:
|
||||
"""Convert Mastodon/Pleroma status data to FediversePost"""
|
||||
files = []
|
||||
if status_data.get("media_attachments"):
|
||||
files = [self._convert_mastodon_file(f) for f in status_data["media_attachments"]]
|
||||
|
||||
# Extract plain text from HTML content
|
||||
content = status_data.get("content", "")
|
||||
# Basic HTML stripping - in production you might want to use a proper HTML parser
|
||||
import re
|
||||
plain_text = re.sub(r'<[^>]+>', '', content) if content else None
|
||||
|
||||
return FediversePost(
|
||||
id=str(status_data.get("id", "")), # Pleroma IDs (snowflake format) are lexicographically sortable as strings
|
||||
text=plain_text,
|
||||
user=self._convert_mastodon_user(status_data.get("account", {})),
|
||||
visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")),
|
||||
created_at=status_data.get("created_at"),
|
||||
files=files,
|
||||
reply_to_id=str(status_data["in_reply_to_id"]) if status_data.get("in_reply_to_id") else None
|
||||
)
|
||||
|
||||
def _convert_mastodon_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
|
||||
"""Convert Mastodon/Pleroma notification data to FediverseNotification"""
|
||||
post = None
|
||||
if notification_data.get("status"):
|
||||
post = self._convert_mastodon_status(notification_data["status"])
|
||||
|
||||
return FediverseNotification(
|
||||
id=str(notification_data.get("id", "")), # Pleroma IDs are lexicographically sortable as strings
|
||||
type=self._convert_mastodon_notification_type(notification_data.get("type", "")),
|
||||
user=self._convert_mastodon_user(notification_data.get("account", {})),
|
||||
post=post,
|
||||
created_at=notification_data.get("created_at")
|
||||
)
|
||||
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""Get notifications from Pleroma instance"""
|
||||
params = {}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
|
||||
notifications = self.client.notifications(**params)
|
||||
return [self._convert_mastodon_notification(notif) for notif in notifications]
|
||||
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""Create a post on Pleroma instance"""
|
||||
params = {
|
||||
"status": text,
|
||||
"visibility": self._convert_to_mastodon_visibility(visibility)
|
||||
}
|
||||
|
||||
if reply_to_id:
|
||||
params["in_reply_to_id"] = reply_to_id
|
||||
|
||||
if file_ids:
|
||||
params["media_ids"] = file_ids
|
||||
|
||||
# Note: Pleroma/Mastodon doesn't have direct equivalent to visible_user_ids
|
||||
# For direct messages, you typically mention users in the status text
|
||||
|
||||
response = self.client.status_post(**params)
|
||||
return str(response.get("id", ""))
|
||||
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""Get a specific post by ID from Pleroma instance"""
|
||||
try:
|
||||
status = self.client.status(post_id)
|
||||
return self._convert_mastodon_status(status)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""Upload a file to Pleroma instance"""
|
||||
try:
|
||||
# Convert file_data to bytes for MIME detection
|
||||
if hasattr(file_data, 'read'):
|
||||
# Check if we can seek back
|
||||
try:
|
||||
current_pos = file_data.tell()
|
||||
file_bytes = file_data.read()
|
||||
file_data.seek(current_pos)
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
except (io.UnsupportedOperation, OSError):
|
||||
# Non-seekable stream, already read all data
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
else:
|
||||
file_bytes = file_data
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
|
||||
# Use filetype library for robust MIME detection
|
||||
kind = filetype.guess(file_bytes)
|
||||
if kind is not None:
|
||||
mime_type = kind.mime
|
||||
else:
|
||||
# Fallback to image/jpeg if detection fails
|
||||
mime_type = 'image/jpeg'
|
||||
|
||||
media = self.client.media_post(file_data, mime_type=mime_type, description=filename)
|
||||
return self._convert_mastodon_file(media)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to upload file to Pleroma: {e}") from e
|
318
bot/response.py
318
bot/response.py
|
@ -1,318 +0,0 @@
|
|||
from datetime import datetime, timedelta, timezone
|
||||
from typing import TypedDict, Any, List, Dict
|
||||
import db_utils as db
|
||||
from add_card import add_card
|
||||
from config import GACHA_ROLL_INTERVAL
|
||||
from custom_types import BotResponse, ParsedNotification
|
||||
|
||||
|
||||
def do_roll(author: str) -> BotResponse:
|
||||
'''Determines whether the user can roll, then pulls a random card'''
|
||||
user_id = db.get_player(author)
|
||||
if not user_id:
|
||||
return {
|
||||
'message': f'{author} 🛑 You haven’t signed up yet! Use the \
|
||||
`signup` command to start playing.',
|
||||
'attachment_urls': None
|
||||
}
|
||||
# Get date of user's last roll
|
||||
date = db.get_last_rolled_at(user_id)
|
||||
|
||||
# No date means it's users first roll
|
||||
if date:
|
||||
# SQLite timestamps returned by the DB are always in UTC
|
||||
# Below timestamps are to be converted to UTC
|
||||
prev = datetime.strptime(str(date) + '+0000', '%Y-%m-%d %H:%M:%S%z')
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
time_since_last_roll = now - prev
|
||||
roll_interval = timedelta(seconds=GACHA_ROLL_INTERVAL)
|
||||
duration = roll_interval - time_since_last_roll
|
||||
|
||||
# User needs to wait before they can roll again
|
||||
if time_since_last_roll < roll_interval:
|
||||
remaining_duration = None
|
||||
if duration.seconds > 3600:
|
||||
remaining_duration = f'{-(duration.seconds // -3600)} hours'
|
||||
elif duration.seconds > 60:
|
||||
remaining_duration = f'{-(duration.seconds // -60)} minutes'
|
||||
else:
|
||||
remaining_duration = f'{duration.seconds} seconds'
|
||||
|
||||
return {
|
||||
'message': f'{author} ⏱️ Please wait another \
|
||||
{remaining_duration} before rolling again.',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
card = db.get_random_card()
|
||||
|
||||
if not card:
|
||||
return {
|
||||
'message': f'{author} Uwaaa... something went wrong! No \
|
||||
cards found. 😿',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
db.insert_pull(user_id, card['id'])
|
||||
stars = '⭐️' * card['rarity']
|
||||
return {
|
||||
'message': f'{author} 🎲 Congrats! You rolled {stars} \
|
||||
**{card['name']}**\nShe\'s all yours now~ 💖✨',
|
||||
'attachment_urls': [card['image_url']]
|
||||
}
|
||||
|
||||
|
||||
def do_signup(author: str) -> BotResponse:
|
||||
'''Registers a new user if they haven’t signed up yet.'''
|
||||
user_id = db.get_player(author)
|
||||
|
||||
if user_id:
|
||||
return {
|
||||
'message': f'{author} 👀 You’re already signed up! Let the rolling \
|
||||
begin~ 🎲',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
new_user_id = db.insert_player(author)
|
||||
return {
|
||||
'message': f'{author} ✅ Signed up successfully! Your gacha \
|
||||
destiny begins now... ✨ Use the roll command to start!',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
|
||||
def is_float(val: Any) -> bool:
|
||||
'''Returns true if `val` can be converted to a float'''
|
||||
try:
|
||||
float(val)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def do_create(
|
||||
author: str,
|
||||
arguments: List[str],
|
||||
note_obj: Dict[str, Any]) -> BotResponse:
|
||||
'''Creates a card'''
|
||||
# Example call from bot logic
|
||||
image_url = note_obj.get('files', [{}])[0].get('url') \
|
||||
if note_obj.get('files') else None
|
||||
|
||||
if not image_url:
|
||||
return {
|
||||
'message': f'{author} You need an image to create a card, \
|
||||
dumbass.',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
if len(arguments) != 2:
|
||||
return {
|
||||
'message': f'{author} Please specify the following attributes \
|
||||
in order: name, rarity',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
|
||||
return {
|
||||
'message': f'{author} Invalid rarity: \'{arguments[1]}\' must \
|
||||
be a number between 1 and 5',
|
||||
'attachment_urls': None
|
||||
}
|
||||
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
|
||||
return {
|
||||
'message': f'{author} Invalid drop weight: \'{arguments[2]}\' \
|
||||
must be a decimal value between 0.0 and 1.0',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
card_id, file_id = add_card(
|
||||
name=arguments[0],
|
||||
rarity=int(arguments[1]),
|
||||
image_url=image_url
|
||||
)
|
||||
return {
|
||||
'message': f'{author} Added {arguments[0]}, ID {card_id}.',
|
||||
'attachment_urls': [file_id]
|
||||
}
|
||||
|
||||
|
||||
def do_help(author: str) -> BotResponse:
|
||||
'''Provides a list of commands that the bot can do.'''
|
||||
return {
|
||||
'message': f'{author} Here\'s what I can do:\n\
|
||||
- `roll` Pulls a random card.\n\
|
||||
- `create <name> <rarity>` Creates a card using a given image.\n\
|
||||
- `signup` Registers your account.\n\
|
||||
- `delete_account` Deletes your account.\n\
|
||||
- `help` Shows this message',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
|
||||
def delete_account(author: str) -> BotResponse:
|
||||
return {
|
||||
'message': f'{author} ⚠️ This will permanently delete your account \
|
||||
and all your cards.\n'
|
||||
'If you\'re sure, reply with `confirm_delete_account` to proceed.\n\n'
|
||||
'**There is no undo.** Your gacha luck will be lost to the void... 💀✨',
|
||||
'attachment_urls': None
|
||||
|
||||
}
|
||||
|
||||
|
||||
def confirm_delete(author: str) -> BotResponse:
|
||||
db.delete_player(author)
|
||||
|
||||
return {
|
||||
'message': f'{author} 🧼 Your account and all your cards have been \
|
||||
deleted. RIP your gacha history 🕊️✨',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
|
||||
def do_whitelist(author: str, args: list[str]) -> BotResponse:
|
||||
if len(args) == 0:
|
||||
return {
|
||||
'message': f'{author} Please specify an instance to whitelist',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
if db.add_to_whitelist(args[0]):
|
||||
return {
|
||||
'message': f'{author} Whitelisted instance: {args[0]}',
|
||||
'attachment_urls': None
|
||||
}
|
||||
else:
|
||||
return {
|
||||
'message': f'{author} Instance already whitelisted: {args[0]}',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
|
||||
def do_unwhitelist(author: str, args: list[str]) -> BotResponse:
|
||||
if len(args) == 0:
|
||||
return {
|
||||
'message': f'{author} Please specify an instance to remove from \
|
||||
the whitelist',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
if db.remove_from_whitelist(args[0]):
|
||||
return {
|
||||
'message': f'{author} Unwhitelisted instance: {args[0]}',
|
||||
'attachment_urls': None
|
||||
}
|
||||
else:
|
||||
return {
|
||||
'message': f'{author} Instance not whitelisted: {args[0]}',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
|
||||
def do_ban(author: str, args: list[str]) -> BotResponse:
|
||||
if len(args) == 0:
|
||||
return {
|
||||
'message': f'{author} Please specify a user to ban',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
if db.is_player_administrator(args[0]):
|
||||
return {
|
||||
'message': f'{author} Cannot ban other administrators.',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
if db.ban_player(args[0]):
|
||||
# Delete banned player's account
|
||||
db.delete_player(args[0])
|
||||
return {
|
||||
'message': f'{author} 🔨 **BONK!** Get banned, {args[0]}!',
|
||||
'attachment_urls': None
|
||||
}
|
||||
else:
|
||||
return {
|
||||
'message': f'{author} Player is already banned: {args[0]}',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
|
||||
def do_unban(author: str, args: list[str]) -> BotResponse:
|
||||
if len(args) == 0:
|
||||
return {
|
||||
'message': f'{author} Please specify a user to unban',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
if db.unban_player(args[0]):
|
||||
return {
|
||||
'message': f'{author} Player unbanned: {args[0]}!',
|
||||
'attachment_urls': None
|
||||
}
|
||||
else:
|
||||
return {
|
||||
'message': f'{author} Player was not banned: {args[0]}',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
|
||||
def generate_response(notification: ParsedNotification) -> BotResponse | None:
|
||||
'''Given a command with arguments, processes the game state and
|
||||
returns a response'''
|
||||
|
||||
# Temporary response variable
|
||||
res: BotResponse | None = None
|
||||
author = notification['author']
|
||||
player_id = db.get_player(author)
|
||||
command = notification['command']
|
||||
|
||||
# Unrestricted commands
|
||||
match command:
|
||||
case 'roll':
|
||||
res = do_roll(author)
|
||||
case 'signup':
|
||||
res = do_signup(author)
|
||||
case 'help':
|
||||
res = do_help(author)
|
||||
case _:
|
||||
pass
|
||||
|
||||
# Commands beyond this point require the user to have an account
|
||||
if not player_id:
|
||||
return res
|
||||
|
||||
# User commands
|
||||
match command:
|
||||
case 'create':
|
||||
res = do_create(
|
||||
author,
|
||||
notification['arguments'],
|
||||
notification['note_obj']
|
||||
)
|
||||
case 'delete_account':
|
||||
res = delete_account(author)
|
||||
case 'confirm_delete_account':
|
||||
res = confirm_delete(author)
|
||||
case _:
|
||||
pass
|
||||
|
||||
# Commands beyond this point require the user to be an administrator
|
||||
if not db.is_player_administrator(author):
|
||||
return res
|
||||
|
||||
# Admin commands
|
||||
match command:
|
||||
case 'whitelist':
|
||||
res = do_whitelist(author, notification['arguments'])
|
||||
case 'unwhitelist':
|
||||
res = do_unwhitelist(author, notification['arguments'])
|
||||
case 'ban':
|
||||
res = do_ban(author, notification['arguments'])
|
||||
case 'unban':
|
||||
res = do_unban(author, notification['arguments'])
|
||||
case _:
|
||||
pass
|
||||
|
||||
# Administrator commands go here
|
||||
return res
|
101
db.py
Normal file
101
db.py
Normal file
|
@ -0,0 +1,101 @@
|
|||
import sqlite3
|
||||
import configparser
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
def get_config_file_path():
|
||||
"""Get config file path from command line arguments or default"""
|
||||
parser = argparse.ArgumentParser(add_help=False)
|
||||
parser.add_argument('--config', default='config.ini', help='Path to config file (default: config.ini)')
|
||||
args, _ = parser.parse_known_args()
|
||||
return args.config
|
||||
|
||||
def get_db_path():
|
||||
"""Get database path from config with error handling"""
|
||||
config_file = get_config_file_path()
|
||||
|
||||
if not os.path.exists(config_file):
|
||||
print(f"Error: {config_file} not found!")
|
||||
if config_file == 'config.ini':
|
||||
print("Please copy example_config.ini to config.ini and configure it.")
|
||||
print(f"Or use --config /path/to/your/config.ini to specify a different location.")
|
||||
sys.exit(1)
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
try:
|
||||
config.read(config_file)
|
||||
except configparser.Error as e:
|
||||
print(f"Error reading {config_file}: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
if 'application' not in config:
|
||||
print(f"Error: [application] section missing in {config_file}")
|
||||
sys.exit(1)
|
||||
|
||||
if 'DatabaseLocation' not in config['application']:
|
||||
print(f"Error: DatabaseLocation missing in {config_file}")
|
||||
sys.exit(1)
|
||||
|
||||
return config['application']['DatabaseLocation']
|
||||
|
||||
# Connect to SQLite database (or create it if it doesn't exist)
|
||||
db_path = get_db_path()
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create tables
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS users (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
username TEXT UNIQUE NOT NULL,
|
||||
has_rolled BOOLEAN NOT NULL DEFAULT 0
|
||||
)
|
||||
''')
|
||||
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS characters (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
rarity INTEGER NOT NULL,
|
||||
weight REAL NOT NULL,
|
||||
file_id TEXT NOT NULL
|
||||
)
|
||||
''')
|
||||
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS pulls (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
user_id INTEGER,
|
||||
character_id INTEGER,
|
||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (user_id) REFERENCES users(id),
|
||||
FOREIGN KEY (character_id) REFERENCES characters(id)
|
||||
)
|
||||
''')
|
||||
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS config (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT
|
||||
)
|
||||
""")
|
||||
|
||||
""" # Insert example characters into the database if they don't already exist
|
||||
characters = [
|
||||
('Murakami-san', 1, 0.35),
|
||||
('Mastodon-kun', 2, 0.25),
|
||||
('Pleroma-tan', 3, 0.2),
|
||||
('Misskey-tan', 4, 0.15),
|
||||
('Syuilo-mama', 5, 0.05)
|
||||
]
|
||||
|
||||
|
||||
cursor.executemany('''
|
||||
INSERT OR IGNORE INTO characters (name, rarity, weight) VALUES (?, ?, ?)
|
||||
''', characters)
|
||||
"""
|
||||
|
||||
# Commit changes and close
|
||||
conn.commit()
|
||||
conn.close()
|
|
@ -1,24 +0,0 @@
|
|||
# 🎲 Kemoverse Documentation
|
||||
|
||||
Welcome to the developer documentation for **Kemoverse**, a gacha trading card game in the Fediverse!
|
||||
|
||||
Features collectible cards, rarity-based pulls, and integration with Misskey.
|
||||
|
||||
Name comes from Kemonomimi and Fediverse.
|
||||
|
||||
---
|
||||
|
||||
## 📁 Table of Contents
|
||||
|
||||
- [Installation](./install.md)
|
||||
- [Game Design](./design.md)
|
||||
- [Bot Architecture](./bot.md)
|
||||
- [Database Structure](./database.md)
|
||||
- [Card System](./cards.md)
|
||||
- [Web UI](./web.md)
|
||||
- [Theming and Assets](./theme.md)
|
||||
- [Contributing](./contributing.md)
|
||||
- [FAQ](./faq.md)
|
||||
|
||||
---
|
||||
|
|
@ -1,83 +0,0 @@
|
|||
|
||||
## 🧪 Installation
|
||||
|
||||
### Download and install dependencies
|
||||
|
||||
Clone the repo
|
||||
|
||||
```sh
|
||||
git clone https://git.waifuism.life/waifu/kemoverse.git
|
||||
cd kemoverse
|
||||
```
|
||||
|
||||
Setup a virtual environment (Optional, recommended)
|
||||
|
||||
```sh
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
```
|
||||
|
||||
Install project dependencies via pip
|
||||
|
||||
```sh
|
||||
python3 -m pip install -r requirements.txt
|
||||
```
|
||||
|
||||
### Setup config file
|
||||
|
||||
A sample config file is included with the project as a template: `example_config.ini`
|
||||
|
||||
Create a copy of this file and replace its' values with your own. Consult the
|
||||
template for more information about individual config values and their meaning.
|
||||
|
||||
Config files are environment-specific. Use `config_dev.ini` for development and
|
||||
`config_prod.ini` for production. Switch between environments using the
|
||||
`KEMOVERSE_ENV` environment variable.
|
||||
|
||||
```sh
|
||||
cp example_config.ini config_dev.ini
|
||||
# Edit config_dev.ini
|
||||
```
|
||||
|
||||
### Setup database
|
||||
|
||||
To set up the database, run:
|
||||
|
||||
```sh
|
||||
KEMOVERSE_ENV=dev python3 setup_db.py
|
||||
```
|
||||
|
||||
### Run the bot
|
||||
|
||||
```sh
|
||||
KEMOVERSE_ENV=dev ./startup.sh
|
||||
```
|
||||
|
||||
If all goes well, you should now be able to interact with the bot.
|
||||
|
||||
### Running in production
|
||||
|
||||
To run the the in a production environment, use `KEMOVERSE_ENV=prod`. You will
|
||||
also need to create a `config_prod.ini` file and run the database setup step
|
||||
again if pointing prod to a different database. (you are pointing dev and prod
|
||||
to different databases, right? 🤨)
|
||||
|
||||
### Updating
|
||||
|
||||
To update the bot, first pull new changes from upstream:
|
||||
|
||||
```sh
|
||||
git pull
|
||||
```
|
||||
|
||||
Then run any database migrations. We recommend testing in dev beforehand to
|
||||
make sure nothing breaks in the update process.
|
||||
|
||||
**Always backup your prod database before running any migrations!**
|
||||
|
||||
```sh
|
||||
# Backup database file
|
||||
cp gacha_game_dev.db gacha_game_dev.db.bak
|
||||
# Run migrations
|
||||
KEMOVERSE_ENV=dev python3 setup_db.py
|
||||
```
|
|
@ -1,33 +0,0 @@
|
|||
Welcome to the **Visual Identity** guide for the Kemoverse. This page contains the standard colors, logos, and graphic elements used across the game (cards, UI, web presence, bots, etc). Please follow these guidelines to ensure consistency.
|
||||
|
||||
---
|
||||
|
||||
## 🟢 Primary Color Palette
|
||||
|
||||
| Color Name | Hex Code | Usage |
|
||||
|----------------|------------|--------------------------------------|
|
||||
| Green | `#5aa02c` | Main buttons, links, headers |
|
||||
| Midnight Black | `#1A1A1A` | Backgrounds, dark mode |
|
||||
| Misty White | `#FAFAFA` | Default backgrounds, light text bg |
|
||||
| Soft Gray | `#CCCCCC` | Borders, placeholders, separators |
|
||||
| Highlight Green | `#8dd35f` | Alerts, emphasis, icons |
|
||||
| Rarity Gold | `#FFD700` | Special rare cards, SSR outlines |
|
||||
| Rarity Silver | `#C0C0C0` | Rare card text, stat glow effects |
|
||||
|
||||
> ✅ Use `Green` and `Misty White` for the standard UI. Avoid mixing in extra palettes unless explicitly needed.
|
||||
|
||||
---
|
||||
|
||||
## 🖼 Logos
|
||||
|
||||
### Main Logo
|
||||
|
||||
<p align="center">
|
||||
<img src="../web/static/logo.png" alt="Fediverse Gacha Bot Logo" width="300" height="auto">
|
||||
</p>
|
||||
|
||||
- File: `web/static/logo.png`
|
||||
- Usage: Website header, favicon, bot avatar, watermark
|
||||
|
||||
|
||||
---
|
|
@ -1,38 +1,27 @@
|
|||
; Rename me to config.ini and put your values in here
|
||||
[application]
|
||||
; Comma separated list of fedi handles for any administrator users
|
||||
; More can be added through the application
|
||||
DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"]
|
||||
; SQLite Database location
|
||||
DatabaseLocation = ./gacha_game.db
|
||||
; Whether to lmit access to the bot via an instance whitelist
|
||||
; The whitelist can be adjusted via the application
|
||||
UseWhitelist = False
|
||||
|
||||
[gacha]
|
||||
; Number of seconds players have to wait between rolls
|
||||
RollInterval = 72000
|
||||
; Rarity drop weights (1 to 5 stars)
|
||||
; Format: rarity=weight per line
|
||||
; In order: common, uncommon, rare, epic and legendary (Example values below)
|
||||
Rarity_1 = 0.7
|
||||
Rarity_2 = 0.2
|
||||
Rarity_3 = 0.08
|
||||
Rarity_4 = 0.015
|
||||
Rarity_5 = 0.005
|
||||
|
||||
[notification]
|
||||
; Number of seconds to sleep while awaiting new notifications
|
||||
PollInterval = 5
|
||||
; Number of notifications to process at once (max 100)
|
||||
BatchSize = 10
|
||||
|
||||
[credentials]
|
||||
; Fully qualified URL of the instance hosting the bot
|
||||
Instance = http://example.tld
|
||||
; Full fedi handle of the bot user
|
||||
User = @bot@example.tld
|
||||
BotUser = @bot@example.tld
|
||||
|
||||
; API key for the bot
|
||||
; Generate one by going to Settings > API > Generate access token
|
||||
Token = abcdefghijklmnopqrstuvwxyz012345
|
||||
ApiKey = abcdefghijklmnopqrstuvwxyz012345
|
||||
|
||||
; Fully qualified URL of the instance hosting the bot
|
||||
InstanceUrl = http://example.tld
|
||||
|
||||
; Instance type - either "misskey" or "pleroma"
|
||||
InstanceType = misskey
|
||||
|
||||
; Comma separated list of fedi handles for any administrator users
|
||||
DefaultAdmins = ['admin@example.tld']
|
||||
|
||||
; SQLite Database location
|
||||
DatabaseLocation = ./gacha_game.db
|
||||
|
||||
; Web server port (default: 5000)
|
||||
WebPort = 5000
|
||||
|
||||
; Comma-separated list of trusted fediverse instances (leave empty to allow only local users)
|
||||
; Example: TrustedInstances = mastodon.social,misskey.io,pleroma.example.com
|
||||
TrustedInstances =
|
|
@ -1,28 +0,0 @@
|
|||
CREATE TABLE IF NOT EXISTS users (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
username TEXT UNIQUE NOT NULL,
|
||||
has_rolled BOOLEAN NOT NULL DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS characters (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
rarity INTEGER NOT NULL,
|
||||
weight REAL NOT NULL,
|
||||
file_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS pulls (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
user_id INTEGER,
|
||||
character_id INTEGER,
|
||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (user_id) REFERENCES users(id),
|
||||
FOREIGN KEY (character_id) REFERENCES characters(id)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS config (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT
|
||||
);
|
||||
INSERT OR IGNORE INTO config VALUES ("schema_version", 0);
|
|
@ -1 +0,0 @@
|
|||
INSERT OR IGNORE INTO config VALUES ("last_seen_notif_id", 0);
|
|
@ -1 +0,0 @@
|
|||
ALTER TABLE characters DROP COLUMN weight;
|
|
@ -1,4 +0,0 @@
|
|||
ALTER TABLE users RENAME TO players;
|
||||
ALTER TABLE characters RENAME TO cards;
|
||||
ALTER TABLE pulls RENAME user_id TO player_id;
|
||||
ALTER TABLE pulls RENAME character_id TO card_id;
|
|
@ -1 +0,0 @@
|
|||
ALTER TABLE players ADD COLUMN is_administrator BOOLEAN NOT NULL DEFAULT 0;
|
|
@ -1,7 +0,0 @@
|
|||
CREATE TABLE IF NOT EXISTS instance_whitelist (
|
||||
tld TEXT UNIQUE PRIMARY KEY
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS banned_players (
|
||||
handle TEXT UNIQUE PRIMARY KEY
|
||||
);
|
59
readme.md
59
readme.md
|
@ -1,59 +1,47 @@
|
|||
# Kemoverse
|
||||
|
||||
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
|
||||
<p align="center">
|
||||
<img src="./web/static/logo.png" alt="Fediverse Gacha Bot Logo" width="300" height="auto">
|
||||
</p>
|
||||
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Supports both Misskey and Pleroma instances. Name comes from Kemonomimi and Fediverse.
|
||||
|
||||
## 📝 Docs
|
||||
|
||||
👉 [**Start reading the docs**](./docs/index.md)
|
||||
|
||||
🤌 [**Install instructions for those in a rush**](docs/install.md)
|
||||

|
||||
|
||||
## 🔧 Features
|
||||
|
||||
### ✅ Implemented
|
||||
- 🎲 Character roll system
|
||||
- 🧠 Core database structure for cards
|
||||
- 📦 Basic support for storing pulls per player
|
||||
- ⏱️ Time-based limitations on rolls
|
||||
- ⚠️ Explicit account creation/deletion
|
||||
- 🎴 Cards stats system
|
||||
- 🧠 Core database structure for characters and stats
|
||||
- 📦 Basic support for storing pulls per user
|
||||
|
||||
### 🧩 In Progress
|
||||
- 📝 Whitelist system to limit access
|
||||
- ⏱️ Time-based limitations on rolls
|
||||
- ⚔️ Dueling system
|
||||
|
||||
|
||||
## 🧠 Roadmap
|
||||
|
||||
[See our v2.0 board for more details](https://git.waifuism.life/waifu/kemoverse/projects/3)
|
||||
## 🧠 Planned Features (Long Term)
|
||||
|
||||
### 🛒 Gameplay & Collection
|
||||
- 🔁 **Trading system** between players
|
||||
- 🔁 **Trading system** between users
|
||||
- ⭐ **Favorite characters** (pin them or set profiles)
|
||||
- 📢 **Public post announcements** for rare card pulls
|
||||
- 📊 **Stats** for cards
|
||||
- 🎮 **Games** to play
|
||||
- ⚔️ Dueling
|
||||
- 🧮 **Leaderboards**
|
||||
- Most traded cards
|
||||
- Most owned cards
|
||||
- Most voted cards
|
||||
- Most popular cards (via usage-based popularity metrics)
|
||||
- Users with the rarest cards
|
||||
- Most traded Characters
|
||||
- Most owned Characters
|
||||
- Most voted Characters
|
||||
- Most popular Characters (via usage-based popularity metrics)
|
||||
- Users with the rarest Characters
|
||||
|
||||
### 🎨 Card Aesthetics
|
||||
- 🖼️ Simple card template for character rendering
|
||||
- 🌐 Web app to generate cards from images
|
||||
|
||||
### 🌍 Fediverse Support
|
||||
✅ Anyone from the fediverse can play, but the server only works using a Misskey instance. Want to rewrite the program in Elixir for Pleroma? Let us know!
|
||||
✅ Anyone from the fediverse can play! The bot supports both Misskey and Pleroma instances through configurable backends.
|
||||
|
||||
## 🗃️ Tech Stack
|
||||
|
||||
- Python (3.12+)
|
||||
- Python (3.11+)
|
||||
- SQLite
|
||||
- Fediverse API integration (via Misskey endpoints)
|
||||
- Fediverse API integration (Misskey and Pleroma support)
|
||||
- Flask
|
||||
- Modular DB design for extensibility
|
||||
|
||||
|
@ -61,16 +49,21 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
|
|||
|
||||
The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and duel tactics.
|
||||
|
||||
## 🧪 Getting Started (coming soon)
|
||||
|
||||
Instructions on installing dependencies, initializing the database, and running the bot locally will go here.
|
||||
|
||||
|
||||
```mermaid
|
||||
flowchart TD
|
||||
|
||||
subgraph Player Interaction
|
||||
A1[Misskey bot]
|
||||
A1[Fediverse bot]
|
||||
A2[Web]
|
||||
end
|
||||
|
||||
subgraph Misskey
|
||||
B1[Misskey instance]
|
||||
subgraph Fediverse
|
||||
B1[Fediverse instance]
|
||||
end
|
||||
|
||||
subgraph Bot
|
||||
|
@ -78,7 +71,7 @@ flowchart TD
|
|||
C2[Notification parser]
|
||||
C3[Gacha roll logic]
|
||||
C4[Database interface]
|
||||
C5[Misskey API poster]
|
||||
C5[Fediverse API poster]
|
||||
end
|
||||
|
||||
subgraph Website
|
||||
|
|
|
@ -6,3 +6,5 @@ Jinja2==3.1.6
|
|||
MarkupSafe==3.0.2
|
||||
Werkzeug==3.1.3
|
||||
Misskey.py==4.1.0
|
||||
Mastodon.py==1.8.1
|
||||
filetype==1.2.0
|
||||
|
|
129
setup_db.py
129
setup_db.py
|
@ -1,129 +0,0 @@
|
|||
import sqlite3
|
||||
import traceback
|
||||
import os
|
||||
import argparse
|
||||
from configparser import ConfigParser
|
||||
from typing import List, Tuple
|
||||
|
||||
class DBNotFoundError(Exception):
|
||||
'''Could not find the database location'''
|
||||
|
||||
class InvalidMigrationError(Exception):
|
||||
'''Migration file has an invalid name'''
|
||||
|
||||
class KemoverseEnvUnset(Exception):
|
||||
'''KEMOVERSE_ENV is not set or has an invalid value'''
|
||||
|
||||
class ConfigError(Exception):
|
||||
'''Could not find the config file for the current environment'''
|
||||
|
||||
def get_migrations() -> List[Tuple[int, str]] | InvalidMigrationError:
|
||||
'''Returns a list of migration files in numeric order.'''
|
||||
# Store transaction id and filename separately
|
||||
sql_files: List[Tuple[int, str]] = []
|
||||
migrations_dir = 'migrations'
|
||||
|
||||
for filename in os.listdir(migrations_dir):
|
||||
joined_path = os.path.join(migrations_dir, filename)
|
||||
|
||||
# Ignore anything that isn't a .sql file
|
||||
if not (os.path.isfile(joined_path) and filename.endswith('.sql')):
|
||||
print(f'{filename} is not a .sql file, ignoring...')
|
||||
continue
|
||||
|
||||
parts = filename.split('_', 1)
|
||||
|
||||
# Invalid filename format
|
||||
if len(parts) < 2 or not parts[0].isdigit():
|
||||
raise InvalidMigrationError(f'Invalid migration file: {filename}')
|
||||
|
||||
sql_files.append((int(parts[0]), joined_path))
|
||||
|
||||
# Get sorted list of files by migration number
|
||||
sql_files.sort(key=lambda x: x[0])
|
||||
return sql_files
|
||||
|
||||
def perform_migration(cursor: sqlite3.Cursor, migration: tuple[int, str]) -> None:
|
||||
'''Performs a migration on the DB'''
|
||||
print(f'Performing migration {migration[1]}...')
|
||||
|
||||
# Open and execute the sql script
|
||||
with open(migration[1], encoding='utf-8') as file:
|
||||
script = file.read()
|
||||
cursor.executescript(script)
|
||||
# Update the schema version
|
||||
cursor.execute('UPDATE config SET value = ? WHERE key = "schema_version"', (migration[0],))
|
||||
|
||||
def get_db_path() -> str | DBNotFoundError:
|
||||
'''Gets the DB path from config.ini'''
|
||||
env = os.environ.get('KEMOVERSE_ENV')
|
||||
if not (env and env in ['prod', 'dev']):
|
||||
raise KemoverseEnvUnset
|
||||
|
||||
print(f'Running in "{env}" mode')
|
||||
|
||||
config_path = f'config_{env}.ini'
|
||||
|
||||
if not os.path.isfile(config_path):
|
||||
raise ConfigError(f'Could not find {config_path}')
|
||||
|
||||
config = ConfigParser()
|
||||
config.read(config_path)
|
||||
db_path = config['application']['DatabaseLocation']
|
||||
if not db_path:
|
||||
raise DBNotFoundError()
|
||||
return db_path
|
||||
|
||||
def get_current_migration(cursor: sqlite3.Cursor) -> int:
|
||||
'''Gets the current schema version of the database'''
|
||||
try:
|
||||
cursor.execute('SELECT value FROM config WHERE key = ?', ('schema_version',))
|
||||
version = cursor.fetchone()
|
||||
return -1 if not version else int(version[0])
|
||||
except sqlite3.Error:
|
||||
print('Error getting schema version')
|
||||
# Database has not been initialized yet
|
||||
return -1
|
||||
|
||||
def main():
|
||||
'''Does the thing'''
|
||||
# Connect to the DB
|
||||
db_path = ''
|
||||
try:
|
||||
db_path = get_db_path()
|
||||
except ConfigError as ex:
|
||||
print(ex)
|
||||
return
|
||||
except KemoverseEnvUnset:
|
||||
print('Error: KEMOVERSE_ENV is either not set or has an invalid value.')
|
||||
print('Please set KEMOVERSE_ENV to either "dev" or "prod" before running.')
|
||||
print(traceback.format_exc())
|
||||
return
|
||||
|
||||
conn = sqlite3.connect(db_path, autocommit=False)
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Obtain list of migrations to run
|
||||
migrations = get_migrations()
|
||||
# Determine schema version
|
||||
current_migration = get_current_migration(cursor)
|
||||
print(f'Current schema version: {current_migration}')
|
||||
|
||||
# Run any migrations newer than current schema
|
||||
for migration in migrations:
|
||||
if migration[0] <= current_migration:
|
||||
print(f'Migration already up: {migration[1]}')
|
||||
continue
|
||||
try:
|
||||
perform_migration(cursor, migration)
|
||||
conn.commit()
|
||||
except Exception as ex:
|
||||
print(f'An error occurred while applying {migration[1]}: {ex}, aborting...')
|
||||
print(traceback.format_exc())
|
||||
conn.rollback()
|
||||
break
|
||||
conn.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
22
web/app.py
22
web/app.py
|
@ -1,24 +1,20 @@
|
|||
from flask import Flask, render_template
|
||||
import sqlite3
|
||||
import sys
|
||||
import os
|
||||
|
||||
from flask import Flask, render_template, abort
|
||||
from werkzeug.exceptions import HTTPException
|
||||
# Add bot directory to path to import config
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'bot'))
|
||||
import config
|
||||
|
||||
app = Flask(__name__)
|
||||
DB_PATH = "./gacha_game.db" # Adjust path if needed
|
||||
DB_PATH = config.DB_PATH
|
||||
|
||||
def get_db_connection():
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
@app.errorhandler(HTTPException)
|
||||
def handle_exception(error):
|
||||
return render_template("_error.html", error=error), error.code
|
||||
|
||||
@app.route("/i404")
|
||||
def i404():
|
||||
return abort(404)
|
||||
|
||||
@app.route('/')
|
||||
def index():
|
||||
conn = get_db_connection()
|
||||
|
@ -43,8 +39,6 @@ def user_profile(user_id):
|
|||
|
||||
cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
|
||||
user = cursor.fetchone()
|
||||
if user is None:
|
||||
abort(404)
|
||||
|
||||
cursor.execute('''
|
||||
SELECT pulls.timestamp, characters.name as character_name, characters.rarity
|
||||
|
@ -68,4 +62,4 @@ def submit_character():
|
|||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(host='0.0.0.0', port=5000, debug=True)
|
||||
app.run(host='0.0.0.0', port=config.WEB_PORT, debug=True)
|
||||
|
|
|
@ -1,92 +0,0 @@
|
|||
body {
|
||||
font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;
|
||||
background-color: #f4f6fa;
|
||||
color: #333;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
header {
|
||||
background-color: #7289da;
|
||||
color: white;
|
||||
padding: 20px;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
header h1 {
|
||||
margin: 0;
|
||||
font-size: 2.5em;
|
||||
}
|
||||
|
||||
header p {
|
||||
margin-top: 5px;
|
||||
font-size: 1.1em;
|
||||
}
|
||||
|
||||
.container {
|
||||
max-width: 800px;
|
||||
margin: 30px auto;
|
||||
padding: 20px;
|
||||
background-color: #ffffff;
|
||||
border-radius: 10px;
|
||||
box-shadow: 0 3px 10px rgba(0, 0, 0, 0.07);
|
||||
}
|
||||
|
||||
h2 {
|
||||
border-bottom: 1px solid #ccc;
|
||||
padding-bottom: 8px;
|
||||
margin-top: 30px;
|
||||
}
|
||||
|
||||
ul {
|
||||
list-style-type: none;
|
||||
padding-left: 0;
|
||||
}
|
||||
|
||||
li {
|
||||
margin: 10px 0;
|
||||
}
|
||||
|
||||
a {
|
||||
text-decoration: none;
|
||||
color: #2c3e50;
|
||||
font-weight: bold;
|
||||
background-color: #e3eaf3;
|
||||
padding: 8px 12px;
|
||||
border-radius: 6px;
|
||||
display: inline-block;
|
||||
transition: background-color 0.2s;
|
||||
}
|
||||
|
||||
a:hover {
|
||||
background-color: #cdd8e6;
|
||||
}
|
||||
|
||||
.leaderboard-entry {
|
||||
margin-bottom: 8px;
|
||||
padding: 6px 10px;
|
||||
background: #f9fafc;
|
||||
border-left: 4px solid #7289da;
|
||||
border-radius: 5px;
|
||||
}
|
||||
|
||||
footer {
|
||||
text-align: center;
|
||||
margin-top: 40px;
|
||||
font-size: 0.9em;
|
||||
color: #888;
|
||||
}
|
||||
|
||||
.note {
|
||||
background: #fcfcf0;
|
||||
border: 1px dashed #bbb;
|
||||
padding: 10px;
|
||||
border-radius: 8px;
|
||||
margin-top: 30px;
|
||||
font-size: 0.95em;
|
||||
color: #666;
|
||||
}
|
||||
|
||||
.footer-link {
|
||||
margin: 0 10px;
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='style.css') }}">
|
||||
<link rel="shortcut icon" href="{{ url_for('static', filename='logo.png') }}">
|
||||
<title>
|
||||
{% if title %}
|
||||
{{ title }}
|
||||
{% else %}
|
||||
{% block title %}{% endblock %}
|
||||
{% endif %}
|
||||
| Kemoverse
|
||||
</title>
|
||||
</head>
|
||||
<body>
|
||||
<header>
|
||||
{% block header %}{% endblock %}
|
||||
</header>
|
||||
|
||||
<div class="container">
|
||||
{% block content %}{% endblock %}
|
||||
</div>
|
||||
|
||||
<footer>
|
||||
<a class="footer-link" href="{{ url_for('about') }}">About</a>
|
||||
<a class="footer-link" href="{{ url_for('submit_character') }}">Submit a Character</a>
|
||||
{% block footer_extra %}{% endblock %}
|
||||
</footer>
|
||||
</body>
|
||||
</html>
|
|
@ -1,8 +0,0 @@
|
|||
{% extends "_base.html" %}
|
||||
{% block title %}
|
||||
{{ error.code }}
|
||||
{% endblock %}
|
||||
{% block content %}
|
||||
<h2>{{ error.code }} - {{ error.name }} </h2>
|
||||
<p>{{ error.description }}</p>
|
||||
{% endblock %}
|
|
@ -1,9 +1,13 @@
|
|||
{% extends "_base.html" %}
|
||||
|
||||
{% block content %}
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>About - Misskey Gacha Center</title>
|
||||
</head>
|
||||
<body>
|
||||
<h1>About This Gacha</h1>
|
||||
<p>This is a playful Misskey-themed gacha tracker made with Flask and SQLite.</p>
|
||||
<p>All rolls are stored, stats are tracked, and characters are added manually for now.</p>
|
||||
<p>Built with love, chaos, and way too much caffeine ☕.</p>
|
||||
<a href="{{ url_for('index') }}">← Back to Home</a>
|
||||
{% endblock %}
|
||||
</body>
|
||||
</html>
|
||||
|
|
|
@ -1,11 +1,110 @@
|
|||
{% extends "_base.html" %}
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Misskey Gacha Center</title>
|
||||
<style>
|
||||
body {
|
||||
font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;
|
||||
background-color: #f4f6fa;
|
||||
color: #333;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
{% block header %}
|
||||
<h1>Misskey Gacha Center</h1>
|
||||
<p>Track your luck. Compare your pulls. Compete with friends.</p>
|
||||
{% endblock %}
|
||||
header {
|
||||
background-color: #7289da;
|
||||
color: white;
|
||||
padding: 20px;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
{% block content %}
|
||||
header h1 {
|
||||
margin: 0;
|
||||
font-size: 2.5em;
|
||||
}
|
||||
|
||||
header p {
|
||||
margin-top: 5px;
|
||||
font-size: 1.1em;
|
||||
}
|
||||
|
||||
.container {
|
||||
max-width: 800px;
|
||||
margin: 30px auto;
|
||||
padding: 20px;
|
||||
background-color: #ffffff;
|
||||
border-radius: 10px;
|
||||
box-shadow: 0 3px 10px rgba(0, 0, 0, 0.07);
|
||||
}
|
||||
|
||||
h2 {
|
||||
border-bottom: 1px solid #ccc;
|
||||
padding-bottom: 8px;
|
||||
margin-top: 30px;
|
||||
}
|
||||
|
||||
ul {
|
||||
list-style-type: none;
|
||||
padding-left: 0;
|
||||
}
|
||||
|
||||
li {
|
||||
margin: 10px 0;
|
||||
}
|
||||
|
||||
a {
|
||||
text-decoration: none;
|
||||
color: #2c3e50;
|
||||
font-weight: bold;
|
||||
background-color: #e3eaf3;
|
||||
padding: 8px 12px;
|
||||
border-radius: 6px;
|
||||
display: inline-block;
|
||||
transition: background-color 0.2s;
|
||||
}
|
||||
|
||||
a:hover {
|
||||
background-color: #cdd8e6;
|
||||
}
|
||||
|
||||
.leaderboard-entry {
|
||||
margin-bottom: 8px;
|
||||
padding: 6px 10px;
|
||||
background: #f9fafc;
|
||||
border-left: 4px solid #7289da;
|
||||
border-radius: 5px;
|
||||
}
|
||||
|
||||
footer {
|
||||
text-align: center;
|
||||
margin-top: 40px;
|
||||
font-size: 0.9em;
|
||||
color: #888;
|
||||
}
|
||||
|
||||
.note {
|
||||
background: #fcfcf0;
|
||||
border: 1px dashed #bbb;
|
||||
padding: 10px;
|
||||
border-radius: 8px;
|
||||
margin-top: 30px;
|
||||
font-size: 0.95em;
|
||||
color: #666;
|
||||
}
|
||||
|
||||
.footer-link {
|
||||
margin: 0 10px;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
<header>
|
||||
<h1>Misskey Gacha Center</h1>
|
||||
<p>Track your luck. Compare your pulls. Compete with friends.</p>
|
||||
</header>
|
||||
|
||||
<div class="container">
|
||||
|
||||
<h2>🎖️ Leaderboard: Most Rolls</h2>
|
||||
{% for user in top_users %}
|
||||
|
@ -26,4 +125,13 @@
|
|||
<div class="note">
|
||||
🚀 This is a fun little gacha tracker! More features coming soon. Want to help shape it?
|
||||
</div>
|
||||
{% endblock %}
|
||||
|
||||
</div>
|
||||
|
||||
<footer>
|
||||
<a class="footer-link" href="{{ url_for('about') }}">About</a>
|
||||
<a class="footer-link" href="{{ url_for('submit_character') }}">Submit a Character</a>
|
||||
</footer>
|
||||
|
||||
</body>
|
||||
</html>
|
||||
|
|
|
@ -1,8 +1,12 @@
|
|||
{% extends "_base.html" %}
|
||||
|
||||
{% block content %}
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Submit a Character - Misskey Gacha Center</title>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Submit a Character</h1>
|
||||
<p>Want to add a new character to the gacha pool?</p>
|
||||
<p>This feature will be available soon. Stay tuned!</p>
|
||||
<a href="{{ url_for('index') }}">← Back to Home</a>
|
||||
{% endblock %}
|
||||
</body>
|
||||
</html>
|
||||
|
|
|
@ -1,5 +1,57 @@
|
|||
{% extends "_base.html" %}
|
||||
{% block content %}
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>{{ user['username'] }}'s Rolls</title>
|
||||
<style>
|
||||
body {
|
||||
font-family: Arial, sans-serif;
|
||||
background-color: #f4f4f8;
|
||||
margin: 0;
|
||||
padding: 20px;
|
||||
}
|
||||
.profile, .pulls {
|
||||
background-color: white;
|
||||
padding: 15px;
|
||||
border-radius: 10px;
|
||||
box-shadow: 0 0 10px rgba(0,0,0,0.1);
|
||||
margin-bottom: 20px;
|
||||
}
|
||||
h1, h2 {
|
||||
margin-top: 0;
|
||||
}
|
||||
ul {
|
||||
list-style-type: none;
|
||||
padding: 0;
|
||||
}
|
||||
li {
|
||||
padding: 10px 0;
|
||||
border-bottom: 1px solid #eee;
|
||||
}
|
||||
.rarity {
|
||||
color: gold;
|
||||
font-weight: bold;
|
||||
margin-left: 8px;
|
||||
}
|
||||
.timestamp {
|
||||
color: #888;
|
||||
font-size: 0.9em;
|
||||
}
|
||||
a {
|
||||
display: inline-block;
|
||||
margin-top: 20px;
|
||||
color: #333;
|
||||
text-decoration: none;
|
||||
background-color: #ddd;
|
||||
padding: 8px 12px;
|
||||
border-radius: 5px;
|
||||
}
|
||||
a:hover {
|
||||
background-color: #bbb;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
<div class="profile">
|
||||
<h1>{{ user['username'] }}'s Gacha Rolls</h1>
|
||||
<p>User ID: {{ user['id'] }}</p>
|
||||
|
@ -20,4 +72,6 @@
|
|||
</div>
|
||||
|
||||
<a href="{{ url_for('index') }}">← Back to Users</a>
|
||||
{% endblock %}
|
||||
|
||||
</body>
|
||||
</html>
|
||||
|
|
Loading…
Add table
Reference in a new issue