Compare commits
43 commits
master
...
pleroma-su
Author | SHA1 | Date | |
---|---|---|---|
|
c74fc17dfb | ||
6494ac4909 | |||
6ab3e4a427 | |||
b6fb6d9245 | |||
5580e06028 | |||
b08b224476 | |||
|
0039717a01 | ||
|
bf8d9823b2 | ||
7c4fd5fc41 | |||
|
24bfe88dc1 | ||
|
1a35750d0a | ||
|
d416ae1b2d | ||
|
7fd4d5db25 | ||
|
f4f847e577 | ||
|
2ef70801c7 | ||
|
fdf21b3f5f | ||
|
337a989671 | ||
|
3ad4edbc45 | ||
49e49bd187 | |||
1984b67523 | |||
|
bbb64a869d | ||
|
fc45c688e8 | ||
|
77d4fa13bb | ||
|
bd287b096a | ||
|
40f018a83b | ||
|
a516a9b55a | ||
|
7161712f15 | ||
|
fadd4dfe27 | ||
|
7b32ee7fcf | ||
|
f70b2147cd | ||
|
298d7fda72 | ||
|
8918b5205d | ||
|
67b4d949fd | ||
|
89ae8a7290 | ||
|
91376c0eba | ||
|
b2ca6dd59a | ||
|
dac05a3ed8 | ||
|
fe8e7d246f | ||
|
a47530180d | ||
|
e0cf42f8f6 | ||
2b194f3c9e | |||
df76fb9131 | |||
81c890a83d |
23 changed files with 1315 additions and 569 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -185,5 +185,6 @@ cython_debug/
|
|||
gacha_game*.db
|
||||
gacha_game*.db.*
|
||||
config*.ini
|
||||
run.sh
|
||||
|
||||
.idea
|
1
.tool-versions
Normal file
1
.tool-versions
Normal file
|
@ -0,0 +1 @@
|
|||
nodejs 23.4.0
|
|
@ -1,32 +1,24 @@
|
|||
import requests
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
from client import client_connection
|
||||
from db_utils import insert_card
|
||||
from custom_types import Card
|
||||
from config import RARITY_TO_WEIGHT
|
||||
import config
|
||||
from fediverse_factory import get_fediverse_service
|
||||
import db_utils
|
||||
|
||||
|
||||
def add_card(
|
||||
name: str,
|
||||
rarity: int,
|
||||
image_url: str) -> tuple[int, str]:
|
||||
'''
|
||||
Adds a card to the database, uploading the image from a public URL to
|
||||
the bot's Misskey Drive.
|
||||
def add_card(name: str, rarity: int, image_url: str) -> tuple[int, str]:
|
||||
"""
|
||||
Adds a card to the database, uploading the image from a public URL to the Fediverse instance.
|
||||
|
||||
Args:
|
||||
name (str): Card name.
|
||||
rarity (int): Card rarity (e.g., 1-5).
|
||||
image_url (str): Public URL of the image from the post (e.g., from
|
||||
note['files'][i]['url']).
|
||||
image_url (str): Public URL of the image from the post.
|
||||
|
||||
Returns:
|
||||
tuple[int, str]: Card ID and bot's Drive file_id.
|
||||
tuple[int, str]: Card ID and file_id.
|
||||
|
||||
Raises:
|
||||
ValueError: If inputs are invalid.
|
||||
RuntimeError: If image download/upload or database operation fails.
|
||||
'''
|
||||
"""
|
||||
|
||||
stripped_name = name.strip()
|
||||
|
||||
|
@ -35,30 +27,33 @@ def add_card(
|
|||
raise ValueError('Card name cannot be empty.')
|
||||
if rarity < 1:
|
||||
raise ValueError('Rarity must be a positive integer.')
|
||||
if rarity not in RARITY_TO_WEIGHT.keys():
|
||||
if rarity not in config.RARITY_TO_WEIGHT.keys():
|
||||
raise ValueError(f'Invalid rarity: {rarity}')
|
||||
if not image_url:
|
||||
raise ValueError('Image URL must be provided.')
|
||||
|
||||
# Download image
|
||||
response = requests.get(image_url, stream=True, timeout=30)
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError(f'Failed to download image from {image_url}')
|
||||
|
||||
# Upload to bot's Drive
|
||||
mk = client_connection()
|
||||
try:
|
||||
media = mk.drive_files_create(response.raw)
|
||||
file_id = media['id']
|
||||
except MisskeyAPIException as e:
|
||||
raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
|
||||
from e
|
||||
# Download image
|
||||
response = requests.get(image_url, stream=True, timeout=30)
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError(f"Failed to download image from {image_url}")
|
||||
|
||||
# Insert into database
|
||||
card_id = insert_card(
|
||||
# Upload to Fediverse instance
|
||||
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
|
||||
try:
|
||||
uploaded_file = fediverse_service.upload_file(response.raw)
|
||||
file_id = uploaded_file.id
|
||||
except RuntimeError as e:
|
||||
raise RuntimeError(f"Failed to upload image: {e}") from e
|
||||
|
||||
# Insert into database using db_utils function
|
||||
card_id = db_utils.insert_card(
|
||||
stripped_name,
|
||||
rarity,
|
||||
RARITY_TO_WEIGHT[rarity],
|
||||
file_id
|
||||
)
|
||||
return card_id, file_id
|
||||
)
|
||||
|
||||
return card_id, file_id
|
||||
|
||||
except Exception as e:
|
||||
raise
|
||||
|
|
|
@ -1,21 +1,78 @@
|
|||
#Kemoverse - a gacha-style bot for the Fediverse.
|
||||
#Copyright © 2025 Waifu
|
||||
#
|
||||
#This program is free software: you can redistribute it and/or modify
|
||||
#it under the terms of the GNU Affero General Public License as
|
||||
#published by the Free Software Foundation, either version 3 of the
|
||||
#License, or (at your option) any later version.
|
||||
#
|
||||
#This program is distributed in the hope that it will be useful,
|
||||
#but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
#GNU Affero General Public License for more details.
|
||||
#
|
||||
#You should have received a copy of the GNU Affero General Public License
|
||||
#along with this program. If not, see https://www.gnu.org/licenses/.
|
||||
|
||||
import time
|
||||
import misskey as misskey
|
||||
from client import client_connection
|
||||
import db_utils as db
|
||||
import traceback
|
||||
import config
|
||||
from notification import process_fediverse_notification
|
||||
from db_utils import get_config, set_config, connect, setup_administrators
|
||||
from fediverse_factory import get_fediverse_service
|
||||
|
||||
from config import NOTIFICATION_POLL_INTERVAL
|
||||
from notification import process_notifications
|
||||
from config import USE_WHITELIST
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Initialize the Misskey client
|
||||
client = client_connection()
|
||||
# Connect to DB
|
||||
db.connect()
|
||||
def stream_notifications():
|
||||
# Initialize database connection
|
||||
connect()
|
||||
|
||||
# Setup default administrators
|
||||
db.setup_administrators()
|
||||
setup_administrators()
|
||||
|
||||
# Initialize the Fediverse service
|
||||
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
|
||||
|
||||
# Get the last seen notification ID from the database
|
||||
last_seen_id = get_config("last_seen_notif_id")
|
||||
|
||||
# Show whitelist status
|
||||
whitelist_status = "enabled" if USE_WHITELIST else "disabled"
|
||||
print(f'Instance whitelisting: {whitelist_status}')
|
||||
|
||||
print('Listening for notifications...')
|
||||
while True:
|
||||
if not process_notifications(client):
|
||||
time.sleep(NOTIFICATION_POLL_INTERVAL)
|
||||
try:
|
||||
# Get notifications from the fediverse service
|
||||
notifications = fediverse_service.get_notifications(since_id=last_seen_id)
|
||||
|
||||
if notifications:
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
for notification in notifications:
|
||||
notif_id = notification.id
|
||||
|
||||
# Skip old or same ID notifications
|
||||
if last_seen_id is not None and notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
# Process the notification using the abstracted processor
|
||||
process_fediverse_notification(notification, fediverse_service)
|
||||
|
||||
# Update only if this notif_id is greater
|
||||
if new_last_seen_id is None or notif_id > new_last_seen_id:
|
||||
new_last_seen_id = notif_id
|
||||
|
||||
# Save the latest seen ID
|
||||
if new_last_seen_id and new_last_seen_id != last_seen_id:
|
||||
set_config("last_seen_notif_id", new_last_seen_id)
|
||||
last_seen_id = new_last_seen_id
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
except Exception as e:
|
||||
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
|
||||
time.sleep(5)
|
||||
|
||||
if __name__ == "__main__":
|
||||
stream_notifications()
|
||||
|
|
|
@ -1,6 +0,0 @@
|
|||
import misskey
|
||||
import config
|
||||
|
||||
|
||||
def client_connection() -> misskey.Misskey:
|
||||
return misskey.Misskey(address=config.INSTANCE, i=config.KEY)
|
|
@ -1,6 +1,7 @@
|
|||
'''Essentials for the bot to function'''
|
||||
import configparser
|
||||
import json
|
||||
import re
|
||||
from os import environ, path
|
||||
|
||||
|
||||
|
@ -13,8 +14,10 @@ def get_config_file() -> str:
|
|||
env: str | None = environ.get('KEMOVERSE_ENV')
|
||||
if not env:
|
||||
raise ConfigError('Error: KEMOVERSE_ENV is unset')
|
||||
if not (env in ['prod', 'dev']):
|
||||
raise ConfigError(f'Error: Invalid environment: {env}')
|
||||
|
||||
# Validate environment name contains only alphanumeric, dash, and underscore
|
||||
if not re.match(r'^[a-zA-Z0-9_-]+$', env):
|
||||
raise ValueError(f'KEMOVERSE_ENV "{env}" contains invalid characters. Only alphanumeric, dash (-), and underscore (_) are allowed.')
|
||||
|
||||
config_path: str = f'config_{env}.ini'
|
||||
|
||||
|
@ -23,6 +26,50 @@ def get_config_file() -> str:
|
|||
return config_path
|
||||
|
||||
|
||||
def normalize_user(user_string: str) -> str:
|
||||
"""
|
||||
Normalizes a user string to the format @user@domain.tld where domain is lowercase and user is case-sensitive
|
||||
|
||||
Args:
|
||||
user_string: User string in various formats
|
||||
|
||||
Returns:
|
||||
Normalized user string
|
||||
|
||||
Raises:
|
||||
ValueError: If the user string is invalid or domain is malformed
|
||||
"""
|
||||
if not user_string or not user_string.strip():
|
||||
raise ValueError("User string cannot be empty")
|
||||
|
||||
user_string = user_string.strip()
|
||||
|
||||
# Add leading @ if missing
|
||||
if not user_string.startswith('@'):
|
||||
user_string = '@' + user_string
|
||||
|
||||
# Split into user and domain parts
|
||||
parts = user_string[1:].split('@', 1) # Remove leading @ and split
|
||||
if len(parts) != 2:
|
||||
raise ValueError(f"Invalid user format: {user_string}. Expected @user@domain.tld")
|
||||
|
||||
username, domain = parts
|
||||
|
||||
if not username:
|
||||
raise ValueError("Username cannot be empty")
|
||||
|
||||
if not domain:
|
||||
raise ValueError("Domain cannot be empty")
|
||||
|
||||
# Validate domain format (basic check for valid domain structure)
|
||||
domain_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
|
||||
if not re.match(domain_pattern, domain):
|
||||
raise ValueError(f"Invalid domain format: {domain}")
|
||||
|
||||
# Return normalized format: @user@domain.tld (domain lowercase, user case-sensitive)
|
||||
return f"@{username}@{domain.lower()}"
|
||||
|
||||
|
||||
def get_rarity_to_weight(
|
||||
config_section: configparser.SectionProxy) -> dict[int, float]:
|
||||
"""Parses Rarity_X keys from config and returns a {rarity: weight} dict."""
|
||||
|
@ -38,19 +85,36 @@ config = configparser.ConfigParser()
|
|||
config.read(get_config_file())
|
||||
|
||||
# Username for the bot
|
||||
USER = config['credentials']['User'].lower()
|
||||
if 'User' not in config['credentials'] or not config['credentials']['User'].strip():
|
||||
raise ConfigError("User must be specified in config.ini under [credentials]")
|
||||
|
||||
USER = normalize_user(config['credentials']['User'])
|
||||
# API key for the bot
|
||||
KEY = config['credentials']['Token']
|
||||
# Bot's Misskey instance URL
|
||||
# Bot's Misskey/Pleroma instance URL
|
||||
INSTANCE = config['credentials']['Instance'].lower()
|
||||
|
||||
# Instance type validation
|
||||
if 'InstanceType' not in config['application']:
|
||||
raise ValueError("InstanceType must be specified in config.ini")
|
||||
|
||||
instance_type = config['application']['InstanceType'].lower()
|
||||
if instance_type not in ('misskey', 'pleroma'):
|
||||
raise ValueError("InstanceType must be either 'misskey' or 'pleroma'")
|
||||
|
||||
INSTANCE_TYPE = instance_type
|
||||
|
||||
# Web server port
|
||||
WEB_PORT = config['application'].getint('WebPort', 5000)
|
||||
BIND_ADDRESS = config['application'].get('BindAddress', '127.0.0.1')
|
||||
|
||||
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
|
||||
# to use extra admin exclusive commands with the bot
|
||||
ADMINS = json.loads(config['application']['DefaultAdmins'])
|
||||
# SQLite Database location
|
||||
DB_PATH = config['application']['DatabaseLocation']
|
||||
DB_PATH = config['application'].get('DatabaseLocation', './gacha_game.db')
|
||||
# Whether to enable the instance whitelist
|
||||
USE_WHITELIST = config['application']['UseWhitelist']
|
||||
USE_WHITELIST = config['application'].getboolean('UseWhitelist', True)
|
||||
|
||||
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
|
||||
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
|
||||
|
|
|
@ -150,12 +150,11 @@ def is_player_administrator(username: str) -> bool:
|
|||
|
||||
|
||||
def insert_card(
|
||||
name: str, rarity: int, weight: float, file_id: str) -> int:
|
||||
name: str, rarity: int, file_id: str) -> int:
|
||||
'''Inserts a card'''
|
||||
CURSOR.execute(
|
||||
'INSERT INTO cards (name, rarity, weight, file_id) VALUES \
|
||||
(?, ?, ?, ?)',
|
||||
(name, rarity, weight, file_id)
|
||||
'INSERT INTO cards (name, rarity, file_id) VALUES (?, ?, ?)',
|
||||
(name, rarity, file_id)
|
||||
)
|
||||
card_id = CURSOR.lastrowid
|
||||
return card_id if card_id else 0
|
||||
|
|
40
bot/fediverse_factory.py
Normal file
40
bot/fediverse_factory.py
Normal file
|
@ -0,0 +1,40 @@
|
|||
from fediverse_service import FediverseService
|
||||
from misskey_service import MisskeyService
|
||||
from pleroma_service import PleromaService
|
||||
|
||||
|
||||
class FediverseServiceFactory:
|
||||
"""Factory for creating FediverseService implementations"""
|
||||
|
||||
@staticmethod
|
||||
def create_service(instance_type: str) -> FediverseService:
|
||||
"""
|
||||
Create a FediverseService implementation based on the instance type.
|
||||
|
||||
Args:
|
||||
instance_type: The type of instance ("misskey" or "pleroma")
|
||||
|
||||
Returns:
|
||||
FediverseService implementation (MisskeyService or PleromaService)
|
||||
|
||||
Raises:
|
||||
ValueError: If the instance type is not supported
|
||||
"""
|
||||
instance_type = instance_type.lower()
|
||||
|
||||
if instance_type == "misskey":
|
||||
return MisskeyService()
|
||||
elif instance_type == "pleroma":
|
||||
return PleromaService()
|
||||
else:
|
||||
raise ValueError(f"Unsupported instance type: {instance_type}")
|
||||
|
||||
|
||||
def get_fediverse_service(instance_type: str) -> FediverseService:
|
||||
"""
|
||||
Convenience function to get a FediverseService instance
|
||||
|
||||
Args:
|
||||
instance_type: The instance type ("misskey" or "pleroma")
|
||||
"""
|
||||
return FediverseServiceFactory.create_service(instance_type)
|
74
bot/fediverse_service.py
Normal file
74
bot/fediverse_service.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import List, Optional, Union, BinaryIO
|
||||
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
|
||||
|
||||
|
||||
class FediverseService(ABC):
|
||||
"""Abstract interface for Fediverse platform services (Misskey, Pleroma, etc.)"""
|
||||
|
||||
@abstractmethod
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""
|
||||
Retrieve notifications from the Fediverse instance.
|
||||
|
||||
Args:
|
||||
since_id: Optional ID to get notifications newer than this ID
|
||||
|
||||
Returns:
|
||||
List of FediverseNotification objects
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""
|
||||
Create a new post on the Fediverse instance.
|
||||
|
||||
Args:
|
||||
text: The text content of the post
|
||||
reply_to_id: Optional ID of post to reply to
|
||||
visibility: Visibility level for the post
|
||||
file_ids: Optional list of file IDs to attach
|
||||
visible_user_ids: Optional list of user IDs who can see the post (for specified visibility)
|
||||
|
||||
Returns:
|
||||
ID of the created post
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""
|
||||
Retrieve a specific post by its ID.
|
||||
|
||||
Args:
|
||||
post_id: The ID of the post to retrieve
|
||||
|
||||
Returns:
|
||||
FediversePost object if found, None otherwise
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""
|
||||
Upload a file to the Fediverse instance.
|
||||
|
||||
Args:
|
||||
file_data: File data as binary stream or bytes
|
||||
filename: Optional filename for the uploaded file
|
||||
|
||||
Returns:
|
||||
FediverseFile object with ID, URL, and other metadata
|
||||
|
||||
Raises:
|
||||
RuntimeError: If file upload fails
|
||||
"""
|
||||
pass
|
73
bot/fediverse_types.py
Normal file
73
bot/fediverse_types.py
Normal file
|
@ -0,0 +1,73 @@
|
|||
from dataclasses import dataclass
|
||||
from typing import Optional, List, Dict, Any
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class NotificationType(Enum):
|
||||
MENTION = "mention"
|
||||
REPLY = "reply"
|
||||
FOLLOW = "follow"
|
||||
FAVOURITE = "favourite"
|
||||
REBLOG = "reblog"
|
||||
POLL = "poll"
|
||||
OTHER = "other"
|
||||
|
||||
|
||||
class Visibility(Enum):
|
||||
PUBLIC = "public"
|
||||
UNLISTED = "unlisted"
|
||||
HOME = "home"
|
||||
FOLLOWERS = "followers"
|
||||
SPECIFIED = "specified"
|
||||
DIRECT = "direct"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseUser:
|
||||
"""Common user representation across Fediverse platforms"""
|
||||
id: str
|
||||
username: str
|
||||
host: Optional[str] = None # None for local users
|
||||
display_name: Optional[str] = None
|
||||
|
||||
@property
|
||||
def full_handle(self) -> str:
|
||||
"""Returns the full fediverse handle (@user@domain or @user for local)"""
|
||||
if self.host:
|
||||
return f"@{self.username}@{self.host}"
|
||||
return f"@{self.username}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseFile:
|
||||
"""Common file/attachment representation"""
|
||||
id: str
|
||||
url: str
|
||||
type: Optional[str] = None
|
||||
name: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediversePost:
|
||||
"""Common post representation across Fediverse platforms"""
|
||||
id: str
|
||||
text: Optional[str]
|
||||
user: FediverseUser
|
||||
visibility: Visibility
|
||||
created_at: Optional[str] = None
|
||||
files: List[FediverseFile] = None
|
||||
reply_to_id: Optional[str] = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.files is None:
|
||||
self.files = []
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseNotification:
|
||||
"""Common notification representation across Fediverse platforms"""
|
||||
id: str
|
||||
type: NotificationType
|
||||
user: FediverseUser
|
||||
post: Optional[FediversePost] = None
|
||||
created_at: Optional[str] = None
|
156
bot/misskey_service.py
Normal file
156
bot/misskey_service.py
Normal file
|
@ -0,0 +1,156 @@
|
|||
import misskey
|
||||
from typing import List, Optional, Dict, Any, Union, BinaryIO
|
||||
from fediverse_service import FediverseService
|
||||
from fediverse_types import (
|
||||
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
|
||||
NotificationType, Visibility
|
||||
)
|
||||
import config
|
||||
|
||||
|
||||
class MisskeyService(FediverseService):
|
||||
"""Misskey implementation of FediverseService"""
|
||||
|
||||
def __init__(self):
|
||||
self.client = misskey.Misskey(address=config.INSTANCE, i=config.KEY)
|
||||
|
||||
def _convert_misskey_user(self, user_data: Dict[str, Any]) -> FediverseUser:
|
||||
"""Convert Misskey user data to FediverseUser"""
|
||||
return FediverseUser(
|
||||
id=user_data.get("id", ""),
|
||||
username=user_data.get("username", "unknown"),
|
||||
host=user_data.get("host"),
|
||||
display_name=user_data.get("name")
|
||||
)
|
||||
|
||||
def _convert_misskey_file(self, file_data: Dict[str, Any]) -> FediverseFile:
|
||||
"""Convert Misskey file data to FediverseFile"""
|
||||
return FediverseFile(
|
||||
id=file_data.get("id", ""),
|
||||
url=file_data.get("url", ""),
|
||||
type=file_data.get("type"),
|
||||
name=file_data.get("name")
|
||||
)
|
||||
|
||||
def _convert_misskey_visibility(self, visibility: str) -> Visibility:
|
||||
"""Convert Misskey visibility to our enum"""
|
||||
visibility_map = {
|
||||
"public": Visibility.PUBLIC,
|
||||
"unlisted": Visibility.UNLISTED,
|
||||
"home": Visibility.HOME,
|
||||
"followers": Visibility.FOLLOWERS,
|
||||
"specified": Visibility.SPECIFIED
|
||||
}
|
||||
return visibility_map.get(visibility, Visibility.HOME)
|
||||
|
||||
def _convert_to_misskey_visibility(self, visibility: Visibility) -> str:
|
||||
"""Convert our visibility enum to Misskey visibility"""
|
||||
visibility_map = {
|
||||
Visibility.PUBLIC: "public",
|
||||
Visibility.UNLISTED: "unlisted",
|
||||
Visibility.HOME: "home",
|
||||
Visibility.FOLLOWERS: "followers",
|
||||
Visibility.SPECIFIED: "specified",
|
||||
Visibility.DIRECT: "specified" # Map direct to specified for Misskey
|
||||
}
|
||||
return visibility_map.get(visibility, "home")
|
||||
|
||||
def _convert_misskey_notification_type(self, notif_type: str) -> NotificationType:
|
||||
"""Convert Misskey notification type to our enum"""
|
||||
type_map = {
|
||||
"mention": NotificationType.MENTION,
|
||||
"reply": NotificationType.REPLY,
|
||||
"follow": NotificationType.FOLLOW,
|
||||
"favourite": NotificationType.FAVOURITE,
|
||||
"reblog": NotificationType.REBLOG,
|
||||
"poll": NotificationType.POLL
|
||||
}
|
||||
return type_map.get(notif_type, NotificationType.OTHER)
|
||||
|
||||
def _convert_misskey_post(self, note_data: Dict[str, Any]) -> FediversePost:
|
||||
"""Convert Misskey note data to FediversePost"""
|
||||
files = []
|
||||
if note_data.get("files"):
|
||||
files = [self._convert_misskey_file(f) for f in note_data["files"]]
|
||||
|
||||
return FediversePost(
|
||||
id=note_data.get("id", ""),
|
||||
text=note_data.get("text"),
|
||||
user=self._convert_misskey_user(note_data.get("user", {})),
|
||||
visibility=self._convert_misskey_visibility(note_data.get("visibility", "home")),
|
||||
created_at=note_data.get("createdAt"),
|
||||
files=files,
|
||||
reply_to_id=note_data.get("replyId")
|
||||
)
|
||||
|
||||
def _convert_misskey_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
|
||||
"""Convert Misskey notification data to FediverseNotification"""
|
||||
post = None
|
||||
if notification_data.get("note"):
|
||||
post = self._convert_misskey_post(notification_data["note"])
|
||||
|
||||
return FediverseNotification(
|
||||
id=notification_data.get("id", ""),
|
||||
type=self._convert_misskey_notification_type(notification_data.get("type", "")),
|
||||
user=self._convert_misskey_user(notification_data.get("user", {})),
|
||||
post=post,
|
||||
created_at=notification_data.get("createdAt")
|
||||
)
|
||||
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""Get notifications from Misskey instance"""
|
||||
params = {
|
||||
'include_types': ['mention', 'reply'],
|
||||
'limit': 50
|
||||
}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
|
||||
notifications = self.client.i_notifications(**params)
|
||||
return [self._convert_misskey_notification(notif) for notif in notifications]
|
||||
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""Create a post on Misskey instance"""
|
||||
params = {
|
||||
"text": text,
|
||||
"visibility": self._convert_to_misskey_visibility(visibility)
|
||||
}
|
||||
|
||||
if reply_to_id:
|
||||
params["reply_id"] = reply_to_id
|
||||
|
||||
if file_ids:
|
||||
params["file_ids"] = file_ids
|
||||
|
||||
if visible_user_ids and visibility == Visibility.SPECIFIED:
|
||||
params["visible_user_ids"] = visible_user_ids
|
||||
|
||||
response = self.client.notes_create(**params)
|
||||
return response.get("createdNote", {}).get("id", "")
|
||||
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""Get a specific post by ID from Misskey instance"""
|
||||
try:
|
||||
note = self.client.notes_show(noteId=post_id)
|
||||
return self._convert_misskey_post(note)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""Upload a file to Misskey Drive"""
|
||||
try:
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
|
||||
media = self.client.drive_files_create(file_data)
|
||||
return self._convert_misskey_file(media)
|
||||
except MisskeyAPIException as e:
|
||||
raise RuntimeError(f"Failed to upload file to Misskey Drive: {e}") from e
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Unexpected error during file upload: {e}") from e
|
74
bot/mock_fediverse_service.py
Normal file
74
bot/mock_fediverse_service.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
"""Mock FediverseService for testing purposes"""
|
||||
|
||||
from typing import List, Optional, Union, BinaryIO
|
||||
from fediverse_service import FediverseService
|
||||
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
|
||||
|
||||
|
||||
class MockFediverseService(FediverseService):
|
||||
"""Mock implementation of FediverseService for testing"""
|
||||
|
||||
def __init__(self):
|
||||
self.notifications = []
|
||||
self.created_posts = []
|
||||
self.uploaded_files = []
|
||||
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""Return mock notifications, optionally filtered by since_id"""
|
||||
if since_id is None:
|
||||
return self.notifications
|
||||
|
||||
# Filter notifications newer than since_id
|
||||
filtered = []
|
||||
for notif in self.notifications:
|
||||
if notif.id > since_id:
|
||||
filtered.append(notif)
|
||||
return filtered
|
||||
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""Mock post creation, returns fake post ID"""
|
||||
post_id = f"mock_post_{len(self.created_posts)}"
|
||||
|
||||
# Store the post for assertions
|
||||
self.created_posts.append({
|
||||
'id': post_id,
|
||||
'text': text,
|
||||
'reply_to_id': reply_to_id,
|
||||
'visibility': visibility,
|
||||
'file_ids': file_ids,
|
||||
'visible_user_ids': visible_user_ids
|
||||
})
|
||||
|
||||
return post_id
|
||||
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes]) -> FediverseFile:
|
||||
"""Mock file upload, returns fake file"""
|
||||
file_id = f"mock_file_{len(self.uploaded_files)}"
|
||||
|
||||
mock_file = FediverseFile(
|
||||
id=file_id,
|
||||
url=f"https://example.com/files/{file_id}",
|
||||
type="image/png",
|
||||
name="test_file.png"
|
||||
)
|
||||
|
||||
self.uploaded_files.append(mock_file)
|
||||
return mock_file
|
||||
|
||||
# Helper methods for testing
|
||||
def add_mock_notification(self, notification: FediverseNotification):
|
||||
"""Add a mock notification for testing"""
|
||||
self.notifications.append(notification)
|
||||
|
||||
def clear_all(self):
|
||||
"""Clear all mock data"""
|
||||
self.notifications.clear()
|
||||
self.created_posts.clear()
|
||||
self.uploaded_files.clear()
|
|
@ -1,45 +1,47 @@
|
|||
import traceback
|
||||
from typing import Dict, Any
|
||||
|
||||
import misskey
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
|
||||
from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST
|
||||
import config
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_config, set_config, is_whitelisted, is_player_banned
|
||||
from db_utils import is_whitelisted, is_player_banned
|
||||
from response import generate_response
|
||||
from custom_types import BotResponse
|
||||
|
||||
# Define your whitelist
|
||||
# TODO: move to config
|
||||
WHITELISTED_INSTANCES: list[str] = []
|
||||
from fediverse_factory import get_fediverse_service
|
||||
from fediverse_types import FediverseNotification, NotificationType, Visibility
|
||||
|
||||
|
||||
def process_notification(
|
||||
client: misskey.Misskey,
|
||||
notification: Dict[str, Any]) -> None:
|
||||
'''Processes an individual notification'''
|
||||
user = notification.get('user', {})
|
||||
username = user.get('username', 'unknown')
|
||||
host = user.get('host') # None if local user
|
||||
def process_fediverse_notification(notification: FediverseNotification, fediverse_service=None) -> None:
|
||||
'''Processes an individual fediverse notification using the abstraction'''
|
||||
if fediverse_service is None:
|
||||
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
|
||||
|
||||
# Get user and instance info
|
||||
username = notification.user.username
|
||||
host = notification.user.host
|
||||
instance = host if host else 'local'
|
||||
|
||||
if USE_WHITELIST and not is_whitelisted(instance):
|
||||
# Check whitelist
|
||||
if config.USE_WHITELIST and not is_whitelisted(instance):
|
||||
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
|
||||
return
|
||||
|
||||
# Copy visibility of the post that was received when replying (so if people
|
||||
# don't want to dump a bunch of notes on home they don't have to)
|
||||
visibility = notification['note']['visibility']
|
||||
if visibility != 'specified':
|
||||
visibility = 'home'
|
||||
# Only process mentions and replies
|
||||
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
|
||||
return
|
||||
|
||||
notif_type = notification.get('type', 'unknown')
|
||||
notif_id = notification.get('id')
|
||||
# Return early if no post attached
|
||||
if not notification.post:
|
||||
return
|
||||
|
||||
# Determine visibility for reply
|
||||
if notification.post.visibility != Visibility.SPECIFIED:
|
||||
visibility = Visibility.HOME
|
||||
else:
|
||||
visibility = Visibility.SPECIFIED
|
||||
|
||||
notif_type = notification.type.value
|
||||
notif_id = notification.id
|
||||
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parsed_notification = parse_notification(notification, client)
|
||||
parsed_notification = parse_notification(notification, fediverse_service)
|
||||
|
||||
if not parsed_notification:
|
||||
return
|
||||
|
@ -49,79 +51,20 @@ def process_notification(
|
|||
print(f'⚠️ Blocked notification from banned player: {author}')
|
||||
return
|
||||
|
||||
# Get the note Id to reply to
|
||||
note_id = notification.get('note', {}).get('id')
|
||||
|
||||
# Get the response
|
||||
response: BotResponse | None = generate_response(parsed_notification)
|
||||
|
||||
if not response:
|
||||
return
|
||||
|
||||
client.notes_create(
|
||||
# Handle attachment URLs (convert to file IDs if needed)
|
||||
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
|
||||
|
||||
# Send response using fediverse service
|
||||
fediverse_service.create_post(
|
||||
text=response['message'],
|
||||
reply_id=note_id,
|
||||
reply_to_id=notification.post.id,
|
||||
visibility=visibility,
|
||||
file_ids=response['attachment_urls']
|
||||
# TODO: write actual visible users ids so pleromers can use the bot
|
||||
# privately
|
||||
# visible_user_ids=[]
|
||||
file_ids=file_ids
|
||||
# visible_user_ids=[] # TODO: write actual visible users ids so pleromers can use the bot privately
|
||||
)
|
||||
|
||||
|
||||
def process_notifications(client: misskey.Misskey) -> bool:
|
||||
'''Processes a batch of unread notifications. Returns False if there are
|
||||
no more notifications to process.'''
|
||||
|
||||
last_seen_id = get_config('last_seen_notif_id')
|
||||
# process_notification writes to last_seen_id, so make a copy
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
try:
|
||||
notifications = client.i_notifications(
|
||||
# Fetch notifications we haven't seen yet. This option is a bit
|
||||
# tempermental, sometimes it'll include since_id, sometimes it
|
||||
# won't. We need to keep track of what notifications we've
|
||||
# already processed.
|
||||
since_id=last_seen_id,
|
||||
# Let misskey handle the filtering
|
||||
include_types=['mention', 'reply'],
|
||||
# And handle the batch size while we're at it
|
||||
limit=NOTIFICATION_BATCH_SIZE
|
||||
)
|
||||
|
||||
# No notifications. Wait the poll period.
|
||||
if not notifications:
|
||||
return False
|
||||
|
||||
# Iterate oldest to newest
|
||||
for notification in notifications:
|
||||
try:
|
||||
# Skip if we've processed already
|
||||
notif_id = notification.get('id', '')
|
||||
if notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
# Update new_last_seen_id and process
|
||||
new_last_seen_id = notif_id
|
||||
process_notification(client, notification)
|
||||
|
||||
except Exception as e:
|
||||
print(f'An exception has occured while processing a \
|
||||
notification: {e}')
|
||||
print(traceback.format_exc())
|
||||
|
||||
# If we got as many notifications as we requested, there are probably
|
||||
# more in the queue
|
||||
return len(notifications) == NOTIFICATION_BATCH_SIZE
|
||||
|
||||
except MisskeyAPIException as e:
|
||||
print(f'An exception has occured while reading notifications: {e}\n')
|
||||
print(traceback.format_exc())
|
||||
finally:
|
||||
# Quality jank right here, but finally lets us update the last_seen_id
|
||||
# even if we hit an exception or return early
|
||||
if new_last_seen_id > last_seen_id:
|
||||
set_config('last_seen_notif_id', new_last_seen_id)
|
||||
|
||||
return False
|
||||
|
|
|
@ -1,27 +1,38 @@
|
|||
import re
|
||||
from typing import Dict, Any
|
||||
|
||||
import misskey
|
||||
|
||||
import config
|
||||
from response import generate_response
|
||||
from fediverse_factory import get_fediverse_service
|
||||
from fediverse_types import FediverseNotification, NotificationType, Visibility
|
||||
from custom_types import ParsedNotification
|
||||
|
||||
def parse_notification(notification: FediverseNotification, fediverse_service=None):
|
||||
'''Parses any notifications received by the bot and sends any commands to
|
||||
generate_response()'''
|
||||
|
||||
def parse_notification(
|
||||
notification: Dict[str, Any],
|
||||
client: misskey.Misskey) -> ParsedNotification | None:
|
||||
'''Parses any notifications received by the bot'''
|
||||
if fediverse_service is None:
|
||||
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
|
||||
|
||||
# We get the type of notification to filter the ones that we actually want
|
||||
# to parse
|
||||
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
|
||||
return # Ignore anything that isn't a mention
|
||||
|
||||
# Return early if no post attached
|
||||
if not notification.post:
|
||||
return
|
||||
|
||||
# We want the visibility to be related to the type that was received (so if
|
||||
# people don't want to dump a bunch of notes on home they don't have to)
|
||||
if notification.post.visibility != Visibility.SPECIFIED:
|
||||
visibility = Visibility.HOME
|
||||
else:
|
||||
visibility = Visibility.SPECIFIED
|
||||
|
||||
# Get the full Activitypub ID of the user
|
||||
user = notification.get("user", {})
|
||||
username = user.get("username", "unknown")
|
||||
host = user.get("host")
|
||||
# Local users may not have a hostname attached
|
||||
full_user = f"@{username}" if not host else f"@{username}@{host}"
|
||||
full_user = notification.user.full_handle
|
||||
|
||||
note_obj = notification.get("note", {})
|
||||
note_text = note_obj.get("text")
|
||||
note_id = note_obj.get("id")
|
||||
note_text = notification.post.text
|
||||
note_id = notification.post.id
|
||||
|
||||
note = note_text.strip().lower() if note_text else ""
|
||||
# Split words into tokens
|
||||
|
@ -44,9 +55,30 @@ def parse_notification(
|
|||
command = parts[1].lower()
|
||||
arguments = parts[2:] if len(parts) > 2 else []
|
||||
|
||||
return {
|
||||
# Create ParsedNotification object for the new response system
|
||||
parsed_notification: ParsedNotification = {
|
||||
'author': full_user,
|
||||
'command': command,
|
||||
'arguments': arguments,
|
||||
'note_obj': note_obj
|
||||
'note_obj': {
|
||||
'id': note_id,
|
||||
'text': note_text,
|
||||
'files': [{'url': f.url} for f in notification.post.files] if notification.post.files else []
|
||||
}
|
||||
}
|
||||
|
||||
# Generate response using the new system
|
||||
response = generate_response(parsed_notification)
|
||||
if not response:
|
||||
return
|
||||
|
||||
# Handle attachment URLs (convert to file IDs if needed)
|
||||
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
|
||||
|
||||
fediverse_service.create_post(
|
||||
text=response['message'],
|
||||
reply_to_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=file_ids
|
||||
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
|
||||
)
|
||||
|
|
188
bot/pleroma_service.py
Normal file
188
bot/pleroma_service.py
Normal file
|
@ -0,0 +1,188 @@
|
|||
from mastodon import Mastodon
|
||||
from typing import List, Optional, Dict, Any, Union, BinaryIO
|
||||
import io
|
||||
import filetype
|
||||
from fediverse_service import FediverseService
|
||||
from fediverse_types import (
|
||||
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
|
||||
NotificationType, Visibility
|
||||
)
|
||||
import config
|
||||
|
||||
|
||||
class PleromaService(FediverseService):
|
||||
"""Pleroma implementation of FediverseService using Mastodon.py"""
|
||||
|
||||
def __init__(self):
|
||||
self.client = Mastodon(
|
||||
access_token=config.KEY,
|
||||
api_base_url=config.INSTANCE
|
||||
)
|
||||
|
||||
def _convert_mastodon_user(self, user_data: Dict[str, Any]) -> FediverseUser:
|
||||
"""Convert Mastodon/Pleroma user data to FediverseUser"""
|
||||
acct = user_data.get("acct", "")
|
||||
if "@" in acct:
|
||||
username, host = acct.split("@", 1)
|
||||
else:
|
||||
username = acct
|
||||
host = None
|
||||
|
||||
return FediverseUser(
|
||||
id=str(user_data.get("id", "")),
|
||||
username=username,
|
||||
host=host,
|
||||
display_name=user_data.get("display_name")
|
||||
)
|
||||
|
||||
def _convert_mastodon_file(self, file_data: Dict[str, Any]) -> FediverseFile:
|
||||
"""Convert Mastodon/Pleroma media attachment to FediverseFile"""
|
||||
return FediverseFile(
|
||||
id=str(file_data.get("id", "")),
|
||||
url=file_data.get("url", ""),
|
||||
type=file_data.get("type"),
|
||||
name=file_data.get("description")
|
||||
)
|
||||
|
||||
def _convert_mastodon_visibility(self, visibility: str) -> Visibility:
|
||||
"""Convert Mastodon/Pleroma visibility to our enum"""
|
||||
visibility_map = {
|
||||
"public": Visibility.PUBLIC,
|
||||
"unlisted": Visibility.UNLISTED,
|
||||
"private": Visibility.FOLLOWERS,
|
||||
"direct": Visibility.DIRECT
|
||||
}
|
||||
return visibility_map.get(visibility, Visibility.PUBLIC)
|
||||
|
||||
def _convert_to_mastodon_visibility(self, visibility: Visibility) -> str:
|
||||
"""Convert our visibility enum to Mastodon/Pleroma visibility"""
|
||||
visibility_map = {
|
||||
Visibility.PUBLIC: "public",
|
||||
Visibility.UNLISTED: "unlisted",
|
||||
Visibility.HOME: "unlisted", # Map home to unlisted for Pleroma
|
||||
Visibility.FOLLOWERS: "private",
|
||||
Visibility.SPECIFIED: "direct", # Map specified to direct for Pleroma
|
||||
Visibility.DIRECT: "direct"
|
||||
}
|
||||
return visibility_map.get(visibility, "public")
|
||||
|
||||
def _convert_mastodon_notification_type(self, notif_type: str) -> NotificationType:
|
||||
"""Convert Mastodon/Pleroma notification type to our enum"""
|
||||
type_map = {
|
||||
"mention": NotificationType.MENTION,
|
||||
"follow": NotificationType.FOLLOW,
|
||||
"favourite": NotificationType.FAVOURITE,
|
||||
"reblog": NotificationType.REBLOG,
|
||||
"poll": NotificationType.POLL
|
||||
}
|
||||
return type_map.get(notif_type, NotificationType.OTHER)
|
||||
|
||||
def _convert_mastodon_status(self, status_data: Dict[str, Any]) -> FediversePost:
|
||||
"""Convert Mastodon/Pleroma status data to FediversePost"""
|
||||
files = []
|
||||
if status_data.get("media_attachments"):
|
||||
files = [self._convert_mastodon_file(f) for f in status_data["media_attachments"]]
|
||||
|
||||
# Extract plain text from HTML content
|
||||
content = status_data.get("content", "")
|
||||
# Basic HTML stripping - in production you might want to use a proper HTML parser
|
||||
import re
|
||||
plain_text = re.sub(r'<[^>]+>', '', content) if content else None
|
||||
|
||||
return FediversePost(
|
||||
id=str(status_data.get("id", "")),
|
||||
text=plain_text,
|
||||
user=self._convert_mastodon_user(status_data.get("account", {})),
|
||||
visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")),
|
||||
created_at=status_data.get("created_at"),
|
||||
files=files,
|
||||
reply_to_id=str(status_data["in_reply_to_id"]) if status_data.get("in_reply_to_id") else None
|
||||
)
|
||||
|
||||
def _convert_mastodon_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
|
||||
"""Convert Mastodon/Pleroma notification data to FediverseNotification"""
|
||||
post = None
|
||||
if notification_data.get("status"):
|
||||
post = self._convert_mastodon_status(notification_data["status"])
|
||||
|
||||
return FediverseNotification(
|
||||
id=str(notification_data.get("id", "")),
|
||||
type=self._convert_mastodon_notification_type(notification_data.get("type", "")),
|
||||
user=self._convert_mastodon_user(notification_data.get("account", {})),
|
||||
post=post,
|
||||
created_at=notification_data.get("created_at")
|
||||
)
|
||||
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""Get notifications from Pleroma instance"""
|
||||
params = {}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
|
||||
notifications = self.client.notifications(**params)
|
||||
return [self._convert_mastodon_notification(notif) for notif in notifications]
|
||||
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""Create a post on Pleroma instance"""
|
||||
params = {
|
||||
"status": text,
|
||||
"visibility": self._convert_to_mastodon_visibility(visibility)
|
||||
}
|
||||
|
||||
if reply_to_id:
|
||||
params["in_reply_to_id"] = reply_to_id
|
||||
|
||||
if file_ids:
|
||||
params["media_ids"] = file_ids
|
||||
|
||||
# Note: Pleroma/Mastodon doesn't have direct equivalent to visible_user_ids
|
||||
# For direct messages, you typically mention users in the status text
|
||||
|
||||
response = self.client.status_post(**params)
|
||||
return str(response.get("id", ""))
|
||||
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""Get a specific post by ID from Pleroma instance"""
|
||||
try:
|
||||
status = self.client.status(post_id)
|
||||
return self._convert_mastodon_status(status)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""Upload a file to Pleroma instance"""
|
||||
try:
|
||||
# Convert file_data to bytes for MIME detection
|
||||
if hasattr(file_data, 'read'):
|
||||
# Check if we can seek back
|
||||
try:
|
||||
current_pos = file_data.tell()
|
||||
file_bytes = file_data.read()
|
||||
file_data.seek(current_pos)
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
except (io.UnsupportedOperation, OSError):
|
||||
# Non-seekable stream, already read all data
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
else:
|
||||
file_bytes = file_data
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
|
||||
# Use filetype library for robust MIME detection
|
||||
kind = filetype.guess(file_bytes)
|
||||
if kind is not None:
|
||||
mime_type = kind.mime
|
||||
else:
|
||||
# Fallback to image/jpeg if detection fails
|
||||
mime_type = 'image/jpeg'
|
||||
|
||||
media = self.client.media_post(file_data, mime_type=mime_type, description=filename)
|
||||
return self._convert_mastodon_file(media)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to upload file to Pleroma: {e}") from e
|
|
@ -120,12 +120,6 @@ in order: name, rarity',
|
|||
be a number between 1 and 5',
|
||||
'attachment_urls': None
|
||||
}
|
||||
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
|
||||
return {
|
||||
'message': f'{author} Invalid drop weight: \'{arguments[2]}\' \
|
||||
must be a decimal value between 0.0 and 1.0',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
card_id, file_id = add_card(
|
||||
name=arguments[0],
|
||||
|
|
|
@ -5,6 +5,13 @@
|
|||
DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"]
|
||||
; SQLite Database location
|
||||
DatabaseLocation = ./gacha_game.db
|
||||
; Instance type - either "misskey" or "pleroma"
|
||||
InstanceType = misskey
|
||||
; Web server port (default: 5000)
|
||||
WebPort = 5000
|
||||
; Web server bind address (default: 127.0.0.1, set to 0.0.0.0 to listen on all interfaces)
|
||||
BindAddress = 127.0.0.1
|
||||
|
||||
; Whether to lmit access to the bot via an instance whitelist
|
||||
; The whitelist can be adjusted via the application
|
||||
UseWhitelist = False
|
||||
|
@ -35,4 +42,3 @@ User = @bot@example.tld
|
|||
; API key for the bot
|
||||
; Generate one by going to Settings > API > Generate access token
|
||||
Token = abcdefghijklmnopqrstuvwxyz012345
|
||||
|
||||
|
|
31
readme.md
31
readme.md
|
@ -1,6 +1,6 @@
|
|||
# Kemoverse
|
||||
|
||||
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
|
||||
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Supports both Misskey and Pleroma instances. Name comes from Kemonomimi and Fediverse.
|
||||
<p align="center">
|
||||
<img src="./web/static/logo.png" alt="Fediverse Gacha Bot Logo" width="300" height="auto">
|
||||
</p>
|
||||
|
@ -47,13 +47,13 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
|
|||
- 🌐 Web app to generate cards from images
|
||||
|
||||
### 🌍 Fediverse Support
|
||||
✅ Anyone from the fediverse can play, but the server only works using a Misskey instance. Want to rewrite the program in Elixir for Pleroma? Let us know!
|
||||
✅ Anyone from the fediverse can play! The bot supports both Misskey and Pleroma instances through configurable backends.
|
||||
|
||||
## 🗃️ Tech Stack
|
||||
|
||||
- Python (3.12+)
|
||||
- SQLite
|
||||
- Fediverse API integration (via Misskey endpoints)
|
||||
- Fediverse API integration (Misskey and Pleroma support)
|
||||
- Flask
|
||||
- Modular DB design for extensibility
|
||||
|
||||
|
@ -61,16 +61,33 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
|
|||
|
||||
The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and duel tactics.
|
||||
|
||||
## 📝 License
|
||||
|
||||
This project is licensed under the GNU Affero General Public License v3.0 (AGPL-3.0).
|
||||
|
||||
This means:
|
||||
|
||||
- You are free to use, modify, and redistribute this software under the same license.
|
||||
|
||||
- If you modify and deploy the software (for example, as part of a web service), you must also make the source code of your modified version available to users of that service.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License along with this program. If not, see https://www.gnu.org/licenses/.
|
||||
The full license text can be found here: https://www.gnu.org/licenses/agpl-3.0.html
|
||||
|
||||
The AGPL is designed to ensure that software freedom is preserved, especially in networked environments. If you improve this project or build something on top of it, please give back to the community by sharing your changes too.
|
||||
|
||||
Unless another license is listed, every file in the project is licensed under the GNU Affero General Public License version 3 (or at your option), any later version.
|
||||
|
||||
```mermaid
|
||||
flowchart TD
|
||||
|
||||
subgraph Player Interaction
|
||||
A1[Misskey bot]
|
||||
A1[Fediverse bot]
|
||||
A2[Web]
|
||||
end
|
||||
|
||||
subgraph Misskey
|
||||
B1[Misskey instance]
|
||||
subgraph Fediverse
|
||||
B1[Fediverse instance]
|
||||
end
|
||||
|
||||
subgraph Bot
|
||||
|
@ -78,7 +95,7 @@ flowchart TD
|
|||
C2[Notification parser]
|
||||
C3[Gacha roll logic]
|
||||
C4[Database interface]
|
||||
C5[Misskey API poster]
|
||||
C5[Fediverse API poster]
|
||||
end
|
||||
|
||||
subgraph Website
|
||||
|
|
|
@ -6,3 +6,5 @@ Jinja2==3.1.6
|
|||
MarkupSafe==3.0.2
|
||||
Werkzeug==3.1.3
|
||||
Misskey.py==4.1.0
|
||||
Mastodon.py==1.8.1
|
||||
filetype==1.2.0
|
||||
|
|
23
setup_db.py
23
setup_db.py
|
@ -1,3 +1,19 @@
|
|||
#Kemoverse - a gacha-style bot for the Fediverse.
|
||||
#Copyright © 2025 Waifu
|
||||
#
|
||||
#This program is free software: you can redistribute it and/or modify
|
||||
#it under the terms of the GNU Affero General Public License as
|
||||
#published by the Free Software Foundation, either version 3 of the
|
||||
#License, or (at your option) any later version.
|
||||
#
|
||||
#This program is distributed in the hope that it will be useful,
|
||||
#but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
#GNU Affero General Public License for more details.
|
||||
#
|
||||
#You should have received a copy of the GNU Affero General Public License
|
||||
#along with this program. If not, see https://www.gnu.org/licenses/.
|
||||
|
||||
import sqlite3
|
||||
import traceback
|
||||
import os
|
||||
|
@ -57,16 +73,14 @@ def perform_migration(cursor: sqlite3.Cursor, migration: tuple[int, str]) -> Non
|
|||
def get_db_path() -> str | DBNotFoundError:
|
||||
'''Gets the DB path from config.ini'''
|
||||
env = os.environ.get('KEMOVERSE_ENV')
|
||||
if not (env and env in ['prod', 'dev']):
|
||||
raise KemoverseEnvUnset
|
||||
|
||||
print(f'Running in "{env}" mode')
|
||||
|
||||
config_path = f'config_{env}.ini'
|
||||
|
||||
if not os.path.isfile(config_path):
|
||||
raise ConfigError(f'Could not find {config_path}')
|
||||
|
||||
print(f'Running in "{env}" mode')
|
||||
|
||||
config = ConfigParser()
|
||||
config.read(config_path)
|
||||
db_path = config['application']['DatabaseLocation']
|
||||
|
@ -96,7 +110,6 @@ def main():
|
|||
return
|
||||
except KemoverseEnvUnset:
|
||||
print('Error: KEMOVERSE_ENV is either not set or has an invalid value.')
|
||||
print('Please set KEMOVERSE_ENV to either "dev" or "prod" before running.')
|
||||
print(traceback.format_exc())
|
||||
return
|
||||
|
||||
|
|
16
startup.sh
16
startup.sh
|
@ -1,5 +1,21 @@
|
|||
#!/bin/bash
|
||||
|
||||
#Kemoverse - a gacha-style bot for the Fediverse.
|
||||
#Copyright © 2025 Waifu
|
||||
#
|
||||
#This program is free software: you can redistribute it and/or modify
|
||||
#it under the terms of the GNU Affero General Public License as
|
||||
#published by the Free Software Foundation, either version 3 of the
|
||||
#License, or (at your option) any later version.
|
||||
#
|
||||
#This program is distributed in the hope that it will be useful,
|
||||
#but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
#GNU Affero General Public License for more details.
|
||||
#
|
||||
#You should have received a copy of the GNU Affero General Public License
|
||||
#along with this program. If not, see https://www.gnu.org/licenses/.
|
||||
|
||||
# Navigate to the project directory (optional)
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
|
|
27
web/app.py
27
web/app.py
|
@ -1,13 +1,34 @@
|
|||
#Kemoverse - a gacha-style bot for the Fediverse.
|
||||
#Copyright © 2025 Waifu
|
||||
#
|
||||
#This program is free software: you can redistribute it and/or modify
|
||||
#it under the terms of the GNU Affero General Public License as
|
||||
#published by the Free Software Foundation, either version 3 of the
|
||||
#License, or (at your option) any later version.
|
||||
#
|
||||
#This program is distributed in the hope that it will be useful,
|
||||
#but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
#GNU Affero General Public License for more details.
|
||||
#
|
||||
#You should have received a copy of the GNU Affero General Public License
|
||||
#along with this program. If not, see https://www.gnu.org/licenses/.
|
||||
|
||||
import sqlite3
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add bot directory to path to import config
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'bot'))
|
||||
import config
|
||||
|
||||
from flask import Flask, render_template, abort
|
||||
from werkzeug.exceptions import HTTPException
|
||||
|
||||
app = Flask(__name__)
|
||||
DB_PATH = "./gacha_game.db" # Adjust path if needed
|
||||
|
||||
def get_db_connection():
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn = sqlite3.connect(config.DB_PATH)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
|
@ -68,4 +89,4 @@ def submit_character():
|
|||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(host='0.0.0.0', port=5000, debug=True)
|
||||
app.run(host=config.BIND_ADDRESS, port=config.WEB_PORT, debug=True)
|
||||
|
|
Loading…
Add table
Reference in a new issue