# discord_tools/scripts/voice_activity_tracker.py
"""Track Discord voice-channel activity over the gateway and write reports.

The script connects to the Discord gateway, listens for ``VOICE_STATE_UPDATE``
and ``GUILD_CREATE`` events, records join / move / leave sessions in memory and,
on shutdown, writes a JSON dump, a statistics summary and a detailed text log
under ``DATA_DIR``.
"""
import asyncio
import json
import os
import sys
from collections import defaultdict
from datetime import datetime

import websockets

# Add the parent directory to the Python path
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(script_dir))
sys.path.insert(0, project_root)

from discord_tools.config.settings import DISCORD_TOKEN, DATA_DIR
from discord_tools.utils.api_utils import make_discord_request


class VoiceActivityTracker:
    """Record voice sessions for selected guilds and/or users.

    Attributes:
        guild_ids: Guild IDs to track (as strings); empty means "all guilds".
        user_ids: User IDs to track (as strings); empty means "all users".
        exclude_guilds: Guild IDs (as strings) that are never tracked.
        voice_states: Open sessions keyed by ``"<user_id>:<guild_id>"``.
        session_log: Closed sessions, one dict per session.
        active: Loop flag; the gateway loops stop when it becomes false.
        guild_names: Cache of guild ID -> guild name.
        channel_names: Cache of channel ID -> channel name.
    """

    def __init__(self, guild_ids=None, user_ids=None, exclude_guilds=None):
        """Create a tracker.

        Each filter may be ``None``, a single ID, or an iterable of IDs.
        IDs are stored as strings because the gateway events compared
        against them in this module carry IDs as strings.
        """
        self.guild_ids = self._normalize_ids(guild_ids)
        self.user_ids = self._normalize_ids(user_ids)
        self.exclude_guilds = self._normalize_ids(exclude_guilds)

        self.voice_states = {}   # "user_id:guild_id" -> open session state
        self.session_log = []    # List of all finished voice sessions
        self.active = True
        self.guild_names = {}    # Cache guild names
        self.channel_names = {}  # Cache channel names

    @staticmethod
    def _normalize_ids(ids):
        """Return *ids* as a new list of strings (empty list for falsy input)."""
        if not ids:
            return []
        if isinstance(ids, (str, int)):
            return [str(ids)]
        return [str(item) for item in ids]

    def get_guild_info(self, guild_id):
        """Return the guild's name, fetching and caching it on first use.

        Falls back to ``"Guild-<id>"`` (also cached) when the request helper
        returns a falsy response.
        """
        if guild_id in self.guild_names:
            return self.guild_names[guild_id]

        # NOTE(review): make_discord_request is assumed to return a
        # response object that is falsy on failure - confirm in api_utils.
        response = make_discord_request('GET', f'/guilds/{guild_id}')
        name = response.json()['name'] if response else f'Guild-{guild_id}'
        self.guild_names[guild_id] = name
        return name

    def get_voice_channels(self, guild_id):
        """Return ``{channel_id: name}`` for the guild's voice/stage channels.

        Every channel found is also stored in ``self.channel_names``.
        Returns an empty dict when the request helper returns a falsy response.
        """
        response = make_discord_request('GET', f'/guilds/{guild_id}/channels')
        if not response:
            return {}

        voice_channels = {}
        for channel in response.json():
            # Type 2 = voice channel, Type 13 = stage channel
            if channel['type'] in (2, 13):
                voice_channels[channel['id']] = channel['name']
                self.channel_names[channel['id']] = channel['name']
        return voice_channels

    def should_track_event(self, guild_id, user_id):
        """Return True when an event for this guild/user passes all filters."""
        # Excluded guilds always lose
        if guild_id in self.exclude_guilds:
            return False
        # If specific guilds are set, only track those
        if self.guild_ids and guild_id not in self.guild_ids:
            return False
        # If specific users are set, only track those
        if self.user_ids and user_id not in self.user_ids:
            return False
        return True

    def _resolve_channel_name(self, guild_id, channel_id):
        """Return the cached channel name, refreshing the guild's cache once."""
        if channel_id not in self.channel_names:
            self.get_voice_channels(guild_id)
        return self.channel_names.get(channel_id, f'Channel-{channel_id}')

    @staticmethod
    def _build_session(user_id, username, guild_id, guild_name, state, left_at):
        """Build the session record for *state* ending at *left_at*."""
        joined_at = state['joined_at']
        return {
            'user_id': user_id,
            'username': username,
            'guild_id': guild_id,
            'guild_name': guild_name,
            'channel_id': state['channel_id'],
            'channel_name': state.get('channel_name', 'Unknown'),
            'joined_at': joined_at.isoformat(),
            'left_at': left_at.isoformat(),
            'duration_seconds': (left_at - joined_at).total_seconds(),
        }

    def handle_voice_state_update(self, data):
        """Process one voice state payload (join, move or leave).

        Args:
            data: Voice state dict; must contain ``user_id`` and may contain
                ``channel_id`` (``None`` means the user left voice),
                ``guild_id``, ``member`` and ``user``.
        """
        user_id = data['user_id']
        channel_id = data.get('channel_id')  # None if user left voice
        guild_id = data.get('guild_id')

        if not self.should_track_event(guild_id, user_id):
            return

        timestamp = datetime.now()

        # `or {}` also covers payloads where the key is present but null.
        member = data.get('member') or {}
        user = member.get('user') or data.get('user') or {}
        username = f"{user.get('username', 'Unknown')}#{user.get('discriminator', '0000')}"

        guild_name = self.get_guild_info(guild_id)

        # One open session per user per guild
        state_key = f"{user_id}:{guild_id}"
        old_state = self.voice_states.get(state_key)

        if old_state is None:
            # User joined voice (wasn't in voice before)
            if channel_id is not None:
                channel_name = self._resolve_channel_name(guild_id, channel_id)
                self.voice_states[state_key] = {
                    'channel_id': channel_id,
                    'channel_name': channel_name,
                    'guild_id': guild_id,
                    'guild_name': guild_name,
                    'joined_at': timestamp,
                    'username': username,
                }
                print(f"[+] {username} joined {channel_name} in {guild_name}")
            return

        if channel_id == old_state['channel_id']:
            # Same channel: nothing to record for an update that does not
            # change the channel.
            return

        # The previous session ends on both "left" and "moved".
        session = self._build_session(
            user_id, username, guild_id, guild_name, old_state, timestamp
        )
        self.session_log.append(session)
        old_channel_name = old_state.get('channel_name', 'Unknown')

        if channel_id is None:
            # User left voice completely
            del self.voice_states[state_key]
            duration = session['duration_seconds']
            print(f"[-] {username} left {old_channel_name} in {guild_name} (stayed {duration:.0f}s)")
            return

        # User switched channels
        new_channel_name = self._resolve_channel_name(guild_id, channel_id)
        self.voice_states[state_key] = {
            'channel_id': channel_id,
            'channel_name': new_channel_name,
            'guild_id': guild_id,
            'guild_name': guild_name,
            'joined_at': timestamp,
            'username': username,
        }
        print(f"[→] {username} moved from {old_channel_name} to {new_channel_name} in {guild_name}")

    async def track_voice_activity(self):
        """Connect to the Discord gateway and process voice events until stopped.

        Runs until ``self.active`` becomes false or the connection raises.
        The heartbeat task is always cancelled on exit.
        """
        gateway_url = "wss://gateway.discord.gg/?v=9&encoding=json"
        # Last dispatch sequence number seen; echoed back in heartbeats.
        last_sequence = None

        async with websockets.connect(gateway_url, max_size=16 * 1024 * 1024) as ws:
            # Receive Hello
            hello = json.loads(await ws.recv())
            heartbeat_interval = hello['d']['heartbeat_interval']

            # Send Identify
            identify = {
                "op": 2,
                "d": {
                    "token": DISCORD_TOKEN,
                    "properties": {
                        "$os": "windows",
                        "$browser": "chrome",
                        "$device": "pc",
                    },
                    "compress": False,
                },
            }
            await ws.send(json.dumps(identify))

            async def heartbeat():
                # The heartbeat payload carries the last sequence number
                # received (null only before the first dispatch).
                while self.active:
                    await asyncio.sleep(heartbeat_interval / 1000)
                    if self.active:
                        await ws.send(json.dumps({"op": 1, "d": last_sequence}))

            heartbeat_task = asyncio.create_task(heartbeat())

            # The try starts right after the task is created so the task is
            # cancelled even if waiting for READY fails.
            try:
                # Wait for READY
                while True:
                    msg = json.loads(await ws.recv())
                    if msg.get('s') is not None:
                        last_sequence = msg['s']
                    if msg.get('t') == 'READY':
                        break

                print("Connected to Discord Gateway")
                print("Tracking voice activity...")
                print("=" * 60)

                # Track voice state updates
                while self.active:
                    data = json.loads(await ws.recv())
                    if data.get('s') is not None:
                        last_sequence = data['s']
                    event_type = data.get('t')

                    if event_type == 'VOICE_STATE_UPDATE':
                        self.handle_voice_state_update(data['d'])

                    # Also handle initial guild members in voice
                    elif event_type == 'GUILD_CREATE':
                        guild_id = data['d']['id']

                        # Only process if we should track this guild
                        if guild_id in self.exclude_guilds:
                            continue
                        if self.guild_ids and guild_id not in self.guild_ids:
                            continue

                        # Process initial voice states; guild_id is set
                        # here because the handler reads it from the payload.
                        for voice_state in data['d'].get('voice_states', []):
                            voice_state['guild_id'] = guild_id
                            self.handle_voice_state_update(voice_state)

            except KeyboardInterrupt:
                print("\nStopping tracker...")
                self.active = False
            finally:
                heartbeat_task.cancel()

    @staticmethod
    def _write_stats(stats_file, sessions):
        """Write the per-guild / per-user / per-channel summary to *stats_file*."""
        user_stats = defaultdict(
            lambda: {'total_time': 0, 'sessions': 0, 'channels': set(), 'guilds': set()}
        )
        channel_stats = defaultdict(lambda: {'total_time': 0, 'unique_users': set()})
        guild_stats = defaultdict(lambda: {'total_time': 0, 'unique_users': set()})

        for session in sessions:
            username = session['username']
            channel_name = session['channel_name']
            guild_name = session.get('guild_name', 'Unknown')
            duration = session['duration_seconds']
            channel_key = f"{guild_name} > {channel_name}"

            user_stats[username]['total_time'] += duration
            user_stats[username]['sessions'] += 1
            user_stats[username]['channels'].add(channel_name)
            user_stats[username]['guilds'].add(guild_name)

            channel_stats[channel_key]['total_time'] += duration
            channel_stats[channel_key]['unique_users'].add(username)

            guild_stats[guild_name]['total_time'] += duration
            guild_stats[guild_name]['unique_users'].add(username)

        def by_total_time(item):
            return item[1]['total_time']

        with open(stats_file, 'w', encoding='utf-8') as f:
            f.write("Voice Activity Statistics\n")
            f.write("=" * 60 + "\n\n")
            f.write(f"Total Sessions: {len(sessions)}\n")
            f.write(f"Unique Users: {len(user_stats)}\n")
            f.write(f"Servers Tracked: {len(guild_stats)}\n\n")

            if len(guild_stats) > 1:
                f.write("\nServer Activity:\n")
                f.write("-" * 60 + "\n")
                for guild_name, stats in sorted(guild_stats.items(), key=by_total_time, reverse=True):
                    hours = stats['total_time'] / 3600
                    f.write(f"{guild_name}:\n")
                    f.write(f"  Total Time: {hours:.2f} hours\n")
                    f.write(f"  Unique Users: {len(stats['unique_users'])}\n\n")

            f.write("\nMost Active Users (by time):\n")
            f.write("-" * 60 + "\n")
            sorted_users = sorted(user_stats.items(), key=by_total_time, reverse=True)
            for username, stats in sorted_users[:10]:
                hours = stats['total_time'] / 3600
                f.write(f"{username}:\n")
                f.write(f"  Total Time: {hours:.2f} hours\n")
                f.write(f"  Sessions: {stats['sessions']}\n")
                # Sets are sorted so the report text is deterministic.
                if len(stats['guilds']) > 1:
                    f.write(f"  Servers: {', '.join(sorted(stats['guilds']))}\n")
                f.write(f"  Channels: {', '.join(sorted(stats['channels']))}\n\n")

            f.write("\nChannel Activity:\n")
            f.write("-" * 60 + "\n")
            for channel_name, stats in sorted(channel_stats.items(), key=by_total_time, reverse=True):
                hours = stats['total_time'] / 3600
                f.write(f"{channel_name}:\n")
                f.write(f"  Total Time: {hours:.2f} hours\n")
                f.write(f"  Unique Users: {len(stats['unique_users'])}\n\n")

    @staticmethod
    def _write_detailed_log(log_file, sessions):
        """Write one entry per session, ordered by join time, to *log_file*."""
        with open(log_file, 'w', encoding='utf-8') as f:
            f.write("Detailed Voice Activity Log\n")
            f.write("=" * 80 + "\n\n")

            for session in sorted(sessions, key=lambda s: s['joined_at']):
                duration_min = session['duration_seconds'] / 60

                f.write(f"User: {session['username']}\n")
                f.write(f"Server: {session.get('guild_name', 'Unknown')}\n")
                f.write(f"Channel: {session['channel_name']}\n")
                f.write(f"Joined: {session['joined_at']}\n")
                f.write(f"Left: {session['left_at']}")
                if session.get('ongoing', False):
                    f.write(" (ongoing when tracking stopped)")
                f.write("\n")
                f.write(f"Duration: {duration_min:.1f} minutes\n")
                f.write("-" * 80 + "\n\n")

    def save_logs(self):
        """Write the JSON dump, statistics and detailed log under ``DATA_DIR``.

        Sessions that are still open are included with ``'ongoing': True`` and
        the current time as ``left_at``. Tracker state is not modified, so
        calling this method more than once does not duplicate those sessions.
        """
        if not self.session_log and not self.voice_states:
            print("No voice activity recorded")
            return

        timestamp = datetime.now()

        # Snapshot: finished sessions plus a closed copy of every open one.
        sessions = list(self.session_log)
        for state_key, state in self.voice_states.items():
            session = self._build_session(
                state_key.split(':')[0],
                state['username'],
                state['guild_id'],
                state.get('guild_name', 'Unknown'),
                state,
                timestamp,
            )
            session['ongoing'] = True
            sessions.append(session)

        # Create output folder
        if self.guild_ids and len(self.guild_ids) == 1:
            folder_name = f"voice_activity_{self.guild_ids[0]}"
        elif self.user_ids:
            folder_name = f"voice_activity_user_{'_'.join(self.user_ids[:3])}"
        else:
            folder_name = "voice_activity_all"

        output_folder = os.path.join(DATA_DIR, folder_name)
        os.makedirs(output_folder, exist_ok=True)

        timestamp_str = timestamp.strftime('%Y%m%d_%H%M%S')

        # Save raw JSON
        json_file = os.path.join(output_folder, f"sessions_{timestamp_str}.json")
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(sessions, f, indent=2, ensure_ascii=False)

        # Summary stats and detailed log
        self._write_stats(
            os.path.join(output_folder, f"stats_{timestamp_str}.txt"), sessions
        )
        self._write_detailed_log(
            os.path.join(output_folder, f"detailed_log_{timestamp_str}.txt"), sessions
        )

        print("\n" + "=" * 60)
        print(f"Logs saved to: {output_folder}")
        print(f"  - sessions_{timestamp_str}.json (raw data)")
        print(f"  - stats_{timestamp_str}.txt (summary)")
        print(f"  - detailed_log_{timestamp_str}.txt (full log)")


def _parse_id_list(raw):
    """Parse comma-separated numeric IDs; return None if empty or invalid."""
    ids = [part.strip() for part in raw.split(',') if part.strip()]
    # An empty list must be rejected explicitly: all() of nothing is True.
    if not ids or not all(i.isascii() and i.isdigit() for i in ids):
        return None
    return ids


def main():
    """Interactive entry point: pick a filter mode, track, then save logs."""
    print("Voice Activity Tracker")
    print("=" * 60)
    print("Track voice activity with filters\n")
    print("1. Track specific server(s)")
    print("2. Track specific user(s)")
    print("3. Track all servers")
    print("4. Track all servers except some")
    print("=" * 60)

    choice = input("Choice: ").strip()

    guild_ids = None
    user_ids = None
    exclude_guilds = None

    if choice == '1':
        # Track specific guilds
        guild_ids = _parse_id_list(
            input("\nEnter guild ID(s) (comma-separated for multiple): ")
        )
        if guild_ids is None:
            print("Invalid guild ID(s)")
            return

    elif choice == '2':
        # Track specific users
        user_ids = _parse_id_list(
            input("\nEnter user ID(s) (comma-separated for multiple): ")
        )
        if user_ids is None:
            print("Invalid user ID(s)")
            return

    elif choice == '3':
        # Track all servers
        print("\nTracking all servers...")

    elif choice == '4':
        # Exclude specific servers
        exclude_guilds = _parse_id_list(
            input("\nEnter guild ID(s) to EXCLUDE (comma-separated): ")
        )
        if exclude_guilds is None:
            print("Invalid guild ID(s)")
            return

    else:
        print("Invalid choice")
        return

    tracker = VoiceActivityTracker(
        guild_ids=guild_ids,
        user_ids=user_ids,
        exclude_guilds=exclude_guilds,
    )

    # Show what we're tracking
    print("\n" + "=" * 60)
    if guild_ids:
        print(f"Tracking {len(guild_ids)} specific server(s)")
        for gid in guild_ids:
            name = tracker.get_guild_info(gid)
            print(f"  - {name}")
    elif user_ids:
        print(f"Tracking {len(user_ids)} specific user(s) across all servers")
    elif exclude_guilds:
        print(f"Tracking all servers except {len(exclude_guilds)}")
    else:
        print("Tracking all servers")

    print("\nPress Ctrl+C to stop tracking and save logs\n")
    print("=" * 60)

    try:
        asyncio.run(tracker.track_voice_activity())
    except KeyboardInterrupt:
        print("\n\nStopping tracker...")
    finally:
        # Always persist whatever was collected, even on errors.
        tracker.save_logs()


if __name__ == "__main__":
    main()