Compare commits

..

No commits in common. "97b30c79f44ef28dc72f945a1bfbf3a555c673d6" and "fde6e1167a8143a7ebfda9bab09c8fa70c1cc751" have entirely different histories.

7 changed files with 37 additions and 202 deletions

View file

@ -49,8 +49,6 @@ INSTANCE = config['credentials']['Instance'].lower()
ADMINS = json.loads(config['application']['DefaultAdmins']) ADMINS = json.loads(config['application']['DefaultAdmins'])
# SQLite Database location # SQLite Database location
DB_PATH = config['application']['DatabaseLocation'] DB_PATH = config['application']['DatabaseLocation']
# Whether to enable the instance whitelist
USE_WHITELIST = config['application']['UseWhitelist']
NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval']) NOTIFICATION_POLL_INTERVAL = int(config['notification']['PollInterval'])
NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize']) NOTIFICATION_BATCH_SIZE = int(config['notification']['BatchSize'])

View file

@ -90,9 +90,6 @@ def delete_player(username: str) -> bool:
) )
player = CURSOR.fetchone() player = CURSOR.fetchone()
if not player:
return False
player_id = player[0] player_id = player[0]
# Delete pulls # Delete pulls
@ -110,40 +107,10 @@ def delete_player(username: str) -> bool:
return True return True
def ban_player(username: str) -> bool: def is_player_administrator(player_id: int) -> bool:
'''Adds a player to the ban list.'''
try:
CURSOR.execute( CURSOR.execute(
'INSERT INTO banned_players (handle) VALUES (?)', 'SELECT is_administrator FROM PLAYERS WHERE id = ? LIMIT 1',
(username,) (player_id,)
)
return True
except sqlite3.IntegrityError:
return False
def unban_player(username: str) -> bool:
'''Removes a player from the ban list.'''
CURSOR.execute(
'DELETE FROM banned_players WHERE handle = ?',
(username,)
)
return CURSOR.rowcount > 0
def is_player_banned(username: str) -> bool:
CURSOR.execute(
'SELECT * FROM banned_players WHERE handle = ?',
(username,)
)
row = CURSOR.fetchone()
return row is not None
def is_player_administrator(username: str) -> bool:
CURSOR.execute(
'SELECT is_administrator FROM players WHERE username = ? LIMIT 1',
(username,)
) )
row = CURSOR.fetchone() row = CURSOR.fetchone()
return row[0] if row else False return row[0] if row else False
@ -179,36 +146,6 @@ DESC",
return row[0] if row else 0 return row[0] if row else 0
def add_to_whitelist(instance: str) -> bool:
'''Adds an instance to the whitelist, returns false if instance was already
present'''
try:
CURSOR.execute(
'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,)
)
return True
except sqlite3.IntegrityError:
return False
def remove_from_whitelist(instance: str) -> bool:
'''Removes an instance to the whitelist, returns false if instance was not
present'''
CURSOR.execute(
'DELETE FROM instance_whitelist WHERE tld = ?', (instance,))
return CURSOR.rowcount > 0
def is_whitelisted(instance: str) -> bool:
'''Checks whether an instance is in the whitelist'''
if instance == 'local':
return True
CURSOR.execute(
'SELECT * FROM instance_whitelist WHERE tld = ?', (instance,))
row = CURSOR.fetchone()
return row is not None
def get_config(key: str) -> str: def get_config(key: str) -> str:
'''Reads the value for a specified config key from the db''' '''Reads the value for a specified config key from the db'''
CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,)) CURSOR.execute("SELECT value FROM config WHERE key = ?", (key,))

View file

@ -4,9 +4,9 @@ from typing import Dict, Any
import misskey import misskey
from misskey.exceptions import MisskeyAPIException from misskey.exceptions import MisskeyAPIException
from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST from config import NOTIFICATION_BATCH_SIZE
from parsing import parse_notification from parsing import parse_notification
from db_utils import get_config, set_config, is_whitelisted, is_player_banned from db_utils import get_config, set_config
from response import generate_response from response import generate_response
from custom_types import BotResponse from custom_types import BotResponse
@ -24,7 +24,7 @@ def process_notification(
host = user.get('host') # None if local user host = user.get('host') # None if local user
instance = host if host else 'local' instance = host if host else 'local'
if USE_WHITELIST and not is_whitelisted(instance): if not (instance in WHITELISTED_INSTANCES or instance == 'local'):
print(f'⚠️ Blocked notification from untrusted instance: {instance}') print(f'⚠️ Blocked notification from untrusted instance: {instance}')
return return
@ -44,11 +44,6 @@ def process_notification(
if not parsed_notification: if not parsed_notification:
return return
author = parsed_notification['author']
if is_player_banned(author):
print(f'⚠️ Blocked notification from banned player: {author}')
return
# Get the note Id to reply to # Get the note Id to reply to
note_id = notification.get('note', {}).get('id') note_id = notification.get('note', {}).get('id')

View file

@ -24,8 +24,6 @@ def parse_notification(
note_id = note_obj.get("id") note_id = note_obj.get("id")
note = note_text.strip().lower() if note_text else "" note = note_text.strip().lower() if note_text else ""
# Split words into tokens
parts = note.split()
# Check for both short and fully-qualified name mentions # Check for both short and fully-qualified name mentions
username_variants = [ username_variants = [
@ -33,16 +31,18 @@ def parse_notification(
f'@{config.USER.split("@")[1]}' f'@{config.USER.split("@")[1]}'
] ]
# Notifs must consist of the initial mention and at least one other token # Make sure the notification text explicitly mentions the bot
if len(parts) <= 1: if not any(variant in note for variant in username_variants):
return None return None
# Make sure the first token is a mention to the bot # Find command and arguments after the mention
if not parts[0] in username_variants: # Removes all mentions
return None # regex = mentions that start with @ and may contain @domain
cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip()
parts = cleaned_text.split()
command = parts[1].lower() command = parts[0].lower() if parts else None
arguments = parts[2:] if len(parts) > 2 else [] arguments = parts[1:] if len(parts) > 1 else []
return { return {
'author': full_user, 'author': full_user,

View file

@ -1,6 +1,7 @@
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import TypedDict, Any, List, Dict from typing import TypedDict, Any, List, Dict
import db_utils as db from db_utils import get_player, insert_player, delete_player, insert_pull, \
get_last_rolled_at, get_random_card, is_player_administrator
from add_card import add_card from add_card import add_card
from config import GACHA_ROLL_INTERVAL from config import GACHA_ROLL_INTERVAL
from custom_types import BotResponse, ParsedNotification from custom_types import BotResponse, ParsedNotification
@ -8,7 +9,7 @@ from custom_types import BotResponse, ParsedNotification
def do_roll(author: str) -> BotResponse: def do_roll(author: str) -> BotResponse:
'''Determines whether the user can roll, then pulls a random card''' '''Determines whether the user can roll, then pulls a random card'''
user_id = db.get_player(author) user_id = get_player(author)
if not user_id: if not user_id:
return { return {
'message': f'{author} 🛑 You havent signed up yet! Use the \ 'message': f'{author} 🛑 You havent signed up yet! Use the \
@ -16,7 +17,7 @@ def do_roll(author: str) -> BotResponse:
'attachment_urls': None 'attachment_urls': None
} }
# Get date of user's last roll # Get date of user's last roll
date = db.get_last_rolled_at(user_id) date = get_last_rolled_at(user_id)
# No date means it's users first roll # No date means it's users first roll
if date: if date:
@ -45,7 +46,7 @@ def do_roll(author: str) -> BotResponse:
'attachment_urls': None 'attachment_urls': None
} }
card = db.get_random_card() card = get_random_card()
if not card: if not card:
return { return {
@ -54,7 +55,7 @@ cards found. 😿',
'attachment_urls': None 'attachment_urls': None
} }
db.insert_pull(user_id, card['id']) insert_pull(user_id, card['id'])
stars = '⭐️' * card['rarity'] stars = '⭐️' * card['rarity']
return { return {
'message': f'{author} 🎲 Congrats! You rolled {stars} \ 'message': f'{author} 🎲 Congrats! You rolled {stars} \
@ -65,7 +66,7 @@ cards found. 😿',
def do_signup(author: str) -> BotResponse: def do_signup(author: str) -> BotResponse:
'''Registers a new user if they havent signed up yet.''' '''Registers a new user if they havent signed up yet.'''
user_id = db.get_player(author) user_id = get_player(author)
if user_id: if user_id:
return { return {
@ -74,7 +75,7 @@ begin~ 🎲',
'attachment_urls': None 'attachment_urls': None
} }
new_user_id = db.insert_player(author) new_user_id = insert_player(author)
return { return {
'message': f'{author} ✅ Signed up successfully! Your gacha \ 'message': f'{author} ✅ Signed up successfully! Your gacha \
destiny begins now... Use the roll command to start!', destiny begins now... Use the roll command to start!',
@ -163,7 +164,7 @@ and all your cards.\n'
def confirm_delete(author: str) -> BotResponse: def confirm_delete(author: str) -> BotResponse:
db.delete_player(author) delete_player(author)
return { return {
'message': f'{author} 🧼 Your account and all your cards have been \ 'message': f'{author} 🧼 Your account and all your cards have been \
@ -172,87 +173,12 @@ deleted. RIP your gacha history 🕊️✨',
} }
def do_whitelist(author: str, args: list[str]) -> BotResponse: def do_admin_test(author: str) -> BotResponse:
if len(args) == 0: player_id = get_player(author)
is_admin = is_player_administrator(player_id)
return { return {
'message': f'{author} Please specify an instance to whitelist', 'message': f'{author} You are {"not " if not is_admin else ""}an \
'attachment_urls': None admin.',
}
if db.add_to_whitelist(args[0]):
return {
'message': f'{author} Whitelisted instance: {args[0]}',
'attachment_urls': None
}
else:
return {
'message': f'{author} Instance already whitelisted: {args[0]}',
'attachment_urls': None
}
def do_unwhitelist(author: str, args: list[str]) -> BotResponse:
if len(args) == 0:
return {
'message': f'{author} Please specify an instance to remove from \
the whitelist',
'attachment_urls': None
}
if db.remove_from_whitelist(args[0]):
return {
'message': f'{author} Unwhitelisted instance: {args[0]}',
'attachment_urls': None
}
else:
return {
'message': f'{author} Instance not whitelisted: {args[0]}',
'attachment_urls': None
}
def do_ban(author: str, args: list[str]) -> BotResponse:
if len(args) == 0:
return {
'message': f'{author} Please specify a user to ban',
'attachment_urls': None
}
if db.is_player_administrator(args[0]):
return {
'message': f'{author} Cannot ban other administrators.',
'attachment_urls': None
}
if db.ban_player(args[0]):
# Delete banned player's account
db.delete_player(args[0])
return {
'message': f'{author} 🔨 **BONK!** Get banned, {args[0]}!',
'attachment_urls': None
}
else:
return {
'message': f'{author} Player is already banned: {args[0]}',
'attachment_urls': None
}
def do_unban(author: str, args: list[str]) -> BotResponse:
if len(args) == 0:
return {
'message': f'{author} Please specify a user to unban',
'attachment_urls': None
}
if db.unban_player(args[0]):
return {
'message': f'{author} Player unbanned: {args[0]}!',
'attachment_urls': None
}
else:
return {
'message': f'{author} Player was not banned: {args[0]}',
'attachment_urls': None 'attachment_urls': None
} }
@ -264,7 +190,7 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None:
# Temporary response variable # Temporary response variable
res: BotResponse | None = None res: BotResponse | None = None
author = notification['author'] author = notification['author']
player_id = db.get_player(author) player_id = get_player(author)
command = notification['command'] command = notification['command']
# Unrestricted commands # Unrestricted commands
@ -294,25 +220,14 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None:
res = delete_account(author) res = delete_account(author)
case 'confirm_delete_account': case 'confirm_delete_account':
res = confirm_delete(author) res = confirm_delete(author)
case 'admin_test':
res = do_admin_test(author)
case _: case _:
pass pass
# Commands beyond this point require the user to be an administrator # Commands beyond this point require the user to be an administrator
if not db.is_player_administrator(author): if not is_player_administrator(player_id):
return res return res
# Admin commands
match command:
case 'whitelist':
res = do_whitelist(author, notification['arguments'])
case 'unwhitelist':
res = do_unwhitelist(author, notification['arguments'])
case 'ban':
res = do_ban(author, notification['arguments'])
case 'unban':
res = do_unban(author, notification['arguments'])
case _:
pass
# Administrator commands go here # Administrator commands go here
return res return res

View file

@ -2,12 +2,9 @@
[application] [application]
; Comma separated list of fedi handles for any administrator users ; Comma separated list of fedi handles for any administrator users
; More can be added through the application ; More can be added through the application
DefaultAdmins = ["@localadmin", "@remoteadmin@example.tld"] DefaultAdmins = ["@localadmin", "remoteadmin@example.tld"]
; SQLite Database location ; SQLite Database location
DatabaseLocation = ./gacha_game.db DatabaseLocation = ./gacha_game.db
; Whether to lmit access to the bot via an instance whitelist
; The whitelist can be adjusted via the application
UseWhitelist = False
[gacha] [gacha]
; Number of seconds players have to wait between rolls ; Number of seconds players have to wait between rolls

View file

@ -1,7 +0,0 @@
CREATE TABLE IF NOT EXISTS instance_whitelist (
tld TEXT UNIQUE PRIMARY KEY
);
CREATE TABLE IF NOT EXISTS banned_players (
handle TEXT UNIQUE PRIMARY KEY
);