Files
discord_tools/scripts/user_history_scraper.py

243 lines
8.7 KiB
Python
Executable File

# discord_tools/scripts/user_history_scraper.py
import os
import sys
import asyncio
import json
from datetime import datetime
# Add the project root (two levels above this script's directory) to the Python path so the discord_tools imports below resolve
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(script_dir))
sys.path.insert(0, project_root)
from discord_tools.config.settings import DATA_DIR
from discord_tools.utils.api_utils import make_discord_request
def get_mutual_guilds():
    """Fetch the guild list for the current account.

    Returns:
        The decoded JSON body of ``GET /users/@me/guilds``. When the request
        result is falsy, a failure notice is printed and an empty list is
        returned instead.
    """
    guilds_response = make_discord_request('GET', '/users/@me/guilds')
    if guilds_response:
        return guilds_response.json()

    print("Failed to fetch guilds")
    return []
def get_guild_channels(guild_id):
    """Fetch the channels of one guild.

    Args:
        guild_id: ID of the guild, interpolated into the request path.

    Returns:
        The decoded JSON body of ``GET /guilds/{guild_id}/channels``, or an
        empty list when the request result is falsy.
    """
    channels_response = make_discord_request('GET', f'/guilds/{guild_id}/channels')
    return channels_response.json() if channels_response else []
def search_user_messages_in_guild(guild_id, user_id):
    """Collect the messages authored by ``user_id`` that guild search returns.

    Pages through ``/guilds/{guild_id}/messages/search`` filtered by
    ``author_id``, advancing the offset by 25 per request. Paging stops when
    a request result is falsy, a page has no ``messages``, or the offset
    reaches the ``total_results`` value reported by the last page.

    Args:
        guild_id: ID of the guild to search.
        user_id: ID of the author whose messages are wanted.

    Returns:
        A list of message dicts authored by ``user_id``, in the order first
        encountered, with each message ID present at most once.
    """
    # Offset step per request; presumably the search page size -- confirm
    # against the API documentation before changing.
    page_size = 25
    # Compare IDs as strings so an int user_id still matches the payload.
    wanted_author = str(user_id)

    collected = []
    seen_ids = set()
    offset = 0

    while True:
        endpoint = f'/guilds/{guild_id}/messages/search?author_id={user_id}&offset={offset}'
        response = make_discord_request('GET', endpoint)
        if not response:
            break

        data = response.json()
        result_groups = data.get('messages', [])
        if not result_groups:
            break

        for group in result_groups:
            # A result entry is either a list of related messages or a
            # single message dict; treat both uniformly.
            candidates = group if isinstance(group, list) else [group]
            for msg in candidates:
                if str(msg['author']['id']) != wanted_author:
                    continue
                # The same message can show up in more than one result
                # group; keep only its first occurrence so counts and
                # exports are not inflated.
                if msg['id'] in seen_ids:
                    continue
                seen_ids.add(msg['id'])
                collected.append(msg)

        offset += page_size
        total_results = data.get('total_results', 0)
        print(f" Found {len(collected)} messages so far...")

        # Every reported result has been requested; stop paging.
        if offset >= total_results:
            break

    return collected
def get_user_info_from_guild(guild_id, user_id):
    """Look up a user through a guild's member endpoint.

    Args:
        guild_id: ID of the guild to query.
        user_id: ID of the member to look up.

    Returns:
        The ``user`` entry of the member payload (``None`` if that key is
        absent), or ``None`` when the request result is falsy.
    """
    member_response = make_discord_request('GET', f'/guilds/{guild_id}/members/{user_id}')
    if not member_response:
        return None
    return member_response.json().get('user')
def format_message_for_export(msg):
    """Reduce a raw message dict to the fields used in the readable export.

    Args:
        msg: Message dict. ``id``, ``timestamp`` (ISO 8601 string) and
            ``content`` are required; ``attachments``, ``embeds``,
            ``mentions`` and ``edited_timestamp`` are optional.

    Returns:
        A dict with the keys ``id``, ``timestamp`` (formatted as
        ``%Y-%m-%d %H:%M:%S`` in the timestamp's own offset), ``content``,
        ``attachments`` (list of URLs), ``embeds`` (count), ``mentions``
        (list of ``username#discriminator`` strings) and ``edited`` (bool).

    Raises:
        KeyError: If a required key is missing from ``msg``.
        ValueError: If ``timestamp`` is not a parseable ISO 8601 string.
    """
    # datetime.fromisoformat() rejects a trailing 'Z' before Python 3.11,
    # so spell the UTC offset out explicitly.
    sent_at = datetime.fromisoformat(msg['timestamp'].replace('Z', '+00:00'))

    return {
        'id': msg['id'],
        'timestamp': sent_at.strftime('%Y-%m-%d %H:%M:%S'),
        'content': msg['content'],
        'attachments': [att['url'] for att in msg.get('attachments', [])],
        'embeds': len(msg.get('embeds', [])),
        # Fall back to '0' when a mentioned user carries no discriminator,
        # matching how usernames are built elsewhere in this script.
        'mentions': [
            f"{user['username']}#{user.get('discriminator', '0')}"
            for user in msg.get('mentions', [])
        ],
        'edited': msg.get('edited_timestamp') is not None,
    }
def _group_messages_by_channel(messages):
    """Return a dict mapping each channel ID to the list of its messages."""
    by_channel = {}
    for msg in messages:
        by_channel.setdefault(msg['channel_id'], []).append(msg)
    return by_channel


def _unique_guild_key(guild_name, guild_id, existing):
    """Return a results key for a guild, adding its ID if the name is taken."""
    if guild_name in existing:
        return f"{guild_name} ({guild_id})"
    return guild_name


def _username_from_messages(scraped):
    """Return 'username#discriminator' from the first stored message, else "Unknown"."""
    for channels in scraped.values():
        for messages in channels.values():
            if messages:
                author = messages[0]['author']
                return f"{author['username']}#{author.get('discriminator', '0')}"
    return "Unknown"


def _write_formatted_report(path, username, total_messages, scraped):
    """Write the human-readable, per-server / per-channel message listing."""
    with open(path, 'w', encoding='utf-8') as f:
        f.write(f"Message History for {username}\n")
        f.write(f"Scraped on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Total Messages: {total_messages}\n")
        f.write("=" * 80 + "\n\n")

        for guild_name, channels in scraped.items():
            f.write(f"\n{'=' * 80}\n")
            f.write(f"SERVER: {guild_name}\n")
            f.write(f"{'=' * 80}\n\n")

            for channel_id, messages in channels.items():
                # Only the channel ID is known here, so it doubles as the
                # channel label.
                f.write(f"\n--- Channel {channel_id} ({len(messages)} messages) ---\n\n")

                # Oldest first, ordered by the raw timestamp string.
                for msg in sorted(messages, key=lambda m: m['timestamp']):
                    formatted = format_message_for_export(msg)
                    f.write(f"[{formatted['timestamp']}]\n")
                    f.write(f"{formatted['content']}\n")
                    if formatted['attachments']:
                        f.write(f"Attachments: {', '.join(formatted['attachments'])}\n")
                    if formatted['edited']:
                        f.write("(edited)\n")
                    f.write("\n")


def _write_stats(path, username, total_messages, scraped):
    """Write the per-server and per-channel message counts."""
    with open(path, 'w', encoding='utf-8') as f:
        f.write(f"Message Statistics for {username}\n")
        f.write("=" * 50 + "\n\n")
        f.write(f"Total Messages: {total_messages}\n")
        f.write(f"Servers: {len(scraped)}\n\n")

        f.write("Messages per server:\n")
        for guild_name, channels in scraped.items():
            guild_total = sum(len(msgs) for msgs in channels.values())
            f.write(f" {guild_name}: {guild_total}\n")

        f.write("\nMessages per channel:\n")
        for guild_name, channels in scraped.items():
            for channel_id, messages in channels.items():
                f.write(f" {guild_name} > {channel_id}: {len(messages)}\n")


def main():
    """Prompt for a user ID, search every guild for that user's messages, and save the results.

    Reads the user ID from standard input and returns early if it is not all
    digits. For each guild from ``get_mutual_guilds`` it runs
    ``search_user_messages_in_guild`` and groups the hits by channel. If no
    messages are found it reports that and returns. Otherwise it writes
    ``raw_messages.json``, ``formatted_messages.txt`` and ``stats.txt`` to
    ``DATA_DIR/user_history_<user_id>``, creating the folder if needed.
    """
    print("User History Scraper")
    print("=" * 50)

    user_id = input("Enter the user ID to scrape: ").strip()
    if not user_id.isdigit():
        print("Invalid user ID")
        return

    print(f"\nScraping messages from user ID: {user_id}")
    print("=" * 50)

    guilds = get_mutual_guilds()
    print(f"Found {len(guilds)} servers")

    all_scraped_messages = {}
    total_messages = 0
    username = "Unknown"

    for guild in guilds:
        guild_id = guild['id']
        guild_name = guild['name']
        print(f"\nSearching {guild_name}...")

        # Keep trying the member endpoint until one guild resolves the user.
        if username == "Unknown":
            user_info = get_user_info_from_guild(guild_id, user_id)
            if user_info:
                username = f"{user_info['username']}#{user_info.get('discriminator', '0')}"
                print(f" Found user: {username}")

        messages = search_user_messages_in_guild(guild_id, user_id)
        if not messages:
            continue

        channels_dict = _group_messages_by_channel(messages)
        # Guild names are not unique; without this, a second server with the
        # same name would overwrite the first one's messages.
        guild_key = _unique_guild_key(guild_name, guild_id, all_scraped_messages)
        all_scraped_messages[guild_key] = channels_dict
        total_messages += len(messages)
        print(f" Total in {guild_name}: {len(messages)} messages across {len(channels_dict)} channels")

    print("\n" + "=" * 50)
    print(f"Scraping complete. Total messages found: {total_messages}")

    if total_messages == 0:
        print("No messages found for this user.")
        return

    # No guild resolved the user: fall back to the author of a stored message.
    if username == "Unknown":
        username = _username_from_messages(all_scraped_messages)
    print(f"User identified as: {username}")

    output_folder = os.path.join(DATA_DIR, f"user_history_{user_id}")
    os.makedirs(output_folder, exist_ok=True)

    raw_file = os.path.join(output_folder, "raw_messages.json")
    with open(raw_file, 'w', encoding='utf-8') as f:
        json.dump(all_scraped_messages, f, indent=2, ensure_ascii=False)

    _write_formatted_report(
        os.path.join(output_folder, "formatted_messages.txt"),
        username,
        total_messages,
        all_scraped_messages,
    )
    _write_stats(
        os.path.join(output_folder, "stats.txt"),
        username,
        total_messages,
        all_scraped_messages,
    )

    print(f"\nResults saved to: {output_folder}")
    print(" - raw_messages.json (complete data)")
    print(" - formatted_messages.txt (readable format)")
    print(" - stats.txt (summary)")
# Run the scraper only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()