Merge branch 'dev' into pleroma-support

This commit is contained in:
Moon 2025-06-14 07:31:02 +09:00
commit 0039717a01
6 changed files with 72 additions and 17 deletions

View file

@ -3,14 +3,13 @@ import config
from fediverse_factory import get_fediverse_service from fediverse_factory import get_fediverse_service
import db_utils import db_utils
def add_card(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]: def add_card(name: str, rarity: int, image_url: str) -> tuple[int, str]:
""" """
Adds a card to the database, uploading the image from a public URL to the Fediverse instance. Adds a card to the database, uploading the image from a public URL to the Fediverse instance.
Args: Args:
name (str): Card name. name (str): Card name.
rarity (int): Card rarity (e.g., 1-5). rarity (int): Card rarity (e.g., 1-5).
weight (float): Pull weight (e.g., 0.02).
image_url (str): Public URL of the image from the post. image_url (str): Public URL of the image from the post.
Returns: Returns:

View file

@ -5,6 +5,8 @@ from notification import process_fediverse_notification
from db_utils import get_config, set_config, connect, setup_administrators from db_utils import get_config, set_config, connect, setup_administrators
from fediverse_factory import get_fediverse_service from fediverse_factory import get_fediverse_service
from config import USE_WHITELIST
def stream_notifications(): def stream_notifications():
# Initialize database connection # Initialize database connection
connect() connect()
@ -18,6 +20,10 @@ def stream_notifications():
# Get the last seen notification ID from the database # Get the last seen notification ID from the database
last_seen_id = get_config("last_seen_notif_id") last_seen_id = get_config("last_seen_notif_id")
# Show whitelist status
whitelist_status = "enabled" if USE_WHITELIST else "disabled"
print(f'Instance whitelisting: {whitelist_status}')
print('Listening for notifications...') print('Listening for notifications...')
while True: while True:
try: try:

View file

@ -1,6 +1,7 @@
'''Essentials for the bot to function''' '''Essentials for the bot to function'''
import configparser import configparser
import json import json
import re
from os import environ, path from os import environ, path
@ -13,8 +14,10 @@ def get_config_file() -> str:
env: str | None = environ.get('KEMOVERSE_ENV') env: str | None = environ.get('KEMOVERSE_ENV')
if not env: if not env:
raise ConfigError('Error: KEMOVERSE_ENV is unset') raise ConfigError('Error: KEMOVERSE_ENV is unset')
if not (env in ['prod', 'dev']):
raise ConfigError(f'Error: Invalid environment: {env}') # Validate environment name contains only alphanumeric, dash, and underscore
if not re.match(r'^[a-zA-Z0-9_-]+$', env):
raise ValueError(f'KEMOVERSE_ENV "{env}" contains invalid characters. Only alphanumeric, dash (-), and underscore (_) are allowed.')
config_path: str = f'config_{env}.ini' config_path: str = f'config_{env}.ini'
@ -23,6 +26,50 @@ def get_config_file() -> str:
return config_path return config_path
def normalize_user(user_string: str) -> str:
"""
Normalizes a user string to the format @user@domain.tld where domain is lowercase and user is case-sensitive
Args:
user_string: User string in various formats
Returns:
Normalized user string
Raises:
ValueError: If the user string is invalid or domain is malformed
"""
if not user_string or not user_string.strip():
raise ValueError("User string cannot be empty")
user_string = user_string.strip()
# Add leading @ if missing
if not user_string.startswith('@'):
user_string = '@' + user_string
# Split into user and domain parts
parts = user_string[1:].split('@', 1) # Remove leading @ and split
if len(parts) != 2:
raise ValueError(f"Invalid user format: {user_string}. Expected @user@domain.tld")
username, domain = parts
if not username:
raise ValueError("Username cannot be empty")
if not domain:
raise ValueError("Domain cannot be empty")
# Validate domain format (basic check for valid domain structure)
domain_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
if not re.match(domain_pattern, domain):
raise ValueError(f"Invalid domain format: {domain}")
# Return normalized format: @user@domain.tld (domain lowercase, user case-sensitive)
return f"@{username}@{domain.lower()}"
def get_rarity_to_weight( def get_rarity_to_weight(
config_section: configparser.SectionProxy) -> dict[int, float]: config_section: configparser.SectionProxy) -> dict[int, float]:
"""Parses Rarity_X keys from config and returns a {rarity: weight} dict.""" """Parses Rarity_X keys from config and returns a {rarity: weight} dict."""
@ -38,7 +85,10 @@ config = configparser.ConfigParser()
config.read(get_config_file()) config.read(get_config_file())
# Username for the bot # Username for the bot
USER = config['credentials']['User'].lower() if 'User' not in config['credentials'] or not config['credentials']['User'].strip():
raise ConfigError("User must be specified in config.ini under [credentials]")
USER = normalize_user(config['credentials']['User'])
# API key for the bot # API key for the bot
KEY = config['credentials']['Token'] KEY = config['credentials']['Token']
# Bot's Misskey/Pleroma instance URL # Bot's Misskey/Pleroma instance URL
@ -56,14 +106,15 @@ INSTANCE_TYPE = instance_type
# Web server port # Web server port
WEB_PORT = config['application'].getint('WebPort', 5000) WEB_PORT = config['application'].getint('WebPort', 5000)
BIND_ADDRESS = config['application'].get('BindAddress', '127.0.0.1')
# Fedi handles in the traditional 'user@domain.tld' style, allows these users # Fedi handles in the traditional 'user@domain.tld' style, allows these users
# to use extra admin exclusive commands with the bot # to use extra admin exclusive commands with the bot
ADMINS = json.loads(config['application']['DefaultAdmins']) ADMINS = json.loads(config['application']['DefaultAdmins'])
# SQLite Database location # SQLite Database location
DB_PATH = config['application']['DatabaseLocation'] DB_PATH = config['application'].get('DatabaseLocation', './gacha_game.db')
# Whether to enable the instance whitelist # Whether to enable the instance whitelist
USE_WHITELIST = config['application'].getboolean('UseWhitelist', False) USE_WHITELIST = config['application'].getboolean('UseWhitelist', True)
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize']) NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])

View file

@ -8,8 +8,11 @@ DatabaseLocation = ./gacha_game.db
; Instance type - either "misskey" or "pleroma" ; Instance type - either "misskey" or "pleroma"
InstanceType = misskey InstanceType = misskey
; Web server port (default: 5000) ; Web server port (default: 5000)
WebPort = 5000 WebPort = 5000
; Whether to limit access to the bot via an instance whitelist ; Web server bind address (default: 127.0.0.1, set to 0.0.0.0 to listen on all interfaces)
BindAddress = 127.0.0.1
; Whether to lmit access to the bot via an instance whitelist
; The whitelist can be adjusted via the application ; The whitelist can be adjusted via the application
UseWhitelist = False UseWhitelist = False

View file

@ -57,16 +57,14 @@ def perform_migration(cursor: sqlite3.Cursor, migration: tuple[int, str]) -> Non
def get_db_path() -> str | DBNotFoundError: def get_db_path() -> str | DBNotFoundError:
'''Gets the DB path from config.ini''' '''Gets the DB path from config.ini'''
env = os.environ.get('KEMOVERSE_ENV') env = os.environ.get('KEMOVERSE_ENV')
if not (env and env in ['prod', 'dev']):
raise KemoverseEnvUnset
print(f'Running in "{env}" mode')
config_path = f'config_{env}.ini' config_path = f'config_{env}.ini'
if not os.path.isfile(config_path): if not os.path.isfile(config_path):
raise ConfigError(f'Could not find {config_path}') raise ConfigError(f'Could not find {config_path}')
print(f'Running in "{env}" mode')
config = ConfigParser() config = ConfigParser()
config.read(config_path) config.read(config_path)
db_path = config['application']['DatabaseLocation'] db_path = config['application']['DatabaseLocation']
@ -96,7 +94,6 @@ def main():
return return
except KemoverseEnvUnset: except KemoverseEnvUnset:
print('Error: KEMOVERSE_ENV is either not set or has an invalid value.') print('Error: KEMOVERSE_ENV is either not set or has an invalid value.')
print('Please set KEMOVERSE_ENV to either "dev" or "prod" before running.')
print(traceback.format_exc()) print(traceback.format_exc())
return return

View file

@ -10,10 +10,9 @@ from flask import Flask, render_template, abort
from werkzeug.exceptions import HTTPException from werkzeug.exceptions import HTTPException
app = Flask(__name__) app = Flask(__name__)
DB_PATH = config.DB_PATH
def get_db_connection(): def get_db_connection():
conn = sqlite3.connect(DB_PATH) conn = sqlite3.connect(config.DB_PATH)
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
return conn return conn
@ -74,4 +73,4 @@ def submit_character():
if __name__ == '__main__': if __name__ == '__main__':
app.run(host='0.0.0.0', port=config.WEB_PORT, debug=True) app.run(host=config.BIND_ADDRESS, port=config.WEB_PORT, debug=True)