use a library for content type detection instead of handrolled
This commit is contained in:
parent
89ae8a7290
commit
67b4d949fd
2 changed files with 15 additions and 19 deletions
|
@ -1,7 +1,7 @@
|
||||||
from mastodon import Mastodon
|
from mastodon import Mastodon
|
||||||
from typing import List, Optional, Dict, Any, Union, BinaryIO
|
from typing import List, Optional, Dict, Any, Union, BinaryIO
|
||||||
import mimetypes
|
|
||||||
import io
|
import io
|
||||||
|
import filetype
|
||||||
from fediverse_service import FediverseService
|
from fediverse_service import FediverseService
|
||||||
from fediverse_types import (
|
from fediverse_types import (
|
||||||
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
|
FediverseNotification, FediversePost, FediverseUser, FediverseFile,
|
||||||
|
@ -159,33 +159,28 @@ class PleromaService(FediverseService):
|
||||||
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
def upload_file(self, file_data: Union[BinaryIO, bytes], filename: Optional[str] = None) -> FediverseFile:
|
||||||
"""Upload a file to Pleroma instance"""
|
"""Upload a file to Pleroma instance"""
|
||||||
try:
|
try:
|
||||||
# Convert file_data to bytes if it's a stream for MIME detection
|
# Convert file_data to bytes for MIME detection
|
||||||
if hasattr(file_data, 'read'):
|
if hasattr(file_data, 'read'):
|
||||||
# Check if we can seek back
|
# Check if we can seek back
|
||||||
try:
|
try:
|
||||||
current_pos = file_data.tell()
|
current_pos = file_data.tell()
|
||||||
header = file_data.read(8)
|
file_bytes = file_data.read()
|
||||||
file_data.seek(current_pos)
|
file_data.seek(current_pos)
|
||||||
except (io.UnsupportedOperation, OSError):
|
|
||||||
# Non-seekable stream, read all data
|
|
||||||
remaining_data = file_data.read()
|
|
||||||
file_bytes = header + remaining_data
|
|
||||||
file_data = io.BytesIO(file_bytes)
|
file_data = io.BytesIO(file_bytes)
|
||||||
header = file_bytes[:8]
|
except (io.UnsupportedOperation, OSError):
|
||||||
|
# Non-seekable stream, already read all data
|
||||||
|
file_data = io.BytesIO(file_bytes)
|
||||||
else:
|
else:
|
||||||
header = file_data[:8] if len(file_data) >= 8 else file_data
|
file_bytes = file_data
|
||||||
|
file_data = io.BytesIO(file_bytes)
|
||||||
|
|
||||||
# Determine mime type from file header
|
# Use filetype library for robust MIME detection
|
||||||
if header.startswith(b'\xff\xd8\xff'):
|
kind = filetype.guess(file_bytes)
|
||||||
mime_type = 'image/jpeg'
|
if kind is not None:
|
||||||
elif header.startswith(b'\x89PNG\r\n\x1a\n'):
|
mime_type = kind.mime
|
||||||
mime_type = 'image/png'
|
|
||||||
elif header.startswith(b'GIF8'):
|
|
||||||
mime_type = 'image/gif'
|
|
||||||
elif header.startswith(b'RIFF') and len(header) >= 8 and b'WEBP' in header:
|
|
||||||
mime_type = 'image/webp'
|
|
||||||
else:
|
else:
|
||||||
mime_type = 'image/jpeg' # Default fallback
|
# Fallback to image/jpeg if detection fails
|
||||||
|
mime_type = 'image/jpeg'
|
||||||
|
|
||||||
media = self.client.media_post(file_data, mime_type=mime_type, description=filename)
|
media = self.client.media_post(file_data, mime_type=mime_type, description=filename)
|
||||||
return self._convert_mastodon_file(media)
|
return self._convert_mastodon_file(media)
|
||||||
|
|
|
@ -7,3 +7,4 @@ MarkupSafe==3.0.2
|
||||||
Werkzeug==3.1.3
|
Werkzeug==3.1.3
|
||||||
Misskey.py==4.1.0
|
Misskey.py==4.1.0
|
||||||
Mastodon.py==1.8.1
|
Mastodon.py==1.8.1
|
||||||
|
filetype==1.2.0
|
||||||
|
|
Loading…
Add table
Reference in a new issue