Compare commits
33 commits
master
...
pleroma-su
Author | SHA1 | Date | |
---|---|---|---|
|
0039717a01 | ||
|
bf8d9823b2 | ||
7c4fd5fc41 | |||
|
24bfe88dc1 | ||
|
1a35750d0a | ||
|
d416ae1b2d | ||
|
7fd4d5db25 | ||
|
f4f847e577 | ||
|
2ef70801c7 | ||
|
fdf21b3f5f | ||
|
337a989671 | ||
|
3ad4edbc45 | ||
1984b67523 | |||
|
bbb64a869d | ||
|
fc45c688e8 | ||
|
77d4fa13bb | ||
|
bd287b096a | ||
|
40f018a83b | ||
|
a516a9b55a | ||
|
7161712f15 | ||
|
fadd4dfe27 | ||
|
7b32ee7fcf | ||
|
f70b2147cd | ||
|
298d7fda72 | ||
|
8918b5205d | ||
|
67b4d949fd | ||
|
89ae8a7290 | ||
|
91376c0eba | ||
|
b2ca6dd59a | ||
|
dac05a3ed8 | ||
|
fe8e7d246f | ||
|
a47530180d | ||
|
e0cf42f8f6 |
21 changed files with 880 additions and 201 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
@ -185,5 +185,6 @@ cython_debug/
|
|||
gacha_game*.db
|
||||
gacha_game*.db.*
|
||||
config*.ini
|
||||
run.sh
|
||||
|
||||
.idea
|
||||
.idea
|
||||
|
|
1
.tool-versions
Normal file
1
.tool-versions
Normal file
|
@ -0,0 +1 @@
|
|||
nodejs 23.4.0
|
|
@ -1,32 +1,24 @@
|
|||
import requests
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
from client import client_connection
|
||||
from db_utils import insert_card
|
||||
from custom_types import Card
|
||||
from config import RARITY_TO_WEIGHT
|
||||
import config
|
||||
from fediverse_factory import get_fediverse_service
|
||||
import db_utils
|
||||
|
||||
|
||||
def add_card(
|
||||
name: str,
|
||||
rarity: int,
|
||||
image_url: str) -> tuple[int, str]:
|
||||
'''
|
||||
Adds a card to the database, uploading the image from a public URL to
|
||||
the bot's Misskey Drive.
|
||||
def add_card(name: str, rarity: int, image_url: str) -> tuple[int, str]:
|
||||
"""
|
||||
Adds a card to the database, uploading the image from a public URL to the Fediverse instance.
|
||||
|
||||
Args:
|
||||
name (str): Card name.
|
||||
rarity (int): Card rarity (e.g., 1-5).
|
||||
image_url (str): Public URL of the image from the post (e.g., from
|
||||
note['files'][i]['url']).
|
||||
image_url (str): Public URL of the image from the post.
|
||||
|
||||
Returns:
|
||||
tuple[int, str]: Card ID and bot's Drive file_id.
|
||||
tuple[int, str]: Card ID and file_id.
|
||||
|
||||
Raises:
|
||||
ValueError: If inputs are invalid.
|
||||
RuntimeError: If image download/upload or database operation fails.
|
||||
'''
|
||||
"""
|
||||
|
||||
stripped_name = name.strip()
|
||||
|
||||
|
@ -35,30 +27,33 @@ def add_card(
|
|||
raise ValueError('Card name cannot be empty.')
|
||||
if rarity < 1:
|
||||
raise ValueError('Rarity must be a positive integer.')
|
||||
if rarity not in RARITY_TO_WEIGHT.keys():
|
||||
if rarity not in config.RARITY_TO_WEIGHT.keys():
|
||||
raise ValueError(f'Invalid rarity: {rarity}')
|
||||
if not image_url:
|
||||
raise ValueError('Image URL must be provided.')
|
||||
|
||||
# Download image
|
||||
response = requests.get(image_url, stream=True, timeout=30)
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError(f'Failed to download image from {image_url}')
|
||||
|
||||
# Upload to bot's Drive
|
||||
mk = client_connection()
|
||||
try:
|
||||
media = mk.drive_files_create(response.raw)
|
||||
file_id = media['id']
|
||||
except MisskeyAPIException as e:
|
||||
raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
|
||||
from e
|
||||
# Download image
|
||||
response = requests.get(image_url, stream=True, timeout=30)
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError(f"Failed to download image from {image_url}")
|
||||
|
||||
# Insert into database
|
||||
card_id = insert_card(
|
||||
# Upload to Fediverse instance
|
||||
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
|
||||
try:
|
||||
uploaded_file = fediverse_service.upload_file(response.raw)
|
||||
file_id = uploaded_file.id
|
||||
except RuntimeError as e:
|
||||
raise RuntimeError(f"Failed to upload image: {e}") from e
|
||||
|
||||
# Insert into database using db_utils function
|
||||
card_id = db_utils.insert_card(
|
||||
stripped_name,
|
||||
rarity,
|
||||
RARITY_TO_WEIGHT[rarity],
|
||||
file_id
|
||||
)
|
||||
return card_id, file_id
|
||||
)
|
||||
|
||||
return card_id, file_id
|
||||
|
||||
except Exception as e:
|
||||
raise
|
||||
|
|
|
@ -1,21 +1,62 @@
|
|||
import time
|
||||
import misskey as misskey
|
||||
from client import client_connection
|
||||
import db_utils as db
|
||||
import traceback
|
||||
import config
|
||||
from notification import process_fediverse_notification
|
||||
from db_utils import get_config, set_config, connect, setup_administrators
|
||||
from fediverse_factory import get_fediverse_service
|
||||
|
||||
from config import NOTIFICATION_POLL_INTERVAL
|
||||
from notification import process_notifications
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Initialize the Misskey client
|
||||
client = client_connection()
|
||||
# Connect to DB
|
||||
db.connect()
|
||||
from config import USE_WHITELIST
|
||||
|
||||
def stream_notifications():
|
||||
# Initialize database connection
|
||||
connect()
|
||||
|
||||
# Setup default administrators
|
||||
db.setup_administrators()
|
||||
setup_administrators()
|
||||
|
||||
# Initialize the Fediverse service
|
||||
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
|
||||
|
||||
# Get the last seen notification ID from the database
|
||||
last_seen_id = get_config("last_seen_notif_id")
|
||||
|
||||
# Show whitelist status
|
||||
whitelist_status = "enabled" if USE_WHITELIST else "disabled"
|
||||
print(f'Instance whitelisting: {whitelist_status}')
|
||||
|
||||
print('Listening for notifications...')
|
||||
while True:
|
||||
if not process_notifications(client):
|
||||
time.sleep(NOTIFICATION_POLL_INTERVAL)
|
||||
try:
|
||||
# Get notifications from the fediverse service
|
||||
notifications = fediverse_service.get_notifications(since_id=last_seen_id)
|
||||
|
||||
if notifications:
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
for notification in notifications:
|
||||
notif_id = notification.id
|
||||
|
||||
# Skip old or same ID notifications
|
||||
if last_seen_id is not None and notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
# Process the notification using the abstracted processor
|
||||
process_fediverse_notification(notification, fediverse_service)
|
||||
|
||||
# Update only if this notif_id is greater
|
||||
if new_last_seen_id is None or notif_id > new_last_seen_id:
|
||||
new_last_seen_id = notif_id
|
||||
|
||||
# Save the latest seen ID
|
||||
if new_last_seen_id and new_last_seen_id != last_seen_id:
|
||||
set_config("last_seen_notif_id", new_last_seen_id)
|
||||
last_seen_id = new_last_seen_id
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
except Exception as e:
|
||||
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
|
||||
time.sleep(5)
|
||||
|
||||
if __name__ == "__main__":
|
||||
stream_notifications()
|
||||
|
|
|
@ -1,6 +0,0 @@
|
|||
import misskey
|
||||
import config
|
||||
|
||||
|
||||
def client_connection() -> misskey.Misskey:
|
||||
return misskey.Misskey(address=config.INSTANCE, i=config.KEY)
|
|
@ -1,6 +1,7 @@
|
|||
'''Essentials for the bot to function'''
|
||||
import configparser
|
||||
import json
|
||||
import re
|
||||
from os import environ, path
|
||||
|
||||
|
||||
|
@ -13,8 +14,10 @@ def get_config_file() -> str:
|
|||
env: str | None = environ.get('KEMOVERSE_ENV')
|
||||
if not env:
|
||||
raise ConfigError('Error: KEMOVERSE_ENV is unset')
|
||||
if not (env in ['prod', 'dev']):
|
||||
raise ConfigError(f'Error: Invalid environment: {env}')
|
||||
|
||||
# Validate environment name contains only alphanumeric, dash, and underscore
|
||||
if not re.match(r'^[a-zA-Z0-9_-]+$', env):
|
||||
raise ValueError(f'KEMOVERSE_ENV "{env}" contains invalid characters. Only alphanumeric, dash (-), and underscore (_) are allowed.')
|
||||
|
||||
config_path: str = f'config_{env}.ini'
|
||||
|
||||
|
@ -23,6 +26,50 @@ def get_config_file() -> str:
|
|||
return config_path
|
||||
|
||||
|
||||
def normalize_user(user_string: str) -> str:
|
||||
"""
|
||||
Normalizes a user string to the format @user@domain.tld where domain is lowercase and user is case-sensitive
|
||||
|
||||
Args:
|
||||
user_string: User string in various formats
|
||||
|
||||
Returns:
|
||||
Normalized user string
|
||||
|
||||
Raises:
|
||||
ValueError: If the user string is invalid or domain is malformed
|
||||
"""
|
||||
if not user_string or not user_string.strip():
|
||||
raise ValueError("User string cannot be empty")
|
||||
|
||||
user_string = user_string.strip()
|
||||
|
||||
# Add leading @ if missing
|
||||
if not user_string.startswith('@'):
|
||||
user_string = '@' + user_string
|
||||
|
||||
# Split into user and domain parts
|
||||
parts = user_string[1:].split('@', 1) # Remove leading @ and split
|
||||
if len(parts) != 2:
|
||||
raise ValueError(f"Invalid user format: {user_string}. Expected @user@domain.tld")
|
||||
|
||||
username, domain = parts
|
||||
|
||||
if not username:
|
||||
raise ValueError("Username cannot be empty")
|
||||
|
||||
if not domain:
|
||||
raise ValueError("Domain cannot be empty")
|
||||
|
||||
# Validate domain format (basic check for valid domain structure)
|
||||
domain_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
|
||||
if not re.match(domain_pattern, domain):
|
||||
raise ValueError(f"Invalid domain format: {domain}")
|
||||
|
||||
# Return normalized format: @user@domain.tld (domain lowercase, user case-sensitive)
|
||||
return f"@{username}@{domain.lower()}"
|
||||
|
||||
|
||||
def get_rarity_to_weight(
|
||||
config_section: configparser.SectionProxy) -> dict[int, float]:
|
||||
"""Parses Rarity_X keys from config and returns a {rarity: weight} dict."""
|
||||
|
@ -38,19 +85,36 @@ config = configparser.ConfigParser()
|
|||
config.read(get_config_file())
|
||||
|
||||
# Username for the bot
|
||||
USER = config['credentials']['User'].lower()
|
||||
if 'User' not in config['credentials'] or not config['credentials']['User'].strip():
|
||||
raise ConfigError("User must be specified in config.ini under [credentials]")
|
||||
|
||||
USER = normalize_user(config['credentials']['User'])
|
||||
# API key for the bot
|
||||
KEY = config['credentials']['Token']
|
||||
# Bot's Misskey instance URL
|
||||
# Bot's Misskey/Pleroma instance URL
|
||||
INSTANCE = config['credentials']['Instance'].lower()
|
||||
|
||||
# Instance type validation
|
||||
if 'InstanceType' not in config['application']:
|
||||
raise ValueError("InstanceType must be specified in config.ini")
|
||||
|
||||
instance_type = config['application']['InstanceType'].lower()
|
||||
if instance_type not in ('misskey', 'pleroma'):
|
||||
raise ValueError("InstanceType must be either 'misskey' or 'pleroma'")
|
||||
|
||||
INSTANCE_TYPE = instance_type
|
||||
|
||||
# Web server port
|
||||
WEB_PORT = config['application'].getint('WebPort', 5000)
|
||||
BIND_ADDRESS = config['application'].get('BindAddress', '127.0.0.1')
|
||||
|
||||
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
|
||||
# to use extra admin exclusive commands with the bot
|
||||
ADMINS = json.loads(config['application']['DefaultAdmins'])
|
||||
# SQLite Database location
|
||||
DB_PATH = config['application']['DatabaseLocation']
|
||||
DB_PATH = config['application'].get('DatabaseLocation', './gacha_game.db')
|
||||
# Whether to enable the instance whitelist
|
||||
USE_WHITELIST = config['application']['UseWhitelist']
|
||||
USE_WHITELIST = config['application'].getboolean('UseWhitelist', True)
|
||||
|
||||
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
|
||||
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
|
||||
|
|
|
@ -150,12 +150,11 @@ def is_player_administrator(username: str) -> bool:
|
|||
|
||||
|
||||
def insert_card(
|
||||
name: str, rarity: int, weight: float, file_id: str) -> int:
|
||||
name: str, rarity: int, file_id: str) -> int:
|
||||
'''Inserts a card'''
|
||||
CURSOR.execute(
|
||||
'INSERT INTO cards (name, rarity, weight, file_id) VALUES \
|
||||
(?, ?, ?, ?)',
|
||||
(name, rarity, weight, file_id)
|
||||
'INSERT INTO cards (name, rarity, file_id) VALUES (?, ?, ?)',
|
||||
(name, rarity, file_id)
|
||||
)
|
||||
card_id = CURSOR.lastrowid
|
||||
return card_id if card_id else 0
|
||||
|
|
40
bot/fediverse_factory.py
Normal file
40
bot/fediverse_factory.py
Normal file
|
@ -0,0 +1,40 @@
|
|||
from fediverse_service import FediverseService
|
||||
from misskey_service import MisskeyService
|
||||
from pleroma_service import PleromaService
|
||||
|
||||
|
||||
class FediverseServiceFactory:
|
||||
"""Factory for creating FediverseService implementations"""
|
||||
|
||||
@staticmethod
|
||||
def create_service(instance_type: str) -> FediverseService:
|
||||
"""
|
||||
Create a FediverseService implementation based on the instance type.
|
||||
|
||||
Args:
|
||||
instance_type: The type of instance ("misskey" or "pleroma")
|
||||
|
||||
Returns:
|
||||
FediverseService implementation (MisskeyService or PleromaService)
|
||||
|
||||
Raises:
|
||||
ValueError: If the instance type is not supported
|
||||
"""
|
||||
instance_type = instance_type.lower()
|
||||
|
||||
if instance_type == "misskey":
|
||||
return MisskeyService()
|
||||
elif instance_type == "pleroma":
|
||||
return PleromaService()
|
||||
else:
|
||||
raise ValueError(f"Unsupported instance type: {instance_type}")
|
||||
|
||||
|
||||
def get_fediverse_service(instance_type: str) -> FediverseService:
|
||||
"""
|
||||
Convenience function to get a FediverseService instance
|
||||
|
||||
Args:
|
||||
instance_type: The instance type ("misskey" or "pleroma")
|
||||
"""
|
||||
return FediverseServiceFactory.create_service(instance_type)
|
74
bot/fediverse_service.py
Normal file
74
bot/fediverse_service.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import List, Optional, Union, BinaryIO
|
||||
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
|
||||
|
||||
|
||||
class FediverseService(ABC):
|
||||
"""Abstract interface for Fediverse platform services (Misskey, Pleroma, etc.)"""
|
||||
|
||||
@abstractmethod
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""
|
||||
Retrieve notifications from the Fediverse instance.
|
||||
|
||||
Args:
|
||||
since_id: Optional ID to get notifications newer than this ID
|
||||
|
||||
Returns:
|
||||
List of FediverseNotification objects
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""
|
||||
Create a new post on the Fediverse instance.
|
||||
|
||||
Args:
|
||||
text: The text content of the post
|
||||
reply_to_id: Optional ID of post to reply to
|
||||
visibility: Visibility level for the post
|
||||
file_ids: Optional list of file IDs to attach
|
||||
visible_user_ids: Optional list of user IDs who can see the post (for specified visibility)
|
||||
|
||||
Returns:
|
||||
ID of the created post
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""
|
||||
Retrieve a specific post by its ID.
|
||||
|
||||
Args:
|
||||
post_id: The ID of the post to retrieve
|
||||
|
||||
Returns:
|
||||
FediversePost object if found, None otherwise
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""
|
||||
Upload a file to the Fediverse instance.
|
||||
|
||||
Args:
|
||||
file_data: File data as binary stream or bytes
|
||||
filename: Optional filename for the uploaded file
|
||||
|
||||
Returns:
|
||||
FediverseFile object with ID, URL, and other metadata
|
||||
|
||||
Raises:
|
||||
RuntimeError: If file upload fails
|
||||
"""
|
||||
pass
|
73
bot/fediverse_types.py
Normal file
73
bot/fediverse_types.py
Normal file
|
@ -0,0 +1,73 @@
|
|||
from dataclasses import dataclass
|
||||
from typing import Optional, List, Dict, Any
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class NotificationType(Enum):
|
||||
MENTION = "mention"
|
||||
REPLY = "reply"
|
||||
FOLLOW = "follow"
|
||||
FAVOURITE = "favourite"
|
||||
REBLOG = "reblog"
|
||||
POLL = "poll"
|
||||
OTHER = "other"
|
||||
|
||||
|
||||
class Visibility(Enum):
|
||||
PUBLIC = "public"
|
||||
UNLISTED = "unlisted"
|
||||
HOME = "home"
|
||||
FOLLOWERS = "followers"
|
||||
SPECIFIED = "specified"
|
||||
DIRECT = "direct"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseUser:
|
||||
"""Common user representation across Fediverse platforms"""
|
||||
id: str
|
||||
username: str
|
||||
host: Optional[str] = None # None for local users
|
||||
display_name: Optional[str] = None
|
||||
|
||||
@property
|
||||
def full_handle(self) -> str:
|
||||
"""Returns the full fediverse handle (@user@domain or @user for local)"""
|
||||
if self.host:
|
||||
return f"@{self.username}@{self.host}"
|
||||
return f"@{self.username}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseFile:
|
||||
"""Common file/attachment representation"""
|
||||
id: str
|
||||
url: str
|
||||
type: Optional[str] = None
|
||||
name: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediversePost:
|
||||
"""Common post representation across Fediverse platforms"""
|
||||
id: str
|
||||
text: Optional[str]
|
||||
user: FediverseUser
|
||||
visibility: Visibility
|
||||
created_at: Optional[str] = None
|
||||
files: List[FediverseFile] = None
|
||||
reply_to_id: Optional[str] = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.files is None:
|
||||
self.files = []
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseNotification:
|
||||
"""Common notification representation across Fediverse platforms"""
|
||||
id: str
|
||||
type: NotificationType
|
||||
user: FediverseUser
|
||||
post: Optional[FediversePost] = None
|
||||
created_at: Optional[str] = None
|
156
bot/misskey_service.py
Normal file
156
bot/misskey_service.py
Normal file
|
@ -0,0 +1,156 @@
|
|||
import misskey
|
||||
from typing import List, Optional, Dict, Any, Union, BinaryIO
|
||||
from fediverse_service import FediverseService
|
||||
from fediverse_types import (
|
||||
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
|
||||
NotificationType, Visibility
|
||||
)
|
||||
import config
|
||||
|
||||
|
||||
class MisskeyService(FediverseService):
|
||||
"""Misskey implementation of FediverseService"""
|
||||
|
||||
def __init__(self):
|
||||
self.client = misskey.Misskey(address=config.INSTANCE, i=config.KEY)
|
||||
|
||||
def _convert_misskey_user(self, user_data: Dict[str, Any]) -> FediverseUser:
|
||||
"""Convert Misskey user data to FediverseUser"""
|
||||
return FediverseUser(
|
||||
id=user_data.get("id", ""),
|
||||
username=user_data.get("username", "unknown"),
|
||||
host=user_data.get("host"),
|
||||
display_name=user_data.get("name")
|
||||
)
|
||||
|
||||
def _convert_misskey_file(self, file_data: Dict[str, Any]) -> FediverseFile:
|
||||
"""Convert Misskey file data to FediverseFile"""
|
||||
return FediverseFile(
|
||||
id=file_data.get("id", ""),
|
||||
url=file_data.get("url", ""),
|
||||
type=file_data.get("type"),
|
||||
name=file_data.get("name")
|
||||
)
|
||||
|
||||
def _convert_misskey_visibility(self, visibility: str) -> Visibility:
|
||||
"""Convert Misskey visibility to our enum"""
|
||||
visibility_map = {
|
||||
"public": Visibility.PUBLIC,
|
||||
"unlisted": Visibility.UNLISTED,
|
||||
"home": Visibility.HOME,
|
||||
"followers": Visibility.FOLLOWERS,
|
||||
"specified": Visibility.SPECIFIED
|
||||
}
|
||||
return visibility_map.get(visibility, Visibility.HOME)
|
||||
|
||||
def _convert_to_misskey_visibility(self, visibility: Visibility) -> str:
|
||||
"""Convert our visibility enum to Misskey visibility"""
|
||||
visibility_map = {
|
||||
Visibility.PUBLIC: "public",
|
||||
Visibility.UNLISTED: "unlisted",
|
||||
Visibility.HOME: "home",
|
||||
Visibility.FOLLOWERS: "followers",
|
||||
Visibility.SPECIFIED: "specified",
|
||||
Visibility.DIRECT: "specified" # Map direct to specified for Misskey
|
||||
}
|
||||
return visibility_map.get(visibility, "home")
|
||||
|
||||
def _convert_misskey_notification_type(self, notif_type: str) -> NotificationType:
|
||||
"""Convert Misskey notification type to our enum"""
|
||||
type_map = {
|
||||
"mention": NotificationType.MENTION,
|
||||
"reply": NotificationType.REPLY,
|
||||
"follow": NotificationType.FOLLOW,
|
||||
"favourite": NotificationType.FAVOURITE,
|
||||
"reblog": NotificationType.REBLOG,
|
||||
"poll": NotificationType.POLL
|
||||
}
|
||||
return type_map.get(notif_type, NotificationType.OTHER)
|
||||
|
||||
def _convert_misskey_post(self, note_data: Dict[str, Any]) -> FediversePost:
|
||||
"""Convert Misskey note data to FediversePost"""
|
||||
files = []
|
||||
if note_data.get("files"):
|
||||
files = [self._convert_misskey_file(f) for f in note_data["files"]]
|
||||
|
||||
return FediversePost(
|
||||
id=note_data.get("id", ""),
|
||||
text=note_data.get("text"),
|
||||
user=self._convert_misskey_user(note_data.get("user", {})),
|
||||
visibility=self._convert_misskey_visibility(note_data.get("visibility", "home")),
|
||||
created_at=note_data.get("createdAt"),
|
||||
files=files,
|
||||
reply_to_id=note_data.get("replyId")
|
||||
)
|
||||
|
||||
def _convert_misskey_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
|
||||
"""Convert Misskey notification data to FediverseNotification"""
|
||||
post = None
|
||||
if notification_data.get("note"):
|
||||
post = self._convert_misskey_post(notification_data["note"])
|
||||
|
||||
return FediverseNotification(
|
||||
id=notification_data.get("id", ""),
|
||||
type=self._convert_misskey_notification_type(notification_data.get("type", "")),
|
||||
user=self._convert_misskey_user(notification_data.get("user", {})),
|
||||
post=post,
|
||||
created_at=notification_data.get("createdAt")
|
||||
)
|
||||
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""Get notifications from Misskey instance"""
|
||||
params = {
|
||||
'include_types': ['mention', 'reply'],
|
||||
'limit': 50
|
||||
}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
|
||||
notifications = self.client.i_notifications(**params)
|
||||
return [self._convert_misskey_notification(notif) for notif in notifications]
|
||||
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""Create a post on Misskey instance"""
|
||||
params = {
|
||||
"text": text,
|
||||
"visibility": self._convert_to_misskey_visibility(visibility)
|
||||
}
|
||||
|
||||
if reply_to_id:
|
||||
params["reply_id"] = reply_to_id
|
||||
|
||||
if file_ids:
|
||||
params["file_ids"] = file_ids
|
||||
|
||||
if visible_user_ids and visibility == Visibility.SPECIFIED:
|
||||
params["visible_user_ids"] = visible_user_ids
|
||||
|
||||
response = self.client.notes_create(**params)
|
||||
return response.get("createdNote", {}).get("id", "")
|
||||
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""Get a specific post by ID from Misskey instance"""
|
||||
try:
|
||||
note = self.client.notes_show(noteId=post_id)
|
||||
return self._convert_misskey_post(note)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""Upload a file to Misskey Drive"""
|
||||
try:
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
|
||||
media = self.client.drive_files_create(file_data)
|
||||
return self._convert_misskey_file(media)
|
||||
except MisskeyAPIException as e:
|
||||
raise RuntimeError(f"Failed to upload file to Misskey Drive: {e}") from e
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Unexpected error during file upload: {e}") from e
|
74
bot/mock_fediverse_service.py
Normal file
74
bot/mock_fediverse_service.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
"""Mock FediverseService for testing purposes"""
|
||||
|
||||
from typing import List, Optional, Union, BinaryIO
|
||||
from fediverse_service import FediverseService
|
||||
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
|
||||
|
||||
|
||||
class MockFediverseService(FediverseService):
|
||||
"""Mock implementation of FediverseService for testing"""
|
||||
|
||||
def __init__(self):
|
||||
self.notifications = []
|
||||
self.created_posts = []
|
||||
self.uploaded_files = []
|
||||
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""Return mock notifications, optionally filtered by since_id"""
|
||||
if since_id is None:
|
||||
return self.notifications
|
||||
|
||||
# Filter notifications newer than since_id
|
||||
filtered = []
|
||||
for notif in self.notifications:
|
||||
if notif.id > since_id:
|
||||
filtered.append(notif)
|
||||
return filtered
|
||||
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""Mock post creation, returns fake post ID"""
|
||||
post_id = f"mock_post_{len(self.created_posts)}"
|
||||
|
||||
# Store the post for assertions
|
||||
self.created_posts.append({
|
||||
'id': post_id,
|
||||
'text': text,
|
||||
'reply_to_id': reply_to_id,
|
||||
'visibility': visibility,
|
||||
'file_ids': file_ids,
|
||||
'visible_user_ids': visible_user_ids
|
||||
})
|
||||
|
||||
return post_id
|
||||
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes]) -> FediverseFile:
|
||||
"""Mock file upload, returns fake file"""
|
||||
file_id = f"mock_file_{len(self.uploaded_files)}"
|
||||
|
||||
mock_file = FediverseFile(
|
||||
id=file_id,
|
||||
url=f"https://example.com/files/{file_id}",
|
||||
type="image/png",
|
||||
name="test_file.png"
|
||||
)
|
||||
|
||||
self.uploaded_files.append(mock_file)
|
||||
return mock_file
|
||||
|
||||
# Helper methods for testing
|
||||
def add_mock_notification(self, notification: FediverseNotification):
|
||||
"""Add a mock notification for testing"""
|
||||
self.notifications.append(notification)
|
||||
|
||||
def clear_all(self):
|
||||
"""Clear all mock data"""
|
||||
self.notifications.clear()
|
||||
self.created_posts.clear()
|
||||
self.uploaded_files.clear()
|
|
@ -1,45 +1,47 @@
|
|||
import traceback
|
||||
from typing import Dict, Any
|
||||
|
||||
import misskey
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
|
||||
from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST
|
||||
import config
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_config, set_config, is_whitelisted, is_player_banned
|
||||
from db_utils import is_whitelisted, is_player_banned
|
||||
from response import generate_response
|
||||
from custom_types import BotResponse
|
||||
|
||||
# Define your whitelist
|
||||
# TODO: move to config
|
||||
WHITELISTED_INSTANCES: list[str] = []
|
||||
from fediverse_factory import get_fediverse_service
|
||||
from fediverse_types import FediverseNotification, NotificationType, Visibility
|
||||
|
||||
|
||||
def process_notification(
|
||||
client: misskey.Misskey,
|
||||
notification: Dict[str, Any]) -> None:
|
||||
'''Processes an individual notification'''
|
||||
user = notification.get('user', {})
|
||||
username = user.get('username', 'unknown')
|
||||
host = user.get('host') # None if local user
|
||||
def process_fediverse_notification(notification: FediverseNotification, fediverse_service=None) -> None:
|
||||
'''Processes an individual fediverse notification using the abstraction'''
|
||||
if fediverse_service is None:
|
||||
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
|
||||
|
||||
# Get user and instance info
|
||||
username = notification.user.username
|
||||
host = notification.user.host
|
||||
instance = host if host else 'local'
|
||||
|
||||
if USE_WHITELIST and not is_whitelisted(instance):
|
||||
# Check whitelist
|
||||
if config.USE_WHITELIST and not is_whitelisted(instance):
|
||||
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
|
||||
return
|
||||
|
||||
# Copy visibility of the post that was received when replying (so if people
|
||||
# don't want to dump a bunch of notes on home they don't have to)
|
||||
visibility = notification['note']['visibility']
|
||||
if visibility != 'specified':
|
||||
visibility = 'home'
|
||||
# Only process mentions and replies
|
||||
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
|
||||
return
|
||||
|
||||
notif_type = notification.get('type', 'unknown')
|
||||
notif_id = notification.get('id')
|
||||
# Return early if no post attached
|
||||
if not notification.post:
|
||||
return
|
||||
|
||||
# Determine visibility for reply
|
||||
if notification.post.visibility != Visibility.SPECIFIED:
|
||||
visibility = Visibility.HOME
|
||||
else:
|
||||
visibility = Visibility.SPECIFIED
|
||||
|
||||
notif_type = notification.type.value
|
||||
notif_id = notification.id
|
||||
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parsed_notification = parse_notification(notification, client)
|
||||
parsed_notification = parse_notification(notification, fediverse_service)
|
||||
|
||||
if not parsed_notification:
|
||||
return
|
||||
|
@ -49,79 +51,20 @@ def process_notification(
|
|||
print(f'⚠️ Blocked notification from banned player: {author}')
|
||||
return
|
||||
|
||||
# Get the note Id to reply to
|
||||
note_id = notification.get('note', {}).get('id')
|
||||
|
||||
# Get the response
|
||||
response: BotResponse | None = generate_response(parsed_notification)
|
||||
|
||||
if not response:
|
||||
return
|
||||
|
||||
client.notes_create(
|
||||
# Handle attachment URLs (convert to file IDs if needed)
|
||||
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
|
||||
|
||||
# Send response using fediverse service
|
||||
fediverse_service.create_post(
|
||||
text=response['message'],
|
||||
reply_id=note_id,
|
||||
reply_to_id=notification.post.id,
|
||||
visibility=visibility,
|
||||
file_ids=response['attachment_urls']
|
||||
# TODO: write actual visible users ids so pleromers can use the bot
|
||||
# privately
|
||||
# visible_user_ids=[]
|
||||
file_ids=file_ids
|
||||
# visible_user_ids=[] # TODO: write actual visible users ids so pleromers can use the bot privately
|
||||
)
|
||||
|
||||
|
||||
def process_notifications(client: misskey.Misskey) -> bool:
|
||||
'''Processes a batch of unread notifications. Returns False if there are
|
||||
no more notifications to process.'''
|
||||
|
||||
last_seen_id = get_config('last_seen_notif_id')
|
||||
# process_notification writes to last_seen_id, so make a copy
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
try:
|
||||
notifications = client.i_notifications(
|
||||
# Fetch notifications we haven't seen yet. This option is a bit
|
||||
# tempermental, sometimes it'll include since_id, sometimes it
|
||||
# won't. We need to keep track of what notifications we've
|
||||
# already processed.
|
||||
since_id=last_seen_id,
|
||||
# Let misskey handle the filtering
|
||||
include_types=['mention', 'reply'],
|
||||
# And handle the batch size while we're at it
|
||||
limit=NOTIFICATION_BATCH_SIZE
|
||||
)
|
||||
|
||||
# No notifications. Wait the poll period.
|
||||
if not notifications:
|
||||
return False
|
||||
|
||||
# Iterate oldest to newest
|
||||
for notification in notifications:
|
||||
try:
|
||||
# Skip if we've processed already
|
||||
notif_id = notification.get('id', '')
|
||||
if notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
# Update new_last_seen_id and process
|
||||
new_last_seen_id = notif_id
|
||||
process_notification(client, notification)
|
||||
|
||||
except Exception as e:
|
||||
print(f'An exception has occured while processing a \
|
||||
notification: {e}')
|
||||
print(traceback.format_exc())
|
||||
|
||||
# If we got as many notifications as we requested, there are probably
|
||||
# more in the queue
|
||||
return len(notifications) == NOTIFICATION_BATCH_SIZE
|
||||
|
||||
except MisskeyAPIException as e:
|
||||
print(f'An exception has occured while reading notifications: {e}\n')
|
||||
print(traceback.format_exc())
|
||||
finally:
|
||||
# Quality jank right here, but finally lets us update the last_seen_id
|
||||
# even if we hit an exception or return early
|
||||
if new_last_seen_id > last_seen_id:
|
||||
set_config('last_seen_notif_id', new_last_seen_id)
|
||||
|
||||
return False
|
||||
|
|
|
@ -1,27 +1,38 @@
|
|||
import re
|
||||
from typing import Dict, Any
|
||||
|
||||
import misskey
|
||||
|
||||
import config
|
||||
from response import generate_response
|
||||
from fediverse_factory import get_fediverse_service
|
||||
from fediverse_types import FediverseNotification, NotificationType, Visibility
|
||||
from custom_types import ParsedNotification
|
||||
|
||||
def parse_notification(notification: FediverseNotification, fediverse_service=None):
|
||||
'''Parses any notifications received by the bot and sends any commands to
|
||||
generate_response()'''
|
||||
|
||||
if fediverse_service is None:
|
||||
fediverse_service = get_fediverse_service(config.INSTANCE_TYPE)
|
||||
|
||||
def parse_notification(
|
||||
notification: Dict[str, Any],
|
||||
client: misskey.Misskey) -> ParsedNotification | None:
|
||||
'''Parses any notifications received by the bot'''
|
||||
# We get the type of notification to filter the ones that we actually want
|
||||
# to parse
|
||||
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
|
||||
return # Ignore anything that isn't a mention
|
||||
|
||||
# Return early if no post attached
|
||||
if not notification.post:
|
||||
return
|
||||
|
||||
# We want the visibility to be related to the type that was received (so if
|
||||
# people don't want to dump a bunch of notes on home they don't have to)
|
||||
if notification.post.visibility != Visibility.SPECIFIED:
|
||||
visibility = Visibility.HOME
|
||||
else:
|
||||
visibility = Visibility.SPECIFIED
|
||||
|
||||
# Get the full Activitypub ID of the user
|
||||
user = notification.get("user", {})
|
||||
username = user.get("username", "unknown")
|
||||
host = user.get("host")
|
||||
# Local users may not have a hostname attached
|
||||
full_user = f"@{username}" if not host else f"@{username}@{host}"
|
||||
full_user = notification.user.full_handle
|
||||
|
||||
note_obj = notification.get("note", {})
|
||||
note_text = note_obj.get("text")
|
||||
note_id = note_obj.get("id")
|
||||
note_text = notification.post.text
|
||||
note_id = notification.post.id
|
||||
|
||||
note = note_text.strip().lower() if note_text else ""
|
||||
# Split words into tokens
|
||||
|
@ -44,9 +55,30 @@ def parse_notification(
|
|||
command = parts[1].lower()
|
||||
arguments = parts[2:] if len(parts) > 2 else []
|
||||
|
||||
return {
|
||||
# Create ParsedNotification object for the new response system
|
||||
parsed_notification: ParsedNotification = {
|
||||
'author': full_user,
|
||||
'command': command,
|
||||
'arguments': arguments,
|
||||
'note_obj': note_obj
|
||||
'note_obj': {
|
||||
'id': note_id,
|
||||
'text': note_text,
|
||||
'files': [{'url': f.url} for f in notification.post.files] if notification.post.files else []
|
||||
}
|
||||
}
|
||||
|
||||
# Generate response using the new system
|
||||
response = generate_response(parsed_notification)
|
||||
if not response:
|
||||
return
|
||||
|
||||
# Handle attachment URLs (convert to file IDs if needed)
|
||||
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
|
||||
|
||||
fediverse_service.create_post(
|
||||
text=response['message'],
|
||||
reply_to_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=file_ids
|
||||
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
|
||||
)
|
||||
|
|
188
bot/pleroma_service.py
Normal file
188
bot/pleroma_service.py
Normal file
|
@ -0,0 +1,188 @@
|
|||
from mastodon import Mastodon
|
||||
from typing import List, Optional, Dict, Any, Union, BinaryIO
|
||||
import io
|
||||
import filetype
|
||||
from fediverse_service import FediverseService
|
||||
from fediverse_types import (
|
||||
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
|
||||
NotificationType, Visibility
|
||||
)
|
||||
import config
|
||||
|
||||
|
||||
class PleromaService(FediverseService):
|
||||
"""Pleroma implementation of FediverseService using Mastodon.py"""
|
||||
|
||||
def __init__(self):
|
||||
self.client = Mastodon(
|
||||
access_token=config.KEY,
|
||||
api_base_url=config.INSTANCE
|
||||
)
|
||||
|
||||
def _convert_mastodon_user(self, user_data: Dict[str, Any]) -> FediverseUser:
|
||||
"""Convert Mastodon/Pleroma user data to FediverseUser"""
|
||||
acct = user_data.get("acct", "")
|
||||
if "@" in acct:
|
||||
username, host = acct.split("@", 1)
|
||||
else:
|
||||
username = acct
|
||||
host = None
|
||||
|
||||
return FediverseUser(
|
||||
id=str(user_data.get("id", "")),
|
||||
username=username,
|
||||
host=host,
|
||||
display_name=user_data.get("display_name")
|
||||
)
|
||||
|
||||
def _convert_mastodon_file(self, file_data: Dict[str, Any]) -> FediverseFile:
|
||||
"""Convert Mastodon/Pleroma media attachment to FediverseFile"""
|
||||
return FediverseFile(
|
||||
id=str(file_data.get("id", "")),
|
||||
url=file_data.get("url", ""),
|
||||
type=file_data.get("type"),
|
||||
name=file_data.get("description")
|
||||
)
|
||||
|
||||
def _convert_mastodon_visibility(self, visibility: str) -> Visibility:
|
||||
"""Convert Mastodon/Pleroma visibility to our enum"""
|
||||
visibility_map = {
|
||||
"public": Visibility.PUBLIC,
|
||||
"unlisted": Visibility.UNLISTED,
|
||||
"private": Visibility.FOLLOWERS,
|
||||
"direct": Visibility.DIRECT
|
||||
}
|
||||
return visibility_map.get(visibility, Visibility.PUBLIC)
|
||||
|
||||
def _convert_to_mastodon_visibility(self, visibility: Visibility) -> str:
|
||||
"""Convert our visibility enum to Mastodon/Pleroma visibility"""
|
||||
visibility_map = {
|
||||
Visibility.PUBLIC: "public",
|
||||
Visibility.UNLISTED: "unlisted",
|
||||
Visibility.HOME: "unlisted", # Map home to unlisted for Pleroma
|
||||
Visibility.FOLLOWERS: "private",
|
||||
Visibility.SPECIFIED: "direct", # Map specified to direct for Pleroma
|
||||
Visibility.DIRECT: "direct"
|
||||
}
|
||||
return visibility_map.get(visibility, "public")
|
||||
|
||||
def _convert_mastodon_notification_type(self, notif_type: str) -> NotificationType:
|
||||
"""Convert Mastodon/Pleroma notification type to our enum"""
|
||||
type_map = {
|
||||
"mention": NotificationType.MENTION,
|
||||
"follow": NotificationType.FOLLOW,
|
||||
"favourite": NotificationType.FAVOURITE,
|
||||
"reblog": NotificationType.REBLOG,
|
||||
"poll": NotificationType.POLL
|
||||
}
|
||||
return type_map.get(notif_type, NotificationType.OTHER)
|
||||
|
||||
def _convert_mastodon_status(self, status_data: Dict[str, Any]) -> FediversePost:
|
||||
"""Convert Mastodon/Pleroma status data to FediversePost"""
|
||||
files = []
|
||||
if status_data.get("media_attachments"):
|
||||
files = [self._convert_mastodon_file(f) for f in status_data["media_attachments"]]
|
||||
|
||||
# Extract plain text from HTML content
|
||||
content = status_data.get("content", "")
|
||||
# Basic HTML stripping - in production you might want to use a proper HTML parser
|
||||
import re
|
||||
plain_text = re.sub(r'<[^>]+>', '', content) if content else None
|
||||
|
||||
return FediversePost(
|
||||
id=str(status_data.get("id", "")),
|
||||
text=plain_text,
|
||||
user=self._convert_mastodon_user(status_data.get("account", {})),
|
||||
visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")),
|
||||
created_at=status_data.get("created_at"),
|
||||
files=files,
|
||||
reply_to_id=str(status_data["in_reply_to_id"]) if status_data.get("in_reply_to_id") else None
|
||||
)
|
||||
|
||||
def _convert_mastodon_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
|
||||
"""Convert Mastodon/Pleroma notification data to FediverseNotification"""
|
||||
post = None
|
||||
if notification_data.get("status"):
|
||||
post = self._convert_mastodon_status(notification_data["status"])
|
||||
|
||||
return FediverseNotification(
|
||||
id=str(notification_data.get("id", "")),
|
||||
type=self._convert_mastodon_notification_type(notification_data.get("type", "")),
|
||||
user=self._convert_mastodon_user(notification_data.get("account", {})),
|
||||
post=post,
|
||||
created_at=notification_data.get("created_at")
|
||||
)
|
||||
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""Get notifications from Pleroma instance"""
|
||||
params = {}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
|
||||
notifications = self.client.notifications(**params)
|
||||
return [self._convert_mastodon_notification(notif) for notif in notifications]
|
||||
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""Create a post on Pleroma instance"""
|
||||
params = {
|
||||
"status": text,
|
||||
"visibility": self._convert_to_mastodon_visibility(visibility)
|
||||
}
|
||||
|
||||
if reply_to_id:
|
||||
params["in_reply_to_id"] = reply_to_id
|
||||
|
||||
if file_ids:
|
||||
params["media_ids"] = file_ids
|
||||
|
||||
# Note: Pleroma/Mastodon doesn't have direct equivalent to visible_user_ids
|
||||
# For direct messages, you typically mention users in the status text
|
||||
|
||||
response = self.client.status_post(**params)
|
||||
return str(response.get("id", ""))
|
||||
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""Get a specific post by ID from Pleroma instance"""
|
||||
try:
|
||||
status = self.client.status(post_id)
|
||||
return self._convert_mastodon_status(status)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""Upload a file to Pleroma instance"""
|
||||
try:
|
||||
# Convert file_data to bytes for MIME detection
|
||||
if hasattr(file_data, 'read'):
|
||||
# Check if we can seek back
|
||||
try:
|
||||
current_pos = file_data.tell()
|
||||
file_bytes = file_data.read()
|
||||
file_data.seek(current_pos)
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
except (io.UnsupportedOperation, OSError):
|
||||
# Non-seekable stream, already read all data
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
else:
|
||||
file_bytes = file_data
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
|
||||
# Use filetype library for robust MIME detection
|
||||
kind = filetype.guess(file_bytes)
|
||||
if kind is not None:
|
||||
mime_type = kind.mime
|
||||
else:
|
||||
# Fallback to image/jpeg if detection fails
|
||||
mime_type = 'image/jpeg'
|
||||
|
||||
media = self.client.media_post(file_data, mime_type=mime_type, description=filename)
|
||||
return self._convert_mastodon_file(media)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to upload file to Pleroma: {e}") from e
|
|
@ -120,12 +120,6 @@ in order: name, rarity',
|
|||
be a number between 1 and 5',
|
||||
'attachment_urls': None
|
||||
}
|
||||
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
|
||||
return {
|
||||
'message': f'{author} Invalid drop weight: \'{arguments[2]}\' \
|
||||
must be a decimal value between 0.0 and 1.0',
|
||||
'attachment_urls': None
|
||||
}
|
||||
|
||||
card_id, file_id = add_card(
|
||||
name=arguments[0],
|
||||
|
|
|
@ -5,6 +5,13 @@
|
|||
DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"]
|
||||
; SQLite Database location
|
||||
DatabaseLocation = ./gacha_game.db
|
||||
; Instance type - either "misskey" or "pleroma"
|
||||
InstanceType = misskey
|
||||
; Web server port (default: 5000)
|
||||
WebPort = 5000
|
||||
; Web server bind address (default: 127.0.0.1, set to 0.0.0.0 to listen on all interfaces)
|
||||
BindAddress = 127.0.0.1
|
||||
|
||||
; Whether to lmit access to the bot via an instance whitelist
|
||||
; The whitelist can be adjusted via the application
|
||||
UseWhitelist = False
|
||||
|
@ -35,4 +42,3 @@ User = @bot@example.tld
|
|||
; API key for the bot
|
||||
; Generate one by going to Settings > API > Generate access token
|
||||
Token = abcdefghijklmnopqrstuvwxyz012345
|
||||
|
||||
|
|
14
readme.md
14
readme.md
|
@ -1,6 +1,6 @@
|
|||
# Kemoverse
|
||||
|
||||
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
|
||||
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Supports both Misskey and Pleroma instances. Name comes from Kemonomimi and Fediverse.
|
||||
<p align="center">
|
||||
<img src="./web/static/logo.png" alt="Fediverse Gacha Bot Logo" width="300" height="auto">
|
||||
</p>
|
||||
|
@ -47,13 +47,13 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
|
|||
- 🌐 Web app to generate cards from images
|
||||
|
||||
### 🌍 Fediverse Support
|
||||
✅ Anyone from the fediverse can play, but the server only works using a Misskey instance. Want to rewrite the program in Elixir for Pleroma? Let us know!
|
||||
✅ Anyone from the fediverse can play! The bot supports both Misskey and Pleroma instances through configurable backends.
|
||||
|
||||
## 🗃️ Tech Stack
|
||||
|
||||
- Python (3.12+)
|
||||
- SQLite
|
||||
- Fediverse API integration (via Misskey endpoints)
|
||||
- Fediverse API integration (Misskey and Pleroma support)
|
||||
- Flask
|
||||
- Modular DB design for extensibility
|
||||
|
||||
|
@ -65,12 +65,12 @@ The bot is meant to feel *light, fun, and competitive*. Mixing social, gacha and
|
|||
flowchart TD
|
||||
|
||||
subgraph Player Interaction
|
||||
A1[Misskey bot]
|
||||
A1[Fediverse bot]
|
||||
A2[Web]
|
||||
end
|
||||
|
||||
subgraph Misskey
|
||||
B1[Misskey instance]
|
||||
subgraph Fediverse
|
||||
B1[Fediverse instance]
|
||||
end
|
||||
|
||||
subgraph Bot
|
||||
|
@ -78,7 +78,7 @@ flowchart TD
|
|||
C2[Notification parser]
|
||||
C3[Gacha roll logic]
|
||||
C4[Database interface]
|
||||
C5[Misskey API poster]
|
||||
C5[Fediverse API poster]
|
||||
end
|
||||
|
||||
subgraph Website
|
||||
|
|
|
@ -6,3 +6,5 @@ Jinja2==3.1.6
|
|||
MarkupSafe==3.0.2
|
||||
Werkzeug==3.1.3
|
||||
Misskey.py==4.1.0
|
||||
Mastodon.py==1.8.1
|
||||
filetype==1.2.0
|
||||
|
|
|
@ -57,16 +57,14 @@ def perform_migration(cursor: sqlite3.Cursor, migration: tuple[int, str]) -> Non
|
|||
def get_db_path() -> str | DBNotFoundError:
|
||||
'''Gets the DB path from config.ini'''
|
||||
env = os.environ.get('KEMOVERSE_ENV')
|
||||
if not (env and env in ['prod', 'dev']):
|
||||
raise KemoverseEnvUnset
|
||||
|
||||
print(f'Running in "{env}" mode')
|
||||
|
||||
config_path = f'config_{env}.ini'
|
||||
|
||||
if not os.path.isfile(config_path):
|
||||
raise ConfigError(f'Could not find {config_path}')
|
||||
|
||||
print(f'Running in "{env}" mode')
|
||||
|
||||
config = ConfigParser()
|
||||
config.read(config_path)
|
||||
db_path = config['application']['DatabaseLocation']
|
||||
|
@ -96,7 +94,6 @@ def main():
|
|||
return
|
||||
except KemoverseEnvUnset:
|
||||
print('Error: KEMOVERSE_ENV is either not set or has an invalid value.')
|
||||
print('Please set KEMOVERSE_ENV to either "dev" or "prod" before running.')
|
||||
print(traceback.format_exc())
|
||||
return
|
||||
|
||||
|
|
11
web/app.py
11
web/app.py
|
@ -1,13 +1,18 @@
|
|||
import sqlite3
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add bot directory to path to import config
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'bot'))
|
||||
import config
|
||||
|
||||
from flask import Flask, render_template, abort
|
||||
from werkzeug.exceptions import HTTPException
|
||||
|
||||
app = Flask(__name__)
|
||||
DB_PATH = "./gacha_game.db" # Adjust path if needed
|
||||
|
||||
def get_db_connection():
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn = sqlite3.connect(config.DB_PATH)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
|
@ -68,4 +73,4 @@ def submit_character():
|
|||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(host='0.0.0.0', port=5000, debug=True)
|
||||
app.run(host=config.BIND_ADDRESS, port=config.WEB_PORT, debug=True)
|
||||
|
|
Loading…
Add table
Reference in a new issue