#!/usr/bin/env python3
"""
PetBot with Advanced IRC Connection Management
Includes automatic reconnection, health monitoring, and graceful error handling.
"""

import asyncio
import sys
import os
import importlib
import logging
import signal
import traceback
from datetime import datetime

# Add the project directory to the path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from src.database import Database
from src.game_engine import GameEngine
from src.irc_connection_manager import IRCConnectionManager, ConnectionState
from src.rate_limiter import RateLimiter, get_command_category
from modules import CoreCommands, Exploration, BattleSystem, PetManagement, Achievements, Admin, Inventory, GymBattles, TeamBuilder
from webserver import PetBotWebServer
from config import IRC_CONFIG, RATE_LIMIT_CONFIG

# Command-module classes exported by the ``modules`` package, in load order.
# They are looked up by name at load time (not via the names imported above)
# so that a reload of the package is actually picked up.
_MODULE_CLASS_NAMES = (
    "CoreCommands",
    "Exploration",
    "BattleSystem",
    "PetManagement",
    "Achievements",
    "Admin",
    "Inventory",
    "GymBattles",
    "TeamBuilder",
)

# Submodules of ``modules`` that reload_modules() re-imports, in order.
_MODULE_FILE_NAMES = (
    "core_commands",
    "exploration",
    "battle_system",
    "pet_management",
    "achievements",
    "admin",
    "inventory",
    "gym_battles",
    "team_builder",
)


class PetBotWithReconnect:
    """
    Enhanced PetBot with robust IRC connection management.

    Owns the database, game engine, command modules, rate limiter, web
    server and IRC connection manager, and routes incoming IRC commands to
    the module that registered them.
    """

    def __init__(self):
        """Create the synchronous components; async setup happens in initialize()."""
        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

        self.logger.info("🤖 PetBot with Auto-Reconnect - Initializing...")

        # Core components
        self.database = Database()
        self.game_engine = GameEngine(self.database)
        self.config = IRC_CONFIG

        # Connection and state management
        self.connection_manager = None
        self.running = False
        self.shutdown_requested = False

        # Event loop the bot runs on; recorded in initialize() so that
        # send_message_sync() can hand work to it from other threads.
        self.loop = None

        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so untracked tasks may be garbage-collected
        # before they finish.
        self._tasks = set()

        # Module management
        self.modules = {}
        self.command_map = {}
        self.active_encounters = {}

        # Web server
        self.web_server = None

        # Rate limiting
        self.rate_limiter = None

        # Statistics
        self.startup_time = datetime.now()
        self.command_count = 0
        self.connection_events = []

        self.logger.info("✅ Basic initialization complete")

    def _spawn_task(self, coro):
        """Schedule *coro* on the running loop and keep a reference until it finishes."""
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def _cancel_tasks(self):
        """Cancel every tracked task (except the caller) and wait for them to end."""
        current = asyncio.current_task()
        pending = [
            task for task in self._tasks
            if task is not current and not task.done()
        ]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

    async def initialize(self):
        """Initialize all async components.

        Runs, in order: database init, game data load, module load, player
        data validation, rate limiter, web server, IRC connection manager
        and the background tasks. Any failure is logged and re-raised.
        """
        try:
            self.logger.info("🔄 Initializing async components...")
            self.loop = asyncio.get_running_loop()

            # Initialize database
            self.logger.info("🔄 Initializing database...")
            await self.database.init_database()
            self.logger.info("✅ Database initialized")

            # Load game data
            self.logger.info("🔄 Loading game data...")
            await self.game_engine.load_game_data()
            self.logger.info("✅ Game data loaded")

            # Load modules
            self.logger.info("🔄 Loading command modules...")
            await self.load_modules()
            self.logger.info("✅ Modules loaded")

            # Validate player data
            self.logger.info("🔄 Validating player data...")
            await self.validate_all_player_data()
            self.logger.info("✅ Player data validation complete")

            # Initialize rate limiter with config
            self.logger.info("🔄 Initializing rate limiter...")
            self.rate_limiter = RateLimiter(RATE_LIMIT_CONFIG)
            self.logger.info("✅ Rate limiter initialized")

            # Start web server
            self.logger.info("🔄 Starting web server...")
            self.web_server = PetBotWebServer(self.database, port=8080, bot=self)
            self.web_server.start_in_thread()
            self.logger.info("✅ Web server started on port 8080")

            # Initialize connection manager
            self.logger.info("🔄 Initializing IRC connection manager...")
            self.connection_manager = IRCConnectionManager(self.config, self)
            self.connection_manager.set_callbacks(
                on_connect=self.on_irc_connect,
                on_disconnect=self.on_irc_disconnect,
                on_message=self.on_irc_message,
                on_connection_lost=self.on_connection_lost
            )
            self.logger.info("✅ IRC connection manager initialized")

            # Start background tasks (tracked so shutdown() can cancel them)
            self.logger.info("🔄 Starting background tasks...")
            self._spawn_task(self.background_validation_task())
            self._spawn_task(self.connection_stats_task())
            self.logger.info("✅ Background tasks started")

            self.logger.info("🎉 All components initialized successfully!")

        except Exception as e:
            self.logger.error(f"❌ Initialization failed: {e}")
            raise

    async def load_modules(self):
        """Load all command modules.

        Instantiates each class named in ``_MODULE_CLASS_NAMES`` and maps
        every command it reports via ``get_commands()`` to that instance.
        The new maps replace ``self.modules`` / ``self.command_map`` only
        after every module loaded, so a failed (re)load leaves the previous
        set in place. The first failure is logged and re-raised.
        """
        # Resolve classes from the package at call time: after
        # reload_modules() the names imported at the top of this file still
        # point at the pre-reload class objects.
        import modules as modules_package

        loaded_modules = {}
        loaded_commands = {}

        for class_name in _MODULE_CLASS_NAMES:
            module_name = class_name
            try:
                module_class = getattr(modules_package, class_name)
                module_name = module_class.__name__
                self.logger.info(f" Loading {module_name}...")

                module_instance = module_class(self, self.database, self.game_engine)
                loaded_modules[module_name] = module_instance

                # Map commands to modules
                commands = module_instance.get_commands()
                for command in commands:
                    loaded_commands[command] = module_instance

                self.logger.info(f" ✅ {module_name}: {len(commands)} commands")

            except Exception as e:
                self.logger.error(f" ❌ Failed to load {module_name}: {e}")
                raise

        self.modules = loaded_modules
        self.command_map = loaded_commands

        self.logger.info(f"✅ Loaded {len(self.modules)} modules with {len(self.command_map)} commands")

    async def reload_modules(self):
        """Reload all modules (for admin use).

        Returns:
            True when every submodule reloaded and the modules were
            re-instantiated, False when anything failed (the error is logged).
        """
        try:
            self.logger.info("🔄 Reloading modules...")

            # Reload module files, then the package itself so its exported
            # class names point at the fresh definitions.
            import modules
            for submodule_name in _MODULE_FILE_NAMES:
                importlib.reload(getattr(modules, submodule_name))
            importlib.reload(modules)

            # Reinitialize modules
            await self.load_modules()

            self.logger.info("✅ Modules reloaded successfully")
            return True

        except Exception as e:
            self.logger.error(f"❌ Module reload failed: {e}")
            return False

    async def validate_all_player_data(self):
        """Validate and refresh all player data.

        For every row in the ``players`` table: re-check achievements and,
        when the player owns pets but has none active, activate the first
        one. Errors for one player are logged and do not stop the others;
        a failure of the whole pass is logged and swallowed.
        """
        try:
            import aiosqlite

            async with aiosqlite.connect(self.database.db_path) as db:
                cursor = await db.execute("SELECT id, nickname FROM players")
                players = await cursor.fetchall()

            self.logger.info(f"🔄 Validating {len(players)} players...")

            for player_id, nickname in players:
                try:
                    # Check and award missing achievements
                    new_achievements = await self.game_engine.check_all_achievements(player_id)
                    if new_achievements:
                        self.logger.info(f" 🏆 {nickname}: Restored {len(new_achievements)} achievements")

                    # Validate team composition
                    team_composition = await self.database.get_team_composition(player_id)
                    if team_composition["active_pets"] == 0 and team_composition["total_pets"] > 0:
                        pets = await self.database.get_player_pets(player_id)
                        if pets:
                            first_pet = pets[0]
                            await self.database.activate_pet(player_id, str(first_pet["id"]))
                            self.logger.info(f" 🔧 {nickname}: Auto-activated pet {first_pet['nickname'] or first_pet['species_name']}")

                except Exception as e:
                    self.logger.error(f" ❌ Error validating {nickname}: {e}")

            self.logger.info("✅ Player data validation complete")

        except Exception as e:
            self.logger.error(f"❌ Player data validation failed: {e}")

    async def background_validation_task(self):
        """Background task for periodic validation (every 30 minutes while running)."""
        while self.running:
            try:
                await asyncio.sleep(1800)  # Run every 30 minutes
                if self.running:
                    self.logger.info("🔄 Running periodic validation...")
                    await self.validate_all_player_data()
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"❌ Background validation error: {e}")

    async def connection_stats_task(self):
        """Background task for connection statistics (logged every 5 minutes while connected)."""
        while self.running:
            try:
                await asyncio.sleep(300)  # Log stats every 5 minutes
                if self.running and self.connection_manager:
                    stats = self.connection_manager.get_connection_stats()
                    if stats["connected"]:
                        self.logger.info(f"📊 Connection stats: {stats['message_count']} messages, {stats['total_reconnections']} reconnections")
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"❌ Connection stats error: {e}")

    async def on_irc_connect(self):
        """Called when IRC connection is established; records the event and greets the channel."""
        self.logger.info("🎉 IRC connection established successfully!")
        self.connection_events.append({"type": "connect", "time": datetime.now()})

        # Send welcome message to channel
        if self.connection_manager:
            await self.connection_manager.send_message(
                self.config["channel"],
                "🤖 PetBot is online and ready for commands! Use !help to get started."
            )

    async def on_irc_disconnect(self, error):
        """Called when IRC connection is lost; logs and records the event."""
        self.logger.warning(f"💔 IRC connection lost: {error}")
        self.connection_events.append({"type": "disconnect", "time": datetime.now(), "error": str(error)})

    async def on_connection_lost(self, error):
        """Called when connection is lost and reconnection will be attempted."""
        self.logger.warning(f"🔄 Connection lost, will attempt reconnection: {error}")
        self.connection_events.append({"type": "reconnect_attempt", "time": datetime.now(), "error": str(error)})

    async def on_irc_message(self, line):
        """Called when IRC message is received.

        Only PRIVMSG lines are handled; a message starting with the
        configured command prefix is forwarded to handle_command().
        """
        # Handle private messages and channel messages
        parts = line.split()

        if len(parts) < 4:
            return

        if parts[1] == "PRIVMSG":
            channel = parts[2]
            message = " ".join(parts[3:])[1:]  # Remove leading ':'

            # Extract nickname from hostmask
            hostmask = parts[0][1:]  # Remove leading ':'
            nickname = hostmask.split('!')[0]

            # Handle commands
            if message.startswith(self.config["command_prefix"]):
                self.logger.info(f"🎮 Command from {nickname}: {message}")
                await self.handle_command(channel, nickname, message)

    async def handle_command(self, channel, nickname, message):
        """Handle IRC commands with rate limiting.

        Args:
            channel: Target the reply is sent to.
            nickname: Nick of the user who issued the command.
            message: Full message text including the command prefix.
        """
        from modules.base_module import BaseModule

        # Strip the whole configured prefix rather than assuming it is a
        # single character; fall back to the old one-character strip when
        # the message does not carry the prefix.
        prefix = self.config["command_prefix"]
        if prefix and message.startswith(prefix):
            command_text = message[len(prefix):]
        else:
            command_text = message[1:]

        command_parts = command_text.split()
        if not command_parts:
            return

        command = BaseModule.normalize_input(command_parts[0])
        args = BaseModule.normalize_input(command_parts[1:])

        try:
            # Check rate limit first
            if self.rate_limiter:
                category = get_command_category(command)
                allowed, rate_limit_message = await self.rate_limiter.check_rate_limit(
                    nickname, category, command
                )

                if not allowed:
                    await self.send_message(channel, f"{nickname}: {rate_limit_message}")
                    return

            self.command_count += 1

            if command in self.command_map:
                module = self.command_map[command]
                self.logger.info(f"🔧 Executing {command} via {module.__class__.__name__}")
                await module.handle_command(channel, nickname, command, args)
            else:
                await self.send_message(channel, f"{nickname}: Unknown command. Use !help for available commands.")

        except Exception as e:
            self.logger.error(f"❌ Command error: {e}")
            await self.send_message(channel, f"{nickname}: Error processing command: {str(e)}")

    async def send_message(self, target, message):
        """Send message via connection manager; failures are logged, not raised."""
        if self.connection_manager:
            success = await self.connection_manager.send_message(target, message)
            if not success:
                self.logger.warning(f"Failed to send message to {target}: {message}")
        else:
            self.logger.warning(f"No connection manager available to send message to {target}")

    def _log_sync_send_result(self, future):
        """Log an exception raised by a send scheduled from another thread."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            self.logger.error(f"Failed to send message synchronously: {error}")

    def send_message_sync(self, target, message):
        """Synchronous wrapper for send_message (for compatibility with old modules).

        Works from three contexts: inside the event loop thread (schedules a
        tracked task), from another thread while the bot's loop is running
        (hands the coroutine to that loop thread-safely), or with no loop
        running at all (runs the send to completion on a temporary loop).
        Errors are logged rather than raised.
        """
        try:
            try:
                running_loop = asyncio.get_running_loop()
            except RuntimeError:
                running_loop = None

            if running_loop is not None:
                # Already on an event loop thread: fire and forget.
                self._spawn_task(self.send_message(target, message))
                return

            bot_loop = getattr(self, "loop", None)
            if bot_loop is not None and bot_loop.is_running():
                # Called from a non-loop thread: asyncio.create_task would
                # raise here, so schedule on the bot's loop.
                future = asyncio.run_coroutine_threadsafe(
                    self.send_message(target, message), bot_loop
                )
                future.add_done_callback(self._log_sync_send_result)
                return

            # No loop anywhere: block until the message is sent.
            asyncio.run(self.send_message(target, message))
        except Exception as e:
            self.logger.error(f"Failed to send message synchronously: {e}")

    async def send_team_builder_pin(self, nickname, pin_code):
        """Send team builder PIN via private message."""
        message = f"🔐 Team Builder PIN: {pin_code} (expires in 10 minutes)"
        await self.send_message(nickname, message)

    def get_bot_stats(self):
        """Get comprehensive bot statistics.

        Returns:
            A dict with uptime, startup time, command count, module and
            command counts, number of connection events, the connection
            manager's stats (empty dict when there is no manager) and the
            running flag.
        """
        uptime = datetime.now() - self.startup_time

        connection_stats = {}
        if self.connection_manager:
            connection_stats = self.connection_manager.get_connection_stats()

        return {
            "uptime": str(uptime),
            "startup_time": self.startup_time,
            "command_count": self.command_count,
            "loaded_modules": len(self.modules),
            "available_commands": len(self.command_map),
            "connection_events": len(self.connection_events),
            "connection_stats": connection_stats,
            "running": self.running
        }

    async def start(self):
        """Start the bot.

        Initializes all components and runs the connection manager until it
        returns. shutdown() is always invoked on the way out; startup
        errors are logged and re-raised.
        """
        self.running = True

        try:
            # Initialize all components
            await self.initialize()

            # Start connection manager
            self.logger.info("🚀 Starting IRC connection manager...")
            await self.connection_manager.start()

        except Exception as e:
            self.logger.error(f"❌ Bot startup failed: {e}")
            raise
        finally:
            await self.shutdown()

    async def shutdown(self):
        """Gracefully shutdown the bot.

        Idempotent: only the first call does any work. Each component is
        stopped independently so one failing step does not skip the rest.
        """
        if self.shutdown_requested:
            return

        self.shutdown_requested = True
        self.logger.info("🔄 Shutting down bot...")

        # Stop main loop
        self.running = False

        # Stop connection manager
        if self.connection_manager:
            try:
                await self.connection_manager.stop()
            except Exception as e:
                self.logger.error(f"Error stopping connection manager: {e}")

        # Cancel background/fire-and-forget tasks instead of leaving them
        # asleep until the event loop is torn down.
        try:
            await self._cancel_tasks()
        except Exception as e:
            self.logger.error(f"Error cancelling background tasks: {e}")

        # Shutdown rate limiter
        if self.rate_limiter:
            try:
                await self.rate_limiter.shutdown()
            except Exception as e:
                self.logger.error(f"Error shutting down rate limiter: {e}")

        # Shutdown game engine
        if self.game_engine:
            try:
                await self.game_engine.shutdown()
            except Exception as e:
                self.logger.error(f"Error shutting down game engine: {e}")

        # Stop web server
        if self.web_server:
            # Web server doesn't have async shutdown, so we'll just log it
            self.logger.info("🔄 Web server shutdown (handled by thread)")

        self.logger.info("✅ Bot shutdown complete")


async def main():
    """Main entry point: build the bot, install signal handlers and run it."""
    bot = PetBotWithReconnect()

    # Make bot instance globally accessible for webserver
    sys.modules[__name__].bot_instance = bot

    loop = asyncio.get_running_loop()
    shutdown_tasks = set()  # keeps signal-triggered shutdown tasks alive

    def request_shutdown(signum):
        """Start a graceful shutdown from inside the event loop."""
        bot.logger.info(f"Received signal {signum}, initiating shutdown...")
        task = loop.create_task(bot.shutdown())
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    def fallback_handler(signum, frame):
        """signal.signal-style handler: defer to the loop thread-safely."""
        loop.call_soon_threadsafe(request_shutdown, signum)

    # Setup signal handlers for graceful shutdown. Handlers installed with
    # loop.add_signal_handler may interact with the loop; plain
    # signal.signal handlers may not, hence the thread-safe fallback for
    # platforms where add_signal_handler is unavailable.
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, request_shutdown, sig)
        except NotImplementedError:
            signal.signal(sig, fallback_handler)

    try:
        await bot.start()
    except KeyboardInterrupt:
        bot.logger.info("🛑 Keyboard interrupt received")
    except Exception as e:
        bot.logger.error(f"❌ Bot crashed: {e}")
        traceback.print_exc()
    finally:
        await bot.shutdown()
        # A shutdown started by a signal may still be in progress (the call
        # above returns immediately in that case); let it finish.
        if shutdown_tasks:
            await asyncio.gather(*shutdown_tasks, return_exceptions=True)


if __name__ == "__main__":
    print("🐾 Starting PetBot with Auto-Reconnect...")

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n🔄 Bot stopping...")
    except Exception as e:
        print(f"❌ Fatal error: {e}")
        traceback.print_exc()
    finally:
        print("✅ Bot stopped")