#!/usr/bin/env python3
"""IRC front-end for the pet game.

Connects to an IRC server over a plain TCP socket, joins the configured
channel and translates ``!command`` chat messages into calls on the
project's ``Database`` and ``GameEngine`` objects.
"""
import socket
import threading
import time
import json
import sys
import os
import asyncio
import random

# Make the sibling ``src`` package importable when run as a script.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from src.database import Database
from src.game_engine import GameEngine


class PetBot:
    """Blocking, single-threaded IRC bot that drives the pet game.

    The database and game engine expose coroutines; they are executed
    synchronously on a private event loop via ``run_async_command``.
    """

    # Location names advertised to users in help and usage messages.
    LOCATION_LIST = (
        "Starter Town, Whispering Woods, Electric Canyon, "
        "Crystal Caves, Frozen Tundra, Dragon's Peak"
    )

    # Text appended to the damage line for each effectiveness code
    # reported by the battle engine.
    EFFECTIVENESS_MESSAGES = {
        "super_effective": "It's super effective!",
        "not_very_effective": "It's not very effective...",
        "no_effect": "It had no effect!",
        "super_effective_critical": "Critical hit! It's super effective!",
        "normal_critical": "Critical hit!",
        "not_very_effective_critical": "Critical hit! It's not very effective...",
    }

    # Command word -> handler method name, for handlers called as
    # ``handler(channel, nickname)``.
    _SIMPLE_COMMANDS = {
        "help": "cmd_help",
        "start": "cmd_start",
        "explore": "cmd_explore",
        "catch": "cmd_catch",
        "capture": "cmd_catch",
        "battle": "cmd_battle",
        "flee": "cmd_flee",
        "moves": "cmd_moves",
        "location": "cmd_location",
        "where": "cmd_location",
        "achievements": "cmd_achievements",
        "weather": "cmd_weather",
        "team": "cmd_team",
    }

    # Command word -> handler method name, for handlers called as
    # ``handler(channel, nickname, args)``.
    _ARG_COMMANDS = {
        "attack": "cmd_attack",
        "travel": "cmd_travel",
        "wild": "cmd_wild",
        "stats": "cmd_stats",
    }

    def __init__(self):
        self.database = Database()
        self.game_engine = GameEngine(self.database)
        self.config = {
            "server": "irc.libera.chat",
            "port": 6667,
            "nickname": "PetBot",
            "channel": "#petz",
            "command_prefix": "!",
        }
        self.socket = None
        self.loop = None  # created by initialize_async_components()
        self.connected = False
        self.running = True
        self.active_encounters = {}  # player_id -> encounter_data

    # ------------------------------------------------------------------
    # Start-up / connection handling
    # ------------------------------------------------------------------

    def initialize_async_components(self):
        """Create the private event loop and run the async initialisers."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        print("Initializing database...")
        loop.run_until_complete(self.database.init_database())
        print("Loading game data...")
        loop.run_until_complete(self.game_engine.load_game_data())
        print("✓ Bot initialized successfully!")
        self.loop = loop

    def test_connection(self):
        """Return True if a TCP connection to the configured server succeeds.

        The probe socket is always closed, including when the connection
        attempt fails.
        """
        print("Testing IRC connection...")
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_sock:
                test_sock.settimeout(5)
                test_sock.connect((self.config["server"], self.config["port"]))
            print("✓ IRC connection test successful")
            return True
        except OSError as e:
            print(f"✗ IRC connection failed: {e}")
            return False

    def connect(self):
        """Initialise game state, connect to IRC and run until stopped.

        Returns early, after releasing the socket and event loop, if the
        server cannot be reached.
        """
        self.initialize_async_components()

        if not self.test_connection():
            print("\n=== IRC Connection Failed ===")
            print("The bot's core functionality is working perfectly!")
            print("Run these commands to test locally:")
            print(" python3 test_commands.py - Test all functionality")
            print(" python3 test_db.py - Test database operations")
            print("\nTo connect to IRC:")
            print(f"1. Ensure network access to {self.config['server']}:{self.config['port']}")
            print("2. Or modify config in run_bot.py to use a different server")
            self._close()
            return

        print(f"Connecting to {self.config['server']}:{self.config['port']}")
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.settimeout(10)
        try:
            self.socket.connect((self.config["server"], self.config["port"]))
            print("✓ Connected to IRC server!")
        except OSError as e:
            print(f"✗ Failed to connect: {e}")
            self._close()
            return

        # Send IRC handshake
        self.send(f"NICK {self.config['nickname']}")
        self.send(f"USER {self.config['nickname']} 0 * :{self.config['nickname']}")
        print("✓ Sent IRC handshake")

        # Short timeout so the receive loop wakes up regularly and can
        # notice ``self.running`` being cleared.
        self.socket.settimeout(1)
        self.main_loop()

    def _close(self):
        """Release the socket and the event loop; safe to call repeatedly."""
        if self.socket:
            self.socket.close()
            self.socket = None
        if self.loop is not None and not self.loop.is_closed():
            self.loop.close()

    def main_loop(self):
        """Receive data from the server and dispatch complete lines.

        Bytes are accumulated in a buffer and only newline-terminated lines
        are handled, so a message split across two ``recv`` calls is
        processed once, in full. The loop ends on Ctrl+C, when the server
        closes the connection, or on a socket error; resources are released
        in every case.
        """
        print("✓ Bot is now running! Use Ctrl+C to stop.")
        print(f"✓ Bot will join {self.config['channel']} when connected")
        print("✓ Available commands: !help !start !catch !team !wild !stats")

        pending = b""
        try:
            while self.running:
                try:
                    chunk = self.socket.recv(4096)
                except socket.timeout:
                    continue
                if not chunk:
                    print("Connection closed by server")
                    break

                pending += chunk
                # Everything before the last newline is complete; the tail
                # (possibly empty) waits for the next read.
                *complete_lines, pending = pending.split(b"\n")
                for raw_line in complete_lines:
                    # Decode per line so a multi-byte character is never cut
                    # in half by a recv() boundary.
                    text = raw_line.decode("utf-8", errors="ignore").strip()
                    if not text:
                        continue
                    try:
                        self.handle_line(text)
                    except OSError:
                        # The socket itself failed; retrying cannot help.
                        raise
                    except Exception as e:
                        print(f"Error in main loop: {e}")
                        time.sleep(1)
        except KeyboardInterrupt:
            print("\n✓ Shutting down bot...")
            self.running = False
        except OSError as e:
            print(f"Connection lost: {e}")
        finally:
            self._close()

    # ------------------------------------------------------------------
    # Raw IRC I/O
    # ------------------------------------------------------------------

    def send(self, message):
        """Send one raw IRC line; does nothing when no socket is open.

        CR and LF inside ``message`` are replaced with spaces so that text
        originating from users or the database cannot smuggle additional
        IRC commands onto the wire.
        """
        if self.socket:
            message = message.replace("\r", " ").replace("\n", " ")
            print(f">> {message}")
            # sendall() keeps writing until the whole line is out; send()
            # may stop after a partial write.
            self.socket.sendall(f"{message}\r\n".encode("utf-8"))

    def handle_line(self, line):
        """Process a single line received from the server.

        Answers PING, joins the configured channel once the MOTD numerics
        (376 / 422) arrive, and forwards PRIVMSG text starting with the
        command prefix to ``handle_command``.
        """
        print(f"<< {line}")

        if line.startswith("PING"):
            # Swap only the command word; the payload is echoed untouched.
            self.send("PONG" + line[4:])
            return

        parts = line.split()

        # End of MOTD / no MOTD. The numeric is compared as the command
        # token so chat text that merely contains "376" is not mistaken
        # for it.
        if len(parts) >= 2 and parts[1] in ("376", "422"):
            if not self.connected:
                self.send(f"JOIN {self.config['channel']}")
                self.connected = True
                print(f"✓ Joined {self.config['channel']} - Bot is ready!")
            return

        if len(parts) < 4:
            return

        if parts[1] == "PRIVMSG":
            target = parts[2]
            message = " ".join(parts[3:])
            if message.startswith(":"):
                message = message[1:]
            hostmask = parts[0][1:]
            nickname = hostmask.split('!')[0]

            # A private message is addressed to the bot's own nick, so the
            # reply has to go to the sender rather than to ``target``.
            reply_to = target if target[:1] in "#&+!" else nickname

            if message.startswith(self.config["command_prefix"]):
                print(f"Processing command from {nickname}: {message}")
                self.handle_command(reply_to, nickname, message)

    def handle_command(self, channel, nickname, message):
        """Parse ``message`` (prefix included) and run the matching handler.

        Any exception raised by a handler is reported to ``channel`` and
        printed, so one failing command does not stop the bot.
        """
        prefix = self.config["command_prefix"]
        command_parts = message[len(prefix):].split()
        if not command_parts:
            return

        command = command_parts[0].lower()
        args = command_parts[1:]

        try:
            if command in self._SIMPLE_COMMANDS:
                getattr(self, self._SIMPLE_COMMANDS[command])(channel, nickname)
            elif command in self._ARG_COMMANDS:
                getattr(self, self._ARG_COMMANDS[command])(channel, nickname, args)
            else:
                self.send_message(channel, f"{nickname}: Unknown command. Use !help for available commands.")
        except Exception as e:
            self.send_message(channel, f"{nickname}: Error processing command: {str(e)}")
            print(f"Command error: {e}")

    def send_message(self, target, message):
        """Send a PRIVMSG to ``target``, then pause 0.5 s between messages."""
        self.send(f"PRIVMSG {target} :{message}")
        time.sleep(0.5)

    def run_async_command(self, coro):
        """Run ``coro`` to completion on the bot's event loop and return its result."""
        return self.loop.run_until_complete(coro)

    # ------------------------------------------------------------------
    # Shared helpers for command handlers
    # ------------------------------------------------------------------

    def _require_player(self, channel, nickname):
        """Return the player record for ``nickname``, or None after telling them to !start."""
        player = self.run_async_command(self.database.get_player(nickname))
        if not player:
            self.send_message(channel, f"{nickname}: Use !start to begin your journey first!")
            return None
        return player

    def _announce_catch_achievements(self, channel, nickname, player_id):
        """Run the catch achievement check and announce anything it returns."""
        achievements = self.run_async_command(
            self.game_engine.check_and_award_achievements(player_id, "catch_type", "")
        )
        for achievement in achievements or []:
            self.send_message(channel, f"🏆 {nickname}: Achievement unlocked: {achievement['name']}! {achievement['description']}")

    def _weather_description(self, weather):
        """Return the configured description for ``weather``, or a generic fallback."""
        weather_patterns = getattr(self.game_engine, 'weather_patterns', {})
        weather_info = weather_patterns.get("weather_types", {}).get(weather["weather_type"], {})
        return weather_info.get("description", f"{weather['weather_type']} weather")

    # ------------------------------------------------------------------
    # Command handlers
    # ------------------------------------------------------------------

    def cmd_help(self, channel, nickname):
        """Send the command overview, one message per line."""
        help_text = [
            "🐾 Pet Bot Commands:",
            "!start - Begin your pet journey in Starter Town",
            "!explore - Explore your current location for wild pets",
            "!catch (or !capture) - Try to catch a pet during exploration or battle",
            "!battle - Start a battle with an encountered wild pet",
            "!attack <move> - Use a move in battle",
            "!flee - Try to escape from battle",
            "!moves - View your active pet's available moves",
            "!travel <location> - Travel to a different location",
            "!location - See where you currently are",
            "!achievements - View your achievements and progress",
            "!weather - Check current weather effects",
            "!team - View your active pets",
            "!wild <location> - See what pets spawn in a location",
            "!stats - View your player stats",
            f"Locations: {self.LOCATION_LIST}",
        ]
        for line in help_text:
            self.send_message(channel, line)

    def cmd_start(self, channel, nickname):
        """Create a player for ``nickname`` and give them a starter pet."""
        player = self.run_async_command(self.database.get_player(nickname))
        if player:
            self.send_message(channel, f"{nickname}: You already have an account! Use !team to see your pets.")
            return

        player_id = self.run_async_command(self.database.create_player(nickname))
        starter_pet = self.run_async_command(self.game_engine.give_starter_pet(player_id))
        self.send_message(channel, f"🎉 {nickname}: Welcome to the world of pets! You received a Level {starter_pet['level']} {starter_pet['species_name']}! You are now in Starter Town.")

    def cmd_explore(self, channel, nickname):
        """Explore the player's location and report the outcome."""
        player = self._require_player(channel, nickname)
        if not player:
            return

        encounter = self.run_async_command(self.game_engine.explore_location(player["id"]))
        kind = encounter["type"]

        if kind == "error":
            self.send_message(channel, f"{nickname}: {encounter['message']}")
        elif kind == "empty":
            self.send_message(channel, f"🔍 {nickname}: {encounter['message']}")
        elif kind == "encounter":
            pet = encounter["pet"]
            # Remember the encounter so !battle / !catch can act on it.
            self.active_encounters[player["id"]] = pet

            type_str = pet["type1"]
            if pet["type2"]:
                type_str += f"/{pet['type2']}"

            self.send_message(channel, f"🐾 {nickname}: A wild Level {pet['level']} {pet['species_name']} ({type_str}) appeared in {encounter['location']}!")
            self.send_message(channel, "Choose your action: !battle to fight it, or !catch to try catching it directly!")

    def cmd_battle(self, channel, nickname):
        """Start a battle between the player's first active pet and the stored encounter."""
        player = self._require_player(channel, nickname)
        if not player:
            return
        player_id = player["id"]

        # A battle needs a pet found via !explore.
        if player_id not in self.active_encounters:
            self.send_message(channel, f"{nickname}: You need to !explore first to find a pet to battle!")
            return

        active_battle = self.run_async_command(self.game_engine.battle_engine.get_active_battle(player_id))
        if active_battle:
            self.send_message(channel, f"{nickname}: You're already in battle! Use !attack or !flee.")
            return

        pets = self.run_async_command(self.database.get_player_pets(player_id, active_only=True))
        if not pets:
            self.send_message(channel, f"{nickname}: You need an active pet to battle! Use !team to check your pets.")
            return

        player_pet = pets[0]  # Use first active pet
        wild_pet = self.active_encounters[player_id]

        battle = self.run_async_command(self.game_engine.battle_engine.start_battle(player_id, player_pet, wild_pet))

        self.send_message(channel, f"⚔️ {nickname}: Battle started!")
        self.send_message(channel, f"Your {player_pet['species_name']} (Lv.{player_pet['level']}, {battle['player_hp']}/{player_pet['max_hp']} HP) vs Wild {wild_pet['species_name']} (Lv.{wild_pet['level']}, {battle['wild_hp']}/{wild_pet['max_hp']} HP)")

        move_list = [move["name"] for move in battle["available_moves"]]
        self.send_message(channel, f"Available moves: {', '.join(move_list)}")
        self.send_message(channel, "Use !attack <move> to attack!")

    def cmd_attack(self, channel, nickname, args):
        """Execute one battle turn using the move named by ``args``."""
        if not args:
            self.send_message(channel, f"{nickname}: Specify a move to use! Example: !attack Tackle")
            return

        player = self._require_player(channel, nickname)
        if not player:
            return

        move_name = " ".join(args).title()  # Normalize to Title Case
        result = self.run_async_command(self.game_engine.battle_engine.execute_battle_turn(player["id"], move_name))

        if "error" in result:
            self.send_message(channel, f"{nickname}: {result['error']}")
            return

        # Narrate each action of the turn.
        for action in result["results"]:
            self.send_message(channel, f"{action['attacker']} used {action['move']}!")

            if action["damage"] > 0:
                damage_text = f"Dealt {action['damage']} damage!"
                extra = self.EFFECTIVENESS_MESSAGES.get(action.get("effectiveness"))
                if extra:
                    damage_text += f" {extra}"
                self.send_message(channel, damage_text)

            self.send_message(channel, f"Target HP: {action['target_hp']}")

        if result["battle_over"]:
            if result["winner"] == "player":
                self.send_message(channel, f"🎉 {nickname}: You won the battle!")
            else:
                self.send_message(channel, f"💀 {nickname}: Your pet fainted! You lost the battle...")
            # The encounter ends with the battle, whoever won.
            self.active_encounters.pop(player["id"], None)
        else:
            move_list = [move["name"] for move in result["available_moves"]]
            self.send_message(channel, f"Your turn! Available moves: {', '.join(move_list)}")

    def cmd_flee(self, channel, nickname):
        """Try to escape the current battle; on success the encounter is dropped."""
        player = self._require_player(channel, nickname)
        if not player:
            return

        success = self.run_async_command(self.game_engine.battle_engine.flee_battle(player["id"]))

        if success:
            self.send_message(channel, f"💨 {nickname}: You successfully escaped from battle!")
            self.active_encounters.pop(player["id"], None)
        else:
            self.send_message(channel, f"❌ {nickname}: Couldn't escape! The battle continues!")

    def cmd_moves(self, channel, nickname):
        """List up to four moves of the player's first active pet."""
        player = self._require_player(channel, nickname)
        if not player:
            return

        pets = self.run_async_command(self.database.get_player_pets(player["id"], active_only=True))
        if not pets:
            self.send_message(channel, f"{nickname}: You don't have any active pets! Use !team to check your pets.")
            return

        active_pet = pets[0]  # Use first active pet
        available_moves = self.game_engine.battle_engine.get_available_moves(active_pet)

        if not available_moves:
            self.send_message(channel, f"{nickname}: Your {active_pet['species_name']} has no available moves!")
            return

        move_info = []
        for move in available_moves[:4]:  # Limit to 4 moves max
            power = move.get('power')
            power_info = f"Power: {power}" if power else "Status move"
            move_info.append(f"{move['name']} ({move['type']}) - {power_info}")

        pet_name = active_pet["nickname"] or active_pet["species_name"]
        self.send_message(channel, f"🎯 {pet_name}'s moves:")
        for move_line in move_info:
            self.send_message(channel, f" • {move_line}")

    def cmd_catch(self, channel, nickname):
        """Try to catch the wild pet from the active battle or stored encounter."""
        player = self._require_player(channel, nickname)
        if not player:
            return
        player_id = player["id"]

        active_battle = self.run_async_command(self.game_engine.battle_engine.get_active_battle(player_id))
        if active_battle:
            self._catch_in_battle(channel, nickname, player_id, active_battle)
            return

        # Regular exploration catch (not in battle)
        if player_id not in self.active_encounters:
            self.send_message(channel, f"{nickname}: You need to !explore first to find a pet, or be in battle to catch!")
            return

        # The encounter is consumed whether or not the catch succeeds.
        target_pet = self.active_encounters.pop(player_id)
        result = self.run_async_command(self.game_engine.attempt_catch_current_location(player_id, target_pet))

        if "Success!" in result:
            self._announce_catch_achievements(channel, nickname, player_id)

        self.send_message(channel, f"🎯 {nickname}: {result}")

    def _catch_in_battle(self, channel, nickname, player_id, active_battle):
        """Roll an HP-weighted catch attempt against the wild pet of ``active_battle``."""
        wild_pet = active_battle["wild_pet"]
        current_hp = active_battle["wild_hp"]
        max_hp = wild_pet["max_hp"]

        # Lower remaining HP means a higher catch rate. A max_hp of zero is
        # treated as full health so the division cannot fail.
        hp_percentage = current_hp / max_hp if max_hp else 1.0
        base_catch_rate = 0.3  # Lower base rate than direct catch
        hp_bonus = (1.0 - hp_percentage) * 0.5  # Up to 50% bonus for low HP
        final_catch_rate = min(0.9, base_catch_rate + hp_bonus)  # Cap at 90%

        if random.random() >= final_catch_rate:
            # Failed catch - battle continues
            catch_percentage = int(final_catch_rate * 100)
            self.send_message(channel, f"🎯 {nickname}: The {wild_pet['species_name']} broke free! ({catch_percentage}% catch rate) Battle continues!")
            return

        # NOTE(review): the value returned by attempt_catch_current_location
        # is ignored here, as it was originally. If that call makes its own
        # success roll, the message below may not match the stored outcome;
        # confirm against the game engine.
        self.run_async_command(self.game_engine.attempt_catch_current_location(player_id, wild_pet))
        self.run_async_command(self.game_engine.battle_engine.end_battle(player_id, "caught"))

        self._announce_catch_achievements(channel, nickname, player_id)

        self.active_encounters.pop(player_id, None)
        self.send_message(channel, f"🎯 {nickname}: Caught the {wild_pet['species_name']} during battle! (HP: {current_hp}/{max_hp})")

    def cmd_travel(self, channel, nickname, args):
        """Move the player to the location named by ``args`` if they may enter it."""
        if not args:
            self.send_message(channel, f"{nickname}: Specify where to travel! Available: {self.LOCATION_LIST}")
            return

        player = self._require_player(channel, nickname)
        if not player:
            return
        player_id = player["id"]

        destination = " ".join(args).title()  # Normalize to Title Case
        location = self.run_async_command(self.database.get_location_by_name(destination))

        if not location:
            self.send_message(channel, f"{nickname}: '{destination}' is not a valid location!")
            return

        can_access = self.run_async_command(self.database.can_access_location(player_id, location["id"]))
        if not can_access:
            self.send_message(channel, f"{nickname}: You cannot access {destination} yet! Complete achievements to unlock new areas. Use !achievements to see progress.")
            return

        # An encounter belongs to the place it was found in.
        self.active_encounters.pop(player_id, None)

        self.run_async_command(self.database.update_player_location(player_id, location["id"]))

        weather = self.run_async_command(self.database.get_location_weather(location["id"]))
        weather_msg = ""
        if weather:
            weather_msg = f" Weather: {weather['weather_type']} - {self._weather_description(weather)}"

        self.send_message(channel, f"🗺️ {nickname}: You traveled to {destination}. {location['description']}{weather_msg}")

    def cmd_location(self, channel, nickname):
        """Tell the player which location they are in."""
        player = self._require_player(channel, nickname)
        if not player:
            return

        location = self.run_async_command(self.database.get_player_location(player["id"]))
        if location:
            self.send_message(channel, f"📍 {nickname}: You are currently in {location['name']}. {location['description']}")
        else:
            self.send_message(channel, f"{nickname}: You seem to be lost! Contact an admin.")

    def cmd_achievements(self, channel, nickname):
        """Show up to five of the player's achievements plus a count of the rest."""
        player = self._require_player(channel, nickname)
        if not player:
            return

        achievements = self.run_async_command(self.database.get_player_achievements(player["id"]))

        if not achievements:
            self.send_message(channel, f"{nickname}: No achievements yet! Keep exploring and catching pets to unlock new areas!")
            return

        self.send_message(channel, f"🏆 {nickname}'s Achievements:")
        for achievement in achievements[:5]:  # First five, in the order returned
            self.send_message(channel, f"• {achievement['name']}: {achievement['description']}")

        if len(achievements) > 5:
            self.send_message(channel, f"... and {len(achievements) - 5} more!")

    def cmd_weather(self, channel, nickname):
        """Report the weather at the player's current location."""
        player = self._require_player(channel, nickname)
        if not player:
            return

        location = self.run_async_command(self.database.get_player_location(player["id"]))
        if not location:
            self.send_message(channel, f"{nickname}: You seem to be lost!")
            return

        weather = self.run_async_command(self.database.get_location_weather(location["id"]))
        if weather:
            weather_desc = self._weather_description(weather)
            self.send_message(channel, f"🌤️ {nickname}: Current weather in {location['name']}: {weather['weather_type']}")
            self.send_message(channel, f"Effect: {weather_desc}")
        else:
            self.send_message(channel, f"🌤️ {nickname}: The weather in {location['name']} is calm with no special effects.")

    def cmd_team(self, channel, nickname):
        """List the player's active pets (starred) and up to five inactive ones."""
        player = self._require_player(channel, nickname)
        if not player:
            return

        pets = self.run_async_command(self.database.get_player_pets(player["id"], active_only=False))
        if not pets:
            self.send_message(channel, f"{nickname}: You don't have any pets! Use !catch to find some.")
            return

        active_pets = [pet for pet in pets if pet.get("is_active")]
        inactive_pets = [pet for pet in pets if not pet.get("is_active")]

        def describe(pet):
            name = pet["nickname"] or pet["species_name"]
            return f"{name} (Lv.{pet['level']}) - {pet['hp']}/{pet['max_hp']} HP"

        # Active pets first, marked with a star; then at most 5 inactive.
        team_info = [f"⭐{describe(pet)}" for pet in active_pets]
        team_info.extend(describe(pet) for pet in inactive_pets[:5])

        if len(inactive_pets) > 5:
            team_info.append(f"... and {len(inactive_pets) - 5} more in storage")

        self.send_message(channel, f"🐾 {nickname}'s team: " + " | ".join(team_info))

    def cmd_wild(self, channel, nickname, args):
        """List the pets that spawn in the location named by ``args``."""
        if not args:
            self.send_message(channel, f"{nickname}: Specify a location! Available: {self.LOCATION_LIST}")
            return

        location_name = " ".join(args).title()  # Normalize to Title Case
        wild_pets = self.run_async_command(self.game_engine.get_location_spawns(location_name))

        if wild_pets:
            pet_list = ", ".join(pet["name"] for pet in wild_pets)
            self.send_message(channel, f"🌿 Wild pets in {location_name}: {pet_list}")
        else:
            self.send_message(channel, f"{nickname}: No location found called '{location_name}'")

    def cmd_stats(self, channel, nickname, args):
        """Show the player's level, experience and money (``args`` is unused)."""
        player = self._require_player(channel, nickname)
        if not player:
            return

        self.send_message(channel, f"📊 {nickname}: Level {player['level']} | {player['experience']} XP | ${player['money']}")


if __name__ == "__main__":
    print("🐾 Starting Pet Bot for IRC...")
    bot = PetBot()
    try:
        bot.connect()
    except KeyboardInterrupt:
        print("\n✓ Bot stopped by user")
    except Exception as e:
        print(f"✗ Bot crashed: {e}")
        import traceback
        traceback.print_exc()