Added normalize_input() function to BaseModule for consistent lowercase conversion of user input. Updated all command modules to use normalization for commands, arguments, pet names, location names, gym names, and item names. Players can now use any capitalization for commands and arguments. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
367 lines
No EOL
14 KiB
Python
367 lines
No EOL
14 KiB
Python
#!/usr/bin/env python3
|
|
import socket
|
|
import time
|
|
import sys
|
|
import os
|
|
import asyncio
|
|
import importlib
|
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from src.database import Database
|
|
from src.game_engine import GameEngine
|
|
from modules import CoreCommands, Exploration, BattleSystem, PetManagement, Achievements, Admin, Inventory, GymBattles, TeamBuilder
|
|
from webserver import PetBotWebServer
|
|
|
|
class PetBotDebug:
    """Verbose ("debug mode") IRC front end for PetBot.

    The bot owns a blocking IRC socket that is read from ``main_loop`` and a
    private asyncio event loop that is driven on demand with
    ``run_until_complete`` whenever an async component (database, game engine,
    command modules) has to run.  Every step prints progress to stdout.

    Attributes:
        database: ``Database`` instance shared with the game engine and modules.
        game_engine: ``GameEngine`` built on top of ``database``.
        config: IRC settings (server, port, nickname, channel, command prefix).
        socket: connected IRC socket, or ``None`` while disconnected.
        connected: ``True`` once the JOIN for the configured channel was sent.
        running: ``main_loop`` keeps reading while this is ``True``.
        active_encounters: dict initialised empty here; not used in this class.
        modules: mapping of module class name -> module instance.
        command_map: mapping of command name -> module instance handling it.
        web_server: ``PetBotWebServer`` instance once started, else ``None``.
        loop: asyncio event loop created by ``initialize_async_components``.
    """

    # Command-module classes exported by the ``modules`` package, in load order.
    # They are looked up by name at load time so that ``reload_modules`` picks
    # up freshly reloaded classes instead of the ones bound at import time.
    _MODULE_CLASS_NAMES = (
        "CoreCommands",
        "Exploration",
        "BattleSystem",
        "PetManagement",
        "Achievements",
        "Admin",
        "Inventory",
        "GymBattles",
        "TeamBuilder",
    )

    # Seconds between two runs of the periodic player-data validation.
    _VALIDATION_INTERVAL_SECONDS = 1800

    def __init__(self):
        """Create the database/game-engine objects and the default IRC config."""
        print("🤖 PetBot Debug Mode - Initializing...")
        self.database = Database()
        self.game_engine = GameEngine(self.database)
        self.config = {
            "server": "irc.libera.chat",
            "port": 6667,
            "nickname": "PetBot",
            "channel": "#petz",
            "command_prefix": "!",
        }
        self.socket = None
        self.connected = False
        self.running = True
        self.active_encounters = {}
        self.modules = {}
        self.command_map = {}
        self.web_server = None
        # Defined up front so the attribute always exists; the real loop is
        # created in initialize_async_components().
        self.loop = None
        print("✅ Basic initialization complete")

    def initialize_async_components(self):
        """Initialize async components.

        Creates the event loop, initialises the database, loads game data and
        command modules, validates player data, schedules the periodic
        validation task and starts the web server thread.
        """
        print("🔄 Creating event loop...")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Publish the loop before anything else (modules, web server thread)
        # gets a chance to look at the bot instance.
        self.loop = loop
        print("✅ Event loop created")

        print("🔄 Initializing database...")
        loop.run_until_complete(self.database.init_database())
        print("✅ Database initialized")

        print("🔄 Loading game data...")
        loop.run_until_complete(self.game_engine.load_game_data())
        print("✅ Game data loaded")

        print("🔄 Loading modules...")
        self.load_modules()
        print("✅ Modules loaded")

        print("🔄 Validating player data and achievements...")
        loop.run_until_complete(self.validate_all_player_data())
        print("✅ Player data validation complete")

        print("🔄 Starting background validation task...")
        self.start_background_validation(loop)
        print("✅ Background validation started")

        print("🔄 Starting web server...")
        self.web_server = PetBotWebServer(self.database, port=8080)
        self.web_server.start_in_thread()
        print("✅ Web server started")

        print("✅ Async components ready")

    def load_modules(self):
        """Load all command modules and rebuild the command -> module map.

        The new maps are built locally and swapped in only after every module
        was constructed, so a failure half-way through (e.g. during a reload)
        leaves the previously working maps untouched.
        """
        import modules as modules_package

        loaded_modules = {}
        loaded_commands = {}

        for module_name in self._MODULE_CLASS_NAMES:
            module_class = getattr(modules_package, module_name)
            print(f" Loading {module_name}...")
            module_instance = module_class(self, self.database, self.game_engine)
            loaded_modules[module_name] = module_instance

            # Map commands to modules
            commands = module_instance.get_commands()
            for command in commands:
                loaded_commands[command] = module_instance
            print(f" ✅ {module_name}: {len(commands)} commands")

        self.modules = loaded_modules
        self.command_map = loaded_commands
        print(f"✅ Loaded {len(self.modules)} modules with {len(self.command_map)} commands")

    async def validate_all_player_data(self):
        """Validate and refresh all player data on startup to prevent state loss.

        For every player: award missing achievements and, when the player owns
        pets but has none active, activate the first one.  Errors are printed
        and never propagated, so a failed validation cannot abort startup.
        """
        try:
            # Get all players from database
            import aiosqlite

            async with aiosqlite.connect(self.database.db_path) as db:
                cursor = await db.execute("SELECT id, nickname FROM players")
                players = await cursor.fetchall()

            print(f"🔄 Found {len(players)} players to validate...")

            for player_id, nickname in players:
                try:
                    await self._validate_player(player_id, nickname)
                except Exception as e:
                    # One broken player record must not stop the others.
                    print(f" ❌ Error validating {nickname}: {e}")

            print("✅ All player data validated and updated")

        except Exception as e:
            # Don't fail startup if validation fails
            print(f"❌ Error during player data validation: {e}")

    async def _validate_player(self, player_id, nickname):
        """Restore missing achievements and ensure one active pet for a player."""
        # Check and award any missing achievements for each player
        new_achievements = await self.game_engine.check_all_achievements(player_id)
        if new_achievements:
            print(f" 🏆 {nickname}: Restored {len(new_achievements)} missing achievements")
            for achievement in new_achievements:
                print(f" - {achievement['name']}")
        else:
            print(f" ✅ {nickname}: All achievements up to date")

        # Validate team composition
        team_composition = await self.database.get_team_composition(player_id)
        if team_composition["active_pets"] == 0 and team_composition["total_pets"] > 0:
            # Player has pets but none active - activate the first one
            pets = await self.database.get_player_pets(player_id)
            if pets:
                first_pet = pets[0]
                await self.database.activate_pet(player_id, str(first_pet["id"]))
                print(f" 🔧 {nickname}: Auto-activated pet {first_pet['nickname'] or first_pet['species_name']} (no active pets)")

    def start_background_validation(self, loop):
        """Start background task to periodically validate player data.

        Args:
            loop: event loop on which the task is scheduled.

        NOTE(review): this class only ever drives the loop through
        ``run_until_complete``; the task therefore advances only while a
        command (or ``run_async_command``) is executing - confirm that this is
        the intended cadence.
        """
        async def periodic_validation():
            while True:
                try:
                    await asyncio.sleep(self._VALIDATION_INTERVAL_SECONDS)  # Run every 30 minutes
                    print("🔄 Running periodic player data validation...")
                    await self.validate_all_player_data()
                except Exception as e:
                    print(f"❌ Error in background validation: {e}")

        # Keep a reference: the event loop only holds weak references to tasks,
        # so an unreferenced task may be garbage-collected before it finishes.
        self._validation_task = loop.create_task(periodic_validation())

    async def reload_modules(self):
        """Reload all modules (for admin use).

        Reloads the source module of every command class, then the ``modules``
        package itself, and finally re-instantiates the command modules.

        Returns:
            ``True`` on success, ``False`` if any step raised.
        """
        try:
            # Reload module files
            import modules

            # Collect the defining modules first, then reload each one once.
            source_module_names = []
            for class_name in self._MODULE_CLASS_NAMES:
                source_name = getattr(modules, class_name).__module__
                if source_name != modules.__name__ and source_name not in source_module_names:
                    source_module_names.append(source_name)
            for source_name in source_module_names:
                importlib.reload(importlib.import_module(source_name))
            # Reload the package last so it re-exports the fresh classes.
            importlib.reload(modules)

            # Reinitialize modules
            print("🔄 Reloading modules...")
            self.load_modules()
            print("✅ Modules reloaded successfully")
            return True
        except Exception as e:
            print(f"❌ Module reload failed: {e}")
            return False

    def test_connection(self):
        """Test if we can connect to IRC.

        Returns:
            ``True`` if a TCP connection to the configured server succeeded
            within 5 seconds, ``False`` otherwise.
        """
        print("🔄 Testing IRC connection...")
        try:
            # The context manager closes the probe socket even when connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                probe.settimeout(5)
                probe.connect((self.config["server"], self.config["port"]))
            print("✅ IRC connection test successful")
            return True
        except Exception as e:
            print(f"❌ IRC connection failed: {e}")
            return False

    def connect(self):
        """Initialise everything, connect to IRC and run the main loop.

        Returns early (leaving the core systems initialised) when the IRC
        server cannot be reached.  Otherwise blocks until ``main_loop`` ends.
        """
        print("🚀 Starting bot connection process...")

        print("🔄 Step 1: Initialize async components...")
        self.initialize_async_components()
        print("✅ Step 1 complete")

        print("🔄 Step 2: Test IRC connectivity...")
        if not self.test_connection():
            print("❌ IRC connection test failed - would run offline mode")
            print("✅ Bot core systems are working - IRC connection unavailable")
            return
        print("✅ Step 2 complete")

        print("🔄 Step 3: Create IRC socket...")
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.settimeout(10)
        print("✅ Socket created")

        print("🔄 Step 4: Connect to IRC server...")
        try:
            self.socket.connect((self.config["server"], self.config["port"]))
            print("✅ Connected to IRC server!")
        except Exception as e:
            print(f"❌ Failed to connect: {e}")
            # Do not keep a dead socket around: send() would try to use it.
            self.socket.close()
            self.socket = None
            return

        print("🔄 Step 5: Send IRC handshake...")
        self.send(f"NICK {self.config['nickname']}")
        self.send(f"USER {self.config['nickname']} 0 * :{self.config['nickname']}")
        print("✅ Handshake sent")

        print("🔄 Step 6: Start main loop...")
        # Short timeout so the loop wakes up regularly and can notice self.running.
        self.socket.settimeout(1)
        print("✅ Bot is now running!")
        print(f"✅ Will join {self.config['channel']} when connected")
        print(f"✅ Loaded commands: {', '.join(sorted(self.command_map.keys()))}")
        print("🎮 Ready for testing! Press Ctrl+C to stop.")

        self.main_loop()

    def main_loop(self):
        """Read from the IRC socket and dispatch complete lines until stopped.

        Ends when ``self.running`` becomes false, the server closes the
        connection, the connection breaks, or the user presses Ctrl+C.  The
        socket and the event loop are closed afterwards.
        """
        message_count = 0
        # Bytes received so far that do not yet form a complete line.  recv()
        # may cut a line (or a multi-byte character) in half, so lines are
        # only decoded and handled once their terminating newline has arrived.
        pending = b""
        while self.running:
            try:
                chunk = self.socket.recv(4096)
                if not chunk:
                    print("💀 Connection closed by server")
                    break

                pending += chunk
                *complete_lines, pending = pending.split(b"\n")
                for raw_line in complete_lines:
                    line = raw_line.decode('utf-8', errors='ignore').strip()
                    if not line:
                        continue
                    message_count += 1
                    if message_count <= 10:  # Show first 10 messages for debugging
                        print(f"📨 {message_count}: {line}")
                    self.handle_line(line)

            except socket.timeout:
                continue
            except KeyboardInterrupt:
                print("\n🛑 Shutting down bot...")
                self.running = False
                break
            except ConnectionError as e:
                # The socket is unusable (reset / broken pipe); retrying would
                # only spin forever on the same error.
                print(f"❌ Error in main loop: {e}")
                break
            except Exception as e:
                print(f"❌ Error in main loop: {e}")
                time.sleep(1)

        if self.socket:
            self.socket.close()
        if self.loop is not None:
            self.loop.close()
        print("✅ Bot shutdown complete")

    def send(self, message):
        """Send one raw IRC line (CRLF is appended); no-op when disconnected."""
        if self.socket:
            print(f"📤 >> {message}")
            # sendall() keeps writing until the whole line is out; send() may
            # transmit only part of it.
            self.socket.sendall(f"{message}\r\n".encode('utf-8'))

    def handle_line(self, line):
        """Process one complete line received from the IRC server.

        Answers PING, joins the configured channel on end-of-MOTD (numeric 376)
        or missing-MOTD (numeric 422), and forwards PRIVMSG texts that start
        with the command prefix to ``handle_command``.

        NOTE(review): for private messages ``parts[2]`` is the bot's own nick,
        so replies addressed to ``channel`` go to the bot itself - confirm
        whether commands via PM are meant to be supported.
        """
        if line.startswith("PING"):
            # Swap only the command word; the token after it must be echoed
            # back unchanged even if it contains the text "PING".
            self.send("PONG" + line[4:])
            return

        parts = line.split()

        # Handle connection messages.  Only the numeric in the command
        # position counts - chat text that merely contains these digits must
        # not be mistaken for a server reply.
        if len(parts) >= 2 and parts[1] in ("376", "422"):  # End of MOTD
            if not self.connected:
                self.send(f"JOIN {self.config['channel']}")
                self.connected = True
                print(f"🎉 Joined {self.config['channel']} - Bot is ready for commands!")
            return

        if len(parts) < 4 or parts[1] != "PRIVMSG":
            return

        channel = parts[2]
        message = " ".join(parts[3:])
        if message.startswith(":"):
            # Drop the marker of the trailing parameter, not arbitrary text.
            message = message[1:]

        hostmask = parts[0][1:]
        nickname = hostmask.split('!')[0]

        if message.startswith(self.config["command_prefix"]):
            print(f"🎮 Command from {nickname}: {message}")
            self.handle_command(channel, nickname, message)

    def handle_command(self, channel, nickname, message):
        """Parse a prefixed command message and run the owning module's handler.

        Args:
            channel: target the message was sent to; replies go there.
            nickname: nick of the sender.
            message: full message text including the command prefix.

        Any exception raised by the handler is reported to ``channel`` and
        printed with a traceback; it never propagates to the caller.
        """
        from modules.base_module import BaseModule

        # Strip the whole configured prefix, whatever its length.
        prefix = self.config["command_prefix"]
        command_parts = message[len(prefix):].split()
        if not command_parts:
            return

        command = BaseModule.normalize_input(command_parts[0])
        args = BaseModule.normalize_input(command_parts[1:])

        try:
            if command in self.command_map:
                module = self.command_map[command]
                print(f"🔧 Executing {command} via {module.__class__.__name__}")
                # Run async command handler
                self.loop.run_until_complete(
                    module.handle_command(channel, nickname, command, args)
                )
            else:
                self.send_message(channel, f"{nickname}: Unknown command. Use !help for available commands.")
        except Exception as e:
            self.send_message(channel, f"{nickname}: Error processing command: {str(e)}")
            print(f"❌ Command error: {e}")
            import traceback
            traceback.print_exc()

    @staticmethod
    def _single_line(text):
        """Return ``text`` as a string with every line break replaced by a space."""
        text = str(text)
        for line_break in ("\r\n", "\r", "\n"):
            text = text.replace(line_break, " ")
        return text

    def send_message(self, target, message, delay=0.5):
        """Send ``message`` to ``target`` (channel or nick) as a PRIVMSG.

        Args:
            target: channel name or nickname.
            message: text to send.  Line breaks are replaced by spaces, since
                anything after a CR/LF would reach the server as a separate
                raw IRC command.
            delay: seconds to sleep afterwards as crude flood protection.
        """
        self.send(f"PRIVMSG {self._single_line(target)} :{self._single_line(message)}")
        if delay > 0:
            time.sleep(delay)

    async def send_team_builder_pin(self, nickname, pin_code):
        """Send team builder PIN via private message.

        Delegates to the TeamBuilder module when it is loaded and provides
        ``send_team_builder_pin``; otherwise sends the PIN directly.
        """
        team_builder = self.modules.get('TeamBuilder')
        if hasattr(team_builder, 'send_team_builder_pin'):
            await team_builder.send_team_builder_pin(nickname, pin_code)
        else:
            # Fallback direct PM
            message = f"🔐 Team Builder PIN: {pin_code} (expires in 10 minutes)"
            self.send_message(nickname, message)

    def run_async_command(self, coro):
        """Run ``coro`` to completion on the bot's event loop and return its result."""
        return self.loop.run_until_complete(coro)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the bot, publish the instance, then run it
    # until it stops, the user interrupts it, or it crashes.
    import sys
    import traceback

    print("🐾 Starting Pet Bot for IRC (Debug Mode)...")
    bot = PetBotDebug()

    # Make bot instance globally accessible for webserver: it becomes the
    # ``bot_instance`` attribute of this module object.
    this_module = sys.modules[__name__]
    setattr(this_module, "bot_instance", bot)

    try:
        bot.connect()
    except KeyboardInterrupt:
        print("\n🔄 Bot stopping...")
        # Gracefully shutdown the game engine; a failure here is only
        # reported so that the final status line is still printed.
        try:
            bot.loop.run_until_complete(bot.game_engine.shutdown())
        except Exception as shutdown_error:
            print(f"Error during shutdown: {shutdown_error}")
        print("✅ Bot stopped by user")
    except Exception as crash:
        # Anything else is an unexpected crash: report it with full traceback.
        print(f"❌ Bot crashed: {crash}")
        traceback.print_exc()