Initial commit: Discord automation tools

This commit is contained in:
2026-01-27 10:13:41 +01:00
commit 7f5d3c2ca7
41 changed files with 983407 additions and 0 deletions

473
scripts/voice_activity_tracker.py Executable file
View File

@@ -0,0 +1,473 @@
# discord_tools/scripts/voice_activity_tracker.py
import os
import sys
import asyncio
import websockets
import json
from datetime import datetime
from collections import defaultdict
# Add the parent directory to the Python path
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(script_dir))
sys.path.insert(0, project_root)
from discord_tools.config.settings import DISCORD_TOKEN, DATA_DIR
from discord_tools.utils.api_utils import make_discord_request
class VoiceActivityTracker:
def __init__(self, guild_ids=None, user_ids=None, exclude_guilds=None):
# Support multiple guilds or specific users
self.guild_ids = guild_ids if isinstance(guild_ids, list) else ([guild_ids] if guild_ids else [])
self.user_ids = user_ids if isinstance(user_ids, list) else ([user_ids] if user_ids else [])
self.exclude_guilds = exclude_guilds if isinstance(exclude_guilds, list) else ([exclude_guilds] if exclude_guilds else [])
self.voice_states = {} # user_id -> {channel_id, joined_at, user_info, guild_id}
self.session_log = [] # List of all voice sessions
self.active = True
self.guild_names = {} # Cache guild names
self.channel_names = {} # Cache channel names
def get_guild_info(self, guild_id):
# Get guild name and cache it
if guild_id in self.guild_names:
return self.guild_names[guild_id]
response = make_discord_request('GET', f'/guilds/{guild_id}')
if response:
guild_data = response.json()
self.guild_names[guild_id] = guild_data['name']
return guild_data['name']
self.guild_names[guild_id] = f'Guild-{guild_id}'
return f'Guild-{guild_id}'
def get_voice_channels(self, guild_id):
# Get all voice channels in a guild
response = make_discord_request('GET', f'/guilds/{guild_id}/channels')
if not response:
return {}
channels = response.json()
voice_channels = {}
for channel in channels:
# Type 2 = voice channel, Type 13 = stage channel
if channel['type'] in [2, 13]:
voice_channels[channel['id']] = channel['name']
self.channel_names[channel['id']] = channel['name']
return voice_channels
def should_track_event(self, guild_id, user_id):
# Check if we should track this event based on filters
# Check excluded guilds
if guild_id in self.exclude_guilds:
return False
# If specific guilds set, only track those
if self.guild_ids and guild_id not in self.guild_ids:
return False
# If specific users set, only track those
if self.user_ids and user_id not in self.user_ids:
return False
return True
def handle_voice_state_update(self, data):
# Process voice state changes
user_id = data['user_id']
channel_id = data.get('channel_id') # None if user left voice
guild_id = data.get('guild_id')
# Check if we should track this event
if not self.should_track_event(guild_id, user_id):
return
timestamp = datetime.now()
# User info
member = data.get('member', {})
user = member.get('user', data.get('user', {}))
username = f"{user.get('username', 'Unknown')}#{user.get('discriminator', '0000')}"
# Get guild and channel names
guild_name = self.get_guild_info(guild_id)
# Build unique key for user+guild combo
state_key = f"{user_id}:{guild_id}"
# Check if user was already in a voice channel in this guild
if state_key in self.voice_states:
old_state = self.voice_states[state_key]
# User left voice completely
if channel_id is None:
duration = (timestamp - old_state['joined_at']).total_seconds()
session = {
'user_id': user_id,
'username': username,
'guild_id': guild_id,
'guild_name': guild_name,
'channel_id': old_state['channel_id'],
'channel_name': old_state.get('channel_name', 'Unknown'),
'joined_at': old_state['joined_at'].isoformat(),
'left_at': timestamp.isoformat(),
'duration_seconds': duration
}
self.session_log.append(session)
del self.voice_states[state_key]
print(f"[-] {username} left {old_state.get('channel_name', 'Unknown')} in {guild_name} (stayed {duration:.0f}s)")
# User switched channels
elif channel_id != old_state['channel_id']:
duration = (timestamp - old_state['joined_at']).total_seconds()
session = {
'user_id': user_id,
'username': username,
'guild_id': guild_id,
'guild_name': guild_name,
'channel_id': old_state['channel_id'],
'channel_name': old_state.get('channel_name', 'Unknown'),
'joined_at': old_state['joined_at'].isoformat(),
'left_at': timestamp.isoformat(),
'duration_seconds': duration
}
self.session_log.append(session)
# Get new channel name
if channel_id not in self.channel_names:
self.get_voice_channels(guild_id)
new_channel_name = self.channel_names.get(channel_id, f'Channel-{channel_id}')
self.voice_states[state_key] = {
'channel_id': channel_id,
'channel_name': new_channel_name,
'guild_id': guild_id,
'guild_name': guild_name,
'joined_at': timestamp,
'username': username
}
print(f"[→] {username} moved from {old_state.get('channel_name', 'Unknown')} to {new_channel_name} in {guild_name}")
# User joined voice (wasn't in voice before)
elif channel_id is not None:
if channel_id not in self.channel_names:
self.get_voice_channels(guild_id)
channel_name = self.channel_names.get(channel_id, f'Channel-{channel_id}')
self.voice_states[state_key] = {
'channel_id': channel_id,
'channel_name': channel_name,
'guild_id': guild_id,
'guild_name': guild_name,
'joined_at': timestamp,
'username': username
}
print(f"[+] {username} joined {channel_name} in {guild_name}")
async def track_voice_activity(self):
# Connect to Discord gateway and track voice state changes
gateway_url = "wss://gateway.discord.gg/?v=9&encoding=json"
async with websockets.connect(gateway_url, max_size=16 * 1024 * 1024) as ws:
# Receive Hello
hello = json.loads(await ws.recv())
heartbeat_interval = hello['d']['heartbeat_interval']
# Send Identify
identify = {
"op": 2,
"d": {
"token": DISCORD_TOKEN,
"properties": {
"$os": "windows",
"$browser": "chrome",
"$device": "pc"
},
"compress": False
}
}
await ws.send(json.dumps(identify))
# Heartbeat task
async def heartbeat():
while self.active:
await asyncio.sleep(heartbeat_interval / 1000)
if self.active:
await ws.send(json.dumps({"op": 1, "d": None}))
heartbeat_task = asyncio.create_task(heartbeat())
# Wait for READY
ready = False
while not ready:
msg = json.loads(await ws.recv())
if msg.get('t') == 'READY':
ready = True
print("Connected to Discord Gateway")
print("Tracking voice activity...")
print("=" * 60)
# Track voice state updates
try:
while self.active:
msg = await ws.recv()
data = json.loads(msg)
event_type = data.get('t')
if event_type == 'VOICE_STATE_UPDATE':
self.handle_voice_state_update(data['d'])
# Also handle initial guild members in voice
elif event_type == 'GUILD_CREATE':
guild_id = data['d']['id']
# Only process if we should track this guild
if guild_id in self.exclude_guilds:
continue
if self.guild_ids and guild_id not in self.guild_ids:
continue
# Process initial voice states
for voice_state in data['d'].get('voice_states', []):
voice_state['guild_id'] = guild_id
self.handle_voice_state_update(voice_state)
except KeyboardInterrupt:
print("\nStopping tracker...")
self.active = False
finally:
heartbeat_task.cancel()
def save_logs(self):
# Save session logs to file
if not self.session_log and not self.voice_states:
print("No voice activity recorded")
return
# Close any active sessions
timestamp = datetime.now()
for state_key, state in self.voice_states.items():
duration = (timestamp - state['joined_at']).total_seconds()
session = {
'user_id': state_key.split(':')[0],
'username': state['username'],
'guild_id': state['guild_id'],
'guild_name': state.get('guild_name', 'Unknown'),
'channel_id': state['channel_id'],
'channel_name': state.get('channel_name', 'Unknown'),
'joined_at': state['joined_at'].isoformat(),
'left_at': timestamp.isoformat(),
'duration_seconds': duration,
'ongoing': True
}
self.session_log.append(session)
# Create output folder
if self.guild_ids and len(self.guild_ids) == 1:
folder_name = f"voice_activity_{self.guild_ids[0]}"
elif self.user_ids:
folder_name = f"voice_activity_user_{'_'.join(self.user_ids[:3])}"
else:
folder_name = "voice_activity_all"
output_folder = os.path.join(DATA_DIR, folder_name)
os.makedirs(output_folder, exist_ok=True)
timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S')
# Save raw JSON
json_file = os.path.join(output_folder, f"sessions_{timestamp_str}.json")
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(self.session_log, f, indent=2, ensure_ascii=False)
# Generate summary stats
stats_file = os.path.join(output_folder, f"stats_{timestamp_str}.txt")
# Calculate stats
user_stats = defaultdict(lambda: {'total_time': 0, 'sessions': 0, 'channels': set(), 'guilds': set()})
channel_stats = defaultdict(lambda: {'total_time': 0, 'unique_users': set()})
guild_stats = defaultdict(lambda: {'total_time': 0, 'unique_users': set()})
for session in self.session_log:
user_id = session['user_id']
username = session['username']
channel_id = session['channel_id']
channel_name = session['channel_name']
guild_name = session.get('guild_name', 'Unknown')
duration = session['duration_seconds']
user_stats[username]['total_time'] += duration
user_stats[username]['sessions'] += 1
user_stats[username]['channels'].add(channel_name)
user_stats[username]['guilds'].add(guild_name)
channel_stats[f"{guild_name} > {channel_name}"]['total_time'] += duration
channel_stats[f"{guild_name} > {channel_name}"]['unique_users'].add(username)
guild_stats[guild_name]['total_time'] += duration
guild_stats[guild_name]['unique_users'].add(username)
with open(stats_file, 'w', encoding='utf-8') as f:
f.write("Voice Activity Statistics\n")
f.write("=" * 60 + "\n\n")
f.write(f"Total Sessions: {len(self.session_log)}\n")
f.write(f"Unique Users: {len(user_stats)}\n")
f.write(f"Servers Tracked: {len(guild_stats)}\n\n")
if len(guild_stats) > 1:
f.write("\nServer Activity:\n")
f.write("-" * 60 + "\n")
sorted_guilds = sorted(guild_stats.items(), key=lambda x: x[1]['total_time'], reverse=True)
for guild_name, stats in sorted_guilds:
hours = stats['total_time'] / 3600
f.write(f"{guild_name}:\n")
f.write(f" Total Time: {hours:.2f} hours\n")
f.write(f" Unique Users: {len(stats['unique_users'])}\n\n")
f.write("\nMost Active Users (by time):\n")
f.write("-" * 60 + "\n")
sorted_users = sorted(user_stats.items(), key=lambda x: x[1]['total_time'], reverse=True)
for username, stats in sorted_users[:10]:
hours = stats['total_time'] / 3600
f.write(f"{username}:\n")
f.write(f" Total Time: {hours:.2f} hours\n")
f.write(f" Sessions: {stats['sessions']}\n")
if len(stats['guilds']) > 1:
f.write(f" Servers: {', '.join(stats['guilds'])}\n")
f.write(f" Channels: {', '.join(stats['channels'])}\n\n")
f.write("\nChannel Activity:\n")
f.write("-" * 60 + "\n")
sorted_channels = sorted(channel_stats.items(), key=lambda x: x[1]['total_time'], reverse=True)
for channel_name, stats in sorted_channels:
hours = stats['total_time'] / 3600
f.write(f"{channel_name}:\n")
f.write(f" Total Time: {hours:.2f} hours\n")
f.write(f" Unique Users: {len(stats['unique_users'])}\n\n")
# Save detailed log
log_file = os.path.join(output_folder, f"detailed_log_{timestamp_str}.txt")
with open(log_file, 'w', encoding='utf-8') as f:
f.write("Detailed Voice Activity Log\n")
f.write("=" * 80 + "\n\n")
for session in sorted(self.session_log, key=lambda x: x['joined_at']):
ongoing = session.get('ongoing', False)
duration_min = session['duration_seconds'] / 60
f.write(f"User: {session['username']}\n")
f.write(f"Server: {session.get('guild_name', 'Unknown')}\n")
f.write(f"Channel: {session['channel_name']}\n")
f.write(f"Joined: {session['joined_at']}\n")
f.write(f"Left: {session['left_at']}")
if ongoing:
f.write(" (ongoing when tracking stopped)")
f.write("\n")
f.write(f"Duration: {duration_min:.1f} minutes\n")
f.write("-" * 80 + "\n\n")
print("\n" + "=" * 60)
print(f"Logs saved to: {output_folder}")
print(f" - sessions_{timestamp_str}.json (raw data)")
print(f" - stats_{timestamp_str}.txt (summary)")
print(f" - detailed_log_{timestamp_str}.txt (full log)")
def main():
print("Voice Activity Tracker")
print("=" * 60)
print("Track voice activity with filters\n")
print("1. Track specific server(s)")
print("2. Track specific user(s)")
print("3. Track all servers")
print("4. Track all servers except some")
print("=" * 60)
choice = input("Choice: ").strip()
guild_ids = None
user_ids = None
exclude_guilds = None
if choice == '1':
# Track specific guilds
guild_input = input("\nEnter guild ID(s) (comma-separated for multiple): ").strip()
guild_ids = [g.strip() for g in guild_input.split(',') if g.strip()]
if not all(g.isdigit() for g in guild_ids):
print("Invalid guild ID(s)")
return
elif choice == '2':
# Track specific users
user_input = input("\nEnter user ID(s) (comma-separated for multiple): ").strip()
user_ids = [u.strip() for u in user_input.split(',') if u.strip()]
if not all(u.isdigit() for u in user_ids):
print("Invalid user ID(s)")
return
elif choice == '3':
# Track all servers
print("\nTracking all servers...")
elif choice == '4':
# Exclude specific servers
exclude_input = input("\nEnter guild ID(s) to EXCLUDE (comma-separated): ").strip()
exclude_guilds = [g.strip() for g in exclude_input.split(',') if g.strip()]
if not all(g.isdigit() for g in exclude_guilds):
print("Invalid guild ID(s)")
return
else:
print("Invalid choice")
return
tracker = VoiceActivityTracker(
guild_ids=guild_ids,
user_ids=user_ids,
exclude_guilds=exclude_guilds
)
# Show what we're tracking
print("\n" + "=" * 60)
if guild_ids:
print(f"Tracking {len(guild_ids)} specific server(s)")
for gid in guild_ids:
name = tracker.get_guild_info(gid)
print(f" - {name}")
elif user_ids:
print(f"Tracking {len(user_ids)} specific user(s) across all servers")
elif exclude_guilds:
print(f"Tracking all servers except {len(exclude_guilds)}")
else:
print("Tracking all servers")
print("\nPress Ctrl+C to stop tracking and save logs\n")
print("=" * 60)
try:
asyncio.run(tracker.track_voice_activity())
except KeyboardInterrupt:
print("\n\nStopping tracker...")
finally:
tracker.save_logs()
if __name__ == "__main__":
main()