Compare commits

...
Sign in to create a new pull request.

11 commits
master ... dev

9 changed files with 290 additions and 204 deletions

View file

@ -32,7 +32,7 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
raise ValueError("Image URL must be provided.")
# Download image
response = requests.get(image_url, stream=True)
response = requests.get(image_url, stream=True, timeout=30)
if response.status_code != 200:
raise RuntimeError(f"Failed to download image from {image_url}")
@ -55,9 +55,6 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
character_id = cur.lastrowid
return character_id, file_id
except Exception as e:
raise
finally:
if 'conn' in locals():
conn.close()

View file

@ -1,79 +1,14 @@
import time
import traceback
import misskey
from parsing import parse_notification
from db_utils import get_or_create_user, add_pull, get_config, set_config
import misskey as misskey
from client import client_connection
# Initialize the Misskey client
client = client_connection()
# Define your whitelist
# TODO: move to config
whitelisted_instances: list[str] = []
def stream_notifications():
print("Starting filtered notification stream...")
last_seen_id = get_config("last_seen_notif_id")
from config import NOTIFICATION_POLL_INTERVAL
from notification import process_notifications
if __name__ == '__main__':
# Initialize the Misskey client
client = client_connection()
print('Listening for notifications...')
while True:
try:
# May be able to mark notifications as read using misskey.py and
# filter them out here. This function also takes a since_id we
# could use as well
notifications = client.i_notifications()
if notifications:
# Oldest to newest
notifications.reverse()
new_last_seen_id = last_seen_id
for notification in notifications:
notif_id = notification.get("id")
# Skip old or same ID notifications
if last_seen_id is not None and notif_id <= last_seen_id:
continue
user = notification.get("user", {})
username = user.get("username", "unknown")
host = user.get("host") # None if local user
instance = host if host else "local"
if instance in whitelisted_instances or instance == "local":
note = notification.get("note", {}).get("text", "")
notif_type = notification.get("type", "unknown")
print(f"📨 [{notif_type}] from @{username}@{instance}")
print(f"💬 {note}")
print("-" * 30)
# 🧠 Send to the parser
parse_notification(notification,client)
else:
print(f"⚠️ Blocked notification from untrusted instance: {host}")
# Update only if this notif_id is greater
if new_last_seen_id is None or notif_id > new_last_seen_id:
new_last_seen_id = notif_id
# Save the latest seen ID
if new_last_seen_id and new_last_seen_id != last_seen_id:
set_config("last_seen_notif_id", new_last_seen_id)
last_seen_id = new_last_seen_id
time.sleep(5)
except Exception as e:
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
time.sleep(5)
if __name__ == "__main__":
stream_notifications()
if not process_notifications(client):
time.sleep(NOTIFICATION_POLL_INTERVAL)

View file

@ -4,19 +4,20 @@ config = configparser.ConfigParser()
config.read('config.ini')
# Username for the bot
USER = config['application']['BotUser']
USER = config['credentials']['User']
# API key for the bot
KEY = config['application']['ApiKey']
KEY = config['credentials']['Token']
# Bot's Misskey instance URL
INSTANCE = config['application']['InstanceUrl']
# SQLite Database location
DB_PATH = config['application']['DatabaseLocation']
# Extra stuff for control of the bot
INSTANCE = config['credentials']['Instance']
# TODO: move this to db
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
# to use extra admin exclusive commands with the bot'''
# to use extra admin exclusive commands with the bot
ADMINS = config['application']['DefaultAdmins']
# SQLite Database location
DB_PATH = config['application']['DatabaseLocation']
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval'])

View file

@ -1,5 +1,4 @@
import sqlite3
import random
import config
DB_PATH = config.DB_PATH
@ -40,6 +39,17 @@ def add_pull(user_id, character_id):
conn.commit()
conn.close()
def get_last_rolled_at(user_id):
'''Gets the timestamp when the user last rolled'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \
(user_id,))
row = cur.fetchone()
conn.close()
return row[0] if row else None
def get_config(key):
'''Reads the value for a specified config key from the db'''
conn = get_db_connection()

View file

@ -1,69 +0,0 @@
import random
from db_utils import get_or_create_user, add_pull, get_db_connection
from add_character import add_character
def get_character():
''' Gets a random character from the database'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute('SELECT * FROM characters')
characters = cur.fetchall()
conn.close()
if not characters:
return None, None, None, None
weights = [c['weight'] for c in characters]
chosen = random.choices(characters, weights=weights, k=1)[0]
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
def is_float(val):
try:
float(val)
return True
except ValueError:
return False
# TODO: See issue #3, separate command parsing from game logic.
def gacha_response(command,full_user, arguments,note_obj):
'''Parses a given command with arguments, processes the game state and
returns a response'''
if command == "roll":
user_id = get_or_create_user(full_user)
character_id, character_name, file_id, rarity = get_character()
if not character_id:
#TODO: Can't have tuples of a single element
# Return these as a dict or object instead.
return(f"@{full_user} Uwaaa... something went wrong! No characters found. 😿")
add_pull(user_id,character_id)
stars = '⭐️' * rarity
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\nShe's all yours now~ 💖✨",[file_id]])
if command == "create":
# Example call from bot logic
image_url = note_obj.get("files", [{}])[0].get("url") if note_obj.get("files") else None
if not image_url:
return "You need an image to create a character, dumbass."
if len(arguments) != 3:
return "Please specify the following attributes in order: name, rarity, drop weighting"
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
return f"Invalid rarity: '{arguments[1]}' must be a number between 1 and 5"
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
return f"Invalid drop weight: '{arguments[2]}' must be a decimal value between 0.0 and 1.0"
character_id, file_id = add_character(
name=arguments[0],
rarity=int(arguments[1]),
weight=float(arguments[2]),
image_url=image_url
)
return([f"Added {arguments[0]}, ID {character_id}.",[file_id]])
return None

115
bot/notification.py Normal file
View file

@ -0,0 +1,115 @@
import traceback
from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE
from parsing import parse_notification
from db_utils import get_config, set_config
from response import generate_response
# Define your whitelist
# TODO: move to config
WHITELISTED_INSTANCES: list[str] = []
def process_notification(client, notification):
'''Processes an individual notification'''
user = notification.get('user', {})
username = user.get('username', 'unknown')
host = user.get('host') # None if local user
instance = host if host else 'local'
if not (instance in WHITELISTED_INSTANCES or instance == 'local'):
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
return
# Copy visibility of the post that was received when replying (so if people
# don't want to dump a bunch of notes on home they don't have to)
visibility = notification['note']['visibility']
if visibility != 'specified':
visibility = 'home'
notif_type = notification.get('type', 'unknown')
notif_id = notification.get('id')
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
# 🧠 Send to the parser
parsed_command = parse_notification(notification, client)
# Get the note Id to reply to
note_id = notification.get('note', {}).get('id')
# Get the response
# TODO: Formalize exactly *what* is returned by this. Ideally just want to
# handle two cases here: either we have a response, or we don't.
# TODO: Return dictionaries instead of tuples. They handle multiple
# elements a lot better as they're not position dependent
response = generate_response(parsed_command)
if isinstance(response, str):
client.notes_create(
text=response,
reply_id=note_id,
visibility=visibility
)
elif response:
client.notes_create(
text=response[0],
reply_id=note_id,
visibility=visibility,
file_ids=response[1]
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)
def process_notifications(client):
'''Processes a batch of unread notifications. Returns False if there are
no more notifications to process.'''
last_seen_id = get_config('last_seen_notif_id')
# process_notification writes to last_seen_id, so make a copy
new_last_seen_id = last_seen_id
try:
notifications = client.i_notifications(
# Fetch notifications we haven't seen yet. This option is a bit
# tempermental, sometimes it'll include since_id, sometimes it
# won't. We need to keep track of what notifications we've
# already processed.
since_id=last_seen_id,
# Let misskey handle the filtering
include_types=['mention', 'reply'],
# And handle the batch size while we're at it
limit=NOTIFICATION_BATCH_SIZE
)
# No notifications. Wait the poll period.
if not notifications:
return False
# Iterate oldest to newest
for notification in notifications:
try:
# Skip if we've processed already
notif_id = notification.get('id')
if notif_id <= last_seen_id:
continue
# Update new_last_seen_id and process
new_last_seen_id = notif_id
process_notification(client, notification)
except Exception as e:
print(f'An exception has occured while processing a notification: {e}')
print(traceback.format_exc())
# If we got as many notifications as we requested, there are probably
# more in the queue
return len(notifications) == NOTIFICATION_BATCH_SIZE
except MisskeyAPIException as e:
print(f'An exception has occured while reading notifications: {e}\n')
print(traceback.format_exc())
finally:
# Quality jank right here, but finally lets us update the last_seen_id
# even if we hit an exception or return early
if new_last_seen_id > last_seen_id:
set_config('last_seen_notif_id', new_last_seen_id)
return False

View file

@ -1,23 +1,11 @@
import random, re
import config
from gacha_response import gacha_response
def parse_notification(notification,client):
'''Oarses any notifications received by the bot and sends any commands to
'''Parses any notifications received by the bot and sends any commands to
gacha_response()'''
# We get the type of notification to filter the ones that we actually want
# to parse
notif_type = notification.get("type")
if not notif_type in ('mention', 'reply'):
return # Ignore anything that isn't a mention
# We want the visibility to be related to the type that was received (so if
# people don't want to dump a bunch of notes on home they don't have to)
visibility = notification["note"]["visibility"]
if visibility != "specified":
visibility = "home"
# Get the full Activitypub ID of the user
user = notification.get("user", {})
@ -50,22 +38,4 @@ def parse_notification(notification,client):
command = parts[0].lower() if parts else None
arguments = parts[1:] if len(parts) > 1 else []
# TODO: move response generation to a different function
response = gacha_response(command.lower(),full_user, arguments, note_obj)
if not response:
return
if isinstance(response, str):
client.notes_create(
text=response,
reply_id=note_id,
visibility=visibility
)
else:
client.notes_create(
text=response[0],
reply_id=note_id,
visibility=visibility,
file_ids=response[1]
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
)
return [command,full_user, arguments, note_obj]

117
bot/response.py Normal file
View file

@ -0,0 +1,117 @@
import random
from datetime import datetime, timedelta, timezone
from db_utils import get_or_create_user, add_pull, get_db_connection, get_last_rolled_at
from add_character import add_character
from config import GACHA_ROLL_INTERVAL
def get_character():
''' Gets a random character from the database'''
conn = get_db_connection()
cur = conn.cursor()
cur.execute('SELECT * FROM characters')
characters = cur.fetchall()
conn.close()
if not characters:
return None, None, None, None
weights = [c['weight'] for c in characters]
chosen = random.choices(characters, weights=weights, k=1)[0]
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
def do_roll(full_user):
'''Determines whether the user can roll, then pulls a random character'''
user_id = get_or_create_user(full_user)
# Get date of user's last roll
date = get_last_rolled_at(user_id)
# No date means it's users first roll
if date:
# SQLite timestamps returned by the DB are always in UTC
# Below timestamps are to be converted to UTC
prev = datetime.strptime(date + '+0000', '%Y-%m-%d %H:%M:%S%z')
now = datetime.now(timezone.utc)
time_since_last_roll = now - prev
roll_interval = timedelta(seconds=GACHA_ROLL_INTERVAL)
duration = roll_interval - time_since_last_roll
# User needs to wait before they can roll again
if time_since_last_roll < roll_interval:
remaining_duration = None
if duration.seconds > 3600:
remaining_duration = f'{-(duration.seconds // -3600)} hours'
elif duration.seconds > 60:
remaining_duration = f'{-(duration.seconds // -60)} minutes'
else:
remaining_duration = f'{duration.seconds} seconds'
return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.'
character_id, character_name, file_id, rarity = get_character()
if not character_id:
return f'{full_user} Uwaaa... something went wrong! No characters found. 😿'
add_pull(user_id,character_id)
stars = '⭐️' * rarity
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\
She's all yours now~ 💖✨",[file_id]])
def is_float(val):
'''Returns true if `val` can be converted to a float'''
try:
float(val)
return True
except ValueError:
return False
def do_create(full_user, arguments, note_obj):
'''Creates a character'''
# Example call from bot logic
image_url = note_obj.get('files', [{}])[0].get('url') if note_obj.get('files') else None
if not image_url:
return f'{full_user}{full_user} You need an image to create a character, dumbass.'
if len(arguments) != 3:
return '{full_user}Please specify the following attributes in order: \
name, rarity, drop weighting'
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
return f'{full_user}Invalid rarity: \'{arguments[1]}\' must be a number between 1 and 5'
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
return f'{full_user}Invalid drop weight: \'{arguments[2]}\' \
must be a decimal value between 0.0 and 1.0'
character_id, file_id = add_character(
name=arguments[0],
rarity=int(arguments[1]),
weight=float(arguments[2]),
image_url=image_url
)
return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]])
def do_help(full_user):
'''Provides a list of commands that the bot can do.'''
return f'{full_user} Here\'s what I can do:\n \
- `roll` Pulls a random character.\
- `create <name> <rarity> <weight>` Creates a character using a given image.\
- `help` Shows this message'
def generate_response(parsed_command):
'''Given a command with arguments, processes the game state and
returns a response'''
command, full_user, arguments, note_obj = parsed_command
match command:
case 'roll':
return do_roll(full_user)
case 'create':
return do_create(full_user, arguments, note_obj)
case 'help':
return do_help(command)
case _:
return None

View file

@ -1,17 +1,27 @@
; Rename me to config.ini and put your values in here
[application]
; Full fedi handle of the bot user
BotUser = @bot@example.tld
; API key for the bot
; Generate one by going to Settings > API > Generate access token
ApiKey = abcdefghijklmnopqrstuvwxyz012345
; Fully qualified URL of the instance hosting the bot
InstanceUrl = http://example.tld
; Comma separated list of fedi handles for any administrator users
; More can be added through the application
DefaultAdmins = ['admin@example.tld']
; SQLite Database location
DatabaseLocation = ./gacha_game.db
[gacha]
; Number of seconds players have to wait between rolls
RollInterval = 72000
[notification]
; Number of seconds to sleep while awaiting new notifications
PollInterval = 5
; Number of notifications to process at once (max 100)
BatchSize = 10
[credentials]
; Fully qualified URL of the instance hosting the bot
Instance = http://example.tld
; Full fedi handle of the bot user
User = @bot@example.tld
; API key for the bot
; Generate one by going to Settings > API > Generate access token
Token = abcdefghijklmnopqrstuvwxyz012345