Petbot/run_bot_with_reconnect.py
megaproxy 915aa00bea Implement comprehensive rate limiting system and item spawn configuration
Major Features Added:
- Complete token bucket rate limiting for IRC commands and web interface
- Per-user rate tracking with category-based limits (Basic, Gameplay, Management, Admin, Web)
- Admin commands for rate limit management (!rate_stats, !rate_user, !rate_unban, !rate_reset)
- Automatic violation tracking and temporary bans with cleanup
- Global item spawn multiplier system with 75% spawn rate reduction
- Central admin configuration system (config.py)
- One-command bot startup script (start_petbot.sh)

Rate Limiting:
- Token bucket algorithm with burst capacity and refill rates
- Category limits: Basic (20/min), Gameplay (10/min), Management (5/min), Web (60/min)
- Graceful violation handling with user-friendly error messages
- Admin exemption and override capabilities
- Background cleanup of old violations and expired bans

Item Spawn System:
- Added global_spawn_multiplier to config/items.json for easy adjustment
- Reduced all individual spawn rates by 75% (multiplied by 0.25)
- Admins can fine-tune both global multiplier and individual item rates
- Game engine integration applies multiplier to all spawn calculations

Infrastructure:
- Single admin user configuration in config.py
- Enhanced startup script with dependency management and verification
- Updated documentation and help system with rate limiting guide
- Comprehensive test suite for rate limiting functionality

Security:
- Rate limiting protects against command spam and abuse
- IP-based tracking for web interface requests
- Proper error handling and status codes (429 for rate limits)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-15 20:10:43 +00:00

482 lines
No EOL
18 KiB
Python

#!/usr/bin/env python3
"""
PetBot with Advanced IRC Connection Management
Includes automatic reconnection, health monitoring, and graceful error handling.
"""
import asyncio
import sys
import os
import importlib
import logging
import signal
from datetime import datetime
# Add the project directory to the path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from src.database import Database
from src.game_engine import GameEngine
from src.irc_connection_manager import IRCConnectionManager, ConnectionState
from src.rate_limiter import RateLimiter, get_command_category
from modules import CoreCommands, Exploration, BattleSystem, PetManagement, Achievements, Admin, Inventory, GymBattles, TeamBuilder
from webserver import PetBotWebServer
from config import IRC_CONFIG, RATE_LIMIT_CONFIG
class PetBotWithReconnect:
    """
    Enhanced PetBot with robust IRC connection management.

    The bot owns the database, game engine, command modules, rate limiter,
    web server and an IRCConnectionManager.  Construction is synchronous;
    everything that needs a running event loop happens in ``initialize()``,
    which ``start()`` awaits before handing control to the connection manager.
    """

    def __init__(self):
        """Configure logging and create the synchronous core components."""
        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info("🤖 PetBot with Auto-Reconnect - Initializing...")

        # Core components
        self.database = Database()
        self.game_engine = GameEngine(self.database)
        self.config = IRC_CONFIG

        # Connection and state management
        self.connection_manager = None
        self.running = False
        self.shutdown_requested = False

        # Event loop the bot runs on.  Captured in initialize() so that
        # send_message_sync() can hand coroutines to it from other threads.
        self.loop = None

        # Strong references to fire-and-forget tasks.  The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks = set()

        # Module management
        self.modules = {}
        self.command_map = {}
        self.active_encounters = {}

        # Web server
        self.web_server = None

        # Rate limiting
        self.rate_limiter = None

        # Statistics
        self.startup_time = datetime.now()
        self.command_count = 0
        self.connection_events = []

        self.logger.info("✅ Basic initialization complete")

    def _spawn_task(self, coro):
        """Schedule ``coro`` on the running loop and keep a reference to it.

        The task removes itself from ``_background_tasks`` when it completes.
        Must be called from code running inside the event loop.
        """
        task = asyncio.get_running_loop().create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def initialize(self):
        """Initialize all async components.

        Runs, in order: database init, game data load, module load, player
        data validation, rate limiter, web server thread, connection manager
        and the two periodic background tasks.

        Raises:
            Exception: any failure is logged and re-raised to the caller.
        """
        try:
            self.logger.info("🔄 Initializing async components...")

            # Remember the loop so other threads can schedule work on it.
            self.loop = asyncio.get_running_loop()

            # Initialize database
            self.logger.info("🔄 Initializing database...")
            await self.database.init_database()
            self.logger.info("✅ Database initialized")

            # Load game data
            self.logger.info("🔄 Loading game data...")
            await self.game_engine.load_game_data()
            self.logger.info("✅ Game data loaded")

            # Load modules
            self.logger.info("🔄 Loading command modules...")
            await self.load_modules()
            self.logger.info("✅ Modules loaded")

            # Validate player data
            self.logger.info("🔄 Validating player data...")
            await self.validate_all_player_data()
            self.logger.info("✅ Player data validation complete")

            # Initialize rate limiter with config
            self.logger.info("🔄 Initializing rate limiter...")
            self.rate_limiter = RateLimiter(RATE_LIMIT_CONFIG)
            self.logger.info("✅ Rate limiter initialized")

            # Start web server
            self.logger.info("🔄 Starting web server...")
            self.web_server = PetBotWebServer(self.database, port=8080, bot=self)
            self.web_server.start_in_thread()
            self.logger.info("✅ Web server started on port 8080")

            # Initialize connection manager
            self.logger.info("🔄 Initializing IRC connection manager...")
            self.connection_manager = IRCConnectionManager(self.config, self)
            self.connection_manager.set_callbacks(
                on_connect=self.on_irc_connect,
                on_disconnect=self.on_irc_disconnect,
                on_message=self.on_irc_message,
                on_connection_lost=self.on_connection_lost
            )
            self.logger.info("✅ IRC connection manager initialized")

            # Start background tasks (tracked so shutdown() can cancel them)
            self.logger.info("🔄 Starting background tasks...")
            self._spawn_task(self.background_validation_task())
            self._spawn_task(self.connection_stats_task())
            self.logger.info("✅ Background tasks started")

            self.logger.info("🎉 All components initialized successfully!")
        except Exception as e:
            self.logger.error(f"❌ Initialization failed: {e}")
            raise

    async def load_modules(self):
        """Load all command modules.

        Instantiates every module class, stores it in ``self.modules`` keyed
        by class name, and maps each command it reports via ``get_commands()``
        to that instance in ``self.command_map``.  Both dicts are rebuilt from
        scratch on every call.

        Raises:
            Exception: the first module that fails to load is logged and the
                error re-raised.
        """
        module_classes = [
            CoreCommands,
            Exploration,
            BattleSystem,
            PetManagement,
            Achievements,
            Admin,
            Inventory,
            GymBattles,
            TeamBuilder
        ]
        self.modules = {}
        self.command_map = {}
        for module_class in module_classes:
            # Resolved outside the try so the error log below can always use it.
            module_name = module_class.__name__
            try:
                self.logger.info(f" Loading {module_name}...")
                module_instance = module_class(self, self.database, self.game_engine)
                self.modules[module_name] = module_instance

                # Map commands to modules
                commands = module_instance.get_commands()
                for command in commands:
                    self.command_map[command] = module_instance
                self.logger.info(f"{module_name}: {len(commands)} commands")
            except Exception as e:
                self.logger.error(f" ❌ Failed to load {module_name}: {e}")
                raise
        self.logger.info(f"✅ Loaded {len(self.modules)} modules with {len(self.command_map)} commands")

    async def reload_modules(self):
        """Reload all modules (for admin use).

        Returns:
            True when every submodule reloaded and ``load_modules()``
            succeeded, False if anything raised (the error is logged).
        """
        try:
            self.logger.info("🔄 Reloading modules...")

            # Reload module files
            import modules
            importlib.reload(modules.core_commands)
            importlib.reload(modules.exploration)
            importlib.reload(modules.battle_system)
            importlib.reload(modules.pet_management)
            importlib.reload(modules.achievements)
            importlib.reload(modules.admin)
            importlib.reload(modules.inventory)
            importlib.reload(modules.gym_battles)
            importlib.reload(modules.team_builder)
            importlib.reload(modules)

            # Reinitialize modules
            await self.load_modules()
            self.logger.info("✅ Modules reloaded successfully")
            return True
        except Exception as e:
            self.logger.error(f"❌ Module reload failed: {e}")
            return False

    async def validate_all_player_data(self):
        """Validate and refresh all player data.

        For every row in ``players``: re-checks achievements and, when a
        player owns pets but none is active, activates the first pet returned
        by the database.  Per-player failures are logged and skipped; a
        failure of the whole pass is logged and swallowed.
        """
        try:
            import aiosqlite
            async with aiosqlite.connect(self.database.db_path) as db:
                cursor = await db.execute("SELECT id, nickname FROM players")
                players = await cursor.fetchall()

            self.logger.info(f"🔄 Validating {len(players)} players...")
            for player_id, nickname in players:
                try:
                    # Check and award missing achievements
                    new_achievements = await self.game_engine.check_all_achievements(player_id)
                    if new_achievements:
                        self.logger.info(f" 🏆 {nickname}: Restored {len(new_achievements)} achievements")

                    # Validate team composition
                    team_composition = await self.database.get_team_composition(player_id)
                    if team_composition["active_pets"] == 0 and team_composition["total_pets"] > 0:
                        pets = await self.database.get_player_pets(player_id)
                        if pets:
                            first_pet = pets[0]
                            await self.database.activate_pet(player_id, str(first_pet["id"]))
                            self.logger.info(f" 🔧 {nickname}: Auto-activated pet {first_pet['nickname'] or first_pet['species_name']}")
                except Exception as e:
                    self.logger.error(f" ❌ Error validating {nickname}: {e}")
            self.logger.info("✅ Player data validation complete")
        except Exception as e:
            self.logger.error(f"❌ Player data validation failed: {e}")

    async def background_validation_task(self):
        """Background task for periodic validation.

        Sleeps 30 minutes between passes and exits when ``self.running``
        becomes false or the task is cancelled.
        """
        while self.running:
            try:
                await asyncio.sleep(1800)  # Run every 30 minutes
                if self.running:
                    self.logger.info("🔄 Running periodic validation...")
                    await self.validate_all_player_data()
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"❌ Background validation error: {e}")

    async def connection_stats_task(self):
        """Background task for connection statistics.

        Every 5 minutes, logs message and reconnection counts while the
        connection manager reports itself connected.
        """
        while self.running:
            try:
                await asyncio.sleep(300)  # Log stats every 5 minutes
                if self.running and self.connection_manager:
                    stats = self.connection_manager.get_connection_stats()
                    if stats["connected"]:
                        self.logger.info(f"📊 Connection stats: {stats['message_count']} messages, {stats['total_reconnections']} reconnections")
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"❌ Connection stats error: {e}")

    async def on_irc_connect(self):
        """Called when IRC connection is established."""
        self.logger.info("🎉 IRC connection established successfully!")
        self.connection_events.append({"type": "connect", "time": datetime.now()})

        # Send welcome message to channel
        if self.connection_manager:
            await self.connection_manager.send_message(
                self.config["channel"],
                "🤖 PetBot is online and ready for commands! Use !help to get started."
            )

    async def on_irc_disconnect(self, error):
        """Called when IRC connection is lost."""
        self.logger.warning(f"💔 IRC connection lost: {error}")
        self.connection_events.append({"type": "disconnect", "time": datetime.now(), "error": str(error)})

    async def on_connection_lost(self, error):
        """Called when connection is lost and reconnection will be attempted."""
        self.logger.warning(f"🔄 Connection lost, will attempt reconnection: {error}")
        self.connection_events.append({"type": "reconnect_attempt", "time": datetime.now(), "error": str(error)})

    async def on_irc_message(self, line):
        """Called when IRC message is received.

        Only ``PRIVMSG`` lines with at least four whitespace-separated tokens
        are considered.  If the message text starts with the configured
        command prefix it is forwarded to ``handle_command``.

        Args:
            line: one raw IRC protocol line.
        """
        parts = line.split()
        if len(parts) < 4:
            return
        if parts[1] != "PRIVMSG":
            return

        # NOTE(review): for a PRIVMSG addressed to the bot itself, parts[2] is
        # presumably the bot's own nick, so replies sent to `channel` may not
        # reach the sender — confirm against the connection manager.
        channel = parts[2]

        # The trailing parameter normally starts with ':' but the marker is
        # optional for a single-word message, so strip it only when present
        # instead of blindly dropping the first character.
        message = " ".join(parts[3:])
        if message.startswith(":"):
            message = message[1:]

        # Extract nickname from hostmask (":nick!user@host")
        hostmask = parts[0]
        if hostmask.startswith(":"):
            hostmask = hostmask[1:]
        nickname = hostmask.split('!')[0]

        # Handle commands
        if message.startswith(self.config["command_prefix"]):
            self.logger.info(f"🎮 Command from {nickname}: {message}")
            await self.handle_command(channel, nickname, message)

    async def handle_command(self, channel, nickname, message):
        """Handle IRC commands with rate limiting.

        Args:
            channel: target that replies are sent to.
            nickname: nick of the user who issued the command.
            message: full message text including the command prefix.
        """
        from modules.base_module import BaseModule

        # Strip the whole configured prefix, not just one character, so a
        # multi-character prefix is handled correctly.
        prefix = self.config["command_prefix"]
        command_parts = message[len(prefix):].split()
        if not command_parts:
            return

        command = BaseModule.normalize_input(command_parts[0])
        args = BaseModule.normalize_input(command_parts[1:])
        try:
            # Check rate limit first
            if self.rate_limiter:
                category = get_command_category(command)
                allowed, rate_limit_message = await self.rate_limiter.check_rate_limit(
                    nickname, category, command
                )
                if not allowed:
                    await self.send_message(channel, f"{nickname}: {rate_limit_message}")
                    return

            self.command_count += 1
            if command in self.command_map:
                module = self.command_map[command]
                self.logger.info(f"🔧 Executing {command} via {module.__class__.__name__}")
                await module.handle_command(channel, nickname, command, args)
            else:
                await self.send_message(channel, f"{nickname}: Unknown command. Use !help for available commands.")
        except Exception as e:
            self.logger.error(f"❌ Command error: {e}")
            await self.send_message(channel, f"{nickname}: Error processing command: {str(e)}")

    async def send_message(self, target, message):
        """Send message via connection manager.

        Logs a warning when no connection manager exists or when the manager
        reports the send as unsuccessful.
        """
        if self.connection_manager:
            success = await self.connection_manager.send_message(target, message)
            if not success:
                self.logger.warning(f"Failed to send message to {target}: {message}")
        else:
            self.logger.warning(f"No connection manager available to send message to {target}")

    def send_message_sync(self, target, message):
        """Synchronous wrapper for send_message (for compatibility with old modules).

        Three situations are handled:

        * called from inside a running event loop: schedule a tracked task;
        * called from another thread while the bot's loop (captured in
          ``initialize``) is running: hand the coroutine over thread-safely;
        * no loop running anywhere: run the coroutine to completion here.

        Failures are logged, never raised.
        """
        try:
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                in_loop = False
            else:
                in_loop = True

            if in_loop:
                self._spawn_task(self.send_message(target, message))
                return

            bot_loop = getattr(self, 'loop', None)
            if bot_loop is not None and bot_loop.is_running():
                # We are on a foreign thread (e.g. the web server thread);
                # create_task() would fail here, so use the thread-safe API.
                asyncio.run_coroutine_threadsafe(self.send_message(target, message), bot_loop)
                return

            asyncio.run(self.send_message(target, message))
        except Exception as e:
            self.logger.error(f"Failed to send message synchronously: {e}")

    async def send_team_builder_pin(self, nickname, pin_code):
        """Send team builder PIN via private message."""
        message = f"🔐 Team Builder PIN: {pin_code} (expires in 10 minutes)"
        await self.send_message(nickname, message)

    def get_bot_stats(self):
        """Get comprehensive bot statistics.

        Returns:
            dict with uptime (as a string), startup time, command count,
            module/command/event counts, the connection manager's stats
            (empty dict when there is no manager) and the running flag.
        """
        uptime = datetime.now() - self.startup_time
        connection_stats = {}
        if self.connection_manager:
            connection_stats = self.connection_manager.get_connection_stats()
        return {
            "uptime": str(uptime),
            "startup_time": self.startup_time,
            "command_count": self.command_count,
            "loaded_modules": len(self.modules),
            "available_commands": len(self.command_map),
            "connection_events": len(self.connection_events),
            "connection_stats": connection_stats,
            "running": self.running
        }

    async def start(self):
        """Start the bot.

        Initializes all components, then awaits the connection manager's
        ``start()``.  ``shutdown()`` is always awaited on the way out.

        Raises:
            Exception: startup failures are logged and re-raised.
        """
        self.running = True
        try:
            # Initialize all components
            await self.initialize()

            # Start connection manager
            self.logger.info("🚀 Starting IRC connection manager...")
            await self.connection_manager.start()
        except Exception as e:
            self.logger.error(f"❌ Bot startup failed: {e}")
            raise
        finally:
            await self.shutdown()

    async def shutdown(self):
        """Gracefully shutdown the bot.

        Only the first call does any work; later calls return immediately.
        Each teardown step is isolated so one failing component does not
        prevent the remaining ones from being shut down.
        """
        if self.shutdown_requested:
            return
        self.shutdown_requested = True
        self.logger.info("🔄 Shutting down bot...")

        # Stop main loop
        self.running = False

        # Stop connection manager
        if self.connection_manager:
            try:
                await self.connection_manager.stop()
            except Exception as e:
                self.logger.error(f"Error stopping connection manager: {e}")

        # Cancel tracked background tasks so they do not sit in their long
        # sleeps until the loop is torn down.
        current = asyncio.current_task()
        pending = [
            task for task in self._background_tasks
            if task is not current and not task.done()
        ]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        # Shutdown rate limiter
        if self.rate_limiter:
            try:
                await self.rate_limiter.shutdown()
            except Exception as e:
                self.logger.error(f"Error shutting down rate limiter: {e}")

        # Shutdown game engine
        if self.game_engine:
            try:
                await self.game_engine.shutdown()
            except Exception as e:
                self.logger.error(f"Error shutting down game engine: {e}")

        # Stop web server
        if self.web_server:
            try:
                # Web server doesn't have async shutdown, so we'll just log it
                self.logger.info("🔄 Web server shutdown (handled by thread)")
            except Exception as e:
                self.logger.error(f"Error shutting down web server: {e}")

        self.logger.info("✅ Bot shutdown complete")
async def main():
    """Main entry point.

    Creates the bot, publishes it as ``bot_instance`` on this module for the
    web server, installs SIGINT/SIGTERM handlers that trigger a graceful
    shutdown, and runs the bot until it stops.
    """
    bot = PetBotWithReconnect()

    # Make bot instance globally accessible for webserver
    sys.modules[__name__].bot_instance = bot

    loop = asyncio.get_running_loop()

    # Strong references to signal-initiated shutdown tasks, so they are not
    # garbage-collected and can be awaited before this coroutine returns.
    shutdown_tasks = []

    def request_shutdown(signum):
        # Runs as an event-loop callback, so creating a task here is safe.
        bot.logger.info(f"Received signal {signum}, initiating shutdown...")
        shutdown_tasks.append(loop.create_task(bot.shutdown()))

    def signal_handler(signum, frame):
        # Fallback for platforms without loop.add_signal_handler: a plain
        # signal handler must not touch the loop directly, so hop onto it.
        loop.call_soon_threadsafe(request_shutdown, signum)

    # Setup signal handlers for graceful shutdown
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, request_shutdown, int(sig))
        except (NotImplementedError, RuntimeError):
            signal.signal(sig, signal_handler)

    try:
        await bot.start()
    except KeyboardInterrupt:
        bot.logger.info("🛑 Keyboard interrupt received")
    except Exception as e:
        bot.logger.error(f"❌ Bot crashed: {e}")
        import traceback
        traceback.print_exc()
    finally:
        await bot.shutdown()
        # bot.shutdown() returns immediately when a signal-initiated shutdown
        # is already in progress; wait for that one so it is not cancelled
        # half-way when the event loop closes.
        if shutdown_tasks:
            await asyncio.gather(*shutdown_tasks, return_exceptions=True)
if __name__ == "__main__":
    # Script entry point: run main() on a fresh event loop and always print a
    # final status line, whatever way the run ended.
    print("🐾 Starting PetBot with Auto-Reconnect...")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C that escaped the bot's own signal handling.
        print("\n🔄 Bot stopping...")
    except Exception as fatal:
        # Anything main() did not handle itself: report it with a traceback.
        import traceback
        print(f"❌ Fatal error: {fatal}")
        traceback.print_exc()
    finally:
        print("✅ Bot stopped")