Compare commits

...

2 commits

5 changed files with 110 additions and 24 deletions

1
.gitignore vendored
View file

@ -183,3 +183,4 @@ cython_debug/
# Custom stuff # Custom stuff
*.db *.db
*.ini *.ini
reference/

View file

@ -1,28 +1,79 @@
'''Essentials for the bot to function''' '''Essentials for the bot to function'''
import configparser import configparser
config = configparser.ConfigParser() import os
config.read('config.ini') import sys
import argparse
def get_config_file_path():
"""Get config file path from command line arguments or default"""
parser = argparse.ArgumentParser(add_help=False) # Don't interfere with other argument parsing
parser.add_argument('--config', default='config.ini', help='Path to config file (default: config.ini)')
# Parse only known args to avoid conflicts with other argument parsing
args, _ = parser.parse_known_args()
return args.config
def load_config():
"""Load and validate config file with proper error handling"""
config_file = get_config_file_path()
config = configparser.ConfigParser()
# Check if config file exists
if not os.path.exists(config_file):
print(f"Error: {config_file} not found!")
if config_file == 'config.ini':
print("Please copy example_config.ini to config.ini and configure it.")
print(f"Or use --config /path/to/your/config.ini to specify a different location.")
sys.exit(1)
try:
config.read(config_file)
except configparser.Error as e:
print(f"Error reading {config_file}: {e}")
sys.exit(1)
# Check if [application] section exists
if 'application' not in config:
print(f"Error: [application] section missing in {config_file}")
sys.exit(1)
# Validate required fields
required_fields = ['BotUser', 'ApiKey', 'InstanceUrl', 'InstanceType', 'DatabaseLocation']
missing_fields = []
for field in required_fields:
if field not in config['application'] or not config['application'][field].strip():
missing_fields.append(field)
if missing_fields:
print(f"Error: Required fields missing in {config_file}: {', '.join(missing_fields)}")
sys.exit(1)
# Validate InstanceType
instance_type = config['application']['InstanceType'].lower()
if instance_type not in ('misskey', 'pleroma'):
print("Error: InstanceType must be either 'misskey' or 'pleroma'")
sys.exit(1)
return config
# Load and validate configuration
config = load_config()
# Username for the bot # Username for the bot
USER = config['application']['BotUser'] USER = config['application']['BotUser']
# API key for the bot # API key for the bot
KEY = config['application']['ApiKey'] KEY = config['application']['ApiKey']
# Bot's Misskey instance URL
# Bot's instance URL
INSTANCE = config['application']['InstanceUrl'] INSTANCE = config['application']['InstanceUrl']
# SQLite Database location # SQLite Database location
DB_PATH = config['application']['DatabaseLocation'] DB_PATH = config['application']['DatabaseLocation']
# Instance type validation # Instance type
if 'InstanceType' not in config['application']: INSTANCE_TYPE = config['application']['InstanceType'].lower()
raise ValueError("InstanceType must be specified in config.ini")
instance_type = config['application']['InstanceType'].lower()
if instance_type not in ('misskey', 'pleroma'):
raise ValueError("InstanceType must be either 'misskey' or 'pleroma'")
INSTANCE_TYPE = instance_type
# Web server port # Web server port
WEB_PORT = config['application'].getint('WebPort', 5000) WEB_PORT = config['application'].getint('WebPort', 5000)
@ -36,4 +87,4 @@ TRUSTED_INSTANCES = [instance.strip() for instance in trusted_instances_str.spli
# TODO: move this to db # TODO: move this to db
# Fedi handles in the traditional 'user@domain.tld' style, allows these users # Fedi handles in the traditional 'user@domain.tld' style, allows these users
# to use extra admin exclusive commands with the bot''' # to use extra admin exclusive commands with the bot'''
ADMINS = config['application']['DefaultAdmins'] ADMINS = config['application'].get('DefaultAdmins', '[]')

View file

@ -74,13 +74,13 @@ class MisskeyService(FediverseService):
files = [self._convert_misskey_file(f) for f in note_data["files"]] files = [self._convert_misskey_file(f) for f in note_data["files"]]
return FediversePost( return FediversePost(
id=note_data.get("id", ""), id=str(note_data.get("id", "")), # Misskey IDs (aid/meid/ulid/objectid) are lexicographically sortable as strings
text=note_data.get("text"), text=note_data.get("text"),
user=self._convert_misskey_user(note_data.get("user", {})), user=self._convert_misskey_user(note_data.get("user", {})),
visibility=self._convert_misskey_visibility(note_data.get("visibility", "home")), visibility=self._convert_misskey_visibility(note_data.get("visibility", "home")),
created_at=note_data.get("createdAt"), created_at=note_data.get("createdAt"),
files=files, files=files,
reply_to_id=note_data.get("replyId") reply_to_id=str(note_data.get("replyId", "")) if note_data.get("replyId") else None
) )
def _convert_misskey_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification: def _convert_misskey_notification(self, notification_data: Dict[str, Any]) -> FediverseNotification:
@ -90,7 +90,7 @@ class MisskeyService(FediverseService):
post = self._convert_misskey_post(notification_data["note"]) post = self._convert_misskey_post(notification_data["note"])
return FediverseNotification( return FediverseNotification(
id=notification_data.get("id", ""), id=str(notification_data.get("id", "")), # Misskey IDs are lexicographically sortable as strings
type=self._convert_misskey_notification_type(notification_data.get("type", "")), type=self._convert_misskey_notification_type(notification_data.get("type", "")),
user=self._convert_misskey_user(notification_data.get("user", {})), user=self._convert_misskey_user(notification_data.get("user", {})),
post=post, post=post,

View file

@ -90,7 +90,7 @@ class PleromaService(FediverseService):
plain_text = re.sub(r'<[^>]+>', '', content) if content else None plain_text = re.sub(r'<[^>]+>', '', content) if content else None
return FediversePost( return FediversePost(
id=str(status_data.get("id", "")), id=str(status_data.get("id", "")), # Pleroma IDs (snowflake format) are lexicographically sortable as strings
text=plain_text, text=plain_text,
user=self._convert_mastodon_user(status_data.get("account", {})), user=self._convert_mastodon_user(status_data.get("account", {})),
visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")), visibility=self._convert_mastodon_visibility(status_data.get("visibility", "public")),
@ -106,7 +106,7 @@ class PleromaService(FediverseService):
post = self._convert_mastodon_status(notification_data["status"]) post = self._convert_mastodon_status(notification_data["status"])
return FediverseNotification( return FediverseNotification(
id=str(notification_data.get("id", "")), id=str(notification_data.get("id", "")), # Pleroma IDs are lexicographically sortable as strings
type=self._convert_mastodon_notification_type(notification_data.get("type", "")), type=self._convert_mastodon_notification_type(notification_data.get("type", "")),
user=self._convert_mastodon_user(notification_data.get("account", {})), user=self._convert_mastodon_user(notification_data.get("account", {})),
post=post, post=post,

42
db.py
View file

@ -1,12 +1,46 @@
import sqlite3 import sqlite3
import configparser import configparser
import os
import sys
import argparse
# Read config to get database location def get_config_file_path():
config = configparser.ConfigParser() """Get config file path from command line arguments or default"""
config.read('config.ini') parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('--config', default='config.ini', help='Path to config file (default: config.ini)')
args, _ = parser.parse_known_args()
return args.config
def get_db_path():
"""Get database path from config with error handling"""
config_file = get_config_file_path()
if not os.path.exists(config_file):
print(f"Error: {config_file} not found!")
if config_file == 'config.ini':
print("Please copy example_config.ini to config.ini and configure it.")
print(f"Or use --config /path/to/your/config.ini to specify a different location.")
sys.exit(1)
config = configparser.ConfigParser()
try:
config.read(config_file)
except configparser.Error as e:
print(f"Error reading {config_file}: {e}")
sys.exit(1)
if 'application' not in config:
print(f"Error: [application] section missing in {config_file}")
sys.exit(1)
if 'DatabaseLocation' not in config['application']:
print(f"Error: DatabaseLocation missing in {config_file}")
sys.exit(1)
return config['application']['DatabaseLocation']
# Connect to SQLite database (or create it if it doesn't exist) # Connect to SQLite database (or create it if it doesn't exist)
db_path = config['application']['DatabaseLocation'] db_path = get_db_path()
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
cursor = conn.cursor() cursor = conn.cursor()