Add admin commands
This commit is contained in:
parent
8ae6e25b95
commit
97b30c79f4
5 changed files with 111 additions and 19 deletions
|
@ -90,6 +90,9 @@ def delete_player(username: str) -> bool:
|
||||||
)
|
)
|
||||||
player = CURSOR.fetchone()
|
player = CURSOR.fetchone()
|
||||||
|
|
||||||
|
if not player:
|
||||||
|
return False
|
||||||
|
|
||||||
player_id = player[0]
|
player_id = player[0]
|
||||||
|
|
||||||
# Delete pulls
|
# Delete pulls
|
||||||
|
@ -107,10 +110,40 @@ def delete_player(username: str) -> bool:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def is_player_administrator(player_id: int) -> bool:
|
def ban_player(username: str) -> bool:
|
||||||
|
'''Adds a player to the ban list.'''
|
||||||
|
try:
|
||||||
|
CURSOR.execute(
|
||||||
|
'INSERT INTO banned_players (handle) VALUES (?)',
|
||||||
|
(username,)
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except sqlite3.IntegrityError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def unban_player(username: str) -> bool:
|
||||||
|
'''Removes a player from the ban list.'''
|
||||||
CURSOR.execute(
|
CURSOR.execute(
|
||||||
'SELECT is_administrator FROM PLAYERS WHERE id = ? LIMIT 1',
|
'DELETE FROM banned_players WHERE handle = ?',
|
||||||
(player_id,)
|
(username,)
|
||||||
|
)
|
||||||
|
return CURSOR.rowcount > 0
|
||||||
|
|
||||||
|
|
||||||
|
def is_player_banned(username: str) -> bool:
|
||||||
|
CURSOR.execute(
|
||||||
|
'SELECT * FROM banned_players WHERE handle = ?',
|
||||||
|
(username,)
|
||||||
|
)
|
||||||
|
row = CURSOR.fetchone()
|
||||||
|
return row is not None
|
||||||
|
|
||||||
|
|
||||||
|
def is_player_administrator(username: str) -> bool:
|
||||||
|
CURSOR.execute(
|
||||||
|
'SELECT is_administrator FROM players WHERE username = ? LIMIT 1',
|
||||||
|
(username,)
|
||||||
)
|
)
|
||||||
row = CURSOR.fetchone()
|
row = CURSOR.fetchone()
|
||||||
return row[0] if row else False
|
return row[0] if row else False
|
||||||
|
@ -151,7 +184,8 @@ def add_to_whitelist(instance: str) -> bool:
|
||||||
present'''
|
present'''
|
||||||
try:
|
try:
|
||||||
CURSOR.execute(
|
CURSOR.execute(
|
||||||
'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,))
|
'INSERT INTO instance_whitelist (tld) VALUES (?)', (instance,)
|
||||||
|
)
|
||||||
return True
|
return True
|
||||||
except sqlite3.IntegrityError:
|
except sqlite3.IntegrityError:
|
||||||
return False
|
return False
|
||||||
|
|
|
@ -6,7 +6,7 @@ from misskey.exceptions import MisskeyAPIException
|
||||||
|
|
||||||
from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST
|
from config import NOTIFICATION_BATCH_SIZE, USE_WHITELIST
|
||||||
from parsing import parse_notification
|
from parsing import parse_notification
|
||||||
from db_utils import get_config, set_config, is_whitelisted
|
from db_utils import get_config, set_config, is_whitelisted, is_player_banned
|
||||||
from response import generate_response
|
from response import generate_response
|
||||||
from custom_types import BotResponse
|
from custom_types import BotResponse
|
||||||
|
|
||||||
|
@ -44,6 +44,11 @@ def process_notification(
|
||||||
if not parsed_notification:
|
if not parsed_notification:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
author = parsed_notification['author']
|
||||||
|
if is_player_banned(author):
|
||||||
|
print(f'⚠️ Blocked notification from banned player: {author}')
|
||||||
|
return
|
||||||
|
|
||||||
# Get the note Id to reply to
|
# Get the note Id to reply to
|
||||||
note_id = notification.get('note', {}).get('id')
|
note_id = notification.get('note', {}).get('id')
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,8 @@ def parse_notification(
|
||||||
note_id = note_obj.get("id")
|
note_id = note_obj.get("id")
|
||||||
|
|
||||||
note = note_text.strip().lower() if note_text else ""
|
note = note_text.strip().lower() if note_text else ""
|
||||||
|
# Split words into tokens
|
||||||
|
parts = note.split()
|
||||||
|
|
||||||
# Check for both short and fully-qualified name mentions
|
# Check for both short and fully-qualified name mentions
|
||||||
username_variants = [
|
username_variants = [
|
||||||
|
@ -31,18 +33,16 @@ def parse_notification(
|
||||||
f'@{config.USER.split("@")[1]}'
|
f'@{config.USER.split("@")[1]}'
|
||||||
]
|
]
|
||||||
|
|
||||||
# Make sure the notification text explicitly mentions the bot
|
# Notifs must consist of the initial mention and at least one other token
|
||||||
if not any(variant in note for variant in username_variants):
|
if len(parts) <= 1:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Find command and arguments after the mention
|
# Make sure the first token is a mention to the bot
|
||||||
# Removes all mentions
|
if not parts[0] in username_variants:
|
||||||
# regex = mentions that start with @ and may contain @domain
|
return None
|
||||||
cleaned_text = re.sub(r"@\w+(?:@\S+)?", "", note).strip()
|
|
||||||
parts = cleaned_text.split()
|
|
||||||
|
|
||||||
command = parts[0].lower() if parts else None
|
command = parts[1].lower()
|
||||||
arguments = parts[1:] if len(parts) > 1 else []
|
arguments = parts[2:] if len(parts) > 2 else []
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'author': full_user,
|
'author': full_user,
|
||||||
|
|
|
@ -211,6 +211,52 @@ the whitelist',
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def do_ban(author: str, args: list[str]) -> BotResponse:
|
||||||
|
if len(args) == 0:
|
||||||
|
return {
|
||||||
|
'message': f'{author} Please specify a user to ban',
|
||||||
|
'attachment_urls': None
|
||||||
|
}
|
||||||
|
|
||||||
|
if db.is_player_administrator(args[0]):
|
||||||
|
return {
|
||||||
|
'message': f'{author} Cannot ban other administrators.',
|
||||||
|
'attachment_urls': None
|
||||||
|
}
|
||||||
|
|
||||||
|
if db.ban_player(args[0]):
|
||||||
|
# Delete banned player's account
|
||||||
|
db.delete_player(args[0])
|
||||||
|
return {
|
||||||
|
'message': f'{author} 🔨 **BONK!** Get banned, {args[0]}!',
|
||||||
|
'attachment_urls': None
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return {
|
||||||
|
'message': f'{author} Player is already banned: {args[0]}',
|
||||||
|
'attachment_urls': None
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def do_unban(author: str, args: list[str]) -> BotResponse:
|
||||||
|
if len(args) == 0:
|
||||||
|
return {
|
||||||
|
'message': f'{author} Please specify a user to unban',
|
||||||
|
'attachment_urls': None
|
||||||
|
}
|
||||||
|
|
||||||
|
if db.unban_player(args[0]):
|
||||||
|
return {
|
||||||
|
'message': f'{author} Player unbanned: {args[0]}!',
|
||||||
|
'attachment_urls': None
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return {
|
||||||
|
'message': f'{author} Player was not banned: {args[0]}',
|
||||||
|
'attachment_urls': None
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def generate_response(notification: ParsedNotification) -> BotResponse | None:
|
def generate_response(notification: ParsedNotification) -> BotResponse | None:
|
||||||
'''Given a command with arguments, processes the game state and
|
'''Given a command with arguments, processes the game state and
|
||||||
returns a response'''
|
returns a response'''
|
||||||
|
@ -252,7 +298,7 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Commands beyond this point require the user to be an administrator
|
# Commands beyond this point require the user to be an administrator
|
||||||
if not db.is_player_administrator(player_id):
|
if not db.is_player_administrator(author):
|
||||||
return res
|
return res
|
||||||
|
|
||||||
# Admin commands
|
# Admin commands
|
||||||
|
@ -261,10 +307,10 @@ def generate_response(notification: ParsedNotification) -> BotResponse | None:
|
||||||
res = do_whitelist(author, notification['arguments'])
|
res = do_whitelist(author, notification['arguments'])
|
||||||
case 'unwhitelist':
|
case 'unwhitelist':
|
||||||
res = do_unwhitelist(author, notification['arguments'])
|
res = do_unwhitelist(author, notification['arguments'])
|
||||||
# case 'ban':
|
case 'ban':
|
||||||
# res = do_ban(author, notification['arguments'])
|
res = do_ban(author, notification['arguments'])
|
||||||
# case 'unban':
|
case 'unban':
|
||||||
# res = do_unban(author, notification['arguments'])
|
res = do_unban(author, notification['arguments'])
|
||||||
case _:
|
case _:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
7
migrations/0005_add_whitelist.sql
Normal file
7
migrations/0005_add_whitelist.sql
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
CREATE TABLE IF NOT EXISTS instance_whitelist (
|
||||||
|
tld TEXT UNIQUE PRIMARY KEY
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS banned_players (
|
||||||
|
handle TEXT UNIQUE PRIMARY KEY
|
||||||
|
);
|
Loading…
Add table
Reference in a new issue