Compare commits

...
Sign in to create a new pull request.

33 commits

Author SHA1 Message Date
Moon
0039717a01 Merge branch 'dev' into pleroma-support 2025-06-14 07:31:02 +09:00
Moon
bf8d9823b2 Merge branch 'dev' into pleroma-support 2025-06-14 07:18:18 +09:00
7c4fd5fc41 Merge pull request 'any-env-and-bind-address' (#58) from any-env-and-bind-address into dev
Reviewed-on: #58
Reviewed-by: VD15 <valkyriedev15@gmail.com>
2025-06-13 14:54:27 -07:00
Moon
24bfe88dc1 rm obsolete print 2025-06-14 05:57:01 +09:00
Moon
1a35750d0a restrict characters in KEMOVERSE_ENV 2025-06-14 05:51:49 +09:00
Moon
d416ae1b2d rm limitation on KEMOVERSE_ENV from another place. 2025-06-14 05:47:43 +09:00
Moon
7fd4d5db25 indicate on startup if whitelisting is enabled. 2025-06-14 05:17:39 +09:00
Moon
f4f847e577 make sure UseWhitelist is a boolean, default to True 2025-06-14 05:14:40 +09:00
Moon
2ef70801c7 fix config module ref 2025-06-14 05:08:59 +09:00
Moon
fdf21b3f5f rm check for env dev or prod so any can be used 2025-06-14 04:43:43 +09:00
Moon
337a989671 rm vestigial trusted instances config. 2025-06-14 04:34:20 +09:00
Moon
3ad4edbc45 ignore custom startup script 2025-06-14 04:10:16 +09:00
1984b67523 Merge pull request 'fix-missing-weight-column in dev' (#57) from fix-missing-weight-column into dev
Reviewed-on: #57
Reviewed-by: VD15 <valkyriedev15@gmail.com>
2025-06-13 04:14:53 -07:00
Moon
bbb64a869d Merge remote-tracking branch 'origin/master' into pleroma-support 2025-06-13 19:50:47 +09:00
Moon
fc45c688e8 Merge branch 'fix-missing-weight-column' into pleroma-support 2025-06-13 19:42:41 +09:00
Moon
77d4fa13bb rm validation of removed weight 2025-06-13 19:31:23 +09:00
Moon
bd287b096a rm reference to weight column. 2025-06-13 18:47:54 +09:00
Moon
40f018a83b make instantiation explicit so easier to test. 2025-06-13 13:53:41 +09:00
Moon
a516a9b55a minor change to make testing easier. 2025-06-13 13:36:38 +09:00
Moon
7161712f15 rm unused function that bypassed server abstraction 2025-06-13 13:26:01 +09:00
Moon
fadd4dfe27 Merge remote-tracking branch 'origin/dev' into pleroma-support 2025-06-13 13:05:51 +09:00
Moon
7b32ee7fcf had to change some things to get cursor for db working. 2025-06-13 11:23:34 +09:00
Moon
f70b2147cd Remove legacy files that bypass FediverseService abstraction
- Remove bot/client.py (replaced by fediverse factory)
- Remove bot/notification.py (replaced by bot_app.py)
- All functionality now properly uses FediverseService abstraction
2025-06-13 10:57:33 +09:00
Moon
298d7fda72 rm needless file 2025-06-13 10:52:20 +09:00
Moon
8918b5205d named some damn properties wrong 2025-06-13 10:52:20 +09:00
Moon
67b4d949fd use a library for content type detection instead of handrolled 2025-06-13 10:50:24 +09:00
Moon
89ae8a7290 back to using mastodon.py 2025-06-13 10:50:24 +09:00
Moon
91376c0eba works now but may need to revert file upload change 2025-06-13 10:50:24 +09:00
Moon
b2ca6dd59a documentation update for configurable backends 2025-06-13 10:50:03 +09:00
Moon
dac05a3ed8 completely untested refactor. bug fixes to come later. 2025-06-13 10:48:35 +09:00
Moon
fe8e7d246f actually use the abstracted type not just the id 2025-06-13 10:37:27 +09:00
Moon
a47530180d first whack at file upload 2025-06-13 10:37:27 +09:00
Moon
e0cf42f8f6 baseline fedi abstraction stuff. 2025-06-13 10:37:24 +09:00
21 changed files with 880 additions and 201 deletions

3
.gitignore vendored
View file

@ -185,5 +185,6 @@ cython_debug/
gacha_game*.db
gacha_game*.db.*
config*.ini
run.sh
.idea
.idea

1
.tool-versions Normal file
View file

@ -0,0 +1 @@
nodejs 23.4.0

View file

@ -1,32 +1,24 @@
import requests
from misskey.exceptions import MisskeyAPIException
from client import client_connection
from db_utils import insert_card
from custom_types import Card
from config import RARITY_TO_WEIGHT
import config
from fediverse_factory import get_fediverse_service
import db_utils
def add_card(
name: str,
rarity: int,
image_url: str) -> tuple[int, str]:
'''
Adds a card to the database, uploading the image from a public URL to
the bot's Misskey Drive.
def add_card(name: str, rarity: int, image_url: str) -> tuple[int, str]:
"""
Adds a card to the database, uploading the image from a public URL to the Fediverse instance.
Args:
name (str): Card name.
rarity (int): Card rarity (e.g., 1-5).
image_url (str): Public URL of the image from the post (e.g., from
note['files'][i]['url']).
image_url (str): Public URL of the image from the post.
Returns:
tuple[int, str]: Card ID and bot's Drive file_id.
tuple[int, str]: Card ID and file_id.
Raises:
ValueError: If inputs are invalid.
RuntimeError: If image download/upload or database operation fails.
'''
"""
stripped_name = name.strip()
@ -35,30 +27,33 @@ def add_card(
raise ValueError('Card name cannot be empty.')
if rarity < 1:
raise ValueError('Rarity must be a positive integer.')
if rarity not in RARITY_TO_WEIGHT.keys():
if rarity not in config.RARITY_TO_WEIGHT.keys():
raise ValueError(f'Invalid rarity: {rarity}')
if not image_url:
raise ValueError('Image URL must be provided.')
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f'Failed to download image from {image_url}')
# Upload to bot's Drive
mk = client_connection()
try:
media = mk.drive_files_create(response.raw)
file_id = media['id']
except MisskeyAPIException as e:
raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
from e
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f"Failed to download image from {image_url}")
# Insert into database
card_id = insert_card(
# Upload to Fediverse instance
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
try:
uploaded_file = fediverse_service.upload_file(response.raw)
file_id = uploaded_file.id
except RuntimeError as e:
raise RuntimeError(f"Failed to upload image: {e}") from e
# Insert into database using db_utils function
card_id = db_utils.insert_card(
stripped_name,
rarity,
RARITY_TO_WEIGHT[rarity],
file_id
)
return card_id, file_id
)
return card_id, file_id
except Exception as e:
raise

View file

@ -1,21 +1,62 @@
import time
import misskey as misskey
from client import client_connection
import db_utils as db
import traceback
import config
from notification import process_fediverse_notification
from db_utils import get_config, set_config, connect, setup_administrators
from fediverse_factory import get_fediverse_service
from config import NOTIFICATION_POLL_INTERVAL
from notification import process_notifications
if __name__ == '__main__':
# Initialize the Misskey client
client = client_connection()
# Connect to DB
db.connect()
from config import USE_WHITELIST
def stream_notifications():
# Initialize database connection
connect()
# Setup default administrators
db.setup_administrators()
setup_administrators()
# Initialize the Fediverse service
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
# Get the last seen notification ID from the database
last_seen_id = get_config("last_seen_notif_id")
# Show whitelist status
whitelist_status = "enabled" if USE_WHITELIST else "disabled"
print(f'Instance whitelisting: {whitelist_status}')
print('Listening for notifications...')
while True:
if not process_notifications(client):
time.sleep(NOTIFICATION_POLL_INTERVAL)
try:
# Get notifications from the fediverse service
notifications = fediverse_service.get_notifications(since_id=last_seen_id)
if notifications:
new_last_seen_id = last_seen_id
for notification in notifications:
notif_id = notification.id
# Skip old or same ID notifications
if last_seen_id is not None and notif_id <= last_seen_id:
continue
# Process the notification using the abstracted processor
process_fediverse_notification(notification, fediverse_service)
# Update only if this notif_id is greater
if new_last_seen_id is None or notif_id > new_last_seen_id:
new_last_seen_id = notif_id
# Save the latest seen ID
if new_last_seen_id and new_last_seen_id != last_seen_id:
set_config("last_seen_notif_id", new_last_seen_id)
last_seen_id = new_last_seen_id
time.sleep(5)
except Exception as e:
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
time.sleep(5)
if __name__ == "__main__":
stream_notifications()

View file

@ -1,6 +0,0 @@
import misskey
import config
def client_connection() -> misskey.Misskey:
return misskey.Misskey(address=config.INSTANCE, i=config.KEY)

View file

@ -1,6 +1,7 @@
'''Essentials for the bot to function'''
import configparser
import json
import re
from os import environ, path
@ -13,8 +14,10 @@ def get_config_file() -> str:
env: str | None = environ.get('KEMOVERSE_ENV')
if not env:
raise ConfigError('Error: KEMOVERSE_ENV is unset')
if not (env in ['prod', 'dev']):
raise ConfigError(f'Error: Invalid environment: {env}')
# Validate environment name contains only alphanumeric, dash, and underscore
if not re.match(r'^[a-zA-Z0-9_-]+$', env):
raise ValueError(f'KEMOVERSE_ENV "{env}" contains invalid characters. Only alphanumeric, dash (-), and underscore (_) are allowed.')
config_path: str = f'config_{env}.ini'
@ -23,6 +26,50 @@ def get_config_file() -> str:
return config_path
def normalize_user(user_string: str) -> str:
"""
Normalizes a user string to the format @user@domain.tld where domain is lowercase and user is case-sensitive
Args:
user_string: User string in various formats
Returns:
Normalized user string
Raises:
ValueError: If the user string is invalid or domain is malformed
"""
if not user_string or not user_string.strip():
raise ValueError("User string cannot be empty")
user_string = user_string.strip()
# Add leading @ if missing
if not user_string.startswith('@'):
user_string = '@' + user_string
# Split into user and domain parts
parts = user_string[1:].split('@', 1) # Remove leading @ and split
if len(parts) != 2:
raise ValueError(f"Invalid user format: {user_string}. Expected @user@domain.tld")
username, domain = parts
if not username:
raise ValueError("Username cannot be empty")
if not domain:
raise ValueError("Domain cannot be empty")
# Validate domain format (basic check for valid domain structure)
domain_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
if not re.match(domain_pattern, domain):
raise ValueError(f"Invalid domain format: {domain}")
# Return normalized format: @user@domain.tld (domain lowercase, user case-sensitive)
return f"@{username}@{domain.lower()}"
def get_rarity_to_weight(
config_section: configparser.SectionProxy) -> dict[int, float]:
"""Parses Rarity_X keys from config and returns a {rarity: weight} dict."""
@ -38,19 +85,36 @@ config = configparser.ConfigParser()
config.read(get_config_file())
# Username for the bot
USER = config['credentials']['User'].lower()
if 'User' not in config['credentials'] or not config['credentials']['User'].strip():
raise ConfigError("User must be specified in config.ini under [credentials]")
USER = normalize_user(config['credentials']['User'])
# API key for the bot
KEY = config['credentials']['Token']
# Bot's Misskey instance URL
# Bot's Misskey/Pleroma instance URL
INSTANCE = config['credentials']['Instance'].lower()
# Instance type validation
if 'InstanceType' not in config['application']:
raise ValueError("InstanceType must be specified in config.ini")
instance_type = config['application']['InstanceType'].lower()
if instance_type not in ('misskey', 'pleroma'):
raise ValueError("InstanceType must be either 'misskey' or 'pleroma'")
INSTANCE_TYPE = instance_type
# Web server port
WEB_PORT = config['application'].getint('WebPort', 5000)
BIND_ADDRESS = config['application'].get('BindAddress', '127.0.0.1')
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
# to use extra admin exclusive commands with the bot
ADMINS = json.loads(config['application']['DefaultAdmins'])
# SQLite Database location
DB_PATH = config['application']['DatabaseLocation']
DB_PATH = config['application'].get('DatabaseLocation', './gacha_game.db')
# Whether to enable the instance whitelist
USE_WHITELIST = config['application']['UseWhitelist']
USE_WHITELIST = config['application'].getboolean('UseWhitelist', True)
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])

View file

@ -150,12 +150,11 @@ def is_player_administrator(username: str) -> bool:
def insert_card(
name: str, rarity: int, weight: float, file_id: str) -> int:
name: str, rarity: int, file_id: str) -> int:
'''Inserts a card'''
CURSOR.execute(
'INSERT INTO cards (name, rarity, weight, file_id) VALUES \
(?, ?, ?, ?)',
(name, rarity, weight, file_id)
'INSERT INTO cards (name, rarity, file_id) VALUES (?, ?, ?)',
(name, rarity, file_id)
)
card_id = CURSOR.lastrowid
return card_id if card_id else 0

40
bot/fediverse_factory.py Normal file
View file

@ -0,0 +1,40 @@
from fediverse_service import FediverseService
from misskey_service import MisskeyService
from pleroma_service import PleromaService
class FediverseServiceFactory:
"""Factory for creating FediverseService implementations"""
@staticmethod
def create_service(instance_type: str) -> FediverseService:
"""
Create a FediverseService implementation based on the instance type.
Args:
instance_type: The type of instance ("misskey" or "pleroma")
Returns:
FediverseService implementation (MisskeyService or PleromaService)
Raises:
ValueError: If the instance type is not supported
"""
instance_type = instance_type.lower()
if instance_type == "misskey":
return MisskeyService()
elif instance_type == "pleroma":
return PleromaService()
else:
raise ValueError(f"Unsupported instance type: {instance_type}")
def get_fediverse_service(instance_type: str) -> FediverseService:
"""
Convenience function to get a FediverseService instance
Args:
instance_type: The instance type ("misskey" or "pleroma")
"""
return FediverseServiceFactory.create_service(instance_type)

74
bot/fediverse_service.py Normal file
View file

@ -0,0 +1,74 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Union, BinaryIO
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
class FediverseService(ABC):
"""Abstract interface for Fediverse platform services (Misskey, Pleroma, etc.)"""
@abstractmethod
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""
Retrieve notifications from the Fediverse instance.
Args:
since_id: Optional ID to get notifications newer than this ID
Returns:
List of FediverseNotification objects
"""
pass
@abstractmethod
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""
Create a new post on the Fediverse instance.
Args:
text: The text content of the post
reply_to_id: Optional ID of post to reply to
visibility: Visibility level for the post
file_ids: Optional list of file IDs to attach
visible_user_ids: Optional list of user IDs who can see the post (for specified visibility)
Returns:
ID of the created post
"""
pass
@abstractmethod
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""
Retrieve a specific post by its ID.
Args:
post_id: The ID of the post to retrieve
Returns:
FediversePost object if found, None otherwise
"""
pass
@abstractmethod
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""
Upload a file to the Fediverse instance.
Args:
file_data: File data as binary stream or bytes
filename: Optional filename for the uploaded file
Returns:
FediverseFile object with ID, URL, and other metadata
Raises:
RuntimeError: If file upload fails
"""
pass

73
bot/fediverse_types.py Normal file
View file

@ -0,0 +1,73 @@
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from enum import Enum
class NotificationType(Enum):
MENTION = "mention"
REPLY = "reply"
FOLLOW = "follow"
FAVOURITE = "favourite"
REBLOG = "reblog"
POLL = "poll"
OTHER = "other"
class Visibility(Enum):
PUBLIC = "public"
UNLISTED = "unlisted"
HOME = "home"
FOLLOWERS = "followers"
SPECIFIED = "specified"
DIRECT = "direct"
@dataclass
class FediverseUser:
"""Common user representation across Fediverse platforms"""
id: str
username: str
host: Optional[str] = None # None for local users
display_name: Optional[str] = None
@property
def full_handle(self) -> str:
"""Returns the full fediverse handle (@user@domain or @user for local)"""
if self.host:
return f"@{self.username}@{self.host}"
return f"@{self.username}"
@dataclass
class FediverseFile:
"""Common file/attachment representation"""
id: str
url: str
type: Optional[str] = None
name: Optional[str] = None
@dataclass
class FediversePost:
"""Common post representation across Fediverse platforms"""
id: str
text: Optional[str]
user: FediverseUser
visibility: Visibility
created_at: Optional[str] = None
files: List[FediverseFile] = None
reply_to_id: Optional[str] = None
def __post_init__(self):
if self.files is None:
self.files = []
@dataclass
class FediverseNotification:
"""Common notification representation across Fediverse platforms"""
id: str
type: NotificationType
user: FediverseUser
post: Optional[FediversePost] = None
created_at: Optional[str] = None

156
bot/misskey_service.py Normal file
View file

@ -0,0 +1,156 @@
import misskey
from typing import List, Optional, Dict, Any, Union, BinaryIO
from fediverse_service import FediverseService
from fediverse_types import (
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
NotificationType, Visibility
)
import config
class MisskeyService(FediverseService):
"""Misskey implementation of FediverseService"""
def __init__(self):
self.client = misskey.Misskey(address=config.INSTANCE, i=config.KEY)
def _convert_misskey_user(self, user_data: Dict[str, Any]) -> FediverseUser:
"""Convert Misskey user data to FediverseUser"""
return FediverseUser(
id=user_data.get("id", ""),
username=user_data.get("username", "unknown"),
host=user_data.get("host"),
display_name=user_data.get("name")
)
def _convert_misskey_file(self, file_data: Dict[str, Any]) -> FediverseFile:
"""Convert Misskey file data to FediverseFile"""
return FediverseFile(
id=file_data.get("id", ""),
url=file_data.get("url", ""),
type=file_data.get("type"),
name=file_data.get("name")
)
def _convert_misskey_visibility(self, visibility: str) -> Visibility:
"""Convert Misskey visibility to our enum"""
visibility_map = {
"public": Visibility.PUBLIC,
"unlisted": Visibility.UNLISTED,
"home": Visibility.HOME,
"followers": Visibility.FOLLOWERS,
"specified": Visibility.SPECIFIED
}
return visibility_map.get(visibility, Visibility.HOME)
def _convert_to_misskey_visibility(self, visibility: Visibility) -> str:
"""Convert our visibility enum to Misskey visibility"""
visibility_map = {
Visibility.PUBLIC: "public",
Visibility.UNLISTED: "unlisted",
Visibility.HOME: "home",
Visibility.FOLLOWERS: "followers",
Visibility.SPECIFIED: "specified",
Visibility.DIRECT: "specified" # Map direct to specified for Misskey
}
return visibility_map.get(visibility, "home")
def _convert_misskey_notification_type(self, notif_type: str) -> NotificationType:
"""Convert Misskey notification type to our enum"""
type_map = {
"mention": NotificationType.MENTION,
"reply": NotificationType.REPLY,
"follow": NotificationType.FOLLOW,
"favourite": NotificationType.FAVOURITE,
"reblog": NotificationType.REBLOG,
"poll": NotificationType.POLL
}
return type_map.get(notif_type, NotificationType.OTHER)
def _convert_misskey_post(self, note_data: Dict[str, Any]) -> FediversePost:
"""Convert Misskey note data to FediversePost"""
files = []
if note_data.get("files"):
files = [self._convert_misskey_file(f) for f in note_data["files"]]
return FediversePost(
id=note_data.get("id", ""),
text=note_data.get("text"),
user=self._convert_misskey_user(note_data.get("user", {})),
visibility=self._convert_misskey_visibility(note_data.get("visibility", "home")),
created_at=note_data.get("createdAt"),
files=files,
reply_to_id=note_data.get("replyId")
)
def _convert_misskey_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
"""Convert Misskey notification data to FediverseNotification"""
post = None
if notification_data.get("note"):
post = self._convert_misskey_post(notification_data["note"])
return FediverseNotification(
id=notification_data.get("id", ""),
type=self._convert_misskey_notification_type(notification_data.get("type", "")),
user=self._convert_misskey_user(notification_data.get("user", {})),
post=post,
created_at=notification_data.get("createdAt")
)
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Get notifications from Misskey instance"""
params = {
'include_types': ['mention', 'reply'],
'limit': 50
}
if since_id:
params["since_id"] = since_id
notifications = self.client.i_notifications(**params)
return [self._convert_misskey_notification(notif) for notif in notifications]
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Create a post on Misskey instance"""
params = {
"text": text,
"visibility": self._convert_to_misskey_visibility(visibility)
}
if reply_to_id:
params["reply_id"] = reply_to_id
if file_ids:
params["file_ids"] = file_ids
if visible_user_ids and visibility == Visibility.SPECIFIED:
params["visible_user_ids"] = visible_user_ids
response = self.client.notes_create(**params)
return response.get("createdNote", {}).get("id", "")
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""Get a specific post by ID from Misskey instance"""
try:
note = self.client.notes_show(noteId=post_id)
return self._convert_misskey_post(note)
except Exception:
return None
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""Upload a file to Misskey Drive"""
try:
from misskey.exceptions import MisskeyAPIException
media = self.client.drive_files_create(file_data)
return self._convert_misskey_file(media)
except MisskeyAPIException as e:
raise RuntimeError(f"Failed to upload file to Misskey Drive: {e}") from e
except Exception as e:
raise RuntimeError(f"Unexpected error during file upload: {e}") from e

View file

@ -0,0 +1,74 @@
"""Mock FediverseService for testing purposes"""
from typing import List, Optional, Union, BinaryIO
from fediverse_service import FediverseService
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
class MockFediverseService(FediverseService):
"""Mock implementation of FediverseService for testing"""
def __init__(self):
self.notifications = []
self.created_posts = []
self.uploaded_files = []
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Return mock notifications, optionally filtered by since_id"""
if since_id is None:
return self.notifications
# Filter notifications newer than since_id
filtered = []
for notif in self.notifications:
if notif.id > since_id:
filtered.append(notif)
return filtered
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Mock post creation, returns fake post ID"""
post_id = f"mock_post_{len(self.created_posts)}"
# Store the post for assertions
self.created_posts.append({
'id': post_id,
'text': text,
'reply_to_id': reply_to_id,
'visibility': visibility,
'file_ids': file_ids,
'visible_user_ids': visible_user_ids
})
return post_id
def upload_file(self, file_data: Union[BinaryIO, bytes]) -> FediverseFile:
"""Mock file upload, returns fake file"""
file_id = f"mock_file_{len(self.uploaded_files)}"
mock_file = FediverseFile(
id=file_id,
url=f"https://example.com/files/{file_id}",
type="image/png",
name="test_file.png"
)
self.uploaded_files.append(mock_file)
return mock_file
# Helper methods for testing
def add_mock_notification(self, notification: FediverseNotification):
"""Add a mock notification for testing"""
self.notifications.append(notification)
def clear_all(self):
"""Clear all mock data"""
self.notifications.clear()
self.created_posts.clear()
self.uploaded_files.clear()

View file

@ -1,45 +1,47 @@
import traceback
from typing import Dict, Any
import misskey
from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST
import config
from parsing import parse_notification
from db_utils import get_config, set_config, is_whitelisted, is_player_banned
from db_utils import is_whitelisted, is_player_banned
from response import generate_response
from custom_types import BotResponse
# Define your whitelist
# TODO: move to config
WHITELISTED_INSTANCES: list[str] = []
from fediverse_factory import get_fediverse_service
from fediverse_types import FediverseNotification, NotificationType, Visibility
def process_notification(
client: misskey.Misskey,
notification: Dict[str, Any]) -> None:
'''Processes an individual notification'''
user = notification.get('user', {})
username = user.get('username', 'unknown')
host = user.get('host') # None if local user
def process_fediverse_notification(notification: FediverseNotification, fediverse_service=None) -> None:
'''Processes an individual fediverse notification using the abstraction'''
if fediverse_service is None:
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
# Get user and instance info
username = notification.user.username
host = notification.user.host
instance = host if host else 'local'
if USE_WHITELIST and not is_whitelisted(instance):
# Check whitelist
if config.USE_WHITELIST and not is_whitelisted(instance):
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
return
# Copy visibility of the post that was received when replying (so if people
# don't want to dump a bunch of notes on home they don't have to)
visibility = notification['note']['visibility']
if visibility != 'specified':
visibility = 'home'
# Only process mentions and replies
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
return
notif_type = notification.get('type', 'unknown')
notif_id = notification.get('id')
# Return early if no post attached
if not notification.post:
return
# Determine visibility for reply
if notification.post.visibility != Visibility.SPECIFIED:
visibility = Visibility.HOME
else:
visibility = Visibility.SPECIFIED
notif_type = notification.type.value
notif_id = notification.id
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
# 🧠 Send to the parser
parsed_notification = parse_notification(notification, client)
parsed_notification = parse_notification(notification, fediverse_service)
if not parsed_notification:
return
@ -49,79 +51,20 @@ def process_notification(
print(f'⚠️ Blocked notification from banned player: {author}')
return
# Get the note Id to reply to
note_id = notification.get('note', {}).get('id')
# Get the response
response: BotResponse | None = generate_response(parsed_notification)
if not response:
return
client.notes_create(
# Handle attachment URLs (convert to file IDs if needed)
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
# Send response using fediverse service
fediverse_service.create_post(
text=response['message'],
reply_id=note_id,
reply_to_id=notification.post.id,
visibility=visibility,
file_ids=response['attachment_urls']
# TODO: write actual visible users ids so pleromers can use the bot
# privately
# visible_user_ids=[]
file_ids=file_ids
# visible_user_ids=[] # TODO: write actual visible users ids so pleromers can use the bot privately
)
def process_notifications(client: misskey.Misskey) -> bool:
'''Processes a batch of unread notifications. Returns False if there are
no more notifications to process.'''
last_seen_id = get_config('last_seen_notif_id')
# process_notification writes to last_seen_id, so make a copy
new_last_seen_id = last_seen_id
try:
notifications = client.i_notifications(
# Fetch notifications we haven't seen yet. This option is a bit
# tempermental, sometimes it'll include since_id, sometimes it
# won't. We need to keep track of what notifications we've
# already processed.
since_id=last_seen_id,
# Let misskey handle the filtering
include_types=['mention', 'reply'],
# And handle the batch size while we're at it
limit=NOTIFICATION_BATCH_SIZE
)
# No notifications. Wait the poll period.
if not notifications:
return False
# Iterate oldest to newest
for notification in notifications:
try:
# Skip if we've processed already
notif_id = notification.get('id', '')
if notif_id <= last_seen_id:
continue
# Update new_last_seen_id and process
new_last_seen_id = notif_id
process_notification(client, notification)
except Exception as e:
print(f'An exception has occured while processing a \
notification: {e}')
print(traceback.format_exc())
# If we got as many notifications as we requested, there are probably
# more in the queue
return len(notifications) == NOTIFICATION_BATCH_SIZE
except MisskeyAPIException as e:
print(f'An exception has occured while reading notifications: {e}\n')
print(traceback.format_exc())
finally:
# Quality jank right here, but finally lets us update the last_seen_id
# even if we hit an exception or return early
if new_last_seen_id > last_seen_id:
set_config('last_seen_notif_id', new_last_seen_id)
return False

View file

@ -1,27 +1,38 @@
import re
from typing import Dict, Any
import misskey
import config
from response import generate_response
from fediverse_factory import get_fediverse_service
from fediverse_types import FediverseNotification, NotificationType, Visibility
from custom_types import ParsedNotification
def parse_notification(notification: FediverseNotification, fediverse_service=None):
'''Parses any notifications received by the bot and sends any commands to
generate_response()'''
if fediverse_service is None:
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
def parse_notification(
notification: Dict[str, Any],
client: misskey.Misskey) -> ParsedNotification | None:
'''Parses any notifications received by the bot'''
# We get the type of notification to filter the ones that we actually want
# to parse
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
return # Ignore anything that isn't a mention
# Return early if no post attached
if not notification.post:
return
# We want the visibility to be related to the type that was received (so if
# people don't want to dump a bunch of notes on home they don't have to)
if notification.post.visibility != Visibility.SPECIFIED:
visibility = Visibility.HOME
else:
visibility = Visibility.SPECIFIED
# Get the full Activitypub ID of the user
user = notification.get("user", {})
username = user.get("username", "unknown")
host = user.get("host")
# Local users may not have a hostname attached
full_user = f"@{username}" if not host else f"@{username}@{host}"
full_user = notification.user.full_handle
note_obj = notification.get("note", {})
note_text = note_obj.get("text")
note_id = note_obj.get("id")
note_text = notification.post.text
note_id = notification.post.id
note = note_text.strip().lower() if note_text else ""
# Split words into tokens
@ -44,9 +55,30 @@ def parse_notification(
command = parts[1].lower()
arguments = parts[2:] if len(parts) > 2 else []
return {
# Create ParsedNotification object for the new response system
parsed_notification: ParsedNotification = {
'author': full_user,
'command': command,
'arguments': arguments,
'note_obj': note_obj
'note_obj': {
'id': note_id,
'text': note_text,
'files': [{'url': f.url} for f in notification.post.files] if notification.post.files else []
}
}
# Generate response using the new system
response = generate_response(parsed_notification)
if not response:
return
# Handle attachment URLs (convert to file IDs if needed)
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
fediverse_service.create_post(
text=response['message'],
reply_to_id=note_id,
visibility=visibility,
file_ids=file_ids
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)

188
bot/pleroma_service.py Normal file
View file

@ -0,0 +1,188 @@
from mastodon import Mastodon
from typing import List, Optional, Dict, Any, Union, BinaryIO
import io
import filetype
from fediverse_service import FediverseService
from fediverse_types import (
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
NotificationType, Visibility
)
import config
class PleromaService(FediverseService):
"""Pleroma implementation of FediverseService using Mastodon.py"""
def __init__(self):
self.client = Mastodon(
access_token=config.KEY,
api_base_url=config.INSTANCE
)
def _convert_mastodon_user(self, user_data: Dict[str, Any]) -> FediverseUser:
"""Convert Mastodon/Pleroma user data to FediverseUser"""
acct = user_data.get("acct", "")
if "@" in acct:
username, host = acct.split("@", 1)
else:
username = acct
host = None
return FediverseUser(
id=str(user_data.get("id", "")),
username=username,
host=host,
display_name=user_data.get("display_name")
)
def _convert_mastodon_file(self, file_data: Dict[str, Any]) -> FediverseFile:
"""Convert Mastodon/Pleroma media attachment to FediverseFile"""
return FediverseFile(
id=str(file_data.get("id", "")),
url=file_data.get("url", ""),
type=file_data.get("type"),
name=file_data.get("description")
)
def _convert_mastodon_visibility(self, visibility: str) -> Visibility:
"""Convert Mastodon/Pleroma visibility to our enum"""
visibility_map = {
"public": Visibility.PUBLIC,
"unlisted": Visibility.UNLISTED,
"private": Visibility.FOLLOWERS,
"direct": Visibility.DIRECT
}
return visibility_map.get(visibility, Visibility.PUBLIC)
def _convert_to_mastodon_visibility(self, visibility: Visibility) -> str:
"""Convert our visibility enum to Mastodon/Pleroma visibility"""
visibility_map = {
Visibility.PUBLIC: "public",
Visibility.UNLISTED: "unlisted",
Visibility.HOME: "unlisted", # Map home to unlisted for Pleroma
Visibility.FOLLOWERS: "private",
Visibility.SPECIFIED: "direct", # Map specified to direct for Pleroma
Visibility.DIRECT: "direct"
}
return visibility_map.get(visibility, "public")
def _convert_mastodon_notification_type(self, notif_type: str) -> NotificationType:
"""Convert Mastodon/Pleroma notification type to our enum"""
type_map = {
"mention": NotificationType.MENTION,
"follow": NotificationType.FOLLOW,
"favourite": NotificationType.FAVOURITE,
"reblog": NotificationType.REBLOG,
"poll": NotificationType.POLL
}
return type_map.get(notif_type, NotificationType.OTHER)
def _convert_mastodon_status(self, status_data: Dict[str, Any]) -> FediversePost:
"""Convert Mastodon/Pleroma status data to FediversePost"""
files = []
if status_data.get("media_attachments"):
files = [self._convert_mastodon_file(f) for f in status_data["media_attachments"]]
# Extract plain text from HTML content
content = status_data.get("content", "")
# Basic HTML stripping - in production you might want to use a proper HTML parser
import re
plain_text = re.sub(r'<[^>]+>', '', content) if content else None
return FediversePost(
id=str(status_data.get("id", "")),
text=plain_text,
user=self._convert_mastodon_user(status_data.get("account", {})),
visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")),
created_at=status_data.get("created_at"),
files=files,
reply_to_id=str(status_data["in_reply_to_id"]) if status_data.get("in_reply_to_id") else None
)
def _convert_mastodon_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
"""Convert Mastodon/Pleroma notification data to FediverseNotification"""
post = None
if notification_data.get("status"):
post = self._convert_mastodon_status(notification_data["status"])
return FediverseNotification(
id=str(notification_data.get("id", "")),
type=self._convert_mastodon_notification_type(notification_data.get("type", "")),
user=self._convert_mastodon_user(notification_data.get("account", {})),
post=post,
created_at=notification_data.get("created_at")
)
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Get notifications from Pleroma instance"""
params = {}
if since_id:
params["since_id"] = since_id
notifications = self.client.notifications(**params)
return [self._convert_mastodon_notification(notif) for notif in notifications]
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Create a post on Pleroma instance"""
params = {
"status": text,
"visibility": self._convert_to_mastodon_visibility(visibility)
}
if reply_to_id:
params["in_reply_to_id"] = reply_to_id
if file_ids:
params["media_ids"] = file_ids
# Note: Pleroma/Mastodon doesn't have direct equivalent to visible_user_ids
# For direct messages, you typically mention users in the status text
response = self.client.status_post(**params)
return str(response.get("id", ""))
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""Get a specific post by ID from Pleroma instance"""
try:
status = self.client.status(post_id)
return self._convert_mastodon_status(status)
except Exception:
return None
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""Upload a file to Pleroma instance"""
try:
# Convert file_data to bytes for MIME detection
if hasattr(file_data, 'read'):
# Check if we can seek back
try:
current_pos = file_data.tell()
file_bytes = file_data.read()
file_data.seek(current_pos)
file_data = io.BytesIO(file_bytes)
except (io.UnsupportedOperation, OSError):
# Non-seekable stream, already read all data
file_data = io.BytesIO(file_bytes)
else:
file_bytes = file_data
file_data = io.BytesIO(file_bytes)
# Use filetype library for robust MIME detection
kind = filetype.guess(file_bytes)
if kind is not None:
mime_type = kind.mime
else:
# Fallback to image/jpeg if detection fails
mime_type = 'image/jpeg'
media = self.client.media_post(file_data, mime_type=mime_type, description=filename)
return self._convert_mastodon_file(media)
except Exception as e:
raise RuntimeError(f"Failed to upload file to Pleroma: {e}") from e

View file

@ -120,12 +120,6 @@ in order: name, rarity',
be a number between 1 and 5',
'attachment_urls': None
}
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
return {
'message': f'{author} Invalid drop weight: \'{arguments[2]}\' \
must be a decimal value between 0.0 and 1.0',
'attachment_urls': None
}
card_id, file_id = add_card(
name=arguments[0],

View file

@ -5,6 +5,13 @@
DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"]
; SQLite Database location
DatabaseLocation = ./gacha_game.db
; Instance type - either "misskey" or "pleroma"
InstanceType = misskey
; Web server port (default: 5000)
WebPort = 5000
; Web server bind address (default: 127.0.0.1, set to 0.0.0.0 to listen on all interfaces)
BindAddress = 127.0.0.1
; Whether to lmit access to the bot via an instance whitelist
; The whitelist can be adjusted via the application
UseWhitelist = False
@ -35,4 +42,3 @@ User = @bot@example.tld
; API key for the bot
; Generate one by going to Settings > API > Generate access token
Token = abcdefghijklmnopqrstuvwxyz012345

View file

@ -1,6 +1,6 @@
# Kemoverse
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Supports both Misskey and Pleroma instances. Name comes from Kemonomimi and Fediverse.
<p align="center">
<img src="./web/static/logo.png" alt="Fediverse Gacha Bot Logo" width="300" height="auto">
</p>
@ -47,13 +47,13 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
- 🌐 Web app to generate cards from images
### 🌍 Fediverse Support
✅ Anyone from the fediverse can play, but the server only works using a Misskey instance. Want to rewrite the program in Elixir for Pleroma? Let us know!
✅ Anyone from the fediverse can play! The bot supports both Misskey and Pleroma instances through configurable backends.
## 🗃️ Tech Stack
- Python (3.12+)
- SQLite
- Fediverse API integration (via Misskey endpoints)
- Fediverse API integration (Misskey and Pleroma support)
- Flask
- Modular DB design for extensibility
@ -65,12 +65,12 @@ The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and
flowchart TD
subgraph Player Interaction
A1[Misskey bot]
A1[Fediverse bot]
A2[Web]
end
subgraph Misskey
B1[Misskey instance]
subgraph Fediverse
B1[Fediverse instance]
end
subgraph Bot
@ -78,7 +78,7 @@ flowchart TD
C2[Notification parser]
C3[Gacha roll logic]
C4[Database interface]
C5[Misskey API poster]
C5[Fediverse API poster]
end
subgraph Website

View file

@ -6,3 +6,5 @@ Jinja2==3.1.6
MarkupSafe==3.0.2
Werkzeug==3.1.3
Misskey.py==4.1.0
Mastodon.py==1.8.1
filetype==1.2.0

View file

@ -57,16 +57,14 @@ def perform_migration(cursor: sqlite3.Cursor, migration: tuple[int, str]) -> Non
def get_db_path() -> str | DBNotFoundError:
'''Gets the DB path from config.ini'''
env = os.environ.get('KEMOVERSE_ENV')
if not (env and env in ['prod', 'dev']):
raise KemoverseEnvUnset
print(f'Running in "{env}" mode')
config_path = f'config_{env}.ini'
if not os.path.isfile(config_path):
raise ConfigError(f'Could not find {config_path}')
print(f'Running in "{env}" mode')
config = ConfigParser()
config.read(config_path)
db_path = config['application']['DatabaseLocation']
@ -96,7 +94,6 @@ def main():
return
except KemoverseEnvUnset:
print('Error: KEMOVERSE_ENV is either not set or has an invalid value.')
print('Please set KEMOVERSE_ENV to either "dev" or "prod" before running.')
print(traceback.format_exc())
return

View file

@ -1,13 +1,18 @@
import sqlite3
import sys
import os
# Add bot directory to path to import config
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'bot'))
import config
from flask import Flask, render_template, abort
from werkzeug.exceptions import HTTPException
app = Flask(__name__)
DB_PATH = "./gacha_game.db" # Adjust path if needed
def get_db_connection():
conn = sqlite3.connect(DB_PATH)
conn = sqlite3.connect(config.DB_PATH)
conn.row_factory = sqlite3.Row
return conn
@ -68,4 +73,4 @@ def submit_character():
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
app.run(host=config.BIND_ADDRESS, port=config.WEB_PORT, debug=True)