# File listing metadata: 243 lines, 8.7 KiB, Python, executable file.
# discord_tools/scripts/user_history_scraper.py
|
|
import os
|
|
import sys
|
|
import asyncio
|
|
import json
|
|
from datetime import datetime
|
|
|
|
# Add the parent directory to the Python path
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
project_root = os.path.dirname(os.path.dirname(script_dir))
|
|
sys.path.insert(0, project_root)
|
|
|
|
from discord_tools.config.settings import DATA_DIR
|
|
from discord_tools.utils.api_utils import make_discord_request
|
|
|
|
def get_mutual_guilds():
    """Fetch every server (guild) the current account is in.

    Returns:
        The decoded JSON body of ``GET /users/@me/guilds``. When the request
        helper hands back a falsy response, a failure notice is printed and
        an empty list is returned instead.
    """
    guilds_response = make_discord_request('GET', '/users/@me/guilds')
    if guilds_response:
        return guilds_response.json()

    print("Failed to fetch guilds")
    return []
|
|
|
|
def get_guild_channels(guild_id):
    """Fetch the channels of a single server.

    Args:
        guild_id: Id of the guild, interpolated into the request path.

    Returns:
        The decoded JSON body of ``GET /guilds/{guild_id}/channels``, or an
        empty list when the request helper returns a falsy response.
    """
    channels_response = make_discord_request('GET', f'/guilds/{guild_id}/channels')
    return channels_response.json() if channels_response else []
|
|
|
|
def search_user_messages_in_guild(guild_id, user_id):
    """Collect a user's messages in one guild via the guild search endpoint.

    Pages through ``/guilds/{guild_id}/messages/search`` in steps of 25 until
    a request fails, a page comes back empty, or the offset reaches the
    ``total_results`` value reported by the response.

    Args:
        guild_id: Id of the guild to search.
        user_id: Id of the author to look for. Converted to ``str`` before
            use so that an integer id still matches the ids in the payload
            (``main`` passes a string read from ``input()``).

    Returns:
        A list of message dicts whose ``author.id`` equals ``user_id``, in
        the order they were first encountered, with each message id
        appearing at most once.
    """
    page_size = 25  # offset step between successive search pages
    author_id = str(user_id)
    all_messages = []
    seen_ids = set()
    offset = 0

    while True:
        endpoint = f'/guilds/{guild_id}/messages/search?author_id={author_id}&offset={offset}'
        response = make_discord_request('GET', endpoint)
        if not response:
            break

        data = response.json()
        groups = data.get('messages', [])
        if not groups:
            break

        # Each entry is either a list of related messages or a single
        # message dict; treat both shapes uniformly.
        for group in groups:
            candidates = group if isinstance(group, list) else [group]
            for msg in candidates:
                if msg['author']['id'] != author_id:
                    continue
                # NOTE(review): a message can presumably show up in more than
                # one group (e.g. as a neighbour of another hit); keep only
                # the first copy so counts and exports are not inflated.
                msg_id = msg.get('id')
                if msg_id is not None:
                    if msg_id in seen_ids:
                        continue
                    seen_ids.add(msg_id)
                all_messages.append(msg)

        offset += page_size
        print(f" Found {len(all_messages)} messages so far...")

        # Stop once the offset has moved past everything the API reported.
        if offset >= data.get('total_results', 0):
            break

    return all_messages
|
|
|
|
def get_user_info_from_guild(guild_id, user_id):
    """Look up a user through a guild's member endpoint.

    Args:
        guild_id: Id of the guild to query.
        user_id: Id of the member to look up.

    Returns:
        The value stored under ``'user'`` in the member payload (``None`` if
        that key is absent), or ``None`` when the request helper returns a
        falsy response.
    """
    member_response = make_discord_request('GET', f'/guilds/{guild_id}/members/{user_id}')
    if not member_response:
        return None
    return member_response.json().get('user')
|
|
|
|
def format_message_for_export(msg):
    """Convert a raw message dict into a compact, readable summary.

    Args:
        msg: Message dict. Must contain ``'id'``, ``'timestamp'`` (ISO 8601
            string) and ``'content'``; ``'attachments'``, ``'embeds'``,
            ``'mentions'`` and ``'edited_timestamp'`` are optional.

    Returns:
        A dict with the keys ``id``, ``timestamp`` (``YYYY-MM-DD HH:MM:SS``),
        ``content``, ``attachments`` (list of URLs), ``embeds`` (count),
        ``mentions`` (list of ``username#discriminator`` strings) and
        ``edited`` (bool).

    Raises:
        KeyError: If a required key is missing.
        ValueError: If the timestamp is not a parseable ISO 8601 string.
    """
    # A trailing 'Z' is rewritten to an explicit offset because older
    # datetime.fromisoformat() implementations reject the 'Z' suffix.
    timestamp = datetime.fromisoformat(msg['timestamp'].replace('Z', '+00:00'))

    return {
        'id': msg['id'],
        'timestamp': timestamp.strftime('%Y-%m-%d %H:%M:%S'),
        'content': msg['content'],
        'attachments': [att['url'] for att in msg.get('attachments', [])],
        'embeds': len(msg.get('embeds', [])),
        # Fall back to '0' when a mention carries no discriminator, the same
        # default main() uses when it builds the target user's name.
        'mentions': [
            f"{user['username']}#{user.get('discriminator', '0')}"
            for user in msg.get('mentions', [])
        ],
        'edited': msg.get('edited_timestamp') is not None,
    }
|
|
|
|
def _group_by_channel(messages):
    """Bucket messages by their 'channel_id', keeping encounter order."""
    channels = {}
    for msg in messages:
        channels.setdefault(msg['channel_id'], []).append(msg)
    return channels


def _unique_guild_key(guild_name, guild_id, existing):
    """Return guild_name, or 'name (id)' if that name is already a key."""
    if guild_name not in existing:
        return guild_name
    return f"{guild_name} ({guild_id})"


def _first_author_name(scraped):
    """Return 'username#discriminator' of the first stored message's author."""
    for channels in scraped.values():
        for messages in channels.values():
            if messages:
                author = messages[0]['author']
                return f"{author['username']}#{author.get('discriminator', '0')}"
    return "Unknown"


def _write_formatted_report(path, username, total_messages, scraped):
    """Write the human-readable, per-server / per-channel message log."""
    with open(path, 'w', encoding='utf-8') as f:
        f.write(f"Message History for {username}\n")
        f.write(f"Scraped on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Total Messages: {total_messages}\n")
        f.write("=" * 80 + "\n\n")

        for guild_name, channels in scraped.items():
            f.write(f"\n{'=' * 80}\n")
            f.write(f"SERVER: {guild_name}\n")
            f.write(f"{'=' * 80}\n\n")

            for channel_id, messages in channels.items():
                # Only the channel id is available in the scraped data, so it
                # doubles as the channel label.
                f.write(f"\n--- Channel {channel_id} ({len(messages)} messages) ---\n\n")

                # Oldest first within each channel.
                for msg in sorted(messages, key=lambda m: m['timestamp']):
                    formatted = format_message_for_export(msg)
                    f.write(f"[{formatted['timestamp']}]\n")
                    f.write(f"{formatted['content']}\n")

                    if formatted['attachments']:
                        f.write(f"Attachments: {', '.join(formatted['attachments'])}\n")

                    if formatted['edited']:
                        f.write("(edited)\n")

                    f.write("\n")


def _write_stats(path, username, total_messages, scraped):
    """Write the summary file with per-server and per-channel counts."""
    with open(path, 'w', encoding='utf-8') as f:
        f.write(f"Message Statistics for {username}\n")
        f.write("=" * 50 + "\n\n")
        f.write(f"Total Messages: {total_messages}\n")
        f.write(f"Servers: {len(scraped)}\n\n")

        f.write("Messages per server:\n")
        for guild_name, channels in scraped.items():
            guild_total = sum(len(msgs) for msgs in channels.values())
            f.write(f" {guild_name}: {guild_total}\n")

        f.write("\nMessages per channel:\n")
        for guild_name, channels in scraped.items():
            for channel_id, messages in channels.items():
                f.write(f" {guild_name} > {channel_id}: {len(messages)}\n")


def main():
    """Interactively scrape one user's messages from every joined server.

    Prompts for a numeric user id, searches each guild returned by
    ``get_mutual_guilds`` for that user's messages, and writes three files
    into ``DATA_DIR/user_history_<user_id>``: ``raw_messages.json``,
    ``formatted_messages.txt`` and ``stats.txt``. Returns early (writing
    nothing) when the id is not numeric or no messages are found.
    """
    print("User History Scraper")
    print("=" * 50)

    user_id = input("Enter the user ID to scrape: ").strip()
    if not user_id.isdigit():
        print("Invalid user ID")
        return

    print(f"\nScraping messages from user ID: {user_id}")
    print("=" * 50)

    guilds = get_mutual_guilds()
    print(f"Found {len(guilds)} servers")

    all_scraped_messages = {}
    total_messages = 0
    username = "Unknown"

    for guild in guilds:
        guild_id = guild['id']
        guild_name = guild['name']

        print(f"\nSearching {guild_name}...")

        # Resolve the display name once, from the first guild that knows
        # the user.
        if username == "Unknown":
            user_info = get_user_info_from_guild(guild_id, user_id)
            if user_info:
                username = f"{user_info['username']}#{user_info.get('discriminator', '0')}"
                print(f" Found user: {username}")

        messages = search_user_messages_in_guild(guild_id, user_id)
        if not messages:
            continue

        channels_dict = _group_by_channel(messages)

        # Two servers may share a name; disambiguate the key so the second
        # one does not silently overwrite the first while still being
        # counted in total_messages.
        guild_key = _unique_guild_key(guild_name, guild_id, all_scraped_messages)
        all_scraped_messages[guild_key] = channels_dict
        total_messages += len(messages)
        print(f" Total in {guild_name}: {len(messages)} messages across {len(channels_dict)} channels")

    print("\n" + "=" * 50)
    print(f"Scraping complete. Total messages found: {total_messages}")

    if total_messages == 0:
        print("No messages found for this user.")
        return

    # Fall back to the author field of a scraped message when no guild
    # member lookup succeeded.
    if username == "Unknown":
        username = _first_author_name(all_scraped_messages)

    print(f"User identified as: {username}")

    output_folder = os.path.join(DATA_DIR, f"user_history_{user_id}")
    os.makedirs(output_folder, exist_ok=True)

    # Raw dump of everything that was collected.
    raw_file = os.path.join(output_folder, "raw_messages.json")
    with open(raw_file, 'w', encoding='utf-8') as f:
        json.dump(all_scraped_messages, f, indent=2, ensure_ascii=False)

    _write_formatted_report(
        os.path.join(output_folder, "formatted_messages.txt"),
        username,
        total_messages,
        all_scraped_messages,
    )
    _write_stats(
        os.path.join(output_folder, "stats.txt"),
        username,
        total_messages,
        all_scraped_messages,
    )

    print(f"\nResults saved to: {output_folder}")
    print(" - raw_messages.json (complete data)")
    print(" - formatted_messages.txt (readable format)")
    print(" - stats.txt (summary)")
|
|
|
|
# Run the interactive scraper only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|