Compare commits

..

43 commits

Author SHA1 Message Date
Moon
c74fc17dfb Merge branch 'dev' into pleroma-support 2025-06-16 08:31:30 +09:00
w
6494ac4909 Typo that saves the west 2025-06-15 16:50:27 -03:00
6ab3e4a427 Merge pull request 'license_change' (#53) from license_change into dev
Reviewed-on: #53
2025-06-15 12:46:49 -07:00
w
b6fb6d9245 Merge branch 'license_change' of git.waifuism.life:waifu/kemoverse into license_change 2025-06-15 16:46:26 -03:00
w
5580e06028 Merge branch 'dev' into license_change 2025-06-15 16:46:03 -03:00
b08b224476 Merge branch 'dev' into license_change 2025-06-15 06:54:56 -07:00
Moon
0039717a01 Merge branch 'dev' into pleroma-support 2025-06-14 07:31:02 +09:00
Moon
bf8d9823b2 Merge branch 'dev' into pleroma-support 2025-06-14 07:18:18 +09:00
7c4fd5fc41 Merge pull request 'any-env-and-bind-address' (#58) from any-env-and-bind-address into dev
Reviewed-on: #58
Reviewed-by: VD15 <valkyriedev15@gmail.com>
2025-06-13 14:54:27 -07:00
Moon
24bfe88dc1 rm obsolete print 2025-06-14 05:57:01 +09:00
Moon
1a35750d0a restrict characters in KEMOVERSE_ENV 2025-06-14 05:51:49 +09:00
Moon
d416ae1b2d rm limitation on KEMOVERSE_ENV from another place. 2025-06-14 05:47:43 +09:00
Moon
7fd4d5db25 indicate on startup if whitelisting is enabled. 2025-06-14 05:17:39 +09:00
Moon
f4f847e577 make sure UseWhitelist is a boolean, default to True 2025-06-14 05:14:40 +09:00
Moon
2ef70801c7 fix config module ref 2025-06-14 05:08:59 +09:00
Moon
fdf21b3f5f rm check for env dev or prod so any can be used 2025-06-14 04:43:43 +09:00
Moon
337a989671 rm vestigial trusted instances config. 2025-06-14 04:34:20 +09:00
Moon
3ad4edbc45 ignore custom startup script 2025-06-14 04:10:16 +09:00
49e49bd187 Merge branch 'dev' into license_change 2025-06-13 06:56:21 -07:00
1984b67523 Merge pull request 'fix-missing-weight-column in dev' (#57) from fix-missing-weight-column into dev
Reviewed-on: #57
Reviewed-by: VD15 <valkyriedev15@gmail.com>
2025-06-13 04:14:53 -07:00
Moon
bbb64a869d Merge remote-tracking branch 'origin/master' into pleroma-support 2025-06-13 19:50:47 +09:00
Moon
fc45c688e8 Merge branch 'fix-missing-weight-column' into pleroma-support 2025-06-13 19:42:41 +09:00
Moon
77d4fa13bb rm validation of removed weight 2025-06-13 19:31:23 +09:00
Moon
bd287b096a rm reference to weight column. 2025-06-13 18:47:54 +09:00
Moon
40f018a83b make instantiation explicit so easier to test. 2025-06-13 13:53:41 +09:00
Moon
a516a9b55a minor change to make testing easier. 2025-06-13 13:36:38 +09:00
Moon
7161712f15 rm unused function that bypassed server abstraction 2025-06-13 13:26:01 +09:00
Moon
fadd4dfe27 Merge remote-tracking branch 'origin/dev' into pleroma-support 2025-06-13 13:05:51 +09:00
Moon
7b32ee7fcf had to change some things to get cursor for db working. 2025-06-13 11:23:34 +09:00
Moon
f70b2147cd Remove legacy files that bypass FediverseService abstraction
- Remove bot/client.py (replaced by fediverse factory)
- Remove bot/notification.py (replaced by bot_app.py)
- All functionality now properly uses FediverseService abstraction
2025-06-13 10:57:33 +09:00
Moon
298d7fda72 rm needless file 2025-06-13 10:52:20 +09:00
Moon
8918b5205d named some damn properties wrong 2025-06-13 10:52:20 +09:00
Moon
67b4d949fd use a library for content type detection instead of handrolled 2025-06-13 10:50:24 +09:00
Moon
89ae8a7290 back to using mastodon.py 2025-06-13 10:50:24 +09:00
Moon
91376c0eba works now but may need to revert file upload change 2025-06-13 10:50:24 +09:00
Moon
b2ca6dd59a documentation update for configurable backends 2025-06-13 10:50:03 +09:00
Moon
dac05a3ed8 completely untested refactor. bug fixes to come later. 2025-06-13 10:48:35 +09:00
Moon
fe8e7d246f actually use the abstracted type not just the id 2025-06-13 10:37:27 +09:00
Moon
a47530180d first whack at file upload 2025-06-13 10:37:27 +09:00
Moon
e0cf42f8f6 baseline fedi abstraction stuff. 2025-06-13 10:37:24 +09:00
w
2b194f3c9e readme changes for license 2025-06-11 22:42:57 -03:00
w
df76fb9131 adding license to main files 2025-06-11 22:29:06 -03:00
w
81c890a83d license change 2025-06-11 22:06:17 -03:00
23 changed files with 1315 additions and 569 deletions

1
.gitignore vendored
View file

@ -185,5 +185,6 @@ cython_debug/
gacha_game*.db
gacha_game*.db.*
config*.ini
run.sh
.idea

1
.tool-versions Normal file
View file

@ -0,0 +1 @@
nodejs 23.4.0

722
LICENSE

File diff suppressed because it is too large Load diff

View file

@ -1,32 +1,24 @@
import requests
from misskey.exceptions import MisskeyAPIException
from client import client_connection
from db_utils import insert_card
from custom_types import Card
from config import RARITY_TO_WEIGHT
import config
from fediverse_factory import get_fediverse_service
import db_utils
def add_card(
name: str,
rarity: int,
image_url: str) -> tuple[int, str]:
'''
Adds a card to the database, uploading the image from a public URL to
the bot's Misskey Drive.
def add_card(name: str, rarity: int, image_url: str) -> tuple[int, str]:
"""
Adds a card to the database, uploading the image from a public URL to the Fediverse instance.
Args:
name (str): Card name.
rarity (int): Card rarity (e.g., 1-5).
image_url (str): Public URL of the image from the post (e.g., from
note['files'][i]['url']).
image_url (str): Public URL of the image from the post.
Returns:
tuple[int, str]: Card ID and bot's Drive file_id.
tuple[int, str]: Card ID and file_id.
Raises:
ValueError: If inputs are invalid.
RuntimeError: If image download/upload or database operation fails.
'''
"""
stripped_name = name.strip()
@ -35,30 +27,33 @@ def add_card(
raise ValueError('Card name cannot be empty.')
if rarity < 1:
raise ValueError('Rarity must be a positive integer.')
if rarity not in RARITY_TO_WEIGHT.keys():
if rarity not in config.RARITY_TO_WEIGHT.keys():
raise ValueError(f'Invalid rarity: {rarity}')
if not image_url:
raise ValueError('Image URL must be provided.')
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f'Failed to download image from {image_url}')
# Upload to bot's Drive
mk = client_connection()
try:
media = mk.drive_files_create(response.raw)
file_id = media['id']
except MisskeyAPIException as e:
raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
from e
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f"Failed to download image from {image_url}")
# Insert into database
card_id = insert_card(
# Upload to Fediverse instance
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
try:
uploaded_file = fediverse_service.upload_file(response.raw)
file_id = uploaded_file.id
except RuntimeError as e:
raise RuntimeError(f"Failed to upload image: {e}") from e
# Insert into database using db_utils function
card_id = db_utils.insert_card(
stripped_name,
rarity,
RARITY_TO_WEIGHT[rarity],
file_id
)
return card_id, file_id
)
return card_id, file_id
except Exception as e:
raise

View file

@ -1,21 +1,78 @@
#Kemoverse - a gacha-style bot for the Fediverse.
#Copyright © 2025 Waifu
#
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as
#published by the Free Software Foundation, either version 3 of the
#License, or (at your option) any later version.
#
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with this program. If not, see https://www.gnu.org/licenses/.
import time
import misskey as misskey
from client import client_connection
import db_utils as db
import traceback
import config
from notification import process_fediverse_notification
from db_utils import get_config, set_config, connect, setup_administrators
from fediverse_factory import get_fediverse_service
from config import NOTIFICATION_POLL_INTERVAL
from notification import process_notifications
from config import USE_WHITELIST
if __name__ == '__main__':
# Initialize the Misskey client
client = client_connection()
# Connect to DB
db.connect()
def stream_notifications():
# Initialize database connection
connect()
# Setup default administrators
db.setup_administrators()
setup_administrators()
# Initialize the Fediverse service
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
# Get the last seen notification ID from the database
last_seen_id = get_config("last_seen_notif_id")
# Show whitelist status
whitelist_status = "enabled" if USE_WHITELIST else "disabled"
print(f'Instance whitelisting: {whitelist_status}')
print('Listening for notifications...')
while True:
if not process_notifications(client):
time.sleep(NOTIFICATION_POLL_INTERVAL)
try:
# Get notifications from the fediverse service
notifications = fediverse_service.get_notifications(since_id=last_seen_id)
if notifications:
new_last_seen_id = last_seen_id
for notification in notifications:
notif_id = notification.id
# Skip old or same ID notifications
if last_seen_id is not None and notif_id <= last_seen_id:
continue
# Process the notification using the abstracted processor
process_fediverse_notification(notification, fediverse_service)
# Update only if this notif_id is greater
if new_last_seen_id is None or notif_id > new_last_seen_id:
new_last_seen_id = notif_id
# Save the latest seen ID
if new_last_seen_id and new_last_seen_id != last_seen_id:
set_config("last_seen_notif_id", new_last_seen_id)
last_seen_id = new_last_seen_id
time.sleep(5)
except Exception as e:
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
time.sleep(5)
if __name__ == "__main__":
stream_notifications()

View file

@ -1,6 +0,0 @@
import misskey
import config
def client_connection() -> misskey.Misskey:
return misskey.Misskey(address=config.INSTANCE, i=config.KEY)

View file

@ -1,6 +1,7 @@
'''Essentials for the bot to function'''
import configparser
import json
import re
from os import environ, path
@ -13,8 +14,10 @@ def get_config_file() -> str:
env: str | None = environ.get('KEMOVERSE_ENV')
if not env:
raise ConfigError('Error: KEMOVERSE_ENV is unset')
if not (env in ['prod', 'dev']):
raise ConfigError(f'Error: Invalid environment: {env}')
# Validate environment name contains only alphanumeric, dash, and underscore
if not re.match(r'^[a-zA-Z0-9_-]+$', env):
raise ValueError(f'KEMOVERSE_ENV "{env}" contains invalid characters. Only alphanumeric, dash (-), and underscore (_) are allowed.')
config_path: str = f'config_{env}.ini'
@ -23,6 +26,50 @@ def get_config_file() -> str:
return config_path
def normalize_user(user_string: str) -> str:
"""
Normalizes a user string to the format @user@domain.tld where domain is lowercase and user is case-sensitive
Args:
user_string: User string in various formats
Returns:
Normalized user string
Raises:
ValueError: If the user string is invalid or domain is malformed
"""
if not user_string or not user_string.strip():
raise ValueError("User string cannot be empty")
user_string = user_string.strip()
# Add leading @ if missing
if not user_string.startswith('@'):
user_string = '@' + user_string
# Split into user and domain parts
parts = user_string[1:].split('@', 1) # Remove leading @ and split
if len(parts) != 2:
raise ValueError(f"Invalid user format: {user_string}. Expected @user@domain.tld")
username, domain = parts
if not username:
raise ValueError("Username cannot be empty")
if not domain:
raise ValueError("Domain cannot be empty")
# Validate domain format (basic check for valid domain structure)
domain_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
if not re.match(domain_pattern, domain):
raise ValueError(f"Invalid domain format: {domain}")
# Return normalized format: @user@domain.tld (domain lowercase, user case-sensitive)
return f"@{username}@{domain.lower()}"
def get_rarity_to_weight(
config_section: configparser.SectionProxy) -> dict[int, float]:
"""Parses Rarity_X keys from config and returns a {rarity: weight} dict."""
@ -38,19 +85,36 @@ config = configparser.ConfigParser()
config.read(get_config_file())
# Username for the bot
USER = config['credentials']['User'].lower()
if 'User' not in config['credentials'] or not config['credentials']['User'].strip():
raise ConfigError("User must be specified in config.ini under [credentials]")
USER = normalize_user(config['credentials']['User'])
# API key for the bot
KEY = config['credentials']['Token']
# Bot's Misskey instance URL
# Bot's Misskey/Pleroma instance URL
INSTANCE = config['credentials']['Instance'].lower()
# Instance type validation
if 'InstanceType' not in config['application']:
raise ValueError("InstanceType must be specified in config.ini")
instance_type = config['application']['InstanceType'].lower()
if instance_type not in ('misskey', 'pleroma'):
raise ValueError("InstanceType must be either 'misskey' or 'pleroma'")
INSTANCE_TYPE = instance_type
# Web server port
WEB_PORT = config['application'].getint('WebPort', 5000)
BIND_ADDRESS = config['application'].get('BindAddress', '127.0.0.1')
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
# to use extra admin exclusive commands with the bot
ADMINS = json.loads(config['application']['DefaultAdmins'])
# SQLite Database location
DB_PATH = config['application']['DatabaseLocation']
DB_PATH = config['application'].get('DatabaseLocation', './gacha_game.db')
# Whether to enable the instance whitelist
USE_WHITELIST = config['application']['UseWhitelist']
USE_WHITELIST = config['application'].getboolean('UseWhitelist', True)
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])

View file

@ -150,12 +150,11 @@ def is_player_administrator(username: str) -> bool:
def insert_card(
name: str, rarity: int, weight: float, file_id: str) -> int:
name: str, rarity: int, file_id: str) -> int:
'''Inserts a card'''
CURSOR.execute(
'INSERT INTO cards (name, rarity, weight, file_id) VALUES \
(?, ?, ?, ?)',
(name, rarity, weight, file_id)
'INSERT INTO cards (name, rarity, file_id) VALUES (?, ?, ?)',
(name, rarity, file_id)
)
card_id = CURSOR.lastrowid
return card_id if card_id else 0

40
bot/fediverse_factory.py Normal file
View file

@ -0,0 +1,40 @@
from fediverse_service import FediverseService
from misskey_service import MisskeyService
from pleroma_service import PleromaService
class FediverseServiceFactory:
"""Factory for creating FediverseService implementations"""
@staticmethod
def create_service(instance_type: str) -> FediverseService:
"""
Create a FediverseService implementation based on the instance type.
Args:
instance_type: The type of instance ("misskey" or "pleroma")
Returns:
FediverseService implementation (MisskeyService or PleromaService)
Raises:
ValueError: If the instance type is not supported
"""
instance_type = instance_type.lower()
if instance_type == "misskey":
return MisskeyService()
elif instance_type == "pleroma":
return PleromaService()
else:
raise ValueError(f"Unsupported instance type: {instance_type}")
def get_fediverse_service(instance_type: str) -> FediverseService:
"""
Convenience function to get a FediverseService instance
Args:
instance_type: The instance type ("misskey" or "pleroma")
"""
return FediverseServiceFactory.create_service(instance_type)

74
bot/fediverse_service.py Normal file
View file

@ -0,0 +1,74 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Union, BinaryIO
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
class FediverseService(ABC):
"""Abstract interface for Fediverse platform services (Misskey, Pleroma, etc.)"""
@abstractmethod
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""
Retrieve notifications from the Fediverse instance.
Args:
since_id: Optional ID to get notifications newer than this ID
Returns:
List of FediverseNotification objects
"""
pass
@abstractmethod
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""
Create a new post on the Fediverse instance.
Args:
text: The text content of the post
reply_to_id: Optional ID of post to reply to
visibility: Visibility level for the post
file_ids: Optional list of file IDs to attach
visible_user_ids: Optional list of user IDs who can see the post (for specified visibility)
Returns:
ID of the created post
"""
pass
@abstractmethod
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""
Retrieve a specific post by its ID.
Args:
post_id: The ID of the post to retrieve
Returns:
FediversePost object if found, None otherwise
"""
pass
@abstractmethod
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""
Upload a file to the Fediverse instance.
Args:
file_data: File data as binary stream or bytes
filename: Optional filename for the uploaded file
Returns:
FediverseFile object with ID, URL, and other metadata
Raises:
RuntimeError: If file upload fails
"""
pass

73
bot/fediverse_types.py Normal file
View file

@ -0,0 +1,73 @@
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from enum import Enum
class NotificationType(Enum):
MENTION = "mention"
REPLY = "reply"
FOLLOW = "follow"
FAVOURITE = "favourite"
REBLOG = "reblog"
POLL = "poll"
OTHER = "other"
class Visibility(Enum):
PUBLIC = "public"
UNLISTED = "unlisted"
HOME = "home"
FOLLOWERS = "followers"
SPECIFIED = "specified"
DIRECT = "direct"
@dataclass
class FediverseUser:
"""Common user representation across Fediverse platforms"""
id: str
username: str
host: Optional[str] = None # None for local users
display_name: Optional[str] = None
@property
def full_handle(self) -> str:
"""Returns the full fediverse handle (@user@domain or @user for local)"""
if self.host:
return f"@{self.username}@{self.host}"
return f"@{self.username}"
@dataclass
class FediverseFile:
"""Common file/attachment representation"""
id: str
url: str
type: Optional[str] = None
name: Optional[str] = None
@dataclass
class FediversePost:
"""Common post representation across Fediverse platforms"""
id: str
text: Optional[str]
user: FediverseUser
visibility: Visibility
created_at: Optional[str] = None
files: List[FediverseFile] = None
reply_to_id: Optional[str] = None
def __post_init__(self):
if self.files is None:
self.files = []
@dataclass
class FediverseNotification:
"""Common notification representation across Fediverse platforms"""
id: str
type: NotificationType
user: FediverseUser
post: Optional[FediversePost] = None
created_at: Optional[str] = None

156
bot/misskey_service.py Normal file
View file

@ -0,0 +1,156 @@
import misskey
from typing import List, Optional, Dict, Any, Union, BinaryIO
from fediverse_service import FediverseService
from fediverse_types import (
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
NotificationType, Visibility
)
import config
class MisskeyService(FediverseService):
"""Misskey implementation of FediverseService"""
def __init__(self):
self.client = misskey.Misskey(address=config.INSTANCE, i=config.KEY)
def _convert_misskey_user(self, user_data: Dict[str, Any]) -> FediverseUser:
"""Convert Misskey user data to FediverseUser"""
return FediverseUser(
id=user_data.get("id", ""),
username=user_data.get("username", "unknown"),
host=user_data.get("host"),
display_name=user_data.get("name")
)
def _convert_misskey_file(self, file_data: Dict[str, Any]) -> FediverseFile:
"""Convert Misskey file data to FediverseFile"""
return FediverseFile(
id=file_data.get("id", ""),
url=file_data.get("url", ""),
type=file_data.get("type"),
name=file_data.get("name")
)
def _convert_misskey_visibility(self, visibility: str) -> Visibility:
"""Convert Misskey visibility to our enum"""
visibility_map = {
"public": Visibility.PUBLIC,
"unlisted": Visibility.UNLISTED,
"home": Visibility.HOME,
"followers": Visibility.FOLLOWERS,
"specified": Visibility.SPECIFIED
}
return visibility_map.get(visibility, Visibility.HOME)
def _convert_to_misskey_visibility(self, visibility: Visibility) -> str:
"""Convert our visibility enum to Misskey visibility"""
visibility_map = {
Visibility.PUBLIC: "public",
Visibility.UNLISTED: "unlisted",
Visibility.HOME: "home",
Visibility.FOLLOWERS: "followers",
Visibility.SPECIFIED: "specified",
Visibility.DIRECT: "specified" # Map direct to specified for Misskey
}
return visibility_map.get(visibility, "home")
def _convert_misskey_notification_type(self, notif_type: str) -> NotificationType:
"""Convert Misskey notification type to our enum"""
type_map = {
"mention": NotificationType.MENTION,
"reply": NotificationType.REPLY,
"follow": NotificationType.FOLLOW,
"favourite": NotificationType.FAVOURITE,
"reblog": NotificationType.REBLOG,
"poll": NotificationType.POLL
}
return type_map.get(notif_type, NotificationType.OTHER)
def _convert_misskey_post(self, note_data: Dict[str, Any]) -> FediversePost:
"""Convert Misskey note data to FediversePost"""
files = []
if note_data.get("files"):
files = [self._convert_misskey_file(f) for f in note_data["files"]]
return FediversePost(
id=note_data.get("id", ""),
text=note_data.get("text"),
user=self._convert_misskey_user(note_data.get("user", {})),
visibility=self._convert_misskey_visibility(note_data.get("visibility", "home")),
created_at=note_data.get("createdAt"),
files=files,
reply_to_id=note_data.get("replyId")
)
def _convert_misskey_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
"""Convert Misskey notification data to FediverseNotification"""
post = None
if notification_data.get("note"):
post = self._convert_misskey_post(notification_data["note"])
return FediverseNotification(
id=notification_data.get("id", ""),
type=self._convert_misskey_notification_type(notification_data.get("type", "")),
user=self._convert_misskey_user(notification_data.get("user", {})),
post=post,
created_at=notification_data.get("createdAt")
)
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Get notifications from Misskey instance"""
params = {
'include_types': ['mention', 'reply'],
'limit': 50
}
if since_id:
params["since_id"] = since_id
notifications = self.client.i_notifications(**params)
return [self._convert_misskey_notification(notif) for notif in notifications]
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Create a post on Misskey instance"""
params = {
"text": text,
"visibility": self._convert_to_misskey_visibility(visibility)
}
if reply_to_id:
params["reply_id"] = reply_to_id
if file_ids:
params["file_ids"] = file_ids
if visible_user_ids and visibility == Visibility.SPECIFIED:
params["visible_user_ids"] = visible_user_ids
response = self.client.notes_create(**params)
return response.get("createdNote", {}).get("id", "")
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""Get a specific post by ID from Misskey instance"""
try:
note = self.client.notes_show(noteId=post_id)
return self._convert_misskey_post(note)
except Exception:
return None
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""Upload a file to Misskey Drive"""
try:
from misskey.exceptions import MisskeyAPIException
media = self.client.drive_files_create(file_data)
return self._convert_misskey_file(media)
except MisskeyAPIException as e:
raise RuntimeError(f"Failed to upload file to Misskey Drive: {e}") from e
except Exception as e:
raise RuntimeError(f"Unexpected error during file upload: {e}") from e

View file

@ -0,0 +1,74 @@
"""Mock FediverseService for testing purposes"""
from typing import List, Optional, Union, BinaryIO
from fediverse_service import FediverseService
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
class MockFediverseService(FediverseService):
"""Mock implementation of FediverseService for testing"""
def __init__(self):
self.notifications = []
self.created_posts = []
self.uploaded_files = []
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Return mock notifications, optionally filtered by since_id"""
if since_id is None:
return self.notifications
# Filter notifications newer than since_id
filtered = []
for notif in self.notifications:
if notif.id > since_id:
filtered.append(notif)
return filtered
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Mock post creation, returns fake post ID"""
post_id = f"mock_post_{len(self.created_posts)}"
# Store the post for assertions
self.created_posts.append({
'id': post_id,
'text': text,
'reply_to_id': reply_to_id,
'visibility': visibility,
'file_ids': file_ids,
'visible_user_ids': visible_user_ids
})
return post_id
def upload_file(self, file_data: Union[BinaryIO, bytes]) -> FediverseFile:
"""Mock file upload, returns fake file"""
file_id = f"mock_file_{len(self.uploaded_files)}"
mock_file = FediverseFile(
id=file_id,
url=f"https://example.com/files/{file_id}",
type="image/png",
name="test_file.png"
)
self.uploaded_files.append(mock_file)
return mock_file
# Helper methods for testing
def add_mock_notification(self, notification: FediverseNotification):
"""Add a mock notification for testing"""
self.notifications.append(notification)
def clear_all(self):
"""Clear all mock data"""
self.notifications.clear()
self.created_posts.clear()
self.uploaded_files.clear()

View file

@ -1,45 +1,47 @@
import traceback
from typing import Dict, Any
import misskey
from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST
import config
from parsing import parse_notification
from db_utils import get_config, set_config, is_whitelisted, is_player_banned
from db_utils import is_whitelisted, is_player_banned
from response import generate_response
from custom_types import BotResponse
# Define your whitelist
# TODO: move to config
WHITELISTED_INSTANCES: list[str] = []
from fediverse_factory import get_fediverse_service
from fediverse_types import FediverseNotification, NotificationType, Visibility
def process_notification(
client: misskey.Misskey,
notification: Dict[str, Any]) -> None:
'''Processes an individual notification'''
user = notification.get('user', {})
username = user.get('username', 'unknown')
host = user.get('host') # None if local user
def process_fediverse_notification(notification: FediverseNotification, fediverse_service=None) -> None:
'''Processes an individual fediverse notification using the abstraction'''
if fediverse_service is None:
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
# Get user and instance info
username = notification.user.username
host = notification.user.host
instance = host if host else 'local'
if USE_WHITELIST and not is_whitelisted(instance):
# Check whitelist
if config.USE_WHITELIST and not is_whitelisted(instance):
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
return
# Copy visibility of the post that was received when replying (so if people
# don't want to dump a bunch of notes on home they don't have to)
visibility = notification['note']['visibility']
if visibility != 'specified':
visibility = 'home'
# Only process mentions and replies
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
return
notif_type = notification.get('type', 'unknown')
notif_id = notification.get('id')
# Return early if no post attached
if not notification.post:
return
# Determine visibility for reply
if notification.post.visibility != Visibility.SPECIFIED:
visibility = Visibility.HOME
else:
visibility = Visibility.SPECIFIED
notif_type = notification.type.value
notif_id = notification.id
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
# 🧠 Send to the parser
parsed_notification = parse_notification(notification, client)
parsed_notification = parse_notification(notification, fediverse_service)
if not parsed_notification:
return
@ -49,79 +51,20 @@ def process_notification(
print(f'⚠️ Blocked notification from banned player: {author}')
return
# Get the note Id to reply to
note_id = notification.get('note', {}).get('id')
# Get the response
response: BotResponse | None = generate_response(parsed_notification)
if not response:
return
client.notes_create(
# Handle attachment URLs (convert to file IDs if needed)
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
# Send response using fediverse service
fediverse_service.create_post(
text=response['message'],
reply_id=note_id,
reply_to_id=notification.post.id,
visibility=visibility,
file_ids=response['attachment_urls']
# TODO: write actual visible users ids so pleromers can use the bot
# privately
# visible_user_ids=[]
file_ids=file_ids
# visible_user_ids=[] # TODO: write actual visible users ids so pleromers can use the bot privately
)
def process_notifications(client: misskey.Misskey) -> bool:
'''Processes a batch of unread notifications. Returns False if there are
no more notifications to process.'''
last_seen_id = get_config('last_seen_notif_id')
# process_notification writes to last_seen_id, so make a copy
new_last_seen_id = last_seen_id
try:
notifications = client.i_notifications(
# Fetch notifications we haven't seen yet. This option is a bit
# tempermental, sometimes it'll include since_id, sometimes it
# won't. We need to keep track of what notifications we've
# already processed.
since_id=last_seen_id,
# Let misskey handle the filtering
include_types=['mention', 'reply'],
# And handle the batch size while we're at it
limit=NOTIFICATION_BATCH_SIZE
)
# No notifications. Wait the poll period.
if not notifications:
return False
# Iterate oldest to newest
for notification in notifications:
try:
# Skip if we've processed already
notif_id = notification.get('id', '')
if notif_id <= last_seen_id:
continue
# Update new_last_seen_id and process
new_last_seen_id = notif_id
process_notification(client, notification)
except Exception as e:
print(f'An exception has occured while processing a \
notification: {e}')
print(traceback.format_exc())
# If we got as many notifications as we requested, there are probably
# more in the queue
return len(notifications) == NOTIFICATION_BATCH_SIZE
except MisskeyAPIException as e:
print(f'An exception has occured while reading notifications: {e}\n')
print(traceback.format_exc())
finally:
# Quality jank right here, but finally lets us update the last_seen_id
# even if we hit an exception or return early
if new_last_seen_id > last_seen_id:
set_config('last_seen_notif_id', new_last_seen_id)
return False

View file

@ -1,27 +1,38 @@
import re
from typing import Dict, Any
import misskey
import config
from response import generate_response
from fediverse_factory import get_fediverse_service
from fediverse_types import FediverseNotification, NotificationType, Visibility
from custom_types import ParsedNotification
def parse_notification(notification: FediverseNotification, fediverse_service=None):
'''Parses any notifications received by the bot and sends any commands to
generate_response()'''
def parse_notification(
notification: Dict[str, Any],
client: misskey.Misskey) -> ParsedNotification | None:
'''Parses any notifications received by the bot'''
if fediverse_service is None:
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
# We get the type of notification to filter the ones that we actually want
# to parse
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
return # Ignore anything that isn't a mention
# Return early if no post attached
if not notification.post:
return
# We want the visibility to be related to the type that was received (so if
# people don't want to dump a bunch of notes on home they don't have to)
if notification.post.visibility != Visibility.SPECIFIED:
visibility = Visibility.HOME
else:
visibility = Visibility.SPECIFIED
# Get the full Activitypub ID of the user
user = notification.get("user", {})
username = user.get("username", "unknown")
host = user.get("host")
# Local users may not have a hostname attached
full_user = f"@{username}" if not host else f"@{username}@{host}"
full_user = notification.user.full_handle
note_obj = notification.get("note", {})
note_text = note_obj.get("text")
note_id = note_obj.get("id")
note_text = notification.post.text
note_id = notification.post.id
note = note_text.strip().lower() if note_text else ""
# Split words into tokens
@ -44,9 +55,30 @@ def parse_notification(
command = parts[1].lower()
arguments = parts[2:] if len(parts) > 2 else []
return {
# Create ParsedNotification object for the new response system
parsed_notification: ParsedNotification = {
'author': full_user,
'command': command,
'arguments': arguments,
'note_obj': note_obj
'note_obj': {
'id': note_id,
'text': note_text,
'files': [{'url': f.url} for f in notification.post.files] if notification.post.files else []
}
}
# Generate response using the new system
response = generate_response(parsed_notification)
if not response:
return
# Handle attachment URLs (convert to file IDs if needed)
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
fediverse_service.create_post(
text=response['message'],
reply_to_id=note_id,
visibility=visibility,
file_ids=file_ids
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)

188
bot/pleroma_service.py Normal file
View file

@ -0,0 +1,188 @@
from mastodon import Mastodon
from typing import List, Optional, Dict, Any, Union, BinaryIO
import io
import filetype
from fediverse_service import FediverseService
from fediverse_types import (
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
NotificationType, Visibility
)
import config
class PleromaService(FediverseService):
"""Pleroma implementation of FediverseService using Mastodon.py"""
def __init__(self):
self.client = Mastodon(
access_token=config.KEY,
api_base_url=config.INSTANCE
)
def _convert_mastodon_user(self, user_data: Dict[str, Any]) -> FediverseUser:
"""Convert Mastodon/Pleroma user data to FediverseUser"""
acct = user_data.get("acct", "")
if "@" in acct:
username, host = acct.split("@", 1)
else:
username = acct
host = None
return FediverseUser(
id=str(user_data.get("id", "")),
username=username,
host=host,
display_name=user_data.get("display_name")
)
def _convert_mastodon_file(self, file_data: Dict[str, Any]) -> FediverseFile:
"""Convert Mastodon/Pleroma media attachment to FediverseFile"""
return FediverseFile(
id=str(file_data.get("id", "")),
url=file_data.get("url", ""),
type=file_data.get("type"),
name=file_data.get("description")
)
def _convert_mastodon_visibility(self, visibility: str) -> Visibility:
"""Convert Mastodon/Pleroma visibility to our enum"""
visibility_map = {
"public": Visibility.PUBLIC,
"unlisted": Visibility.UNLISTED,
"private": Visibility.FOLLOWERS,
"direct": Visibility.DIRECT
}
return visibility_map.get(visibility, Visibility.PUBLIC)
def _convert_to_mastodon_visibility(self, visibility: Visibility) -> str:
"""Convert our visibility enum to Mastodon/Pleroma visibility"""
visibility_map = {
Visibility.PUBLIC: "public",
Visibility.UNLISTED: "unlisted",
Visibility.HOME: "unlisted", # Map home to unlisted for Pleroma
Visibility.FOLLOWERS: "private",
Visibility.SPECIFIED: "direct", # Map specified to direct for Pleroma
Visibility.DIRECT: "direct"
}
return visibility_map.get(visibility, "public")
def _convert_mastodon_notification_type(self, notif_type: str) -> NotificationType:
"""Convert Mastodon/Pleroma notification type to our enum"""
type_map = {
"mention": NotificationType.MENTION,
"follow": NotificationType.FOLLOW,
"favourite": NotificationType.FAVOURITE,
"reblog": NotificationType.REBLOG,
"poll": NotificationType.POLL
}
return type_map.get(notif_type, NotificationType.OTHER)
def _convert_mastodon_status(self, status_data: Dict[str, Any]) -> FediversePost:
"""Convert Mastodon/Pleroma status data to FediversePost"""
files = []
if status_data.get("media_attachments"):
files = [self._convert_mastodon_file(f) for f in status_data["media_attachments"]]
# Extract plain text from HTML content
content = status_data.get("content", "")
# Basic HTML stripping - in production you might want to use a proper HTML parser
import re
plain_text = re.sub(r'<[^>]+>', '', content) if content else None
return FediversePost(
id=str(status_data.get("id", "")),
text=plain_text,
user=self._convert_mastodon_user(status_data.get("account", {})),
visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")),
created_at=status_data.get("created_at"),
files=files,
reply_to_id=str(status_data["in_reply_to_id"]) if status_data.get("in_reply_to_id") else None
)
def _convert_mastodon_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
"""Convert Mastodon/Pleroma notification data to FediverseNotification"""
post = None
if notification_data.get("status"):
post = self._convert_mastodon_status(notification_data["status"])
return FediverseNotification(
id=str(notification_data.get("id", "")),
type=self._convert_mastodon_notification_type(notification_data.get("type", "")),
user=self._convert_mastodon_user(notification_data.get("account", {})),
post=post,
created_at=notification_data.get("created_at")
)
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Get notifications from Pleroma instance"""
params = {}
if since_id:
params["since_id"] = since_id
notifications = self.client.notifications(**params)
return [self._convert_mastodon_notification(notif) for notif in notifications]
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Create a post on Pleroma instance"""
params = {
"status": text,
"visibility": self._convert_to_mastodon_visibility(visibility)
}
if reply_to_id:
params["in_reply_to_id"] = reply_to_id
if file_ids:
params["media_ids"] = file_ids
# Note: Pleroma/Mastodon doesn't have direct equivalent to visible_user_ids
# For direct messages, you typically mention users in the status text
response = self.client.status_post(**params)
return str(response.get("id", ""))
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""Get a specific post by ID from Pleroma instance"""
try:
status = self.client.status(post_id)
return self._convert_mastodon_status(status)
except Exception:
return None
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""Upload a file to Pleroma instance"""
try:
# Convert file_data to bytes for MIME detection
if hasattr(file_data, 'read'):
# Check if we can seek back
try:
current_pos = file_data.tell()
file_bytes = file_data.read()
file_data.seek(current_pos)
file_data = io.BytesIO(file_bytes)
except (io.UnsupportedOperation, OSError):
# Non-seekable stream, already read all data
file_data = io.BytesIO(file_bytes)
else:
file_bytes = file_data
file_data = io.BytesIO(file_bytes)
# Use filetype library for robust MIME detection
kind = filetype.guess(file_bytes)
if kind is not None:
mime_type = kind.mime
else:
# Fallback to image/jpeg if detection fails
mime_type = 'image/jpeg'
media = self.client.media_post(file_data, mime_type=mime_type, description=filename)
return self._convert_mastodon_file(media)
except Exception as e:
raise RuntimeError(f"Failed to upload file to Pleroma: {e}") from e

View file

@ -120,12 +120,6 @@ in order: name, rarity',
be a number between 1 and 5',
'attachment_urls': None
}
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
return {
'message': f'{author} Invalid drop weight: \'{arguments[2]}\' \
must be a decimal value between 0.0 and 1.0',
'attachment_urls': None
}
card_id, file_id = add_card(
name=arguments[0],

View file

@ -5,6 +5,13 @@
DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"]
; SQLite Database location
DatabaseLocation = ./gacha_game.db
; Instance type - either "misskey" or "pleroma"
InstanceType = misskey
; Web server port (default: 5000)
WebPort = 5000
; Web server bind address (default: 127.0.0.1, set to 0.0.0.0 to listen on all interfaces)
BindAddress = 127.0.0.1
; Whether to lmit access to the bot via an instance whitelist
; The whitelist can be adjusted via the application
UseWhitelist = False
@ -35,4 +42,3 @@ User = @bot@example.tld
; API key for the bot
; Generate one by going to Settings > API > Generate access token
Token = abcdefghijklmnopqrstuvwxyz012345

View file

@ -1,6 +1,6 @@
# Kemoverse
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Supports both Misskey and Pleroma instances. Name comes from Kemonomimi and Fediverse.
<p align="center">
<img src="./web/static/logo.png" alt="Fediverse Gacha Bot Logo" width="300" height="auto">
</p>
@ -47,13 +47,13 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
- 🌐 Web app to generate cards from images
### 🌍 Fediverse Support
✅ Anyone from the fediverse can play, but the server only works using a Misskey instance. Want to rewrite the program in Elixir for Pleroma? Let us know!
✅ Anyone from the fediverse can play! The bot supports both Misskey and Pleroma instances through configurable backends.
## 🗃️ Tech Stack
- Python (3.12+)
- SQLite
- Fediverse API integration (via Misskey endpoints)
- Fediverse API integration (Misskey and Pleroma support)
- Flask
- Modular DB design for extensibility
@ -61,16 +61,33 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and duel tactics.
## 📝 License
This project is licensed under the GNU Affero General Public License v3.0 (AGPL-3.0).
This means:
- You are free to use, modify, and redistribute this software under the same license.
- If you modify and deploy the software (for example, as part of a web service), you must also make the source code of your modified version available to users of that service.
You should have received a copy of the GNU Affero General Public License along with this program. If not, see https://www.gnu.org/licenses/.
The full license text can be found here: https://www.gnu.org/licenses/agpl-3.0.html
The AGPL is designed to ensure that software freedom is preserved, especially in networked environments. If you improve this project or build something on top of it, please give back to the community by sharing your changes too.
Unless another license is listed, every file in the project is licensed under the GNU Affero General Public License version 3 (or at your option), any later version.
```mermaid
flowchart TD
subgraph Player Interaction
A1[Misskey bot]
A1[Fediverse bot]
A2[Web]
end
subgraph Misskey
B1[Misskey instance]
subgraph Fediverse
B1[Fediverse instance]
end
subgraph Bot
@ -78,7 +95,7 @@ flowchart TD
C2[Notification parser]
C3[Gacha roll logic]
C4[Database interface]
C5[Misskey API poster]
C5[Fediverse API poster]
end
subgraph Website

View file

@ -6,3 +6,5 @@ Jinja2==3.1.6
MarkupSafe==3.0.2
Werkzeug==3.1.3
Misskey.py==4.1.0
Mastodon.py==1.8.1
filetype==1.2.0

View file

@ -1,3 +1,19 @@
#Kemoverse - a gacha-style bot for the Fediverse.
#Copyright © 2025 Waifu
#
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as
#published by the Free Software Foundation, either version 3 of the
#License, or (at your option) any later version.
#
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with this program. If not, see https://www.gnu.org/licenses/.
import sqlite3
import traceback
import os
@ -57,16 +73,14 @@ def perform_migration(cursor: sqlite3.Cursor, migration: tuple[int, str]) -> Non
def get_db_path() -> str | DBNotFoundError:
'''Gets the DB path from config.ini'''
env = os.environ.get('KEMOVERSE_ENV')
if not (env and env in ['prod', 'dev']):
raise KemoverseEnvUnset
print(f'Running in "{env}" mode')
config_path = f'config_{env}.ini'
if not os.path.isfile(config_path):
raise ConfigError(f'Could not find {config_path}')
print(f'Running in "{env}" mode')
config = ConfigParser()
config.read(config_path)
db_path = config['application']['DatabaseLocation']
@ -96,7 +110,6 @@ def main():
return
except KemoverseEnvUnset:
print('Error: KEMOVERSE_ENV is either not set or has an invalid value.')
print('Please set KEMOVERSE_ENV to either "dev" or "prod" before running.')
print(traceback.format_exc())
return

View file

@ -1,5 +1,21 @@
#!/bin/bash
#Kemoverse - a gacha-style bot for the Fediverse.
#Copyright © 2025 Waifu
#
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as
#published by the Free Software Foundation, either version 3 of the
#License, or (at your option) any later version.
#
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with this program. If not, see https://www.gnu.org/licenses/.
# Navigate to the project directory (optional)
cd "$(dirname "$0")"

View file

@ -1,13 +1,34 @@
#Kemoverse - a gacha-style bot for the Fediverse.
#Copyright © 2025 Waifu
#
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as
#published by the Free Software Foundation, either version 3 of the
#License, or (at your option) any later version.
#
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with this program. If not, see https://www.gnu.org/licenses/.
import sqlite3
import sys
import os
# Add bot directory to path to import config
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'bot'))
import config
from flask import Flask, render_template, abort
from werkzeug.exceptions import HTTPException
app = Flask(__name__)
DB_PATH = "./gacha_game.db" # Adjust path if needed
def get_db_connection():
conn = sqlite3.connect(DB_PATH)
conn = sqlite3.connect(config.DB_PATH)
conn.row_factory = sqlite3.Row
return conn
@ -68,4 +89,4 @@ def submit_character():
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
app.run(host=config.BIND_ADDRESS, port=config.WEB_PORT, debug=True)