Major Features Added:
- Complete token bucket rate limiting for IRC commands and web interface
- Per-user rate tracking with category-based limits (Basic, Gameplay, Management, Admin, Web)
- Admin commands for rate limit management (!rate_stats, !rate_user, !rate_unban, !rate_reset)
- Automatic violation tracking and temporary bans with cleanup
- Global item spawn multiplier system with 75% spawn rate reduction
- Central admin configuration system (config.py)
- One-command bot startup script (start_petbot.sh)

Rate Limiting:
- Token bucket algorithm with burst capacity and refill rates
- Category limits: Basic (20/min), Gameplay (10/min), Management (5/min), Web (60/min)
- Graceful violation handling with user-friendly error messages
- Admin exemption and override capabilities
- Background cleanup of old violations and expired bans

Item Spawn System:
- Added global_spawn_multiplier to config/items.json for easy adjustment
- Reduced all individual spawn rates by 75% (multiplied by 0.25)
- Admins can fine-tune both global multiplier and individual item rates
- Game engine integration applies multiplier to all spawn calculations

Infrastructure:
- Single admin user configuration in config.py
- Enhanced startup script with dependency management and verification
- Updated documentation and help system with rate limiting guide
- Comprehensive test suite for rate limiting functionality

Security:
- Rate limiting protects against command spam and abuse
- IP-based tracking for web interface requests
- Proper error handling and status codes (429 for rate limits)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
171 lines
No EOL
7.2 KiB
Python
171 lines
No EOL
7.2 KiB
Python
#!/usr/bin/env python3
"""Admin commands module for PetBot"""

import sys
import os

# Add parent directory to path for config import.
# This must run before the `from config import ...` line below.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from .base_module import BaseModule
from config import ADMIN_USER  # Import admin user from central config

# =============================================================================
# ADMIN CONFIGURATION
# =============================================================================
# To change the admin user, edit config.py in the project root.
# The current admin user is whatever ADMIN_USER is set to in config.py.
# =============================================================================


class Admin(BaseModule):
    """Handles admin-only commands: module reload and rate-limiter management.

    Every command handler first checks the caller with ``is_admin``; callers
    that fail the check receive an "Access denied" reply and nothing else
    happens.
    """

    def get_commands(self):
        """Return the list of command names this module handles."""
        return ["reload", "rate_stats", "rate_user", "rate_unban", "rate_reset"]

    async def handle_command(self, channel, nickname, command, args):
        """Dispatch *command* to the matching ``cmd_*`` coroutine.

        Args:
            channel: Where replies are sent.
            nickname: Nick of the user who issued the command.
            command: Command name (without any prefix).
            args: Sequence of arguments that followed the command.

        Commands not listed by ``get_commands`` are silently ignored.
        """
        # Handlers that take no arguments beyond channel/nickname.
        plain_handlers = {
            "reload": self.cmd_reload,
            "rate_stats": self.cmd_rate_stats,
        }
        # Handlers that additionally receive the argument list.
        arg_handlers = {
            "rate_user": self.cmd_rate_user,
            "rate_unban": self.cmd_rate_unban,
            "rate_reset": self.cmd_rate_reset,
        }

        if command in plain_handlers:
            await plain_handlers[command](channel, nickname)
        elif command in arg_handlers:
            await arg_handlers[command](channel, nickname, args)

    def is_admin(self, nickname):
        """Return True if *nickname* matches ``ADMIN_USER`` (case-insensitive).

        NOTE(review): only the nickname string is compared here; confirm that
        callers authenticate the nick (e.g. via services) if that matters.
        """
        return nickname.lower() == ADMIN_USER.lower()

    def _require_admin(self, channel, nickname):
        """Return True for the admin; otherwise send the denial and return False."""
        if self.is_admin(nickname):
            return True
        self.send_message(channel, f"{nickname}: Access denied. Admin command.")
        return False

    def _require_rate_limiter(self, channel, nickname):
        """Return the bot's rate limiter, or None after reporting it is unavailable.

        ``getattr`` is used so that a bot object without a ``rate_limiter``
        attribute produces the normal "not available" reply rather than an
        ``AttributeError``.
        """
        limiter = getattr(self.bot, "rate_limiter", None)
        if not limiter:
            self.send_message(channel, f"{nickname}: Rate limiter not available.")
            return None
        return limiter

    def _require_user_command(self, channel, nickname, args, command):
        """Run the shared pre-checks for ``!<command> <username>`` commands.

        Checks, in order: caller is admin, a username argument was supplied,
        the rate limiter exists. Sends the appropriate reply on the first
        failure.

        Returns:
            The rate limiter when all checks pass, otherwise None.
        """
        if not self._require_admin(channel, nickname):
            return None
        if not args:
            self.send_message(channel, f"{nickname}: Usage: !{command} <username>")
            return None
        return self._require_rate_limiter(channel, nickname)

    async def cmd_reload(self, channel, nickname):
        """Reload bot modules (admin only).

        Awaits ``self.bot.reload_modules()`` and reports success or failure
        based on the truthiness of its result.
        """
        if not self._require_admin(channel, nickname):
            return

        try:
            # Trigger module reload in main bot
            success = await self.bot.reload_modules()
            if success:
                self.send_message(channel, f"{nickname}: ✅ Modules reloaded successfully!")
            else:
                self.send_message(channel, f"{nickname}: ❌ Module reload failed!")
        except Exception as e:
            # Command boundary: report any failure to the caller instead of
            # letting it propagate out of the handler.
            self.send_message(channel, f"{nickname}: ❌ Reload error: {str(e)}")

    async def cmd_rate_stats(self, channel, nickname):
        """Show global rate limiting statistics (admin only).

        Reads the dict returned by ``rate_limiter.get_global_stats()`` and
        sends one multi-line summary message.
        """
        if not self._require_admin(channel, nickname):
            return

        limiter = self._require_rate_limiter(channel, nickname)
        if limiter is None:
            return

        try:
            stats = limiter.get_global_stats()

            lines = [
                f"{nickname}: 📊 Rate Limiter Stats:",
                f"• Status: {'Enabled' if stats['enabled'] else 'Disabled'}",
                f"• Requests this minute: {stats['requests_this_minute']}",
                f"• Active users: {stats['active_users']}",
                f"• Total requests: {stats['total_requests']}",
                f"• Blocked requests: {stats['blocked_requests']}",
                f"• Banned users: {stats['banned_users']}",
                f"• Tracked users: {stats['tracked_users']}",
                f"• Total violations: {stats['total_violations']}",
            ]

            # NOTE(review): the reply contains embedded newlines; confirm that
            # send_message splits multi-line text, since a single IRC message
            # cannot span lines.
            self.send_message(channel, "\n".join(lines))

        except Exception as e:
            # Covers both limiter failures and missing keys in the stats dict.
            self.send_message(channel, f"{nickname}: ❌ Error getting rate stats: {str(e)}")

    async def cmd_rate_user(self, channel, nickname, args):
        """Show rate limiting stats for a specific user (admin only).

        Args:
            channel: Where replies are sent.
            nickname: Nick of the user who issued the command.
            args: Argument list; ``args[0]`` is the target username.
        """
        limiter = self._require_user_command(channel, nickname, args, "rate_user")
        if limiter is None:
            return

        try:
            target_user = args[0]
            stats = limiter.get_user_stats(target_user)

            lines = [
                f"{nickname}: 👤 Rate Stats for {stats['user']}:",
                f"• Admin exemption: {'Yes' if stats['admin_exemption'] else 'No'}",
                f"• Currently banned: {'Yes' if stats['is_banned'] else 'No'}",
            ]
            # The expiry line is only shown when an expiry value is present.
            if stats['ban_expires']:
                lines.append(f"• Ban expires: {stats['ban_expires']}")
            lines.append(f"• Total violations: {stats['violations']}")

            buckets = stats['buckets']
            if buckets:
                lines.append(f"• Available tokens: {buckets['tokens']}")
                lines.append(f"• Last request: {buckets['last_request']}")
            else:
                lines.append("• No recent activity")

            # NOTE(review): the reply contains embedded newlines; confirm that
            # send_message splits multi-line text, since a single IRC message
            # cannot span lines.
            self.send_message(channel, "\n".join(lines))

        except Exception as e:
            # Covers both limiter failures and missing keys in the stats dict.
            self.send_message(channel, f"{nickname}: ❌ Error getting user stats: {str(e)}")

    async def cmd_rate_unban(self, channel, nickname, args):
        """Manually unban a user from rate limiting (admin only).

        Args:
            channel: Where replies are sent.
            nickname: Nick of the user who issued the command.
            args: Argument list; ``args[0]`` is the target username.

        Reports "unbanned" when ``rate_limiter.unban_user`` returns a truthy
        value and "was not banned" otherwise.
        """
        limiter = self._require_user_command(channel, nickname, args, "rate_unban")
        if limiter is None:
            return

        try:
            target_user = args[0]
            success = limiter.unban_user(target_user)

            if success:
                self.send_message(channel, f"{nickname}: ✅ User {target_user} has been unbanned.")
            else:
                self.send_message(channel, f"{nickname}: ℹ️ User {target_user} was not banned.")

        except Exception as e:
            self.send_message(channel, f"{nickname}: ❌ Error unbanning user: {str(e)}")

    async def cmd_rate_reset(self, channel, nickname, args):
        """Reset rate limiting violations for a user (admin only).

        Args:
            channel: Where replies are sent.
            nickname: Nick of the user who issued the command.
            args: Argument list; ``args[0]`` is the target username.

        Reports "reset" when ``rate_limiter.reset_user_violations`` returns a
        truthy value and "no violations found" otherwise.
        """
        limiter = self._require_user_command(channel, nickname, args, "rate_reset")
        if limiter is None:
            return

        try:
            target_user = args[0]
            success = limiter.reset_user_violations(target_user)

            if success:
                self.send_message(channel, f"{nickname}: ✅ Violations reset for user {target_user}.")
            else:
                self.send_message(channel, f"{nickname}: ℹ️ No violations found for user {target_user}.")

        except Exception as e:
            self.send_message(channel, f"{nickname}: ❌ Error resetting violations: {str(e)}")