193 lines
No EOL
7.9 KiB
Python
193 lines
No EOL
7.9 KiB
Python
from mastodon import Mastodon
|
|
from typing import List, Optional, Dict, Any, Union, BinaryIO
|
|
import mimetypes
|
|
import io
|
|
from fediverse_service import FediverseService
|
|
from fediverse_types import (
|
|
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
|
|
NotificationType, Visibility
|
|
)
|
|
import config
|
|
|
|
|
|
class PleromaService(FediverseService):
|
|
"""Pleroma implementation of FediverseService using Mastodon.py"""
|
|
|
|
def __init__(self):
|
|
self.client = Mastodon(
|
|
access_token=config.KEY,
|
|
api_base_url=config.INSTANCE
|
|
)
|
|
|
|
def _convert_mastodon_user(self, user_data: Dict[str, Any]) -> FediverseUser:
|
|
"""Convert Mastodon/Pleroma user data to FediverseUser"""
|
|
acct = user_data.get("acct", "")
|
|
if "@" in acct:
|
|
username, host = acct.split("@", 1)
|
|
else:
|
|
username = acct
|
|
host = None
|
|
|
|
return FediverseUser(
|
|
id=str(user_data.get("id", "")),
|
|
username=username,
|
|
host=host,
|
|
display_name=user_data.get("display_name")
|
|
)
|
|
|
|
def _convert_mastodon_file(self, file_data: Dict[str, Any]) -> FediverseFile:
|
|
"""Convert Mastodon/Pleroma media attachment to FediverseFile"""
|
|
return FediverseFile(
|
|
id=str(file_data.get("id", "")),
|
|
url=file_data.get("url", ""),
|
|
type=file_data.get("type"),
|
|
name=file_data.get("description")
|
|
)
|
|
|
|
def _convert_mastodon_visibility(self, visibility: str) -> Visibility:
|
|
"""Convert Mastodon/Pleroma visibility to our enum"""
|
|
visibility_map = {
|
|
"public": Visibility.PUBLIC,
|
|
"unlisted": Visibility.UNLISTED,
|
|
"private": Visibility.FOLLOWERS,
|
|
"direct": Visibility.DIRECT
|
|
}
|
|
return visibility_map.get(visibility, Visibility.PUBLIC)
|
|
|
|
def _convert_to_mastodon_visibility(self, visibility: Visibility) -> str:
|
|
"""Convert our visibility enum to Mastodon/Pleroma visibility"""
|
|
visibility_map = {
|
|
Visibility.PUBLIC: "public",
|
|
Visibility.UNLISTED: "unlisted",
|
|
Visibility.HOME: "unlisted", # Map home to unlisted for Pleroma
|
|
Visibility.FOLLOWERS: "private",
|
|
Visibility.SPECIFIED: "direct", # Map specified to direct for Pleroma
|
|
Visibility.DIRECT: "direct"
|
|
}
|
|
return visibility_map.get(visibility, "public")
|
|
|
|
def _convert_mastodon_notification_type(self, notif_type: str) -> NotificationType:
|
|
"""Convert Mastodon/Pleroma notification type to our enum"""
|
|
type_map = {
|
|
"mention": NotificationType.MENTION,
|
|
"follow": NotificationType.FOLLOW,
|
|
"favourite": NotificationType.FAVOURITE,
|
|
"reblog": NotificationType.REBLOG,
|
|
"poll": NotificationType.POLL
|
|
}
|
|
return type_map.get(notif_type, NotificationType.OTHER)
|
|
|
|
def _convert_mastodon_status(self, status_data: Dict[str, Any]) -> FediversePost:
|
|
"""Convert Mastodon/Pleroma status data to FediversePost"""
|
|
files = []
|
|
if status_data.get("media_attachments"):
|
|
files = [self._convert_mastodon_file(f) for f in status_data["media_attachments"]]
|
|
|
|
# Extract plain text from HTML content
|
|
content = status_data.get("content", "")
|
|
# Basic HTML stripping - in production you might want to use a proper HTML parser
|
|
import re
|
|
plain_text = re.sub(r'<[^>]+>', '', content) if content else None
|
|
|
|
return FediversePost(
|
|
id=str(status_data.get("id", "")),
|
|
text=plain_text,
|
|
user=self._convert_mastodon_user(status_data.get("account", {})),
|
|
visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")),
|
|
created_at=status_data.get("created_at"),
|
|
files=files,
|
|
reply_to_id=str(status_data["in_reply_to_id"]) if status_data.get("in_reply_to_id") else None
|
|
)
|
|
|
|
def _convert_mastodon_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
|
|
"""Convert Mastodon/Pleroma notification data to FediverseNotification"""
|
|
post = None
|
|
if notification_data.get("status"):
|
|
post = self._convert_mastodon_status(notification_data["status"])
|
|
|
|
return FediverseNotification(
|
|
id=str(notification_data.get("id", "")),
|
|
type=self._convert_mastodon_notification_type(notification_data.get("type", "")),
|
|
user=self._convert_mastodon_user(notification_data.get("account", {})),
|
|
post=post,
|
|
created_at=notification_data.get("created_at")
|
|
)
|
|
|
|
def get_notifications(self, since_id: Optional[str] = None) -> List[FediverseNotification]:
|
|
"""Get notifications from Pleroma instance"""
|
|
params = {}
|
|
if since_id:
|
|
params["since_id"] = since_id
|
|
|
|
notifications = self.client.notifications(**params)
|
|
return [self._convert_mastodon_notification(notif) for notif in notifications]
|
|
|
|
def create_post(
|
|
self,
|
|
text: str,
|
|
reply_to_id: Optional[str] = None,
|
|
visibility: Visibility = Visibility.HOME,
|
|
file_ids: Optional[List[str]] = None,
|
|
visible_user_ids: Optional[List[str]] = None
|
|
) -> str:
|
|
"""Create a post on Pleroma instance"""
|
|
params = {
|
|
"status": text,
|
|
"visibility": self._convert_to_mastodon_visibility(visibility)
|
|
}
|
|
|
|
if reply_to_id:
|
|
params["in_reply_to_id"] = reply_to_id
|
|
|
|
if file_ids:
|
|
params["media_ids"] = file_ids
|
|
|
|
# Note: Pleroma/Mastodon doesn't have direct equivalent to visible_user_ids
|
|
# For direct messages, you typically mention users in the status text
|
|
|
|
response = self.client.status_post(**params)
|
|
return str(response.get("id", ""))
|
|
|
|
def get_post_by_id(self, post_id: str) -> Optional[FediversePost]:
|
|
"""Get a specific post by ID from Pleroma instance"""
|
|
try:
|
|
status = self.client.status(post_id)
|
|
return self._convert_mastodon_status(status)
|
|
except Exception:
|
|
return None
|
|
|
|
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
|
"""Upload a file to Pleroma instance"""
|
|
try:
|
|
# Convert file_data to bytes if it's a stream for MIME detection
|
|
if hasattr(file_data, 'read'):
|
|
# Check if we can seek back
|
|
try:
|
|
current_pos = file_data.tell()
|
|
header = file_data.read(8)
|
|
file_data.seek(current_pos)
|
|
except (io.UnsupportedOperation, OSError):
|
|
# Non-seekable stream, read all data
|
|
remaining_data = file_data.read()
|
|
file_bytes = header + remaining_data
|
|
file_data = io.BytesIO(file_bytes)
|
|
header = file_bytes[:8]
|
|
else:
|
|
header = file_data[:8] if len(file_data) >= 8 else file_data
|
|
|
|
# Determine mime type from file header
|
|
if header.startswith(b'\xff\xd8\xff'):
|
|
mime_type = 'image/jpeg'
|
|
elif header.startswith(b'\x89PNG\r\n\x1a\n'):
|
|
mime_type = 'image/png'
|
|
elif header.startswith(b'GIF8'):
|
|
mime_type = 'image/gif'
|
|
elif header.startswith(b'RIFF') and len(header) >= 8 and b'WEBP' in header:
|
|
mime_type = 'image/webp'
|
|
else:
|
|
mime_type = 'image/jpeg' # Default fallback
|
|
|
|
media = self.client.media_post(file_data, mime_type=mime_type, description=filename)
|
|
return self._convert_mastodon_file(media)
|
|
except Exception as e:
|
|
raise RuntimeError(f"Failed to upload file to Pleroma: {e}") from e |