forked from waifu/kemoverse
Compare commits
23 commits
master
...
stats_syst
Author | SHA1 | Date | |
---|---|---|---|
9d98299264 | |||
3b7f2006ef | |||
471f399176 | |||
d210f44efc | |||
bdd2f20b84 | |||
0690ac5212 | |||
e8774cb8bd | |||
d9027356ab | |||
4a7c9239fc | |||
631b66e003 | |||
3a2033a025 | |||
74e4c86d02 | |||
237f17f40d | |||
8a331e0c7b | |||
47272aee4f | |||
11c9cf58a3 | |||
ff20e26821 | |||
e53bd87d81 | |||
9be92afce3 | |||
d2a7e523e8 | |||
45dd6f10e3 | |||
24309ce900 | |||
6b6777cb89 |
10 changed files with 402 additions and 268 deletions
|
@ -1,11 +1,12 @@
|
|||
import requests
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
from client import client_connection
|
||||
from db_utils import get_db_connection
|
||||
from db_utils import insert_character
|
||||
|
||||
def add_character(name: str, rarity: int, weight: float, image_url: str) -> tuple[int, str]:
|
||||
"""
|
||||
Adds a character to the database, uploading the image from a public URL to the bot's Misskey Drive.
|
||||
Adds a character to the database, uploading the image from a public URL to
|
||||
the bot's Misskey Drive.
|
||||
|
||||
Args:
|
||||
name (str): Character name.
|
||||
|
@ -20,44 +21,29 @@ def add_character(name: str, rarity: int, weight: float, image_url: str) -> tupl
|
|||
ValueError: If inputs are invalid.
|
||||
RuntimeError: If image download/upload or database operation fails.
|
||||
"""
|
||||
# Validate inputs
|
||||
if not name or not name.strip():
|
||||
raise ValueError("Character name cannot be empty.")
|
||||
if not isinstance(rarity, int) or rarity < 1:
|
||||
raise ValueError("Rarity must be a positive integer.")
|
||||
if not isinstance(weight, (int, float)) or weight <= 0:
|
||||
raise ValueError("Weight must be a positive number.")
|
||||
if not image_url:
|
||||
raise ValueError("Image URL must be provided.")
|
||||
|
||||
# Download image
|
||||
response = requests.get(image_url, stream=True, timeout=30)
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError(f"Failed to download image from {image_url}")
|
||||
|
||||
# Upload to bot's Drive
|
||||
mk = client_connection()
|
||||
try:
|
||||
# Validate inputs
|
||||
if not name or not name.strip():
|
||||
raise ValueError("Character name cannot be empty.")
|
||||
if not isinstance(rarity, int) or rarity < 1:
|
||||
raise ValueError("Rarity must be a positive integer.")
|
||||
if not isinstance(weight, (int, float)) or weight <= 0:
|
||||
raise ValueError("Weight must be a positive number.")
|
||||
if not image_url:
|
||||
raise ValueError("Image URL must be provided.")
|
||||
media = mk.drive_files_create(response.raw)
|
||||
file_id = media["id"]
|
||||
except MisskeyAPIException as e:
|
||||
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
|
||||
|
||||
# Download image
|
||||
response = requests.get(image_url, stream=True)
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError(f"Failed to download image from {image_url}")
|
||||
|
||||
# Upload to bot's Drive
|
||||
mk = client_connection()
|
||||
try:
|
||||
media = mk.drive_files_create(response.raw)
|
||||
file_id = media["id"]
|
||||
except MisskeyAPIException as e:
|
||||
raise RuntimeError(f"Failed to upload image to bot's Drive: {e}") from e
|
||||
|
||||
# Insert into database
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
|
||||
(name.strip(), rarity, float(weight), file_id)
|
||||
)
|
||||
conn.commit()
|
||||
character_id = cur.lastrowid
|
||||
|
||||
return character_id, file_id
|
||||
|
||||
except Exception as e:
|
||||
raise
|
||||
finally:
|
||||
if 'conn' in locals():
|
||||
conn.close()
|
||||
# Insert into database
|
||||
character_id = insert_character(name.strip(), rarity, float(weight), file_id)
|
||||
return character_id, file_id
|
||||
|
|
|
@ -1,79 +1,18 @@
|
|||
import time
|
||||
import traceback
|
||||
import misskey
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_or_create_user, add_pull, get_config, set_config
|
||||
import misskey as misskey
|
||||
from client import client_connection
|
||||
import db_utils as db
|
||||
|
||||
# Initialize the Misskey client
|
||||
client = client_connection()
|
||||
from config import NOTIFICATION_POLL_INTERVAL
|
||||
from notification import process_notifications
|
||||
|
||||
# Define your whitelist
|
||||
# TODO: move to config
|
||||
whitelisted_instances: list[str] = []
|
||||
|
||||
def stream_notifications():
|
||||
print("Starting filtered notification stream...")
|
||||
|
||||
last_seen_id = get_config("last_seen_notif_id")
|
||||
if __name__ == '__main__':
|
||||
# Initialize the Misskey client
|
||||
client = client_connection()
|
||||
# Connect to DB
|
||||
db.connect()
|
||||
|
||||
print('Listening for notifications...')
|
||||
while True:
|
||||
try:
|
||||
# May be able to mark notifications as read using misskey.py and
|
||||
# filter them out here. This function also takes a since_id we
|
||||
# could use as well
|
||||
notifications = client.i_notifications()
|
||||
|
||||
if notifications:
|
||||
# Oldest to newest
|
||||
notifications.reverse()
|
||||
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
for notification in notifications:
|
||||
notif_id = notification.get("id")
|
||||
|
||||
# Skip old or same ID notifications
|
||||
if last_seen_id is not None and notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
user = notification.get("user", {})
|
||||
username = user.get("username", "unknown")
|
||||
host = user.get("host") # None if local user
|
||||
|
||||
instance = host if host else "local"
|
||||
|
||||
if instance in whitelisted_instances or instance == "local":
|
||||
note = notification.get("note", {}).get("text", "")
|
||||
notif_type = notification.get("type", "unknown")
|
||||
|
||||
print(f"📨 [{notif_type}] from @{username}@{instance}")
|
||||
print(f"💬 {note}")
|
||||
print("-" * 30)
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parse_notification(notification,client)
|
||||
|
||||
else:
|
||||
print(f"⚠️ Blocked notification from untrusted instance: {host}")
|
||||
|
||||
# Update only if this notif_id is greater
|
||||
if new_last_seen_id is None or notif_id > new_last_seen_id:
|
||||
new_last_seen_id = notif_id
|
||||
|
||||
# Save the latest seen ID
|
||||
if new_last_seen_id and new_last_seen_id != last_seen_id:
|
||||
set_config("last_seen_notif_id", new_last_seen_id)
|
||||
last_seen_id = new_last_seen_id
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
except Exception as e:
|
||||
print(f"An exception has occured: {e}\n{traceback.format_exc()}")
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
stream_notifications()
|
||||
if not process_notifications(client):
|
||||
time.sleep(NOTIFICATION_POLL_INTERVAL)
|
||||
|
|
|
@ -4,19 +4,20 @@ config = configparser.ConfigParser()
|
|||
config.read('config.ini')
|
||||
|
||||
# Username for the bot
|
||||
USER = config['application']['BotUser']
|
||||
|
||||
USER = config['credentials']['User']
|
||||
# API key for the bot
|
||||
KEY = config['application']['ApiKey']
|
||||
KEY = config['credentials']['Token']
|
||||
# Bot's Misskey instance URL
|
||||
INSTANCE = config['application']['InstanceUrl']
|
||||
|
||||
# SQLite Database location
|
||||
DB_PATH = config['application']['DatabaseLocation']
|
||||
|
||||
# Extra stuff for control of the bot
|
||||
INSTANCE = config['credentials']['Instance']
|
||||
|
||||
# TODO: move this to db
|
||||
# Fedi handles in the traditional 'user@domain.tld' style, allows these users
|
||||
# to use extra admin exclusive commands with the bot'''
|
||||
ADMINS = config['application']['DefaultAdmins']
|
||||
# to use extra admin exclusive commands with the bot
|
||||
ADMINS = config['application']['DefaultAdmins']
|
||||
# SQLite Database location
|
||||
DB_PATH = config['application']['DatabaseLocation']
|
||||
|
||||
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
|
||||
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])
|
||||
|
||||
GACHA_ROLL_INTERVAL = int(config['gacha']['RollInterval'])
|
||||
|
|
130
bot/db_utils.py
130
bot/db_utils.py
|
@ -1,58 +1,128 @@
|
|||
from random import choices
|
||||
import sqlite3
|
||||
import random
|
||||
import config
|
||||
|
||||
DB_PATH = config.DB_PATH
|
||||
CONNECTION: sqlite3.Connection
|
||||
CURSOR: sqlite3.Cursor
|
||||
|
||||
def get_db_connection():
|
||||
def connect() -> None:
|
||||
'''Creates a connection to the database'''
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
print('Connecting to the database...')
|
||||
global CONNECTION
|
||||
global CURSOR
|
||||
CONNECTION = sqlite3.connect(DB_PATH, autocommit=True)
|
||||
CONNECTION.row_factory = sqlite3.Row
|
||||
CURSOR = CONNECTION.cursor()
|
||||
|
||||
def get_random_character():
|
||||
''' Gets a random character from the database'''
|
||||
CURSOR.execute('SELECT * FROM characters')
|
||||
characters = CURSOR.fetchall()
|
||||
|
||||
if not characters:
|
||||
return None, None, None, None
|
||||
|
||||
weights = [c['weight'] for c in characters]
|
||||
chosen = choices(characters, weights=weights, k=1)[0]
|
||||
|
||||
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
|
||||
|
||||
|
||||
# User functions
|
||||
|
||||
def get_or_create_user(username):
|
||||
'''Retrieves an ID for a given user, if the user does not exist, it will be
|
||||
created.'''
|
||||
conn = get_db_connection()
|
||||
conn.row_factory = sqlite3.Row
|
||||
cur = conn.cursor()
|
||||
cur.execute('SELECT id FROM users WHERE username = ?', (username,))
|
||||
user = cur.fetchone()
|
||||
CURSOR.execute('SELECT id FROM users WHERE username = ?', (username,))
|
||||
user = CURSOR.fetchone()
|
||||
if user:
|
||||
conn.close()
|
||||
return user[0]
|
||||
|
||||
# New user starts with has_rolled = False
|
||||
cur.execute(
|
||||
CURSOR.execute(
|
||||
'INSERT INTO users (username, has_rolled) VALUES (?, ?)',
|
||||
(username, False)
|
||||
)
|
||||
conn.commit()
|
||||
user_id = cur.lastrowid
|
||||
conn.close()
|
||||
user_id = CURSOR.lastrowid
|
||||
return user_id
|
||||
|
||||
def add_pull(user_id, character_id):
|
||||
def insert_character(name: str, rarity: int, weight: float, file_id: str, stats: dict) -> int:
|
||||
'''Inserts a character'''
|
||||
CURSOR.execute(
|
||||
'INSERT INTO characters (name, rarity, weight, file_id) VALUES (?, ?, ?, ?)',
|
||||
(name, rarity, weight, file_id)
|
||||
)
|
||||
character_id = CURSOR.lastrowid
|
||||
|
||||
# Insert stats
|
||||
columns = ', '.join(stats.keys())
|
||||
placeholders = ', '.join(['?'] * len(stats))
|
||||
updates = ', '.join([f"{col}=excluded.{col}" for col in stats.keys()])
|
||||
values = list(stats.values())
|
||||
|
||||
sql = f'''
|
||||
INSERT INTO character_stats (character_id, {columns})
|
||||
VALUES (?, {placeholders})
|
||||
ON CONFLICT(character_id) DO UPDATE SET {updates}
|
||||
'''
|
||||
CURSOR.execute(sql, [character_id] + values)
|
||||
|
||||
return character_id if character_id else 0
|
||||
|
||||
def insert_pull(user_id, character_id):
|
||||
'''Creates a pull in the database'''
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute('INSERT INTO pulls (user_id, character_id) VALUES (?, ?)', (user_id, character_id))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
CURSOR.execute(
|
||||
'INSERT INTO pulls (user_id, character_id) VALUES (?, ?)',
|
||||
(user_id, character_id)
|
||||
)
|
||||
|
||||
def get_last_rolled_at(user_id):
|
||||
'''Gets the timestamp when the user last rolled'''
|
||||
CURSOR.execute("SELECT timestamp FROM pulls WHERE user_id = ? ORDER BY timestamp DESC", \
|
||||
(user_id,))
|
||||
row = CURSOR.fetchone()
|
||||
return row[0] if row else None
|
||||
|
||||
# Configuration
|
||||
|
||||
def get_config(key):
|
||||
'''Reads the value for a specified config key from the db'''
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT value FROM config WHERE key = ?", (key,))
|
||||
row = cur.fetchone()
|
||||
conn.close()
|
||||
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,))
|
||||
row = CURSOR.fetchone()
|
||||
return row[0] if row else None
|
||||
|
||||
def set_config(key, value):
|
||||
'''Writes the value for a specified config key to the db'''
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute("INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", (key, value))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
# Character stat functions
|
||||
|
||||
def get_characters(character_ids):
|
||||
'''
|
||||
Retrieves stats for a list of character IDs.
|
||||
Returns a dictionary of character_id -> {stat_name: value, ...}
|
||||
'''
|
||||
if not character_ids:
|
||||
return {}
|
||||
|
||||
placeholders = ','.join('?' for _ in character_ids)
|
||||
query = f'''
|
||||
SELECT *
|
||||
FROM character_stats
|
||||
WHERE character_id IN ({placeholders})
|
||||
'''
|
||||
|
||||
|
||||
CURSOR.execute(query, character_ids)
|
||||
rows = cur.fetchall()
|
||||
col_names = [desc[0] for desc in cur.description]
|
||||
|
||||
result = {}
|
||||
for row in rows:
|
||||
character_id = row[0]
|
||||
stats = dict(zip(col_names[1:], row[1:])) # Skip character_id
|
||||
result[character_id] = stats
|
||||
|
||||
return result
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
import random
|
||||
from db_utils import get_or_create_user, add_pull, get_db_connection
|
||||
from add_character import add_character
|
||||
|
||||
def get_character():
|
||||
''' Gets a random character from the database'''
|
||||
conn = get_db_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute('SELECT * FROM characters')
|
||||
characters = cur.fetchall()
|
||||
conn.close()
|
||||
|
||||
if not characters:
|
||||
return None, None, None, None
|
||||
|
||||
weights = [c['weight'] for c in characters]
|
||||
chosen = random.choices(characters, weights=weights, k=1)[0]
|
||||
|
||||
return chosen['id'], chosen['name'], chosen['file_id'], chosen['rarity']
|
||||
|
||||
def is_float(val):
|
||||
try:
|
||||
float(val)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
# TODO: See issue #3, separate command parsing from game logic.
|
||||
def gacha_response(command,full_user, arguments,note_obj):
|
||||
'''Parses a given command with arguments, processes the game state and
|
||||
returns a response'''
|
||||
|
||||
if command == "roll":
|
||||
user_id = get_or_create_user(full_user)
|
||||
character_id, character_name, file_id, rarity = get_character()
|
||||
|
||||
if not character_id:
|
||||
#TODO: Can't have tuples of a single element
|
||||
# Return these as a dict or object instead.
|
||||
return(f"@{full_user} Uwaaa... something went wrong! No characters found. 😿")
|
||||
|
||||
add_pull(user_id,character_id)
|
||||
stars = '⭐️' * rarity
|
||||
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\nShe's all yours now~ 💖✨",[file_id]])
|
||||
|
||||
if command == "create":
|
||||
# Example call from bot logic
|
||||
image_url = note_obj.get("files", [{}])[0].get("url") if note_obj.get("files") else None
|
||||
if not image_url:
|
||||
return "You need an image to create a character, dumbass."
|
||||
|
||||
if len(arguments) != 3:
|
||||
return "Please specify the following attributes in order: name, rarity, drop weighting"
|
||||
|
||||
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
|
||||
return f"Invalid rarity: '{arguments[1]}' must be a number between 1 and 5"
|
||||
|
||||
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
|
||||
return f"Invalid drop weight: '{arguments[2]}' must be a decimal value between 0.0 and 1.0"
|
||||
|
||||
character_id, file_id = add_character(
|
||||
name=arguments[0],
|
||||
rarity=int(arguments[1]),
|
||||
weight=float(arguments[2]),
|
||||
image_url=image_url
|
||||
)
|
||||
return([f"Added {arguments[0]}, ID {character_id}.",[file_id]])
|
||||
return None
|
115
bot/notification.py
Normal file
115
bot/notification.py
Normal file
|
@ -0,0 +1,115 @@
|
|||
import traceback
|
||||
from misskey.exceptions import MisskeyAPIException
|
||||
|
||||
from config import NOTIFICATION_BATCH_SIZE
|
||||
from parsing import parse_notification
|
||||
from db_utils import get_config, set_config
|
||||
from response import generate_response
|
||||
|
||||
# Define your whitelist
|
||||
# TODO: move to config
|
||||
WHITELISTED_INSTANCES: list[str] = []
|
||||
|
||||
def process_notification(client, notification):
|
||||
'''Processes an individual notification'''
|
||||
user = notification.get('user', {})
|
||||
username = user.get('username', 'unknown')
|
||||
host = user.get('host') # None if local user
|
||||
instance = host if host else 'local'
|
||||
|
||||
if not (instance in WHITELISTED_INSTANCES or instance == 'local'):
|
||||
print(f'⚠️ Blocked notification from untrusted instance: {instance}')
|
||||
return
|
||||
|
||||
# Copy visibility of the post that was received when replying (so if people
|
||||
# don't want to dump a bunch of notes on home they don't have to)
|
||||
visibility = notification['note']['visibility']
|
||||
if visibility != 'specified':
|
||||
visibility = 'home'
|
||||
|
||||
notif_type = notification.get('type', 'unknown')
|
||||
notif_id = notification.get('id')
|
||||
print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}')
|
||||
|
||||
# 🧠 Send to the parser
|
||||
parsed_command = parse_notification(notification, client)
|
||||
|
||||
# Get the note Id to reply to
|
||||
note_id = notification.get('note', {}).get('id')
|
||||
|
||||
# Get the response
|
||||
# TODO: Formalize exactly *what* is returned by this. Ideally just want to
|
||||
# handle two cases here: either we have a response, or we don't.
|
||||
# TODO: Return dictionaries instead of tuples. They handle multiple
|
||||
# elements a lot better as they're not position dependent
|
||||
response = generate_response(parsed_command)
|
||||
if isinstance(response, str):
|
||||
client.notes_create(
|
||||
text=response,
|
||||
reply_id=note_id,
|
||||
visibility=visibility
|
||||
)
|
||||
elif response:
|
||||
client.notes_create(
|
||||
text=response[0],
|
||||
reply_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=response[1]
|
||||
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
|
||||
)
|
||||
|
||||
def process_notifications(client):
|
||||
'''Processes a batch of unread notifications. Returns False if there are
|
||||
no more notifications to process.'''
|
||||
|
||||
last_seen_id = get_config('last_seen_notif_id')
|
||||
# process_notification writes to last_seen_id, so make a copy
|
||||
new_last_seen_id = last_seen_id
|
||||
|
||||
try:
|
||||
notifications = client.i_notifications(
|
||||
# Fetch notifications we haven't seen yet. This option is a bit
|
||||
# tempermental, sometimes it'll include since_id, sometimes it
|
||||
# won't. We need to keep track of what notifications we've
|
||||
# already processed.
|
||||
since_id=last_seen_id,
|
||||
# Let misskey handle the filtering
|
||||
include_types=['mention', 'reply'],
|
||||
# And handle the batch size while we're at it
|
||||
limit=NOTIFICATION_BATCH_SIZE
|
||||
)
|
||||
|
||||
# No notifications. Wait the poll period.
|
||||
if not notifications:
|
||||
return False
|
||||
|
||||
# Iterate oldest to newest
|
||||
for notification in notifications:
|
||||
try:
|
||||
# Skip if we've processed already
|
||||
notif_id = notification.get('id')
|
||||
if notif_id <= last_seen_id:
|
||||
continue
|
||||
|
||||
# Update new_last_seen_id and process
|
||||
new_last_seen_id = notif_id
|
||||
process_notification(client, notification)
|
||||
|
||||
except Exception as e:
|
||||
print(f'An exception has occured while processing a notification: {e}')
|
||||
print(traceback.format_exc())
|
||||
|
||||
# If we got as many notifications as we requested, there are probably
|
||||
# more in the queue
|
||||
return len(notifications) == NOTIFICATION_BATCH_SIZE
|
||||
|
||||
except MisskeyAPIException as e:
|
||||
print(f'An exception has occured while reading notifications: {e}\n')
|
||||
print(traceback.format_exc())
|
||||
finally:
|
||||
# Quality jank right here, but finally lets us update the last_seen_id
|
||||
# even if we hit an exception or return early
|
||||
if new_last_seen_id > last_seen_id:
|
||||
set_config('last_seen_notif_id', new_last_seen_id)
|
||||
|
||||
return False
|
|
@ -1,23 +1,11 @@
|
|||
import random, re
|
||||
import config
|
||||
from gacha_response import gacha_response
|
||||
|
||||
def parse_notification(notification,client):
|
||||
'''Oarses any notifications received by the bot and sends any commands to
|
||||
'''Parses any notifications received by the bot and sends any commands to
|
||||
gacha_response()'''
|
||||
|
||||
# We get the type of notification to filter the ones that we actually want
|
||||
# to parse
|
||||
|
||||
notif_type = notification.get("type")
|
||||
if not notif_type in ('mention', 'reply'):
|
||||
return # Ignore anything that isn't a mention
|
||||
|
||||
# We want the visibility to be related to the type that was received (so if
|
||||
# people don't want to dump a bunch of notes on home they don't have to)
|
||||
visibility = notification["note"]["visibility"]
|
||||
if visibility != "specified":
|
||||
visibility = "home"
|
||||
|
||||
|
||||
# Get the full Activitypub ID of the user
|
||||
user = notification.get("user", {})
|
||||
|
@ -50,22 +38,4 @@ def parse_notification(notification,client):
|
|||
command = parts[0].lower() if parts else None
|
||||
arguments = parts[1:] if len(parts) > 1 else []
|
||||
|
||||
# TODO: move response generation to a different function
|
||||
response = gacha_response(command.lower(),full_user, arguments, note_obj)
|
||||
if not response:
|
||||
return
|
||||
|
||||
if isinstance(response, str):
|
||||
client.notes_create(
|
||||
text=response,
|
||||
reply_id=note_id,
|
||||
visibility=visibility
|
||||
)
|
||||
else:
|
||||
client.notes_create(
|
||||
text=response[0],
|
||||
reply_id=note_id,
|
||||
visibility=visibility,
|
||||
file_ids=response[1]
|
||||
#visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately
|
||||
)
|
||||
return [command,full_user, arguments, note_obj]
|
||||
|
|
100
bot/response.py
Normal file
100
bot/response.py
Normal file
|
@ -0,0 +1,100 @@
|
|||
from datetime import datetime, timedelta, timezone
|
||||
from db_utils import get_or_create_user, insert_pull, get_last_rolled_at, get_random_character
|
||||
from add_character import add_character
|
||||
from config import GACHA_ROLL_INTERVAL
|
||||
|
||||
def do_roll(full_user):
|
||||
'''Determines whether the user can roll, then pulls a random character'''
|
||||
user_id = get_or_create_user(full_user)
|
||||
|
||||
# Get date of user's last roll
|
||||
date = get_last_rolled_at(user_id)
|
||||
|
||||
# No date means it's users first roll
|
||||
if date:
|
||||
# SQLite timestamps returned by the DB are always in UTC
|
||||
# Below timestamps are to be converted to UTC
|
||||
prev = datetime.strptime(date + '+0000', '%Y-%m-%d %H:%M:%S%z')
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
time_since_last_roll = now - prev
|
||||
roll_interval = timedelta(seconds=GACHA_ROLL_INTERVAL)
|
||||
duration = roll_interval - time_since_last_roll
|
||||
|
||||
# User needs to wait before they can roll again
|
||||
if time_since_last_roll < roll_interval:
|
||||
remaining_duration = None
|
||||
if duration.seconds > 3600:
|
||||
remaining_duration = f'{-(duration.seconds // -3600)} hours'
|
||||
elif duration.seconds > 60:
|
||||
remaining_duration = f'{-(duration.seconds // -60)} minutes'
|
||||
else:
|
||||
remaining_duration = f'{duration.seconds} seconds'
|
||||
|
||||
return f'{full_user} ⏱️ Please wait another {remaining_duration} before rolling again.'
|
||||
|
||||
character_id, character_name, file_id, rarity = get_random_character()
|
||||
|
||||
if not character_id:
|
||||
return f'{full_user} Uwaaa... something went wrong! No characters found. 😿'
|
||||
|
||||
insert_pull(user_id,character_id)
|
||||
stars = '⭐️' * rarity
|
||||
return([f"@{full_user} 🎲 Congrats! You rolled {stars} **{character_name}**\n\
|
||||
She's all yours now~ 💖✨",[file_id]])
|
||||
|
||||
def is_float(val):
|
||||
'''Returns true if `val` can be converted to a float'''
|
||||
try:
|
||||
float(val)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
def do_create(full_user, arguments, note_obj):
|
||||
'''Creates a character'''
|
||||
# Example call from bot logic
|
||||
image_url = note_obj.get('files', [{}])[0].get('url') if note_obj.get('files') else None
|
||||
if not image_url:
|
||||
return f'{full_user}{full_user} You need an image to create a character, dumbass.'
|
||||
|
||||
if len(arguments) != 3:
|
||||
return '{full_user}Please specify the following attributes in order: \
|
||||
name, rarity, drop weighting'
|
||||
|
||||
if not (arguments[1].isnumeric() and 1 <= int(arguments[1]) <= 5):
|
||||
return f'{full_user}Invalid rarity: \'{arguments[1]}\' must be a number between 1 and 5'
|
||||
|
||||
if not (is_float(arguments[2]) and 0.0 < float(arguments[2]) <= 1.0):
|
||||
return f'{full_user}Invalid drop weight: \'{arguments[2]}\' \
|
||||
must be a decimal value between 0.0 and 1.0'
|
||||
|
||||
character_id, file_id = add_character(
|
||||
name=arguments[0],
|
||||
rarity=int(arguments[1]),
|
||||
weight=float(arguments[2]),
|
||||
image_url=image_url
|
||||
)
|
||||
return([f'{full_user}Added {arguments[0]}, ID {character_id}.',[file_id]])
|
||||
|
||||
def do_help(full_user):
|
||||
'''Provides a list of commands that the bot can do.'''
|
||||
return f'{full_user} Here\'s what I can do:\n \
|
||||
- `roll` Pulls a random character.\
|
||||
- `create <name> <rarity> <weight>` Creates a character using a given image.\
|
||||
- `help` Shows this message'
|
||||
|
||||
def generate_response(parsed_command):
|
||||
'''Given a command with arguments, processes the game state and
|
||||
returns a response'''
|
||||
|
||||
command, full_user, arguments, note_obj = parsed_command
|
||||
match command:
|
||||
case 'roll':
|
||||
return do_roll(full_user)
|
||||
case 'create':
|
||||
return do_create(full_user, arguments, note_obj)
|
||||
case 'help':
|
||||
return do_help(command)
|
||||
case _:
|
||||
return None
|
12
db.py
12
db.py
|
@ -41,6 +41,18 @@ cursor.execute("""
|
|||
)
|
||||
""")
|
||||
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS character_stats (
|
||||
character_id INTEGER PRIMARY KEY,
|
||||
power INTEGER NOT NULL DEFAULT abs(random() % 9999),
|
||||
charm INTEGER NOT NULL DEFAULT abs(random() % 9999),,
|
||||
FOREIGN KEY(character_id) REFERENCES characters(id)
|
||||
)
|
||||
''')
|
||||
|
||||
# Initialize essential config key
|
||||
cursor.execute('INSERT INTO config VALUES ("last_seen_notif_id", 0)')
|
||||
|
||||
""" # Insert example characters into the database if they don't already exist
|
||||
characters = [
|
||||
('Murakami-san', 1, 0.35),
|
||||
|
|
|
@ -1,17 +1,27 @@
|
|||
; Rename me to config.ini and put your values in here
|
||||
[application]
|
||||
; Full fedi handle of the bot user
|
||||
BotUser = @bot@example.tld
|
||||
; Comma separated list of fedi handles for any administrator users
|
||||
; More can be added through the application
|
||||
DefaultAdmins = ['admin@example.tld']
|
||||
; SQLite Database location
|
||||
DatabaseLocation = ./gacha_game.db
|
||||
|
||||
[gacha]
|
||||
; Number of seconds players have to wait between rolls
|
||||
RollInterval = 72000
|
||||
|
||||
[notification]
|
||||
; Number of seconds to sleep while awaiting new notifications
|
||||
PollInterval = 5
|
||||
; Number of notifications to process at once (max 100)
|
||||
BatchSize = 10
|
||||
|
||||
[credentials]
|
||||
; Fully qualified URL of the instance hosting the bot
|
||||
Instance = http://example.tld
|
||||
; Full fedi handle of the bot user
|
||||
User = @bot@example.tld
|
||||
; API key for the bot
|
||||
; Generate one by going to Settings > API > Generate access token
|
||||
ApiKey = abcdefghijklmnopqrstuvwxyz012345
|
||||
Token = abcdefghijklmnopqrstuvwxyz012345
|
||||
|
||||
; Fully qualified URL of the instance hosting the bot
|
||||
InstanceUrl = http://example.tld
|
||||
|
||||
; Comma separated list of fedi handles for any administrator users
|
||||
DefaultAdmins = ['admin@example.tld']
|
||||
|
||||
; SQLite Database location
|
||||
DatabaseLocation = ./gacha_game.db
|
Loading…
Add table
Reference in a new issue