Compare commits

..

No commits in common. "pleroma-support" and "master" have entirely different histories.

23 changed files with 571 additions and 1317 deletions

3
.gitignore vendored
View file

@ -185,6 +185,5 @@ cython_debug/
gacha_game*.db gacha_game*.db
gacha_game*.db.* gacha_game*.db.*
config*.ini config*.ini
run.sh
.idea .idea

View file

@ -1 +0,0 @@
nodejs 23.4.0

726
LICENSE

File diff suppressed because it is too large Load diff

View file

@ -1,24 +1,32 @@
import requests import requests
import config from misskey.exceptions import MisskeyAPIException
from fediverse_factory import get_fediverse_service from client import client_connection
import db_utils from db_utils import insert_card
from custom_types import Card
from config import RARITY_TO_WEIGHT
def add_card(name: str, rarity: int, image_url: str) -> tuple[int, str]:
""" def add_card(
Adds a card to the database, uploading the image from a public URL to the Fediverse instance. name: str,
rarity: int,
image_url: str) -> tuple[int, str]:
'''
Adds a card to the database, uploading the image from a public URL to
the bot's Misskey Drive.
Args: Args:
name (str): Card name. name (str): Card name.
rarity (int): Card rarity (e.g., 1-5). rarity (int): Card rarity (e.g., 1-5).
image_url (str): Public URL of the image from the post. image_url (str): Public URL of the image from the post (e.g., from
note['files'][i]['url']).
Returns: Returns:
tuple[int, str]: Card ID and file_id. tuple[int, str]: Card ID and bot's Drive file_id.
Raises: Raises:
ValueError: If inputs are invalid. ValueError: If inputs are invalid.
RuntimeError: If image download/upload or database operation fails. RuntimeError: If image download/upload or database operation fails.
""" '''
stripped_name = name.strip() stripped_name = name.strip()
@ -27,33 +35,30 @@ def add_card(name: str, rarity: int, image_url: str) -> tuple[int, str]:
raise ValueError('Card name cannot be empty.') raise ValueError('Card name cannot be empty.')
if rarity < 1: if rarity < 1:
raise ValueError('Rarity must be a positive integer.') raise ValueError('Rarity must be a positive integer.')
if rarity not in config.RARITY_TO_WEIGHT.keys(): if rarity not in RARITY_TO_WEIGHT.keys():
raise ValueError(f'Invalid rarity: {rarity}') raise ValueError(f'Invalid rarity: {rarity}')
if not image_url: if not image_url:
raise ValueError('Image URL must be provided.') raise ValueError('Image URL must be provided.')
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f'Failed to download image from {image_url}')
# Upload to bot's Drive
mk = client_connection()
try: try:
# Download image media = mk.drive_files_create(response.raw)
response = requests.get(image_url, stream=True, timeout=30) file_id = media['id']
if response.status_code != 200: except MisskeyAPIException as e:
raise RuntimeError(f"Failed to download image from {image_url}") raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
from e
# Upload to Fediverse instance # Insert into database
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE) card_id = insert_card(
try:
uploaded_file = fediverse_service.upload_file(response.raw)
file_id = uploaded_file.id
except RuntimeError as e:
raise RuntimeError(f"Failed to upload image: {e}") from e
# Insert into database using db_utils function
card_id = db_utils.insert_card(
stripped_name, stripped_name,
rarity, rarity,
RARITY_TO_WEIGHT[rarity],
file_id file_id
) )
return card_id, file_id
return card_id, file_id
except Exception as e:
raise

View file

@ -1,78 +1,21 @@
#Kemoverse - a gacha-style bot for the Fediverse.
#Copyright © 2025 Waifu
#
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as
#published by the Free Software Foundation, either version 3 of the
#License, or (at your option) any later version.
#
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with this program. If not, see https://www.gnu.org/licenses/.
import time import time
import traceback import misskey as misskey
import config from client import client_connection
from notification import process_fediverse_notification import db_utils as db
from db_utils import get_config, set_config, connect, setup_administrators
from fediverse_factory import get_fediverse_service
from config import USE_WHITELIST from config import NOTIFICATION_POLL_INTERVAL
from notification import process_notifications
if __name__ == '__main__':
# Initialize the Misskey client
client = client_connection()
# Connect to DB
db.connect()
def stream_notifications():
# Initialize database connection
connect()
# Setup default administrators # Setup default administrators
setup_administrators() db.setup_administrators()
# Initialize the Fediverse service
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
# Get the last seen notification ID from the database
last_seen_id = get_config("last_seen_notif_id")
# Show whitelist status
whitelist_status = "enabled" if USE_WHITELIST else "disabled"
print(f'Instance whitelisting: {whitelist_status}')
print('Listening for notifications...') print('Listening for notifications...')
while True: while True:
try: if not process_notifications(client):
# Get notifications from the fediverse service time.sleep(NOTIFICATION_POLL_INTERVAL)
notifications = fediverse_service.get_notifications(since_id=last_seen_id)
if notifications:
new_last_seen_id = last_seen_id
for notification in notifications:
notif_id = notification.id
# Skip old or same ID notifications
if last_seen_id is not None and notif_id <= last_seen_id:
continue
# Process the notification using the abstracted processor
process_fediverse_notification(notification, fediverse_service)
# Update only if this notif_id is greater
if new_last_seen_id is None or notif_id > new_last_seen_id:
new_last_seen_id = notif_id
# Save the latest seen ID
if new_last_seen_id and new_last_seen_id != last_seen_id:
set_config("last_seen_notif_id", new_last_seen_id)
last_seen_id = new_last_seen_id
time.sleep(5)
except Exception as e:
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
time.sleep(5)
if __name__ == "__main__":
stream_notifications()

6
bot/client.py Normal file
View file

@ -0,0 +1,6 @@
import misskey
import config
def client_connection() -> misskey.Misskey:
return misskey.Misskey(address=config.INSTANCE, i=config.KEY)

View file

@ -1,7 +1,6 @@
'''Essentials for the bot to function''' '''Essentials for the bot to function'''
import configparser import configparser
import json import json
import re
from os import environ, path from os import environ, path
@ -14,10 +13,8 @@ def get_config_file() -> str:
env: str | None = environ.get('KEMOVERSE_ENV') env: str | None = environ.get('KEMOVERSE_ENV')
if not env: if not env:
raise ConfigError('Error: KEMOVERSE_ENV is unset') raise ConfigError('Error: KEMOVERSE_ENV is unset')
if not (env in ['prod', 'dev']):
# Validate environment name contains only alphanumeric, dash, and underscore raise ConfigError(f'Error: Invalid environment: {env}')
if not re.match(r'^[a-zA-Z0-9_-]+$', env):
raise ValueError(f'KEMOVERSE_ENV "{env}" contains invalid characters. Only alphanumeric, dash (-), and underscore (_) are allowed.')
config_path: str = f'config_{env}.ini' config_path: str = f'config_{env}.ini'
@ -26,50 +23,6 @@ def get_config_file() -> str:
return config_path return config_path
def normalize_user(user_string: str) -> str:
"""
Normalizes a user string to the format @user@domain.tld where domain is lowercase and user is case-sensitive
Args:
user_string: User string in various formats
Returns:
Normalized user string
Raises:
ValueError: If the user string is invalid or domain is malformed
"""
if not user_string or not user_string.strip():
raise ValueError("User string cannot be empty")
user_string = user_string.strip()
# Add leading @ if missing
if not user_string.startswith('@'):
user_string = '@' + user_string
# Split into user and domain parts
parts = user_string[1:].split('@', 1) # Remove leading @ and split
if len(parts) != 2:
raise ValueError(f"Invalid user format: {user_string}. Expected @user@domain.tld")
username, domain = parts
if not username:
raise ValueError("Username cannot be empty")
if not domain:
raise ValueError("Domain cannot be empty")
# Validate domain format (basic check for valid domain structure)
domain_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
if not re.match(domain_pattern, domain):
raise ValueError(f"Invalid domain format: {domain}")
# Return normalized format: @user@domain.tld (domain lowercase, user case-sensitive)
return f"@{username}@{domain.lower()}"
def get_rarity_to_weight( def get_rarity_to_weight(
config_section: configparser.SectionProxy) -> dict[int, float]: config_section: configparser.SectionProxy) -> dict[int, float]:
"""Parses Rarity_X keys from config and returns a {rarity: weight} dict.""" """Parses Rarity_X keys from config and returns a {rarity: weight} dict."""
@ -85,36 +38,19 @@ config = configparser.ConfigParser()
config.read(get_config_file()) config.read(get_config_file())
# Username for the bot # Username for the bot
if 'User' not in config['credentials'] or not config['credentials']['User'].strip(): USER = config['credentials']['User'].lower()
raise ConfigError("User must be specified in config.ini under [credentials]")
USER = normalize_user(config['credentials']['User'])
# API key for the bot # API key for the bot
KEY = config['credentials']['Token'] KEY = config['credentials']['Token']
# Bot's Misskey/Pleroma instance URL # Bot's Misskey instance URL
INSTANCE = config['credentials']['Instance'].lower() INSTANCE = config['credentials']['Instance'].lower()
# Instance type validation
if 'InstanceType' not in config['application']:
raise ValueError("InstanceType must be specified in config.ini")
instance_type = config['application']['InstanceType'].lower()
if instance_type not in ('misskey', 'pleroma'):
raise ValueError("InstanceType must be either 'misskey' or 'pleroma'")
INSTANCE_TYPE = instance_type
# Web server port
WEB_PORT = config['application'].getint('WebPort', 5000)
BIND_ADDRESS = config['application'].get('BindAddress', '127.0.0.1')
# Fedi handles in the traditional 'user@domain.tld' style, allows these users # Fedi handles in the traditional 'user@domain.tld' style, allows these users
# to use extra admin exclusive commands with the bot # to use extra admin exclusive commands with the bot
ADMINS = json.loads(config['application']['DefaultAdmins']) ADMINS = json.loads(config['application']['DefaultAdmins'])
# SQLite Database location # SQLite Database location
DB_PATH = config['application'].get('DatabaseLocation', './gacha_game.db') DB_PATH = config['application']['DatabaseLocation']
# Whether to enable the instance whitelist # Whether to enable the instance whitelist
USE_WHITELIST = config['application'].getboolean('UseWhitelist', True) USE_WHITELIST = config['application']['UseWhitelist']
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize']) NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])

View file

@ -150,11 +150,12 @@ def is_player_administrator(username: str) -> bool:
def insert_card( def insert_card(
name: str, rarity: int, file_id: str) -> int: name: str, rarity: int, weight: float, file_id: str) -> int:
'''Inserts a card''' '''Inserts a card'''
CURSOR.execute( CURSOR.execute(
'INSERT INTO cards (name, rarity, file_id) VALUES (?, ?, ?)', 'INSERT INTO cards (name, rarity, weight, file_id) VALUES \
(name, rarity, file_id) (?, ?, ?, ?)',
(name, rarity, weight, file_id)
) )
card_id = CURSOR.lastrowid card_id = CURSOR.lastrowid
return card_id if card_id else 0 return card_id if card_id else 0

View file

@ -1,40 +0,0 @@
from fediverse_service import FediverseService
from misskey_service import MisskeyService
from pleroma_service import PleromaService
class FediverseServiceFactory:
"""Factory for creating FediverseService implementations"""
@staticmethod
def create_service(instance_type: str) -> FediverseService:
"""
Create a FediverseService implementation based on the instance type.
Args:
instance_type: The type of instance ("misskey" or "pleroma")
Returns:
FediverseService implementation (MisskeyService or PleromaService)
Raises:
ValueError: If the instance type is not supported
"""
instance_type = instance_type.lower()
if instance_type == "misskey":
return MisskeyService()
elif instance_type == "pleroma":
return PleromaService()
else:
raise ValueError(f"Unsupported instance type: {instance_type}")
def get_fediverse_service(instance_type: str) -> FediverseService:
"""
Convenience function to get a FediverseService instance
Args:
instance_type: The instance type ("misskey" or "pleroma")
"""
return FediverseServiceFactory.create_service(instance_type)

View file

@ -1,74 +0,0 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Union, BinaryIO
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
class FediverseService(ABC):
"""Abstract interface for Fediverse platform services (Misskey, Pleroma, etc.)"""
@abstractmethod
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""
Retrieve notifications from the Fediverse instance.
Args:
since_id: Optional ID to get notifications newer than this ID
Returns:
List of FediverseNotification objects
"""
pass
@abstractmethod
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""
Create a new post on the Fediverse instance.
Args:
text: The text content of the post
reply_to_id: Optional ID of post to reply to
visibility: Visibility level for the post
file_ids: Optional list of file IDs to attach
visible_user_ids: Optional list of user IDs who can see the post (for specified visibility)
Returns:
ID of the created post
"""
pass
@abstractmethod
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""
Retrieve a specific post by its ID.
Args:
post_id: The ID of the post to retrieve
Returns:
FediversePost object if found, None otherwise
"""
pass
@abstractmethod
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""
Upload a file to the Fediverse instance.
Args:
file_data: File data as binary stream or bytes
filename: Optional filename for the uploaded file
Returns:
FediverseFile object with ID, URL, and other metadata
Raises:
RuntimeError: If file upload fails
"""
pass

View file

@ -1,73 +0,0 @@
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from enum import Enum
class NotificationType(Enum):
MENTION = "mention"
REPLY = "reply"
FOLLOW = "follow"
FAVOURITE = "favourite"
REBLOG = "reblog"
POLL = "poll"
OTHER = "other"
class Visibility(Enum):
PUBLIC = "public"
UNLISTED = "unlisted"
HOME = "home"
FOLLOWERS = "followers"
SPECIFIED = "specified"
DIRECT = "direct"
@dataclass
class FediverseUser:
"""Common user representation across Fediverse platforms"""
id: str
username: str
host: Optional[str] = None # None for local users
display_name: Optional[str] = None
@property
def full_handle(self) -> str:
"""Returns the full fediverse handle (@user@domain or @user for local)"""
if self.host:
return f"@{self.username}@{self.host}"
return f"@{self.username}"
@dataclass
class FediverseFile:
"""Common file/attachment representation"""
id: str
url: str
type: Optional[str] = None
name: Optional[str] = None
@dataclass
class FediversePost:
"""Common post representation across Fediverse platforms"""
id: str
text: Optional[str]
user: FediverseUser
visibility: Visibility
created_at: Optional[str] = None
files: List[FediverseFile] = None
reply_to_id: Optional[str] = None
def __post_init__(self):
if self.files is None:
self.files = []
@dataclass
class FediverseNotification:
"""Common notification representation across Fediverse platforms"""
id: str
type: NotificationType
user: FediverseUser
post: Optional[FediversePost] = None
created_at: Optional[str] = None

View file

@ -1,156 +0,0 @@
import misskey
from typing import List, Optional, Dict, Any, Union, BinaryIO
from fediverse_service import FediverseService
from fediverse_types import (
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
NotificationType, Visibility
)
import config
class MisskeyService(FediverseService):
"""Misskey implementation of FediverseService"""
def __init__(self):
self.client = misskey.Misskey(address=config.INSTANCE, i=config.KEY)
def _convert_misskey_user(self, user_data: Dict[str, Any]) -> FediverseUser:
"""Convert Misskey user data to FediverseUser"""
return FediverseUser(
id=user_data.get("id", ""),
username=user_data.get("username", "unknown"),
host=user_data.get("host"),
display_name=user_data.get("name")
)
def _convert_misskey_file(self, file_data: Dict[str, Any]) -> FediverseFile:
"""Convert Misskey file data to FediverseFile"""
return FediverseFile(
id=file_data.get("id", ""),
url=file_data.get("url", ""),
type=file_data.get("type"),
name=file_data.get("name")
)
def _convert_misskey_visibility(self, visibility: str) -> Visibility:
"""Convert Misskey visibility to our enum"""
visibility_map = {
"public": Visibility.PUBLIC,
"unlisted": Visibility.UNLISTED,
"home": Visibility.HOME,
"followers": Visibility.FOLLOWERS,
"specified": Visibility.SPECIFIED
}
return visibility_map.get(visibility, Visibility.HOME)
def _convert_to_misskey_visibility(self, visibility: Visibility) -> str:
"""Convert our visibility enum to Misskey visibility"""
visibility_map = {
Visibility.PUBLIC: "public",
Visibility.UNLISTED: "unlisted",
Visibility.HOME: "home",
Visibility.FOLLOWERS: "followers",
Visibility.SPECIFIED: "specified",
Visibility.DIRECT: "specified" # Map direct to specified for Misskey
}
return visibility_map.get(visibility, "home")
def _convert_misskey_notification_type(self, notif_type: str) -> NotificationType:
"""Convert Misskey notification type to our enum"""
type_map = {
"mention": NotificationType.MENTION,
"reply": NotificationType.REPLY,
"follow": NotificationType.FOLLOW,
"favourite": NotificationType.FAVOURITE,
"reblog": NotificationType.REBLOG,
"poll": NotificationType.POLL
}
return type_map.get(notif_type, NotificationType.OTHER)
def _convert_misskey_post(self, note_data: Dict[str, Any]) -> FediversePost:
"""Convert Misskey note data to FediversePost"""
files = []
if note_data.get("files"):
files = [self._convert_misskey_file(f) for f in note_data["files"]]
return FediversePost(
id=note_data.get("id", ""),
text=note_data.get("text"),
user=self._convert_misskey_user(note_data.get("user", {})),
visibility=self._convert_misskey_visibility(note_data.get("visibility", "home")),
created_at=note_data.get("createdAt"),
files=files,
reply_to_id=note_data.get("replyId")
)
def _convert_misskey_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
"""Convert Misskey notification data to FediverseNotification"""
post = None
if notification_data.get("note"):
post = self._convert_misskey_post(notification_data["note"])
return FediverseNotification(
id=notification_data.get("id", ""),
type=self._convert_misskey_notification_type(notification_data.get("type", "")),
user=self._convert_misskey_user(notification_data.get("user", {})),
post=post,
created_at=notification_data.get("createdAt")
)
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Get notifications from Misskey instance"""
params = {
'include_types': ['mention', 'reply'],
'limit': 50
}
if since_id:
params["since_id"] = since_id
notifications = self.client.i_notifications(**params)
return [self._convert_misskey_notification(notif) for notif in notifications]
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Create a post on Misskey instance"""
params = {
"text": text,
"visibility": self._convert_to_misskey_visibility(visibility)
}
if reply_to_id:
params["reply_id"] = reply_to_id
if file_ids:
params["file_ids"] = file_ids
if visible_user_ids and visibility == Visibility.SPECIFIED:
params["visible_user_ids"] = visible_user_ids
response = self.client.notes_create(**params)
return response.get("createdNote", {}).get("id", "")
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""Get a specific post by ID from Misskey instance"""
try:
note = self.client.notes_show(noteId=post_id)
return self._convert_misskey_post(note)
except Exception:
return None
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""Upload a file to Misskey Drive"""
try:
from misskey.exceptions import MisskeyAPIException
media = self.client.drive_files_create(file_data)
return self._convert_misskey_file(media)
except MisskeyAPIException as e:
raise RuntimeError(f"Failed to upload file to Misskey Drive: {e}") from e
except Exception as e:
raise RuntimeError(f"Unexpected error during file upload: {e}") from e

View file

@ -1,74 +0,0 @@
"""Mock FediverseService for testing purposes"""
from typing import List, Optional, Union, BinaryIO
from fediverse_service import FediverseService
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
class MockFediverseService(FediverseService):
"""Mock implementation of FediverseService for testing"""
def __init__(self):
self.notifications = []
self.created_posts = []
self.uploaded_files = []
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Return mock notifications, optionally filtered by since_id"""
if since_id is None:
return self.notifications
# Filter notifications newer than since_id
filtered = []
for notif in self.notifications:
if notif.id > since_id:
filtered.append(notif)
return filtered
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Mock post creation, returns fake post ID"""
post_id = f"mock_post_{len(self.created_posts)}"
# Store the post for assertions
self.created_posts.append({
'id': post_id,
'text': text,
'reply_to_id': reply_to_id,
'visibility': visibility,
'file_ids': file_ids,
'visible_user_ids': visible_user_ids
})
return post_id
def upload_file(self, file_data: Union[BinaryIO, bytes]) -> FediverseFile:
"""Mock file upload, returns fake file"""
file_id = f"mock_file_{len(self.uploaded_files)}"
mock_file = FediverseFile(
id=file_id,
url=f"https://example.com/files/{file_id}",
type="image/png",
name="test_file.png"
)
self.uploaded_files.append(mock_file)
return mock_file
# Helper methods for testing
def add_mock_notification(self, notification: FediverseNotification):
"""Add a mock notification for testing"""
self.notifications.append(notification)
def clear_all(self):
"""Clear all mock data"""
self.notifications.clear()
self.created_posts.clear()
self.uploaded_files.clear()

View file

@ -1,47 +1,45 @@
import config import traceback
from typing import Dict, Any
import misskey
from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST
from parsing import parse_notification from parsing import parse_notification
from db_utils import is_whitelisted, is_player_banned from db_utils import get_config, set_config, is_whitelisted, is_player_banned
from response import generate_response from response import generate_response
from custom_types import BotResponse from custom_types import BotResponse
from fediverse_factory import get_fediverse_service
from fediverse_types import FediverseNotification, NotificationType, Visibility # Define your whitelist
# TODO: move to config
WHITELISTED_INSTANCES: list[str] = []
def process_fediverse_notification(notification: FediverseNotification, fediverse_service=None) -> None: def process_notification(
'''Processes an individual fediverse notification using the abstraction''' client: misskey.Misskey,
if fediverse_service is None: notification: Dict[str, Any]) -> None:
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE) '''Processes an individual notification'''
user = notification.get('user', {})
# Get user and instance info username = user.get('username', 'unknown')
username = notification.user.username host = user.get('host') # None if local user
host = notification.user.host
instance = host if host else 'local' instance = host if host else 'local'
# Check whitelist if USE_WHITELIST and not is_whitelisted(instance):
if config.USE_WHITELIST and not is_whitelisted(instance):
print(f'⚠️ Blocked notification from untrusted instance: {instance}') print(f'⚠️ Blocked notification from untrusted instance: {instance}')
return return
# Only process mentions and replies # Copy visibility of the post that was received when replying (so if people
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY): # don't want to dump a bunch of notes on home they don't have to)
return visibility = notification['note']['visibility']
if visibility != 'specified':
visibility = 'home'
# Return early if no post attached notif_type = notification.get('type', 'unknown')
if not notification.post: notif_id = notification.get('id')
return
# Determine visibility for reply
if notification.post.visibility != Visibility.SPECIFIED:
visibility = Visibility.HOME
else:
visibility = Visibility.SPECIFIED
notif_type = notification.type.value
notif_id = notification.id
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}') print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
# 🧠 Send to the parser # 🧠 Send to the parser
parsed_notification = parse_notification(notification, fediverse_service) parsed_notification = parse_notification(notification, client)
if not parsed_notification: if not parsed_notification:
return return
@ -51,20 +49,79 @@ def process_fediverse_notification(notification: FediverseNotification, fedivers
print(f'⚠️ Blocked notification from banned player: {author}') print(f'⚠️ Blocked notification from banned player: {author}')
return return
# Get the note Id to reply to
note_id = notification.get('note', {}).get('id')
# Get the response # Get the response
response: BotResponse | None = generate_response(parsed_notification) response: BotResponse | None = generate_response(parsed_notification)
if not response: if not response:
return return
# Handle attachment URLs (convert to file IDs if needed) client.notes_create(
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
# Send response using fediverse service
fediverse_service.create_post(
text=response['message'], text=response['message'],
reply_to_id=notification.post.id, reply_id=note_id,
visibility=visibility, visibility=visibility,
file_ids=file_ids file_ids=response['attachment_urls']
# visible_user_ids=[] # TODO: write actual visible users ids so pleromers can use the bot privately # TODO: write actual visible users ids so pleromers can use the bot
# privately
# visible_user_ids=[]
) )
def process_notifications(client: misskey.Misskey) -> bool:
'''Processes a batch of unread notifications. Returns False if there are
no more notifications to process.'''
last_seen_id = get_config('last_seen_notif_id')
# process_notification writes to last_seen_id, so make a copy
new_last_seen_id = last_seen_id
try:
notifications = client.i_notifications(
# Fetch notifications we haven't seen yet. This option is a bit
# tempermental, sometimes it'll include since_id, sometimes it
# won't. We need to keep track of what notifications we've
# already processed.
since_id=last_seen_id,
# Let misskey handle the filtering
include_types=['mention', 'reply'],
# And handle the batch size while we're at it
limit=NOTIFICATION_BATCH_SIZE
)
# No notifications. Wait the poll period.
if not notifications:
return False
# Iterate oldest to newest
for notification in notifications:
try:
# Skip if we've processed already
notif_id = notification.get('id', '')
if notif_id <= last_seen_id:
continue
# Update new_last_seen_id and process
new_last_seen_id = notif_id
process_notification(client, notification)
except Exception as e:
print(f'An exception has occured while processing a \
notification: {e}')
print(traceback.format_exc())
# If we got as many notifications as we requested, there are probably
# more in the queue
return len(notifications) == NOTIFICATION_BATCH_SIZE
except MisskeyAPIException as e:
print(f'An exception has occured while reading notifications: {e}\n')
print(traceback.format_exc())
finally:
# Quality jank right here, but finally lets us update the last_seen_id
# even if we hit an exception or return early
if new_last_seen_id > last_seen_id:
set_config('last_seen_notif_id', new_last_seen_id)
return False

View file

@ -1,38 +1,27 @@
import re import re
from typing import Dict, Any
import misskey
import config import config
from response import generate_response
from fediverse_factory import get_fediverse_service
from fediverse_types import FediverseNotification, NotificationType, Visibility
from custom_types import ParsedNotification from custom_types import ParsedNotification
def parse_notification(notification: FediverseNotification, fediverse_service=None):
'''Parses any notifications received by the bot and sends any commands to
generate_response()'''
if fediverse_service is None:
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
# We get the type of notification to filter the ones that we actually want def parse_notification(
# to parse notification: Dict[str, Any],
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY): client: misskey.Misskey) -> ParsedNotification | None:
return # Ignore anything that isn't a mention '''Parses any notifications received by the bot'''
# Return early if no post attached
if not notification.post:
return
# We want the visibility to be related to the type that was received (so if
# people don't want to dump a bunch of notes on home they don't have to)
if notification.post.visibility != Visibility.SPECIFIED:
visibility = Visibility.HOME
else:
visibility = Visibility.SPECIFIED
# Get the full Activitypub ID of the user # Get the full Activitypub ID of the user
full_user = notification.user.full_handle user = notification.get("user", {})
username = user.get("username", "unknown")
host = user.get("host")
# Local users may not have a hostname attached
full_user = f"@{username}" if not host else f"@{username}@{host}"
note_text = notification.post.text note_obj = notification.get("note", {})
note_id = notification.post.id note_text = note_obj.get("text")
note_id = note_obj.get("id")
note = note_text.strip().lower() if note_text else "" note = note_text.strip().lower() if note_text else ""
# Split words into tokens # Split words into tokens
@ -55,30 +44,9 @@ def parse_notification(notification: FediverseNotification, fediverse_service=No
command = parts[1].lower() command = parts[1].lower()
arguments = parts[2:] if len(parts) > 2 else [] arguments = parts[2:] if len(parts) > 2 else []
# Create ParsedNotification object for the new response system return {
parsed_notification: ParsedNotification = {
'author': full_user, 'author': full_user,
'command': command, 'command': command,
'arguments': arguments, 'arguments': arguments,
'note_obj': { 'note_obj': note_obj
'id': note_id,
'text': note_text,
'files': [{'url': f.url} for f in notification.post.files] if notification.post.files else []
}
} }
# Generate response using the new system
response = generate_response(parsed_notification)
if not response:
return
# Handle attachment URLs (convert to file IDs if needed)
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
fediverse_service.create_post(
text=response['message'],
reply_to_id=note_id,
visibility=visibility,
file_ids=file_ids
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)

View file

@ -1,188 +0,0 @@
from mastodon import Mastodon
from typing import List, Optional, Dict, Any, Union, BinaryIO
import io
import filetype
from fediverse_service import FediverseService
from fediverse_types import (
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
NotificationType, Visibility
)
import config
class PleromaService(FediverseService):
"""Pleroma implementation of FediverseService using Mastodon.py"""
def __init__(self):
self.client = Mastodon(
access_token=config.KEY,
api_base_url=config.INSTANCE
)
def _convert_mastodon_user(self, user_data: Dict[str, Any]) -> FediverseUser:
"""Convert Mastodon/Pleroma user data to FediverseUser"""
acct = user_data.get("acct", "")
if "@" in acct:
username, host = acct.split("@", 1)
else:
username = acct
host = None
return FediverseUser(
id=str(user_data.get("id", "")),
username=username,
host=host,
display_name=user_data.get("display_name")
)
def _convert_mastodon_file(self, file_data: Dict[str, Any]) -> FediverseFile:
"""Convert Mastodon/Pleroma media attachment to FediverseFile"""
return FediverseFile(
id=str(file_data.get("id", "")),
url=file_data.get("url", ""),
type=file_data.get("type"),
name=file_data.get("description")
)
def _convert_mastodon_visibility(self, visibility: str) -> Visibility:
"""Convert Mastodon/Pleroma visibility to our enum"""
visibility_map = {
"public": Visibility.PUBLIC,
"unlisted": Visibility.UNLISTED,
"private": Visibility.FOLLOWERS,
"direct": Visibility.DIRECT
}
return visibility_map.get(visibility, Visibility.PUBLIC)
def _convert_to_mastodon_visibility(self, visibility: Visibility) -> str:
"""Convert our visibility enum to Mastodon/Pleroma visibility"""
visibility_map = {
Visibility.PUBLIC: "public",
Visibility.UNLISTED: "unlisted",
Visibility.HOME: "unlisted", # Map home to unlisted for Pleroma
Visibility.FOLLOWERS: "private",
Visibility.SPECIFIED: "direct", # Map specified to direct for Pleroma
Visibility.DIRECT: "direct"
}
return visibility_map.get(visibility, "public")
def _convert_mastodon_notification_type(self, notif_type: str) -> NotificationType:
"""Convert Mastodon/Pleroma notification type to our enum"""
type_map = {
"mention": NotificationType.MENTION,
"follow": NotificationType.FOLLOW,
"favourite": NotificationType.FAVOURITE,
"reblog": NotificationType.REBLOG,
"poll": NotificationType.POLL
}
return type_map.get(notif_type, NotificationType.OTHER)
def _convert_mastodon_status(self, status_data: Dict[str, Any]) -> FediversePost:
"""Convert Mastodon/Pleroma status data to FediversePost"""
files = []
if status_data.get("media_attachments"):
files = [self._convert_mastodon_file(f) for f in status_data["media_attachments"]]
# Extract plain text from HTML content
content = status_data.get("content", "")
# Basic HTML stripping - in production you might want to use a proper HTML parser
import re
plain_text = re.sub(r'<[^>]+>', '', content) if content else None
return FediversePost(
id=str(status_data.get("id", "")),
text=plain_text,
user=self._convert_mastodon_user(status_data.get("account", {})),
visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")),
created_at=status_data.get("created_at"),
files=files,
reply_to_id=str(status_data["in_reply_to_id"]) if status_data.get("in_reply_to_id") else None
)
def _convert_mastodon_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
"""Convert Mastodon/Pleroma notification data to FediverseNotification"""
post = None
if notification_data.get("status"):
post = self._convert_mastodon_status(notification_data["status"])
return FediverseNotification(
id=str(notification_data.get("id", "")),
type=self._convert_mastodon_notification_type(notification_data.get("type", "")),
user=self._convert_mastodon_user(notification_data.get("account", {})),
post=post,
created_at=notification_data.get("created_at")
)
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Get notifications from Pleroma instance"""
params = {}
if since_id:
params["since_id"] = since_id
notifications = self.client.notifications(**params)
return [self._convert_mastodon_notification(notif) for notif in notifications]
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Create a post on Pleroma instance"""
params = {
"status": text,
"visibility": self._convert_to_mastodon_visibility(visibility)
}
if reply_to_id:
params["in_reply_to_id"] = reply_to_id
if file_ids:
params["media_ids"] = file_ids
# Note: Pleroma/Mastodon doesn't have direct equivalent to visible_user_ids
# For direct messages, you typically mention users in the status text
response = self.client.status_post(**params)
return str(response.get("id", ""))
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""Get a specific post by ID from Pleroma instance"""
try:
status = self.client.status(post_id)
return self._convert_mastodon_status(status)
except Exception:
return None
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""Upload a file to Pleroma instance"""
try:
# Convert file_data to bytes for MIME detection
if hasattr(file_data, 'read'):
# Check if we can seek back
try:
current_pos = file_data.tell()
file_bytes = file_data.read()
file_data.seek(current_pos)
file_data = io.BytesIO(file_bytes)
except (io.UnsupportedOperation, OSError):
# Non-seekable stream, already read all data
file_data = io.BytesIO(file_bytes)
else:
file_bytes = file_data
file_data = io.BytesIO(file_bytes)
# Use filetype library for robust MIME detection
kind = filetype.guess(file_bytes)
if kind is not None:
mime_type = kind.mime
else:
# Fallback to image/jpeg if detection fails
mime_type = 'image/jpeg'
media = self.client.media_post(file_data, mime_type=mime_type, description=filename)
return self._convert_mastodon_file(media)
except Exception as e:
raise RuntimeError(f"Failed to upload file to Pleroma: {e}") from e

View file

@ -120,6 +120,12 @@ in order: name, rarity',
be a number between 1 and 5', be a number between 1 and 5',
'attachment_urls': None 'attachment_urls': None
} }
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
return {
'message': f'{author} Invalid drop weight: \'{arguments[2]}\' \
must be a decimal value between 0.0 and 1.0',
'attachment_urls': None
}
card_id, file_id = add_card( card_id, file_id = add_card(
name=arguments[0], name=arguments[0],

View file

@ -5,13 +5,6 @@
DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"] DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"]
; SQLite Database location ; SQLite Database location
DatabaseLocation = ./gacha_game.db DatabaseLocation = ./gacha_game.db
; Instance type - either "misskey" or "pleroma"
InstanceType = misskey
; Web server port (default: 5000)
WebPort = 5000
; Web server bind address (default: 127.0.0.1, set to 0.0.0.0 to listen on all interfaces)
BindAddress = 127.0.0.1
; Whether to lmit access to the bot via an instance whitelist ; Whether to lmit access to the bot via an instance whitelist
; The whitelist can be adjusted via the application ; The whitelist can be adjusted via the application
UseWhitelist = False UseWhitelist = False
@ -42,3 +35,4 @@ User = @bot@example.tld
; API key for the bot ; API key for the bot
; Generate one by going to Settings > API > Generate access token ; Generate one by going to Settings > API > Generate access token
Token = abcdefghijklmnopqrstuvwxyz012345 Token = abcdefghijklmnopqrstuvwxyz012345

View file

@ -1,6 +1,6 @@
# Kemoverse # Kemoverse
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Supports both Misskey and Pleroma instances. Name comes from Kemonomimi and Fediverse. A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
<p align="center"> <p align="center">
<img src="./web/static/logo.png" alt="Fediverse Gacha Bot Logo" width="300" height="auto"> <img src="./web/static/logo.png" alt="Fediverse Gacha Bot Logo" width="300" height="auto">
</p> </p>
@ -47,13 +47,13 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
- 🌐 Web app to generate cards from images - 🌐 Web app to generate cards from images
### 🌍 Fediverse Support ### 🌍 Fediverse Support
✅ Anyone from the fediverse can play! The bot supports both Misskey and Pleroma instances through configurable backends. ✅ Anyone from the fediverse can play, but the server only works using a Misskey instance. Want to rewrite the program in Elixir for Pleroma? Let us know!
## 🗃️ Tech Stack ## 🗃️ Tech Stack
- Python (3.12+) - Python (3.12+)
- SQLite - SQLite
- Fediverse API integration (Misskey and Pleroma support) - Fediverse API integration (via Misskey endpoints)
- Flask - Flask
- Modular DB design for extensibility - Modular DB design for extensibility
@ -61,33 +61,16 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and duel tactics. The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and duel tactics.
## 📝 License
This project is licensed under the GNU Affero General Public License v3.0 (AGPL-3.0).
This means:
- You are free to use, modify, and redistribute this software under the same license.
- If you modify and deploy the software (for example, as part of a web service), you must also make the source code of your modified version available to users of that service.
You should have received a copy of the GNU Affero General Public License along with this program. If not, see https://www.gnu.org/licenses/.
The full license text can be found here: https://www.gnu.org/licenses/agpl-3.0.html
The AGPL is designed to ensure that software freedom is preserved, especially in networked environments. If you improve this project or build something on top of it, please give back to the community by sharing your changes too.
Unless another license is listed, every file in the project is licensed under the GNU Affero General Public License version 3 (or at your option), any later version.
```mermaid ```mermaid
flowchart TD flowchart TD
subgraph Player Interaction subgraph Player Interaction
A1[Fediverse bot] A1[Misskey bot]
A2[Web] A2[Web]
end end
subgraph Fediverse subgraph Misskey
B1[Fediverse instance] B1[Misskey instance]
end end
subgraph Bot subgraph Bot
@ -95,7 +78,7 @@ flowchart TD
C2[Notification parser] C2[Notification parser]
C3[Gacha roll logic] C3[Gacha roll logic]
C4[Database interface] C4[Database interface]
C5[Fediverse API poster] C5[Misskey API poster]
end end
subgraph Website subgraph Website

View file

@ -6,5 +6,3 @@ Jinja2==3.1.6
MarkupSafe==3.0.2 MarkupSafe==3.0.2
Werkzeug==3.1.3 Werkzeug==3.1.3
Misskey.py==4.1.0 Misskey.py==4.1.0
Mastodon.py==1.8.1
filetype==1.2.0

View file

@ -1,19 +1,3 @@
#Kemoverse - a gacha-style bot for the Fediverse.
#Copyright © 2025 Waifu
#
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as
#published by the Free Software Foundation, either version 3 of the
#License, or (at your option) any later version.
#
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with this program. If not, see https://www.gnu.org/licenses/.
import sqlite3 import sqlite3
import traceback import traceback
import os import os
@ -73,14 +57,16 @@ def perform_migration(cursor: sqlite3.Cursor, migration: tuple[int, str]) -> Non
def get_db_path() -> str | DBNotFoundError: def get_db_path() -> str | DBNotFoundError:
'''Gets the DB path from config.ini''' '''Gets the DB path from config.ini'''
env = os.environ.get('KEMOVERSE_ENV') env = os.environ.get('KEMOVERSE_ENV')
if not (env and env in ['prod', 'dev']):
raise KemoverseEnvUnset
print(f'Running in "{env}" mode')
config_path = f'config_{env}.ini' config_path = f'config_{env}.ini'
if not os.path.isfile(config_path): if not os.path.isfile(config_path):
raise ConfigError(f'Could not find {config_path}') raise ConfigError(f'Could not find {config_path}')
print(f'Running in "{env}" mode')
config = ConfigParser() config = ConfigParser()
config.read(config_path) config.read(config_path)
db_path = config['application']['DatabaseLocation'] db_path = config['application']['DatabaseLocation']
@ -110,6 +96,7 @@ def main():
return return
except KemoverseEnvUnset: except KemoverseEnvUnset:
print('Error: KEMOVERSE_ENV is either not set or has an invalid value.') print('Error: KEMOVERSE_ENV is either not set or has an invalid value.')
print('Please set KEMOVERSE_ENV to either "dev" or "prod" before running.')
print(traceback.format_exc()) print(traceback.format_exc())
return return

View file

@ -1,21 +1,5 @@
#!/bin/bash #!/bin/bash
#Kemoverse - a gacha-style bot for the Fediverse.
#Copyright © 2025 Waifu
#
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as
#published by the Free Software Foundation, either version 3 of the
#License, or (at your option) any later version.
#
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with this program. If not, see https://www.gnu.org/licenses/.
# Navigate to the project directory (optional) # Navigate to the project directory (optional)
cd "$(dirname "$0")" cd "$(dirname "$0")"

View file

@ -1,34 +1,13 @@
#Kemoverse - a gacha-style bot for the Fediverse.
#Copyright © 2025 Waifu
#
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as
#published by the Free Software Foundation, either version 3 of the
#License, or (at your option) any later version.
#
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with this program. If not, see https://www.gnu.org/licenses/.
import sqlite3 import sqlite3
import sys
import os
# Add bot directory to path to import config
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'bot'))
import config
from flask import Flask, render_template, abort from flask import Flask, render_template, abort
from werkzeug.exceptions import HTTPException from werkzeug.exceptions import HTTPException
app = Flask(__name__) app = Flask(__name__)
DB_PATH = "./gacha_game.db" # Adjust path if needed
def get_db_connection(): def get_db_connection():
conn = sqlite3.connect(config.DB_PATH) conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
return conn return conn
@ -89,4 +68,4 @@ def submit_character():
if __name__ == '__main__': if __name__ == '__main__':
app.run(host=config.BIND_ADDRESS, port=config.WEB_PORT, debug=True) app.run(host='0.0.0.0', port=5000, debug=True)