Compare commits
14 commits
master
...
misc-bugfi
Author | SHA1 | Date | |
---|---|---|---|
|
9fdcf21663 | ||
|
918ed75e9b | ||
|
937ed6b2b1 | ||
|
9c5eab51af | ||
|
99442b4e01 | ||
|
34df5fc6ee | ||
|
287246c6fb | ||
|
717ee05b38 | ||
|
bd5e56278f | ||
|
f101119b47 | ||
|
fc3d638bf4 | ||
|
f4499f7afb | ||
|
2468ffa1f3 | ||
|
bf5bc2212c |
17 changed files with 725 additions and 75 deletions
5
.gitignore
vendored
5
.gitignore
vendored
|
@ -181,5 +181,6 @@ cython_debug/
|
|||
.cursorindexingignore
|
||||
|
||||
# Custom stuff
|
||||
gacha_game.db
|
||||
config.ini
|
||||
*.db
|
||||
*.ini
|
||||
reference/
|
||||
|
|
1
.tool-versions
Normal file
1
.tool-versions
Normal file
|
@ -0,0 +1 @@
|
|||
nodejs 23.4.0
|
|
@ -1,20 +1,19 @@
|
|||
import requests
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
from client import client_connection
|
||||
from fediverse_factory import get_fediverse_service
|
||||
from db_utils import get_db_connection
|
||||
|
||||
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
|
||||
"""
|
||||
Adds a character to the database, uploading the image from a public URL to the bot's Misskey Drive.
|
||||
Adds a character to the database, uploading the image from a public URL to the Fediverse instance.
|
||||
|
||||
Args:
|
||||
name (str): Character name.
|
||||
rarity (int): Character rarity (e.g., 1-5).
|
||||
weight (float): Pull weight (e.g., 0.02).
|
||||
image_url (str): Public URL of the image from the post (e.g., from note['files'][i]['url']).
|
||||
image_url (str): Public URL of the image from the post.
|
||||
|
||||
Returns:
|
||||
tuple[int, str]: Character ID and bot's Drive file_id.
|
||||
tuple[int, str]: Character ID and file_id.
|
||||
|
||||
Raises:
|
||||
ValueError: If inputs are invalid.
|
||||
|
@ -36,13 +35,13 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
|
|||
if response.status_code != 200:
|
||||
raise RuntimeError(f"Failed to download image from {image_url}")
|
||||
|
||||
# Upload to bot's Drive
|
||||
mk = client_connection()
|
||||
# Upload to Fediverse instance
|
||||
fediverse_service = get_fediverse_service()
|
||||
try:
|
||||
media = mk.drive_files_create(response.raw)
|
||||
file_id = media["id"]
|
||||
except MisskeyAPIException as e:
|
||||
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
|
||||
uploaded_file = fediverse_service.upload_file(response.raw)
|
||||
file_id = uploaded_file.id
|
||||
except RuntimeError as e:
|
||||
raise RuntimeError(f"Failed to upload image: {e}") from e
|
||||
|
||||
# Insert into database
|
||||
conn = get_db_connection()
|
||||
|
|
|
@ -1,16 +1,15 @@
|
|||
import time
|
||||
import traceback
|
||||
import misskey
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_or_create_user, add_pull, get_config, set_config
|
||||
from client import client_connection
|
||||
from fediverse_factory import get_fediverse_service
|
||||
import config
|
||||
|
||||
# Initialize the Misskey client
|
||||
client = client_connection()
|
||||
# Initialize the Fediverse service
|
||||
fediverse_service = get_fediverse_service()
|
||||
|
||||
# Define your whitelist
|
||||
# TODO: move to config
|
||||
whitelisted_instances: list[str] = []
|
||||
# Get trusted instances from config
|
||||
whitelisted_instances = config.TRUSTED_INSTANCES
|
||||
|
||||
def stream_notifications():
|
||||
print("Starting filtered notification stream...")
|
||||
|
@ -19,40 +18,38 @@ def stream_notifications():
|
|||
|
||||
while True:
|
||||
try:
|
||||
# May be able to mark notifications as read using misskey.py and
|
||||
# filter them out here. This function also takes a since_id we
|
||||
# could use as well
|
||||
notifications = client.i_notifications()
|
||||
# Get notifications from the fediverse service
|
||||
notifications = fediverse_service.get_notifications(since_id=last_seen_id)
|
||||
|
||||
if notifications:
|
||||
# Oldest to newest
|
||||
notifications.reverse()
|
||||
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
for notification in notifications:
|
||||
notif_id = notification.get("id")
|
||||
notif_id = notification.id
|
||||
|
||||
# Skip old or same ID notifications
|
||||
# Double-check filtering: Even though we pass since_id to the API,
|
||||
# we manually filter again for reliability because:
|
||||
# 1. API might ignore since_id if it's too old (server downtime)
|
||||
# 2. Pagination limits might cause missed notifications
|
||||
# 3. Race conditions between fetch and save operations
|
||||
if last_seen_id is not None and notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
user = notification.get("user", {})
|
||||
username = user.get("username", "unknown")
|
||||
host = user.get("host") # None if local user
|
||||
username = notification.user.username
|
||||
host = notification.user.host
|
||||
|
||||
instance = host if host else "local"
|
||||
|
||||
if instance in whitelisted_instances or instance == "local":
|
||||
note = notification.get("note", {}).get("text", "")
|
||||
notif_type = notification.get("type", "unknown")
|
||||
note = notification.post.text if notification.post else ""
|
||||
notif_type = notification.type.value
|
||||
|
||||
print(f"📨 [{notif_type}] from @{username}@{instance}")
|
||||
print(f"💬 {note}")
|
||||
print("-" * 30)
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parse_notification(notification,client)
|
||||
parse_notification(notification, fediverse_service)
|
||||
|
||||
else:
|
||||
print(f"⚠️ Blocked notification from untrusted instance: {host}")
|
||||
|
|
|
@ -1,22 +1,90 @@
|
|||
'''Essentials for the bot to function'''
|
||||
import configparser
|
||||
config = configparser.ConfigParser()
|
||||
config.read('config.ini')
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
def get_config_file_path():
|
||||
"""Get config file path from command line arguments or default"""
|
||||
parser = argparse.ArgumentParser(add_help=False) # Don't interfere with other argument parsing
|
||||
parser.add_argument('--config', default='config.ini', help='Path to config file (default: config.ini)')
|
||||
|
||||
# Parse only known args to avoid conflicts with other argument parsing
|
||||
args, _ = parser.parse_known_args()
|
||||
return args.config
|
||||
|
||||
def load_config():
|
||||
"""Load and validate config file with proper error handling"""
|
||||
config_file = get_config_file_path()
|
||||
config = configparser.ConfigParser()
|
||||
|
||||
# Check if config file exists
|
||||
if not os.path.exists(config_file):
|
||||
print(f"Error: {config_file} not found!")
|
||||
if config_file == 'config.ini':
|
||||
print("Please copy example_config.ini to config.ini and configure it.")
|
||||
print(f"Or use --config /path/to/your/config.ini to specify a different location.")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
config.read(config_file)
|
||||
except configparser.Error as e:
|
||||
print(f"Error reading {config_file}: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
# Check if [application] section exists
|
||||
if 'application' not in config:
|
||||
print(f"Error: [application] section missing in {config_file}")
|
||||
sys.exit(1)
|
||||
|
||||
# Validate required fields
|
||||
required_fields = ['BotUser', 'ApiKey', 'InstanceUrl', 'InstanceType', 'DatabaseLocation']
|
||||
missing_fields = []
|
||||
|
||||
for field in required_fields:
|
||||
if field not in config['application'] or not config['application'][field].strip():
|
||||
missing_fields.append(field)
|
||||
|
||||
if missing_fields:
|
||||
print(f"Error: Required fields missing in {config_file}: {', '.join(missing_fields)}")
|
||||
sys.exit(1)
|
||||
|
||||
# Validate InstanceType
|
||||
instance_type = config['application']['InstanceType'].lower()
|
||||
if instance_type not in ('misskey', 'pleroma'):
|
||||
print("Error: InstanceType must be either 'misskey' or 'pleroma'")
|
||||
sys.exit(1)
|
||||
|
||||
return config
|
||||
|
||||
# Load and validate configuration
|
||||
config = load_config()
|
||||
|
||||
# Username for the bot
|
||||
USER = config['application']['BotUser']
|
||||
|
||||
# API key for the bot
|
||||
KEY = config['application']['ApiKey']
|
||||
# Bot's Misskey instance URL
|
||||
|
||||
# Bot's instance URL
|
||||
INSTANCE = config['application']['InstanceUrl']
|
||||
|
||||
# SQLite Database location
|
||||
DB_PATH = config['application']['DatabaseLocation']
|
||||
|
||||
# Instance type
|
||||
INSTANCE_TYPE = config['application']['InstanceType'].lower()
|
||||
|
||||
# Web server port
|
||||
WEB_PORT = config['application'].getint('WebPort', 5000)
|
||||
|
||||
# Trusted instances
|
||||
trusted_instances_str = config['application'].get('TrustedInstances', '')
|
||||
TRUSTED_INSTANCES = [instance.strip() for instance in trusted_instances_str.split(',') if instance.strip()]
|
||||
|
||||
# Extra stuff for control of the bot
|
||||
|
||||
# TODO: move this to db
|
||||
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
|
||||
# to use extra admin exclusive commands with the bot'''
|
||||
ADMINS = config['application']['DefaultAdmins']
|
||||
ADMINS = config['application'].get('DefaultAdmins', '[]')
|
||||
|
|
31
bot/fediverse_factory.py
Normal file
31
bot/fediverse_factory.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
from fediverse_service import FediverseService
|
||||
from misskey_service import MisskeyService
|
||||
from pleroma_service import PleromaService
|
||||
import config
|
||||
|
||||
|
||||
class FediverseServiceFactory:
|
||||
"""Factory for creating FediverseService implementations based on configuration"""
|
||||
|
||||
@staticmethod
|
||||
def create_service() -> FediverseService:
|
||||
"""
|
||||
Create a FediverseService implementation based on the configured instance type.
|
||||
|
||||
Returns:
|
||||
FediverseService implementation (MisskeyService or PleromaService)
|
||||
|
||||
Raises:
|
||||
ValueError: If the instance type is not supported
|
||||
"""
|
||||
if config.INSTANCE_TYPE == "misskey":
|
||||
return MisskeyService()
|
||||
elif config.INSTANCE_TYPE == "pleroma":
|
||||
return PleromaService()
|
||||
else:
|
||||
raise ValueError(f"Unsupported instance type: {config.INSTANCE_TYPE}")
|
||||
|
||||
|
||||
def get_fediverse_service() -> FediverseService:
|
||||
"""Convenience function to get a FediverseService instance"""
|
||||
return FediverseServiceFactory.create_service()
|
74
bot/fediverse_service.py
Normal file
74
bot/fediverse_service.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import List, Optional, Union, BinaryIO
|
||||
from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility
|
||||
|
||||
|
||||
class FediverseService(ABC):
|
||||
"""Abstract interface for Fediverse platform services (Misskey, Pleroma, etc.)"""
|
||||
|
||||
@abstractmethod
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""
|
||||
Retrieve notifications from the Fediverse instance.
|
||||
|
||||
Args:
|
||||
since_id: Optional ID to get notifications newer than this ID
|
||||
|
||||
Returns:
|
||||
List of FediverseNotification objects
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""
|
||||
Create a new post on the Fediverse instance.
|
||||
|
||||
Args:
|
||||
text: The text content of the post
|
||||
reply_to_id: Optional ID of post to reply to
|
||||
visibility: Visibility level for the post
|
||||
file_ids: Optional list of file IDs to attach
|
||||
visible_user_ids: Optional list of user IDs who can see the post (for specified visibility)
|
||||
|
||||
Returns:
|
||||
ID of the created post
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""
|
||||
Retrieve a specific post by its ID.
|
||||
|
||||
Args:
|
||||
post_id: The ID of the post to retrieve
|
||||
|
||||
Returns:
|
||||
FediversePost object if found, None otherwise
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""
|
||||
Upload a file to the Fediverse instance.
|
||||
|
||||
Args:
|
||||
file_data: File data as binary stream or bytes
|
||||
filename: Optional filename for the uploaded file
|
||||
|
||||
Returns:
|
||||
FediverseFile object with ID, URL, and other metadata
|
||||
|
||||
Raises:
|
||||
RuntimeError: If file upload fails
|
||||
"""
|
||||
pass
|
73
bot/fediverse_types.py
Normal file
73
bot/fediverse_types.py
Normal file
|
@ -0,0 +1,73 @@
|
|||
from dataclasses import dataclass
|
||||
from typing import Optional, List, Dict, Any
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class NotificationType(Enum):
|
||||
MENTION = "mention"
|
||||
REPLY = "reply"
|
||||
FOLLOW = "follow"
|
||||
FAVOURITE = "favourite"
|
||||
REBLOG = "reblog"
|
||||
POLL = "poll"
|
||||
OTHER = "other"
|
||||
|
||||
|
||||
class Visibility(Enum):
|
||||
PUBLIC = "public"
|
||||
UNLISTED = "unlisted"
|
||||
HOME = "home"
|
||||
FOLLOWERS = "followers"
|
||||
SPECIFIED = "specified"
|
||||
DIRECT = "direct"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseUser:
|
||||
"""Common user representation across Fediverse platforms"""
|
||||
id: str
|
||||
username: str
|
||||
host: Optional[str] = None # None for local users
|
||||
display_name: Optional[str] = None
|
||||
|
||||
@property
|
||||
def full_handle(self) -> str:
|
||||
"""Returns the full fediverse handle (@user@domain or @user for local)"""
|
||||
if self.host:
|
||||
return f"@{self.username}@{self.host}"
|
||||
return f"@{self.username}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseFile:
|
||||
"""Common file/attachment representation"""
|
||||
id: str
|
||||
url: str
|
||||
type: Optional[str] = None
|
||||
name: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediversePost:
|
||||
"""Common post representation across Fediverse platforms"""
|
||||
id: str
|
||||
text: Optional[str]
|
||||
user: FediverseUser
|
||||
visibility: Visibility
|
||||
created_at: Optional[str] = None
|
||||
files: List[FediverseFile] = None
|
||||
reply_to_id: Optional[str] = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.files is None:
|
||||
self.files = []
|
||||
|
||||
|
||||
@dataclass
|
||||
class FediverseNotification:
|
||||
"""Common notification representation across Fediverse platforms"""
|
||||
id: str
|
||||
type: NotificationType
|
||||
user: FediverseUser
|
||||
post: Optional[FediversePost] = None
|
||||
created_at: Optional[str] = None
|
|
@ -1,6 +1,7 @@
|
|||
import random
|
||||
from db_utils import get_or_create_user, add_pull, get_db_connection
|
||||
from add_character import add_character
|
||||
from fediverse_types import FediversePost
|
||||
|
||||
def get_character():
|
||||
''' Gets a random character from the database'''
|
||||
|
@ -27,7 +28,7 @@ def is_float(val):
|
|||
|
||||
|
||||
# TODO: See issue #3, separate command parsing from game logic.
|
||||
def gacha_response(command,full_user, arguments,note_obj):
|
||||
def gacha_response(command: str, full_user: str, arguments: list, post: FediversePost):
|
||||
'''Parses a given command with arguments, processes the game state and
|
||||
returns a response'''
|
||||
|
||||
|
@ -46,7 +47,7 @@ def gacha_response(command,full_user, arguments,note_obj):
|
|||
|
||||
if command == "create":
|
||||
# Example call from bot logic
|
||||
image_url = note_obj.get("files", [{}])[0].get("url") if note_obj.get("files") else None
|
||||
image_url = post.files[0].url if post.files else None
|
||||
if not image_url:
|
||||
return "You need an image to create a character, dumbass."
|
||||
|
||||
|
|
156
bot/misskey_service.py
Normal file
156
bot/misskey_service.py
Normal file
|
@ -0,0 +1,156 @@
|
|||
import misskey
|
||||
from typing import List, Optional, Dict, Any, Union, BinaryIO
|
||||
from fediverse_service import FediverseService
|
||||
from fediverse_types import (
|
||||
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
|
||||
NotificationType, Visibility
|
||||
)
|
||||
import config
|
||||
|
||||
|
||||
class MisskeyService(FediverseService):
|
||||
"""Misskey implementation of FediverseService"""
|
||||
|
||||
def __init__(self):
|
||||
self.client = misskey.Misskey(address=config.INSTANCE, i=config.KEY)
|
||||
|
||||
def _convert_misskey_user(self, user_data: Dict[str, Any]) -> FediverseUser:
|
||||
"""Convert Misskey user data to FediverseUser"""
|
||||
return FediverseUser(
|
||||
id=user_data.get("id", ""),
|
||||
username=user_data.get("username", "unknown"),
|
||||
host=user_data.get("host"),
|
||||
display_name=user_data.get("name")
|
||||
)
|
||||
|
||||
def _convert_misskey_file(self, file_data: Dict[str, Any]) -> FediverseFile:
|
||||
"""Convert Misskey file data to FediverseFile"""
|
||||
return FediverseFile(
|
||||
id=file_data.get("id", ""),
|
||||
url=file_data.get("url", ""),
|
||||
type=file_data.get("type"),
|
||||
name=file_data.get("name")
|
||||
)
|
||||
|
||||
def _convert_misskey_visibility(self, visibility: str) -> Visibility:
|
||||
"""Convert Misskey visibility to our enum"""
|
||||
visibility_map = {
|
||||
"public": Visibility.PUBLIC,
|
||||
"unlisted": Visibility.UNLISTED,
|
||||
"home": Visibility.HOME,
|
||||
"followers": Visibility.FOLLOWERS,
|
||||
"specified": Visibility.SPECIFIED
|
||||
}
|
||||
return visibility_map.get(visibility, Visibility.HOME)
|
||||
|
||||
def _convert_to_misskey_visibility(self, visibility: Visibility) -> str:
|
||||
"""Convert our visibility enum to Misskey visibility"""
|
||||
visibility_map = {
|
||||
Visibility.PUBLIC: "public",
|
||||
Visibility.UNLISTED: "unlisted",
|
||||
Visibility.HOME: "home",
|
||||
Visibility.FOLLOWERS: "followers",
|
||||
Visibility.SPECIFIED: "specified",
|
||||
Visibility.DIRECT: "specified" # Map direct to specified for Misskey
|
||||
}
|
||||
return visibility_map.get(visibility, "home")
|
||||
|
||||
def _convert_misskey_notification_type(self, notif_type: str) -> NotificationType:
|
||||
"""Convert Misskey notification type to our enum"""
|
||||
type_map = {
|
||||
"mention": NotificationType.MENTION,
|
||||
"reply": NotificationType.REPLY,
|
||||
"follow": NotificationType.FOLLOW,
|
||||
"favourite": NotificationType.FAVOURITE,
|
||||
"reblog": NotificationType.REBLOG,
|
||||
"poll": NotificationType.POLL
|
||||
}
|
||||
return type_map.get(notif_type, NotificationType.OTHER)
|
||||
|
||||
def _convert_misskey_post(self, note_data: Dict[str, Any]) -> FediversePost:
|
||||
"""Convert Misskey note data to FediversePost"""
|
||||
files = []
|
||||
if note_data.get("files"):
|
||||
files = [self._convert_misskey_file(f) for f in note_data["files"]]
|
||||
|
||||
return FediversePost(
|
||||
id=str(note_data.get("id", "")), # Misskey IDs (aid/meid/ulid/objectid) are lexicographically sortable as strings
|
||||
text=note_data.get("text"),
|
||||
user=self._convert_misskey_user(note_data.get("user", {})),
|
||||
visibility=self._convert_misskey_visibility(note_data.get("visibility", "home")),
|
||||
created_at=note_data.get("createdAt"),
|
||||
files=files,
|
||||
reply_to_id=str(note_data.get("replyId", "")) if note_data.get("replyId") else None
|
||||
)
|
||||
|
||||
def _convert_misskey_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
|
||||
"""Convert Misskey notification data to FediverseNotification"""
|
||||
post = None
|
||||
if notification_data.get("note"):
|
||||
post = self._convert_misskey_post(notification_data["note"])
|
||||
|
||||
return FediverseNotification(
|
||||
id=str(notification_data.get("id", "")), # Misskey IDs are lexicographically sortable as strings
|
||||
type=self._convert_misskey_notification_type(notification_data.get("type", "")),
|
||||
user=self._convert_misskey_user(notification_data.get("user", {})),
|
||||
post=post,
|
||||
created_at=notification_data.get("createdAt")
|
||||
)
|
||||
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""Get notifications from Misskey instance"""
|
||||
params = {
|
||||
'include_types': ['mention', 'reply'],
|
||||
'limit': 50
|
||||
}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
|
||||
notifications = self.client.i_notifications(**params)
|
||||
return [self._convert_misskey_notification(notif) for notif in notifications]
|
||||
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""Create a post on Misskey instance"""
|
||||
params = {
|
||||
"text": text,
|
||||
"visibility": self._convert_to_misskey_visibility(visibility)
|
||||
}
|
||||
|
||||
if reply_to_id:
|
||||
params["reply_id"] = reply_to_id
|
||||
|
||||
if file_ids:
|
||||
params["file_ids"] = file_ids
|
||||
|
||||
if visible_user_ids and visibility == Visibility.SPECIFIED:
|
||||
params["visible_user_ids"] = visible_user_ids
|
||||
|
||||
response = self.client.notes_create(**params)
|
||||
return response.get("createdNote", {}).get("id", "")
|
||||
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""Get a specific post by ID from Misskey instance"""
|
||||
try:
|
||||
note = self.client.notes_show(noteId=post_id)
|
||||
return self._convert_misskey_post(note)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""Upload a file to Misskey Drive"""
|
||||
try:
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
|
||||
media = self.client.drive_files_create(file_data)
|
||||
return self._convert_misskey_file(media)
|
||||
except MisskeyAPIException as e:
|
||||
raise RuntimeError(f"Failed to upload file to Misskey Drive: {e}") from e
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Unexpected error during file upload: {e}") from e
|
|
@ -1,34 +1,37 @@
|
|||
import random, re
|
||||
import config
|
||||
from gacha_response import gacha_response
|
||||
from fediverse_factory import get_fediverse_service
|
||||
from fediverse_types import FediverseNotification, NotificationType, Visibility
|
||||
|
||||
def parse_notification(notification,client):
|
||||
'''Oarses any notifications received by the bot and sends any commands to
|
||||
def parse_notification(notification: FediverseNotification, fediverse_service=None):
|
||||
'''Parses any notifications received by the bot and sends any commands to
|
||||
gacha_response()'''
|
||||
|
||||
if fediverse_service is None:
|
||||
fediverse_service = get_fediverse_service()
|
||||
|
||||
# We get the type of notification to filter the ones that we actually want
|
||||
# to parse
|
||||
|
||||
notif_type = notification.get("type")
|
||||
if not notif_type in ('mention', 'reply'):
|
||||
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
|
||||
return # Ignore anything that isn't a mention
|
||||
|
||||
# Return early if no post attached
|
||||
if not notification.post:
|
||||
return
|
||||
|
||||
# We want the visibility to be related to the type that was received (so if
|
||||
# people don't want to dump a bunch of notes on home they don't have to)
|
||||
visibility = notification["note"]["visibility"]
|
||||
if visibility != "specified":
|
||||
visibility = "home"
|
||||
if notification.post.visibility != Visibility.SPECIFIED:
|
||||
visibility = Visibility.HOME
|
||||
else:
|
||||
visibility = Visibility.SPECIFIED
|
||||
|
||||
# Get the full Activitypub ID of the user
|
||||
user = notification.get("user", {})
|
||||
username = user.get("username", "unknown")
|
||||
host = user.get("host")
|
||||
# Local users may not have a hostname attached
|
||||
full_user = f"@{username}" if not host else f"@{username}@{host}"
|
||||
full_user = notification.user.full_handle
|
||||
|
||||
note_obj = notification.get("note", {})
|
||||
note_text = note_obj.get("text")
|
||||
note_id = note_obj.get("id")
|
||||
note_text = notification.post.text
|
||||
note_id = notification.post.id
|
||||
|
||||
note = note_text.strip().lower() if note_text else ""
|
||||
|
||||
|
@ -51,20 +54,20 @@ def parse_notification(notification,client):
|
|||
arguments = parts[1:] if len(parts) > 1 else []
|
||||
|
||||
# TODO: move response generation to a different function
|
||||
response = gacha_response(command.lower(),full_user, arguments, note_obj)
|
||||
response = gacha_response(command.lower(), full_user, arguments, notification.post)
|
||||
if not response:
|
||||
return
|
||||
|
||||
if isinstance(response, str):
|
||||
client.notes_create(
|
||||
fediverse_service.create_post(
|
||||
text=response,
|
||||
reply_id=note_id,
|
||||
reply_to_id=note_id,
|
||||
visibility=visibility
|
||||
)
|
||||
else:
|
||||
client.notes_create(
|
||||
fediverse_service.create_post(
|
||||
text=response[0],
|
||||
reply_id=note_id,
|
||||
reply_to_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=response[1]
|
||||
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
|
||||
|
|
188
bot/pleroma_service.py
Normal file
188
bot/pleroma_service.py
Normal file
|
@ -0,0 +1,188 @@
|
|||
from mastodon import Mastodon
|
||||
from typing import List, Optional, Dict, Any, Union, BinaryIO
|
||||
import io
|
||||
import filetype
|
||||
from fediverse_service import FediverseService
|
||||
from fediverse_types import (
|
||||
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
|
||||
NotificationType, Visibility
|
||||
)
|
||||
import config
|
||||
|
||||
|
||||
class PleromaService(FediverseService):
|
||||
"""Pleroma implementation of FediverseService using Mastodon.py"""
|
||||
|
||||
def __init__(self):
|
||||
self.client = Mastodon(
|
||||
access_token=config.KEY,
|
||||
api_base_url=config.INSTANCE
|
||||
)
|
||||
|
||||
def _convert_mastodon_user(self, user_data: Dict[str, Any]) -> FediverseUser:
|
||||
"""Convert Mastodon/Pleroma user data to FediverseUser"""
|
||||
acct = user_data.get("acct", "")
|
||||
if "@" in acct:
|
||||
username, host = acct.split("@", 1)
|
||||
else:
|
||||
username = acct
|
||||
host = None
|
||||
|
||||
return FediverseUser(
|
||||
id=str(user_data.get("id", "")),
|
||||
username=username,
|
||||
host=host,
|
||||
display_name=user_data.get("display_name")
|
||||
)
|
||||
|
||||
def _convert_mastodon_file(self, file_data: Dict[str, Any]) -> FediverseFile:
|
||||
"""Convert Mastodon/Pleroma media attachment to FediverseFile"""
|
||||
return FediverseFile(
|
||||
id=str(file_data.get("id", "")),
|
||||
url=file_data.get("url", ""),
|
||||
type=file_data.get("type"),
|
||||
name=file_data.get("description")
|
||||
)
|
||||
|
||||
def _convert_mastodon_visibility(self, visibility: str) -> Visibility:
|
||||
"""Convert Mastodon/Pleroma visibility to our enum"""
|
||||
visibility_map = {
|
||||
"public": Visibility.PUBLIC,
|
||||
"unlisted": Visibility.UNLISTED,
|
||||
"private": Visibility.FOLLOWERS,
|
||||
"direct": Visibility.DIRECT
|
||||
}
|
||||
return visibility_map.get(visibility, Visibility.PUBLIC)
|
||||
|
||||
def _convert_to_mastodon_visibility(self, visibility: Visibility) -> str:
|
||||
"""Convert our visibility enum to Mastodon/Pleroma visibility"""
|
||||
visibility_map = {
|
||||
Visibility.PUBLIC: "public",
|
||||
Visibility.UNLISTED: "unlisted",
|
||||
Visibility.HOME: "unlisted", # Map home to unlisted for Pleroma
|
||||
Visibility.FOLLOWERS: "private",
|
||||
Visibility.SPECIFIED: "direct", # Map specified to direct for Pleroma
|
||||
Visibility.DIRECT: "direct"
|
||||
}
|
||||
return visibility_map.get(visibility, "public")
|
||||
|
||||
def _convert_mastodon_notification_type(self, notif_type: str) -> NotificationType:
|
||||
"""Convert Mastodon/Pleroma notification type to our enum"""
|
||||
type_map = {
|
||||
"mention": NotificationType.MENTION,
|
||||
"follow": NotificationType.FOLLOW,
|
||||
"favourite": NotificationType.FAVOURITE,
|
||||
"reblog": NotificationType.REBLOG,
|
||||
"poll": NotificationType.POLL
|
||||
}
|
||||
return type_map.get(notif_type, NotificationType.OTHER)
|
||||
|
||||
def _convert_mastodon_status(self, status_data: Dict[str, Any]) -> FediversePost:
|
||||
"""Convert Mastodon/Pleroma status data to FediversePost"""
|
||||
files = []
|
||||
if status_data.get("media_attachments"):
|
||||
files = [self._convert_mastodon_file(f) for f in status_data["media_attachments"]]
|
||||
|
||||
# Extract plain text from HTML content
|
||||
content = status_data.get("content", "")
|
||||
# Basic HTML stripping - in production you might want to use a proper HTML parser
|
||||
import re
|
||||
plain_text = re.sub(r'<[^>]+>', '', content) if content else None
|
||||
|
||||
return FediversePost(
|
||||
id=str(status_data.get("id", "")), # Pleroma IDs (snowflake format) are lexicographically sortable as strings
|
||||
text=plain_text,
|
||||
user=self._convert_mastodon_user(status_data.get("account", {})),
|
||||
visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")),
|
||||
created_at=status_data.get("created_at"),
|
||||
files=files,
|
||||
reply_to_id=str(status_data["in_reply_to_id"]) if status_data.get("in_reply_to_id") else None
|
||||
)
|
||||
|
||||
def _convert_mastodon_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
|
||||
"""Convert Mastodon/Pleroma notification data to FediverseNotification"""
|
||||
post = None
|
||||
if notification_data.get("status"):
|
||||
post = self._convert_mastodon_status(notification_data["status"])
|
||||
|
||||
return FediverseNotification(
|
||||
id=str(notification_data.get("id", "")), # Pleroma IDs are lexicographically sortable as strings
|
||||
type=self._convert_mastodon_notification_type(notification_data.get("type", "")),
|
||||
user=self._convert_mastodon_user(notification_data.get("account", {})),
|
||||
post=post,
|
||||
created_at=notification_data.get("created_at")
|
||||
)
|
||||
|
||||
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
||||
"""Get notifications from Pleroma instance"""
|
||||
params = {}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
|
||||
notifications = self.client.notifications(**params)
|
||||
return [self._convert_mastodon_notification(notif) for notif in notifications]
|
||||
|
||||
def create_post(
|
||||
self,
|
||||
text: str,
|
||||
reply_to_id: Optional[str] = None,
|
||||
visibility: Visibility = Visibility.HOME,
|
||||
file_ids: Optional[List[str]] = None,
|
||||
visible_user_ids: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""Create a post on Pleroma instance"""
|
||||
params = {
|
||||
"status": text,
|
||||
"visibility": self._convert_to_mastodon_visibility(visibility)
|
||||
}
|
||||
|
||||
if reply_to_id:
|
||||
params["in_reply_to_id"] = reply_to_id
|
||||
|
||||
if file_ids:
|
||||
params["media_ids"] = file_ids
|
||||
|
||||
# Note: Pleroma/Mastodon doesn't have direct equivalent to visible_user_ids
|
||||
# For direct messages, you typically mention users in the status text
|
||||
|
||||
response = self.client.status_post(**params)
|
||||
return str(response.get("id", ""))
|
||||
|
||||
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
||||
"""Get a specific post by ID from Pleroma instance"""
|
||||
try:
|
||||
status = self.client.status(post_id)
|
||||
return self._convert_mastodon_status(status)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||
"""Upload a file to Pleroma instance"""
|
||||
try:
|
||||
# Convert file_data to bytes for MIME detection
|
||||
if hasattr(file_data, 'read'):
|
||||
# Check if we can seek back
|
||||
try:
|
||||
current_pos = file_data.tell()
|
||||
file_bytes = file_data.read()
|
||||
file_data.seek(current_pos)
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
except (io.UnsupportedOperation, OSError):
|
||||
# Non-seekable stream, already read all data
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
else:
|
||||
file_bytes = file_data
|
||||
file_data = io.BytesIO(file_bytes)
|
||||
|
||||
# Use filetype library for robust MIME detection
|
||||
kind = filetype.guess(file_bytes)
|
||||
if kind is not None:
|
||||
mime_type = kind.mime
|
||||
else:
|
||||
# Fallback to image/jpeg if detection fails
|
||||
mime_type = 'image/jpeg'
|
||||
|
||||
media = self.client.media_post(file_data, mime_type=mime_type, description=filename)
|
||||
return self._convert_mastodon_file(media)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to upload file to Pleroma: {e}") from e
|
42
db.py
42
db.py
|
@ -1,7 +1,47 @@
|
|||
import sqlite3
|
||||
import configparser
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
def get_config_file_path():
|
||||
"""Get config file path from command line arguments or default"""
|
||||
parser = argparse.ArgumentParser(add_help=False)
|
||||
parser.add_argument('--config', default='config.ini', help='Path to config file (default: config.ini)')
|
||||
args, _ = parser.parse_known_args()
|
||||
return args.config
|
||||
|
||||
def get_db_path():
|
||||
"""Get database path from config with error handling"""
|
||||
config_file = get_config_file_path()
|
||||
|
||||
if not os.path.exists(config_file):
|
||||
print(f"Error: {config_file} not found!")
|
||||
if config_file == 'config.ini':
|
||||
print("Please copy example_config.ini to config.ini and configure it.")
|
||||
print(f"Or use --config /path/to/your/config.ini to specify a different location.")
|
||||
sys.exit(1)
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
try:
|
||||
config.read(config_file)
|
||||
except configparser.Error as e:
|
||||
print(f"Error reading {config_file}: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
if 'application' not in config:
|
||||
print(f"Error: [application] section missing in {config_file}")
|
||||
sys.exit(1)
|
||||
|
||||
if 'DatabaseLocation' not in config['application']:
|
||||
print(f"Error: DatabaseLocation missing in {config_file}")
|
||||
sys.exit(1)
|
||||
|
||||
return config['application']['DatabaseLocation']
|
||||
|
||||
# Connect to SQLite database (or create it if it doesn't exist)
|
||||
conn = sqlite3.connect('gacha_game.db')
|
||||
db_path = get_db_path()
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create tables
|
||||
|
|
|
@ -10,8 +10,18 @@ ApiKey = abcdefghijklmnopqrstuvwxyz012345
|
|||
; Fully qualified URL of the instance hosting the bot
|
||||
InstanceUrl = http://example.tld
|
||||
|
||||
; Instance type - either "misskey" or "pleroma"
|
||||
InstanceType = misskey
|
||||
|
||||
; Comma separated list of fedi handles for any administrator users
|
||||
DefaultAdmins = ['admin@example.tld']
|
||||
|
||||
; SQLite Database location
|
||||
DatabaseLocation = ./gacha_game.db
|
||||
|
||||
; Web server port (default: 5000)
|
||||
WebPort = 5000
|
||||
|
||||
; Comma-separated list of trusted fediverse instances (leave empty to allow only local users)
|
||||
; Example: TrustedInstances = mastodon.social,misskey.io,pleroma.example.com
|
||||
TrustedInstances =
|
14
readme.md
14
readme.md
|
@ -1,6 +1,6 @@
|
|||
# Kemoverse
|
||||
|
||||
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse.
|
||||
A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Supports both Misskey and Pleroma instances. Name comes from Kemonomimi and Fediverse.
|
||||
|
||||

|
||||
|
||||
|
@ -35,13 +35,13 @@ A gacha-style bot for the Fediverse built with Python. Users can roll for charac
|
|||
- 🌐 Web app to generate cards from images
|
||||
|
||||
### 🌍 Fediverse Support
|
||||
✅ Anyone from the fediverse can play, but the server only works using a Misskey instance. Want to rewrite the program in Elixir for Pleroma? Let us know!
|
||||
✅ Anyone from the fediverse can play! The bot supports both Misskey and Pleroma instances through configurable backends.
|
||||
|
||||
## 🗃️ Tech Stack
|
||||
|
||||
- Python (3.11+)
|
||||
- SQLite
|
||||
- Fediverse API integration (via Misskey endpoints)
|
||||
- Fediverse API integration (Misskey and Pleroma support)
|
||||
- Flask
|
||||
- Modular DB design for extensibility
|
||||
|
||||
|
@ -58,12 +58,12 @@ Instructions on installing dependencies, initializing the database, and running
|
|||
flowchart TD
|
||||
|
||||
subgraph Player Interaction
|
||||
A1[Misskey bot]
|
||||
A1[Fediverse bot]
|
||||
A2[Web]
|
||||
end
|
||||
|
||||
subgraph Misskey
|
||||
B1[Misskey instance]
|
||||
subgraph Fediverse
|
||||
B1[Fediverse instance]
|
||||
end
|
||||
|
||||
subgraph Bot
|
||||
|
@ -71,7 +71,7 @@ flowchart TD
|
|||
C2[Notification parser]
|
||||
C3[Gacha roll logic]
|
||||
C4[Database interface]
|
||||
C5[Misskey API poster]
|
||||
C5[Fediverse API poster]
|
||||
end
|
||||
|
||||
subgraph Website
|
||||
|
|
|
@ -6,3 +6,5 @@ Jinja2==3.1.6
|
|||
MarkupSafe==3.0.2
|
||||
Werkzeug==3.1.3
|
||||
Misskey.py==4.1.0
|
||||
Mastodon.py==1.8.1
|
||||
filetype==1.2.0
|
||||
|
|
10
web/app.py
10
web/app.py
|
@ -1,8 +1,14 @@
|
|||
from flask import Flask, render_template
|
||||
import sqlite3
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add bot directory to path to import config
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'bot'))
|
||||
import config
|
||||
|
||||
app = Flask(__name__)
|
||||
DB_PATH = "./gacha_game.db" # Adjust path if needed
|
||||
DB_PATH = config.DB_PATH
|
||||
|
||||
def get_db_connection():
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
|
@ -56,4 +62,4 @@ def submit_character():
|
|||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(host='0.0.0.0', port=5000, debug=True)
|
||||
app.run(host='0.0.0.0', port=config.WEB_PORT, debug=True)
|
||||
|
|
Loading…
Add table
Reference in a new issue