Compare commits

...

3 commits

9 changed files with 263 additions and 105 deletions

1
.gitignore vendored
View file

@ -147,6 +147,7 @@ venv.bak/
/site
# mypy
.mypy.ini
.mypy_cache/
.dmypy.json
dmypy.json

View file

@ -2,9 +2,15 @@ import requests
from misskey.exceptions import MisskeyAPIException
from client import client_connection
from db_utils import insert_character
from custom_types import Character
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
"""
def add_character(
name: str,
rarity: int,
weight: float,
image_url: str) -> tuple[int, str]:
'''
Adds a character to the database, uploading the image from a public URL to
the bot's Misskey Drive.
@ -12,7 +18,8 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
name (str): Character name.
rarity (int): Character rarity (e.g., 1-5).
weight (float): Pull weight (e.g., 0.02).
image_url (str): Public URL of the image from the post (e.g., from note['files'][i]['url']).
image_url (str): Public URL of the image from the post (e.g., from
note['files'][i]['url']).
Returns:
tuple[int, str]: Character ID and bot's Drive file_id.
@ -20,30 +27,39 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
Raises:
ValueError: If inputs are invalid.
RuntimeError: If image download/upload or database operation fails.
"""
'''
stripped_name = name.strip()
# Validate inputs
if not name or not name.strip():
raise ValueError("Character name cannot be empty.")
if not isinstance(rarity, int) or rarity < 1:
raise ValueError("Rarity must be a positive integer.")
if not isinstance(weight, (int, float)) or weight <= 0:
raise ValueError("Weight must be a positive number.")
if not stripped_name:
raise ValueError('Character name cannot be empty.')
if rarity < 1:
raise ValueError('Rarity must be a positive integer.')
if weight <= 0:
raise ValueError('Weight must be a positive number.')
if not image_url:
raise ValueError("Image URL must be provided.")
raise ValueError('Image URL must be provided.')
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f"Failed to download image from {image_url}")
raise RuntimeError(f'Failed to download image from {image_url}')
# Upload to bot's Drive
mk = client_connection()
try:
media = mk.drive_files_create(response.raw)
file_id = media["id"]
file_id = media['id']
except MisskeyAPIException as e:
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
from e
# Insert into database
character_id = insert_character(name.strip(), rarity, float(weight), file_id)
character_id = insert_character(
stripped_name,
rarity,
float(weight),
file_id
)
return character_id, file_id

View file

@ -1,5 +1,6 @@
import misskey
import config
def client_connection():
def client_connection() -> misskey.Misskey:
return misskey.Misskey(address=config.INSTANCE, i=config.KEY)

View file

@ -2,9 +2,11 @@
import configparser
from os import environ, path
class ConfigError(Exception):
'''Could not find config file'''
def get_config_file() -> str:
'''Gets the path to the config file in the current environment'''
env: str | None = environ.get('KEMOVERSE_ENV')
@ -19,24 +21,25 @@ def get_config_file() -> str:
raise ConfigError(f'Could not find {config_path}')
return config_path
config = configparser.ConfigParser()
config.read(get_config_file())
# Username for the bot
USER = config['credentials']['User'].lower()
USER = config['credentials']['User'].lower()
# API key for the bot
KEY = config['credentials']['Token']
KEY = config['credentials']['Token']
# Bot's Misskey instance URL
INSTANCE = config['credentials']['Instance'].lower()
# TODO: move this to db
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
# to use extra admin exclusive commands with the bot
ADMINS = config['application']['DefaultAdmins']
ADMINS = config['application']['DefaultAdmins']
# SQLite Database location
DB_PATH = config['application']['DatabaseLocation']
DB_PATH = config['application']['DatabaseLocation']
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval'])

21
bot/custom_types.py Normal file
View file

@ -0,0 +1,21 @@
from typing import TypedDict, List, Dict, Any
BotResponse = TypedDict('BotResponse', {
'message': str,
'attachment_urls': List[str] | None
})
Character = TypedDict('Character', {
'id': int,
'name': str,
'rarity': int,
'weight': float,
'image_url': str
})
ParsedNotification = TypedDict('ParsedNotification', {
'author': str,
'command': str | None,
'arguments': List[str],
'note_obj': Dict[str, Any]
})

View file

@ -1,11 +1,13 @@
from random import choices
import sqlite3
import config
from custom_types import Character
DB_PATH = config.DB_PATH
CONNECTION: sqlite3.Connection
CURSOR: sqlite3.Cursor
def connect() -> None:
'''Creates a connection to the database'''
print('Connecting to the database...')
@ -15,26 +17,34 @@ def connect() -> None:
CONNECTION.row_factory = sqlite3.Row
CURSOR = CONNECTION.cursor()
def get_random_character():
def get_random_character() -> Character | None:
''' Gets a random character from the database'''
CURSOR.execute('SELECT * FROM characters')
characters = CURSOR.fetchall()
if not characters:
return None, None, None, None
return None
weights = [c['weight'] for c in characters]
chosen = choices(characters, weights=weights, k=1)[0]
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
return {
'id': chosen['id'],
'name': chosen['name'],
'rarity': chosen['rarity'],
'weight': chosen['weight'],
'image_url': chosen['file_id']
}
def get_or_create_user(username):
def get_or_create_user(username: str) -> int:
'''Retrieves an ID for a given user, if the user does not exist, it will be
created.'''
CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,))
user = CURSOR.fetchone()
if user:
return user[0]
return int(user[0])
# New user starts with has_rolled = False
CURSOR.execute(
@ -42,38 +52,47 @@ def get_or_create_user(username):
(username, False)
)
user_id = CURSOR.lastrowid
return user_id
return user_id if user_id else 0
def insert_character(name: str, rarity: int, weight: float, file_id: str) -> int:
def insert_character(
name: str, rarity: int, weight: float, file_id: str) -> int:
'''Inserts a character'''
CURSOR.execute(
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
'INSERT INTO characters (name, rarity, weight, file_id) VALUES \
(?, ?, ?, ?)',
(name, rarity, weight, file_id)
)
character_id = CURSOR.lastrowid
return character_id if character_id else 0
def insert_pull(user_id, character_id):
def insert_pull(user_id: int, character_id: int) -> None:
'''Creates a pull in the database'''
CURSOR.execute(
'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)',
(user_id, character_id)
)
def get_last_rolled_at(user_id):
def get_last_rolled_at(user_id: int) -> int:
'''Gets the timestamp when the user last rolled'''
CURSOR.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \
(user_id,))
CURSOR.execute(
"SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp \
DESC",
(user_id,))
row = CURSOR.fetchone()
return row[0] if row else None
return row[0] if row else 0
def get_config(key):
def get_config(key: str) -> str:
'''Reads the value for a specified config key from the db'''
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,))
row = CURSOR.fetchone()
return row[0] if row else None
return row[0] if row else ''
def set_config(key, value):
def set_config(key: str, value: str) -> None:
'''Writes the value for a specified config key to the db'''
CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
CURSOR.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
(key, value))

View file

@ -1,16 +1,23 @@
import traceback
from typing import Dict, Any
import misskey
from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE
from parsing import parse_notification
from db_utils import get_config, set_config
from response import generate_response
from custom_types import BotResponse
# Define your whitelist
# TODO: move to config
WHITELISTED_INSTANCES: list[str] = []
def process_notification(client, notification):
def process_notification(
client: misskey.Misskey,
notification: Dict[str, Any]) -> None:
'''Processes an individual notification'''
user = notification.get('user', {})
username = user.get('username', 'unknown')
@ -32,33 +39,32 @@ def process_notification(client, notification):
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
# 🧠 Send to the parser
parsed_command = parse_notification(notification, client)
parsed_notification = parse_notification(notification, client)
if not parsed_notification:
return
# Get the note Id to reply to
note_id = notification.get('note', {}).get('id')
# Get the response
# TODO: Formalize exactly *what* is returned by this. Ideally just want to
# handle two cases here: either we have a response, or we don't.
# TODO: Return dictionaries instead of tuples. They handle multiple
# elements a lot better as they're not position dependent
response = generate_response(parsed_command)
if isinstance(response, str):
client.notes_create(
text=response,
reply_id=note_id,
visibility=visibility
)
elif response:
client.notes_create(
text=response[0],
reply_id=note_id,
visibility=visibility,
file_ids=response[1]
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)
response: BotResponse | None = generate_response(parsed_notification)
def process_notifications(client):
if not response:
return
client.notes_create(
text=response['message'],
reply_id=note_id,
visibility=visibility,
file_ids=response['attachment_urls']
# TODO: write actual visible users ids so pleromers can use the bot
# privately
# visible_user_ids=[]
)
def process_notifications(client: misskey.Misskey) -> bool:
'''Processes a batch of unread notifications. Returns False if there are
no more notifications to process.'''
@ -87,7 +93,7 @@ def process_notifications(client):
for notification in notifications:
try:
# Skip if we've processed already
notif_id = notification.get('id')
notif_id = notification.get('id', '')
if notif_id <= last_seen_id:
continue
@ -96,7 +102,8 @@ def process_notifications(client):
process_notification(client, notification)
except Exception as e:
print(f'An exception has occured while processing a notification: {e}')
print(f'An exception has occured while processing a \
notification: {e}')
print(traceback.format_exc())
# If we got as many notifications as we requested, there are probably

View file

@ -1,11 +1,16 @@
import random, re
import re
from typing import Dict, Any
import misskey
import config
def parse_notification(notification,client):
'''Parses any notifications received by the bot and sends any commands to
gacha_response()'''
from custom_types import ParsedNotification
def parse_notification(
notification: Dict[str, Any],
client: misskey.Misskey) -> ParsedNotification | None:
'''Parses any notifications received by the bot'''
# Get the full Activitypub ID of the user
user = notification.get("user", {})
@ -28,14 +33,20 @@ def parse_notification(notification,client):
# Make sure the notification text explicitly mentions the bot
if not any(variant in note for variant in username_variants):
return
return None
# Find command and arguments after the mention
# Removes all mentions (regex = mentions that start with @ and may contain @domain)
# Removes all mentions
# regex = mentions that start with @ and may contain @domain
cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip()
parts = cleaned_text.split()
command = parts[0].lower() if parts else None
arguments = parts[1:] if len(parts) > 1 else []
return [command,full_user, arguments, note_obj]
return {
'author': full_user,
'command': command,
'arguments': arguments,
'note_obj': note_obj
}

View file

@ -1,9 +1,13 @@
from datetime import datetime, timedelta, timezone
from db_utils import get_or_create_user, insert_pull, get_last_rolled_at, get_random_character
from typing import TypedDict, Any, List, Dict
from db_utils import get_or_create_user, insert_pull, get_last_rolled_at, \
get_random_character
from add_character import add_character
from config import GACHA_ROLL_INTERVAL
from custom_types import BotResponse, ParsedNotification
def do_roll(full_user):
def do_roll(full_user: str) -> BotResponse:
'''Determines whether the user can roll, then pulls a random character'''
user_id = get_or_create_user(full_user)
@ -14,7 +18,7 @@ def do_roll(full_user):
if date:
# SQLite timestamps returned by the DB are always in UTC
# Below timestamps are to be converted to UTC
prev = datetime.strptime(date + '+0000', '%Y-%m-%d %H:%M:%S%z')
prev = datetime.strptime(str(date) + '+0000', '%Y-%m-%d %H:%M:%S%z')
now = datetime.now(timezone.utc)
time_since_last_roll = now - prev
@ -31,19 +35,31 @@ def do_roll(full_user):
else:
remaining_duration = f'{duration.seconds} seconds'
return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.'
return {
'message': f'{full_user} ⏱️ Please wait another \
{remaining_duration} before rolling again.',
'attachment_urls': None
}
character_id, character_name, file_id, rarity = get_random_character()
character = get_random_character()
if not character_id:
return f'{full_user} Uwaaa... something went wrong! No characters found. 😿'
if not character:
return {
'message': f'{full_user} Uwaaa... something went wrong! No \
characters found. 😿',
'attachment_urls': None
}
insert_pull(user_id,character_id)
stars = '⭐️' * rarity
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\
She's all yours now~ 💖✨",[file_id]])
insert_pull(user_id, character['id'])
stars = '⭐️' * character['rarity']
return {
'message': f'@{full_user} 🎲 Congrats! You rolled {stars} \
**{character['name']}**\nShe\'s all yours now~ 💖✨',
'attachment_urls': [character['image_url']]
}
def is_float(val):
def is_float(val: Any) -> bool:
'''Returns true if `val` can be converted to a float'''
try:
float(val)
@ -51,23 +67,42 @@ def is_float(val):
except ValueError:
return False
def do_create(full_user, arguments, note_obj):
def do_create(
full_user: str,
arguments: List[str],
note_obj: Dict[str, Any]) -> BotResponse:
'''Creates a character'''
# Example call from bot logic
image_url = note_obj.get('files', [{}])[0].get('url') if note_obj.get('files') else None
image_url = note_obj.get('files', [{}])[0].get('url') \
if note_obj.get('files') else None
if not image_url:
return f'{full_user}{full_user} You need an image to create a character, dumbass.'
return {
'message': f'{full_user} You need an image to create a character, \
dumbass.',
'attachment_urls': None
}
if len(arguments) != 3:
return '{full_user}Please specify the following attributes in order: \
name, rarity, drop weighting'
return {
'message': f'{full_user} Please specify the following attributes \
in order: name, rarity, drop weighting',
'attachment_urls': None
}
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
return f'{full_user}Invalid rarity: \'{arguments[1]}\' must be a number between 1 and 5'
return {
'message': f'{full_user} Invalid rarity: \'{arguments[1]}\' must \
be a number between 1 and 5',
'attachment_urls': None
}
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
return f'{full_user}Invalid drop weight: \'{arguments[2]}\' \
must be a decimal value between 0.0 and 1.0'
return {
'message': f'{full_user} Invalid drop weight: \'{arguments[2]}\' \
must be a decimal value between 0.0 and 1.0',
'attachment_urls': None
}
character_id, file_id = add_character(
name=arguments[0],
@ -75,26 +110,70 @@ def do_create(full_user, arguments, note_obj):
weight=float(arguments[2]),
image_url=image_url
)
return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]])
return {
'message': f'{full_user} Added {arguments[0]}, ID {character_id}.',
'attachment_urls': [file_id]
}
def do_help(full_user):
def do_help(author: str) -> BotResponse:
'''Provides a list of commands that the bot can do.'''
return f'{full_user} Here\'s what I can do:\n \
- `roll` Pulls a random character.\
- `create <name> <rarity> <weight>` Creates a character using a given image.\
- `help` Shows this message'
return {
'message': f'{author} Here\'s what I can do:\n\
- `roll` Pulls a random character.\n\
- `create <name> <rarity> <weight>` Creates a character using a given image.\n\
- `help` Shows this message.',
'attachment_urls': None
}
def generate_response(parsed_command):
def do_signup() -> BotResponse:
return {
'message': '',
'attachment_urls': None
}
def generate_response(notification: ParsedNotification) -> BotResponse | None:
'''Given a command with arguments, processes the game state and
returns a response'''
command, full_user, arguments, note_obj = parsed_command
# Temporary response variable
res: BotResponse | None = None
# TODO: Check if the user has an account
author = notification['author']
user_id = get_or_create_user(author)
command = notification['command']
# Check if the user is an administrator
# user_is_administrator = user_is_administrator()
# Unrestricted commands
match command:
case 'roll':
return do_roll(full_user)
case 'create':
return do_create(full_user, arguments, note_obj)
case 'signup':
res = do_signup()
case 'help':
return do_help(command)
res = do_help(author)
case _:
return None
pass
if not user_id:
return res
# User commands
match command:
case 'delete_account':
pass
case 'roll':
res = do_roll(author)
case 'create':
res = do_create(
author,
notification['arguments'],
notification['note_obj']
)
case _:
pass
# if not user_is_administrator:
return res
# Administrator commands go here