Overhaul notification parsing #12
					 4 changed files with 133 additions and 110 deletions
				
			
		
							
								
								
									
										118
									
								
								bot/bot_app.py
									
										
									
									
									
								
							
							
						
						
									
										118
									
								
								bot/bot_app.py
									
										
									
									
									
								
							|  | @ -1,114 +1,14 @@ | |||
| import time | ||||
| import traceback | ||||
| import misskey | ||||
| from parsing import parse_notification | ||||
| from db_utils import get_or_create_user, add_pull, get_config, set_config | ||||
| import misskey as misskey | ||||
| from client import client_connection | ||||
| from response import generate_response | ||||
| 
 | ||||
| # Initialize the Misskey client | ||||
| client = client_connection() | ||||
| 
 | ||||
| # Define your whitelist | ||||
| # TODO: move to config | ||||
| whitelisted_instances: list[str] = [] | ||||
| 
 | ||||
| def stream_notifications(): | ||||
|     print("Starting filtered notification stream...") | ||||
| 
 | ||||
|     last_seen_id = get_config("last_seen_notif_id") | ||||
| from config import NOTIFICATION_POLL_INTERVAL | ||||
| from notification import process_notifications | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     # Initialize the Misskey client | ||||
|     client = client_connection() | ||||
|     print('Listening for notifications...') | ||||
|     while True: | ||||
|         try: | ||||
|             # May be able to mark notifications as read using misskey.py and | ||||
|             # filter them out here. This function also takes a since_id we | ||||
|             # could use as well | ||||
|             notifications = client.i_notifications() | ||||
| 
 | ||||
|             if notifications: | ||||
|                 # Oldest to newest | ||||
|                 notifications.reverse() | ||||
| 
 | ||||
|                 new_last_seen_id = last_seen_id | ||||
| 
 | ||||
|                 for notification in notifications: | ||||
|                     notif_id = notification.get("id") | ||||
| 
 | ||||
|                     # Skip old or same ID notifications | ||||
|                     if last_seen_id is not None and notif_id <= last_seen_id: | ||||
|                         continue | ||||
| 
 | ||||
|                     user = notification.get("user", {}) | ||||
|                     username = user.get("username", "unknown") | ||||
|                     host = user.get("host")  # None if local user | ||||
| 
 | ||||
|                     instance = host if host else "local" | ||||
| 
 | ||||
|                     if instance in whitelisted_instances or instance == "local": | ||||
|                         note = notification.get("note", {}) | ||||
|                         note_text = note.get("text", "") | ||||
|                         note_id = note.get("id") | ||||
|                         notif_type = notification.get("type", "unknown") | ||||
| 
 | ||||
|                         # We want the visibility to be related to the type that was received (so if | ||||
|                         # people don't want to dump a bunch of notes on home they don't have to) | ||||
|                          | ||||
|                         visibility = notification["note"]["visibility"] | ||||
|                         if visibility != "specified": | ||||
|                             visibility = "home" | ||||
| 
 | ||||
|                         print(f"📨 [{notif_type}] from @{username}@{instance}") | ||||
|                         print(f"💬 {note_text}") | ||||
|                         print("-" * 30) | ||||
| 
 | ||||
|                          | ||||
|                         # We get the type of notification to filter the ones that we actually want | ||||
|                         # to parse | ||||
| 
 | ||||
|                         notif_type = notification.get("type") | ||||
|                         if notif_type in ('mention', 'reply'): | ||||
|                             # 🧠 Send to the parser | ||||
|                             parsed_command = parse_notification(notification,client) | ||||
|                             # Get the response | ||||
|                             response = generate_response(parsed_command) | ||||
|                             if isinstance(response, str): | ||||
|                                 client.notes_create( | ||||
|                                     text=response, | ||||
|                                     reply_id=note_id, | ||||
|                                     visibility=visibility | ||||
|                                 ) | ||||
|                             elif response: | ||||
|                                 client.notes_create( | ||||
|                                     text=response[0], | ||||
|                                     reply_id=note_id, | ||||
|                                     visibility=visibility, | ||||
|                                     file_ids=response[1] | ||||
|                                     #visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately | ||||
|                                 ) | ||||
|                              | ||||
| 
 | ||||
|                          | ||||
| 
 | ||||
|                     else: | ||||
|                         print(f"⚠️ Blocked notification from untrusted instance: {host}") | ||||
| 
 | ||||
|                     # Update only if this notif_id is greater | ||||
|                     if new_last_seen_id is None or notif_id > new_last_seen_id: | ||||
|                         new_last_seen_id = notif_id | ||||
| 
 | ||||
|                 # Save the latest seen ID | ||||
|                 if new_last_seen_id and new_last_seen_id != last_seen_id: | ||||
|                     set_config("last_seen_notif_id", new_last_seen_id) | ||||
|                     last_seen_id = new_last_seen_id | ||||
| 
 | ||||
|             time.sleep(5) | ||||
| 
 | ||||
|         except Exception as e: | ||||
|             print(f"An exception has occured: {e}\n{traceback.format_exc()}") | ||||
|             time.sleep(2) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     stream_notifications() | ||||
|         if not process_notifications(client): | ||||
|             time.sleep(NOTIFICATION_POLL_INTERVAL) | ||||
|  |  | |||
|  | @ -20,3 +20,6 @@ DB_PATH = config['application']['DatabaseLocation'] | |||
| # Fedi handles in the traditional 'user@domain.tld' style, allows these users | ||||
| # to use extra admin exclusive commands with the bot''' | ||||
| ADMINS = config['application']['DefaultAdmins'] | ||||
| 
 | ||||
| NOTIFICATION_POLL_INTERVAL = int(config['application']['NotificationPollInterval']) | ||||
| NOTIFICATION_BATCH_SIZE    = int(config['application']['NotificationBatchSize']) | ||||
|  |  | |||
							
								
								
									
										114
									
								
								bot/notification.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										114
									
								
								bot/notification.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,114 @@ | |||
| import traceback | ||||
| from misskey.exceptions import MisskeyAPIException | ||||
| 
 | ||||
| from config import NOTIFICATION_BATCH_SIZE | ||||
| from parsing import parse_notification | ||||
| from db_utils import get_config, set_config | ||||
| from response import generate_response | ||||
| 
 | ||||
| # Define your whitelist | ||||
| # TODO: move to config | ||||
| WHITELISTED_INSTANCES: list[str] = [] | ||||
| 
 | ||||
| def process_notification(client, notification): | ||||
|     '''Processes an individual notification''' | ||||
|     user = notification.get('user', {}) | ||||
|     username = user.get('username', 'unknown') | ||||
|     host = user.get('host')  # None if local user | ||||
|     instance = host if host else 'local' | ||||
| 
 | ||||
|     if not (instance in WHITELISTED_INSTANCES or instance == 'local'): | ||||
|         print(f'⚠️ Blocked notification from untrusted instance: {instance}') | ||||
| 
 | ||||
|     # Copy visibility of the post that was received when replying (so if people | ||||
|     # don't want to dump a bunch of notes on home they don't have to) | ||||
|     visibility = notification['note']['visibility'] | ||||
|     if visibility != 'specified': | ||||
|         visibility = 'home' | ||||
| 
 | ||||
|     notif_type = notification.get('type', 'unknown') | ||||
|     notif_id = notification.get('id') | ||||
|     print(f'📨 <{notif_id}> [{notif_type}] from @{username}@{instance}') | ||||
| 
 | ||||
|     # 🧠 Send to the parser | ||||
|     parsed_command = parse_notification(notification, client) | ||||
| 
 | ||||
|     # Get the note Id to reply to | ||||
|     note_id = notification.get('note', {}).get('id') | ||||
| 
 | ||||
|     # Get the response | ||||
|     # TODO: Formalize exactly *what* is returned by this. Ideally just want to | ||||
|     # handle two cases here: either we have a response, or we don't. | ||||
|     # TODO: Return dictionaries instead of tuples. They handle multiple | ||||
|     # elements a lot better as they're not position dependent | ||||
|     response = generate_response(parsed_command) | ||||
|     if isinstance(response, str): | ||||
|         client.notes_create( | ||||
|             text=response, | ||||
|             reply_id=note_id, | ||||
|             visibility=visibility | ||||
|         ) | ||||
|     elif response: | ||||
|         client.notes_create( | ||||
|             text=response[0], | ||||
|             reply_id=note_id, | ||||
|             visibility=visibility, | ||||
|             file_ids=response[1] | ||||
|             #visible_user_ids=[] #todo: write actual visible users ids so pleromers can use the bot privately | ||||
|         ) | ||||
| 
 | ||||
| def process_notifications(client): | ||||
|     '''Processes a batch of unread notifications. Returns False if there are | ||||
|     no more notifications to process.''' | ||||
| 
 | ||||
|     last_seen_id = get_config('last_seen_notif_id') | ||||
|     # process_notification writes to last_seen_id, so make a copy | ||||
|     new_last_seen_id = last_seen_id | ||||
| 
 | ||||
|     try: | ||||
|         notifications = client.i_notifications( | ||||
|                 # Fetch notifications we haven't seen yet. This option is a bit | ||||
|                 # tempermental, sometimes it'll include since_id, sometimes it | ||||
|                 # won't. We need to keep track of what notifications we've | ||||
|                 # already processed. | ||||
|                 since_id=last_seen_id, | ||||
|                 # Let misskey handle the filtering | ||||
|                 include_types=['mention', 'reply'], | ||||
|                 # And handle the batch size while we're at it | ||||
|                 limit=NOTIFICATION_BATCH_SIZE | ||||
|         ) | ||||
| 
 | ||||
|         # No notifications. Wait the poll period. | ||||
|         if not notifications: | ||||
|             return False | ||||
| 
 | ||||
|         # Iterate oldest to newest | ||||
|         for notification in notifications: | ||||
|             try: | ||||
|                 # Skip if we've processed already | ||||
|                 notif_id = notification.get('id') | ||||
|                 if notif_id <= last_seen_id: | ||||
|                     continue | ||||
| 
 | ||||
|                 # Update new_last_seen_id and process | ||||
|                 new_last_seen_id = notif_id | ||||
|                 process_notification(client, notification) | ||||
| 
 | ||||
|             except Exception as e: | ||||
|                 print(f'An exception has occured while processing a notification: {e}') | ||||
|                 print(traceback.format_exc()) | ||||
| 
 | ||||
|         # If we got as many notifications as we requested, there are probably | ||||
|         # more in the queue | ||||
|         return len(notifications) == NOTIFICATION_BATCH_SIZE | ||||
| 
 | ||||
|     except MisskeyAPIException as e: | ||||
|         print(f'An exception has occured while reading notifications: {e}\n') | ||||
|         print(traceback.format_exc()) | ||||
|     finally: | ||||
|         # Quality jank right here, but finally lets us update the last_seen_id | ||||
|         # even if we hit an exception or return early | ||||
|         if new_last_seen_id > last_seen_id: | ||||
|             set_config('last_seen_notif_id', new_last_seen_id) | ||||
| 
 | ||||
|     return False | ||||
|  | @ -14,4 +14,10 @@ InstanceUrl   = http://example.tld | |||
| DefaultAdmins = ['admin@example.tld'] | ||||
| 
 | ||||
| ; SQLite Database location | ||||
| DatabaseLocation = ./gacha_game.db | ||||
| DatabaseLocation = ./gacha_game.db | ||||
| 
 | ||||
| ; Number of seconds to sleep while awaiting new notifications | ||||
| NotificationPollInterval = 5 | ||||
| 
 | ||||
| ; Number of notifications to process at once (limit 100) | ||||
| NotificationBatchSize = 10 | ||||
|  |  | |||
		Loading…
	
	Add table
		
		Reference in a new issue