completely untested refactor. bug fixes to come later.

This commit is contained in:
Moon 2025-06-12 11:22:20 +09:00
parent fe8e7d246f
commit dac05a3ed8
4 changed files with 153 additions and 69 deletions

View file

@ -1,32 +1,25 @@
import requests
from misskey.exceptions import MisskeyAPIException
from client import client_connection
from db_utils import insert_character
from custom_types import Character
from fediverse_factory import get_fediverse_service
from db_utils import get_db_connection
from config import RARITY_TO_WEIGHT
def add_character(
name: str,
rarity: int,
image_url: str) -> tuple[int, str]:
'''
Adds a character to the database, uploading the image from a public URL to
the bot's Misskey Drive.
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
"""
Adds a character to the database, uploading the image from a public URL to the Fediverse instance.
Args:
name (str): Character name.
rarity (int): Character rarity (e.g., 1-5).
image_url (str): Public URL of the image from the post (e.g., from
note['files'][i]['url']).
weight (float): Pull weight (e.g., 0.02).
image_url (str): Public URL of the image from the post.
Returns:
tuple[int, str]: Character ID and bot's Drive file_id.
tuple[int, str]: Character ID and file_id.
Raises:
ValueError: If inputs are invalid.
RuntimeError: If image download/upload or database operation fails.
'''
"""
stripped_name = name.strip()
@ -40,25 +33,34 @@ def add_character(
if not image_url:
raise ValueError('Image URL must be provided.')
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f'Failed to download image from {image_url}')
# Upload to bot's Drive
mk = client_connection()
try:
media = mk.drive_files_create(response.raw)
file_id = media['id']
except MisskeyAPIException as e:
raise RuntimeError(f'Failed to upload image to bot\'s Drive: {e}')\
from e
# Download image
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f"Failed to download image from {image_url}")
# Insert into database
character_id = insert_character(
stripped_name,
rarity,
RARITY_TO_WEIGHT[rarity],
file_id
)
return character_id, file_id
# Upload to Fediverse instance
fediverse_service = get_fediverse_service()
try:
uploaded_file = fediverse_service.upload_file(response.raw)
file_id = uploaded_file.id
except RuntimeError as e:
raise RuntimeError(f"Failed to upload image: {e}") from e
# Insert into database
conn = get_db_connection()
cur = conn.cursor()
cur.execute(
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
(stripped_name, rarity, float(weight), file_id)
)
conn.commit()
character_id = cur.lastrowid
return character_id, file_id
except Exception as e:
raise
finally:
if 'conn' in locals():
conn.close()

View file

@ -1,18 +1,67 @@
import time
import misskey as misskey
from client import client_connection
import db_utils as db
import traceback
from parsing import parse_notification
from db_utils import get_config, set_config
from fediverse_factory import get_fediverse_service
import config
from config import NOTIFICATION_POLL_INTERVAL
from notification import process_notifications
def stream_notifications():
# Initialize the Fediverse service
fediverse_service = get_fediverse_service()
if __name__ == '__main__':
# Initialize the Misskey client
client = client_connection()
# Connect to DB
db.connect()
# Get the last seen notification ID from the database
last_seen_id = get_config("last_seen_notif_id")
whitelisted_instances = getattr(config, 'WHITELISTED_INSTANCES', [])
print('Listening for notifications...')
while True:
if not process_notifications(client):
time.sleep(NOTIFICATION_POLL_INTERVAL)
try:
# Get notifications from the fediverse service
notifications = fediverse_service.get_notifications(since_id=last_seen_id)
if notifications:
new_last_seen_id = last_seen_id
for notification in notifications:
notif_id = notification.id
# Skip old or same ID notifications
if last_seen_id is not None and notif_id <= last_seen_id:
continue
username = notification.user.username
host = notification.user.host
instance = host if host else "local"
if instance in whitelisted_instances or instance == "local":
note = notification.post.text if notification.post else ""
notif_type = notification.type.value
print(f"📨 [{notif_type}] from @{username}@{instance}")
print(f"💬 {note}")
print("-" * 30)
# 🧠 Send to the parser
parse_notification(notification, fediverse_service)
else:
print(f"⚠️ Blocked notification from untrusted instance: {host}")
# Update only if this notif_id is greater
if new_last_seen_id is None or notif_id > new_last_seen_id:
new_last_seen_id = notif_id
# Save the latest seen ID
if new_last_seen_id and new_last_seen_id != last_seen_id:
set_config("last_seen_notif_id", new_last_seen_id)
last_seen_id = new_last_seen_id
time.sleep(5)
except Exception as e:
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
time.sleep(5)
if __name__ == "__main__":
stream_notifications()

View file

@ -1,27 +1,38 @@
import re
from typing import Dict, Any
import misskey
import config
from response import generate_response
from fediverse_factory import get_fediverse_service
from fediverse_types import FediverseNotification, NotificationType, Visibility
from custom_types import ParsedNotification
def parse_notification(notification: FediverseNotification, fediverse_service=None):
'''Parses any notifications received by the bot and sends any commands to
generate_response()'''
def parse_notification(
notification: Dict[str, Any],
client: misskey.Misskey) -> ParsedNotification | None:
'''Parses any notifications received by the bot'''
if fediverse_service is None:
fediverse_service = get_fediverse_service()
# We get the type of notification to filter the ones that we actually want
# to parse
if notification.type not in (NotificationType.MENTION, NotificationType.REPLY):
return # Ignore anything that isn't a mention
# Return early if no post attached
if not notification.post:
return
# We want the visibility to be related to the type that was received (so if
# people don't want to dump a bunch of notes on home they don't have to)
if notification.post.visibility != Visibility.SPECIFIED:
visibility = Visibility.HOME
else:
visibility = Visibility.SPECIFIED
# Get the full Activitypub ID of the user
user = notification.get("user", {})
username = user.get("username", "unknown")
host = user.get("host")
# Local users may not have a hostname attached
full_user = f"@{username}" if not host else f"@{username}@{host}"
full_user = notification.user.full_handle
note_obj = notification.get("note", {})
note_text = note_obj.get("text")
note_id = note_obj.get("id")
note_text = notification.post.text
note_id = notification.post.id
note = note_text.strip().lower() if note_text else ""
@ -44,9 +55,30 @@ def parse_notification(
command = parts[0].lower() if parts else None
arguments = parts[1:] if len(parts) > 1 else []
return {
# Create ParsedNotification object for the new response system
parsed_notification: ParsedNotification = {
'author': full_user,
'command': command,
'arguments': arguments,
'note_obj': note_obj
'note_obj': {
'id': note_id,
'text': note_text,
'files': [{'url': f.url} for f in notification.post.files] if notification.post.files else []
}
}
# Generate response using the new system
response = generate_response(parsed_notification)
if not response:
return
# Handle attachment URLs (convert to file IDs if needed)
file_ids = response['attachment_urls'] if response['attachment_urls'] else None
fediverse_service.create_post(
text=response['message'],
reply_to_id=note_id,
visibility=visibility,
file_ids=file_ids
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)

View file

@ -103,10 +103,10 @@ dumbass.',
'attachment_urls': None
}
if len(arguments) != 2:
if len(arguments) != 3:
return {
'message': f'{author} Please specify the following attributes \
in order: name, rarity',
in order: name, rarity, weight',
'attachment_urls': None
}
@ -126,6 +126,7 @@ must be a decimal value between 0.0 and 1.0',
character_id, file_id = add_character(
name=arguments[0],
rarity=int(arguments[1]),
weight=float(arguments[2]),
image_url=image_url
)
return {