pleroma-support - optionally use pleroma as a host for the bot. #54

Open
moon wants to merge 23 commits from pleroma-support into dev
18 changed files with 805 additions and 183 deletions

3
.gitignore vendored
View file

@ -185,5 +185,6 @@ cython_debug/
gacha_game*.db
gacha_game*.db.*
config*.ini
run.sh
.idea
.idea

1
.tool-versions Normal file
View file

@ -0,0 +1 @@
nodejs 23.4.0

View file

@ -1,32 +1,24 @@
import requests
from misskey.exceptions import MisskeyAPIException
from client import client_connection
from db_utils import insert_card
from custom_types import Card
from config import RARITY_TO_WEIGHT
import config
from fediverse_factory import get_fediverse_service
import db_utils
def add_card(
name: str,
rarity: int,
image_url: str) -> tuple[int, str]:
'''
Adds a card to the database, uploading the image from a public URL to
the bot's Misskey Drive.
def add_card(name: str, rarity: int, image_url: str) -> tuple[int, str]:
"""
Adds a card to the database, uploading the image from a public URL to the Fediverse instance.
Args:
name (str): Card name.
rarity (int): Card rarity (e.g., 1-5).
image_url (str): Public URL of the image from the post (e.g., from
note['files'][i]['url']).
image_url (str): Public URL of the image from the post.
Returns:
tuple[int, str]: Card ID and bot's Drive file_id.
tuple[int, str]: Card ID and file_id.
Raises:
ValueError: If inputs are invalid.
RuntimeError: If image download/upload or database operation fails.
'''
"""
stripped_name = name.strip()
@ -35,29 +27,33 @@ def add_card(
raise ValueError('Card name cannot be empty.')
if rarity < 1:
raise ValueError('Rarity must be a positive integer.')
if rarity not in RARITY_TO_WEIGHT.keys():
if rarity not in config.RARITY_TO_WEIGHT.keys():
raise ValueError(f'Invalid rarity: {rarity}')
if not image_url:
raise ValueError('Image URL must be provided.')
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f'Failed to download image from {image_url}')
# Upload to bot's Drive
mk = client_connection()
try:
media = mk.drive_files_create(response.raw)
file_id = media['id']
except MisskeyAPIException as e:
raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
from e
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f"Failed to download image from {image_url}")
# Insert into database
card_id = insert_card(
# Upload to Fediverse instance
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
try:
uploaded_file = fediverse_service.upload_file(response.raw)
file_id = uploaded_file.id
except RuntimeError as e:
raise RuntimeError(f"Failed to upload image: {e}") from e
# Insert into database using db_utils function
card_id = db_utils.insert_card(
stripped_name,
rarity,
file_id
)
return card_id, file_id
)
return card_id, file_id
except Exception as e:
raise

View file

@ -1,19 +1,24 @@
import time
import misskey as misskey
from client import client_connection
import db_utils as db
import traceback
import config
from notification import process_fediverse_notification
from db_utils import get_config, set_config, connect, setup_administrators
from fediverse_factory import get_fediverse_service
from config import NOTIFICATION_POLL_INTERVAL, USE_WHITELIST
from notification import process_notifications
if __name__ == '__main__':
# Initialize the Misskey client
client = client_connection()
# Connect to DB
db.connect()
from config import USE_WHITELIST
def stream_notifications():
# Initialize database connection
connect()
# Setup default administrators
db.setup_administrators()
setup_administrators()
# Initialize the Fediverse service
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
# Get the last seen notification ID from the database
last_seen_id = get_config("last_seen_notif_id")
# Show whitelist status
whitelist_status = "enabled" if USE_WHITELIST else "disabled"
@ -21,5 +26,37 @@ if __name__ == '__main__':
print('Listening for notifications...')
while True:
if not process_notifications(client):
time.sleep(NOTIFICATION_POLL_INTERVAL)
try:
# Get notifications from the fediverse service
notifications = fediverse_service.get_notifications(since_id=last_seen_id)
if notifications:
new_last_seen_id = last_seen_id
for notification in notifications:
notif_id = notification.id
# Skip old or same ID notifications
if last_seen_id is not None and notif_id <= last_seen_id:
continue
# Process the notification using the abstracted processor
process_fediverse_notification(notification, fediverse_service)
# Update only if this notif_id is greater
if new_last_seen_id is None or notif_id > new_last_seen_id:
new_last_seen_id = notif_id
# Save the latest seen ID
if new_last_seen_id and new_last_seen_id != last_seen_id:
set_config("last_seen_notif_id", new_last_seen_id)
last_seen_id = new_last_seen_id
time.sleep(5)
except Exception as e:
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
time.sleep(5)
if __name__ == "__main__":
stream_notifications()

View file

@ -1,6 +0,0 @@
import misskey
import config
def client_connection() -> misskey.Misskey:
return misskey.Misskey(address=config.INSTANCE, i=config.KEY)

View file

@ -91,9 +91,19 @@ if 'User' not in config['credentials'] or not config['credentials']['User'].stri
USER = normalize_user(config['credentials']['User'])
# API key for the bot
KEY = config['credentials']['Token']
# Bot's Misskey instance URL
# Bot's Misskey/Pleroma instance URL
INSTANCE = config['credentials']['Instance'].lower()
# Instance type validation
if 'InstanceType' not in config['application']:
raise ValueError("InstanceType must be specified in config.ini")
instance_type = config['application']['InstanceType'].lower()
if instance_type not in ('misskey', 'pleroma'):
raise ValueError("InstanceType must be either 'misskey' or 'pleroma'")
INSTANCE_TYPE = instance_type
# Web server port
WEB_PORT = config['application'].getint('WebPort', 5000)
BIND_ADDRESS = config['application'].get('BindAddress', '127.0.0.1')

40
bot/fediverse_factory.py Normal file
View file

@ -0,0 +1,40 @@
from fediverse_service import FediverseService
from misskey_service import MisskeyService
from pleroma_service import PleromaService
class FediverseServiceFactory:
"""Factory for creating FediverseService implementations"""
@staticmethod
def create_service(instance_type: str) -> FediverseService:
"""
Create a FediverseService implementation based on the instance type.
Args:
instance_type: The type of instance ("misskey" or "pleroma")
Returns:
FediverseService implementation (MisskeyService or PleromaService)
Raises:
ValueError: If the instance type is not supported
"""
instance_type = instance_type.lower()
if instance_type == "misskey":
return MisskeyService()
elif instance_type == "pleroma":
return PleromaService()
else:
raise ValueError(f"Unsupported instance type: {instance_type}")
def get_fediverse_service(instance_type: str) -> FediverseService:
"""
Convenience function to get a FediverseService instance
Args:
instance_type: The instance type ("misskey" or "pleroma")
"""
return FediverseServiceFactory.create_service(instance_type)

74
bot/fediverse_service.py Normal file
View file

@ -0,0 +1,74 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Union, BinaryIO
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
class FediverseService(ABC):
"""Abstract interface for Fediverse platform services (Misskey, Pleroma, etc.)"""
@abstractmethod
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""
Retrieve notifications from the Fediverse instance.
Args:
since_id: Optional ID to get notifications newer than this ID
Returns:
List of FediverseNotification objects
"""
pass
@abstractmethod
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""
Create a new post on the Fediverse instance.
Args:
text: The text content of the post
reply_to_id: Optional ID of post to reply to
visibility: Visibility level for the post
file_ids: Optional list of file IDs to attach
visible_user_ids: Optional list of user IDs who can see the post (for specified visibility)
Returns:
ID of the created post
"""
pass
@abstractmethod
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""
Retrieve a specific post by its ID.
Args:
post_id: The ID of the post to retrieve
Returns:
FediversePost object if found, None otherwise
"""
pass
@abstractmethod
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""
Upload a file to the Fediverse instance.
Args:
file_data: File data as binary stream or bytes
filename: Optional filename for the uploaded file
Returns:
FediverseFile object with ID, URL, and other metadata
Raises:
RuntimeError: If file upload fails
"""
pass

73
bot/fediverse_types.py Normal file
View file

@ -0,0 +1,73 @@
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from enum import Enum
class NotificationType(Enum):
MENTION = "mention"
REPLY = "reply"
FOLLOW = "follow"
FAVOURITE = "favourite"
REBLOG = "reblog"
POLL = "poll"
OTHER = "other"
class Visibility(Enum):
PUBLIC = "public"
UNLISTED = "unlisted"
HOME = "home"
FOLLOWERS = "followers"
SPECIFIED = "specified"
DIRECT = "direct"
@dataclass
class FediverseUser:
"""Common user representation across Fediverse platforms"""
id: str
username: str
host: Optional[str] = None # None for local users
display_name: Optional[str] = None
@property
def full_handle(self) -> str:
"""Returns the full fediverse handle (@user@domain or @user for local)"""
if self.host:
return f"@{self.username}@{self.host}"
return f"@{self.username}"
@dataclass
class FediverseFile:
"""Common file/attachment representation"""
id: str
url: str
type: Optional[str] = None
name: Optional[str] = None
@dataclass
class FediversePost:
"""Common post representation across Fediverse platforms"""
id: str
text: Optional[str]
user: FediverseUser
visibility: Visibility
created_at: Optional[str] = None
files: List[FediverseFile] = None
reply_to_id: Optional[str] = None
def __post_init__(self):
if self.files is None:
self.files = []
@dataclass
class FediverseNotification:
"""Common notification representation across Fediverse platforms"""
id: str
type: NotificationType
user: FediverseUser
post: Optional[FediversePost] = None
created_at: Optional[str] = None

156
bot/misskey_service.py Normal file
View file

@ -0,0 +1,156 @@
import misskey
from typing import List, Optional, Dict, Any, Union, BinaryIO
from fediverse_service import FediverseService
from fediverse_types import (
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
NotificationType, Visibility
)
import config
class MisskeyService(FediverseService):
"""Misskey implementation of FediverseService"""
def __init__(self):
self.client = misskey.Misskey(address=config.INSTANCE, i=config.KEY)
def _convert_misskey_user(self, user_data: Dict[str, Any]) -> FediverseUser:
"""Convert Misskey user data to FediverseUser"""
return FediverseUser(
id=user_data.get("id", ""),
username=user_data.get("username", "unknown"),
host=user_data.get("host"),
display_name=user_data.get("name")
)
def _convert_misskey_file(self, file_data: Dict[str, Any]) -> FediverseFile:
"""Convert Misskey file data to FediverseFile"""
return FediverseFile(
id=file_data.get("id", ""),
url=file_data.get("url", ""),
type=file_data.get("type"),
name=file_data.get("name")
)
def _convert_misskey_visibility(self, visibility: str) -> Visibility:
"""Convert Misskey visibility to our enum"""
visibility_map = {
"public": Visibility.PUBLIC,
"unlisted": Visibility.UNLISTED,
"home": Visibility.HOME,
"followers": Visibility.FOLLOWERS,
"specified": Visibility.SPECIFIED
}
return visibility_map.get(visibility, Visibility.HOME)
def _convert_to_misskey_visibility(self, visibility: Visibility) -> str:
"""Convert our visibility enum to Misskey visibility"""
visibility_map = {
Visibility.PUBLIC: "public",
Visibility.UNLISTED: "unlisted",
Visibility.HOME: "home",
Visibility.FOLLOWERS: "followers",
Visibility.SPECIFIED: "specified",
Visibility.DIRECT: "specified" # Map direct to specified for Misskey
}
return visibility_map.get(visibility, "home")
def _convert_misskey_notification_type(self, notif_type: str) -> NotificationType:
"""Convert Misskey notification type to our enum"""
type_map = {
"mention": NotificationType.MENTION,
"reply": NotificationType.REPLY,
"follow": NotificationType.FOLLOW,
"favourite": NotificationType.FAVOURITE,
"reblog": NotificationType.REBLOG,
"poll": NotificationType.POLL
}
return type_map.get(notif_type, NotificationType.OTHER)
def _convert_misskey_post(self, note_data: Dict[str, Any]) -> FediversePost:
"""Convert Misskey note data to FediversePost"""
files = []
if note_data.get("files"):
files = [self._convert_misskey_file(f) for f in note_data["files"]]
return FediversePost(
id=note_data.get("id", ""),
text=note_data.get("text"),
user=self._convert_misskey_user(note_data.get("user", {})),
visibility=self._convert_misskey_visibility(note_data.get("visibility", "home")),
created_at=note_data.get("createdAt"),
files=files,
reply_to_id=note_data.get("replyId")
)
def _convert_misskey_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
"""Convert Misskey notification data to FediverseNotification"""
post = None
if notification_data.get("note"):
post = self._convert_misskey_post(notification_data["note"])
return FediverseNotification(
id=notification_data.get("id", ""),
type=self._convert_misskey_notification_type(notification_data.get("type", "")),
user=self._convert_misskey_user(notification_data.get("user", {})),
post=post,
created_at=notification_data.get("createdAt")
)
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Get notifications from Misskey instance"""
params = {
'include_types': ['mention', 'reply'],
'limit': 50
}
if since_id:
params["since_id"] = since_id
notifications = self.client.i_notifications(**params)
return [self._convert_misskey_notification(notif) for notif in notifications]
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Create a post on Misskey instance"""
params = {
"text": text,
"visibility": self._convert_to_misskey_visibility(visibility)
}
if reply_to_id:
params["reply_id"] = reply_to_id
if file_ids:
params["file_ids"] = file_ids
if visible_user_ids and visibility == Visibility.SPECIFIED:
params["visible_user_ids"] = visible_user_ids
response = self.client.notes_create(**params)
return response.get("createdNote", {}).get("id", "")
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""Get a specific post by ID from Misskey instance"""
try:
note = self.client.notes_show(noteId=post_id)
return self._convert_misskey_post(note)
except Exception:
return None
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""Upload a file to Misskey Drive"""
try:
from misskey.exceptions import MisskeyAPIException
media = self.client.drive_files_create(file_data)
return self._convert_misskey_file(media)
except MisskeyAPIException as e:
raise RuntimeError(f"Failed to upload file to Misskey Drive: {e}") from e
except Exception as e:
raise RuntimeError(f"Unexpected error during file upload: {e}") from e

View file

@ -0,0 +1,74 @@
"""Mock FediverseService for testing purposes"""
from typing import List, Optional, Union, BinaryIO
from fediverse_service import FediverseService
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
class MockFediverseService(FediverseService):
"""Mock implementation of FediverseService for testing"""
def __init__(self):
self.notifications = []
self.created_posts = []
self.uploaded_files = []
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Return mock notifications, optionally filtered by since_id"""
if since_id is None:
return self.notifications
# Filter notifications newer than since_id
filtered = []
for notif in self.notifications:
if notif.id > since_id:
filtered.append(notif)
return filtered
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Mock post creation, returns fake post ID"""
post_id = f"mock_post_{len(self.created_posts)}"
# Store the post for assertions
self.created_posts.append({
'id': post_id,
'text': text,
'reply_to_id': reply_to_id,
'visibility': visibility,
'file_ids': file_ids,
'visible_user_ids': visible_user_ids
})
return post_id
def upload_file(self, file_data: Union[BinaryIO, bytes]) -> FediverseFile:
"""Mock file upload, returns fake file"""
file_id = f"mock_file_{len(self.uploaded_files)}"
mock_file = FediverseFile(
id=file_id,
url=f"https://example.com/files/{file_id}",
type="image/png",
name="test_file.png"
)
self.uploaded_files.append(mock_file)
return mock_file
# Helper methods for testing
def add_mock_notification(self, notification: FediverseNotification):
"""Add a mock notification for testing"""
self.notifications.append(notification)
def clear_all(self):
"""Clear all mock data"""
self.notifications.clear()
self.created_posts.clear()
self.uploaded_files.clear()

View file

@ -1,45 +1,47 @@
import traceback
from typing import Dict, Any
import misskey
from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST
import config
from parsing import parse_notification
from db_utils import get_config, set_config, is_whitelisted, is_player_banned
from db_utils import is_whitelisted, is_player_banned
from response import generate_response
from custom_types import BotResponse
# Define your whitelist
# TODO: move to config
WHITELISTED_INSTANCES: list[str] = []
from fediverse_factory import get_fediverse_service
from fediverse_types import FediverseNotification, NotificationType, Visibility
def process_notification(
client: misskey.Misskey,
notification: Dict[str, Any]) -> None:
'''Processes an individual notification'''
user = notification.get('user', {})
username = user.get('username', 'unknown')
host = user.get('host') # None if local user
def process_fediverse_notification(notification: FediverseNotification, fediverse_service=None) -> None:
'''Processes an individual fediverse notification using the abstraction'''
if fediverse_service is None:
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
# Get user and instance info
username = notification.user.username
host = notification.user.host
instance = host if host else 'local'
if USE_WHITELIST and not is_whitelisted(instance):
# Check whitelist
if config.USE_WHITELIST and not is_whitelisted(instance):
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
return
# Copy visibility of the post that was received when replying (so if people
# don't want to dump a bunch of notes on home they don't have to)
visibility = notification['note']['visibility']
if visibility != 'specified':
visibility = 'home'
# Only process mentions and replies
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
return
notif_type = notification.get('type', 'unknown')
notif_id = notification.get('id')
# Return early if no post attached
if not notification.post:
return
# Determine visibility for reply
if notification.post.visibility != Visibility.SPECIFIED:
visibility = Visibility.HOME
else:
visibility = Visibility.SPECIFIED
notif_type = notification.type.value
notif_id = notification.id
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
# 🧠 Send to the parser
parsed_notification = parse_notification(notification, client)
parsed_notification = parse_notification(notification, fediverse_service)
if not parsed_notification:
return
@ -49,79 +51,20 @@ def process_notification(
print(f'⚠️ Blocked notification from banned player: {author}')
return
# Get the note Id to reply to
note_id = notification.get('note', {}).get('id')
# Get the response
response: BotResponse | None = generate_response(parsed_notification)
if not response:
return
client.notes_create(
# Handle attachment URLs (convert to file IDs if needed)
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
# Send response using fediverse service
fediverse_service.create_post(
text=response['message'],
reply_id=note_id,
reply_to_id=notification.post.id,
visibility=visibility,
file_ids=response['attachment_urls']
# TODO: write actual visible users ids so pleromers can use the bot
# privately
# visible_user_ids=[]
file_ids=file_ids
# visible_user_ids=[] # TODO: write actual visible users ids so pleromers can use the bot privately
)
def process_notifications(client: misskey.Misskey) -> bool:
'''Processes a batch of unread notifications. Returns False if there are
no more notifications to process.'''
last_seen_id = get_config('last_seen_notif_id')
# process_notification writes to last_seen_id, so make a copy
new_last_seen_id = last_seen_id
try:
notifications = client.i_notifications(
# Fetch notifications we haven't seen yet. This option is a bit
# tempermental, sometimes it'll include since_id, sometimes it
# won't. We need to keep track of what notifications we've
# already processed.
since_id=last_seen_id,
# Let misskey handle the filtering
include_types=['mention', 'reply'],
# And handle the batch size while we're at it
limit=NOTIFICATION_BATCH_SIZE
)
# No notifications. Wait the poll period.
if not notifications:
return False
# Iterate oldest to newest
for notification in notifications:
try:
# Skip if we've processed already
notif_id = notification.get('id', '')
if notif_id <= last_seen_id:
continue
# Update new_last_seen_id and process
new_last_seen_id = notif_id
process_notification(client, notification)
except Exception as e:
print(f'An exception has occured while processing a \
notification: {e}')
print(traceback.format_exc())
# If we got as many notifications as we requested, there are probably
# more in the queue
return len(notifications) == NOTIFICATION_BATCH_SIZE
except MisskeyAPIException as e:
print(f'An exception has occured while reading notifications: {e}\n')
print(traceback.format_exc())
finally:
# Quality jank right here, but finally lets us update the last_seen_id
# even if we hit an exception or return early
if new_last_seen_id > last_seen_id:
set_config('last_seen_notif_id', new_last_seen_id)
return False

View file

@ -1,27 +1,38 @@
import re
from typing import Dict, Any
import misskey
import config
from response import generate_response
from fediverse_factory import get_fediverse_service
from fediverse_types import FediverseNotification, NotificationType, Visibility
from custom_types import ParsedNotification
def parse_notification(notification: FediverseNotification, fediverse_service=None):
'''Parses any notifications received by the bot and sends any commands to
generate_response()'''
if fediverse_service is None:
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
def parse_notification(
notification: Dict[str, Any],
client: misskey.Misskey) -> ParsedNotification | None:
'''Parses any notifications received by the bot'''
# We get the type of notification to filter the ones that we actually want
# to parse
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
return # Ignore anything that isn't a mention
# Return early if no post attached
if not notification.post:
return
# We want the visibility to be related to the type that was received (so if
# people don't want to dump a bunch of notes on home they don't have to)
if notification.post.visibility != Visibility.SPECIFIED:
visibility = Visibility.HOME
else:
visibility = Visibility.SPECIFIED
# Get the full Activitypub ID of the user
user = notification.get("user", {})
username = user.get("username", "unknown")
host = user.get("host")
# Local users may not have a hostname attached
full_user = f"@{username}" if not host else f"@{username}@{host}"
full_user = notification.user.full_handle
note_obj = notification.get("note", {})
note_text = note_obj.get("text")
note_id = note_obj.get("id")
note_text = notification.post.text
note_id = notification.post.id
note = note_text.strip().lower() if note_text else ""
# Split words into tokens
@ -44,9 +55,30 @@ def parse_notification(
command = parts[1].lower()
arguments = parts[2:] if len(parts) > 2 else []
return {
# Create ParsedNotification object for the new response system
parsed_notification: ParsedNotification = {
'author': full_user,
'command': command,
'arguments': arguments,
'note_obj': note_obj
'note_obj': {
'id': note_id,
'text': note_text,
'files': [{'url': f.url} for f in notification.post.files] if notification.post.files else []
}
}
# Generate response using the new system
response = generate_response(parsed_notification)
if not response:
return
# Handle attachment URLs (convert to file IDs if needed)
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
fediverse_service.create_post(
text=response['message'],
reply_to_id=note_id,
visibility=visibility,
file_ids=file_ids
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)

188
bot/pleroma_service.py Normal file
View file

@ -0,0 +1,188 @@
from mastodon import Mastodon
from typing import List, Optional, Dict, Any, Union, BinaryIO
import io
import filetype
from fediverse_service import FediverseService
from fediverse_types import (
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
NotificationType, Visibility
)
import config
class PleromaService(FediverseService):
"""Pleroma implementation of FediverseService using Mastodon.py"""
def __init__(self):
self.client = Mastodon(
access_token=config.KEY,
api_base_url=config.INSTANCE
)
def _convert_mastodon_user(self, user_data: Dict[str, Any]) -> FediverseUser:
"""Convert Mastodon/Pleroma user data to FediverseUser"""
acct = user_data.get("acct", "")
if "@" in acct:
username, host = acct.split("@", 1)
else:
username = acct
host = None
return FediverseUser(
id=str(user_data.get("id", "")),
username=username,
host=host,
display_name=user_data.get("display_name")
)
def _convert_mastodon_file(self, file_data: Dict[str, Any]) -> FediverseFile:
"""Convert Mastodon/Pleroma media attachment to FediverseFile"""
return FediverseFile(
id=str(file_data.get("id", "")),
url=file_data.get("url", ""),
type=file_data.get("type"),
name=file_data.get("description")
)
def _convert_mastodon_visibility(self, visibility: str) -> Visibility:
"""Convert Mastodon/Pleroma visibility to our enum"""
visibility_map = {
"public": Visibility.PUBLIC,
"unlisted": Visibility.UNLISTED,
"private": Visibility.FOLLOWERS,
"direct": Visibility.DIRECT
}
return visibility_map.get(visibility, Visibility.PUBLIC)
def _convert_to_mastodon_visibility(self, visibility: Visibility) -> str:
"""Convert our visibility enum to Mastodon/Pleroma visibility"""
visibility_map = {
Visibility.PUBLIC: "public",
Visibility.UNLISTED: "unlisted",
Visibility.HOME: "unlisted", # Map home to unlisted for Pleroma
Visibility.FOLLOWERS: "private",
Visibility.SPECIFIED: "direct", # Map specified to direct for Pleroma
Visibility.DIRECT: "direct"
}
return visibility_map.get(visibility, "public")
def _convert_mastodon_notification_type(self, notif_type: str) -> NotificationType:
"""Convert Mastodon/Pleroma notification type to our enum"""
type_map = {
"mention": NotificationType.MENTION,
"follow": NotificationType.FOLLOW,
"favourite": NotificationType.FAVOURITE,
"reblog": NotificationType.REBLOG,
"poll": NotificationType.POLL
}
return type_map.get(notif_type, NotificationType.OTHER)
def _convert_mastodon_status(self, status_data: Dict[str, Any]) -> FediversePost:
"""Convert Mastodon/Pleroma status data to FediversePost"""
files = []
if status_data.get("media_attachments"):
files = [self._convert_mastodon_file(f) for f in status_data["media_attachments"]]
# Extract plain text from HTML content
content = status_data.get("content", "")
# Basic HTML stripping - in production you might want to use a proper HTML parser
import re
plain_text = re.sub(r'<[^>]+>', '', content) if content else None
return FediversePost(
id=str(status_data.get("id", "")),
text=plain_text,
user=self._convert_mastodon_user(status_data.get("account", {})),
visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")),
created_at=status_data.get("created_at"),
files=files,
reply_to_id=str(status_data["in_reply_to_id"]) if status_data.get("in_reply_to_id") else None
)
def _convert_mastodon_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
"""Convert Mastodon/Pleroma notification data to FediverseNotification"""
post = None
if notification_data.get("status"):
post = self._convert_mastodon_status(notification_data["status"])
return FediverseNotification(
id=str(notification_data.get("id", "")),
type=self._convert_mastodon_notification_type(notification_data.get("type", "")),
user=self._convert_mastodon_user(notification_data.get("account", {})),
post=post,
created_at=notification_data.get("created_at")
)
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
"""Get notifications from Pleroma instance"""
params = {}
if since_id:
params["since_id"] = since_id
notifications = self.client.notifications(**params)
return [self._convert_mastodon_notification(notif) for notif in notifications]
def create_post(
self,
text: str,
reply_to_id: Optional[str] = None,
visibility: Visibility = Visibility.HOME,
file_ids: Optional[List[str]] = None,
visible_user_ids: Optional[List[str]] = None
) -> str:
"""Create a post on Pleroma instance"""
params = {
"status": text,
"visibility": self._convert_to_mastodon_visibility(visibility)
}
if reply_to_id:
params["in_reply_to_id"] = reply_to_id
if file_ids:
params["media_ids"] = file_ids
# Note: Pleroma/Mastodon doesn't have direct equivalent to visible_user_ids
# For direct messages, you typically mention users in the status text
response = self.client.status_post(**params)
return str(response.get("id", ""))
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
"""Get a specific post by ID from Pleroma instance"""
try:
status = self.client.status(post_id)
return self._convert_mastodon_status(status)
except Exception:
return None
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
"""Upload a file to Pleroma instance"""
try:
# Convert file_data to bytes for MIME detection
if hasattr(file_data, 'read'):
# Check if we can seek back
try:
current_pos = file_data.tell()
file_bytes = file_data.read()
file_data.seek(current_pos)
file_data = io.BytesIO(file_bytes)
except (io.UnsupportedOperation, OSError):
# Non-seekable stream, already read all data
file_data = io.BytesIO(file_bytes)
else:
file_bytes = file_data
file_data = io.BytesIO(file_bytes)
# Use filetype library for robust MIME detection
kind = filetype.guess(file_bytes)
if kind is not None:
mime_type = kind.mime
else:
# Fallback to image/jpeg if detection fails
mime_type = 'image/jpeg'
media = self.client.media_post(file_data, mime_type=mime_type, description=filename)
return self._convert_mastodon_file(media)
except Exception as e:
raise RuntimeError(f"Failed to upload file to Pleroma: {e}") from e

View file

@ -5,6 +5,8 @@
DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"]
; SQLite Database location
DatabaseLocation = ./gacha_game.db
; Instance type - either "misskey" or "pleroma"
InstanceType = misskey
; Web server port (default: 5000)
WebPort = 5000
; Web server bind address (default: 127.0.0.1, set to 0.0.0.0 to listen on all interfaces)
@ -40,4 +42,3 @@ User = @bot@example.tld
; API key for the bot
; Generate one by going to Settings > API > Generate access token
Token = abcdefghijklmnopqrstuvwxyz012345

View file

@ -1,6 +1,6 @@
# Kemoverse
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Supports both Misskey and Pleroma instances. Name comes from Kemonomimi and Fediverse.
<p align="center">
<img src="./web/static/logo.png" alt="Fediverse Gacha Bot Logo" width="300" height="auto">
</p>
@ -47,13 +47,13 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
- 🌐 Web app to generate cards from images
### 🌍 Fediverse Support
✅ Anyone from the fediverse can play, but the server only works using a Misskey instance. Want to rewrite the program in Elixir for Pleroma? Let us know!
✅ Anyone from the fediverse can play! The bot supports both Misskey and Pleroma instances through configurable backends.
## 🗃️ Tech Stack
- Python (3.12+)
- SQLite
- Fediverse API integration (via Misskey endpoints)
- Fediverse API integration (Misskey and Pleroma support)
- Flask
- Modular DB design for extensibility
@ -65,12 +65,12 @@ The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and
flowchart TD
subgraph Player Interaction
A1[Misskey bot]
A1[Fediverse bot]
A2[Web]
end
subgraph Misskey
B1[Misskey instance]
subgraph Fediverse
B1[Fediverse instance]
end
subgraph Bot
@ -78,7 +78,7 @@ flowchart TD
C2[Notification parser]
C3[Gacha roll logic]
C4[Database interface]
C5[Misskey API poster]
C5[Fediverse API poster]
end
subgraph Website

View file

@ -6,3 +6,5 @@ Jinja2==3.1.6
MarkupSafe==3.0.2
Werkzeug==3.1.3
Misskey.py==4.1.0
Mastodon.py==1.8.1
filetype==1.2.0

View file

@ -1,18 +1,18 @@
import sqlite3
import sys
from pathlib import Path
import os
# Add parent directory to Python path so we can import from bot/
sys.path.append(str(Path(__file__).parent.parent))
# Add bot directory to path to import config
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'bot'))
import config
from bot.config import WEB_PORT, BIND_ADDRESS, DB_PATH
from flask import Flask, render_template, abort
from werkzeug.exceptions import HTTPException
app = Flask(__name__)
def get_db_connection():
conn = sqlite3.connect(DB_PATH)
conn = sqlite3.connect(config.DB_PATH)
conn.row_factory = sqlite3.Row
return conn
@ -73,4 +73,4 @@ def submit_character():
if __name__ == '__main__':
app.run(host=BIND_ADDRESS, port=WEB_PORT, debug=True)
app.run(host=config.BIND_ADDRESS, port=config.WEB_PORT, debug=True)