diff --git a/.gitignore b/.gitignore index e5543ec..2df4c91 100644 --- a/.gitignore +++ b/.gitignore @@ -185,5 +185,6 @@ cython_debug/ gacha_game*.db gacha_game*.db.* config*.ini +run.sh -.idea \ No newline at end of file +.idea diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 0000000..45e4b9f --- /dev/null +++ b/.tool-versions @@ -0,0 +1 @@ +nodejs 23.4.0 diff --git a/bot/add_card.py b/bot/add_card.py index 16834a9..ffa9601 100644 --- a/bot/add_card.py +++ b/bot/add_card.py @@ -1,32 +1,24 @@ import requests -from misskey.exceptions import MisskeyAPIException -from client import client_connection -from db_utils import insert_card -from custom_types import Card -from config import RARITY_TO_WEIGHT +import config +from fediverse_factory import get_fediverse_service +import db_utils - -def add_card( - name: str, - rarity: int, - image_url: str) -> tuple[int, str]: - ''' - Adds a card to the database, uploading the image from a public URL to - the bot's Misskey Drive. +def add_card(name: str, rarity: int, image_url: str) -> tuple[int, str]: + """ + Adds a card to the database, uploading the image from a public URL to the Fediverse instance. Args: name (str): Card name. rarity (int): Card rarity (e.g., 1-5). - image_url (str): Public URL of the image from the post (e.g., from - note['files'][i]['url']). + image_url (str): Public URL of the image from the post. Returns: - tuple[int, str]: Card ID and bot's Drive file_id. + tuple[int, str]: Card ID and file_id. Raises: ValueError: If inputs are invalid. RuntimeError: If image download/upload or database operation fails. - ''' + """ stripped_name = name.strip() @@ -35,29 +27,33 @@ def add_card( raise ValueError('Card name cannot be empty.') if rarity < 1: raise ValueError('Rarity must be a positive integer.') - if rarity not in RARITY_TO_WEIGHT.keys(): + if rarity not in config.RARITY_TO_WEIGHT.keys(): raise ValueError(f'Invalid rarity: {rarity}') if not image_url: raise ValueError('Image URL must be provided.') - # Download image - response = requests.get(image_url, stream=True, timeout=30) - if response.status_code != 200: - raise RuntimeError(f'Failed to download image from {image_url}') - - # Upload to bot's Drive - mk = client_connection() try: - media = mk.drive_files_create(response.raw) - file_id = media['id'] - except MisskeyAPIException as e: - raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\ - from e + # Download image + response = requests.get(image_url, stream=True, timeout=30) + if response.status_code != 200: + raise RuntimeError(f"Failed to download image from {image_url}") - # Insert into database - card_id = insert_card( + # Upload to Fediverse instance + fediverse_service = get_fediverse_service(config.INSTANCE_TYPE) + try: + uploaded_file = fediverse_service.upload_file(response.raw) + file_id = uploaded_file.id + except RuntimeError as e: + raise RuntimeError(f"Failed to upload image: {e}") from e + + # Insert into database using db_utils function + card_id = db_utils.insert_card( stripped_name, rarity, file_id - ) - return card_id, file_id + ) + + return card_id, file_id + + except Exception as e: + raise diff --git a/bot/bot_app.py b/bot/bot_app.py index 02e6baf..ffbd413 100644 --- a/bot/bot_app.py +++ b/bot/bot_app.py @@ -15,21 +15,26 @@ #along with this program. If not, see https://www.gnu.org/licenses/. import time -import misskey as misskey -from client import client_connection -import db_utils as db +import traceback +import config +from notification import process_fediverse_notification +from db_utils import get_config, set_config, connect, setup_administrators +from fediverse_factory import get_fediverse_service -from config import NOTIFICATION_POLL_INTERVAL, USE_WHITELIST -from notification import process_notifications - -if __name__ == '__main__': - # Initialize the Misskey client - client = client_connection() - # Connect to DB - db.connect() +from config import USE_WHITELIST +def stream_notifications(): + # Initialize database connection + connect() + # Setup default administrators - db.setup_administrators() + setup_administrators() + + # Initialize the Fediverse service + fediverse_service = get_fediverse_service(config.INSTANCE_TYPE) + + # Get the last seen notification ID from the database + last_seen_id = get_config("last_seen_notif_id") # Show whitelist status whitelist_status = "enabled" if USE_WHITELIST else "disabled" @@ -37,5 +42,37 @@ if __name__ == '__main__': print('Listening for notifications...') while True: - if not process_notifications(client): - time.sleep(NOTIFICATION_POLL_INTERVAL) + try: + # Get notifications from the fediverse service + notifications = fediverse_service.get_notifications(since_id=last_seen_id) + + if notifications: + new_last_seen_id = last_seen_id + + for notification in notifications: + notif_id = notification.id + + # Skip old or same ID notifications + if last_seen_id is not None and notif_id <= last_seen_id: + continue + + # Process the notification using the abstracted processor + process_fediverse_notification(notification, fediverse_service) + + # Update only if this notif_id is greater + if new_last_seen_id is None or notif_id > new_last_seen_id: + new_last_seen_id = notif_id + + # Save the latest seen ID + if new_last_seen_id and new_last_seen_id != last_seen_id: + set_config("last_seen_notif_id", new_last_seen_id) + last_seen_id = new_last_seen_id + + time.sleep(5) + + except Exception as e: + print(f"An exception has occured: {e}\n{traceback.format_exc()}") + time.sleep(5) + +if __name__ == "__main__": + stream_notifications() diff --git a/bot/client.py b/bot/client.py deleted file mode 100644 index 57f9f4e..0000000 --- a/bot/client.py +++ /dev/null @@ -1,6 +0,0 @@ -import misskey -import config - - -def client_connection() -> misskey.Misskey: - return misskey.Misskey(address=config.INSTANCE, i=config.KEY) diff --git a/bot/config.py b/bot/config.py index 57cbfa9..36177f9 100644 --- a/bot/config.py +++ b/bot/config.py @@ -91,9 +91,19 @@ if 'User' not in config['credentials'] or not config['credentials']['User'].stri USER = normalize_user(config['credentials']['User']) # API key for the bot KEY = config['credentials']['Token'] -# Bot's Misskey instance URL +# Bot's Misskey/Pleroma instance URL INSTANCE = config['credentials']['Instance'].lower() +# Instance type validation +if 'InstanceType' not in config['application']: + raise ValueError("InstanceType must be specified in config.ini") + +instance_type = config['application']['InstanceType'].lower() +if instance_type not in ('misskey', 'pleroma'): + raise ValueError("InstanceType must be either 'misskey' or 'pleroma'") + +INSTANCE_TYPE = instance_type + # Web server port WEB_PORT = config['application'].getint('WebPort', 5000) BIND_ADDRESS = config['application'].get('BindAddress', '127.0.0.1') diff --git a/bot/fediverse_factory.py b/bot/fediverse_factory.py new file mode 100644 index 0000000..fe501db --- /dev/null +++ b/bot/fediverse_factory.py @@ -0,0 +1,40 @@ +from fediverse_service import FediverseService +from misskey_service import MisskeyService +from pleroma_service import PleromaService + + +class FediverseServiceFactory: + """Factory for creating FediverseService implementations""" + + @staticmethod + def create_service(instance_type: str) -> FediverseService: + """ + Create a FediverseService implementation based on the instance type. + + Args: + instance_type: The type of instance ("misskey" or "pleroma") + + Returns: + FediverseService implementation (MisskeyService or PleromaService) + + Raises: + ValueError: If the instance type is not supported + """ + instance_type = instance_type.lower() + + if instance_type == "misskey": + return MisskeyService() + elif instance_type == "pleroma": + return PleromaService() + else: + raise ValueError(f"Unsupported instance type: {instance_type}") + + +def get_fediverse_service(instance_type: str) -> FediverseService: + """ + Convenience function to get a FediverseService instance + + Args: + instance_type: The instance type ("misskey" or "pleroma") + """ + return FediverseServiceFactory.create_service(instance_type) \ No newline at end of file diff --git a/bot/fediverse_service.py b/bot/fediverse_service.py new file mode 100644 index 0000000..1292d63 --- /dev/null +++ b/bot/fediverse_service.py @@ -0,0 +1,74 @@ +from abc import ABC, abstractmethod +from typing import List, Optional, Union, BinaryIO +from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility + + +class FediverseService(ABC): + """Abstract interface for Fediverse platform services (Misskey, Pleroma, etc.)""" + + @abstractmethod + def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]: + """ + Retrieve notifications from the Fediverse instance. + + Args: + since_id: Optional ID to get notifications newer than this ID + + Returns: + List of FediverseNotification objects + """ + pass + + @abstractmethod + def create_post( + self, + text: str, + reply_to_id: Optional[str] = None, + visibility: Visibility = Visibility.HOME, + file_ids: Optional[List[str]] = None, + visible_user_ids: Optional[List[str]] = None + ) -> str: + """ + Create a new post on the Fediverse instance. + + Args: + text: The text content of the post + reply_to_id: Optional ID of post to reply to + visibility: Visibility level for the post + file_ids: Optional list of file IDs to attach + visible_user_ids: Optional list of user IDs who can see the post (for specified visibility) + + Returns: + ID of the created post + """ + pass + + @abstractmethod + def get_post_by_id(self, post_id: str) -> Optional[FediversePost]: + """ + Retrieve a specific post by its ID. + + Args: + post_id: The ID of the post to retrieve + + Returns: + FediversePost object if found, None otherwise + """ + pass + + @abstractmethod + def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile: + """ + Upload a file to the Fediverse instance. + + Args: + file_data: File data as binary stream or bytes + filename: Optional filename for the uploaded file + + Returns: + FediverseFile object with ID, URL, and other metadata + + Raises: + RuntimeError: If file upload fails + """ + pass \ No newline at end of file diff --git a/bot/fediverse_types.py b/bot/fediverse_types.py new file mode 100644 index 0000000..c1580b6 --- /dev/null +++ b/bot/fediverse_types.py @@ -0,0 +1,73 @@ +from dataclasses import dataclass +from typing import Optional, List, Dict, Any +from enum import Enum + + +class NotificationType(Enum): + MENTION = "mention" + REPLY = "reply" + FOLLOW = "follow" + FAVOURITE = "favourite" + REBLOG = "reblog" + POLL = "poll" + OTHER = "other" + + +class Visibility(Enum): + PUBLIC = "public" + UNLISTED = "unlisted" + HOME = "home" + FOLLOWERS = "followers" + SPECIFIED = "specified" + DIRECT = "direct" + + +@dataclass +class FediverseUser: + """Common user representation across Fediverse platforms""" + id: str + username: str + host: Optional[str] = None # None for local users + display_name: Optional[str] = None + + @property + def full_handle(self) -> str: + """Returns the full fediverse handle (@user@domain or @user for local)""" + if self.host: + return f"@{self.username}@{self.host}" + return f"@{self.username}" + + +@dataclass +class FediverseFile: + """Common file/attachment representation""" + id: str + url: str + type: Optional[str] = None + name: Optional[str] = None + + +@dataclass +class FediversePost: + """Common post representation across Fediverse platforms""" + id: str + text: Optional[str] + user: FediverseUser + visibility: Visibility + created_at: Optional[str] = None + files: List[FediverseFile] = None + reply_to_id: Optional[str] = None + + def __post_init__(self): + if self.files is None: + self.files = [] + + +@dataclass +class FediverseNotification: + """Common notification representation across Fediverse platforms""" + id: str + type: NotificationType + user: FediverseUser + post: Optional[FediversePost] = None + created_at: Optional[str] = None \ No newline at end of file diff --git a/bot/misskey_service.py b/bot/misskey_service.py new file mode 100644 index 0000000..510b32b --- /dev/null +++ b/bot/misskey_service.py @@ -0,0 +1,156 @@ +import misskey +from typing import List, Optional, Dict, Any, Union, BinaryIO +from fediverse_service import FediverseService +from fediverse_types import ( + FediverseNotification, FediversePost, FediverseUser, FediverseFile, + NotificationType, Visibility +) +import config + + +class MisskeyService(FediverseService): + """Misskey implementation of FediverseService""" + + def __init__(self): + self.client = misskey.Misskey(address=config.INSTANCE, i=config.KEY) + + def _convert_misskey_user(self, user_data: Dict[str, Any]) -> FediverseUser: + """Convert Misskey user data to FediverseUser""" + return FediverseUser( + id=user_data.get("id", ""), + username=user_data.get("username", "unknown"), + host=user_data.get("host"), + display_name=user_data.get("name") + ) + + def _convert_misskey_file(self, file_data: Dict[str, Any]) -> FediverseFile: + """Convert Misskey file data to FediverseFile""" + return FediverseFile( + id=file_data.get("id", ""), + url=file_data.get("url", ""), + type=file_data.get("type"), + name=file_data.get("name") + ) + + def _convert_misskey_visibility(self, visibility: str) -> Visibility: + """Convert Misskey visibility to our enum""" + visibility_map = { + "public": Visibility.PUBLIC, + "unlisted": Visibility.UNLISTED, + "home": Visibility.HOME, + "followers": Visibility.FOLLOWERS, + "specified": Visibility.SPECIFIED + } + return visibility_map.get(visibility, Visibility.HOME) + + def _convert_to_misskey_visibility(self, visibility: Visibility) -> str: + """Convert our visibility enum to Misskey visibility""" + visibility_map = { + Visibility.PUBLIC: "public", + Visibility.UNLISTED: "unlisted", + Visibility.HOME: "home", + Visibility.FOLLOWERS: "followers", + Visibility.SPECIFIED: "specified", + Visibility.DIRECT: "specified" # Map direct to specified for Misskey + } + return visibility_map.get(visibility, "home") + + def _convert_misskey_notification_type(self, notif_type: str) -> NotificationType: + """Convert Misskey notification type to our enum""" + type_map = { + "mention": NotificationType.MENTION, + "reply": NotificationType.REPLY, + "follow": NotificationType.FOLLOW, + "favourite": NotificationType.FAVOURITE, + "reblog": NotificationType.REBLOG, + "poll": NotificationType.POLL + } + return type_map.get(notif_type, NotificationType.OTHER) + + def _convert_misskey_post(self, note_data: Dict[str, Any]) -> FediversePost: + """Convert Misskey note data to FediversePost""" + files = [] + if note_data.get("files"): + files = [self._convert_misskey_file(f) for f in note_data["files"]] + + return FediversePost( + id=note_data.get("id", ""), + text=note_data.get("text"), + user=self._convert_misskey_user(note_data.get("user", {})), + visibility=self._convert_misskey_visibility(note_data.get("visibility", "home")), + created_at=note_data.get("createdAt"), + files=files, + reply_to_id=note_data.get("replyId") + ) + + def _convert_misskey_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification: + """Convert Misskey notification data to FediverseNotification""" + post = None + if notification_data.get("note"): + post = self._convert_misskey_post(notification_data["note"]) + + return FediverseNotification( + id=notification_data.get("id", ""), + type=self._convert_misskey_notification_type(notification_data.get("type", "")), + user=self._convert_misskey_user(notification_data.get("user", {})), + post=post, + created_at=notification_data.get("createdAt") + ) + + def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]: + """Get notifications from Misskey instance""" + params = { + 'include_types': ['mention', 'reply'], + 'limit': 50 + } + if since_id: + params["since_id"] = since_id + + notifications = self.client.i_notifications(**params) + return [self._convert_misskey_notification(notif) for notif in notifications] + + def create_post( + self, + text: str, + reply_to_id: Optional[str] = None, + visibility: Visibility = Visibility.HOME, + file_ids: Optional[List[str]] = None, + visible_user_ids: Optional[List[str]] = None + ) -> str: + """Create a post on Misskey instance""" + params = { + "text": text, + "visibility": self._convert_to_misskey_visibility(visibility) + } + + if reply_to_id: + params["reply_id"] = reply_to_id + + if file_ids: + params["file_ids"] = file_ids + + if visible_user_ids and visibility == Visibility.SPECIFIED: + params["visible_user_ids"] = visible_user_ids + + response = self.client.notes_create(**params) + return response.get("createdNote", {}).get("id", "") + + def get_post_by_id(self, post_id: str) -> Optional[FediversePost]: + """Get a specific post by ID from Misskey instance""" + try: + note = self.client.notes_show(noteId=post_id) + return self._convert_misskey_post(note) + except Exception: + return None + + def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile: + """Upload a file to Misskey Drive""" + try: + from misskey.exceptions import MisskeyAPIException + + media = self.client.drive_files_create(file_data) + return self._convert_misskey_file(media) + except MisskeyAPIException as e: + raise RuntimeError(f"Failed to upload file to Misskey Drive: {e}") from e + except Exception as e: + raise RuntimeError(f"Unexpected error during file upload: {e}") from e \ No newline at end of file diff --git a/bot/mock_fediverse_service.py b/bot/mock_fediverse_service.py new file mode 100644 index 0000000..34a1c0c --- /dev/null +++ b/bot/mock_fediverse_service.py @@ -0,0 +1,74 @@ +"""Mock FediverseService for testing purposes""" + +from typing import List, Optional, Union, BinaryIO +from fediverse_service import FediverseService +from fediverse_types import FediverseNotification, FediversePost, FediverseFile, Visibility + + +class MockFediverseService(FediverseService): + """Mock implementation of FediverseService for testing""" + + def __init__(self): + self.notifications = [] + self.created_posts = [] + self.uploaded_files = [] + + def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]: + """Return mock notifications, optionally filtered by since_id""" + if since_id is None: + return self.notifications + + # Filter notifications newer than since_id + filtered = [] + for notif in self.notifications: + if notif.id > since_id: + filtered.append(notif) + return filtered + + def create_post( + self, + text: str, + reply_to_id: Optional[str] = None, + visibility: Visibility = Visibility.HOME, + file_ids: Optional[List[str]] = None, + visible_user_ids: Optional[List[str]] = None + ) -> str: + """Mock post creation, returns fake post ID""" + post_id = f"mock_post_{len(self.created_posts)}" + + # Store the post for assertions + self.created_posts.append({ + 'id': post_id, + 'text': text, + 'reply_to_id': reply_to_id, + 'visibility': visibility, + 'file_ids': file_ids, + 'visible_user_ids': visible_user_ids + }) + + return post_id + + def upload_file(self, file_data: Union[BinaryIO, bytes]) -> FediverseFile: + """Mock file upload, returns fake file""" + file_id = f"mock_file_{len(self.uploaded_files)}" + + mock_file = FediverseFile( + id=file_id, + url=f"https://example.com/files/{file_id}", + type="image/png", + name="test_file.png" + ) + + self.uploaded_files.append(mock_file) + return mock_file + + # Helper methods for testing + def add_mock_notification(self, notification: FediverseNotification): + """Add a mock notification for testing""" + self.notifications.append(notification) + + def clear_all(self): + """Clear all mock data""" + self.notifications.clear() + self.created_posts.clear() + self.uploaded_files.clear() \ No newline at end of file diff --git a/bot/notification.py b/bot/notification.py index deb8ec6..f264718 100644 --- a/bot/notification.py +++ b/bot/notification.py @@ -1,45 +1,47 @@ -import traceback -from typing import Dict, Any - -import misskey -from misskey.exceptions import MisskeyAPIException - -from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST +import config from parsing import parse_notification -from db_utils import get_config, set_config, is_whitelisted, is_player_banned +from db_utils import is_whitelisted, is_player_banned from response import generate_response from custom_types import BotResponse - -# Define your whitelist -# TODO: move to config -WHITELISTED_INSTANCES: list[str] = [] +from fediverse_factory import get_fediverse_service +from fediverse_types import FediverseNotification, NotificationType, Visibility -def process_notification( - client: misskey.Misskey, - notification: Dict[str, Any]) -> None: - '''Processes an individual notification''' - user = notification.get('user', {}) - username = user.get('username', 'unknown') - host = user.get('host') # None if local user +def process_fediverse_notification(notification: FediverseNotification, fediverse_service=None) -> None: + '''Processes an individual fediverse notification using the abstraction''' + if fediverse_service is None: + fediverse_service = get_fediverse_service(config.INSTANCE_TYPE) + + # Get user and instance info + username = notification.user.username + host = notification.user.host instance = host if host else 'local' - if USE_WHITELIST and not is_whitelisted(instance): + # Check whitelist + if config.USE_WHITELIST and not is_whitelisted(instance): print(f'⚠️ Blocked notification from untrusted instance: {instance}') return - # Copy visibility of the post that was received when replying (so if people - # don't want to dump a bunch of notes on home they don't have to) - visibility = notification['note']['visibility'] - if visibility != 'specified': - visibility = 'home' + # Only process mentions and replies + if notification.type not in (NotificationType.MENTION, NotificationType.REPLY): + return - notif_type = notification.get('type', 'unknown') - notif_id = notification.get('id') + # Return early if no post attached + if not notification.post: + return + + # Determine visibility for reply + if notification.post.visibility != Visibility.SPECIFIED: + visibility = Visibility.HOME + else: + visibility = Visibility.SPECIFIED + + notif_type = notification.type.value + notif_id = notification.id print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}') # 🧠 Send to the parser - parsed_notification = parse_notification(notification, client) + parsed_notification = parse_notification(notification, fediverse_service) if not parsed_notification: return @@ -49,79 +51,20 @@ def process_notification( print(f'⚠️ Blocked notification from banned player: {author}') return - # Get the note Id to reply to - note_id = notification.get('note', {}).get('id') - # Get the response response: BotResponse | None = generate_response(parsed_notification) if not response: return - client.notes_create( + # Handle attachment URLs (convert to file IDs if needed) + file_ids = response['attachment_urls'] if response['attachment_urls'] else None + + # Send response using fediverse service + fediverse_service.create_post( text=response['message'], - reply_id=note_id, + reply_to_id=notification.post.id, visibility=visibility, - file_ids=response['attachment_urls'] - # TODO: write actual visible users ids so pleromers can use the bot - # privately - # visible_user_ids=[] + file_ids=file_ids + # visible_user_ids=[] # TODO: write actual visible users ids so pleromers can use the bot privately ) - - -def process_notifications(client: misskey.Misskey) -> bool: - '''Processes a batch of unread notifications. Returns False if there are - no more notifications to process.''' - - last_seen_id = get_config('last_seen_notif_id') - # process_notification writes to last_seen_id, so make a copy - new_last_seen_id = last_seen_id - - try: - notifications = client.i_notifications( - # Fetch notifications we haven't seen yet. This option is a bit - # tempermental, sometimes it'll include since_id, sometimes it - # won't. We need to keep track of what notifications we've - # already processed. - since_id=last_seen_id, - # Let misskey handle the filtering - include_types=['mention', 'reply'], - # And handle the batch size while we're at it - limit=NOTIFICATION_BATCH_SIZE - ) - - # No notifications. Wait the poll period. - if not notifications: - return False - - # Iterate oldest to newest - for notification in notifications: - try: - # Skip if we've processed already - notif_id = notification.get('id', '') - if notif_id <= last_seen_id: - continue - - # Update new_last_seen_id and process - new_last_seen_id = notif_id - process_notification(client, notification) - - except Exception as e: - print(f'An exception has occured while processing a \ -notification: {e}') - print(traceback.format_exc()) - - # If we got as many notifications as we requested, there are probably - # more in the queue - return len(notifications) == NOTIFICATION_BATCH_SIZE - - except MisskeyAPIException as e: - print(f'An exception has occured while reading notifications: {e}\n') - print(traceback.format_exc()) - finally: - # Quality jank right here, but finally lets us update the last_seen_id - # even if we hit an exception or return early - if new_last_seen_id > last_seen_id: - set_config('last_seen_notif_id', new_last_seen_id) - - return False diff --git a/bot/parsing.py b/bot/parsing.py index e1e8583..d625980 100644 --- a/bot/parsing.py +++ b/bot/parsing.py @@ -1,27 +1,38 @@ import re -from typing import Dict, Any - -import misskey - import config +from response import generate_response +from fediverse_factory import get_fediverse_service +from fediverse_types import FediverseNotification, NotificationType, Visibility from custom_types import ParsedNotification +def parse_notification(notification: FediverseNotification, fediverse_service=None): + '''Parses any notifications received by the bot and sends any commands to + generate_response()''' + + if fediverse_service is None: + fediverse_service = get_fediverse_service(config.INSTANCE_TYPE) -def parse_notification( - notification: Dict[str, Any], - client: misskey.Misskey) -> ParsedNotification | None: - '''Parses any notifications received by the bot''' + # We get the type of notification to filter the ones that we actually want + # to parse + if notification.type not in (NotificationType.MENTION, NotificationType.REPLY): + return # Ignore anything that isn't a mention + + # Return early if no post attached + if not notification.post: + return + + # We want the visibility to be related to the type that was received (so if + # people don't want to dump a bunch of notes on home they don't have to) + if notification.post.visibility != Visibility.SPECIFIED: + visibility = Visibility.HOME + else: + visibility = Visibility.SPECIFIED # Get the full Activitypub ID of the user - user = notification.get("user", {}) - username = user.get("username", "unknown") - host = user.get("host") - # Local users may not have a hostname attached - full_user = f"@{username}" if not host else f"@{username}@{host}" + full_user = notification.user.full_handle - note_obj = notification.get("note", {}) - note_text = note_obj.get("text") - note_id = note_obj.get("id") + note_text = notification.post.text + note_id = notification.post.id note = note_text.strip().lower() if note_text else "" # Split words into tokens @@ -44,9 +55,30 @@ def parse_notification( command = parts[1].lower() arguments = parts[2:] if len(parts) > 2 else [] - return { + # Create ParsedNotification object for the new response system + parsed_notification: ParsedNotification = { 'author': full_user, 'command': command, 'arguments': arguments, - 'note_obj': note_obj + 'note_obj': { + 'id': note_id, + 'text': note_text, + 'files': [{'url': f.url} for f in notification.post.files] if notification.post.files else [] + } } + + # Generate response using the new system + response = generate_response(parsed_notification) + if not response: + return + + # Handle attachment URLs (convert to file IDs if needed) + file_ids = response['attachment_urls'] if response['attachment_urls'] else None + + fediverse_service.create_post( + text=response['message'], + reply_to_id=note_id, + visibility=visibility, + file_ids=file_ids + #visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately + ) diff --git a/bot/pleroma_service.py b/bot/pleroma_service.py new file mode 100644 index 0000000..21320d1 --- /dev/null +++ b/bot/pleroma_service.py @@ -0,0 +1,188 @@ +from mastodon import Mastodon +from typing import List, Optional, Dict, Any, Union, BinaryIO +import io +import filetype +from fediverse_service import FediverseService +from fediverse_types import ( + FediverseNotification, FediversePost, FediverseUser, FediverseFile, + NotificationType, Visibility +) +import config + + +class PleromaService(FediverseService): + """Pleroma implementation of FediverseService using Mastodon.py""" + + def __init__(self): + self.client = Mastodon( + access_token=config.KEY, + api_base_url=config.INSTANCE + ) + + def _convert_mastodon_user(self, user_data: Dict[str, Any]) -> FediverseUser: + """Convert Mastodon/Pleroma user data to FediverseUser""" + acct = user_data.get("acct", "") + if "@" in acct: + username, host = acct.split("@", 1) + else: + username = acct + host = None + + return FediverseUser( + id=str(user_data.get("id", "")), + username=username, + host=host, + display_name=user_data.get("display_name") + ) + + def _convert_mastodon_file(self, file_data: Dict[str, Any]) -> FediverseFile: + """Convert Mastodon/Pleroma media attachment to FediverseFile""" + return FediverseFile( + id=str(file_data.get("id", "")), + url=file_data.get("url", ""), + type=file_data.get("type"), + name=file_data.get("description") + ) + + def _convert_mastodon_visibility(self, visibility: str) -> Visibility: + """Convert Mastodon/Pleroma visibility to our enum""" + visibility_map = { + "public": Visibility.PUBLIC, + "unlisted": Visibility.UNLISTED, + "private": Visibility.FOLLOWERS, + "direct": Visibility.DIRECT + } + return visibility_map.get(visibility, Visibility.PUBLIC) + + def _convert_to_mastodon_visibility(self, visibility: Visibility) -> str: + """Convert our visibility enum to Mastodon/Pleroma visibility""" + visibility_map = { + Visibility.PUBLIC: "public", + Visibility.UNLISTED: "unlisted", + Visibility.HOME: "unlisted", # Map home to unlisted for Pleroma + Visibility.FOLLOWERS: "private", + Visibility.SPECIFIED: "direct", # Map specified to direct for Pleroma + Visibility.DIRECT: "direct" + } + return visibility_map.get(visibility, "public") + + def _convert_mastodon_notification_type(self, notif_type: str) -> NotificationType: + """Convert Mastodon/Pleroma notification type to our enum""" + type_map = { + "mention": NotificationType.MENTION, + "follow": NotificationType.FOLLOW, + "favourite": NotificationType.FAVOURITE, + "reblog": NotificationType.REBLOG, + "poll": NotificationType.POLL + } + return type_map.get(notif_type, NotificationType.OTHER) + + def _convert_mastodon_status(self, status_data: Dict[str, Any]) -> FediversePost: + """Convert Mastodon/Pleroma status data to FediversePost""" + files = [] + if status_data.get("media_attachments"): + files = [self._convert_mastodon_file(f) for f in status_data["media_attachments"]] + + # Extract plain text from HTML content + content = status_data.get("content", "") + # Basic HTML stripping - in production you might want to use a proper HTML parser + import re + plain_text = re.sub(r'<[^>]+>', '', content) if content else None + + return FediversePost( + id=str(status_data.get("id", "")), + text=plain_text, + user=self._convert_mastodon_user(status_data.get("account", {})), + visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")), + created_at=status_data.get("created_at"), + files=files, + reply_to_id=str(status_data["in_reply_to_id"]) if status_data.get("in_reply_to_id") else None + ) + + def _convert_mastodon_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification: + """Convert Mastodon/Pleroma notification data to FediverseNotification""" + post = None + if notification_data.get("status"): + post = self._convert_mastodon_status(notification_data["status"]) + + return FediverseNotification( + id=str(notification_data.get("id", "")), + type=self._convert_mastodon_notification_type(notification_data.get("type", "")), + user=self._convert_mastodon_user(notification_data.get("account", {})), + post=post, + created_at=notification_data.get("created_at") + ) + + def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]: + """Get notifications from Pleroma instance""" + params = {} + if since_id: + params["since_id"] = since_id + + notifications = self.client.notifications(**params) + return [self._convert_mastodon_notification(notif) for notif in notifications] + + def create_post( + self, + text: str, + reply_to_id: Optional[str] = None, + visibility: Visibility = Visibility.HOME, + file_ids: Optional[List[str]] = None, + visible_user_ids: Optional[List[str]] = None + ) -> str: + """Create a post on Pleroma instance""" + params = { + "status": text, + "visibility": self._convert_to_mastodon_visibility(visibility) + } + + if reply_to_id: + params["in_reply_to_id"] = reply_to_id + + if file_ids: + params["media_ids"] = file_ids + + # Note: Pleroma/Mastodon doesn't have direct equivalent to visible_user_ids + # For direct messages, you typically mention users in the status text + + response = self.client.status_post(**params) + return str(response.get("id", "")) + + def get_post_by_id(self, post_id: str) -> Optional[FediversePost]: + """Get a specific post by ID from Pleroma instance""" + try: + status = self.client.status(post_id) + return self._convert_mastodon_status(status) + except Exception: + return None + + def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile: + """Upload a file to Pleroma instance""" + try: + # Convert file_data to bytes for MIME detection + if hasattr(file_data, 'read'): + # Check if we can seek back + try: + current_pos = file_data.tell() + file_bytes = file_data.read() + file_data.seek(current_pos) + file_data = io.BytesIO(file_bytes) + except (io.UnsupportedOperation, OSError): + # Non-seekable stream, already read all data + file_data = io.BytesIO(file_bytes) + else: + file_bytes = file_data + file_data = io.BytesIO(file_bytes) + + # Use filetype library for robust MIME detection + kind = filetype.guess(file_bytes) + if kind is not None: + mime_type = kind.mime + else: + # Fallback to image/jpeg if detection fails + mime_type = 'image/jpeg' + + media = self.client.media_post(file_data, mime_type=mime_type, description=filename) + return self._convert_mastodon_file(media) + except Exception as e: + raise RuntimeError(f"Failed to upload file to Pleroma: {e}") from e \ No newline at end of file diff --git a/example_config.ini b/example_config.ini index 8c18c28..1dd738e 100644 --- a/example_config.ini +++ b/example_config.ini @@ -5,6 +5,8 @@ DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"] ; SQLite Database location DatabaseLocation = ./gacha_game.db +; Instance type - either "misskey" or "pleroma" +InstanceType = misskey ; Web server port (default: 5000) WebPort = 5000 ; Web server bind address (default: 127.0.0.1, set to 0.0.0.0 to listen on all interfaces) @@ -40,4 +42,3 @@ User = @bot@example.tld ; API key for the bot ; Generate one by going to Settings > API > Generate access token Token = abcdefghijklmnopqrstuvwxyz012345 - diff --git a/readme.md b/readme.md index 7a939cc..7bedcea 100644 --- a/readme.md +++ b/readme.md @@ -1,6 +1,6 @@ # Kemoverse -A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Currently designed for use with Misskey. Name comes from Kemonomimi and Fediverse. +A gacha-style bot for the Fediverse built with Python. Users can roll for characters, trade, duel, and perhaps engage with popularity-based mechanics. Supports both Misskey and Pleroma instances. Name comes from Kemonomimi and Fediverse.