'''Essentials for the bot to function'''
import configparser
import json
import re
from os import environ, path

# KEMOVERSE_ENV is interpolated into a file name, so only a conservative
# character set is accepted. Used with fullmatch(): a '^...$' pattern with
# re.match() would also accept a value ending in a newline.
_ENV_NAME_RE = re.compile(r'[a-zA-Z0-9_-]+')

# Dot-separated labels of 1-63 characters; each label starts and ends with an
# alphanumeric character and may contain dashes in between.
_DOMAIN_RE = re.compile(
    r'[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?'
    r'(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*'
)


class ConfigError(Exception):
    '''The bot configuration is missing or incomplete'''


def get_config_file() -> str:
    '''Gets the path to the config file in the current environment

    The environment name is read from KEMOVERSE_ENV and the file is expected
    at config_<env>.ini, relative to the current working directory.

    Returns:
        Relative path of the existing config file

    Raises:
        ConfigError: If KEMOVERSE_ENV is unset/empty or the file is missing
        ValueError: If KEMOVERSE_ENV contains disallowed characters
    '''
    env: str | None = environ.get('KEMOVERSE_ENV')
    if not env:
        raise ConfigError('Error: KEMOVERSE_ENV is unset')

    # Validate environment name contains only alphanumeric, dash, and
    # underscore. fullmatch() also rejects a trailing newline, which
    # re.match() with a '$' anchor would let through.
    if not _ENV_NAME_RE.fullmatch(env):
        raise ValueError(f'KEMOVERSE_ENV "{env}" contains invalid characters. Only alphanumeric, dash (-), and underscore (_) are allowed.')

    config_path: str = f'config_{env}.ini'
    if not path.isfile(config_path):
        raise ConfigError(f'Could not find {config_path}')
    return config_path


def normalize_user(user_string: str) -> str:
    """
    Normalizes a user string to the format @user@domain.tld
    where domain is lowercase and user is case-sensitive

    Surrounding whitespace is ignored and a missing leading @ is added
    before the string is split into its user and domain parts.

    Args:
        user_string: User string in various formats

    Returns:
        Normalized user string

    Raises:
        ValueError: If the user string is invalid or domain is malformed
    """
    if not user_string or not user_string.strip():
        raise ValueError("User string cannot be empty")

    user_string = user_string.strip()

    # Add leading @ if missing
    if not user_string.startswith('@'):
        user_string = '@' + user_string

    # Drop the leading @, then split on the first remaining @ only; any
    # further @ ends up in the domain part and is rejected by the domain check
    parts = user_string[1:].split('@', 1)
    if len(parts) != 2:
        raise ValueError(f"Invalid user format: {user_string}. Expected @user@domain.tld")

    username, domain = parts
    if not username:
        raise ValueError("Username cannot be empty")
    if not domain:
        raise ValueError("Domain cannot be empty")

    # Validate domain format (basic check for valid domain structure)
    if not _DOMAIN_RE.fullmatch(domain):
        raise ValueError(f"Invalid domain format: {domain}")

    # Return normalized format: @user@domain.tld
    # (domain lowercase, user case-sensitive)
    return f"@{username}@{domain.lower()}"


def get_rarity_to_weight(
        config_section: configparser.SectionProxy) -> dict[int, float]:
    """Parses rarity_X keys from config and returns a {rarity: weight} dict.

    Keys that do not start with "rarity_" are ignored.

    Args:
        config_section: Section whose rarity_<int> options hold the weights

    Returns:
        Mapping of integer rarity to its float weight

    Raises:
        ValueError: If a rarity suffix is not an integer or a weight is not
            a number
    """
    rarity_weights: dict[int, float] = {
        int(key.removeprefix("rarity_")): float(value)
        for key, value in config_section.items()
        if key.startswith("rarity_")
    }
    return rarity_weights


# The configuration is loaded once at import time; everything below is a
# module-level constant derived from it.
config = configparser.ConfigParser()
config.read(get_config_file())

# Username for the bot
if 'User' not in config['credentials'] or not config['credentials']['User'].strip():
    raise ConfigError("User must be specified in config.ini under [credentials]")

USER = normalize_user(config['credentials']['User'])
# API key for the bot
KEY = config['credentials']['Token']
# Bot's Misskey instance URL
INSTANCE = config['credentials']['Instance'].lower()

# Web server port
WEB_PORT = config['application'].getint('WebPort', fallback=5000)
# Address the web server binds to
BIND_ADDRESS = config['application'].get('BindAddress', fallback='127.0.0.1')

# Fedi handles in the traditional 'user@domain.tld' style, allows these users
# to use extra admin exclusive commands with the bot
# (stored in the config file as a JSON value)
ADMINS = json.loads(config['application']['DefaultAdmins'])

# SQLite Database location
DB_PATH = config['application'].get(
    'DatabaseLocation', fallback='./gacha_game.db')
# Whether to enable the instance whitelist
USE_WHITELIST = config['application'].getboolean('UseWhitelist', fallback=True)

# Required integer settings: a missing key raises KeyError, a non-integer
# value raises ValueError
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])

GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval'])

RARITY_TO_WEIGHT = get_rarity_to_weight(config['gacha'])